本文参考如下博客实现了一个简易的协程池
goroutine 太多仍会导致调度性能下降、GC 频繁、内存暴涨, 引发一系列问题。在面临这样的场景时, 限制 goroutine 的数量、重用 goroutine
实现的基本思路是采用生产者-消费者模型,用来执行任务的goroutine作为消费者,操作任务队列的goroutine是生产者,任务队列使用的是go中的buffer channel
任务定义:
// 任务定义 type Task struct { Handler func(v ...interface{}) // 任务处理函数 Params []interface{} // 处理函数参数列表 }
协程池定义:
// 任务池定义 type TaskPool struct { Capacity int64 // 任务池容量 RunningGoroutine int64 // 运行中的goroutine数量 TaskQueue chan *Task // 任务队列 Status int64 // 任务池状态 sync.Mutex PanicHandler func(interface{}) // goroutine异常处理机制 }
协程池状态常量定义
// 协程池状态 const( RUNNING = iota STOP )
全局异常定义:
// 池容量非法异常 var ErrInvalidPoolCap = errors.New("task pool capacity invaild") var ErrPoolAlreadyClosed = errors.New("pool is already go")
新增任务本质就是做goroutine数量检查,小于协程池容量则新启协程,超过就复用原有协程,协程的回收依赖于GC,任务是直接丢进管道,等待消费的goroutine执行
// 新增任务 func (p *TaskPool) Put(t *Task) error{ p.Lock() defer p.Unlock() if p.Status == STOP{ return ErrPoolAlreadyClosed } // 如果协程池未满则新启协程 if p.RunningGoroutine < p.Capacity{ // 协程池未满,则产生协程 p.run() } // 任务入队 p.TaskQueue <- t return nil }
执行任务其实就是监听channel消费具体的任务,这里采用的是带缓冲区的channel,所以消费生产是非阻塞的
// 从任务队列中取出任务执行 func (pool *TaskPool)run() { // 新增运行中的goroutine incRunning(pool) go func() { // 执行完成后运行中的goroutine-- defer func() { decRunning(pool) // goroutine panic if r := recover();r != nil{ if pool.PanicHandler != nil{ pool.PanicHandler(r); } else { // 默认处理 log.Printf("Worker panic: %s\n", r) } } pool.checkRunningWork() }() // 具体goroutine执行策略 for{ select { case task,ok := <- pool.TaskQueue:{ if !ok{ // 任务从管道消费失败 return } // 执行任务 task.Handler(task.Params) } } } }() }
如果某一个goroutine抛出panic就会导致整个程序崩溃退出,为了保证程序安全执行,需要对panic进行recover,进行异常处理,异常处理函数用户自定义
defer func() { decRunning(pool) // goroutine panic if r := recover();r != nil{ if pool.PanicHandler != nil{ pool.PanicHandler(r); } else { // 默认处理 log.Printf("Worker panic: %s\n", r) } } pool.checkRunningWork() }()
关闭协程池需要做两个步骤:
// 安全关闭协程池 func (p *TaskPool) CloseTask() error{ p.Lock() defer p.Unlock() if p.Status == STOP{ return ErrPoolAlreadyClosed } atomic.CompareAndSwapInt64(&p.Status,RUNNING,STOP) // 清空任务队列 for len(p.TaskQueue) > 0 { // 阻塞等待所有任务被 worker 消费 time.Sleep(1e6) // 防止等待任务清空 cpu 负载突然变大, 这里小睡一下 } return nil }
func TestMyPool() { pool,err := InitTaskPool(10) if err != nil{ panic(err) } for i := 0;i < 20;i++{ time.Sleep(1e6) pool.Put(&Task{Handler: func(v ...interface{}) { fmt.Print("i = ",i," ") },Params: []interface{}{i}}) fmt.Println("pool running goroutine size: ",pool.GetPoolRunningGSize()) } }