predicates 算法主要是对集群中的 node 进行过滤,选出符合当前 pod 运行的 nodes。

调度算法说明

上节已经提到默认的调度算法在中定义了:

下面是对默认调度算法的一些说明:

默认的 predicates 调度算法主要分为五种类型:

1、第一种类型叫作 GeneralPredicates,包含 PodFitsResources、PodFitsHost、PodFitsHostPorts、PodMatchNodeSelector 四种策略,其具体含义如下所示:

  • PodFitsHost:检查宿主机的名字是否跟 Pod 的 spec.nodeName 一致
  • PodFitsHostPorts:检查 Pod 申请的宿主机端口(spec.nodePort)是不是跟已经被使用的端口有冲突
  • PodMatchNodeSelector:检查 Pod 的 nodeSelector 或者 nodeAffinity 指定的节点是否与节点匹配等
  • PodFitsResources:检查主机的资源是否满足 Pod 的需求,根据实际已经分配(Request)的资源量做调度

kubelet 在启动 Pod 前,会执行一个 Admit 操作来进行二次确认,这里二次确认的规则就是执行一遍 GeneralPredicates。

2、第二种类型是与 Volume 相关的过滤规则,主要有NoDiskConflictPred、MaxGCEPDVolumeCountPred、MaxAzureDiskVolumeCountPred、MaxCSIVolumeCountPred、MaxEBSVolumeCountPred、NoVolumeZoneConflictPred、CheckVolumeBindingPred。

3、第三种类型是宿主机相关的过滤规则,主要是 PodToleratesNodeTaintsPred。

4、第四种类型是 Pod 相关的过滤规则,主要是 MatchInterPodAffinityPred。

5、第五种类型是新增的过滤规则,与宿主机的运行状况有关,主要有 CheckNodeCondition、 CheckNodeMemoryPressure、CheckNodePIDPressure、CheckNodeDiskPressure 四种。若启用了 TaintNodesByCondition FeatureGates 则在 predicates 算法中会将该四种算法移除,TaintNodesByCondition 基于 当 node 出现 pressure 时自动为 node 打上 taints 标签,该功能在 v1.8 引入,v1.12 成为 beta 版本,目前 v1.16 中也是 beta 版本,但在 v1.13 中该功能已默认启用。

k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go:146

  1. var (
  2. predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
  3. GeneralPred, HostNamePred, PodFitsHostPortsPred,
  4. MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
  5. PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
  6. CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,
  7. MaxAzureDiskVolumeCountPred, MaxCinderVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
  8. CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, EvenPodsSpreadPred, MatchInterPodAffinityPred}
  9. )

源码分析

上节中已经说到调用预选以及优选算法的逻辑在 k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:189中,

findNodesThatFit() 是 predicates 策略的实际调用方法,其基本流程如下:

  • 设定最多需要检查的节点数,作为预选节点数组的容量,避免总节点过多影响调度效率
  • 通过NodeTree()不断获取下一个节点来判断该节点是否满足 pod 的调度条件
  • 通过之前注册的各种 predicates 函数来判断当前节点是否符合 pod 的调度条件
  • 最后返回满足调度条件的 node 列表,供下一步的优选操作

checkNode()是一个校验 node 是否符合要求的函数,其实际调用到的核心函数是podFitsOnNode(),再通过workqueue() 并发执行checkNode() 函数,workqueue() 会启动 16 个 goroutine 来并行计算需要筛选的 node 列表,其主要流程如下:

  • 通过 cache 中的 NodeTree() 不断获取下一个 node
  • 将当前 node 和 pod 传入podFitsOnNode() 方法中来判断当前 node 是否符合要求
  • 如果当前 node 符合要求就将当前 node 加入预选节点的数组中filtered
  • 如果当前 node 不满足要求,则加入到失败的数组中,并记录原因
  • 通过workqueue.ParallelizeUntil()并发执行checkNode()函数,一旦找到足够的可行节点数后就停止筛选更多节点
  • 若配置了 extender 则再次进行过滤已筛选出的 node

k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:464

  1. func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
  2. var filtered []*v1.Node
  3. failedPredicateMap := FailedPredicateMap{}
  4. filteredNodesStatuses := framework.NodeToStatusMap{}
  5. if len(g.predicates) == 0 {
  6. filtered = g.cache.ListNodes()
  7. } else {
  8. allNodes := int32(g.cache.NodeTree().NumNodes())
  9. // 1.设定最多需要检查的节点数
  10. numNodesToFind := g.numFeasibleNodesToFind(allNodes)
  11. filtered = make([]*v1.Node, numNodesToFind)
  12. ......
  13. // 2.获取该 pod 的 meta 值
  14. meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
  15. // 3.checkNode 为执行预选算法的函数
  16. checkNode := func(i int) {
  17. nodeName := g.cache.NodeTree().Next()
  18. // 4.podFitsOnNode 最终执行预选算法的函数
  19. fits, failedPredicates, status, err := g.podFitsOnNode(
  20. ......
  21. )
  22. if err != nil {
  23. ......
  24. }
  25. if fits {
  26. length := atomic.AddInt32(&filteredLen, 1)
  27. if length > numNodesToFind {
  28. cancel()
  29. atomic.AddInt32(&filteredLen, -1)
  30. } else {
  31. filtered[length-1] = g.nodeInfoSnapshot.NodeInfoMap[nodeName].Node()
  32. }
  33. } else {
  34. ......
  35. }
  36. }
  37. // 5.启动 16 个 goroutine 并发执行 checkNode 函数
  38. filtered = filtered[:filteredLen]
  39. if len(errs) > 0 {
  40. ......
  41. }
  42. // 6.若配置了 extender 则再次进行过滤
  43. if len(filtered) > 0 && len(g.extenders) != 0 {
  44. ......
  45. }
  46. return filtered, failedPredicateMap, filteredNodesStatuses, nil
  47. }

然后继续看如何设定最多需要检查的节点数,此过程由numFeasibleNodesToFind()进行处理,基本流程如下:

  • 如果总的 node 节点小于minFeasibleNodesToFind(默认为100)则直接返回总节点数
  • 如果节点数超过 100,则取指定百分比 percentageOfNodesToScore(默认值为 50)的节点数 ,当该百分比后的数目仍小于minFeasibleNodesToFind,则返回minFeasibleNodesToFind
  • 如果百分比后的数目大于minFeasibleNodesToFind,则返回该百分比的节点数

所以当节点数小于 100 时直接返回,大于 100 时只返回其总数的 50%。percentageOfNodesToScore 参数在 v1.12 引入,默认值为 50,kube-scheduler 在启动时可以设定该参数的值。

k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:441

pridicates 调度算法的核心是 podFitsOnNode() ,scheduler 的抢占机制也会执行该函数,podFitsOnNode()基本流程如下:

  • 遍历已经注册好的预选策略predicates.Ordering(),按顺序执行对应的策略函数
  • 遍历执行每个策略函数,并返回是否合适,预选失败的原因和错误
  • 如果预选函数执行失败,则加入预选失败的数组中,直接返回,后面的预选函数不会再执行
  • 如果该 node 上存在 nominated pod 则执行两次预选函数

因为引入了抢占机制,此处主要说明一下执行两次预选函数的原因:

第一次循环,若该 pod 为抢占者(nominatedPods),调度器会假设该 pod 已经运行在这个节点上,然后更新metanodeInfonominatedPods是指执行了抢占机制且已经分配到了 node(pod.Status.NominatedNodeName 已被设定) 但是还没有真正运行起来的 pod,然后再执行所有的预选函数。

而只有这两遍 predicates 算法都能通过时,这个 pod 和 node 才会被认为是可以绑定(bind)的。这样做是因为考虑到 pod affinity 等策略的执行,如果当前的 pod 与nominatedPods有依赖关系就会有问题,因为nominatedPods不能保证一定可以调度且在已指定的 node 运行成功,也可能出现被其他高优先级的 pod 抢占等问题,关于抢占问题下篇会详细介绍。

k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:610

  1. func (g *genericScheduler) podFitsOnNode(......) (bool, []predicates.PredicateFailureReason, *framework.Status, error) {
  2. var failedPredicates []predicates.PredicateFailureReason
  3. var status *framework.Status
  4. podsAdded := false
  5. for i := 0; i < 2; i++ {
  6. metaToUse := meta
  7. nodeInfoToUse := info
  8. if i == 0 {
  9. // 1.第一次循环加入 NominatedPods,计算 meta, nodeInfo
  10. podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
  11. } else if !podsAdded || len(failedPredicates) != 0 {
  12. break
  13. }
  14. // 2.按顺序执行所有预选函数
  15. for _, predicateKey := range predicates.Ordering() {
  16. var (
  17. fit bool
  18. reasons []predicates.PredicateFailureReason
  19. err error
  20. )
  21. if predicate, exist := predicateFuncs[predicateKey]; exist {
  22. fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
  23. if err != nil {
  24. return false, []predicates.PredicateFailureReason{}, nil, err
  25. }
  26. // 3.任何一个预选函数执行失败则直接返回
  27. if !fit {
  28. failedPredicates = append(failedPredicates, reasons...)
  29. if !alwaysCheckAllPredicates {
  30. klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
  31. "evaluation is short circuited and there are chances " +
  32. "of other predicates failing as well.")
  33. break
  34. }
  35. }
  36. }
  37. }
  38. // 4.执行 Filter Plugin
  39. status = g.framework.RunFilterPlugins(pluginContext, pod, info.Node().Name)
  40. if !status.IsSuccess() && !status.IsUnschedulable() {
  41. return false, failedPredicates, status, status.AsError()
  42. }
  43. }
  44. return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil

至此,关于 predicates 调度算法的执行过程已经分析完。

priorities 调度算法是在 pridicates 算法后执行的,主要功能是对已经过滤出的 nodes 进行打分并选出最佳的一个 node。

调度算法说明

默认的调度算法在pkg/scheduler/algorithmprovider/defaults/defaults.go中定义了:

默认调度算法的一些说明:

priorities 算法 说明
SelectorSpreadPriority 按 service,rs,statefulset 归属计算 Node 上分布最少的同类 Pod数量,数量越少得分越高,默认权重为1
InterPodAffinityPriority pod 亲和性选择策略,默认权重为1
LeastRequestedPriority 选择空闲资源(CPU 和 Memory)最多的节点,默认权重为1,其计算方式为:score = (cpu((capacity-sum(requested))10/capacity) + memory((capacity-sum(requested))10/capacity))/2
BalancedResourceAllocation CPU、Memory 以及 Volume 资源分配最均衡的节点,默认权重为1,其计算方式为:score = 10 - variance(cpuFraction,memoryFraction,volumeFraction)*10
NodePreferAvoidPodsPriority 判断 node annotation 是否有scheduler.alpha.kubernetes.io/preferAvoidPods 标签,类似于 taints 机制,过滤标签中定义类型的 pod,默认权重为10000
NodeAffinityPriority 节点亲和性选择策略,默认权重为1
TaintTolerationPriority Pod 是否容忍节点上的 Taint,优先调度到标记了 Taint 的节点,默认权重为1
ImageLocalityPriority 待调度 Pod 需要使用的镜像是否存在于该节点,默认权重为1

源码分析

执行 priorities 调度算法的逻辑是在 PrioritizeNodes()函数中,其目的是执行每个 priority 函数为 node 打分,分数为 0-10,其功能主要有:

  • PrioritizeNodes() 通过并行运行各个优先级函数来对节点进行打分
  • 每个优先级函数会给节点打分,打分范围为 0-10 分,0 表示优先级最低的节点,10表示优先级最高的节点
  • 每个优先级函数有各自的权重
  • 最后计算所有节点的总加权分数

k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:691

  1. func PrioritizeNodes(......) (schedulerapi.HostPriorityList, error) {
  2. // 1.检查是否有自定义配置
  3. if len(priorityConfigs) == 0 && len(extenders) == 0 {
  4. result := make(schedulerapi.HostPriorityList, 0, len(nodes))
  5. for i := range nodes {
  6. hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
  7. if err != nil {
  8. return nil, err
  9. }
  10. result = append(result, hostPriority)
  11. }
  12. return result, nil
  13. }
  14. ......
  15. results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))
  16. ......
  17. // 2.使用 workqueue 启动 16 个 goroutine 并发为 node 打分
  18. workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
  19. nodeInfo := nodeNameToInfo[nodes[index].Name]
  20. for i := range priorityConfigs {
  21. if priorityConfigs[i].Function != nil {
  22. continue
  23. }
  24. var err error
  25. results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
  26. if err != nil {
  27. appendError(err)
  28. results[i][index].Host = nodes[index].Name
  29. }
  30. }
  31. })
  32. // 3.执行自定义配置
  33. for i := range priorityConfigs {
  34. ......
  35. }
  36. wg.Wait()
  37. if len(errs) != 0 {
  38. return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
  39. }
  40. // 4.运行 Score plugins
  41. scoresMap, scoreStatus := framework.RunScorePlugins(pluginContext, pod, nodes)
  42. if !scoreStatus.IsSuccess() {
  43. return schedulerapi.HostPriorityList{}, scoreStatus.AsError()
  44. }
  45. result := make(schedulerapi.HostPriorityList, 0, len(nodes))
  46. // 5.为每个 node 汇总分数
  47. for i := range nodes {
  48. result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
  49. for j := range priorityConfigs {
  50. result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
  51. }
  52. for j := range scoresMap {
  53. result[i].Score += scoresMap[j][i].Score
  54. }
  55. }
  56. // 6.执行 extender
  57. if len(extenders) != 0 && nodes != nil {
  58. ......
  59. }
  60. ......
  61. return result, nil
  62. }

本文主要讲述了 kube-scheduler 中的 predicates 调度算法与 priorities 调度算法的执行流程,可以看到 kube-scheduler 中有许多的调度策略,但是想要添加自己的策略并不容易,scheduler 目前已经朝着提升性能与扩展性的方向演进了,其调度部分进行性能优化的一个最根本原则就是尽最大可能将集群信息 cache 化,以便从根本上提高 predicates 和 priorities 调度算法的执行效率。第二个就是在 bind 阶段进行异步处理,只会更新其 cache 里的 pod 和 node 的信息,这种基于“乐观”假设的 API 对象更新方式,在 kubernetes 里被称作 assume,如果这次异步的 bind 过程失败了,其实也没有太大关系,等 scheduler cache 同步之后一切又恢复正常了。除了上述的“cache 化”和“乐观绑定”,还有一个重要的设计,那就是“无锁化”,predicates 调度算法与 priorities 调度算法的执行都是并行的,只有在调度队列和 scheduler cache 进行操作时,才需要加锁,而对调度队列的操作并不影响主流程。

参考:

https://kubernetes.io/docs/concepts/configuration/scheduling-framework/