
k8s 源码 client-go 系列之 workqueue

今天我们来详细研究下 workqueue 相关代码。client-go 的 util/workqueue 包里主要有三个队列,分别是普通队列,延时队列,限速队列,后一个队列以前一个队列的实现为基础,层层添加新功能,我们按照 Queue、DelayingQueue、RateLimitingQueue 的顺序层层拨开来看限速队列是如何实现的。




// TypedInterface is the basic work-queue contract: producers Add items,
// workers Get them and must call Done once an item has been processed.
type TypedInterface[T comparable] interface {
	// Add marks item as needing processing.
	Add(item T)
	// Len returns the current queue length, for informational purposes only.
	Len() int
	// Get blocks until an item is available; shutdown is true once the queue
	// has been shut down and nothing is left to hand out.
	Get() (item T, shutdown bool)
	// Done marks item as finished processing.
	Done(item T)
	// ShutDown makes the queue ignore new items and wakes up waiting workers.
	ShutDown()
	// ShutDownWithDrain is like ShutDown, but also waits until every item
	// currently being processed has been marked Done.
	ShutDownWithDrain()
	// ShuttingDown reports whether the queue is shutting down.
	ShuttingDown() bool
}


type Typed[t comparable] struct {
	// queue defines the order in which we will work on items. Every
	// element of queue should be in the dirty set and not in the
	// processing set.
	// 定义元素的处理顺序,里面所有元素都应该在 dirty set 中有,而不能出现在 processing set 中
	queue Queue[t]

	// dirty defines all of the items that need to be processed.
	// 标记所有需要被处理的元素
	dirty set[t]

	// Things that are currently being processed are in the processing set.
	// These things may be simultaneously in the dirty set. When we finish
	// processing something and remove it from this set, we'll check if
	// it's in the dirty set, and if so, add it to the queue.
	// 存放正在被处理的元素,可能同时存在于dirty set。 当我们完成处理后,会将其删除,我们会看看他是否在 dirty set 中,如果在,添加到 queue中
	processing set[t]

	// 条件变量,在多个goroutines等待、1个goroutine通知事件发生时使用
	cond *sync.Cond

	shuttingDown bool
	drain        bool

	metrics queueMetrics

	unfinishedWorkUpdatePeriod time.Duration
	clock                      clock.WithTicker

// queue is the stock Queue implementation: nothing more than a slice of
// items of type T.
type (
	queue[T comparable] []T
)


// Add marks item as needing processing.
// 将元素标记成需要处理
func (q *Typed[T]) Add(item T) {
	defer q.cond.L.Unlock()
	// 队列正关闭,则直接返回
	if q.shuttingDown {
	// 已经标记为 dirty 的数据,也直接返回,因为存储在了脏数据的集合中
	if q.dirty.has(item) {
		// the same item is added again before it is processed, call the Touch
		// function if the queue cares about it (for e.g, reset its priority)
		// 相同的元素又被添加进来了,如果queue 关心他,调用 touch,这里需要自己去实现 queue的 touch方法,自带的不会处理
		if !q.processing.has(item) {


	// 添加到脏数据集合中
	// 元素如果正在被处理,那就直接返回
	if q.processing.has(item) {
	// 追加到元素数组的尾部
	// 通知有新元素到了,此时有协程阻塞就会被唤醒


为啥在添加数据的同时要添加到 dirty 脏数据集合中呢,存储在 queue 中不就可以了么?

  • 为了让 queue 中不存在重复的 items,所以加了一个 dirty set,毕竟判断 map 中是否存在某个 key 比判断 slice 中是否存在某个 item 要快得多。
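  • 队列中曾经存储过该元素,但它已经被 Get() 取走、还没有调用 Done() 方法(也就是正在处理中的元素)。如果此时再次添加该元素,新添加的才是最新的,正在处理的那份已经过时了,也就是"脏"的。这时 Add() 只把它记录到 dirty set 中而不放入 queue;等 Done() 时发现它还在 dirty set 中,才会把它重新放回 queue。这样可以保证同一个元素不会同时被多个 worker 处理。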


Get() 方法从 queue 中取出第一个 item(队列为空且未关闭时会阻塞等待),同时将其加入到 processing set 中,并从 dirty set 中删除。

 func (q *Type) Get() (item interface{}, shutdown bool) {
	defer q.cond.L.Unlock()
	// 如果当前队列中没有数据,并且没有要关闭的状态则阻塞协程
	for len(q.queue) == 0 && !q.shuttingDown {
	// 协程被激活但还没有数据,说明队列被关闭了
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	// 从队列中弹出第一个元素
	item = q.queue[0]
	// The underlying array still exists and reference this object, so the object will not be garbage collected.
	q.queue[0] = nil
	q.queue = q.queue[1:]

	// 加入到处理队列中
	 // 同时从dirty集合(需要处理的元素集合)中移除

	return item, false


Done() 方法用来标记一个 item 已经处理完成。调用 Done() 方法时,这个 item 会从 processing set 中删除;如果它在处理期间又被 Add() 过(仍在 dirty set 中),就会被重新放回 queue 等待处理。

 func (q *Type) Done(item interface{}) {
	defer q.cond.L.Unlock()

 	 // 从正在处理的集合中删除元素
	 // 此处判断脏数据集合,如果在处理期间又被添加回去了,则又放到队列中重新处理。
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
	} else if q.processing.len() == 0 {


// TypedDelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
// DelayingInterface 是一个可以在一个时间后添加一个元素的 Interface。这使得在失败后重新排列元素更容易而不会在热循环中结束。
type TypedDelayingInterface[T comparable] interface {
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item T, duration time.Duration)


// delayingType wraps an Interface and provides delayed re-enquing
type delayingType[T comparable] struct {

	// clock tracks time for delayed firing
	clock clock.Clock

	// stopCh lets us signal a shutdown to the waiting loop
	stopCh chan struct{}
	// stopOnce guarantees we only signal shutdown a single time
	stopOnce sync.Once

	// heartbeat ensures we wait no more than maxWait before firing
	heartbeat clock.Ticker

	// waitingForAddCh is a buffered channel that feeds waitingForAdd
	// 所有延迟添加的元素封装成 waitFor 放到缓冲队列中
	waitingForAddCh chan *waitFor

	// metrics counts the number of retries
	metrics retryMetrics
// waitFor holds the data to add and the time it should be added
type waitFor struct {
	data    t
	readyAt time.Time
	// index in the priority queue (heap)
	index int

在这个基础上还定义了一个 waitForPriorityQueue,用来实现 waitFor 元素的优先级队列:所有需要延迟添加的元素组成一个最小堆,按照元素应当被添加的时间(readyAt)排序,readyAt 最早的元素始终在堆顶。

这里我们只需要知道,waitForPriorityQueue 底层是一个 slice,借助 container/heap 维护成按 readyAt 从小到大的最小堆。它并不是整体有序的,只保证下标 0 处是最早到期的元素。根据 heap.Interface 的定义,我们需要实现 Len、Less、Swap、Push、Pop 这几个方法:

// waitForPriorityQueue implements a priority queue for waitFor items.
// waitForPriorityQueue implements heap.Interface. The item occurring next in
// time (i.e., the item with the smallest readyAt) is at the root (index 0).
// Peek returns this minimum item at index 0. Pop returns the minimum item after
// it has been removed from the queue and placed at index Len()-1 by
// container/heap. Push adds an item at index Len(), and container/heap
// percolates it into the correct location.
type waitForPriorityQueue []*waitFor

func (pq waitForPriorityQueue) Len() int {
	return len(pq)
func (pq waitForPriorityQueue) Less(i, j int) bool {
	return pq[i].readyAt.Before(pq[j].readyAt)
func (pq waitForPriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j

// Push adds an item to the queue. Push should not be called directly; instead,
// use `heap.Push`.
func (pq *waitForPriorityQueue) Push(x interface{}) {
	n := len(*pq)
	item := x.(*waitFor)
	item.index = n
	*pq = append(*pq, item)

// Pop removes an item from the queue. Pop should not be called directly;
// instead, use `heap.Pop`.
func (pq *waitForPriorityQueue) Pop() interface{} {
	n := len(*pq)
	item := (*pq)[n-1]
	item.index = -1
	*pq = (*pq)[0:(n - 1)]
	return item

因为延时队列利用 waitForPriorityQueue 管理所有延时添加的元素,这些元素在 waitForPriorityQueue 中按照时间从小到大组织(堆顶始终是最早到期的元素),所以延时队列的处理就会方便很多。


// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType[T]) AddAfter(item T, duration time.Duration) {
	// don't add if we're already shutting down
	if q.ShuttingDown() {


	// immediately add things with no delay
	if duration <= 0 {

	select {
	case <-q.stopCh:
		// unblock if ShutDown() is called
	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:

AddAfter() 在队列未关闭且延迟大于 0 时,只是简单地把元素封装成 waitFor 送到 channel 中(延迟小于等于 0 时直接入队)。所以核心实现是从 channel 中获取数据的部分,即 waitingLoop,如下所示:

// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func (q *delayingType[T]) waitingLoop() {
	defer utilruntime.HandleCrash()

	// Make a placeholder channel to use when there are no items in our list
	never := make(<-chan time.Time)

	// Make a timer that expires when the item at the head of the waiting queue is ready
	var nextReadyAtTimer clock.Timer
 	// 初始化上面的有序队列
	waitingForQueue := &waitForPriorityQueue{}
  	 // 这个map用来避免重复添加,如果重复添加则只更新时间即可
	waitingEntryByData := map[t]*waitFor{}

	for {
	 	// 队列关闭了则直接返回
		if q.TypedInterface.ShuttingDown() {

		now := q.clock.Now()

		// Add ready entries
		// 判断有序队列中是否有元素
		for waitingForQueue.Len() > 0 {
		  	// 获得有序队列中的第一个元素
			entry := waitingForQueue.Peek().(*waitFor)
			 // 元素指定的时间是否过了?没有的话就跳出循环
			if entry.readyAt.After(now) {
			 // 如果时间已经过了,那就从有序队列中拿出来放入通用队列中
			entry = heap.Pop(waitingForQueue).(*waitFor)
			delete(waitingEntryByData, entry.data)

		// Set up a wait for the first item's readyAt (if one exists)
		nextReadyAt := never
		 // 如果有序队列中有元素,那就用第一个元素指定的时间减去当前时间作为等待时间
		if waitingForQueue.Len() > 0 {
			if nextReadyAtTimer != nil {
			entry := waitingForQueue.Peek().(*waitFor)
			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
			nextReadyAt = nextReadyAtTimer.C()
		// 进入各种等待
		select {
		case <-q.stopCh:

		case <-q.heartbeat.C():
			// continue the loop, which will add ready items
		 // 这个就是有序队列里面需要等待时间的信号,时间到就会有信号
		case <-nextReadyAt:
			// continue the loop, which will add ready items
		// 这里是从channel中获取元素,AddAfter()放入到channel中的元素
		case waitEntry := <-q.waitingForAddCh:
			// 时间没有过就插入到有序队列中
			if waitEntry.readyAt.After(q.clock.Now()) {
				insert(waitingForQueue, waitingEntryByData, waitEntry)
			} else {
				// 如果时间已经过了就直接放入通用队列

			drained := false
			for !drained {
				select {
				case waitEntry := <-q.waitingForAddCh:
					if waitEntry.readyAt.After(q.clock.Now()) {
						insert(waitingForQueue, waitingEntryByData, waitEntry)
					} else {
					drained = true



// TypedRateLimiter decides how long an item should wait before it is
// (re)added and may track per-item retry state.
type TypedRateLimiter[T comparable] interface {
	// When gets an item and gets to decide how long that item should wait
	When(item T) time.Duration
	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for failing
	// or for success, we'll stop tracking it
	Forget(item T)
	// NumRequeues returns back how many failures the item has had
	// (i.e. how many times it has been put back on the queue).
	NumRequeues(item T) int
}

1. TypedBucketRateLimiter 限速器利用 golang.org/x/time/rate 包中的 Limiter(令牌桶)来实现稳定速率(qps)的限速。它不跟踪单个元素,所以 NumRequeues 恒为 0,Forget 什么也不做。对应的结构体如下所示:

// TypedBucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type TypedBucketRateLimiter[T comparable] struct {
func (r *TypedBucketRateLimiter[T]) When(item T) time.Duration {
	return r.Limiter.Reserve().Delay()

func (r *TypedBucketRateLimiter[T]) NumRequeues(item T) int {
	return 0

func (r *TypedBucketRateLimiter[T]) Forget(item T) {


2. TypedItemExponentialFailureRateLimiter 是比较常用的限速器。它会根据元素的失败次数让等待时间指数增长(baseDelay*2^失败次数),但最大不超过 maxDelay。定义如下所示:

// TypedItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
// dealing with max failures and expiration are up to the caller
type TypedItemExponentialFailureRateLimiter[T comparable] struct {
	failuresLock sync.Mutex
	failures     map[T]int

	baseDelay time.Duration
	maxDelay  time.Duration
}

// When records one more failure for item and returns
// baseDelay * 2^(failures before this call), capped at maxDelay.
func (r *TypedItemExponentialFailureRateLimiter[T]) When(item T) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	exp := r.failures[item]
	r.failures[item] = r.failures[item] + 1

	// The backoff is capped such that 'calculated' value never overflows.
	// float64(math.MaxInt64) rounds up to 2^63, which does not fit in an
	// int64. The comparison therefore has to be >=: converting exactly 2^63
	// to a Duration is implementation-defined and yields a negative value on
	// amd64.
	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
	if backoff >= math.MaxInt64 {
		return r.maxDelay
	}

	calculated := time.Duration(backoff)
	if calculated > r.maxDelay {
		return r.maxDelay
	}

	return calculated
}

// NumRequeues returns how many failures have been recorded for item.
func (r *TypedItemExponentialFailureRateLimiter[T]) NumRequeues(item T) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

// Forget drops the failure count for item, so its backoff starts over.
func (r *TypedItemExponentialFailureRateLimiter[T]) Forget(item T) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}


3. TypedItemFastSlowRateLimiter 和上面的指数级限速器很像,都是针对失败重试的,但二者的限速策略不同:TypedItemFastSlowRateLimiter 在尝试次数不超过阈值 maxFastAttempts 时使用短延迟 fastDelay,超过后使用长延迟 slowDelay。具体的实现如下所示:

// TypedItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type TypedItemFastSlowRateLimiter[T comparable] struct {
	failuresLock sync.Mutex
	failures     map[T]int

	maxFastAttempts int
	fastDelay       time.Duration
	slowDelay       time.Duration
}

// When counts one more attempt for item. It returns fastDelay for the first
// maxFastAttempts attempts and slowDelay for every attempt after that.
func (r *TypedItemFastSlowRateLimiter[T]) When(item T) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	r.failures[item] = r.failures[item] + 1

	if r.failures[item] <= r.maxFastAttempts {
		return r.fastDelay
	}

	return r.slowDelay
}

// NumRequeues returns how many attempts have been recorded for item.
func (r *TypedItemFastSlowRateLimiter[T]) NumRequeues(item T) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

// Forget drops the attempt count for item, so it gets fast retries again.
func (r *TypedItemFastSlowRateLimiter[T]) Forget(item T) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}



4. TypedMaxOfRateLimiter 限速器内部是一个限速器 slice。每次调用 When 时,它会询问所有限速器,并返回其中最大的延迟(即最坏情况)。具体的实现如下所示:

// TypedMaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type TypedMaxOfRateLimiter[T comparable] struct {
	limiters []TypedRateLimiter[T]

// client-go/util/workqueue/default_rate_limiters.go
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
    ret := time.Duration(0)
    // 获取所有限速器里面时间最大的
    for _, limiter := range r.limiters {
        curr := limiter.When(item)
        if curr > ret {
            ret = curr
    return ret

func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
    ret := 0
    // 同样获取所有限速器里面最大的 Requeue 次数
    for _, limiter := range r.limiters {
        curr := limiter.NumRequeues(item)
        if curr > ret {
            ret = curr
    return ret

func (r *MaxOfRateLimiter) Forget(item interface{}) {
    // 调用所有的限速器的 Forget 方法
    for _, limiter := range r.limiters {


// TypedWithMaxWaitRateLimiter have maxDelay which avoids waiting too long
type TypedWithMaxWaitRateLimiter[T comparable] struct {
	limiter  TypedRateLimiter[T]
	maxDelay time.Duration

func (w TypedWithMaxWaitRateLimiter[T]) When(item T) time.Duration {
	delay := w.limiter.When(item)
	if delay > w.maxDelay {
		return w.maxDelay

	return delay

func (w TypedWithMaxWaitRateLimiter[T]) Forget(item T) {

func (w TypedWithMaxWaitRateLimiter[T]) NumRequeues(item T) int {
	return w.limiter.NumRequeues(item)

