本文主要研究一下dapr的Limiter
dapr/pkg/concurrency/limiter.go
const ( // DefaultLimit is the default concurrency limit DefaultLimit = 100 ) // Limiter object type Limiter struct { limit int tickets chan int numInProgress int32 }
Limiter定义了limit、tickets、numInProgress属性
dapr/pkg/concurrency/limiter.go
// NewLimiter allocates a new ConcurrencyLimiter func NewLimiter(limit int) *Limiter { if limit <= 0 { limit = DefaultLimit } // allocate a limiter instance c := &Limiter{ limit: limit, tickets: make(chan int, limit), } // allocate the tickets: for i := 0; i < c.limit; i++ { c.tickets <- i } return c }
NewLimiter方法根据limit来创建Limiter,并挨个分配ticket
dapr/pkg/concurrency/limiter.go
// Execute adds a function to the execution queue. // if num of go routines allocated by this instance is < limit // launch a new go routine to execute job // else wait until a go routine becomes available func (c *Limiter) Execute(job func(param interface{}), param interface{}) int { ticket := <-c.tickets atomic.AddInt32(&c.numInProgress, 1) go func(param interface{}) { defer func() { c.tickets <- ticket atomic.AddInt32(&c.numInProgress, -1) }() // run the job job(param) }(param) return ticket }
Execute方法首先获取ticket,然后递增numInProgress,之后异步执行job,执行完后归还ticket
dapr/pkg/concurrency/limiter.go
// Wait will block all the previously Executed jobs completed running. // // IMPORTANT: calling the Wait function while keep calling Execute leads to // un-desired race conditions func (c *Limiter) Wait() { for i := 0; i < c.limit; i++ { <-c.tickets } }
Wait方法遍历limit,挨个等待tickets返回
dapr的Limiter定义了limit、tickets、numInProgress属性;它定义了Execute、Wait方法,同时提供NewLimiter的工厂方法。