本节开始主要分析kubernetes源码部分,版本基于当前最新的1.13.4。
启动分析
Kubernetes基础组件的入口均在cmd
目录下,kube-schduler入口在scheduler.go
下,如图
command
的形式,引用的是
spf13类库
通过将配置文件转化成
command
的形式,调用
Execute
方法执行定义的
Run
方法
进入
runCommand
方法,通过完成配置的初始化,调用
Run
方法,进一步启动。
Run方法分析
Run方法主要做了以下工作:
1、判断是否需要添加VolumeScheduling
新特性;
2、初始化调度参数的相关结构体;
3、配置准备事件广播;
4、健康检查相关配置;
5、Metrics
相关配置;
6、启动所有的Informer
(kubernetes主要就是通过Informer
和Workqueue
机制监听事件的变化);
7、判断是否需要LeaderElection
,决定最终的启动。
调度接口
最终的调度接口进入的是pkg
下的scheduler.go
文件,通过启动单独的协程处理调度工作。
scheduleOne方法分析
scheduleOne,顾名思义,每次调度一个Pod,整体文件如下
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
// 1.从队列中取出待调度的Pod
pod := sched.config.NextPod()
// pod could be nil when schedulerQueue is closed
if pod == nil {
return
}
if pod.DeletionTimestamp != nil {
sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
return
}
klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
// Synchronously attempt to find a fit for the pod.
start := time.Now()
// 2.获取待调度Pod匹配的主机名
suggestedHost, err := sched.schedule(pod)
if err != nil {
// schedule() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
if fitError, ok := err.(*core.FitError); ok {
preemptionStartTime := time.Now()
sched.preempt(pod, fitError)
metrics.PreemptionAttempts.Inc()
metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
// Pod did not fit anywhere, so it is counted as a failure. If preemption
// succeeds, the pod should get counted as a success the next time we try to
// schedule it. (hopefully)
metrics.PodScheduleFailures.Inc()
} else {
klog.Errorf("error selecting node for pod: %v", err)
metrics.PodScheduleErrors.Inc()
}
return
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
// 3.Pod与Node缓存,保证调度一直进行,不用等待每次绑定完成(绑定是一个耗时的过程)
assumedPod := pod.DeepCopy()
// Assume volumes first before assuming the pod.
//
// If all volumes are completely bound, then allBound is true and binding will be skipped.
//
// Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
//
// This function modifies 'assumedPod' if volume binding is required.
// 4.判断是否需要VolumeScheduling特性
allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
if err != nil {
klog.Errorf("error assuming volumes: %v", err)
metrics.PodScheduleErrors.Inc()
return
}
// assume modifies `assumedPod` by setting NodeName=suggestedHost
// 5.Pod对应的NodeName写上主机名,存入缓存
err = sched.assume(assumedPod, suggestedHost)
if err != nil {
klog.Errorf("error assuming pod: %v", err)
metrics.PodScheduleErrors.Inc()
return
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
// 6.请求apiserver,异步处理最终的绑定,写入到etcd
go func() {
// Bind volumes first before Pod
if !allBound {
err := sched.bindVolumes(assumedPod)
if err != nil {
klog.Errorf("error binding volumes: %v", err)
metrics.PodScheduleErrors.Inc()
return
}
}
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{
Kind: "Node",
Name: suggestedHost,
},
})
metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
if err != nil {
klog.Errorf("error binding pod: %v", err)
metrics.PodScheduleErrors.Inc()
} else {
metrics.PodScheduleSuccesses.Inc()
}
}()
}
复制代码
主要做了以下工作:
1、从队列中取出待调度的Pod
2、根据调度算法(预选+优选)获取待调度Pod匹配的主机,如果未获取到合适的主机,判断是否需要preempt
,即Pod的抢占策略,为Pod分配节点
3、将当前Pod缓存起来,假定已经绑定成功(主要是为了将scheduling与binding过程分开)
4、判断是否需要VolumeScheduling特性继续添加Pod信息
5、Pod对应的NodeName写上主机名(调度的本质就是将为空的NodeName写上相应的Node的值)
6、启动新的binding协程,请求apiserver,异步处理最终的绑定,将结果写入到etcd中
调度算法
最终的调度在generic_scheduler.go
的Schedule
方法。调度主要分两步,预选和优选。
预选
预选算法调用的接口是findNodesThatFit
,主要代码如下:
// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
var filtered []*v1.Node
failedPredicateMap := FailedPredicateMap{}
// 该if表示,如果没有配置预选的算法,则直接将所有的Node写入匹配数组
if len(g.predicates) == 0 {
filtered = nodes
} else {
allNodes := int32(g.cache.NodeTree().NumNodes)
// numFeasibleNodesToFind保证一次性不用返回过多的Node数量,避免数组过大
numNodesToFind := g.numFeasibleNodesToFind(allNodes)
// Create filtered list with enough space to avoid growing it
// and allow assigning.
filtered = make([]*v1.Node, numNodesToFind)
errs := errors.MessageCountMap{}
var (
predicateResultLock sync.Mutex
filteredLen int32
equivClass *equivalence.Class
)
ctx, cancel := context.WithCancel(context.Background())
// We can use the same metadata producer for all nodes.
meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
if g.equivalenceCache != nil {
// getEquivalenceClassInfo will return immediately if no equivalence pod found
equivClass = equivalence.NewClass(pod)
}
// checkNode处理预选策略
checkNode := func(i int) {
var nodeCache *equivalence.NodeCache
// 每次获取Node信息
nodeName := g.cache.NodeTree().Next()
if g.equivalenceCache != nil {
nodeCache = g.equivalenceCache.LoadNodeCache(nodeName)
}
fits, failedPredicates, err := podFitsOnNode(
pod,
meta,
g.cachedNodeInfoMap[nodeName],
g.predicates,
nodeCache,
g.schedulingQueue,
g.alwaysCheckAllPredicates,
equivClass,
)
if err != nil {
predicateResultLock.Lock()
errs[err.Error()]++
predicateResultLock.Unlock()
return
}
if fits {
// 保证获取的Node数量在numNodesToFind内
length := atomic.AddInt32(&filteredLen, 1)
if length > numNodesToFind {
// 通知ParallelizeUntil任务结束
cancel()
atomic.AddInt32(&filteredLen, -1)
} else {
filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
}
} else {
predicateResultLock.Lock()
failedPredicateMap[nodeName] = failedPredicates
predicateResultLock.Unlock()
}
}
// Stops searching for more nodes once the configured number of feasible nodes
// are found.
// 并行处理多个Node的checkNode工作
workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
filtered = filtered[:filteredLen]
if len(errs) > 0 {
return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
}
}
if len(filtered) > 0 && len(g.extenders) != 0 {
for _, extender := range g.extenders {
if !extender.IsInterested(pod) {
continue
}
filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
if err != nil {
if extender.IsIgnorable() {
klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
extender, err)
continue
} else {
return []*v1.Node{}, FailedPredicateMap{}, err
}
}
for failedNodeName, failedMsg := range failedMap {
if _, found := failedPredicateMap[failedNodeName]; !found {
failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
}
failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
}
filtered = filteredList
if len(filtered) == 0 {
break
}
}
}
return filtered, failedPredicateMap, nil
}
复制代码
findNodesThatFit
主要做了几个操作
1、判断是否配置了预选算法,如果没有,直接返回Node列表信息;
2、如果配置了预选算法,则同时对多个Node(最多一次16个)调用checkNode
方法,判断Pod是否可以调度在该Node上;
3、预选筛选之后,如果配置了调度的扩展算法,需要继续对筛选后的Pod与Node进行再一次的筛选,获取最终匹配的Node列表。
这里有一个注意的地方,获取匹配的Node节点数量时,通过numFeasibleNodesToFind
函数限制了每次获取的节点数,最大值为100
。这样当匹配到相应的Node数时,checkNode
方法不再调用。
这里个人觉着有些问题,当Node数量足够多的时候(大于100),由于numFeasibleNodesToFind
限制了Node数量,导致并不能扫描到所有的Node,这样可能导致最合适的Node没有被扫描到,匹配到的只是较优先的Node,则最终调度到的Node也不是最合适的Node,只是相较于比较合适。
最终实现调度判断的接口是podFitsOnNode
。
podFitsOnNode
最难理解的就是for循环了两次,根据注释,大致意思如下:
1、第一次循环,将所有的优先级比较高或者相等的nominatedPods
加入到Node中,更新meta
和nodeInfo
。nominatedPods
是指已经分配到Node内但是还没有真正运行起来的Pods。这样做可以保证优先级高的Pods不会因为现在的Pod的加入而导致调度失败;
2、第二次调度,不将nominatedPods
加入到Node内。这样的原因是因为考虑到像Pod affinity策略的话,如果当前的Pod依赖的是nominatedPods
,这样就会有问题。因为,nominatedPods
不能保证一定可以调度到相应的Node上。
// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
// predicate results as possible.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is schedulable
// on the node with all the existing pods on the node plus higher and equal priority
// pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption and
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
// ---
// podFitsOnNode根据给定的NodeInfo判断是否匹配相应的预选函数
// 对于一个给定的Pod,podFitsOnNode会检查之前是否有等价的Pod,这样就可以直接复用等价Pod的预选结果
// 该函数会有两个地方调用:Schedule和Preempt
// 当Schedule(正常调度)的时候,判断Node上所有已经存在的Pod和将被指定将要调度到这个Node上的其他所有高优先级Pod外,当前的Pod是否可以调度
// 当Preempt(抢占式)的时候,待定。。。
func podFitsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
info *schedulercache.NodeInfo,
predicateFuncs map[string]algorithm.FitPredicate,
nodeCache *equivalence.NodeCache,
queue internalqueue.SchedulingQueue,
alwaysCheckAllPredicates bool,
equivClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error) {
var (
eCacheAvailable bool
failedPredicates []algorithm.PredicateFailureReason
)
podsAdded := false
// We run predicates twice in some cases. If the node has greater or equal priority
// nominated pods, we run them when those pods are added to meta and nodeInfo.
// If all predicates succeed in this pass, we run them again when these
// nominated pods are not added. This second pass is necessary because some
// predicates such as inter-pod affinity may not pass without the nominated pods.
// If there are no nominated pods for the node or if the first run of the
// predicates fail, we don't run the second pass.
// We consider only equal or higher priority pods in the first pass, because
// those are the current "pod" must yield to them and not take a space opened
// for running them. It is ok if the current "pod" take resources freed for
// lower priority pods.
// Requiring that the new pod is schedulable in both circumstances ensures that
// we are making a conservative decision: predicates like resources and inter-pod
// anti-affinity are more likely to fail when the nominated pods are treated
// as running, while predicates like pod affinity are more likely to fail when
// the nominated pods are treated as not running. We can't just assume the
// nominated pods are running because they are not running right now and in fact,
// they may end up getting scheduled to a different node.
// 两次循环的原因主要就是因为NominatedPods调度的不一定就是此Node,还有Pod的亲和性等问题
for i := 0; i < 2; i++ {
metaToUse := meta
nodeInfoToUse := info
if i == 0 {
// 第一次调度,根据NominatedPods更新meta和nodeInfo信息,pod根据更新后的信息去预选
// 第二次调度,meta和nodeInfo信息不变,保证pod不完全依赖于NominatedPods(主要考虑到pod亲和性之类的)
podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
} else if !podsAdded || len(failedPredicates) != 0 {
break
}
// Bypass eCache if node has any nominated pods.
// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
// when pods are nominated or their nominations change.
eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
for predicateID, predicateKey := range predicates.Ordering() {
var (
fit bool
reasons []algorithm.PredicateFailureReason
err error
)
//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
if predicate, exist := predicateFuncs[predicateKey]; exist {
if eCacheAvailable {
fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass)
} else {
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
}
if err != nil {
return false, []algorithm.PredicateFailureReason{}, err
}
if !fit {
// eCache is available and valid, and predicates result is unfit, record the fail reasons
failedPredicates = append(failedPredicates, reasons...)
// if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
if !alwaysCheckAllPredicates {
klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
"evaluation is short circuited and there are chances " +
"of other predicates failing as well.")
break
}
}
}
}
}
return len(failedPredicates) == 0, failedPredicates, nil
}
复制代码
之后就是根据预选的调度算法,一个个判断是否都满足。这里有个小优化,如果当前的Pod在之前有一个等价的Pod,则直接从缓存返回相应上一次的结果。如果成功则不用继续调用预选算法。但是,对于缓存部分,我个人有些疑问,可能对于上一个Pod缓存的结果是成功的,但是本次调度,Node信息发生变化了,缓存结果是成功的,但是实际上可能并不一定会成功。
预选调度算法
本节主要说的是默认的调度算法。默认的代码在pkg/scheduler/algorithmprovider/defaults/defaults.go
下,defaultPredicates
方法返回的是默认的一系列预选算法。与预选相关的代码都在pkg/scheduler/algorithm/predicates/predicates.go
下
调度方法基本一致,参数为
(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo)
,返回值为
(bool, []algorithm.PredicateFailureReason, error)
。
优选
预选完成之后会得到一个Node的数组。如果预选合适的节点数大于1,则需要调用优选算法根据评分获取最优的节点。
优选算法调用的接口是PrioritizeNodes
,使用与预选类似的多任务同步调用方式,采用MapReduce的思想,Map根据不同的优选算法获取对某一Node的值,根据Reduce统计最终的结果。
优选调度算法
优选调度算法默认代码在pkg/scheduler/algorithmprovider/defaults/defaults.go
下,defaultPriorities
方法返回的是默认的一系列优选算法,通过工厂模式处理相应的优选算法,代码如下
func defaultPriorities() sets.String {
return sets.NewString(
// spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.
factory.RegisterPriorityConfigFactory(
"SelectorSpreadPriority",
factory.PriorityConfigFactory{
MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
},
Weight: 1,
},
),
// pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
// as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.
factory.RegisterPriorityConfigFactory(
"InterPodAffinityPriority",
factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
return priorities.NewInterPodAffinityPriority(args.NodeInfo, args.NodeLister, args.PodLister, args.HardPodAffinitySymmetricWeight)
},
Weight: 1,
},
),
// Prioritize nodes by least requested utilization.
factory.RegisterPriorityFunction2("LeastRequestedPriority", priorities.LeastRequestedPriorityMap, nil, 1),
// Prioritizes nodes to help achieve balanced resource usage
factory.RegisterPriorityFunction2("BalancedResourceAllocation", priorities.BalancedResourceAllocationMap, nil, 1),
// Set this weight large enough to override all other priority functions.
// TODO: Figure out a better way to do this, maybe at same time as fixing #24720.
factory.RegisterPriorityFunction2("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000),
// Prioritizes nodes that have labels matching NodeAffinity
factory.RegisterPriorityFunction2("NodeAffinityPriority", priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1),
// Prioritizes nodes that marked with taint which pod can tolerate.
factory.RegisterPriorityFunction2("TaintTolerationPriority", priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1),
// ImageLocalityPriority prioritizes nodes that have images requested by the pod present.
factory.RegisterPriorityFunction2("ImageLocalityPriority", priorities.ImageLocalityPriorityMap, nil, 1),
)
}
复制代码
用到的优选算法通过代码结构基本可以看出
每一个不同的优选策略独立成一个单独的文件。通过优选之后,调用
selectHost
方法获取分数最高的Node。如果多个Node分数相同,则使用轮询的方式得到最终的Node。
抢占调度
未完待续。。。