- 维持共享数据一致性,并发安全
- 控制流程管理,更好的协同工作
func TestDemo1(t *testing.T) { var mut sync.Mutex maxSize := 10 counter := 0 // 排水口 go func() { for { mut.Lock() if counter == maxSize { for i := 0; i < maxSize; i++ { counter-- log.Printf("OUTPUT counter = %d", counter) } } mut.Unlock() time.Sleep(1 * time.Second) } }() // 注水口 for { mut.Lock() if counter == 0 { for i := 0; i < maxSize; i++ { counter++ log.Printf(" INPUT counter = %d", counter) } } mut.Unlock() time.Sleep(1 * time.Second) } }
=== RUN TestDemo1 ··· 2020/10/06 13:52:50 INPUT counter = 8 2020/10/06 13:52:50 INPUT counter = 9 2020/10/06 13:52:50 INPUT counter = 10 2020/10/06 13:52:50 OUTPUT counter = 9 2020/10/06 13:52:50 OUTPUT counter = 8 2020/10/06 13:52:50 OUTPUT counter = 7 ···
func TestDemo2(t *testing.T) { var mut sync.Mutex maxSize := 10 counter := 0 // 排水口 go func() { for { mut.Lock() if counter != 0 { counter-- } log.Printf("OUTPUT counter = %d", counter) mut.Unlock() time.Sleep(5 * time.Second) // 为了演示效果,睡眠5秒 } }() // 注水口 for { mut.Lock() if counter != maxSize { counter++ } log.Printf(" INPUT counter = %d", counter) mut.Unlock() time.Sleep(1 * time.Second) // 为了演示效果,睡眠1秒 } }
=== RUN TestDemo2 ··· 2020/10/06 14:11:46 INPUT counter = 7 2020/10/06 14:11:47 INPUT counter = 8 2020/10/06 14:11:48 OUTPUT counter = 7 2020/10/06 14:11:48 INPUT counter = 8 2020/10/06 14:11:49 INPUT counter = 9 2020/10/06 14:11:50 INPUT counter = 10 2020/10/06 14:11:51 INPUT counter = 10 2020/10/06 14:11:52 INPUT counter = 10 2020/10/06 14:11:53 OUTPUT counter = 9 2020/10/06 14:11:53 INPUT counter = 10 2020/10/06 14:11:54 INPUT counter = 10 2020/10/06 14:11:55 INPUT counter = 10 2020/10/06 14:11:56 INPUT counter = 10 2020/10/06 14:11:57 INPUT counter = 10 2020/10/06 14:11:58 OUTPUT counter = 9 2020/10/06 14:11:58 INPUT counter = 10 2020/10/06 14:11:59 INPUT counter = 10 ···
条件变量总是与互斥锁组合使用,除了可以使用 Lock、Unlock,还有如下三个方法:
- Wait 等待通知
- Signal 单发通知
- Broadcast 广播通知
func TestDemo3(t *testing.T) { cond := sync.NewCond(new(sync.Mutex)) // 初始化条件变量 maxSize := 10 counter := 0 // 排水口 go func() { for { cond.L.Lock() // 上锁 if counter == 0 { // 没水了 cond.Wait() // 啥时候来水?等通知! } counter-- log.Printf("OUTPUT counter = %d", counter) cond.Signal() // 单发通知:已排水 cond.L.Unlock() // 解锁 time.Sleep(5 * time.Second) // 为了演示效果,睡眠5秒 } }() // 注水口 for { cond.L.Lock() // 上锁 if counter == maxSize { // 水满了 cond.Wait() // 啥时候排水?等待通知! } counter++ log.Printf(" INPUT counter = %d", counter) cond.Signal() // 单发通知:已来水 cond.L.Unlock() // 解锁 time.Sleep(1 * time.Second) // 为了演示效果,睡眠1秒 } }
=== RUN TestDemo3 ··· 2020/10/06 14:51:22 INPUT counter = 7 2020/10/06 14:51:23 INPUT counter = 8 2020/10/06 14:51:24 OUTPUT counter = 7 2020/10/06 14:51:24 INPUT counter = 8 2020/10/06 14:51:25 INPUT counter = 9 2020/10/06 14:51:26 INPUT counter = 10 2020/10/06 14:51:29 OUTPUT counter = 9 2020/10/06 14:51:29 INPUT counter = 10 2020/10/06 14:51:34 OUTPUT counter = 9 2020/10/06 14:51:34 INPUT counter = 10 ···
func TestDemo4(t *testing.T) { cond := sync.NewCond(new(sync.Mutex)) // 初始化条件变量 maxSize := 10 counter := 0 // 排水口 1 go func() { for { cond.L.Lock() // 上锁 if counter == 0 { // 没水了 //for counter == 0 { // 没水了 cond.Wait() // 啥时候来水?等通知! } counter-- log.Printf("OUTPUT A counter = %d", counter) cond.Broadcast() // 单发通知:已排水 cond.L.Unlock() // 解锁 //time.Sleep(2 * time.Second) // 为了演示效果,睡眠5秒 } }() // 排水口 2 go func() { for { cond.L.Lock() // 上锁 if counter == 0 { // 没水了 //for counter == 0 { // 没水了 cond.Wait() // 啥时候来水?等通知! } counter-- log.Printf("OUTPUT B counter = %d", counter) cond.Broadcast() // 单发通知:已排水 cond.L.Unlock() // 解锁 //time.Sleep(2 * time.Second) // 为了演示效果,睡眠5秒 } }() // 注水口 for { cond.L.Lock() // 上锁 if counter == maxSize { // 水满了 //for counter == maxSize { // 水满了 cond.Wait() // 啥时候排水?等待通知! } counter++ log.Printf(" INPUT counter = %d", counter) cond.Broadcast() // 单发通知:已来水 cond.L.Unlock() // 解锁 //time.Sleep(1 * time.Second) // 为了演示效果,睡眠1秒 } }
=== RUN TestDemo4 ··· 2020/10/07 20:57:30 OUTPUT B counter = 2 2020/10/07 20:57:30 OUTPUT B counter = 1 2020/10/07 20:57:30 OUTPUT B counter = 0 2020/10/07 20:57:30 OUTPUT A counter = -1 2020/10/07 20:57:30 OUTPUT A counter = -2 2020/10/07 20:57:30 OUTPUT A counter = -3 2020/10/07 20:57:30 OUTPUT A counter = -4 ··· 2020/10/07 20:57:31 OUTPUT B counter = -7605 2020/10/07 20:57:31 INPUT counter = -7604 2020/10/07 20:57:31 OUTPUT A counter = -7605 2020/10/07 20:57:31 OUTPUT A counter = -7606 ···
在《Go并发编程之传统同步—(1)互斥锁》文章中,程序因为没有加上互斥锁,出现过 counter 值异常的情况。
但这次程序这次加了互斥锁,按理说形成了一个临界区应该是没有问题了,所以问题应该不是出在临界区上,难道问题出在 Wait 上?
func (c *Cond) Wait() { // 检查 c 是否是被复制的,如果是就 panic c.checker.check() // 将当前 goroutine 加入等待队列 t := runtime_notifyListAdd(&c.notify) c.L.Unlock() // 等待当前 goroutine 被唤醒 runtime_notifyListWait(&c.notify, t) c.L.Lock() }
原来 Wait 内部的执行流程是,先执行了解锁,然后进入等待状态,接到通知之后,再执行加锁操作。
那按照这个代码逻辑结合输出日志,走一程序遍流程,看看能不能复现出 counter 为负值的情况:
- 注水口将 counter 累加到 10 之后,发送广播通知(Broadcast)。
- goroutine A 在“第1步”之前的时候进入了等待通知(Wait),现在接收到了广播通知(Broadcast),从 runtime_notifyListWait() 返回,并且成功执行了加锁(Lock)操作。
- goroutine B 在“第1步”之前的时候进入了等待通知(Wait),现在接收到了广播通知(Broadcast),从 runtime_notifyListWait() 返回,在执行加锁(Lock)操作的时候,发现 goroutine A 先抢占了临界区,所以一直阻塞在 c.L.Lock()。
- goroutine A 虽然完成任务后会释放锁,但是每次也成功将锁抢占,所以就这样 一直将 counter 减到了 0,然后发送广播通知(Broadcast)、解锁(Unlock)。
- goroutine B 在 goroutine A 解锁后,成功获得锁并从 Lock 方法中返回,接下来跳出 Wait 方法、跳出 if 判断,执行 counter--(0--),这时候 counter 的值是 -1
问题就出现在第五步,只要 goroutine B 加锁成功的时候,再判断一下 counter 是否为 0 就好了。
所以将 if counter == 0 改成 for counter == 0,这样上面的“第五步”就变成了
5.goroutine B 在 goroutine A 解锁后,成功加锁(Lock)并从阻塞总返回,接下来跳出 Wait 方法、再次进入 for 循环,判断 counter == 0 结果为真,再次进入等待(Wait)。
Luck() Wait() Broadcast()// Signal() Unlock() // 或者 Luck() Wait() Unlock() Broadcast()// Signal() // 两种写法都不会报错
在 go 的发送通知方法(Broadcast、Signal)上有这么一段话:
// It is allowed but not required for the caller to hold c.L \
// during the call.
在我以往的 C 多线程开发的时候,发送通知总是在锁中的:
pthread_mutex_lock(&thread->mutex); // ... pthread_cond_signal(&thread->cond); pthread_mutex_unlock(&thread->mutex);
在 man 手册中有写到:
The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().
消息通知是有即时性的,如果没有 goroutine 在等待通知,那么这次通知直接被丢弃。