This article takes a look at tempodb's Pool.
tempo/tempodb/pool/pool.go
type Pool struct {
    cfg        *Config
    size       *atomic.Int32
    workQueue  chan *job
    shutdownCh chan struct{}
}
Pool has four fields: cfg (its Config), size (a counter of queued jobs), workQueue (the channel jobs are submitted on), and shutdownCh (a channel used to stop the workers).
tempo/tempodb/pool/pool.go
type job struct {
    ctx       context.Context
    cancel    context.CancelFunc
    payload   interface{}
    fn        JobFunc
    wg        *sync.WaitGroup
    resultsCh chan []byte
    stop      *atomic.Bool
    err       *atomic.Error
}

type JobFunc func(ctx context.Context, payload interface{}) ([]byte, error)
job carries the ctx, cancel, payload, fn (a JobFunc), wg, resultsCh, stop, and err fields; JobFunc takes a context and a payload and returns a []byte result or an error.
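To make the JobFunc contract concrete, here is a minimal sketch of a conforming function; the searchPayload type and its fields are hypothetical, invented purely for illustration:

package main

import (
    "context"
    "fmt"
)

// searchPayload is a hypothetical payload type; the submitter and the
// JobFunc simply have to agree on what sits behind interface{}.
type searchPayload struct {
    blockID string
    traceID []byte
}

// findTrace matches the JobFunc signature: assert the payload's concrete
// type, do the work, and return a result or an error.
func findTrace(ctx context.Context, payload interface{}) ([]byte, error) {
    p, ok := payload.(*searchPayload)
    if !ok {
        return nil, fmt.Errorf("unexpected payload type %T", payload)
    }
    // ... look up p.traceID in block p.blockID ...
    // returning nil, nil means "nothing found here, and no error";
    // as runJob below shows, only a non-nil msg counts as a hit.
    return nil, nil
}

func main() {
    msg, err := findTrace(context.Background(), &searchPayload{blockID: "b1"})
    fmt.Println(msg, err) // [] <nil>: nothing found, no error
}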
tempo/tempodb/pool/pool.go
type Config struct {
    MaxWorkers int `yaml:"max_workers"`
    QueueDepth int `yaml:"queue_depth"`
}

// default is concurrency disabled
func defaultConfig() *Config {
    return &Config{
        MaxWorkers: 30,
        QueueDepth: 10000,
    }
}
Config exposes two settings, MaxWorkers and QueueDepth; defaultConfig returns a MaxWorkers of 30 and a QueueDepth of 10000.
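The yaml tags are what tie these fields to a config file. A standalone sketch of that mapping, redeclaring Config locally and decoding with gopkg.in/yaml.v2 (an assumption; whichever YAML library Tempo actually uses is not shown here):

package main

import (
    "fmt"

    "gopkg.in/yaml.v2"
)

type Config struct {
    MaxWorkers int `yaml:"max_workers"`
    QueueDepth int `yaml:"queue_depth"`
}

func main() {
    // hypothetical fragment of a config file
    raw := []byte("max_workers: 50\nqueue_depth: 20000\n")

    cfg := &Config{}
    if err := yaml.Unmarshal(raw, cfg); err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", cfg) // &{MaxWorkers:50 QueueDepth:20000}
}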
tempo/tempodb/pool/pool.go
func NewPool(cfg *Config) *Pool {
    if cfg == nil {
        cfg = defaultConfig()
    }

    q := make(chan *job, cfg.QueueDepth)
    p := &Pool{
        cfg:        cfg,
        workQueue:  q,
        size:       atomic.NewInt32(0),
        shutdownCh: make(chan struct{}),
    }

    for i := 0; i < cfg.MaxWorkers; i++ {
        go p.worker(q)
    }

    p.reportQueueLength()

    metricQueryQueueMax.Set(float64(cfg.QueueDepth))

    return p
}
NewPool builds a Pool from the Config (falling back to defaultConfig when cfg is nil), starts cfg.MaxWorkers worker goroutines via go p.worker(q), then calls p.reportQueueLength() and records the queue capacity in the metricQueryQueueMax gauge.
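A minimal usage sketch, assuming the package imports as github.com/grafana/tempo/tempodb/pool:

package main

import (
    "github.com/grafana/tempo/tempodb/pool"
)

func main() {
    // nil falls back to defaultConfig(): 30 workers, queue depth 10000
    p := pool.NewPool(nil)
    defer p.Shutdown()

    // or size the pool explicitly
    p2 := pool.NewPool(&pool.Config{
        MaxWorkers: 8,
        QueueDepth: 1024,
    })
    defer p2.Shutdown()
}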
tempo/tempodb/pool/pool.go
func (p *Pool) worker(j <-chan *job) {
    for {
        select {
        case <-p.shutdownCh:
            return
        case j, ok := <-j:
            if !ok {
                return
            }
            runJob(j)
            p.size.Dec()
        }
    }
}
worker loops over a select: when p.shutdownCh fires it returns immediately; when it receives a new job from the queue it runs runJob and then decrements p.size.
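Two details worth noting: the case j, ok := <-j branch shadows the channel parameter with the received job, and ok turns false only once the channel has been closed and drained, which is how close(p.workQueue) in Shutdown lets workers finish already-queued jobs before exiting. A standalone sketch of those closed-channel semantics:

package main

import "fmt"

func main() {
    ch := make(chan int, 1)
    ch <- 42
    close(ch)

    v, ok := <-ch
    fmt.Println(v, ok) // 42 true: buffered values drain first

    v, ok = <-ch
    fmt.Println(v, ok) // 0 false: channel closed and empty
}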
tempo/tempodb/pool/pool.go
func runJob(job *job) {
    defer job.wg.Done()

    if job.stop.Load() {
        return
    }

    msg, err := job.fn(job.ctx, job.payload)

    if msg != nil {
        job.stop.Store(true) // one job was successful. stop all others
        // Commenting out job cancellations for now because of a resource leak suspected in the GCS golang client.
        // Issue logged here: https://github.com/googleapis/google-cloud-go/issues/3018
        // job.cancel()
        select {
        case job.resultsCh <- msg:
        default: // if we hit default it means that something else already returned a good result. /shrug
        }
    }

    if err != nil {
        job.err.Store(err)
    }
}
runJob first defers job.wg.Done(), then returns early if job.stop is already true; otherwise it invokes job.fn. If msg is non-nil it sets job.stop to true (so remaining jobs short-circuit) and attempts a non-blocking send of msg into job.resultsCh; if err is non-nil it stores it via job.err.Store. Note that job.cancel() is commented out because of a suspected resource leak in the GCS Go client (see the linked issue).
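The "first good result wins" behavior comes from resultsCh having a buffer of exactly one combined with the non-blocking send: the first successful job fills the buffer, and any later winners fall through to default and are dropped. A standalone sketch:

package main

import "fmt"

func main() {
    resultsCh := make(chan []byte, 1) // room for exactly one winner

    for _, msg := range [][]byte{[]byte("first"), []byte("second")} {
        select {
        case resultsCh <- msg:
            // buffer was empty: this result is kept
        default:
            // buffer already holds a result: drop this one
        }
    }

    fmt.Printf("%s\n", <-resultsCh) // first
}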
tempo/tempodb/pool/pool.go
func (p *Pool) Shutdown() {
    close(p.workQueue)
    close(p.shutdownCh)
}
Shutdown closes both the p.workQueue and p.shutdownCh channels. Since a send on a closed channel panics, callers must ensure no RunJobs call is still submitting when Shutdown runs.
tempo/tempodb/pool/pool.go
func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) ([]byte, error) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    totalJobs := len(payloads)

    // sanity check before we even attempt to start adding jobs
    if int(p.size.Load())+totalJobs > p.cfg.QueueDepth {
        return nil, fmt.Errorf("queue doesn't have room for %d jobs", len(payloads))
    }

    resultsCh := make(chan []byte, 1) // way for jobs to send back results
    err := atomic.NewError(nil)       // way for jobs to send back an error
    stop := atomic.NewBool(false)     // way to signal to the jobs to quit
    wg := &sync.WaitGroup{}           // way to wait for all jobs to complete

    // add each job one at a time. even though we checked length above these might still fail
    for _, payload := range payloads {
        wg.Add(1)
        j := &job{
            ctx:       ctx,
            cancel:    cancel,
            fn:        fn,
            payload:   payload,
            wg:        wg,
            resultsCh: resultsCh,
            stop:      stop,
            err:       err,
        }

        select {
        case p.workQueue <- j:
            p.size.Inc()
        default:
            wg.Done()
            stop.Store(true)
            return nil, fmt.Errorf("failed to add a job to work queue")
        }
    }

    // wait for all jobs to finish
    wg.Wait()

    // see if anything ended up in the results channel
    var msg []byte
    select {
    case msg = <-resultsCh:
    default:
    }

    // ignore err if msg != nil. otherwise errors like "context cancelled"
    // will take precedence over the err
    if msg != nil {
        return msg, nil
    }
    return nil, err.Load()
}
RunJobs first sanity-checks that the queue has room for all the payloads, then wraps each payload in a job and pushes it onto p.workQueue, bailing out with an error if the non-blocking send fails; it uses a WaitGroup to wait for every job to finish, then does a non-blocking receive on resultsCh and returns the msg if one arrived, falling back to the stored error otherwise (so a good result takes precedence over errors like "context cancelled").
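Putting it together, a minimal end-to-end sketch; the import path, the payloads, and the inline JobFunc are assumptions for illustration:

package main

import (
    "bytes"
    "context"
    "fmt"

    "github.com/grafana/tempo/tempodb/pool"
)

func main() {
    p := pool.NewPool(&pool.Config{MaxWorkers: 4, QueueDepth: 100})
    defer p.Shutdown()

    // one payload per block to search; []interface{} is what RunJobs expects
    payloads := []interface{}{"block-a", "block-b", "block-c"}

    msg, err := p.RunJobs(context.Background(), payloads,
        func(ctx context.Context, payload interface{}) ([]byte, error) {
            name := payload.(string)
            // pretend only block-b holds what we want; returning nil, nil
            // means "nothing here", so the other jobs are left to run
            if name == "block-b" {
                return bytes.ToUpper([]byte(name)), nil
            }
            return nil, nil
        })
    if err != nil {
        panic(err)
    }
    fmt.Printf("%s\n", msg) // BLOCK-B
}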
In short, tempodb provides a job pool: NewPool builds a Pool from the Config, starts cfg.MaxWorkers p.worker(q) goroutines, and then calls p.reportQueueLength(); RunJobs submits jobs and waits for a result; Shutdown closes the pool's workQueue and shutdownCh channels.