V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
zhanghk668
V2EX  ›  Python

python 如何实现协同任务

  •  
  •   zhanghk668 · 2016-08-05 02:16:21 +08:00 · 3130 次点击
    这是一个创建于 3040 天前的主题,其中的信息可能已经有所发展或是发生改变。

    新手,这种问题估计笔记简单。公司准备弄一个小东西。可能会使用到这种方式。简单的说,就是任务对某条件的处理。然后分成不同的任务进行处理,然后再把结果汇总。 比如

    有一个切人点,姑且叫做 TASK , TASK 里面有 TASK 这个任务需要对我们的输入进行处理。 TASK 中发现有一个满足了预期条件 case1 的结果 result1 ,就把这个结果 result1 丢进进入到 task1 里面去执行,然后还不影响 task1 的继续执行。如果发现后续满足的结果 result2,result3 等,都继续进入 task2 里面去执行。如果发现有一个不满足 case1 却匹配到了 case2 得到的结果 res1 ,就把这个 res1 丢入到 task3 里面去进行...最后把全部的结果给汇总

    每匹配一个新的 case 都不会影响到原本到 task 的持续进行。而且可能每一个 task{num}都会跟原本的 task 差不多的类似的处理机制。

    		|	CASE1
    		--------------TASK1----------
    		|	CASE2					    |
    TASK--------------TASK2----------RESULT
    		|	CASE3			 		   |
    		--------------TASK3----------
    

    起初打算以多线程+多进程来实现,可是发现不会制定每一个进程配发几个线程。然后不知道这种怎么进行,不知道有没有解决方式呢?如果有可否给个 demo 研究下

    12 条回复    2016-08-05 10:35:17 +08:00
    HFcbyqP0iVO5KM05
        1
    HFcbyqP0iVO5KM05  
       2016-08-05 07:07:52 +08:00
    你能再稍微解释一下 不影响 task1 的继续执行 是什么意思吗?
    在没拿到 result1 的时候 task1 是 yield 了还是作为一个独立线程在跑呢?
    necomancer
        2
    necomancer  
       2016-08-05 07:34:36 +08:00
    没明白一点,是运行 task 时,输入经过 case1 ,被 task1 处理以后返回值如果不满足 case1 然后满足 case2 就扔给 task3 吗?
    clino
        3
    clino  
       2016-08-05 08:07:07 +08:00
    不同的 task 需要跑在多核上吗? 还是单核也是可以的?
    Zuckonit
        4
    Zuckonit  
       2016-08-05 09:05:30 +08:00
    队列
    huluhulu
        5
    huluhulu  
       2016-08-05 09:14:15 +08:00
    queue+lock
    namco1992
        6
    namco1992  
       2016-08-05 09:16:31 +08:00
    个人建议以任务队列的形式实现。从你的描述上来看,一个 task 有可能对应了多个处理函数的入口,我建议统一入口,由入口函数再来进行路由分发到相应函数,保持一个输入一个输出的队列形式,比较容易实现。

    至于实现方式, python 可参考 rq 任务队列, http://python-rq.org/

    其实也可以自己实现,我司的轻量级监控系统就是这么实现的,也没有利用第三方库,参考了 scrapy 的 pipeline ,如下所示,将处理函数按顺序写入配置文件,触发时依次执行即可,每项任务都会 fork 一个子线程。

    'pipelines': [
    'monitor_platform.src.pipelines.loan_repay_pipelines.get_monitor_data',
    'monitor_platform.src.pipelines.loan_repay_pipelines.deal_monitor_data',
    'monitor_platform.src.pipelines.loan_repay_pipelines.send_content',
    'monitor_platform.src.pipelines.common_pipeline.send_mail',
    ],
    jixiangqd
        7
    jixiangqd  
       2016-08-05 09:38:08 +08:00
    celery 并行任务框架
    jixiangqd
        8
    jixiangqd  
       2016-08-05 09:39:00 +08:00
    如果任务比较重也可以自己搭 Hadoop , spark 之类的,也支持 python
    harry890829
        9
    harry890829  
       2016-08-05 09:41:42 +08:00
    我是开发 c/c++的,看了下,感觉就是采取多进程和任务队列来解决,一个派发,一群工作,一个合并,大概是这个结构吧
    windfarer
        10
    windfarer  
       2016-08-05 09:47:10 +08:00 via Android
    celery +1
    zhanghk668
        11
    zhanghk668  
    OP
       2016-08-05 10:12:40 +08:00
    @necomancer 就是匹配每一个 case ,如果满足就进入对应的 task,如果不满足就 pass 掉
    yufpga
        12
    yufpga  
       2016-08-05 10:35:17 +08:00
    rabbitmq+celery
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1044 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 23:20 · PVG 07:20 · LAX 15:20 · JFK 18:20
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.