之前的一篇主题发表于 180 天前 都说 go 简单 小白学完 channel 马上就不会了。
原来主题的主要目标是遍历一个文件夹内的所有文件,并根据不同的文件后缀进行归类。我也去爆栈问过,但是回答都和我这样操作文件不安全相关,并没有正面解决这个问题。
言之种种,在我做了一些针对 channel 的练习之后,算是大概搞清楚了这个例子要怎么写。可能不是 best practice ,但希望能帮到大家,特别是之前一直关注那个帖子的朋友。(有 18 个收藏,所以我一定要把它搞懂不然对不起这 18 个收藏)
======== 在开始之前,我们稍微回顾一下之前的逻辑:
『函数 1 』 是一个简单、耗时很短的功能,它会不断生成一些中间量等待下游处理;『函数 2 』 则是一个复杂、耗时长的功能。如果单线程运行,那么『函数 1 』会长时间等待浪费很多时间。 我们期待通过 goroutine 来并发处理『函数 2 』以达到提升处理性能的目的。
我用的文件处理案例,根据大佬们所说会有线程安全问题 (多个线程可能会同时创建文件夹,应当加写锁),我们先忽略这个问题,主要还是把握后面的方法论哈
1 函数 getInfo (f []fs.FileInfo, c chan<- string)
通过遍历 []fs.FileInfo
结构,进行了一些简单判断后(例如忽略文件夹、.DS_store 这种),不断将文件名写入 c
这个 string channel
中
func getInfo(f []fs.FileInfo, c chan string) {
for _, fs := range f {
// 这个 if 只是用来简单忽略掉 文件夹或者 .DS_store 这种
if fs.IsDir() || strings.HasPrefix(fs.Name(), ".") {
continue
} else {
c <- fs.Name()
}
}
}
2 函数 dealInfo (path string, typeDict map[string]int, c <-chan string)
通过 range
方法,不断获取 c
之前保存的文件名,截取后缀之后,要么转入对应文件夹,要么创建新文件夹再转入
func dealInfo(path string, typeDict map[string]int, c chan string) {
for name := range c {
sp := strings.Split(name, ".")
suffix := sp[len(sp)-1]
if _, ok := typeDict[suffix]; ok {
MoveFile(name, path, suffix)
} else {
CreateFolder(path, suffix)
MoveFile(name, path, suffix)
typeDict[suffix] = 1
fmt.Println(name)
}
}
}
========
到这里其实思路上是没有什么问题的,这里最关键的是没有注意到简单练习里不会提到的一个知识点:用 range 遍历 channel 的时候,需要主动 close channel. 否则 range 会阻塞 channel 直到 deadlock panic. 尽管所有 channel 会在 main channel 结束的时候被强制结束. (大概因为 range
遍历 channel
的时候没有错误处理?)
如果不用 range 的方式来遍历的话,我们需要写一个 if name, ok := <- c; ok { ... }
这样的东西放到一个死循环里面,也就是每次循环都要来手动判断一次 c
里面还有没有东西,没东西了我就跳出循环呗。显然 range
遍历的方式更优雅,但要考虑 close(c)
的时机。
第二个点则是如何 “并发” 处理 函数 2 。如果只用 go func()
,最多只能实现两个 goroutine 之间的通信,所以我们引入了线程池 sync
库来解决这个问题——我们需要给每个 goroutine 加入到线程池里面,但在某个线程工作结束的时候又要把它从池子里面拿掉。最后,还需要一个 wait
函数来通知主线程等待这些线程工作结束。
具体来说,我们需要改写一下前面的『函数 1 』、『函数 2 』 了:
对于『函数 1 』,原始伪代码:
func getInfo(f []fs.FileInfo, c chan<- string){
遍历 f { 处理后的 fineName 写入 c }
}
现在应当改写为:
func getInfo(f []fs.FileInfo, c chan<- string){
// 后面要用 sync.Add 加入池子,所以这里要减去。加入和减去要匹配, 重要!
defer wg.Done()
遍历 f { 处理后的 fineName 写入 c }
// 后面其他函数会用 range 来遍历,所以一定要 close ,重要!
close(c)
}
对于『函数 2 』,由于会用多个 goroutine
并发,那么每一次都需要一个 wg.Add(1)
来加入线程池,所以每一次我们还要从『函数 2 』里减去这个线程
原函数 2 伪代码:
func dealInfo(path string, typeDict map[string]int, c <-chan string){
for _, filename := range c {
判断文件;
处理文件;
}
}
现在改写为:
func dealInfo(path string, typeDict map[string]int, c <-chan string){
defer wg.Done()
for filename := range c {
判断文件;
处理文件;
}
}
非常简单,就是在循环前加一个 defer wg.Done()
就可以了。
最后,我们来写主函数的伪代码:
import ("sync", "fmt", "time", ... )
var wg sync.WaitGroup // 为了创建多线程并发,准备线程池
func getInfo( ... ) // 实现 func1
func dealInfo( ... ) // 实现 func2
func main(){
c := make(chan string, 1000)
start := time.Now()
wg.Add(1)
go getInfo(...)
for i:=0; i<16; i++ {
wg.Add(1)
go dealInfo(...)
}
wg.Wait()
fmt.Println("time: ", time.Since(start))
}
这里应该就能充分暴露前面改写过程中加入的奇怪东西的目的了 😄
可以发现,wg.Add(1)
之后,一定会紧跟一个带有 defer wg.Done()
的函数,来实现线程加减的匹配
而对于比较复杂的『函数 2 』 ,我们通过一个循环来加入 N
个 goroutine
线程。wg.Add(1)
放在循环里面,同时每个 wg.Add()
都必然对应一个 defer wg.Done()
来匹配
最后,别忘了放一个 wg.Wait()
来通知主线程等待所有 wg
的线程执行完毕——它靠的就是不断 Add
,之后又不断 Done
,直到池子里线程归零的那一瞬来判断任务全部结束的。所以 Add
和 Done
必须匹配
另外一个之前没有提到的小改动是,我们建立 c (chan string)
的时候,还给了它一些缓存。这样,由于 getInfo
处理得很快,就可以预存一些结果到 c
里面,在面对 16 个 go dealInfo
的时候,就能保证每个 dealInfo
总是能拿到东西来处理,就不会空闲等待了。这个 N
,我在哪看到资料说是最大 10000 个,好像可以通过配置修改。不过对于大部分的场景,如果要修改这个参数,不如优化代码才是正道
还有一个地方是,我们在循环加入『函数 2 』 goroutine 的时候,wg.Add(1)
放在了循环里面。由于我知道这里的循环会创建 16 个 goroutine ,所以我们也可以一开始就在循环外面 wg.Add(16)
把它一口气全加进去。由于每个循环有一个 defer wg.Done()
,所以最后线程池还是可以归零的。只是这样写如果后期要扩充数量的话会有点不好维护,还是每个循环 +1 ,N 则通过配置文件来提供更妥当。
通过这个例子,感觉自己算是摸到了一点 channel 使用的门路。也体会到了一些 『不要通过共享内存来通信,而应该通过通信来共享内存』的设计思路。
这里还有一个不错的例子,是关于并行获取 < N 的所有素数的。它用到了 3 个 channel 来处理 写入、计算、读取打印。通过这里例子,应该能对 close(channel)
的时机有更好的理解。例子,实现不是很严谨
对于 go 小白如我,这里也是班门弄斧。只是希望能够帮助到之前收藏我文章的朋友,或者其他入门 go 的小伙伴。
1
zjj19950716 2022-11-07 14:01:56 +08:00
这东西不叫线程池吧
|
2
volvo007 OP @zjj19950716 嗯嗯,好的,是叫 『等待组』吗?
|
3
volvo007 OP 补充一下,WaitGroup 是一个计数信号量,可以用来记录并维护运行的 goroutine ,不是线程池。高手们看看笑笑就好,新手们记得自己脑子里做下替换哈
此外文章里还有些概念、描述上的问题(毕竟不是专业的)。我将测试过的完整代码发到这里大家可以自取 https://pastebin.com/aHCGYfEr 已知问题:由于一开始会有若干线程同时尝试创建文件夹,所以会有几个 "file existed" 错误。捕获打印之后可以继续执行代码 |