f() // call f(); wait for it to return go f() // create a new goroutine that calls f(); don't wait
ch := make(chan int)
<-
运算符。在发送语句中,<-
运算符分割channel和要发送的值。在接收语句中,<-
运算符写在channel对象之前。一个不使用接收结果的接收操作也是合法的。ch <- x // 发送消息 x = <-ch // 从 channel 中接收消息 <-ch // 从 channel 接收并丢弃消息
使用内置的close函数就可以关闭一个channel:
close(ch)
以最简单方式调用make函数创建的时一个无缓冲的channel,但是我们也可以指定第二个整形参数,对应channel的容量。如果channel的容量大于零,那么该channel就是带缓冲的channel。
ch = make(chan int) // unbuffered channel ch = make(chan int, 0) // unbuffered channel ch = make(chan int, 3) // buffered channel with capacity 3
func main() { conn, err := net.Dial("tcp", "localhost:8000") if err != nil { log.Fatal(err) } done := make(chan struct{}) go func() { io.Copy(os.Stdout, conn) // NOTE: ignoring errors log.Println("done") done <- struct{}{} // signal the main goroutine }() mustCopy(conn, os.Stdin) conn.Close() <-done // wait for background goroutine to finish }
struct{}
空结构体作为channels元素的类型,虽然也可以使用bool或int类型实现同样的功能,done <- 1
语句也比done <- struct{}{}
更短。close(naturals)
x, ok := <-naturals
在下面的程序中,我们的计数器goroutine只生成100个含数字的序列,然后关闭naturals对应的channel,这将导致计算平方数的squarer对应的goroutine可以正常终止循环并关闭squares对应的channel。(在一个更复杂的程序中,可以通过defer语句关闭对应的channel。)最后,主goroutine也可以正常终止循环并退出程序。
func main() { naturals := make(chan int) squares := make(chan int) // Counter go func() { for x := 0; x < 100; x++ { naturals <- x } close(naturals) }() // Squarer go func() { for x := range naturals { squares <- x * x } close(squares) }() // Printer (in main goroutine) for x := range squares { fmt.Println(x) } }
为了表明这种意图并防止被滥用,Go语言的类型系统提供了单方向的channel类型,分别用于只发送或只接收的channel。类型chan<- int
表示一个只发送int的channel,只能发送不能接收。相反,类型<-chan int
表示一个只接收int的channel,只能接收不能发送。(箭头<-
和关键字chan的相对位置表明了channel的方向。)这种限制将在编译期检测。
这是改进的版本,这一次参数使用了单方向channel类型:
func counter(out chan<- int) { for x := 0; x < 100; x++ { out <- x } close(out) } func squarer(out chan<- int, in <-chan int) { for v := range in { out <- v * v } close(out) } func printer(in <-chan int) { for v := range in { fmt.Println(v) } } func main() { naturals := make(chan int) squares := make(chan int) go counter(naturals) go squarer(squares, naturals) printer(squares) }
调用counter(naturals)将导致将chan int
类型的naturals隐式地转换为chan<- int
类型只发送型的channel。调用printer(squares)也会导致相似的隐式转换,这一次是转换为<-chan int
类型只接收型的channel。任何双向channel向单向channel变量的赋值操作都将导致该隐式转换。
带缓存的Channel内部持有一个元素队列。队列的最大容量是在调用make函数创建channel时通过第二个参数指定的。下面的语句创建了一个可以持有三个字符串元素的带缓存Channel。图8.2是ch变量对应的channel的图形表示形式。
ch = make(chan string, 3)
向缓存Channel的发送操作就是向内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素。如果内部缓存队列是满的,那么发送操作将阻塞直到因另一个goroutine执行接收操作而释放了新的队列空间。相反,如果channel是空的,接收操作将阻塞直到有另一个goroutine执行发送操作而向队列插入元素。
我们可以在无阻塞的情况下连续向新创建的channel发送三个值:
ch <- "A" ch <- "B" ch <- "C"
此刻,channel的内部缓存队列将是满的(图8.3),如果有第四个发送操作将发生阻塞。
如果我们接收一个值,
fmt.Println(<-ch) // "A"
那么channel的缓存队列将不是满的也不是空的(图8.4),因此对该channel执行的发送或接收操作都不会发送阻塞。通过这种方式,channel的缓存队列解耦了接收和发送的goroutine。
在某些特殊情况下,程序可能需要知道channel内部缓存的容量,可以用内置的cap函数获取:
fmt.Println(cap(ch)) // "3"
同样,对于内置的len函数,如果传入的是channel,那么将返回channel内部缓存队列中有效元素的个数。因为在并发程序中该信息会随着接收操作而失效,但是它对某些故障诊断和性能优化会有帮助。
fmt.Println(len(ch)) // "2"
在继续执行两次接收操作后channel内部的缓存队列将又成为空的,如果有第四个接收操作将发生阻塞:
fmt.Println(<-ch) // "B" fmt.Println(<-ch) // "C"
下面的例子展示了一个使用了带缓存channel的应用。它并发地向三个镜像站点发出请求,三个镜像站点分散在不同的地理位置。它们分别将收到的响应发送到带缓存channel,最后接收者只接收第一个收到的响应,也就是最快的那个响应。因此mirroredQuery函数可能在另外两个响应慢的镜像站点响应之前就返回了结果。(顺便说一下,多个goroutines并发地向同一个channel发送数据,或从同一个channel接收数据都是常见的用法。)
func mirroredQuery() string { responses := make(chan string, 3) go func() { responses <- request("asia.gopl.io") }() go func() { responses <- request("europe.gopl.io") }() go func() { responses <- request("americas.gopl.io") }() return <-responses // return the quickest response } func request(hostname string) (response string) { /* ... */ }
如果我们使用了无缓存的channel,那么两个慢的goroutines将会因为没有人接收而被永远卡住。这种情况,称为goroutines泄漏,这将是一个BUG。和垃圾变量不同,泄漏的goroutines并不会被自动回收,因此确保每个不再需要的goroutine能正常退出是重要的。
关于无缓存或带缓存channel之间的选择,或者是带缓存channel的容量大小的选择,都可能影响程序的正确性。无缓存channel更强地保证了每个发送操作与相应的同步接收操作;但是对于带缓存channel,这些操作是解耦的。同样,即使我们知道将要发送到一个channel的信息的数量上限,创建一个对应容量大小带缓存channel也是不现实的,因为这要求在执行任何接收操作之前缓存所有已经发送的值。如果未能分配足够的缓冲将导致程序死锁。
此外对于buffered channel,我们可以用一个有容量限制的buffered channel来控制并发,这类似于操作系统里的计数信号量概念。从概念上讲,channel里的n个空槽代表n个可以处理内容的token(通行证),从channel里接收一个值会释放其中的一个token,并且生成一个新的空槽位。这样保证了在没有接收介入时最多有n个发送操作。(这里可能我们拿channel里填充的槽来做token更直观一些,不过还是这样吧~)。由于channel里的元素类型并不重要,我们用一个零值的struct{}来作为其元素。
下面的crawl
函数,将对links.Extract
的调用操作用获取、释放token的操作包裹起来,来确保同一时间对其只有20个调用。信号量数量和其能操作的IO
资源数量应保持接近。
// goroutine获取token后,可以进行抓取操作,如果满20了 // 那么 goroutine 会等到有获取 token 后再去执行 var tokens = make(chan struct{}, 20) func crawl(url string) []string { fmt.Println(url) tokens <- struct{}{} // 获取 token list, err := links.Extract(url) <-tokens // 释放 token if err != nil { log.Print(err) } return list }
在并发循环中为了知道最后一个goroutine什么时候结束(最后一个结束并不一定是最后一个开始),我们需要一个递增的计数器,在每一个goroutine启动时加一,在goroutine退出时减一。这需要一种特殊的计数器,这个计数器需要在多个goroutine操作时做到安全并且提供在其减为零之前一直等待的一种方法。这种计数类型被称为sync.WaitGroup,下面的代码就用到了这种方法:
// makeThumbnails6为从通道中接收到的每个文件创建缩略图。 // 返回每个创建的缩略图所占的自己数。 func makeThumbnails6(filenames <-chan string) int64 { sizes := make(chan int64) var wg sync.WaitGroup // number of working goroutines for f := range filenames { wg.Add(1) // worker go func(f string) { defer wg.Done() thumb, err := thumbnail.ImageFile(f) if err != nil { log.Println(err) return } info, _ := os.Stat(thumb) // OK to ignore error sizes <- info.Size() }(f) } // closer go func() { wg.Wait() close(sizes) }() var total int64 for size := range sizes { total += size } return total }
注意Add和Done方法的不对策。Add是为计数器加一,必须在worker goroutine开始之前调用,而不是在goroutine中;否则的话我们没办法确定Add是在"closer" goroutine调用Wait之前被调用。并且Add还有一个参数,但Done却没有任何参数;其实它和Add(-1)是等价的。我们使用defer来确保计数器即使是在出错的情况下依然能够正确地被减掉。上面的程序代码结构是当我们使用并发循环,但又不知道迭代次数时很通常而且很地道的写法。
select语句的一般形式,和switch语句稍微有点相似。也会有几个case和最后的default选择支。每一个case代表一个通信操作(在某个channel上进行发送或者接收)并且会包含一些语句组成的一个语句块。
select { case <-ch1: // ... case x := <-ch2: // ...use x... case ch3 <- y: // ... default: // ... }
一个接收表达式可能只包含接收表达式自身(译注:不把接收到的值赋值给变量什么的),就像上面的第一个case,或者包含在一个简短的变量声明中,像第二个case里一样;第二种形式让你能够在当前 case 块中引用接收到的值。
select会等待case中有能够执行的case时去执行。当条件满足时,select才会去通信并执行case之后的语句;这时候其它通信是不会执行的。一个没有任何case的select语句写作select{},会永远地等待下去。
下面这个例子更微秒。ch这个channel的buffer大小是1,所以会交替的为空或为满,所以只有一个case可以进行下去,无论i是奇数或者偶数,它都会打印0 2 4 6 8。
ch := make(chan int, 1) for i := 0; i < 10; i++ { select { case x := <-ch: fmt.Println(x) // "0" "2" "4" "6" "8" case ch <- i: } }
如果多个case同时就绪时,select会随机地选择一个执行,这样来保证每一个channel都有平等的被select的机会。增加上面例子的buffer大小会使其输出变得不确定,因为当buffer既不为满也不为空时,select语句的执行情况就像是抛硬币的行为一样是随机的。