控制并发的方式有两种:
waitGroup会等待所有的goroutine执行完后在执行其他的事
import ( "fmt" "sync" "time" ) func TestWaitGroup() { wg := new(sync.WaitGroup) wg.Add(2) go func1(wg) go func2(wg) wg.Wait() fmt.Println("all finish") } func func1(wg *sync.WaitGroup) { fmt.Println("func1 start") time.Sleep(3 * time.Second) fmt.Println("func1 finish") wg.Done() } func func2(wg *sync.WaitGroup) { fmt.Println("func2 start") time.Sleep(2 * time.Second) fmt.Println("func2 finish") wg.Done() }
context是一个线程安全的请求上下文,携带了请求的全局数据,如超时时间,截止时间,信号量,k-v数据等
Context是一个接口,接口定义如下:
type Context interface { Deadline() (deadline time.Time, ok bool) // 返回 context.Context 被取消的时间,也就是完成工作的截止日期; Done() <-chan struct{} // 返回一个 Channel,这个 Channel 会在当前工作完成或者上下文被取消后关闭 Err() error // 返回 context.Context 结束的原因 //如果 context.Context 被取消,会返回 Canceled 错误; //如果 context.Context 超时,会返回 DeadlineExceeded 错误; Value(key interface{}) interface{} // 从 context.Context 中获取键对应的值,对于同一个上下文来说,多次调用 Value 并传入相同的 Key 会返回相同的结果,该方法可以用来传递请求特定的数据; }
go服务中每一个请求都是单独的goroutine处理的,对于一个请求我们会创建多个goroutine去处理,有的goroutine去做数据库操作,有的goroutine去做rpc请求,context包的作用就是在多个goroutine之间同步请求的特定数据。
每个context.Context都是在最顶层的goroutine中传递给下一层,当上层的goroutine发生错误时,可以将信号传递到下层的goroutine,及时退出该请求的所有goroutine,以减少资源的开销。
import ( "context" "fmt" ) func TestMyContext() { ctx := context.Background() func1(ctx) } func func1(ctx context.Context) { // 将需要传的值放入context中,自请求链路自上而下往下传,键值对绑定 ctx = context.WithValue(ctx,"k1","v1") func2(ctx) } func func2(ctx context.Context) { fmt.Println(ctx.Value("k1").(string)) }
请求中的所有goroutine监听ctx.Done()返回的管道中的信号,一旦监听到信号消费成功,所有goroutine立即退出返回。
// 超时或者手动触发cancel都会发消息到Done管道· context.WithTimeout(context.Background(),time.Second * 3) // 超时时间:3s // 主动取消需要手动执行cancel,cancel执行后,取消信号进入Done管道,请求取消,主动通知机制 ctx,cancel := context.WithCancel(context.Background())
超时取消请求:
// 场景二:超时取消请求 func TestContext2(){ ctx,cancel := context.WithTimeout(context.Background(),time.Second * 3) // 超时时间:3s defer cancel() func11(ctx) time.Sleep(2 * time.Second) } func func11(ctx context.Context) { resp := make(chan struct{}, 1) go func() { // 处理耗时 time.Sleep(time.Second * 10) resp <- struct{}{} }() select { case <- ctx.Done() : // 多个goroutine Done返回的管道中的消息,一旦消费到信号,立即停止工作 fmt.Println("request timeout") case num := <- resp: fmt.Println("正常业务处理:",num) } fmt.Println("finish") }
主动取消请求
// 主动取消请求 func TestContext3() { wg := new(sync.WaitGroup) wg.Add(1) ctx,cancel := context.WithCancel(context.Background()) go func111(ctx,wg) time.Sleep(2 * time.Second) // main goroutine执行2秒后取消,所有goroutine退出 cancel() wg.Wait() // 等待goroutine退出 fmt.Println("全部goroutine都已退出") } func func111(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() resp := make(chan int) go func() { // 模型耗时请求 time.Sleep(3 * time.Second) resp <- 10 }() select { case <- ctx.Done(): fmt.Println("request canceled") case num := <- resp: fmt.Println(num) } }