一起读Go源码(第13期) sync:WaitGroup
概述
友友们好啊,接下来3篇笔记我们来读sync包的源码——sync包是Go标准库中用于并发同步和共享内存安全访问的核心包,第1篇笔记介绍WaitGroup内容,第2篇笔记介绍读写锁与互斥锁,最后一篇介绍sync包剩余内容(Once、Pool、Map等)。
WaitGroup概述
WaitGroup是Go标注库提供的等待一组协程groutine执行完的工具,功能是让主线程等待所有子协程跑完再继续执行,具具体场景有如并发处理多个文件、并发请求多个接口等,等这些操作完后再执行后续逻辑处理。举个例子,水果店老板的3个小孩负责附近街坊的配送,某天水果店同时收到来自不同地方的3个订单,水果店老板(var wg sync.WaitGroup)于是派出他的3个精兵强将(小孩)同时出去配送水果Add(3),小孩分别拿上水果出发,完成是调用wg.Done()表示配送完成。
for i := 0; i < 3; i++ { gofunc(taskID int) { defer wg.Done() log.Println("派送完成") }(i)}
此时水果店老板肯定是要等3个小孩回来再关门收摊的,所以老板等啊等wg.Wait(),等小孩全部送完水果回来才会关门;下面写一个完整的示例:
func main() { var wg sync.WaitGroup wg.Add(3) go work(&wg) go work(&wg) go work(&wg) wg.Wait() log.Println("全部完成了")}func work(wg *sync.WaitGroup) { defer wg.Done() log.Println("work work work")}//2026/03/31 20:01:01 work work work//2026/03/31 20:01:01 work work work//2026/03/31 20:01:01 work work work//2026/03/31 20:01:01 全部完成了
根据上面的例子捋一遍流程,在创建WaitGroup后,wg.Add(3)表示有3个协程要等,主函数在运行到wg.Wait()时发现计数器>0,主线程阻塞、休眠,此时主线程就是等待者。当子协程执行完wg.Done(),内部调用Add(-1),计数器最终降到0,此时Add方法将自动唤醒所有等待的主线程。主线程停止阻塞,继续执行打印语句全部完成了。
此时要注意的是Wait Group Add的协程和启动协程数量要一致,若添加少了会提前结束;若添加多了主程序会卡死崩溃all goroutines are asleep - deadlock!,因为没有协程在干活了却还在一直等。下面我们进一步来了解WaitGroup,看看具体是怎么实现这些功能的。
WaitGroup结构体
源码注释意思是:WaitGroup用于等待一组goroutines执行完成;主协程调用WaitGroup.Add方法设置需要等待的goroutines的数量。然后每个goroutines启动,再执行完毕后调用WaitGroup.Done方法。同时可以使用WaitGroup.Wait方法进行阻塞,直到所有协程都执行完成。WaitGroup在首次使用后不允许被复制。在Go内存模型的中,WaitGroup.Done调用先于它所解除阻塞点任意Wait调用的返回完成同步。
// A WaitGroup waits for a collection of goroutines to finish.// The main goroutine calls [WaitGroup.Add] to set the number of// goroutines to wait for. Then each of the goroutines// runs and calls [WaitGroup.Done] when finished. At the same time,// [WaitGroup.Wait] can be used to block until all goroutines have finished.//// A WaitGroup must not be copied after first use.//// In the terminology of [the Go memory model], a call to [WaitGroup.Done]// “synchronizes before” the return of any Wait call that it unblocks.//// [the Go memory model]: https://go.dev/ref/memtype WaitGroup struct { noCopy noCopy state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count. sema uint32}
看完上面那段注释看WaitGroup结构体也好理解了,
-
• noCopy用于禁止WaitGroup被复制(noCopy是Go内置的编译期检查机制)。 -
• state atomic.Uint64是64位原子变量,一个变量存储两个状态(节约内存并保证原子操作),具体是高32位是协程计数器(即Add方法增加或减少该值,Done => Add(-1)),低32位是等待者数量,表示调用Wait阻塞的协程数量。 -
• sema是信号量,实现阻塞或唤醒功能。
WaitGroup方法
刚才上面的例子用到了3个方法组成了WaitGroup的核心操作,下面来看看这些方法的源码:
Add
注释的意思是:Add方法将delta(可以是负数)加到WaitGroup的计数器上;若计数器变为0,所有因调用Wait方法阻塞的协程都会被释放,若计数器变为负数,Add方法会触发panic;注意当计数器为0时,执行整数delta的Add调用必须发生在Wait调用之前,执行负数delta的Add调用,或计数器大于0时正数delta Add调用,可在任意时机执行。通常这意味着,Add调用应在创建等待的协程的语句之前执行,若复用WaitGroup等待多组件事件,新的Add调用必须在所有的Wait调用返回后执行。
当计数器为0时,执行整数delta的Add调用必须发生在Wait调用之前这句有点拗口,解释一下即计数器为0时,必须重新Add开始新一轮任务,再Wait();也不能一边Wait一边Add;例如下面这个错误实例:
gofunc() { time.Sleep(100) wg.Add(1) // 寄}()wg.Wait() // 寄
// Add adds delta, which may be negative, to the [WaitGroup] counter.// If the counter becomes zero, all goroutines blocked on [WaitGroup.Wait] are released.// If the counter goes negative, Add panics.//// Note that calls with a positive delta that occur when the counter is zero// must happen before a Wait. Calls with a negative delta, or calls with a// positive delta that start when the counter is greater than zero, may happen// at any time.// Typically this means the calls to Add should execute before the statement// creating the goroutine or other event to be waited for.// If a WaitGroup is reused to wait for several independent sets of events,// new Add calls must happen after all previous Wait calls have returned.// See the WaitGroup example.func (wg *WaitGroup) Add(delta int) { if race.Enabled { if delta < 0 { // Synchronize decrements with Wait. race.ReleaseMerge(unsafe.Pointer(wg)) } race.Disable() defer race.Enable() } state := wg.state.Add(uint64(delta) << 32) v := int32(state >> 32) w := uint32(state) if race.Enabled && delta > 0 && v == int32(delta) { // The first increment must be synchronized with Wait. // Need to model this as a read, because there can be // several concurrent wg.counter transitions from 0. race.Read(unsafe.Pointer(&wg.sema)) } if v < 0 { panic("sync: negative WaitGroup counter") } if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } if v > 0 || w == 0 { return } // This goroutine has set counter to 0 when waiters > 0. // Now there can't be concurrent mutations of state: // - Adds must not happen concurrently with Wait, // - Wait does not increment waiters if it sees counter == 0. // Still do a cheap sanity check to detect WaitGroup misuse. if wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // Reset waiters count to 0. wg.state.Store(0) for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false, 0) }}
我们来看看Add函数是怎么实现的:
-
1. 检测数据竞争,通过 race.Enabled实现 -
2. 更新state状态, state := wg.state.Add(uint64(delta) << 32)后得到新的计数器int32(state >> 32)和新的等待者数量uint32(state) -
3. 检测数据竞争,首次Add和Wait同步;然后是安全检查,若 v<0直接panic。 -
4. 误用检查:Add和Wait不能通过并发调用。 -
5. 若检查正常,计数器>0且没有等待者,直接返回。 -
6. 若计数器为0且有等待者,直接panic。 -
7. 重置状态, wg.state.Store(0)
总得来说,Add方法是原子修改计数器,当计数器等于0时唤醒等待的协程。
Done
// Done decrements the [WaitGroup] counter by one.func (wg *WaitGroup) Done() { wg.Add(-1)}
很直白了,调用Add(-1),让计数器-1。
Wait
// Wait blocks until the [WaitGroup] counter is zero.func (wg *WaitGroup) Wait() { if race.Enabled { race.Disable() } for { state := wg.state.Load() v := int32(state >> 32) w := uint32(state) if v == 0 { // Counter is 0, no need to wait. if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } // Increment waiters count. if wg.state.CompareAndSwap(state, state+1) { if race.Enabled && w == 0 { // Wait must be synchronized with the first Add. // Need to model this is as a write to race with the read in Add. // As a consequence, can do the write only for the first waiter, // otherwise concurrent Waits will race with each other. race.Write(unsafe.Pointer(&wg.sema)) } runtime_Semacquire(&wg.sema) if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } }}
Wait方法就像望夫石一样,作用就是等待——Wait阻塞当前协程,直到WaitGroup计数器变为0:
-
1. 关闭数据竞争检测, race.Disable() -
2. 通过 for {}死循环一直检查状态 -
3. 读取当前前64位状态,拆分开得到计数器(高32位)和等待者(低32位)数据,若计数器为0就不用等了,直接返回。 -
4. 此时计数器不为0且大于0,意味着还有等待者,还有协程未执行完,因此还需要阻塞等待。此时执行 wg.state.CompareAndSwap(state, state+1)的意思是将等待者数量+1,相当于告诉WaitGroup将自身标记为等待者。 -
5. 系统调用阻塞当前协程,使其休眠不再执行。当Add()方法的计数器变为0时会被唤醒。 -
6. 恢复数据竞争检测,标记同步完成。返回
总体来说,Wait()方法就是在一个for{}循环中不断检查计数器的方法,当计数器>0时原子登记等待者并阻塞休眠,当计数器为0时被Add唤醒并返回。
写在最后
本人是新手小白,如果这篇笔记中有任何错误或不准确之处,真诚地希望各位读者能够给予批评和指正,如有更好的实现方法请给我留言,谢谢!欢迎大家在评论区留言!觉得写得还不错的话欢迎大家关注一波!下一篇继续看读写锁和互斥锁。
夜雨聆风