首页
注册
登录
hook923 最近的时间轴更新
hook923
V2EX 第 262851 号会员,加入于 2017-10-27 03:05:31 +08:00
hook923
提问
技术话题
好玩
工作信息
交易信息
城市相关
hook923 最近回复了
2017-10-28 00:28:59 +08:00
回复了
ray1888
创建的主题
›
Python
›
异步与多 worker 的问题
我有个类似的做法,我是这样解决的。
因为我是多个线程共享一个数据库连接,每个线程都 execute,最后一起提交数据库。因此我在 savedata 中加了个锁
import threading
lock = threading.Lock()
from concurrent.futures import ThreadPoolExecutor
max_workers=64
sock_pool =ThreadPoolExecutor(max_workers=max_workers) #注意这 3 个 max_workers 不必都相同的
chgdata_pool = ThreadPoolExecutor(max_workers=max_workers)
chgdata_future = []
savedata_pool = ThreadPoolExecutor(max_workers=200)
savedata_future = []
def sock(参数):
接收 shock 数据的代码
chgdata_future.append(chgdata_pool.submit(chgdata,参数) ) ##异步委托一个清洗数据的函数 chgdata
其它代码
def chgdata(参数):
清洗数据的代码
savedata_future.append(savedata_pool.submit(savedata,参数)) ##异步委托一个保存数据的函数 savedata
其它代码
def savedata(参数):
保存语句生成
lock.acquire() #加个互斥锁
保存到数据库
lock.release() #释放锁
其它代码
if __name__ == '__main__':
执行 sock()之前的代码
sock_future = sock_pool.submit(sock,参数) for 参数 in 列表] ## 多个 sock 接收数据
for f in sock_future:
f.result()
for f in chgdata_future:
f.result()
for f in saveda'ta_future:
f.result()
conn.commit() #。这步你可以视你的实际需求放在 savadata 中。
每个 sock 接收数据后传递给 chgdata,不必等待 chgdata。每个 chgdata 清洗数据后传递给 savedata,不必等待 savedata。
这应该是楼主想要的效果
»
hook923 创建的更多回复
关于
·
帮助文档
·
博客
·
API
·
FAQ
·
实用小工具
·
1471 人在线
最高记录 6679
·
Select Language
创意工作者们的社区
World is powered by solitude
VERSION: 3.9.8.5 · 11ms ·
UTC 17:22
·
PVG 01:22
·
LAX 09:22
·
JFK 12:22
Developed with
CodeLauncher
♥ Do have faith in what you're doing.