1 概述:
1.1 环境
版本信息如下:
a、操作系统:centos 7.6
c、kubernetes版本:v1.15.0
1.2 NodeLifeController原理概述
节点上的kubelet程序会定期更新kube-apiserver中node lease或者node status数据,kube-controller-manager中的NodeLifeController会监听这些信息变化,如果一个node很长时间都没有在kube-apiserver中进行状态更新,NodeLifeController(作为控制面板的一个组成部分)可认为node(作为数据面板的一部分)发生异常了而需要进行一些容错处理,容错处理可以有:
1)将该node上面的pod进行一定速率地驱逐(即删除),最终kube-controller-manager中的其他控制器会在其他node上重建pod。
2)让调度器不要将新pod调度到异常的节点上。
3)k8s service流量不应该流向异常node上的pod。
微观地说,NodeLifeController做了以下几件事:
1)如果节点"很久"未上传心跳给kube-apiserver,NodeLifeController会把节点的status字段的condition字段的值设置为Unknown,还会把节点上的pod的status字段中的Ready Contition的值设置为false(如此一来k8s service的流量就不会流向这些pod)。
2)为node添加noscheduler类型的taint(如此一来可影响调度器的调度工作)。
3)为node添加NoExecute类型的taint(可看成工作负载转移的前置工作)。
4)如果开启TaintBasedEvictions特性,则驱逐带NoExecute类型的taint的节点上的那些不忍受NoExecute类型的taint的pod(将工作负载进行转移)。
5)如果开启TaintBasedEvictions特性,则直接执行驱逐pod的工作(忽略daemonSet pod)。
6)为zone设置驱逐速率(其实是设置限速队列中的限流器,每一个zone就是一个限速队列)。
1.3 NodeLifeController重要的feature-gates
1)TaintBasedEvictions:
Beta版本在v1.13 ,默认为 true。开启此功能,当 node 处于 NodeNotReady、NodeUnreachable 状态时,NodeLifeController为node添加NoExecute类型的污点。
2)TaintNodesByCondition:
Beta版本在 v1.12,默认为 true。当节点处于MemoryPressure、PIDPressure、DiskPressure、NetworkUnavailable状态时,NodeLifeController为节点添加NoSchedule类型的污点。
3 源码简析:
分析NodeLifeController,重点是看启动了哪些协程,有哪些队列,哪些管道,哪些重要的结构体等。代码是清晰易懂的。
3.1 NodeLifeController启动函数Run(stopCh <-chan struct{})
典型的套路,等待informer缓存同步完成,启动各种协程。可发现,重点的属性是:nc.taintManager、nc.doNodeProcessingPassWorker、nc.doNoExecuteTaintingPass、nc.doEvictionPass、nc.monitorNodeHealth等。
func (nc *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer klog.Infof("Shutting down node controller")
// 等待informer的本地缓存同步完成
if !cache.WaitForNamedCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
return
}
// 1、启动taintManager,用于驱逐带NoExecute类型的taint的节点上的不忍受NoExecute类型的taint的pod
if nc.runTaintManager {
go nc.taintManager.Run(stopCh)
}
defer nc.nodeUpdateQueue.ShutDown()
// 2、执行 nc.doNodeProcessingPassWorker,用于为node添加noscheduler类型的taint,从而影响调度器的调度
for i := 0; i < scheduler.UpdateWorkerSize; i++ {
go wait.Until(nc.doNodeProcessingPassWorker, time.Second, stopCh)
}
// 3、TaintBasedEvictions特性进入不同的业务逻辑
if nc.useTaintBasedEvictions {
// 3.1、为node添加noscheduler类型的taint(后续的真正的驱逐工作是交给taintManager)
go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
} else {
// 3.2、直接驱逐pod,会忽略daemon pod。
go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
}
// 4、如果节点"很久"未上传心跳给kube-apiserver,nodelifecontroller会把节点的status字段的condition字段的值设置为Unknown,还会把节点上的pod的status字段中的ready contition的值设置为false(如此一来k8s service的流量就不会流向这些pod)
// 还为zone设置驱逐速率(其实是设置限速队列中的限流器,每一个zone就是一个限速队列)
go wait.Until(func() {
if err := nc.monitorNodeHealth(); err != nil {
klog.Errorf("Error monitoring node health: %v", err)
}
}, nc.nodeMonitorPeriod, stopCh)
<-stopCh
}
3.2 nc.monitorNodeHealth()
1)监听node对象的状态,如果如果节点"很久"未上传心跳给kube-apiserver,NodeLifeController会把节点的status字段的condition字段的值设置为Unknown,还会把节点上的pod的status字段中的ready contition的值设置为false(如此一来k8s service的流量就不会流向这些pod)
2)TaintBasedEvictions的特性开启了,则NodeLifeController为NotReady或UnReachable的节点打上污点。
3)如果TaintBasedEvictions的特性没开启了,则NodeLifeController直接驱逐NotReady或UnReachable的节点上的pod。
4)NodeLifeController还为zone设置驱逐速率(其实是设置限速队列中的限流器,每一个zone就是一个限速队列)。node属于哪个zone,和node的label有关。其他协程从限速队列中取出node名称,来增加或删除污点,从而限制了污点变化的速率,最终影响驱逐的速率。
func (nc *Controller) monitorNodeHealth() error {
// We are listing nodes from local cache as we can tolerate some small delays
// comparing to state from etcd and there is eventual consistency anyway.
// 从informer的本地缓存中获取所有节点对象nodes
nodes, err := nc.nodeLister.List(labels.Everything())
if err != nil {
return err
}
// 将节点对象nodes进行分类
added, deleted, newZoneRepresentatives := nc.classifyNodes(nodes)
for i := range newZoneRepresentatives {
// 根据node得到zone的值,再填充nc.zoneStates和nc.zonePodEvictor和nc.zoneNoExecuteTainter这三个map
nc.addPodEvictorForNewZone(newZoneRepresentatives[i])
}
for i := range added {
nc.knownNodeSet[added[i].Name] = added[i]
// 根据node得到zone的值,再填充nc.zoneStates和nc.zonePodEvictor和nc.zoneNoExecuteTainter这三个map
nc.addPodEvictorForNewZone(added[i])
if nc.useTaintBasedEvictions {
// 删除node上的NotReady和UnReachable的污点
nc.markNodeAsReachable(added[i])
} else {
// 停止驱逐节点上的pod
nc.cancelPodEviction(added[i])
}
}
for i := range deleted {
delete(nc.knownNodeSet, deleted[i].Name)
}
zoneToNodeConditions := map[string][]*v1.NodeCondition{}
// 遍历每一个node对象
for i := range nodes {
var gracePeriod time.Duration
// observedReadyCondition为node对象的状态
var observedReadyCondition v1.NodeCondition
// currentReadyCondition可能和observedReadyCondition相同,但也有可能在nc.tryUpdateNodeHealth(...)中被修正一些数据
var currentReadyCondition *v1.NodeCondition
// 做一个拷贝,因为下面的nc.tryUpdateNodeHealth(node)有可能修改入参的内容
node := nodes[i].DeepCopy()
if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeHealthUpdateRetry, func() (bool, error) {
gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeHealth(node)
if err == nil {
return true, nil
}
name := node.Name
node, err = nc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed while getting a Node to retry updating node health. Probably Node %s was deleted.", name)
return false, err
}
return false, nil
}); err != nil {
klog.Errorf("Update health of Node '%v' from Controller error: %v. "+
"Skipping - no pods will be evicted.", node.Name, err)
continue
}
// We do not treat a master node as a part of the cluster for network disruption checking.
if !system.IsMasterNode(node.Name) {
zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
}
decisionTimestamp := nc.now()
// currentReadyCondition不为nil才进入下面的业务逻辑
if currentReadyCondition != nil {
// 1)节点状态observedReadyCondition分别是False的情景
if observedReadyCondition.Status == v1.ConditionFalse {
if nc.useTaintBasedEvictions {
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
taintToAdd := *NotReadyTaintTemplate
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) {
klog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) { //把节点塞进nc.zoneNoExecuteTainter中即可,后续有其他协程来执行驱逐pod的操作
// 打印日志
}
} else {
if decisionTimestamp.After(nc.nodeHealthMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node) { // 直接驱逐pod
// 打印日志
}
}
}
}
// 2)节点状态observedReadyCondition分别是Unknown的情景
if observedReadyCondition.Status == v1.ConditionUnknown {
if nc.useTaintBasedEvictions {
// 如果节点已具备NotReady的污点,则访问kube-apiserver修改节点的污点
if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
taintToAdd := *UnreachableTaintTemplate
// 删除NotReady的污点,新增Unreachable的污点
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
// 打印日志
}
} else if nc.markNodeForTainting(node) { // 把节点塞进nc.zoneNoExecuteTainter中即可,后续有其他协程来执行驱逐pod的操作
// 打印日志
}
} else {
if decisionTimestamp.After(nc.nodeHealthMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node) { // 直接驱逐节点上的pod
// 打印日志
}
}
}
}
// 3)节点状态observedReadyCondition是true的情景
// 节点处于Ready状态,则去掉污点或停止驱逐pod
if observedReadyCondition.Status == v1.ConditionTrue {
if nc.useTaintBasedEvictions {
// 开启了TaintBasedEviction,则调用kube-apiserver接口为node删除污点
removed, err := nc.markNodeAsReachable(node)
} else {
// 停止驱逐pod
if nc.cancelPodEviction(node) {
// 打印日志
}
}
}
// 4)node从Ready转为NotReady状态,调用nodeutil.MarkAllPodsNotReady()把节点上的pod的status字段中的ready contition的值设置为false(如此一来k8s service的流量就不会流向这些pod)
// currentReadyCondition和observedReadyCondition在数据上不一致,说明currentReadyCondition是被修正过了
if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
// 记录节点的状态发生了变更
nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
// 把节点上的pod的status字段中的ready contition的值设置为false
if err = nodeutil.MarkAllPodsNotReady(nc.kubeClient, node); err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
}
}
}
}
// 为zone设置驱逐速率(其实是设置限速队列中的限流器,每一个zone就是一个限速队列)
nc.handleDisruption(zoneToNodeConditions, nodes)
return nil
}
3.3 nc.tryUpdateNodeHealth()
它只被monitorNodeHealth( )方法调用,nc.monitorNodeHealth( )根据nc.tryUpdateNodeHealth( )的返回值来进入不同的业务代码。
大致业务逻辑是:
1)入参node是指针,在方法中有机会修改它的status字段的内容。
2)更新nc.nodeHealthMap:
savedNodeHealth对象可来自nc.nodeHealthMap,也可针对各种情景构建出来
nc.nodeHealthMap[node.Name] = savedNodeHealth
4)本地缓存的probeTimestamp字段"很久"未更新(其实是kube-apiserver中节点的心跳信息很久未改变),则
{
将返回参数currentReadyCondition设置为unknown值
修改入参node的status字段的内容为unknown值
更新nc.nodeHealthMap:nc.nodeHealthMap[node.Name] = &nodeHealthData{status: &node.Status...}
如果把入参node对象的status内容和kube-apiserver中的信息不一致,把入参node对象的status内容更新至kube-apiserver中:nc.kubeClient.CoreV1().Nodes().UpdateStatus(node)
}
// 作用1):更新本地nc.nodeHealthMap
// 作用2):有机会将在kube-apiserver中将node的status字段的condition的值设置为Unknown
// 作用3):返回的是入参节点对象node在kube-apiserver中的condition、本地缓存中的condition以及err
func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) {
var err error
var gracePeriod time.Duration
var observedReadyCondition v1.NodeCondition
// currentReadyCondition为入参node对象的status对象的readyCondition的指针
_, currentReadyCondition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
if currentReadyCondition == nil {
// 进入这里,说明kubelet (or nodecontroller) 从未上传node status信息到kube-apiserver中。创建一个假的ready condition。
observedReadyCondition = v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: node.CreationTimestamp,
LastTransitionTime: node.CreationTimestamp,
}
gracePeriod = nc.nodeStartupGracePeriod
if _, found := nc.nodeHealthMap[node.Name]; found {
nc.nodeHealthMap[node.Name].status = &node.Status
} else {
nc.nodeHealthMap[node.Name] = &nodeHealthData{
status: &node.Status,
probeTimestamp: node.CreationTimestamp,
readyTransitionTimestamp: node.CreationTimestamp,
}
}
} else {
// observedReadyCondition是作为不变的副本而存在,用于后续的判断和最终的对比。
observedReadyCondition = *currentReadyCondition
gracePeriod = nc.nodeMonitorGracePeriod
}
savedNodeHealth, found := nc.nodeHealthMap[node.Name]
var savedCondition *v1.NodeCondition
var savedLease *coordv1beta1.Lease
if found {
_, savedCondition = nodeutil.GetNodeCondition(savedNodeHealth.status, v1.NodeReady)
savedLease = savedNodeHealth.lease
}
/*
针对各种情景(判断savedCondition和observedCondition),让savedNodeHealth指向新的对象
*/
var observedLease *coordv1beta1.Lease
// 开启了nodeLease特性的话
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
// 获取node lease信息,以及更新savedNodeHealth对象的probeTimestamp时间
observedLease, _ = nc.leaseLister.Leases(v1.NamespaceNodeLease).Get(node.Name)
if observedLease != nil && (savedLease == nil || savedLease.Spec.RenewTime.Before(observedLease.Spec.RenewTime)) {
savedNodeHealth.lease = observedLease
savedNodeHealth.probeTimestamp = nc.now()
}
}
// 把savedNodeHealth塞回nc.nodeHealthMap这个map中,因为savedNodeHealth可能指向了新的对象
nc.nodeHealthMap[node.Name] = savedNodeHealth
// 当本地缓存的probeTimestamp字段"很久"未更新(其实是kube-apiserver中节点的心跳信息很久未改变)
/*
将返回参数currentReadyCondition(这其实也是入参node对象的status字段的ReadyCondition)设置为unknown值
修改入参node的status字段(NodeMemoryPressure、NodeDiskPressure、NodePIDPressure)的值为unknown值
更新nc.nodeHealthMap:nc.nodeHealthMap[node.Name] = &nodeHealthData{status: &node.Status...}
如果把入参node对象的status内容和kube-apiserver中的信息不一致,把入参node对象的status内容更新至kube-apiserver中:nc.kubeClient.CoreV1().Nodes().UpdateStatus(node)
*/
if nc.now().After(savedNodeHealth.probeTimestamp.Add(gracePeriod)) {
if currentReadyCondition == nil {
klog.V(2).Infof("node %v is never updated by kubelet", node.Name)
node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
Reason: "NodeStatusNeverUpdated",
Message: fmt.Sprintf("Kubelet never posted node status."),
LastHeartbeatTime: node.CreationTimestamp,
LastTransitionTime: nc.now(),
})
} else {
// 1)将返回参数currentReadyCondition设置为unknown值,这其实也是入参node对象的status字段的ReadyCondition
if observedReadyCondition.Status != v1.ConditionUnknown {
currentReadyCondition.Status = v1.ConditionUnknown
currentReadyCondition.Reason = "NodeStatusUnknown"
currentReadyCondition.Message = "Kubelet stopped posting node status."
currentReadyCondition.LastHeartbeatTime = observedReadyCondition.LastHeartbeatTime
currentReadyCondition.LastTransitionTime = nc.now()
}
}
// 2)将入参node的status字段(NodeMemoryPressure、v1.NodeDiskPressure、v1.NodePIDPressure)的值为unknown
remainingNodeConditionTypes := []v1.NodeConditionType{
v1.NodeMemoryPressure,
v1.NodeDiskPressure,
v1.NodePIDPressure,
}
nowTimestamp := nc.now()
for _, nodeConditionType := range remainingNodeConditionTypes {
_, currentCondition := nodeutil.GetNodeCondition(&node.Status, nodeConditionType)
if currentCondition == nil {
node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
Type: nodeConditionType,
Status: v1.ConditionUnknown,
Reason: "NodeStatusNeverUpdated",
Message: "Kubelet never posted node status.",
LastHeartbeatTime: node.CreationTimestamp,
LastTransitionTime: nowTimestamp,
})
} else {
if currentCondition.Status != v1.ConditionUnknown {
currentCondition.Status = v1.ConditionUnknown
currentCondition.Reason = "NodeStatusUnknown"
currentCondition.Message = "Kubelet stopped posting node status."
currentCondition.LastTransitionTime = nowTimestamp
}
}
}
/*
如果把入参node对象的status内容和kube-apiserver中的信息不一致,把入参node对象的status内容更新至kube-apiserver中:nc.kubeClient.CoreV1().Nodes().UpdateStatus(node)
更新nc.nodeHealthMap:nc.nodeHealthMap[node.Name] = &nodeHealthData{status: &node.Status...}
*/
_, currentCondition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
// node的status信息可能被修改,再次和函数开头保存的observedReadyCondition进行比较
if !apiequality.Semantic.DeepEqual(currentCondition, &observedReadyCondition) {
// 3)把入参node对象的status内容更新至kube-apiserver中
if _, err = nc.kubeClient.CoreV1().Nodes().UpdateStatus(node); err != nil {
klog.Errorf("Error updating node %s: %v", node.Name, err)
return gracePeriod, observedReadyCondition, currentReadyCondition, err
}
// 4)更新nc.nodeHealthMap
nc.nodeHealthMap[node.Name] = &nodeHealthData{
status: &node.Status,
probeTimestamp: nc.nodeHealthMap[node.Name].probeTimestamp,
readyTransitionTimestamp: nc.now(),
lease: observedLease,
}
return gracePeriod, observedReadyCondition, currentReadyCondition, nil
}
}
return gracePeriod, observedReadyCondition, currentReadyCondition, err
}
3.4 doNoExecuteTaintingPass()
从nc.zoneNoExecuteTainter这个限速队列中取出node,以node的NodeReadyCondition为依据添加或移除NoExecute类型的taint。
func (nc *Controller) doNoExecuteTaintingPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
// 遍历map,map的key是zone的名称,value是一个限速队列
for k := range nc.zoneNoExecuteTainter {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
// 从限速队列中拿出元素并执行目标方法,执行目标方法失败会进行重试
nc.zoneNoExecuteTainter[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
// value.Value就是节点名称,根据节点名称从本地缓存中获得node对象
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
// 说明节点被删除,返回true表示不重试
return true, 0
} else if err != nil {
klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
// 50毫秒后进行重试
return false, 50 * time.Millisecond
}
// 取出刚刚获取的node对象的status字段中的NodeReady Condition
_, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
// Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
taintToAdd := v1.Taint{}
oppositeTaint := v1.Taint{}
// 根据Condition的值,为taintToAdd和oppositeTaint进行赋值。"UnReachable"和"NotReady"两个污点是互斥的,只能存在一个。
switch condition.Status {
case v1.ConditionFalse:
taintToAdd = *NotReadyTaintTemplate
oppositeTaint = *UnreachableTaintTemplate
case v1.ConditionUnknown:
taintToAdd = *UnreachableTaintTemplate
oppositeTaint = *NotReadyTaintTemplate
default:
// 此时节点是ready状态,因此忽略本次打taint的操作。
return true, 0
}
// 使用nc.kubeClient访问kube-apiserver,为入参node增加指定的taint和删除指定的taint
result := nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node)
if result {
// 更新指标:数量+1。
zone := utilnode.GetZoneKey(node)
evictionsNumber.WithLabelValues(zone).Inc()
}
return result, 0
})
}
}
3.5 nc.doEvictionPass
nc.zonePodEvictor是一个map,key是zone的名称,value是一个队列(队列中保存node的名称)。NodeLifeController直接访问kube-apiserver删除位于队列中的节点的pod(忽略daemonset pod)。
func (nc *Controller) doEvictionPass() {
// 加锁,因为map是非并发安全的
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
// 遍历每一个zone,k就是zone的名称
for k := range nc.zonePodEvictor {
// 从队列中驱逐元素(其实是节点名称),访问kube-apiserver进行删除pod(会忽略daemonset pod)
nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
// 从informer本地缓存中根据节点名称得到node对象
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
// 记录日志
} else if err != nil {
// 记录日志
}
nodeUID, _ := value.UID.(string)
// 删除节点上的pod
remaining, err := nodeutil.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if remaining {
klog.Infof("Pods awaiting deletion due to Controller eviction")
}
// 记录指标
if node != nil {
zone := utilnode.GetZoneKey(node)
evictionsNumber.WithLabelValues(zone).Inc()
}
return true, 0
})
}
}
3.6 nc.doNodeProcessingPassWorker
(1)如果启用taintNodeByCondition特性,则根据node condition为node添加NoScheduler类型的污点。
(2)调用nc.reconcileNodeLabels 为node维持节点的两个特殊的label。
func (nc *Controller) doNodeProcessingPassWorker() {
for {
// 从nc.nodeUpdateQueue取出元素(本质是一个字符串)
obj, shutdown := nc.nodeUpdateQueue.Get()
if shutdown {
return
}
nodeName := obj.(string)
// 访问kube-apiserver来添加污点和删除污点。
if err := nc.doNoScheduleTaintingPass(nodeName); err != nil {
klog.Errorf("Failed to taint NoSchedule on node <%s>, requeue it: %v", nodeName, err)
// TODO(k82cn): Add nodeName back to the queue
}
// 为node维持节点的两个特殊的label。
if err := nc.reconcileNodeLabels(nodeName); err != nil {
klog.Errorf("Failed to reconcile labels for node <%s>, requeue it: %v", nodeName, err)
// TODO(yujuhong): Add nodeName back to the queue
}
nc.nodeUpdateQueue.Done(nodeName)
}
}
3.6.1 doNoScheduleTaintingPass
获得期望的污点和当前的污点,计算得出应该删除的污点和应该添加的污点,最终访问kube-apiserver来添加污点和删除污点。
func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
node, err := nc.nodeLister.Get(nodeName)
if err != nil {
// If node not found, just ignore it.
if apierrors.IsNotFound(err) {
return nil
}
return err
}
// 根据node的condition创建相应的taint对象,例如DiskPressure Condition为True值,则创建v1.Taint{Key: "node.kubernetes.io/disk-pressure", Effect: "NoSchedule"}
// taints就是期望具备的污点
var taints []v1.Taint
for _, condition := range node.Status.Conditions {
if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
if taintKey, found := taintMap[condition.Status]; found {
taints = append(taints, v1.Taint{
Key: taintKey,
Effect: v1.TaintEffectNoSchedule,
})
}
}
}
if node.Spec.Unschedulable { // 如果node是不可调度的,则增加一个对象v1.Taint{Key: "node.kubernetes.io/unschedulable", Effect: "NoSchedule"}
// If unschedulable, append related taint.
taints = append(taints, v1.Taint{
Key: v1.TaintNodeUnschedulable,
Effect: v1.TaintEffectNoSchedule,
})
}
// 获取node对象已具备的NoSchedule类型的污点
nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
if t.Effect != v1.TaintEffectNoSchedule {
return false
}
// Find unschedulable taint of node.
if t.Key == v1.TaintNodeUnschedulable {
return true
}
// Find node condition taints of node.
_, found := taintKeyToNodeConditionMap[t.Key]
return found
})
// 对比期望具备的污点和已具备的污点,得到此时应该新增的污点和应该删除的污点
taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
// 期望污点和实际拥有的污点完全一致,直接返回
if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
return nil
}
// 使用nc.kubeClient访问kube-apiserver,为入参node增加指定的taint和删除指定的taint
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
return fmt.Errorf("failed to swap taints of node %+v", node)
}
return nil
}
3.6.2 reconcileNodeLabels
维持节点的两个特殊的label:维持kubernetes.io/arch和kubernetes.io/os。
/*
维持kubernetes.io/arch和kubernetes.io/os这两个label
labels:
beta.kubernetes.io/arch: amd64
beta.kubernetes.io/os: linux
kubernetes.io/arch: amd64
kubernetes.io/os: linux
*/
func (nc *Controller) reconcileNodeLabels(nodeName string) error {
node, err := nc.nodeLister.Get(nodeName)
if err != nil {
// 根据节点名称在本地缓存中找不到node对象,说明节点是被删除了,直接返回
if apierrors.IsNotFound(err) {
return nil
}
return err
}
if node.Labels == nil {
// node对象无任何label,直接返回
return nil
}
// labelsToUpdate是期望新增的label
labelsToUpdate := map[string]string{}
for _, r := range labelReconcileInfo {
primaryValue, primaryExists := node.Labels[r.primaryKey]
secondaryValue, secondaryExists := node.Labels[r.secondaryKey]
if !primaryExists {
// 主label(beta.kubernetes.io/arch和beta.kubernetes.io/os)不存在,则退出本次循环
continue
}
if secondaryExists && primaryValue != secondaryValue {
// 如果次label(kubernetes.io/arch和kubernetes.io/os)存在,但是值和主label不一致,则以主label的值为准
labelsToUpdate[r.secondaryKey] = primaryValue
} else if !secondaryExists && r.ensureSecondaryExists {
// 如果次label(kubernetes.io/arch和kubernetes.io/os)不存在,并且要求它要存在,也是主label的值为准
labelsToUpdate[r.secondaryKey] = primaryValue
}
}
// 不需要新增label,则直接返回
if len(labelsToUpdate) == 0 {
return nil
}
// 使用nc.kubeClient访问kube-apiserver,为入参node增加指定的label
if !nodeutil.AddOrUpdateLabelsOnNode(nc.kubeClient, labelsToUpdate, node) {
return fmt.Errorf("failed update labels for node %+v", node)
}
return nil
}
3.7 NoExecuteTaintManager
将队列中的数据转移到管道中,启动多个worker协程来消费管道。
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
klog.V(0).Infof("Starting NoExecuteTaintManager")
// 给tc.nodeUpdateChannels和tc.podUpdateChannels这两个切片添加元素,元素就是管道
for i := 0; i < UpdateWorkerSize; i++ {
tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
}
// 启动独立的协程,从tc.nodeUpdateQueue队列中拿出元素,将元素转换后放入tc.nodeUpdateChannels切片中的一个管道中
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.nodeUpdateQueue.Get()
if shutdown {
break
}
nodeUpdate := item.(nodeUpdateItem)
// 对节点名称做哈希并取模
hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.nodeUpdateQueue.Done(item)
return
case tc.nodeUpdateChannels[hash] <- nodeUpdate:
// tc.nodeUpdateQueue.Done()会在worker协程中调用
}
}
}(stopCh)
// 启动独立的协程,从tc.podUpdateQueue队列中拿出元素,将元素转换后放入tc.podUpdateChannels切片中的一个管道中
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.podUpdateQueue.Get()
if shutdown {
break
}
podUpdate := item.(podUpdateItem)
// 对节点名称做哈希并取模
hash := hash(podUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.podUpdateQueue.Done(item)
return
case tc.podUpdateChannels[hash] <- podUpdate:
// tc.podUpdateQueue.Done()会在worker协程中调用
}
}
}(stopCh)
wg := sync.WaitGroup{}
wg.Add(UpdateWorkerSize)
// UpdateWorkerSize等于常量8,即启动8个worker协程来消费tc.nodeUpdateChannels和tc.podUpdateChannels
for i := 0; i < UpdateWorkerSize; i++ {
go tc.worker(i, wg.Done, stopCh)
}
wg.Wait()
}
3.7.1 worker
从管道中获取数据,调用相应的目标方法。处理tc.nodeUpdateChannels的优先级比处理tc.podUpdateChannels更高。因为节点发生故障(相比实例级别,是更广泛的故障),应该尽快转移其上的工作负载。
func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
defer done()
/*
处理tc.nodeUpdateChannels和tc.nodeUpdateChannels中的元素
*/
for {
select {
case <-stopCh:
return
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
case podUpdate := <-tc.podUpdateChannels[worker]:
// 先使用for循环处理tc.nodeUpdateChannels中的元素
priority:
for {
select {
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
default:
// tc.nodeUpdateChannels中无元素,则跳出for循环
break priority
}
}
// 来到此处,说明tc.nodeUpdateChannels中无元素,用tc.handlePodUpdate()处理podUpdate
tc.handlePodUpdate(podUpdate)
tc.podUpdateQueue.Done(podUpdate)
}
}
}
3.7.2 handleNodeUpdate
获取节点的NoExecute类型的污点,其上所有的pod,调用tc.processPodOnNode(…)来处理pod。
func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
// 根据节点名称获取node对象
node, err := tc.getNode(nodeUpdate.nodeName)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
klog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName)
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
delete(tc.taintedNodes, nodeUpdate.nodeName)
return
}
utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
return
}
// 从node对象中获取NoExecute类型的污点
taints := getNoExecuteTaints(node.Spec.Taints)
func() {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
if len(taints) == 0 {
delete(tc.taintedNodes, node.Name)
} else {
tc.taintedNodes[node.Name] = taints
}
}()
// 获取节点上所有的pod
pods, err := getPodsAssignedToNode(tc.client, node.Name)
if err != nil {
klog.Errorf(err.Error())
return
}
if len(pods) == 0 {
return
}
if len(taints) == 0 {
// 此时说明节点是没有NoExecute类型的污点,遍历每个pod关闭驱逐操作(在队列中的驱逐操作,是在未来某个时刻执行的),直接返回
for i := range pods {
tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
}
return
}
now := time.Now()
// 遍历pod调用tc.processPodOnNode(...),它将删除pod的操作封装在定时器中。
for i := range pods {
pod := &pods[i]
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
}
}
3.7.3 processPodOnNode
将删除pod的操作封装在定时器中。
func (tc *NoExecuteTaintManager) processPodOnNode(
podNamespacedName types.NamespacedName,
nodeName string,
tolerations []v1.Toleration,
taints []v1.Taint,
now time.Time,
) {
if len(taints) == 0 {
tc.cancelWorkWithEvent(podNamespacedName)
}
// 1、检查 pod 的 tolerations 是否匹配所有 taints
allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
if !allTolerated {
// 将延时队列中的任务关闭,因为要马上驱逐pod
tc.cancelWorkWithEvent(podNamespacedName)
// 马上驱逐pod
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
return
}
// 2、获取最小容忍时间
minTolerationTime := getMinTolerationTime(usedTolerations)
// getMinTolerationTime returns negative value to denote infinite toleration.
if minTolerationTime < 0 {
// 入参pod是容忍了node的污点,直接退出。但已经调度的驱逐操作是不会删除的。
klog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String())
return
}
// startTime是入队时间
startTime := now
// triggerTime是驱逐操作的触发时间
triggerTime := startTime.Add(minTolerationTime)
// 从队列中取出任务scheduledEviction
scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
if scheduledEviction != nil {
startTime = scheduledEviction.CreatedAt
if startTime.Add(minTolerationTime).Before(triggerTime) {
// 进入此处,说明旧任务的入队时间是小于入参now,直接退出,不需要执行下面的tc.taintEvictionQueue.AddWork(...)
return
}
// 旧任务的入队时间比入参now还要大,则删除旧任务,个人认为这是为了容错
tc.cancelWorkWithEvent(podNamespacedName)
}
// 来到此处,队列中已无此pod的任务
// 新建驱逐任务,并塞进延时队列中
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}
3.7.3.1 AddWork
func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt time.Time) {
key := args.KeyFromWorkArgs()
q.Lock()
defer q.Unlock()
// map中已存在,则直接返回
if _, exists := q.workers[key]; exists {
return
}
// 新建一个TimedWorker对象,并将它放入map中
// q.getWrappedWorkerFunc(key)返回一个调用kube-apiserver DELETE POD接口的函数
worker := CreateWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key))
q.workers[key] = worker
}
3.7.3.2 TimedWorkerQueue
// TimedWorkerQueue keeps a set of TimedWorkers that are still wait for execution.
type TimedWorkerQueue struct {
sync.Mutex
workers map[string]*TimedWorker
workFunc func(args *WorkArgs) error // 在nodelifeController中就是删除pod
}
// TimedWorkerQueue的属性workFunc的值就是这个构造方法返回的函数
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
return func(args *WorkArgs) error {
ns := args.NamespacedName.Namespace
name := args.NamespacedName.Name
klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
if emitEventFunc != nil {
emitEventFunc(args.NamespacedName)
}
var err error
for i := 0; i < retries; i++ {
// 访问kube-apiserver删除pod
err = c.CoreV1().Pods(ns).Delete(name, &metav1.DeleteOptions{})
if err == nil {
break
}
time.Sleep(10 * time.Millisecond)
}
return err
}
}
3.7.3.3 TimedWorker
// 最重要的属性是Timer,Timer就是未来某个时刻执行一个指定方法的定时器
type TimedWorker struct {
WorkItem *WorkArgs
CreatedAt time.Time
FireAt time.Time
Timer *time.Timer // 值是由time.AfterFunc()来得到
}
3.8 handlePodUpdate
/*
获取pod对象和节点的污点,调用tc.processPodOnNode(...)来处理pod。
*/
func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)
if err != nil {
if apierrors.IsNotFound(err) {
podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
// 由于pod不存在了,因此删除已调度的驱逐此pod的任务
tc.cancelWorkWithEvent(podNamespacedName)
return
}
utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
return
}
// 容错
if pod.Spec.NodeName != podUpdate.nodeName {
return
}
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
nodeName := pod.Spec.NodeName
if nodeName == "" { // 容错
return
}
// 根据节点名称,从tc.taintedNodes缓存中获取该节点的污点。handleNodeUpdate中会填充tc.taintedNodes这个map。
taints, ok := func() ([]v1.Taint, bool) {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
taints, ok := tc.taintedNodes[nodeName]
return taints, ok
}()
// ok=false,那说明节点被删除或者节点的污点被移除
if !ok {
return
}
调用tc.processPodOnNode(...)来处理pod
tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}
4 总结
当节点异常时,NodeLifeController做了一系列工作:为节点打污点来影响调度器的工作,为节点打污点来驱逐pod(柔性驱逐),或者直接驱逐pod(烈性驱逐),将pod从service中摘除流量。打污点来驱逐pod的做法,是通过限速队列来限制node元素出队速度,从而影响给node打污点的速度,最终影响驱逐node上的pod的速度。pod容忍污点的时间tolerationSeconds,底层做法是将删除pod的操作封装在golang原生定时器中。熟悉NodeLifeController的源码,也能熟悉编写controller的套路,套路非常统一。