话不多说, 先上测试数据, 在各种负载下均有良好表现:
// small task
const (
PoolSize = 16
BenchTimes = 1000
N = 1000
)
goos: darwin
goarch: arm64
pkg: bench
BenchmarkGwsWorkerQueue
BenchmarkGwsWorkerQueue-8 3302 357841 ns/op 55977 B/op 2053 allocs/op
BenchmarkGopool
BenchmarkGopool-8 4426 319383 ns/op 20000 B/op 1173 allocs/op
BenchmarkAnts
BenchmarkAnts-8 3026 399899 ns/op 16047 B/op 1001 allocs/op
BenchmarkNbio
BenchmarkNbio-8 4314 259668 ns/op 48028 B/op 3000 allocs/op
PASS
// medium task
const (
PoolSize = 16
BenchTimes = 1000
N = 10000
)
goos: darwin
goarch: arm64
pkg: bench
BenchmarkGwsWorkerQueue
BenchmarkGwsWorkerQueue-8 1491 808853 ns/op 57635 B/op 2008 allocs/op
BenchmarkGopool
BenchmarkGopool-8 1377 870051 ns/op 17266 B/op 1029 allocs/op
BenchmarkAnts
BenchmarkAnts-8 886 1324236 ns/op 16054 B/op 1001 allocs/op
BenchmarkNbio
BenchmarkNbio-8 1324 836092 ns/op 48000 B/op 3000 allocs/op
PASS
// large task
const (
PoolSize = 16
BenchTimes = 1000
N = 100000
)
goos: darwin
goarch: arm64
pkg: bench
BenchmarkGwsWorkerQueue
BenchmarkGwsWorkerQueue-8 193 6026196 ns/op 58162 B/op 2004 allocs/op
BenchmarkGopool
BenchmarkGopool-8 178 6942255 ns/op 17108 B/op 1019 allocs/op
BenchmarkAnts
BenchmarkAnts-8 174 6300705 ns/op 16157 B/op 1002 allocs/op
BenchmarkNbio
BenchmarkNbio-8 176 7084957 ns/op 48071 B/op 2995 allocs/op
PASS
package bench
import (
"sync"
)
type (
WorkerQueue struct {
mu *sync.Mutex // 锁
q []Job // 任务队列
maxConcurrency int32 // 最大并发
curConcurrency int32 // 当前并发
}
Job func()
)
// NewWorkerQueue 创建一个任务队列
func NewWorkerQueue(maxConcurrency int32) *WorkerQueue {
return &WorkerQueue{
mu: &sync.Mutex{},
maxConcurrency: maxConcurrency,
curConcurrency: 0,
}
}
// 获取一个任务
func (c *WorkerQueue) getJob(delta int32) Job {
c.mu.Lock()
defer c.mu.Unlock()
c.curConcurrency += delta
if c.curConcurrency >= c.maxConcurrency {
return nil
}
if n := len(c.q); n == 0 {
return nil
}
var result = c.q[0]
c.q = c.q[1:]
c.curConcurrency++
return result
}
// 递归地执行任务
func (c *WorkerQueue) do(job Job) {
job()
if nextJob := c.getJob(-1); nextJob != nil {
go c.do(nextJob)
}
}
// Push 追加任务, 有资源空闲的话会立即执行
func (c *WorkerQueue) Push(job Job) {
c.mu.Lock()
c.q = append(c.q, job)
c.mu.Unlock()
if item := c.getJob(0); item != nil {
go c.do(item)
}
}
如果觉得对你有帮助, 麻烦给 gws 点个赞吧:)
1
ihciah 2023-03-03 22:40:45 +08:00
说实话这代码我是真没看懂。。
|
2
Mitt 2023-03-03 22:45:51 +08:00 5
不要 channel 反手加了个锁可还行
|
3
Glauben 2023-03-03 22:56:00 +08:00
感觉就是普通的做法,不太理解这样的写法有什么特别的,少的时间是从功能削减上得来的吧。
|
7
littlewing 2023-03-04 00:15:29 +08:00
巧妙?
妹想到啊 妹想到啊 |
8
hsfzxjy 2023-03-04 00:32:04 +08:00 via Android 1
建议把 q 当成循环队列,复用前面空的位置,可以减少 alloc 次数
|
10
voidmnwzp 2023-03-04 01:26:25 +08:00 via iPhone
这跟 go 有啥关系啊 没了 channel 任何语言都能更轻松实现啊
|
11
Trim21 2023-03-04 01:27:30 +08:00 via Android
只看标题我以为是没 mutex 的…
|
12
securityCoding 2023-03-04 02:30:21 +08:00 via Android
看看 ring buffer 无锁队列实现方式。。。
|
15
Nazz OP |
17
rrfeng 2023-03-04 07:42:04 +08:00 via Android
benchmark 没有对比 channel 的吗?
|
19
chuanqirenwu 2023-03-04 21:31:18 +08:00
gws 的思路是自己实现一个极简的 eventloop ,而不用 go 自带的协程机制,从而没有什么额外的开销,提高性能?
|
20
Nazz OP @chuanqirenwu 同步模式没开额外协程,异步模式会开非常驻的协程,执行完任务就退出, 两种模式都没使用 channel.
|
21
MindMindMax 2023-03-05 04:16:09 +08:00
对于常规项目价值在哪? channel 的价值又在哪?
|
22
Nazz OP @MindMindMax 尽量使用 mutex 替代 chan. 很多时候保证线程安全就行了,不需要多线程通信. channel 我用得最多的地方是线程同步和超时控制.
|
23
Nazz OP @chuanqirenwu 确实有 EventLoop. 最开始我是模仿的 JS, 因为我认为 JS WebSocket API 比 gorilla/nhooyr 这些提供的都要清晰得多. 初版只有 Sync IO, Read=>Event Handler=>Write 循环往复. 后面在此基础上加了 Async IO, AIO 模式在每个连接上有读写两个任务队列(并发度分别是 N 和 1), 就是我分享的这个实现, 它需要足够的轻量. 两种模式压测表现都比 gorilla 好得多, 原因大概是 Parser 本身的简单高效和无额外常驻协程吧, 如果有, 协程数量会增加一倍.
|
24
chuanqirenwu 2023-03-05 12:45:03 +08:00 1
@Nazz 👍,虽然不怎么搞 go ,但感觉这个思路挺不错的。我看 README 的简介写的是 go websocket server ,是只支持 server 端吗? client 端没有实现?
|
25
Nazz OP @chuanqirenwu 刚实现的 client ,还在测试
|
26
rockuw 2023-03-13 10:21:57 +08:00
mutex 是比 channel 轻量,但是每个 job 新建一个 goroutine 也是有代价的。一个简单的固定 goroutine 数量的实现,测试结果还稍微好一些,分配次数则明显更低:
``` N=10000 goos: linux goarch: amd64 pkg: muwu.com/example/workerqueue cpu: Intel(R) Xeon(R) CPU E5-2682 v4 @ 2.50GHz BenchmarkGwsWorkerQueue-8 903 1310335 ns/op 55471 B/op 2010 allocs/op BenchmarkGopool-8 897 1394589 ns/op 17926 B/op 1059 allocs/op BenchmarkAnts-8 1203 1020211 ns/op 16046 B/op 1001 allocs/op BenchmarkNbio-8 956 1278696 ns/op 48017 B/op 2999 allocs/op BenchmarkChan-8 1004 1181569 ns/op 16016 B/op 1001 allocs/op ``` ```go type workerQueueV1 struct { maxConn int queue chan Job } func newWorkerQueueV1(n int) *workerQueueV1 { wq := &workerQueueV1{ maxConn: n, queue: make(chan Job, 1024), } for i := 0; i < n; i++ { go func() { for job := range wq.queue { job() } }() } return wq } func (wq *workerQueueV1) Push(job Job) { wq.queue <- job } ``` |
27
Nazz OP @rockuw 从你的 Benchmark 结果来看, 差距不大. GwsWorkqueue 是专门为 IO 任务设计的, 每个 WebSocket 连接上有读写两个任务队列, 它们非常轻量, 而且并行读写不会新增常驻协程. 量变产生质变, 每个连接上都增加常驻协程会使 CPU 使用率提高不少. 实际业务中并发不会很高, 可以用优先队列替代普通队列减少 allocs, 收益不高我懒得去优化了, 复用 goroutine 对于 IO 任务收益也不大.
|
28
ClarkAbe 2023-03-23 14:49:55 +08:00 via Android
感觉每次都起一个协程有点浪费...
|