V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
mutoudaren
V2EX  ›  Node.js

node 怎么实现 工作队列?

  •  
  •   mutoudaren · 2021-07-10 12:40:39 +08:00 · 5773 次点击
    这是一个创建于 1266 天前的主题,其中的信息可能已经有所发展或是发生改变。

    用 nodejs 简单的搭建了一个 http sever,可以处理 post 请求,执行比如“job1(post)”,“job2(post)”这样的请求,但是这些不同的 job 只能同步执行,也就是一个执行完了再执行另一个。

    所以有没有简单的实现,先把 post 请求存到队列里面,然后依次执行?

    第 1 条附言  ·  2021-07-10 13:53:30 +08:00
    我的意思是,http server 处理 post 请求时,会立马执行 job(post)函数( job 本身就是 async function )。所以收到两个连续的 post 请求时,会立马执行两个连续的 job(post)。由于 job(post)是 async function,所以会“同时执行”,互相产生冲突。现在只想让 job 执行完一个再执行下一个。
    27 条回复    2021-07-19 17:13:28 +08:00
    opentrade
        1
    opentrade  
       2021-07-10 12:49:40 +08:00
    类似 WSGI 的东西吧,Python 的比较多,nodejs 的没用过
    dengshen
        2
    dengshen  
       2021-07-10 13:04:36 +08:00 via iPhone
    rx 。流编程
    beginor
        3
    beginor  
       2021-07-10 13:06:20 +08:00 via Android
    promise ?
    Rrrrrr
        4
    Rrrrrr  
       2021-07-10 13:14:34 +08:00
    事件驱动了解一下?
    shiny
        5
    shiny  
       2021-07-10 13:21:17 +08:00
    Async.js
    chenqh
        6
    chenqh  
       2021-07-10 13:34:49 +08:00
    没懂
    streamrx
        7
    streamrx  
       2021-07-10 13:40:42 +08:00 via iPhone
    async 不就行了
    Sanonz
        8
    Sanonz  
       2021-07-10 14:04:51 +08:00 via iPhone
    NemoAlex
        9
    NemoAlex  
       2021-07-10 14:24:41 +08:00   ❤️ 1
    usw
        10
    usw  
       2021-07-10 14:32:10 +08:00
    桥豆麻袋,“同时执行”和“”产生冲突“具体是啥。async 函数如果没有 await 可咋整
    qping
        11
    qping  
       2021-07-10 14:32:41 +08:00
    web 请求天然就是异步的,想要搞成同步就把消息存在队列里面,然后另外搞个线程处理这个队列
    MegrezZhu
        12
    MegrezZhu  
       2021-07-10 14:44:20 +08:00
    同时执行产生冲突——那冲突的资源是什么? CPU ? IO ?

    如果只是想一个个执行的话,post handler 就只接收任务然后加到一个 job queue 里呗,然后自己起一个 worker 一个个拿去执行,这是最 naive 的方法……
    debuggerx
        13
    debuggerx  
       2021-07-10 15:03:55 +08:00 via Android
    听描述 应该是客户端 也就是请求端做串行吧
    job1 请求拿到返回了再去执行 job2 请求就好了
    lujjjh
        14
    lujjjh  
       2021-07-10 15:05:29 +08:00   ❤️ 3
    muzuiget
        15
    muzuiget  
       2021-07-10 16:19:21 +08:00
    把 job 放到一个包成一个 promise,把这些 promise 放到一个数组里,然后一个个 await 这个数组的元素。
    Exin
        16
    Exin  
       2021-07-10 16:28:19 +08:00
    连数组都不需要,最简单的实现如下

    ```js
    let last = Promise.resolve()
    onPost(post => last = last.then(() => job(post)))
    ```
    love
        17
    love  
       2021-07-10 16:50:41 +08:00
    我自己的小服务程序也是这样。
    收到请求,放在内存中请求列表,同时在硬盘上存一份 JSON 请求,然后另一个 async 函数会循环处理请求列表并删除硬盘上的对应文件。当程序重启时会加载硬盘上的所有请求,以免不小心程序各种原因重启时漏掉队列中的请求。
    April5
        18
    April5  
       2021-07-10 17:39:18 +08:00
    bull
    duan602728596
        19
    duan602728596  
       2021-07-11 10:23:03 +08:00
    当初为了实现多文件上传并限制同时上传数,写的一个实现队列的方法,你可以拿去参考参考
    https://github.com/duan602728596/Q/blob/main/src/Queue.ts
    zbinlin
        20
    zbinlin  
       2021-07-11 14:05:14 +08:00
    用 async generator 来实现比较简单

    //gist.github.com/zbinlin/541ccb9cf6e9c07f98c7605648c7ea34
    zhennann
        21
    zhennann  
       2021-07-11 16:36:05 +08:00
    两种思路:
    1 、采用分布式锁 redlock
    2 、采用 redis 做消息队列
    cszchen
        22
    cszchen  
       2021-07-11 22:26:45 +08:00 via iPhone
    node 天生就是异步啊,不需要你额外封装,大家都是头疼怎么同步,所以才有了 Promise
    libook
        23
    libook  
       2021-07-12 10:11:13 +08:00   ❤️ 1
    需求是希望收到的所有请求之间按照顺序一个一个执行是吧?

    可以用消息队列的思想,就是来请求会异步插入到一个队列里,然后单线程一个一个地消费。

    最简陋的做法是在一个 module 的根作用域下声明一个 Array,然后每次来请求都 unshift 一个消息,服务里单独跑一个循环去从数组里 pop 消息出来处理,处理完再 pop 下一个。

    有分布式需求的话就用 Redis 、RabbitMQ 之类的消息队列中间件,可以启动一个集群服务作为生产者往队列里插消息,然后启动一个消费者服务来同步一条条消费。
    soraping
        24
    soraping  
       2021-07-12 10:19:32 +08:00
    事件循环吧,已经内置了 uvlib,io 请求自动进入到事件队列里
    jianhua
        25
    jianhua  
       2021-07-12 14:53:37 +08:00
    多台服务器的话,需要一个消息队列,因为你要维护简单,redis 就可以了,实现一个简单的 pub/sub 。

    单台服务器,生产者消费者写过没有。。。
    tanrunhao
        26
    tanrunhao  
       2021-07-12 15:35:31 +08:00
    最简单是搞个 全局数组, 启动时候 setInterval 轮询数组执行, 然后请求时候把任务放进去。 这样基本满足需求
    xcstream
        27
    xcstream  
       2021-07-19 17:13:28 +08:00
    async function(){
    await f1()
    await f2()
    } ()
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5554 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 06:47 · PVG 14:47 · LAX 22:47 · JFK 01:47
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.