-
全局任务确保唯一性,并行执行接口请求,重试错误处理
from concurrent.futures import ThreadPoolExecutor import requests import time start = time.time() url = 'https://blog.towavephone.com' def task(name): try: result = requests.get(url) print(name + 'ping ' + url + result.text[0:20]) except Exception as e: print('except:', e) if __name__ == "__main__": pool = ThreadPoolExecutor(100) for i in range(1, 500): pool.submit(task, "请求 %s:" % i) pool.shutdown() end = time.time() print("主线程结束,花费时间为:", end - start)
import queue import threading import time import requests start = time.time() url = 'https://blog.towavephone.com' def task(que): while not que.empty(): name = que.get() try: result = requests.get(url) print(name + 'ping ' + url + result.text[0:20]) except Exception as e: print('except:', e) q = queue.Queue() for i in range(1, 500): q.put('请求 %s:' % i) max_thread = 100 ts = [] for i in range(max_thread): t = threading.Thread(target=task, args=(q,)) t.start() ts.append(t) for t in ts: t.join() end = time.time() print("主线程结束,花费时间为:", end - start)
import asyncio import time import requests import httpx import uvloop start = time.time() url = 'https://blog.towavephone.com' sem = asyncio.Semaphore(100) async def task(name, client): async with sem: try: result = await client.get(url) print(name + 'ping ' + url + result.text[0:20]) except Exception as e: print('except:', e) async def main(): async with httpx.AsyncClient() as client: # 获取 EventLoop: tasks = [] for i in range(1, 500): tasks.append(task('请求 %s:' % i, client)) await asyncio.gather(*tasks) # 执行 coroutine uvloop.install() asyncio.run(main()) end = time.time() print("主线程结束,花费时间为:", end - start)
import asyncio import time import httpx import random # import uvloop start = time.time() url = 'https://blog.towavephone.com' sem = asyncio.Semaphore(1) count = 0 is_success = True async def task(name, client): async with sem: global count global is_success if not is_success: return for i in range(0, 3): try: result = await client.get(url) print(name + 'ping ' + url + result.text[0:20], count) if i == 2 and random.randint(0, 1): print('------------' + name) is_success = False break # raise asyncio.CancelledError('请求' + name + '取消') except Exception as e: print('except:', e) count = count + 1 # return result.text[0:4] async def main(): async with httpx.AsyncClient() as client: # 获取 EventLoop: tasks = [] for i in range(1, 100): tasks.append(task('请求 %s:' % i, client)) result = await asyncio.gather(*tasks) print(result) # uvloop.install() loop = asyncio.get_event_loop() loop.run_until_complete(main()) end = time.time() print("主线程结束,花费时间为:", end - start)
-
diff 文件对比选型过程,
db.bp_policies.find({ '$or': [{ 'status': 2 }] })
耗时过长,git diff 的选型过程 -
https://blog.csdn.net/konkon2012/article/details/115090606 的 6、7 步骤配置 rsync
-
为了让软链接
/var/cloudsim/nas/cloud_simulation_nas/simulation/bp_policy
->/data/nas-prod/cloud_simulation_nas/simulation/bp_policy/
生效 -
https://developer.microsoft.com/en-us/microsoft-edge/tools/vms/ 下载 Virutal Box 的 ova 镜像,密码是 Passw0rd!
-
地图方向反转问题:MutationObserver, transform: scale(1, -1)
-
道路中心线算法(多次方程模拟,中心线取点后端逻辑移到前端)、paper.js 画箭头 https://stackoverflow.com/questions/16991895/draw-line-with-arrow-cap
import matplotlib.pyplot as plt import numpy as np import numpy.polynomial.polynomial as poly # https://stackoverflow.com/questions/49037902/how-to-interpolate-a-line-between-two-other-lines-in-python def interpolate(a1, a2, poly_deg=3, n_points=10, plot=True): min_a1_x, max_a1_x = min(a1[:, 0]), max(a1[:, 0]) new_a1_x = np.linspace(min_a1_x, max_a1_x, n_points) a1_coefs = poly.polyfit(a1[:, 0], a1[:, 1], poly_deg) new_a1_y = poly.polyval(new_a1_x, a1_coefs) min_a2_x, max_a2_x = min(a2[:, 0]), max(a2[:, 0]) new_a2_x = np.linspace(min_a2_x, max_a2_x, n_points) a2_coefs = poly.polyfit(a2[:, 0], a2[:, 1], poly_deg) new_a2_y = poly.polyval(new_a2_x, a2_coefs) midx = [np.mean([new_a1_x[i], new_a2_x[i]]) for i in range(n_points)] midy = [np.mean([new_a1_y[i], new_a2_y[i]]) for i in range(n_points)] if plot: plt.plot(a1[:, 0], a1[:, 1], c='black') plt.plot(a2[:, 0], a2[:, 1], c='black') plt.plot(midx, midy, '--', c='black') plt.show() return [[x, y] for x, y in zip(midx, midy)] a1 = np.array( [ [ 113.38226806372708, 23.162344275038354 ], [ 113.3822851628109, 23.163151871405752 ], [ 113.38228818029653, 23.163380278265958 ], [ 113.3822873421064, 23.163461918002366 ], [ 113.38228516281187, 23.163597788651938 ], [ 113.38227460161451, 23.16384656353636 ], [ 113.38225507178088, 23.16411109639837 ], [ 113.3822487853537, 23.164175637052246 ] ] ) a2 = np.array( [ [ 113.38236629963569, 23.16418460568882 ], [ 113.38238390163173, 23.163970028969388 ], [ 113.38239580393373, 23.16377364097953 ], [ 113.38239580393373, 23.16377364097953 ], [ 113.38239823468551, 23.16371362655323 ], [ 113.38239823468551, 23.16371362655323 ], [ 113.3824045211124, 23.163508018469752 ], [ 113.3824045211118, 23.16320585086209 ], [ 113.3824045211118, 23.16320585086209 ], [ 113.38240284473103, 23.163134185590337 ], [ 113.38240284473103, 23.163134185590337 ], [ 113.38239907287435, 23.162987502285524 ], [ 113.38239907287435, 23.162987502285524 ], [ 113.38239748031269, 23.162955483415537 ], [ 113.38239748031269, 23.162955483415537 ], [ 113.38238549419019, 23.162341592829492 ] ] ) result = interpolate(a1, a2) print('result', result)
-
umi 新架构配置
-
https://www.cyberciti.biz/faq/vim-vi-text-editor-save-file-without-root-permission/ vim 保存文件在 root 下
-
解压 zip 文件中文乱码,需要使用这个命令
unzip -O CP936 ./tmp.zip
-
policy 聚合 mongodb 写法
-
python 时间转换过滤
-
汇总 sql 优化过程
-
深入浅出:如何正确使用 protobuf https://zhuanlan.zhihu.com/p/406832315
-
import asyncio import time import httpx import random # import uvloop start = time.time() url = 'https://blog.towavephone.com' sem = asyncio.Semaphore(100) count = 0 is_success = True async def task(name, client): async with sem: global count global is_success if not is_success: return for i in range(0, 3): try: result = await client.get(url) if result.status_code == 200: print(name + 'ping ' + url + result.text[0:20], count) break if i == 2: print('------------' + name) is_success = False break # raise asyncio.CancelledError('请求' + name + '取消') except Exception as e: print('Exception:', name, e) count = count + 1 return result.text[0:4] async def main(): async with httpx.AsyncClient() as client: # 获取 EventLoop: tasks = [] for i in range(1, 100): tasks.append(task('请求 %s:' % i, client)) result = await asyncio.gather(*tasks) print(result) # uvloop.install() loop = asyncio.get_event_loop() loop.run_until_complete(main()) end = time.time() print("主线程结束,花费时间为:", end - start)
import asyncio import time import httpx import random # import uvloop start = time.time() url = 'https://blog.towavephone.com' count = 0 is_success = True async def task(name, client): global count global is_success if not is_success: return for i in range(0, 3): try: result = await client.get(url) if result.status_code == 200: print(name + 'ping ' + url + result.text[0:20], count) break if i == 2: print('------------' + name) is_success = False break # raise asyncio.CancelledError('请求' + name + '取消') except Exception as e: print('Exception:', name, e) count = count + 1 return result.text[0:4] # async def main(): # async with httpx.AsyncClient() as client: # # 获取 EventLoop: # tasks = [] # for i in range(1, 100): # tasks.append(task('请求 %s:' % i, client)) # result = await asyncio.gather(*tasks) # print(result) # uvloop.install() # download(code) is the same async def main(): async with httpx.AsyncClient() as client: no_concurrent = 1 dltasks = set() for i in range(100): if len(dltasks) >= no_concurrent: # Wait for some download to finish before adding a new one _done, dltasks = await asyncio.wait(dltasks, return_when=asyncio.FIRST_COMPLETED) dltasks.add(asyncio.create_task(task('请求 %s:' % i, client))) print('----323') print('-4345') # Wait for the remaining downloads to finish await asyncio.wait(dltasks) loop = asyncio.get_event_loop() loop.run_until_complete(main()) end = time.time() print("主线程结束,花费时间为:", end - start)
-
线宽问题解决,需要使用 threejs 的 line2 组件以及对应的 降级策略,即识别
gl.ALIASED_LINE_WIDTH_RANGE
的MAX_LINE_WIDTH
是否大于 1,大于 1 的情况下才需要使用 line2 -
python schedule 任务并行执行
-
解决一直打印 browerslist 相关提示,设置
BROWSERSLIST_IGNORE_OLD_DATA=true
环境变量 -
多线程提交任务时候必须调用 result() 方法,否则不会抛出异常
from concurrent.futures import ThreadPoolExecutor, wait def task(n): print("Processing {}".format(n)) with ThreadPoolExecutor() as executor: futures = [ executor.submit(task, 1), executor.submit(task, 2), ] wait(futures) for fut in futures: # 这句话必须使用,否则不会抛出异常 fut.result()
-
对于 border-radius: 50% 且分布范围较大的框,box-shadow 的形状应为圆形,对应链接
阅读上一篇
英语单词复习
2022-10-19 22:08:29
阅读下一篇
UI学习笔记
2022-05-19 22:41:03