官方对 kube-scheduler 的调度流程描述 :

kube-scheduler 目前包含两部分调度算法 predicates 和 priorities,首先执行 predicates 算法过滤部分 node 然后执行 priorities 算法为所有 node 打分,最后从所有 node 中选出分数最高的最为最佳的 node。

kubernetes 中所有组件的启动流程都是类似的,首先会解析命令行参数、添加默认值,kube-scheduler 的默认参数在 中定义的。然后会执行 run 方法启动主逻辑,下面直接看 kube-scheduler 的主逻辑 run 方法执行过程。

Run() 方法主要做了以下工作:

  • 初始化 scheduler 对象
  • 启动 kube-scheduler server,kube-scheduler 监听 10251 和 10259 端口,10251 端口不需要认证,可以获取 healthz metrics 等信息,10259 为安全端口,需要认证
  • 启动所有的 informer
  • 执行 sched.Run() 方法,执行主调度逻辑

k8s.io/kubernetes/cmd/kube-scheduler/app/server.go:160

  1. func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, registryOptions ...Option) error {
  2. ......
  3. // 1、初始化 scheduler 对象
  4. sched, err := scheduler.New(......)
  5. if err != nil {
  6. return err
  7. }
  8. // 2、启动事件广播
  9. if cc.Broadcaster != nil && cc.EventClient != nil {
  10. cc.Broadcaster.StartRecordingToSink(stopCh)
  11. }
  12. if cc.LeaderElectionBroadcaster != nil && cc.CoreEventClient != nil {
  13. cc.LeaderElectionBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
  14. }
  15. ......
  16. // 3、启动 http server
  17. if cc.InsecureServing != nil {
  18. separateMetrics := cc.InsecureMetricsServing != nil
  19. handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
  20. if err := cc.InsecureServing.Serve(handler, 0, stopCh); err != nil {
  21. return fmt.Errorf("failed to start healthz server: %v", err)
  22. }
  23. }
  24. ......
  25. // 4、启动所有 informer
  26. go cc.PodInformer.Informer().Run(stopCh)
  27. cc.InformerFactory.Start(stopCh)
  28. cc.InformerFactory.WaitForCacheSync(stopCh)
  29. run := func(ctx context.Context) {
  30. sched.Run()
  31. <-ctx.Done()
  32. }
  33. ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
  34. defer cancel()
  35. go func() {
  36. select {
  37. case <-stopCh:
  38. cancel()
  39. case <-ctx.Done():
  40. }
  41. }()
  42. // 5、选举 leader
  43. if cc.LeaderElection != nil {
  44. ......
  45. }
  46. // 6、执行 sched.Run() 方法
  47. run(ctx)
  48. return fmt.Errorf("finished without leader elect")
  49. }

下面看一下 scheduler.New() 方法是如何初始化 scheduler 结构体的,该方法主要的功能是初始化默认的调度算法以及默认的调度器 GenericScheduler。

  • 创建 scheduler 配置文件
  • 根据默认的 DefaultProvider 初始化 schedulerAlgorithmSource 然后加载默认的预选及优选算法,然后初始化 GenericScheduler
  • 若启动参数提供了 policy config 则使用其覆盖默认的预选及优选算法并初始化 GenericScheduler,不过该参数现已被弃用

k8s.io/kubernetes/pkg/scheduler/scheduler.go:166

  1. func New(......) (*Scheduler, error) {
  2. ......
  3. // 1、创建 scheduler 的配置文件
  4. configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
  5. ......
  6. })
  7. var config *factory.Config
  8. source := schedulerAlgorithmSource
  9. // 2、加载默认的调度算法
  10. switch {
  11. case source.Provider != nil:
  12. // 使用默认的 ”DefaultProvider“ 初始化 config
  13. sc, err := configurator.CreateFromProvider(*source.Provider)
  14. if err != nil {
  15. return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
  16. }
  17. config = sc
  18. case source.Policy != nil:
  19. // 通过启动时指定的 policy source 加载 config
  20. ......
  21. default:
  22. return nil, fmt.Errorf("unsupported algorithm source: %v", source)
  23. // Additional tweaks to the config produced by the configurator.
  24. config.Recorder = recorder
  25. config.DisablePreemption = options.disablePreemption
  26. config.StopEverything = stopCh
  27. // 3.创建 scheduler 对象
  28. sched := NewFromConfig(config)
  29. ......
  30. return sched, nil
  31. }

k8s.io/kubernetes/pkg/scheduler/factory/factory.go:527

然后继续看 Run() 方法中最后执行的 sched.Run() 调度循环逻辑,若 informer 中的 cache 同步完成后会启动一个循环逻辑执行 sched.scheduleOne 方法。

k8s.io/kubernetes/pkg/scheduler/scheduler.go:313

  1. func (sched *Scheduler) Run() {
  2. if !sched.config.WaitForCacheSync() {
  3. return
  4. }
  5. go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
  6. }

scheduleOne() 每次对一个 pod 进行调度,主要有以下步骤:

  • 从 scheduler 调度队列中取出一个 pod,如果该 pod 处于删除状态则跳过
  • 执行调度逻辑 sched.schedule() 返回通过预算及优选算法过滤后选出的最佳 node
  • 如果过滤算法没有选出合适的 node,则返回 core.FitError
  • 若没有合适的 node 会判断是否启用了抢占策略,若启用了则执行抢占机制
  • 判断是否需要 VolumeScheduling 特性
  • 执行 reserve plugin
  • pod 对应的 spec.NodeName 写上 scheduler 最终选择的 node,更新 scheduler cache
  • 请求 apiserver 异步处理最终的绑定操作,写入到 etcd
  • 执行 permit plugin
  • 执行 prebind plugin
  • 执行 postbind plugin

k8s.io/kubernetes/pkg/scheduler/scheduler.go:515

  1. func (sched *Scheduler) scheduleOne() {
  2. fwk := sched.Framework
  3. pod := sched.NextPod()
  4. if pod == nil {
  5. return
  6. }
  7. // 1.判断 pod 是否处于删除状态
  8. if pod.DeletionTimestamp != nil {
  9. ......
  10. }
  11. // 2.执行调度策略选择 node
  12. start := time.Now()
  13. pluginContext := framework.NewPluginContext()
  14. scheduleResult, err := sched.schedule(pod, pluginContext)
  15. if err != nil {
  16. if fitError, ok := err.(*core.FitError); ok {
  17. // 3.若启用抢占机制则执行
  18. if sched.DisablePreemption {
  19. ......
  20. } else {
  21. preemptionStartTime := time.Now()
  22. sched.preempt(pluginContext, fwk, pod, fitError)
  23. ......
  24. }
  25. ......
  26. metrics.PodScheduleFailures.Inc()
  27. } else {
  28. klog.Errorf("error selecting node for pod: %v", err)
  29. metrics.PodScheduleErrors.Inc()
  30. }
  31. return
  32. }
  33. ......
  34. assumedPod := pod.DeepCopy()
  35. // 4.判断是否需要 VolumeScheduling 特性
  36. allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
  37. if err != nil {
  38. klog.Errorf("error assuming volumes: %v", err)
  39. metrics.PodScheduleErrors.Inc()
  40. return
  41. }
  42. // 5.执行 "reserve" plugins
  43. if sts := fwk.RunReservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
  44. .....
  45. }
  46. // 6.为 pod 设置 NodeName 字段,更新 scheduler 缓存
  47. err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
  48. if err != nil {
  49. }
  50. // 7.异步请求 apiserver
  51. go func() {
  52. if !allBound {
  53. err := sched.bindVolumes(assumedPod)
  54. if err != nil {
  55. ......
  56. return
  57. }
  58. }
  59. // 8.执行 "permit" plugins
  60. permitStatus := fwk.RunPermitPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
  61. if !permitStatus.IsSuccess() {
  62. ......
  63. }
  64. // 9.执行 "prebind" plugins
  65. preBindStatus := fwk.RunPreBindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
  66. if !preBindStatus.IsSuccess() {
  67. ......
  68. }
  69. err := sched.bind(assumedPod, scheduleResult.SuggestedHost, pluginContext)
  70. ......
  71. if err != nil {
  72. ......
  73. } else {
  74. ......
  75. // 10.执行 "postbind" plugins
  76. fwk.RunPostBindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
  77. }
  78. }()
  79. }

scheduleOne() 中通过调用 sched.schedule() 来执行预选与优选算法处理:

k8s.io/kubernetes/pkg/scheduler/scheduler.go:337

sched.Algorithm 是一个 interface,主要包含四个方法,GenericScheduler 是其具体的实现:

  1. type ScheduleAlgorithm interface {
  2. Schedule(*v1.Pod, *framework.PluginContext) (scheduleResult ScheduleResult, err error)
  3. Preempt(*framework.PluginContext, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
  4. Predicates() map[string]predicates.FitPredicate
  5. Prioritizers() []priorities.PriorityConfig
  6. }
  • Schedule():正常调度逻辑,包含预算与优选算法的执行
  • Preempt():抢占策略,在 pod 调度发生失败的时候尝试抢占低优先级的 pod,函数返回发生抢占的 node,被 抢占的 pods 列表,nominated node name 需要被移除的 pods 列表以及 error
  • Predicates():predicates 算法列表
  • Prioritizers():prioritizers 算法列表

kube-scheduler 提供的默认调度为 DefaultProvider,DefaultProvider 配置的 predicates 和 priorities policies 在 k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/defaults.go 中定义,算法具体实现是在 k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/ 中,默认的算法如下所示:

pkg/scheduler/algorithmprovider/defaults/defaults.go

  1. func defaultPredicates() sets.String {
  2. return sets.NewString(
  3. predicates.NoVolumeZoneConflictPred,
  4. predicates.MaxEBSVolumeCountPred,
  5. predicates.MaxGCEPDVolumeCountPred,
  6. predicates.MaxAzureDiskVolumeCountPred,
  7. predicates.MaxCSIVolumeCountPred,
  8. predicates.MatchInterPodAffinityPred,
  9. predicates.NoDiskConflictPred,
  10. predicates.GeneralPred,
  11. predicates.CheckNodeMemoryPressurePred,
  12. predicates.CheckNodeDiskPressurePred,
  13. predicates.CheckNodePIDPressurePred,
  14. predicates.CheckNodeConditionPred,
  15. predicates.PodToleratesNodeTaintsPred,
  16. predicates.CheckVolumeBindingPred,
  17. )
  18. }
  19. func defaultPriorities() sets.String {
  20. return sets.NewString(
  21. priorities.SelectorSpreadPriority,
  22. priorities.InterPodAffinityPriority,
  23. priorities.LeastRequestedPriority,
  24. priorities.BalancedResourceAllocation,
  25. priorities.NodePreferAvoidPodsPriority,
  26. priorities.NodeAffinityPriority,
  27. priorities.TaintTolerationPriority,
  28. priorities.ImageLocalityPriority,
  29. )
  30. }

下面继续看 sched.Algorithm.Schedule() 调用具体调度算法的过程:

  • 检查 pod pvc 信息
  • 执行 prefilter plugins
  • 获取 scheduler cache 的快照,每次调度 pod 时都会获取一次快照
  • 执行 g.findNodesThatFit() 预选算法
  • 执行 postfilter plugin
  • 若 node 为 0 直接返回失败的 error,若 node 数为1 直接返回该 node
  • 执行 g.priorityMetaProducer() 获取 metaPrioritiesInterface,计算 pod 的metadata,检查该 node 上是否有相同 meta 的 pod
  • 执行 PrioritizeNodes() 算法
  • 执行 通过得分选择一个最佳的 node

k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:186

至此,scheduler 的整个过程分析完毕。

本文主要对于 kube-scheduler v1.16 的调度流程进行了分析,但其中有大量的细节都暂未提及,包括预选算法以及优选算法的具体实现、优先级与抢占调度的实现、framework 的使用及实现,因篇幅有限,部分内容会在后文继续说明。

参考:

The Kubernetes Scheduler