taintManager
的主要功能为:当某个node被打上NoExecute
污点后,其上面的pod如果不能容忍该污点,则taintManager
将会驱逐这些pod,而新建的pod也需要容忍该污点才能调度到该node上;
通过kcm启动参数--enable-taint-manager
来确定是否启动taintManager
,true
时启动(启动参数默认值为true
);
kcm启动参数--feature-gates=TaintBasedEvictions=xxx
,默认值true,配合--enable-taint-manager
共同作用,两者均为true,才会开启污点驱逐;
当node出现NoExecute
污点时,判断node上的pod是否能容忍node的污点,不能容忍的pod,会被立即删除,能容忍所有污点的pod,则等待所有污点的容忍时间里最小值后,pod被删除;
NoExecuteTaintManager
结构体为taintManager
的主要结构体,其主要属性有:
(1)taintEvictionQueue
:不能容忍node上NoExecute
的污点的pod,会被加入到该队列中,然后pod会被删除;
(2)taintedNodes
:记录了每个node的taint;
(3)nodeUpdateQueue
:当node对象发生add、delete、update(新旧node对象的taint不相同)事件时,node会进入该队列;
(4)podUpdateQueue
:当pod对象发生add、delete、update(新旧pod对象的NodeName
或Tolerations
不相同)事件时,pod会进入该队列;
(5)nodeUpdateChannels
:nodeUpdateChannels
即8个nodeUpdateItem
类型的channel
,有worker负责消费nodeUpdateQueue
队列,然后根据node name计算出index,把node放入其中1个nodeUpdateItem
类型的channel
中;
(6)podUpdateChannels
:podUpdateChannels
即8个podUpdateItem
类型的channel
,有worker负责消费podUpdateQueue
队列,然后根据pod的node name计算出index,把pod放入其中1个podUpdateItem
类型的channel
中;
// pkg/controller/nodelifecycle/scheduler/taint_manager.go type NoExecuteTaintManager struct { client clientset.Interface recorder record.EventRecorder getPod GetPodFunc getNode GetNodeFunc getPodsAssignedToNode GetPodsByNodeNameFunc taintEvictionQueue *TimedWorkerQueue // keeps a map from nodeName to all noExecute taints on that Node taintedNodesLock sync.Mutex taintedNodes map[string][]v1.Taint nodeUpdateChannels []chan nodeUpdateItem podUpdateChannels []chan podUpdateItem nodeUpdateQueue workqueue.Interface podUpdateQueue workqueue.Interface }
taintEvictionQueue
属性是一个TimedWorkerQueue
类型的队列,调用tc.taintEvictionQueue.AddWork
,会将pod添加到该队列中,会添加一个定时器,然后到期之后会自动执行workFunc
,初始化taintEvictionQueue
时,传入的workFunc
是deletePodHandler
函数,作用是删除pod;
所以进入taintEvictionQueue
中的pod,会在设置好的时间,被删除;
pod.Spec.Tolerations
配置的是pod的污点容忍信息;
// vendor/k8s.io/api/core/v1/types.go type Toleration struct { Key string `json:"key,omitempty" protobuf:"bytes,1,opt,name=key"` Operator TolerationOperator `json:"operator,omitempty" protobuf:"bytes,2,opt,name=operator,casttype=TolerationOperator"` Value string `json:"value,omitempty" protobuf:"bytes,3,opt,name=value"` Effect TaintEffect `json:"effect,omitempty" protobuf:"bytes,4,opt,name=effect,casttype=TaintEffect"` TolerationSeconds *int64 `json:"tolerationSeconds,omitempty" protobuf:"varint,5,opt,name=tolerationSeconds"` }
Tolerations的属性值解析如下:
(1)Key
:匹配node污点的Key;
(2)Operator
:表示Tolerations中Key与node污点的Key相同时,其Value与node污点的Value的关系,默认值Equal
,代表相等,Exists
则代表Tolerations中Key与node污点的Key相同即可,不用比较其Value值;
(3)Value
:匹配node污点的Value;
(4)Effect
:匹配node污点的Effect;
(5)TolerationSeconds
:node污点容忍时间;
配置示例:
tolerations: - key: "key1" operator: "Equal" value: "value1" effect: "NoExecute" tolerationSeconds: 3600
上述配置表示如果该pod正在运行,同时一个匹配的污点被添加到其所在的node节点上,那么该pod还将继续在节点上运行3600秒,然后会被驱逐(如果在此之前其匹配的node污点被删除了,则该pod不会被驱逐);
NewNodeLifecycleController
为NodeLifecycleController
的初始化函数,里面给taintManager
注册了pod与node的EventHandler
,Add
、Update
、Delete
事件都会调用taintManager
的PodUpdated
或NodeUpdated
方法来做处理;
// pkg/controller/nodelifecycle/node_lifecycle_controller.go func NewNodeLifecycleController( ... podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { ... if nc.taintManager != nil { nc.taintManager.PodUpdated(nil, pod) } }, UpdateFunc: func(prev, obj interface{}) { ... if nc.taintManager != nil { nc.taintManager.PodUpdated(prevPod, newPod) } }, DeleteFunc: func(obj interface{}) { ... if nc.taintManager != nil { nc.taintManager.PodUpdated(pod, nil) } }, }) ... if nc.runTaintManager { podGetter := func(name, namespace string) (*v1.Pod, error) { return nc.podLister.Pods(namespace).Get(name) } nodeLister := nodeInformer.Lister() nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) } nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter, nc.getPodsAssignedToNode) nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error { nc.taintManager.NodeUpdated(nil, node) return nil }), UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error { nc.taintManager.NodeUpdated(oldNode, newNode) return nil }), DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error { nc.taintManager.NodeUpdated(node, nil) return nil }), }) } ... }
tc.NodeUpdated
方法会判断新旧node对象的taint是否相同,不相同则调用tc.nodeUpdateQueue.Add
,将该node放入到nodeUpdateQueue
队列中;
// pkg/controller/nodelifecycle/scheduler/taint_manager.go func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) { nodeName := "" oldTaints := []v1.Taint{} if oldNode != nil { nodeName = oldNode.Name oldTaints = getNoExecuteTaints(oldNode.Spec.Taints) } newTaints := []v1.Taint{} if newNode != nil { nodeName = newNode.Name newTaints = getNoExecuteTaints(newNode.Spec.Taints) } if oldNode != nil && newNode != nil && helper.Semantic.DeepEqual(oldTaints, newTaints) { return } updateItem := nodeUpdateItem{ nodeName: nodeName, } tc.nodeUpdateQueue.Add(updateItem) }
tc.PodUpdated
方法会判断新旧pod对象的NodeName
或Tolerations
是否相同,不相同则调用tc.podUpdateQueue.Add
,将该pod放入到podUpdateQueue
队列中;
// pkg/controller/nodelifecycle/scheduler/taint_manager.go func (tc *NoExecuteTaintManager) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) { podName := "" podNamespace := "" nodeName := "" oldTolerations := []v1.Toleration{} if oldPod != nil { podName = oldPod.Name podNamespace = oldPod.Namespace nodeName = oldPod.Spec.NodeName oldTolerations = oldPod.Spec.Tolerations } newTolerations := []v1.Toleration{} if newPod != nil { podName = newPod.Name podNamespace = newPod.Namespace nodeName = newPod.Spec.NodeName newTolerations = newPod.Spec.Tolerations } if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName { return } updateItem := podUpdateItem{ podName: podName, podNamespace: podNamespace, nodeName: nodeName, } tc.podUpdateQueue.Add(updateItem) }
看到TaintManager
的初始化方法NewNoExecuteTaintManager
中,调用CreateWorkerQueue
给taintEvictionQueue
做了初始化;
// pkg/controller/nodelifecycle/scheduler/taint_manager.go func NewNoExecuteTaintManager(...) ... { ... tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent)) ... }
CreateWorkerQueue
函数初始化并返回TimedWorkerQueue
结构体;
// pkg/controller/nodelifecycle/scheduler/timed_workers.go func CreateWorkerQueue(f func(args *WorkArgs) error) *TimedWorkerQueue { return &TimedWorkerQueue{ workers: make(map[string]*TimedWorker), workFunc: f, } }
初始化taintEvictionQueue
时传入了deletePodHandler
作为队列中元素的处理方法;deletePodHandler
函数的主要逻辑是请求apiserver,删除pod对象,所以说,被放入到taintEvictionQueue
队列中的pod,会被删除;
// pkg/controller/nodelifecycle/scheduler/taint_manager.go func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error { return func(args *WorkArgs) error { ns := args.NamespacedName.Namespace name := args.NamespacedName.Name klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String()) if emitEventFunc != nil { emitEventFunc(args.NamespacedName) } var err error for i := 0; i < retries; i++ { err = c.CoreV1().Pods(ns).Delete(name, &metav1.DeleteOptions{}) if err == nil { break } time.Sleep(10 * time.Millisecond) } return err } }
再来看一下tc.taintEvictionQueue.AddWork
方法,作用是添加pod进入taintEvictionQueue
队列,即调用CreateWorker
给该pod创建一个worker来删除该pod;
// pkg/controller/nodelifecycle/scheduler/timed_workers.go func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt time.Time) { key := args.KeyFromWorkArgs() klog.V(4).Infof("Adding TimedWorkerQueue item %v at %v to be fired at %v", key, createdAt, fireAt) q.Lock() defer q.Unlock() if _, exists := q.workers[key]; exists { klog.Warningf("Trying to add already existing work for %+v. Skipping.", args) return } worker := CreateWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key)) q.workers[key] = worker }
CreateWorker
函数会先判断是否应该立即执行workFunc
,是的话立即拉起一个goroutine来执行workFunc
并返回,否则定义一个timer定时器,到时间后自动拉起一个goroutine执行workFunc
;
// pkg/controller/nodelifecycle/scheduler/timed_workers.go func CreateWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error) *TimedWorker { delay := fireAt.Sub(createdAt) if delay <= 0 { go f(args) return nil } timer := time.AfterFunc(delay, func() { f(args) }) return &TimedWorker{ WorkItem: args, CreatedAt: createdAt, FireAt: fireAt, Timer: timer, } }
tc.taintEvictionQueue.AddWork
方法,作用是停止对应的pod的timer,即停止执行对应pod的workFunc(不删除pod);
// pkg/controller/nodelifecycle/scheduler/timed_workers.go func (w *TimedWorker) Cancel() { if w != nil { w.Timer.Stop() } }
nc.taintManager.Run
为taintManager
的启动方法,处理逻辑都在这,主要是判断node上的pod是否能容忍node的NoExecute
污点,不能容忍的pod,会被删除,能容忍所有污点的pod,则等待所有污点的容忍时间里最小值后,被删除;
主要逻辑:
(1)创建8个类型为nodeUpdateItem
的channel(缓冲区大小10),并赋值给tc.nodeUpdateChannels
;
创建8个类型为podUpdateItem
的channel(缓冲区大小1),并赋值给podUpdateChannels
;
(2)消费tc.nodeUpdateQueue
队列,根据node name计算hash,将node放入对应的tc.nodeUpdateChannels[hash]
中;
(3)消费tc.podUpdateQueue
队列,根据pod的node name计算hash,将node放入对应的tc.podUpdateChannels[hash]
中;
(4)启动8个goroutine,调用tc.worker
对其中一个tc.nodeUpdateChannels
与tc.podUpdateChannels
做处理,判断node上的pod是否能容忍node的NoExecute
污点,不能容忍的pod,会被删除,能容忍所有污点的pod,则等待所有污点的容忍时间里最小值后,被删除;
// pkg/controller/nodelifecycle/scheduler/taint_manager.go func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) { klog.V(0).Infof("Starting NoExecuteTaintManager") for i := 0; i < UpdateWorkerSize; i++ { tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize)) tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize)) } // Functions that are responsible for taking work items out of the workqueues and putting them // into channels. go func(stopCh <-chan struct{}) { for { item, shutdown := tc.nodeUpdateQueue.Get() if shutdown { break } nodeUpdate := item.(nodeUpdateItem) hash := hash(nodeUpdate.nodeName, UpdateWorkerSize) select { case <-stopCh: tc.nodeUpdateQueue.Done(item) return case tc.nodeUpdateChannels[hash] <- nodeUpdate: // tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker } } }(stopCh) go func(stopCh <-chan struct{}) { for { item, shutdown := tc.podUpdateQueue.Get() if shutdown { break } // The fact that pods are processed by the same worker as nodes is used to avoid races // between node worker setting tc.taintedNodes and pod worker reading this to decide // whether to delete pod. // It's possible that even without this assumption this code is still correct. podUpdate := item.(podUpdateItem) hash := hash(podUpdate.nodeName, UpdateWorkerSize) select { case <-stopCh: tc.podUpdateQueue.Done(item) return case tc.podUpdateChannels[hash] <- podUpdate: // tc.podUpdateQueue.Done is called by the podUpdateChannels worker } } }(stopCh) wg := sync.WaitGroup{} wg.Add(UpdateWorkerSize) for i := 0; i < UpdateWorkerSize; i++ { go tc.worker(i, wg.Done, stopCh) } wg.Wait() }
tc.worker
方法负责消费nodeUpdateChannels
和podUpdateChannels
,分别调用tc.handleNodeUpdate
和tc.handlePodUpdate
方法做进一步处理;
// pkg/controller/nodelifecycle/scheduler/taint_manager.go func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) { defer done() // When processing events we want to prioritize Node updates over Pod updates, // as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible - // we don't want user (or system) to wait until PodUpdate queue is drained before it can // start evicting Pods from tainted Nodes. for { select { case <-stopCh: return case nodeUpdate := <-tc.nodeUpdateChannels[worker]: tc.handleNodeUpdate(nodeUpdate) tc.nodeUpdateQueue.Done(nodeUpdate) case podUpdate := <-tc.podUpdateChannels[worker]: // If we found a Pod update we need to empty Node queue first. priority: for { select { case nodeUpdate := <-tc.nodeUpdateChannels[worker]: tc.handleNodeUpdate(nodeUpdate) tc.nodeUpdateQueue.Done(nodeUpdate) default: break priority } } // After Node queue is emptied we process podUpdate. tc.handlePodUpdate(podUpdate) tc.podUpdateQueue.Done(podUpdate) } } }
tc.handleNodeUpdate
方法主要是判断node上的pod是否能容忍node的NoExecute
污点,不能容忍的pod,会被删除,能容忍所有污点的pod,则等待所有污点的容忍时间里最小值后,被删除;
主要逻辑:
(1)从informer本地缓存中获取node对象;
(2)从node.Spec.Taints
中获取NoExecute
的taints
;
(3)将该node的NoExecute
的taints
更新到tc.taintedNodes
中;
(4)调用tc.getPodsAssignedToNode
,获取该node上的所有pod,如果pod数量为0,直接return;
(5)如果node的NoExecute
的taints
数量为0,则遍历该node上所有pod,调用tc.cancelWorkWithEvent
,将该pod从taintEvictionQueue
队列中移除,然后直接return;
(6)遍历该node上所有pod,调用tc.processPodOnNode
,对pod做进一步处理;
// pkg/controller/nodelifecycle/scheduler/taint_manager.go func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) { node, err := tc.getNode(nodeUpdate.nodeName) if err != nil { if apierrors.IsNotFound(err) { // Delete klog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName) tc.taintedNodesLock.Lock() defer tc.taintedNodesLock.Unlock() delete(tc.taintedNodes, nodeUpdate.nodeName) return } utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err)) return } // Create or Update klog.V(4).Infof("Noticed node update: %#v", nodeUpdate) taints := getNoExecuteTaints(node.Spec.Taints) func() { tc.taintedNodesLock.Lock() defer tc.taintedNodesLock.Unlock() klog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints) if len(taints) == 0 { delete(tc.taintedNodes, node.Name) } else { tc.taintedNodes[node.Name] = taints } }() // This is critical that we update tc.taintedNodes before we call getPodsAssignedToNode: // getPodsAssignedToNode can be delayed as long as all future updates to pods will call // tc.PodUpdated which will use tc.taintedNodes to potentially delete delayed pods. pods, err := tc.getPodsAssignedToNode(node.Name) if err != nil { klog.Errorf(err.Error()) return } if len(pods) == 0 { return } // Short circuit, to make this controller a bit faster. if len(taints) == 0 { klog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name) for i := range pods { tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name}) } return } now := time.Now() for _, pod := range pods { podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now) } }
tc.processPodOnNode
方法主要作用是判断pod是否能容忍node上所有的NoExecute
的污点,如果不能,则将该pod加到taintEvictionQueue
队列中,能容忍所有污点的pod,则等待所有污点的容忍时间里最小值后,加到taintEvictionQueue
队列中;
主要逻辑:
(1)如果node的NoExecute
的taints
数量为0,则调用tc.cancelWorkWithEvent
,将该pod从taintEvictionQueue
队列中移除;
(2)调用v1helper.GetMatchingTolerations
,判断pod是否容忍node上所有的NoExecute
的taints,以及获取能容忍taints的容忍列表;
(3)如果不能容忍所有污点,则调用tc.taintEvictionQueue.AddWork
,将该pod加到taintEvictionQueue
队列中;
(4)如果能容忍所有污点,则等待所有污点的容忍时间里最小值后,再调用tc.taintEvictionQueue.AddWork
,将该pod加到taintEvictionQueue
队列中;
// pkg/controller/nodelifecycle/scheduler/taint_manager.go func (tc *NoExecuteTaintManager) processPodOnNode( podNamespacedName types.NamespacedName, nodeName string, tolerations []v1.Toleration, taints []v1.Taint, now time.Time, ) { if len(taints) == 0 { tc.cancelWorkWithEvent(podNamespacedName) } allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations) if !allTolerated { klog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName) // We're canceling scheduled work (if any), as we're going to delete the Pod right away. tc.cancelWorkWithEvent(podNamespacedName) tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now()) return } minTolerationTime := getMinTolerationTime(usedTolerations) // getMinTolerationTime returns negative value to denote infinite toleration. if minTolerationTime < 0 { klog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String()) return } startTime := now triggerTime := startTime.Add(minTolerationTime) scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) if scheduledEviction != nil { startTime = scheduledEviction.CreatedAt if startTime.Add(minTolerationTime).Before(triggerTime) { return } tc.cancelWorkWithEvent(podNamespacedName) } tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime) }
tc.handlePodUpdate
方法最终也是调用了tc.processPodOnNode
对pod做进一步处理;
tc.processPodOnNode
方法在上面已经分析过了,这里不再进行分析;
主要逻辑:
(1)从informer本地缓存中获取pod对象;
(2)获取pod的node name,如果为空,直接return;
(3)根据node name从tc.taintedNodes
中获取node的污点,如果污点为空,直接return;
(4)调用tc.processPodOnNode
对pod做进一步处理;
// pkg/controller/nodelifecycle/scheduler/taint_manager.go func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) { pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace) if err != nil { if apierrors.IsNotFound(err) { // Delete podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName} klog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName) tc.cancelWorkWithEvent(podNamespacedName) return } utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err)) return } // We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object. if pod.Spec.NodeName != podUpdate.nodeName { return } // Create or Update podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} klog.V(4).Infof("Noticed pod update: %#v", podNamespacedName) nodeName := pod.Spec.NodeName if nodeName == "" { return } taints, ok := func() ([]v1.Taint, bool) { tc.taintedNodesLock.Lock() defer tc.taintedNodesLock.Unlock() taints, ok := tc.taintedNodes[nodeName] return taints, ok }() // It's possible that Node was deleted, or Taints were removed before, which triggered // eviction cancelling if it was needed. if !ok { return } tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now()) }
taintManager
的主要功能为:当某个node被打上NoExecute
污点后,其上面的pod如果不能容忍该污点,则taintManager
将会驱逐这些pod,而新建的pod也需要容忍该污点才能调度到该node上;
通过kcm启动参数--enable-taint-manager
来确定是否启动taintManager
,true
时启动(启动参数默认值为true
);
kcm启动参数--feature-gates=TaintBasedEvictions=xxx
,默认值true,配合--enable-taint-manager
共同作用,两者均为true,才会开启污点驱逐;
当node出现NoExecute
污点时,判断node上的pod是否能容忍node的污点,不能容忍的pod,会被立即删除,能容忍所有污点的pod,则等待所有污点的容忍时间里最小值后,pod被删除;