V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX  ›  hook923  ›  全部回复第 1 页 / 共 1 页
回复总数  1
2017-10-28 00:28:59 +08:00
回复了 ray1888 创建的主题 Python 异步与多 worker 的问题
我有个类似的做法,我是这样解决的。
因为我是多个线程共享一个数据库连接,每个线程都 execute,最后一起提交数据库。因此我在 savedata 中加了个锁

import threading
lock = threading.Lock()

from concurrent.futures import ThreadPoolExecutor

max_workers=64
sock_pool =ThreadPoolExecutor(max_workers=max_workers) #注意这 3 个 max_workers 不必都相同的
chgdata_pool = ThreadPoolExecutor(max_workers=max_workers)
chgdata_future = []
savedata_pool = ThreadPoolExecutor(max_workers=200)
savedata_future = []

def sock(参数):
接收 shock 数据的代码
chgdata_future.append(chgdata_pool.submit(chgdata,参数) ) ##异步委托一个清洗数据的函数 chgdata
其它代码
def chgdata(参数):
清洗数据的代码
savedata_future.append(savedata_pool.submit(savedata,参数)) ##异步委托一个保存数据的函数 savedata
其它代码
def savedata(参数):
保存语句生成
lock.acquire() #加个互斥锁
保存到数据库
lock.release() #释放锁
其它代码

if __name__ == '__main__':
执行 sock()之前的代码
sock_future = sock_pool.submit(sock,参数) for 参数 in 列表] ## 多个 sock 接收数据
for f in sock_future:
f.result()
for f in chgdata_future:
f.result()
for f in saveda'ta_future:
f.result()

conn.commit() #。这步你可以视你的实际需求放在 savadata 中。

每个 sock 接收数据后传递给 chgdata,不必等待 chgdata。每个 chgdata 清洗数据后传递给 savedata,不必等待 savedata。
这应该是楼主想要的效果
关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2717 人在线   最高记录 6679   ·     Select Language
创意工作者们的社区
World is powered by solitude
VERSION: 3.9.8.5 · 9ms · UTC 04:14 · PVG 12:14 · LAX 20:14 · JFK 23:14
Developed with CodeLauncher
♥ Do have faith in what you're doing.