需要多个子Goroutine执行任务,主Goroutine需要等待子Goroutine执行完后才能继续执行
type WaitGroup struct { noCopy noCopy //辅助字段,辅助vet工具检测是否有复制使用 // 64-bit value: high 32 bits are counter, low 32 bits are waiter count. // 64-bit atomic operations require 64-bit alignment, but 32-bit // compilers do not ensure it. So we allocate 12 bytes and then use // the aligned 8 bytes in them as state, and the other 4 as storage // for the sema. state1 [3]uint32 //8字节为state(高32位是计数值、低32位是waiter数),4字节为sema信号量 } type noCopy struct{} // Lock is a no-op used by -copylocks checker from `go vet`. func (*noCopy) Lock() {} func (*noCopy) Unlock() {} // 返回state、sema的指针 func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { //地址是64位对齐 return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] } else { //地址是32位对齐 return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] } } func (wg *WaitGroup) Add(delta int) { statep, semap := wg.state() state := atomic.AddUint64(statep, uint64(delta)<<32) //原子操作增加计数值 v := int32(state >> 32) //add后的计数值 w := uint32(state) //waiter数 if v < 0 { //计数值小于0 panic("sync: negative WaitGroup counter") } if w != 0 && delta > 0 && v == int32(delta) { //Add、Wait并发调用 panic("sync: WaitGroup misuse: Add called concurrently with Wait") } if v > 0 || w == 0 { //计数大于0或者waiter数为0,不需要唤醒waiter,直接返回 return } //Add后计数为0并且有waiter if *statep != state { //Add、Wait并发调用 panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 重置waiter数为0 *statep = 0 for ; w != 0; w-- { //唤醒waiter runtime_Semrelease(semap, false, 0) } } func (wg *WaitGroup) Done() { wg.Add(-1) } func (wg *WaitGroup) Wait() { statep, semap := wg.state() for { state := atomic.LoadUint64(statep) v := int32(state >> 32) //计数值 w := uint32(state) //waiter数 if atomic.CompareAndSwapUint64(statep, state, state+1) { //waiter数新增1 runtime_Semacquire(semap) //阻塞等待 if *statep != 0 { //这里waiter被Add方法唤醒前,会重置state为0,这里不为0说明wg被重用了,panic panic("sync: WaitGroup is reused before previous Wait has returned") } return } } }
Add(delta int)方法:给计数值增加delta,因为Done也是调用Add,所以增加后的计数值等于0并且waiter数不为0时需要唤醒waiter
Add中的异常情况检查:
Wait方法:waiter数增加1,然后阻塞等待
Wait中的异常情况检查:
noCopy类型实现了Locker接口,vet工具就是对实现了Locker接口的类型进行静态检查,看是否存在复制值使用的情况
两种情况:
正确使用姿势:创建完WaitGroup后,直接调用Add方法传入预期的waiter数,然后调用相同次数的Done;或者在每个goroutine创建前调用Add(1),每个goroutine执行完后调用Done
要保证所有的Add都在Wait前调用,使用上面的姿势就没问题
package main import "sync" func main() { var wg sync.WaitGroup wg.Add(1) go func() { println("doing things") wg.Done() wg.Add(1) //这里想重用wg,但是如果和第13行并发那么会panic }() wg.Wait() }
WaitGroup虽然可以重用,但是要等上一轮的Wait方法返回后才能重用,否则就可能出现panic