V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
shaoyie
V2EX  ›  Go 编程语言

[go]golang 的协程池本应该是这样的

  •  
  •   shaoyie · 2023-09-12 15:56:11 +08:00 · 2198 次点击
    这是一个创建于 467 天前的主题,其中的信息可能已经有所发展或是发生改变。

    看过了一下 star 比较高的协程池实现,还有字节开源的实现,完全是 java/c++之类的外行实现思路

    协程/线程池,最基本的元件 就是 队列 + 协程/线程,M:N 模型

    这两个组件在 go 里边天生就有啊,为什么再搞一套 task queue 呢?

    控制队列容量:make(chan, cap) 第二参数就可以

    想要控制协程/线程数量,再辅助一个 chan 就可以了,

    代码实现如下,100 行搞定:

    我把它放到 github 上 gopool 喜欢的老铁可以给个 star

    // GoPool is a minimalistic goroutine pool that provides a pure Go implementation
    type GoPool struct {
    	noCopy
    
    	queueLen atomic.Int32
    	doTaskN  atomic.Int32
    	workerN  atomic.Int32
    	options  Options
    
    	workerSem chan struct{}
    	queue     chan func()
    }
    
    // NewGoPool provite fixed number of goroutines, reusable. M:N model
    //
    // M: the number of reusable goroutines,
    // N: the capacity for asynchronous task queue.
    func NewGoPool(opts ...Option) *GoPool {
    	opt := setOptions(opts...)
    	if opt.minWorkers <= 0 {
    		panic("GoPool: min workers <= 0")
    	}
    	if opt.minWorkers > opt.maxWorkers {
    		panic("GoPool: min workers > max workers")
    	}
    	p := &GoPool{
    		options:   opt,
    		workerSem: make(chan struct{}, opt.maxWorkers),
    		queue:     make(chan func(), opt.queueCap),
    	}
    	for i := int32(0); i < p.options.minWorkers; i++ { // pre spawn
    		p.workerSem <- struct{}{}
    		go p.worker(func() {})
    	}
    	go p.shrink()
    	return p
    }
    
    // QueueFree returns (capacity of task-queue - length of task-queue)
    func (p *GoPool) QueueFree() int {
    	return int(p.options.queueCap - p.queueLen.Load())
    }
    
    // Workers returns current the number of workers
    func (p *GoPool) Workers() int {
    	return int(p.workerN.Load())
    }
    
    // Go submits a task to this pool.
    func (p *GoPool) Go(task func()) {
    	if task == nil {
    		panic("GoPool: Go task is nil")
    	}
    	select {
    	case p.queue <- task:
    		p.queueLen.Add(1)
    	case p.workerSem <- struct{}{}:
    		go p.worker(task)
    	}
    }
    
    func (p *GoPool) worker(task func()) {
    	p.workerN.Add(1)
    	defer func() {
    		<-p.workerSem
    		p.workerN.Add(-1)
    		if e := recover(); e != nil {
    			if p.options.panicHandler != nil {
    				p.options.panicHandler(e)
    			}
    		}
    	}()
    
    	for {
    		task()
    		task = <-p.queue
    		if task == nil {
    			break
    		}
    		p.doTaskN.Add(1)
    		p.queueLen.Add(-1)
    	}
    }
    func (p *GoPool) shrink() {
    	ticker := time.NewTicker(p.options.shrinkPeriod)
    	defer ticker.Stop()
    
    	for {
    		select {
    		case <-ticker.C:
    			doTaskN := p.doTaskN.Load()
    			p.doTaskN.Store(0)
    			if doTaskN < p.options.tasksBelowN {
    				closeN := p.workerN.Load() - p.options.minWorkers
    				for closeN > 0 {
    					p.queue <- nil
    					closeN--
    				}
    			}
    		}
    	}
    }
    
    27 条回复    2023-09-18 18:42:57 +08:00
    csh010101
        1
    csh010101  
       2023-09-12 18:47:52 +08:00
    好好好
    Nazz
        2
    Nazz  
       2023-09-12 19:16:27 +08:00   ❤️ 1
    应该是这样的:

    ```go
    type channel chan struct{}

    func (c channel) add() { c <- struct{}{} }

    func (c channel) done() { <-c }

    func (c channel) Go(f func()) {
    c.add()
    go func() {
    f()
    c.done()
    }()
    }
    ```
    lan171
        3
    lan171  
       2023-09-12 19:25:55 +08:00
    都协程了还要池化?
    ylc
        4
    ylc  
       2023-09-13 00:26:40 +08:00 via Android
    @lan171 并发太大时,代码没写好的话会造成协程数激增,池化能控制数量,而且性能也能得到提升
    lovelylain
        5
    lovelylain  
       2023-09-13 08:24:12 +08:00 via Android
    @Nazz 看上去好像也行,OP 代码这么多是完善了哪些方面?
    shaoyie
        6
    shaoyie  
    OP
       2023-09-13 08:32:40 +08:00
    @lovelylain 这不算多吧,这已经算是最精简的了吧,我就是因为看到别人实现的太复杂了,所以写了一个,功能就是可以定时收缩,加了几个计数而已
    shaoyie
        7
    shaoyie  
    OP
       2023-09-13 08:35:24 +08:00
    @shaoyie 抱歉回复错了
    Nazz
        8
    Nazz  
       2023-09-13 08:47:56 +08:00 via Android
    @lovelylain 最精简的版本,没有之一
    Mohanson
        9
    Mohanson  
       2023-09-13 09:46:44 +08:00   ❤️ 1
    XY 问题.

    都协程了就没必要池化, 协程模型需要的是速率控制. 用令牌桶算法 10 行代码就能解决问题. 谈起协程就自动类比为进程, 然后想当然认为应该有一个"协程池", "完全是 java/c++之类的外行实现思路"
    dyllen
        10
    dyllen  
       2023-09-13 10:42:52 +08:00
    之前看了有的框架有协程池化的组件,看里面的测试结果,除了内存占用优势,性能上没任何优势。
    keakon
        11
    keakon  
       2023-09-13 10:54:11 +08:00
    @shaoyie @Nazz 当队列满的时候,Go() 会阻塞调用线程,而不是加到任务队列里立刻返回,后面空闲了再由 worker 线程去执行。
    troywinter
        12
    troywinter  
       2023-09-13 13:49:06 +08:00
    一个 semaphore 就能解决的问题,不需要写这么多代码
    shaoyie
        13
    shaoyie  
    OP
       2023-09-13 13:50:03 +08:00
    @dyllen 是的,不会有肉眼可见的性能提升,至于协程复用啥的只是减少一些内存分配而已,主要还是控制数量
    shaoyie
        14
    shaoyie  
    OP
       2023-09-13 13:50:44 +08:00
    @troywinter 有 sem 不也得有个 job 队列吗
    shaoyie
        15
    shaoyie  
    OP
       2023-09-13 13:53:18 +08:00
    @Mohanson 只是一个小品,总有它存在的应用场景。
    kneo
        16
    kneo  
       2023-09-13 13:57:01 +08:00 via Android
    国内大厂自己轮的东西基本都是性能驱动。如果不是为了解决他们自己生产环境里遇见的性能问题,应该不会搞这么个东西出来。
    ZSeptember
        17
    ZSeptember  
       2023-09-13 14:05:33 +08:00
    学过 Go 的都能写出这个代码,,但是并不一定能写出那些复杂的 pool 。
    你这个本质上是一个并发控制而已,最基本的优雅关闭也没处理。
    shaoyie
        18
    shaoyie  
    OP
       2023-09-13 14:33:21 +08:00
    @kneo 是的,自己造的轮子肯定是为了解决自己生产环境的问题,不用考虑通用性
    shaoyie
        19
    shaoyie  
    OP
       2023-09-13 14:37:32 +08:00
    @ZSeptember 也不一定,有些人的实现思路就够 gopher ,自己实现一套条件变量 + queue ,我是觉得没必要,go 天生就带这些东西。另外,你说的优雅关闭 是指关闭 pool 吗?我是觉得没必要,本来 go pool 就不是很必须,再加上一个动态的 pool 就更没必要了
    bv
        20
    bv  
       2023-09-13 14:43:28 +08:00
    @Nazz 代码就应该简单直接
    TanKuku
        21
    TanKuku  
       2023-09-13 14:48:35 +08:00 via Android
    不懂就问,也没改 runtime 复用什么东西,就单纯控制数量也叫池了吗?
    Pythoner666666
        22
    Pythoner666666  
       2023-09-13 15:15:27 +08:00
    所有的池化技术,无论是 mysql redis 还是 http 的池化技术核心都是复用连接再然后是控制连接的数量不至于把 server 的连接资源都耗尽,私以为你这顶多算是控制并发。但是如果只需要控制并发你这未免也太繁琐了
    shaoyie
        23
    shaoyie  
    OP
       2023-09-13 16:29:00 +08:00
    @Pythoner666666 是的用个计数器也可以达到效果,但这样不就是更通用一点嘛,
    huija
        24
    huija  
       2023-09-13 19:00:19 +08:00
    官方池化实现不给出来了么,sync.Pool ,如果用不到的,那就说明压根不需要池化(比如协程)
    dyllen
        25
    dyllen  
       2023-09-14 10:55:06 +08:00
    @TanKuku 他这个代码控制了协程数量,也复用了协程,shrink 这个函数就是发消息结束超时生存的协程的,worker 里面没任务会阻塞等待任务来,直到 shrink 发了结束信号退出。
    lysS
        26
    lysS  
       2023-09-15 15:13:37 +08:00
    为什么要控制协程数量?即使要限制也是限制请求并发
    index90
        27
    index90  
       2023-09-18 18:42:57 +08:00
    协程池与最大并发数控制傻傻分不清?
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   6104 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 29ms · UTC 02:23 · PVG 10:23 · LAX 18:23 · JFK 21:23
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.