V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
DejaVud
V2EX  ›  Python

请教一个关于任务队列的问题,关于 celery/rq/huey/dramatiq。

  •  
  •   DejaVud · 278 天前 · 1877 次点击
    这是一个创建于 278 天前的主题,其中的信息可能已经有所发展或是发生改变。

    一个典型的 rq 使用示例如下:

    worker.py

    import requests
    
    def count_words_at_url(url):
        resp = requests.get(url)
        return len(resp.text.split())
    

    main.py

    from redis import Redis
    from rq import Queue
    from worker import count_words_at_url
    
    q = Queue(connection=Redis())
    result = q.enqueue(count_words_at_url, 'http://nvie.com')
    

    但是我希望生产者( main.py )和消费者( worker.py )代码是解耦的,就是不要在 main.py 中 from worker import count_words_at_url ,生产者只知道要调用函数名和参数,比如类似下面这样的形式:

    result = q.send_task(task_name="count_words_at_url", args=["http://nvie.com",])
    

    这样,我就可以把 worker 的代码项目单独独立出去,而 main.py 项目里完全不会有 count_words_at_url 函数的相关实现。

    我知道 celery 是可以这样实现的,我的问题是 rq 、huey 、dramatiq 这三个库可以实现出这样吗?

    11 条回复    2024-02-23 17:40:45 +08:00
    DejaVud
        1
    DejaVud  
    OP
       278 天前
    补充一下,我之所以有这样的需求是因为,存在一种这样的场景:
    main.py 是一个 web 后端应用,worker.py 是一个计算任务如 AI 推理,在我的理解里 web 后端代码和 AI 推理代码本应属于不同的 2 个项目,代码不应当糅合在一起,web 代码就不需要知道任务函数是如何实现的。
    Zhuzhuchenyan
        2
    Zhuzhuchenyan  
       278 天前
    Celery 可以实现类似的需求,参考 https://docs.celeryq.dev/en/latest/userguide/canvas.html#signatures
    可以将 Worker 部分完全解耦到另一个项目,只要保持 Celery 配置一致即可
    ospider
        3
    ospider  
       278 天前
    redis lpush/rpop 解君愁,这些工具基本都是过度封装。
    pollux
        4
    pollux  
       278 天前
    借助 gRPC 与任何语言做 rpc 调用
    alexsz
        5
    alexsz  
       278 天前
    rq 本身就支持:
    result = q.send_task(task_name="worker.count_words_at_url", args=["http://nvie.com",])
    DejaVud
        6
    DejaVud  
    OP
       278 天前
    @ospider 哈啊,是的,我就在想要不自己封装一下
    DejaVud
        7
    DejaVud  
    OP
       278 天前
    @pollux 但是 rpc 和消息队列还是有点使用场景区别(同步 vs 异步),我想要消息队列一些特性:异步、解耦、流量削锋。当然,不是说 rpc 不能做。
    DejaVud
        8
    DejaVud  
    OP
       278 天前
    @alexsz 好的好的,还是看文档不仔细了,我这就去试试。
    kice
        9
    kice  
       277 天前
    本质上入队的时候都是把函数名转成字符串。但是把函数放在一起的话,按理是可以提供类型提示(例如 Tab 自动完成)。

    目前看的话 taskiq 做得比较好,其他的任务队列一般般。┑( ̄Д  ̄)┍
    DejaVud
        10
    DejaVud  
    OP
       276 天前
    结贴了,为了之后来到这里搜索答案的人:
    DejaVud
        11
    DejaVud  
    OP
       276 天前
    celery:
    task = signature("xxx.task_name", options={"queue":"queue_name"})
    task.apply_async((args,), expires=expires)

    rq:
    job = q.enqueue("module_xxx.task_fucname", args=[a, b,])
    "module_xxx.task_fucname"是 rq worker 启动的项目目录包函数相对路径

    dramatiq:
    broker = XXXBroker(...)
    msg = Message(queue_name="queue_aaa", actor_name="task_name”, args=...)
    broker.enqueue(msg)
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5388 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 31ms · UTC 07:32 · PVG 15:32 · LAX 23:32 · JFK 02:32
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.