Channel是Go中重要且独特的一种并发编程原语,借助其线程安全和阻塞的特性,可以实现信息传递、信号通知、互斥锁、任务编排等场景。
有 4 个 goroutine,编号为 1、2、3、4。每秒钟会有一个 goroutine 打印出它自己的编号,要求你编写程序,让输出的编号总是按照 1、2、3、4、1、2、3、4……这个顺序打印出来。
为了实现顺序的数据传递,我们可以定义一个令牌的变量,谁得到令牌,谁就可以打印一次自己的编号,同时将令牌传递给下一个 goroutine。
type Token struct{} func newWorker(id int, ch chan Token, nextCh chan Token) { for { token := <-ch // 取得令牌 fmt.Println((id + 1)) // id从1开始 time.Sleep(time.Second) nextCh <- token } } func main() { chs := []chan Token{make(chan Token), make(chan Token), make(chan Token), make(chan Token)} // 创建4个worker for i := 0; i < 4; i++ { go newWorker(i, chs[i], chs[(i+1)%4]) } //首先把令牌交给第一个worker chs[0] <- struct{}{} select {} }
首先定义一个令牌类型(Token),接着定义一个创建 worker 的方法,这个方法会从它自己的 chan 中读取令牌。哪个 goroutine 取得了令牌,就可以打印出自己编号,因为需要每秒打印一次数据,所以,我们让它休眠 1 秒后,再把令牌交给它的下家。这类场景有一个特点,就是当前持有数据的 goroutine 都有一个信箱,信箱使用 chan 实现,goroutine 只需要关注自己的信箱中的数据,处理完毕后,就把结果发送到下一家的信箱中。
chan 类型有这样一个特点:chan 如果为空,那么,receiver 接收数据的时候就会阻塞等待,直到 chan 被关闭或者有新的数据到来。利用这个机制,我们可以实现 wait/notify 的设计模式。传统的并发原语 Cond 也能实现这个功能。但是,Cond 使用起来比较复杂,容易出错,而使用 chan 实现 wait/notify 模式,就方便多了。
比如,使用 chan 实现程序的 graceful shutdown,在退出之前执行一些连接关闭、文件 close、缓存落盘等一些动作。
func main() { go func() { ...... // 执行业务处理 }() // 处理CTRL+C等中断信号 termChan := make(chan os.Signal) signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) <-termChan // 执行退出之前的清理动作 doCleanup() fmt.Println("优雅退出") }
有时候,doCleanup 可能是一个很耗时的操作,比如十几分钟才能完成,如果程序退出需要等待这么长时间,用户是不能接受的,所以,在实践中,我们需要设置一个最长的等待时间。只要超过了这个时间,程序就不再等待,可以直接退出。所以,退出的时候分为两个阶段:closing,代表程序退出,但是清理工作还没做;closed,代表清理工作已经做完。
func main() { var closing = make(chan struct{}) var closed = make(chan struct{}) go func() { // 模拟业务处理 for { select { case <-closing: return default: // ....... 业务计算 time.Sleep(100 * time.Millisecond) } } }() // 处理CTRL+C等中断信号 termChan := make(chan os.Signal) signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) <-termChan close(closing) // 执行退出之前的清理动作 go doCleanup(closed) select { case <-closed: case <-time.After(time.Second): fmt.Println("清理超时,不等了") } fmt.Println("优雅退出") } func doCleanup(closed chan struct{}) { time.Sleep((time.Minute)) close(closed) }
使用 chan 也可以实现互斥锁。在 chan 的内部实现中,就有一把互斥锁保护着它的所有字段。从外在表现上,chan 的发送和接收之间也存在着 happens-before 的关系,保证元素放进去之后,receiver 才能读取到。先初始化一个 capacity 等于 1 的 Channel,然后再放入一个元素。这个元素就代表锁,谁取得了这个元素,就相当于获取了这把锁。
// 使用chan实现互斥锁 type Mutex struct { ch chan struct{} } // 使用锁需要初始化 func NewMutex() *Mutex { mu := &Mutex{make(chan struct{}, 1)} mu.ch <- struct{}{} return mu } // 请求锁,直到获取到 func (m *Mutex) Lock() { <-m.ch } // 解锁 func (m *Mutex) Unlock() { select { case m.ch <- struct{}{}: default: panic("unlock of unlocked mutex") } } // 尝试获取锁 func (m *Mutex) TryLock() bool { select { case <-m.ch: return true default: } return false } // 加入一个超时的设置 func (m *Mutex) LockTimeout(timeout time.Duration) bool { timer := time.NewTimer(timeout) select { case <-m.ch: timer.Stop() return true case <-timer.C: } return false } // 锁是否已被持有 func (m *Mutex) IsLocked() bool { return len(m.ch) == 0 } func main() { m := NewMutex() ok := m.TryLock() fmt.Printf("locked v %v\n", ok) ok = m.TryLock() fmt.Printf("locked %v\n", ok) }
用 buffer 等于 1 的 chan 实现互斥锁,在初始化这个锁的时候往 Channel 中先塞入一个元素,谁把这个元素取走,谁就获取了这把锁,把元素放回去,就是释放了锁。元素在放回到 chan 之前,不会有 goroutine 能从 chan 中取出元素的,这就保证了互斥性。
利用 select+chan 的方式,很容易实现 TryLock、Timeout 的功能。具体来说就是,在 select 语句中,我们可以使用 default 实现 TryLock,使用一个 Timer 来实现 Timeout 的功能。
// 后续待补充任务异步处理