现在很多公司都在陆续的搭建golang的语言栈,大家有没有想过为什么会出现这种情况?一是因为go比较适合做中间件,还有一个原因就是go的并发支持比较好,也就是咱们平时所谓的高并发,并发支持离不开协程,当然协程也不是乱用的,需要管理起来,管理协程的方式就是协程池,所以协程池也并没有那么神秘,今天咱们就来一步一步的揭开协程池的面纱,如果你没有接触过go的协程这块的话也没有关系,我会尽量写的详细。
先来看一个简单的例子
func go_worker(name string) { for i := 0; i < 5; i++ { fmt.Println("我的名字是", name) time.Sleep(1 * time.Second) } fmt.Println(name, "执行完毕")}func main() { go_worker("123") go_worker("456") for i := 0; i < 5; i++ { fmt.Println("我是main") time.Sleep(1 * time.Second) }}
咱们在执行这段代码的时候,当然是按照顺序执行
go_worker(“123”)->go_worker(“456”)->我是main执行
输出结果如下
我的名字是 123我的名字是 123我的名字是 123我的名字是 123我的名字是 123123 执行完毕我的名字是 456我的名字是 456我的名字是 456我的名字是 456我的名字是 456456 执行完毕我是main我是main我是main我是main我是main
这样的执行是并行的,也就是说必须得等一个任务执行结束,下一个任务才会开始,如果某个任务比较慢的话,整个程序的效率是可想而知的,但是在go语言中,支持协程,所以我们可以把上面的代码改造一下
func go_worker(name string) { for i := 0; i < 5; i++ { fmt.Println("我的名字是", name) time.Sleep(1 * time.Second) } fmt.Println(name, "执行完毕")}func main() { go go_worker("123") //协程 go go_worker("456") //协程 for i := 0; i < 5; i++ { fmt.Println("我是main") time.Sleep(1 * time.Second) }}
我们在不同的go_worker前面加上了一个go,这样所有任务就异步的串行了起来,输出结果如下
我是main我的名字是 456我的名字是 123我的名字是 123我是main我的名字是 456我是main我的名字是 456我的名字是 123我是main我的名字是 456我的名字是 123我的名字是 456我的名字是 123我是main
大家可以看到这样的话就是各自任务执行各自的事情,互相不影响,效率也得到了很大的提升,这就是goroutine
有了协程之后就会带来一个新的问题,协程之间是如何通信的?于是就引出了管道这个概念,管道其实很简单,无非就是往里放数据,往外取数据而已
func worker(c chan int) { num := <-c //读取管道中的数据,并输出 fmt.Println("接收到参数c:", num)}func main() { //channel的创建,需要执行管道数据的类型,我们这里是int c := make(chan int) //开辟一个协程 去执行worker函数 go worker(c) c <- 2 //往管道中写入2 fmt.Println("main")}
我们可以看到上述例子,在main函数中,我们定义了一个管道,为int类型,而且往里面写入了一个2,然后在worker中读取管道c,就能获取到2
既然golang中开启协程这么方便,那么会不会存在什么坑呢?
我们可以看上图,实际业务中,不同的业务都开启不同的goroutine来执行,但是在cpu微观层面上来讲,是串行的一个指令一个指令去执行的,只是执行的非常快而已,如果指令来的太多,cpu的切换也会变多,在切换的过程中就需要消耗性能,所以协程池的主要作用就是管理goroutine,限定goroutine的个数
首先不同的任务,请求过来,直接往entryChannel中写入,entryChannel再和jobsChannel建立通信
然后我们固定开启三个协程(不一定是三个,只是用三个举例子),固定的从jobsChannel中读取数据,来进行任务处理。
其实本质上,channel就是一道桥梁,做一个中转的作用,之所以要设计一个jobsChannel和entryChannel,是为了解耦,entryChannel可以完全用做入口,jobsChannel可以做更深入的比如任务优先级,或者加锁,解锁等处理
原理清楚了,接下来我们来具体看代码实现
首先我们来处理任务 task,task无非就是业务中的各种任务,需要能实力化,并且执行,代码如下
//定义任务Task类型,每一个任务Task都可以抽象成一个函数type Task struct{ f func() error //一个task中必须包含一个具体的业务}//通过NewTask来创建一个Taskfunc NewTask(arg_f func() error) *Task{ t := Task{ f:arg_f, } return &t}//Task也需要一个执行业务的方法func (t *Task) Execute(){ t.f()//调用任务中已经绑定好的业务方法}
接下来我们来定义协程池
//定义池类型type Pool struct{ EntryChannel chan *Task WorkerNum int JobsChanel chan *Task}//创建一个协程池func NewPool(cap int) *Pool{ p := Pool{ EntryChannel: make(chan *Task), JobsChanel: make(chan *Task), WorkerNum: cap, } return &p}
协程池需要创建worker,然后不断的从JobsChannel内部任务队列中拿任务开始工作
//协程池创建worker并开始工作func (p *Pool) worker(workerId int){ //worker不断的从JobsChannel内部任务队列中拿任务 for task := range p.JobsChanel{ task.Execute() fmt.Println("workerId",workerId,"执行任务成功") }}
EntryChannel获取Task任务func (p *Pool) ReceiveTask(t *Task){ p.EntryChannel <- t}
//让协程池开始工作func (p *Pool) Run(){ //1:首先根据协程池的worker数量限定,开启固定数量的worker for i:=0; i<p.WorkerNum; i++{ go p.worker(i) } //2:从EntryChannel协程出入口取外部传递过来的任务 //并将任务送进JobsChannel中 for task := range p.EntryChannel{ p.JobsChanel <- task } //3:执行完毕需要关闭JobsChannel和EntryChannel close(p.JobsChanel) close(p.EntryChannel)}
然后我们看在main函数中
//创建一个task t:= NewTask(func() error{ fmt.Println(time.Now()) return nil }) //创建一个协程池,最大开启5个协程worker p:= NewPool(3) //开启一个协程,不断的向Pool输送打印一条时间的task任务 go func(){ for { p.ReceiveTask(t)//把任务推向EntryChannel } }() //启动协程池p p.Run()
基于上述方法,咱们一个简单的协程池设计就完成了,当然在实际生产环境中这样做还是不够的,不过这些方法能手写出来,那对golang是相当熟悉了,如果需要获取完整代码,可以拉到下方扫描二维码关注本公众号,回复"协程池"即可获取。