V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
爱意满满的作品展示区。
Nazz
V2EX  ›  分享创造

golang 分享: 60 行代码巧妙实现一个高性能无 channel 任务队列

  •  
  •   Nazz · 2023-03-03 20:12:39 +08:00 · 4625 次点击
    这是一个创建于 612 天前的主题,其中的信息可能已经有所发展或是发生改变。

    话不多说, 先上测试数据, 在各种负载下均有良好表现:

    // small task
    const (
    	PoolSize   = 16
    	BenchTimes = 1000
    	N          = 1000
    )
    
    goos: darwin
    goarch: arm64
    pkg: bench
    BenchmarkGwsWorkerQueue
    BenchmarkGwsWorkerQueue-8   	    3302	    357841 ns/op	   55977 B/op	    2053 allocs/op
    BenchmarkGopool
    BenchmarkGopool-8           	    4426	    319383 ns/op	   20000 B/op	    1173 allocs/op
    BenchmarkAnts
    BenchmarkAnts-8             	    3026	    399899 ns/op	   16047 B/op	    1001 allocs/op
    BenchmarkNbio
    BenchmarkNbio-8             	    4314	    259668 ns/op	   48028 B/op	    3000 allocs/op
    PASS
    
    // medium task
    const (
    	PoolSize   = 16
    	BenchTimes = 1000
    	N          = 10000
    )
    
    goos: darwin
    goarch: arm64
    pkg: bench
    BenchmarkGwsWorkerQueue
    BenchmarkGwsWorkerQueue-8   	    1491	    808853 ns/op	   57635 B/op	    2008 allocs/op
    BenchmarkGopool
    BenchmarkGopool-8           	    1377	    870051 ns/op	   17266 B/op	    1029 allocs/op
    BenchmarkAnts
    BenchmarkAnts-8             	     886	   1324236 ns/op	   16054 B/op	    1001 allocs/op
    BenchmarkNbio
    BenchmarkNbio-8             	    1324	    836092 ns/op	   48000 B/op	    3000 allocs/op
    PASS
    
    // large task
    const (
    	PoolSize   = 16
    	BenchTimes = 1000
    	N          = 100000
    )
    
    goos: darwin
    goarch: arm64
    pkg: bench
    BenchmarkGwsWorkerQueue
    BenchmarkGwsWorkerQueue-8   	     193	   6026196 ns/op	   58162 B/op	    2004 allocs/op
    BenchmarkGopool
    BenchmarkGopool-8           	     178	   6942255 ns/op	   17108 B/op	    1019 allocs/op
    BenchmarkAnts
    BenchmarkAnts-8             	     174	   6300705 ns/op	   16157 B/op	    1002 allocs/op
    BenchmarkNbio
    BenchmarkNbio-8             	     176	   7084957 ns/op	   48071 B/op	    2995 allocs/op
    PASS
    

    测试代码 Benchmark

    代码实现

    package bench
    
    import (
    	"sync"
    )
    
    type (
    	WorkerQueue struct {
    		mu             *sync.Mutex // 锁
    		q              []Job       // 任务队列
    		maxConcurrency int32       // 最大并发
    		curConcurrency int32       // 当前并发
    	}
    
    	Job func()
    )
    
    // NewWorkerQueue 创建一个任务队列
    func NewWorkerQueue(maxConcurrency int32) *WorkerQueue {
    	return &WorkerQueue{
    		mu:             &sync.Mutex{},
    		maxConcurrency: maxConcurrency,
    		curConcurrency: 0,
    	}
    }
    
    // 获取一个任务
    func (c *WorkerQueue) getJob(delta int32) Job {
    	c.mu.Lock()
    	defer c.mu.Unlock()
    	c.curConcurrency += delta
    	if c.curConcurrency >= c.maxConcurrency {
    		return nil
    	}
    	if n := len(c.q); n == 0 {
    		return nil
    	}
    	var result = c.q[0]
    	c.q = c.q[1:]
    	c.curConcurrency++
    	return result
    }
    
    // 递归地执行任务
    func (c *WorkerQueue) do(job Job) {
    	job()
    	if nextJob := c.getJob(-1); nextJob != nil {
    		go c.do(nextJob)
    	}
    }
    
    // Push 追加任务, 有资源空闲的话会立即执行
    func (c *WorkerQueue) Push(job Job) {
    	c.mu.Lock()
    	c.q = append(c.q, job)
    	c.mu.Unlock()
    	if item := c.getJob(0); item != nil {
    		go c.do(item)
    	}
    }
    

    如果觉得对你有帮助, 麻烦给 gws 点个赞吧:)

    29 条回复    2023-03-23 16:32:22 +08:00
    ihciah
        1
    ihciah  
       2023-03-03 22:40:45 +08:00
    说实话这代码我是真没看懂。。
    Mitt
        2
    Mitt  
       2023-03-03 22:45:51 +08:00   ❤️ 5
    不要 channel 反手加了个锁可还行
    Glauben
        3
    Glauben  
       2023-03-03 22:56:00 +08:00
    感觉就是普通的做法,不太理解这样的写法有什么特别的,少的时间是从功能削减上得来的吧。
    Nazz
        4
    Nazz  
    OP
       2023-03-03 23:15:51 +08:00 via Android
    @ihciah 递归
    Nazz
        5
    Nazz  
    OP
       2023-03-03 23:20:29 +08:00 via Android
    @Glauben 加上 recover 结果也差不多
    Nazz
        6
    Nazz  
    OP
       2023-03-03 23:22:29 +08:00 via Android
    @Mitt mutex 比 channel 轻量
    littlewing
        7
    littlewing  
       2023-03-04 00:15:29 +08:00
    巧妙?
    妹想到啊 妹想到啊
    hsfzxjy
        8
    hsfzxjy  
       2023-03-04 00:32:04 +08:00 via Android   ❤️ 1
    建议把 q 当成循环队列,复用前面空的位置,可以减少 alloc 次数
    maocat
        9
    maocat  
       2023-03-04 00:58:38 +08:00
    @hsfzxjy 既然循环队列有了,要不再加一个 sendq 和 recvq ,直接从对应的 g 上操作 \dog
    voidmnwzp
        10
    voidmnwzp  
       2023-03-04 01:26:25 +08:00 via iPhone
    这跟 go 有啥关系啊 没了 channel 任何语言都能更轻松实现啊
    Trim21
        11
    Trim21  
       2023-03-04 01:27:30 +08:00 via Android
    只看标题我以为是没 mutex 的…
    securityCoding
        12
    securityCoding  
       2023-03-04 02:30:21 +08:00 via Android
    看看 ring buffer 无锁队列实现方式。。。
    Nazz
        13
    Nazz  
    OP
       2023-03-04 04:26:31 +08:00 via Android
    @hsfzxjy 是一个优化点
    Nazz
        14
    Nazz  
    OP
       2023-03-04 04:28:30 +08:00 via Android
    @voidmnwzp 你不妨说明白点具体是什么语言
    Nazz
        15
    Nazz  
    OP
       2023-03-04 04:51:15 +08:00 via Android
    @hsfzxjy
    @securityCoding
    其实最完美的结构是 stack. 我想到一种优化方式,先 push, 然后 swap(q[0], q[n-1]), 最后 pop
    Nazz
        16
    Nazz  
    OP
       2023-03-04 05:01:22 +08:00 via Android
    @Nazz 这种方式不能保证 fifo ,并发小的话还是 heap 好点
    rrfeng
        17
    rrfeng  
       2023-03-04 07:42:04 +08:00 via Android
    benchmark 没有对比 channel 的吗?
    Nazz
        18
    Nazz  
    OP
       2023-03-04 07:56:16 +08:00 via Android
    @rrfeng 另外几种"协程池"都用了 channel
    chuanqirenwu
        19
    chuanqirenwu  
       2023-03-04 21:31:18 +08:00
    gws 的思路是自己实现一个极简的 eventloop ,而不用 go 自带的协程机制,从而没有什么额外的开销,提高性能?
    Nazz
        20
    Nazz  
    OP
       2023-03-04 23:13:29 +08:00 via Android
    @chuanqirenwu 同步模式没开额外协程,异步模式会开非常驻的协程,执行完任务就退出, 两种模式都没使用 channel.
    MindMindMax
        21
    MindMindMax  
       2023-03-05 04:16:09 +08:00
    对于常规项目价值在哪? channel 的价值又在哪?
    Nazz
        22
    Nazz  
    OP
       2023-03-05 07:43:55 +08:00 via Android
    @MindMindMax 尽量使用 mutex 替代 chan. 很多时候保证线程安全就行了,不需要多线程通信. channel 我用得最多的地方是线程同步和超时控制.
    Nazz
        23
    Nazz  
    OP
       2023-03-05 08:18:48 +08:00
    @chuanqirenwu 确实有 EventLoop. 最开始我是模仿的 JS, 因为我认为 JS WebSocket API 比 gorilla/nhooyr 这些提供的都要清晰得多. 初版只有 Sync IO, Read=>Event Handler=>Write 循环往复. 后面在此基础上加了 Async IO, AIO 模式在每个连接上有读写两个任务队列(并发度分别是 N 和 1), 就是我分享的这个实现, 它需要足够的轻量. 两种模式压测表现都比 gorilla 好得多, 原因大概是 Parser 本身的简单高效和无额外常驻协程吧, 如果有, 协程数量会增加一倍.
    chuanqirenwu
        24
    chuanqirenwu  
       2023-03-05 12:45:03 +08:00   ❤️ 1
    @Nazz 👍,虽然不怎么搞 go ,但感觉这个思路挺不错的。我看 README 的简介写的是 go websocket server ,是只支持 server 端吗? client 端没有实现?
    Nazz
        25
    Nazz  
    OP
       2023-03-05 13:16:43 +08:00 via Android
    @chuanqirenwu 刚实现的 client ,还在测试
    rockuw
        26
    rockuw  
       2023-03-13 10:21:57 +08:00
    mutex 是比 channel 轻量,但是每个 job 新建一个 goroutine 也是有代价的。一个简单的固定 goroutine 数量的实现,测试结果还稍微好一些,分配次数则明显更低:

    ```
    N=10000
    goos: linux
    goarch: amd64
    pkg: muwu.com/example/workerqueue
    cpu: Intel(R) Xeon(R) CPU E5-2682 v4 @ 2.50GHz
    BenchmarkGwsWorkerQueue-8 903 1310335 ns/op 55471 B/op 2010 allocs/op
    BenchmarkGopool-8 897 1394589 ns/op 17926 B/op 1059 allocs/op
    BenchmarkAnts-8 1203 1020211 ns/op 16046 B/op 1001 allocs/op
    BenchmarkNbio-8 956 1278696 ns/op 48017 B/op 2999 allocs/op
    BenchmarkChan-8 1004 1181569 ns/op 16016 B/op 1001 allocs/op
    ```

    ```go
    type workerQueueV1 struct {
    maxConn int
    queue chan Job
    }

    func newWorkerQueueV1(n int) *workerQueueV1 {
    wq := &workerQueueV1{
    maxConn: n,
    queue: make(chan Job, 1024),
    }
    for i := 0; i < n; i++ {
    go func() {
    for job := range wq.queue {
    job()
    }
    }()
    }
    return wq
    }

    func (wq *workerQueueV1) Push(job Job) {
    wq.queue <- job
    }
    ```
    Nazz
        27
    Nazz  
    OP
       2023-03-13 11:19:56 +08:00
    @rockuw 从你的 Benchmark 结果来看, 差距不大. GwsWorkqueue 是专门为 IO 任务设计的, 每个 WebSocket 连接上有读写两个任务队列, 它们非常轻量, 而且并行读写不会新增常驻协程. 量变产生质变, 每个连接上都增加常驻协程会使 CPU 使用率提高不少. 实际业务中并发不会很高, 可以用优先队列替代普通队列减少 allocs, 收益不高我懒得去优化了, 复用 goroutine 对于 IO 任务收益也不大.
    ClarkAbe
        28
    ClarkAbe  
       2023-03-23 14:49:55 +08:00 via Android
    感觉每次都起一个协程有点浪费...
    Nazz
        29
    Nazz  
    OP
       2023-03-23 16:32:22 +08:00
    @ClarkAbe 为了防止栈溢出. 如果做了容量限制, 可以不开新协程.
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   973 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 22:04 · PVG 06:04 · LAX 14:04 · JFK 17:04
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.