创建

job 的一个示例如下所示:

扩缩容

job 不支持运行时扩缩容,job 在创建后其 字段也不支持修改。

删除

通常系统中已执行完成的 job 不再需要,将它们保留在系统中会占用一定的资源,需要进行回收,pod 在执行完任务后会进入到 Completed 状态,删除 job 也会清除其创建的 pod。

  1. $ kubectl get pod
  2. pi-gdrwr 0/1 Completed 0 10m
  3. pi-rjphf 0/1 Completed 0 10m
  4. $ kubectl delete job pi
自动清理机制

每次 job 执行完成后手动回收非常麻烦,k8s 在 v1.12 版本中加入了 TTLAfterFinished feature-gates,启用该特性后会启动一个 TTL 控制器,在创建 job 时指定后可在 job 运行完成后自动回收相关联的 pod,如上文中的 yaml 所示,创建 job 时指定了 ttlSecondsAfterFinished: 60,job 在执行完成后停留 60s 会被自动回收, 若 ttlSecondsAfterFinished 设置为 0 则表示在 job 执行完成后立刻回收。当 TTL 控制器清理 job 时,它将级联删除 job,即 pod 和 job 一起被删除。不过该特性截止目前还是 Alpha 版本,请谨慎使用。

startJobController

首先还是直接看 jobController 的启动方法 startJobController,该方法中调用 NewJobController 初始化 jobController 然后调用 Run 方法启动 jobController。从初始化流程中可以看到 JobController 监听 pod 和 job 两种资源,其中 ConcurrentJobSyncs 默认值为 5。

k8s.io/kubernetes/cmd/kube-controller-manager/app/batch.go:33

Run

以下是 jobController 的 Run 方法,其中核心逻辑是调用 jm.worker 执行 syncLoop 操作,worker 方法是 syncJob 方法的别名,最终调用的是 syncJob

k8s.io/kubernetes/pkg/controller/job/job_controller.go:139

  1. func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
  2. defer utilruntime.HandleCrash()
  3. defer jm.queue.ShutDown()
  4. klog.Infof("Starting job controller")
  5. defer klog.Infof("Shutting down job controller")
  6. if !cache.WaitForNamedCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
  7. return
  8. }
  9. for i := 0; i < workers; i++ {
  10. go wait.Until(jm.worker, time.Second, stopCh)
  11. }
  12. <-stopCh
  13. }

syncJob

syncJob 是 jobController 的核心方法,其主要逻辑为:

  • 1、从 lister 中获取 job 对象;
  • 3、获取 job 重试的次数;
  • 4、调用 jm.expectations.SatisfiedExpectations 判断 job 是否需能进行 sync 操作,Expectations 机制在之前写的” ReplicaSetController 源码分析“一文中详细讲解过,其主要判断条件如下:
    • 1、该 key 在 ControllerExpectations 中的 adds 和 dels 都 <= 0,即调用 apiserver 的创建和删除接口没有失败过;
    • 2、该 key 在 ControllerExpectations 中已经超过 5min 没有更新了;
    • 3、该 key 在 ControllerExpectations 中不存在,即该对象是新创建的;
    • 4、调用 GetExpectations 方法失败,内部错误;
  • 5、调用 jm.getPodsForJob 通过 selector 获取 job 关联的 pod,若有孤儿 pod 的 label 与 job 的能匹配则进行关联,若已关联的 pod label 有变化则解除与 job 的关联关系;
  • 6、分别计算 activesucceededfailed 状态的 pod 数;
  • 7、判断 job 是否为首次启动,若首次启动其 job.Status.StartTime 为空,此时首先设置 startTime,然后检查是否有 job.Spec.ActiveDeadlineSeconds 是否为空,若不为空则将其再加入到延迟队列中,等待 ActiveDeadlineSeconds 时间后会再次触发 sync 操作;
  • 8、判断 job 的重试次数是否超过了 job.Spec.BackoffLimit(默认是6次),有两个判断方法一是 job 的重试次数以及 job 的状态,二是当 job 的 restartPolicyOnFailure 时 container 的重启次数,两者任一个符合都说明 job 处于 failed 状态且原因为 BackoffLimitExceeded
  • 9、判断 job 的运行时间是否达到 job.Spec.ActiveDeadlineSeconds 中设定的值,若已达到则说明 job 此时处于 failed 状态且原因为 DeadlineExceeded
  • 10、根据以上判断如果 job 处于 failed 状态,则调用 jm.deleteJobPods 并发删除所有 active pods ;
  • 11、若非 failed 状态,根据 jobNeedsSync 判断是否要进行同步,若需要同步则调用 jm.manageJob 进行同步;
  • 12、通过检查 job.Spec.Completions 判断 job 是否已经运行完成,若 job.Spec.Completions 字段没有设置值则只要有一个 pod 运行完成该 job 就为 Completed 状态,若设置了 job.Spec.Completions 会通过判断已经运行完成状态的 pod 即 succeeded pod 数是否大于等于该值;
  • 13、通过以上判断若 job 运行完成了,则更新 job.Status.Conditionsjob.Status.CompletionTime 字段;
  • 14、如果 job 的 status 有变化,将 job 的 status 更新到 apiserver;

syncJob 中又调用了 jm.manageJob 处理非 failed 状态下的 sync 操作,下面主要分析一下该方法。

k8s.io/kubernetes/pkg/controller/job/job_controller.go:436

  1. func (jm *JobController) syncJob(key string) (bool, error) {
  2. // 1、计算每次 sync 的运行时间
  3. startTime := time.Now()
  4. defer func() {
  5. klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
  6. }()
  7. ns, name, err := cache.SplitMetaNamespaceKey(key)
  8. return false, err
  9. }
  10. if len(ns) == 0 || len(name) == 0 {
  11. return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
  12. }
  13. // 2、从 lister 中获取 job 对象
  14. sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
  15. if err != nil {
  16. if errors.IsNotFound(err) {
  17. klog.V(4).Infof("Job has been deleted: %v", key)
  18. jm.expectations.DeleteExpectations(key)
  19. }
  20. return false, err
  21. }
  22. job := *sharedJob
  23. // 3、判断 job 是否已经执行完成
  24. if IsJobFinished(&job) {
  25. return true, nil
  26. }
  27. // 4、获取 job 重试的次数
  28. previousRetry := jm.queue.NumRequeues(key)
  29. // 5、判断 job 是否能进行 sync 操作
  30. jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
  31. // 6、获取 job 关联的所有 pod
  32. pods, err := jm.getPodsForJob(&job)
  33. if err != nil {
  34. return false, err
  35. }
  36. // 7、分别计算 active、succeeded、failed 状态的 pod 数
  37. activePods := controller.FilterActivePods(pods)
  38. active := int32(len(activePods))
  39. succeeded, failed := getStatus(pods)
  40. conditions := len(job.Status.Conditions)
  41. // 8、判断 job 是否为首次启动
  42. if job.Status.StartTime == nil {
  43. now := metav1.Now()
  44. job.Status.StartTime = &now
  45. // 9、判断是否设定了 ActiveDeadlineSeconds 值
  46. if job.Spec.ActiveDeadlineSeconds != nil {
  47. klog.V(4).Infof("Job %s have ActiveDeadlineSeconds will sync after %d seconds",
  48. key, *job.Spec.ActiveDeadlineSeconds)
  49. jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
  50. }
  51. }
  52. var manageJobErr error
  53. jobFailed := false
  54. var failureReason string
  55. var failureMessage string
  56. // 10、判断 job 的重启次数是否已达到上限,即处于 BackoffLimitExceeded
  57. jobHaveNewFailure := failed > job.Status.Failed
  58. exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&
  59. (int32(previousRetry)+1 > *job.Spec.BackoffLimit)
  60. if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
  61. jobFailed = true
  62. failureReason = "BackoffLimitExceeded"
  63. failureMessage = "Job has reached the specified backoff limit"
  64. } else if pastActiveDeadline(&job) {
  65. jobFailed = true
  66. failureReason = "DeadlineExceeded"
  67. failureMessage = "Job was active longer than specified deadline"
  68. }
  69. // 11、如果处于 failed 状态,则调用 jm.deleteJobPods 并发删除所有 active pods
  70. if jobFailed {
  71. errCh := make(chan error, active)
  72. jm.deleteJobPods(&job, activePods, errCh)
  73. case manageJobErr = <-errCh:
  74. if manageJobErr != nil {
  75. break
  76. }
  77. default:
  78. }
  79. failed += active
  80. active = 0
  81. job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
  82. jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
  83. } else {
  84. // 12、若非 failed 状态,根据 jobNeedsSync 判断是否要进行同步
  85. if jobNeedsSync && job.DeletionTimestamp == nil {
  86. active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
  87. }
  88. // 13、检查 job.Spec.Completions 判断 job 是否已经运行完成
  89. completions := succeeded
  90. complete := false
  91. if job.Spec.Completions == nil {
  92. if succeeded > 0 && active == 0 {
  93. complete = true
  94. }
  95. } else {
  96. if completions >= *job.Spec.Completions {
  97. complete = true
  98. if active > 0 {
  99. jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
  100. }
  101. if completions > *job.Spec.Completions {
  102. jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
  103. }
  104. }
  105. }
  106. // 14、若 job 运行完成了,则更新 job.Status.Conditions 和 job.Status.CompletionTime 字段
  107. if complete {
  108. job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
  109. now := metav1.Now()
  110. job.Status.CompletionTime = &now
  111. }
  112. }
  113. forget := false
  114. if job.Status.Succeeded < succeeded {
  115. forget = true
  116. }
  117. // 15、如果 job 的 status 有变化,将 job 的 status 更新到 apiserver
  118. if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
  119. job.Status.Active = active
  120. job.Status.Succeeded = succeeded
  121. job.Status.Failed = failed
  122. if err := jm.updateHandler(&job); err != nil {
  123. return forget, err
  124. }
  125. if jobHaveNewFailure && !IsJobFinished(&job) {
  126. return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
  127. }
  128. forget = true
  129. }
  130. return forget, manageJobErr
  131. }
jm.manageJob

jm.manageJob它主要做的事情就是根据 job 配置的并发数来确认当前处于 active 的 pods 数量是否合理,如果不合理的话则进行调整,其主要逻辑为:

  • 1、首先获取 job 的 active pods 数与可运行的 pod 数即 job.Spec.Parallelism
  • 2、判断如果处于 active 状态的 pods 数大于 job 设置的并发数 job.Spec.Parallelism,则并发删除多余的 active pods,需要删除的 active pods 是有一定的优先级的,删除的优先级为:

    • 1、判断是否绑定了 node:Unassigned < assigned;
    • 2、判断 pod phase:PodPending < PodUnknown < PodRunning;
    • 3、判断 pod 状态:Not ready < ready;
    • 4、若 pod 都为 ready,则按运行时间排序,运行时间最短会被删除:empty time < less time < more time;
    • 5、根据 pod 重启次数排序:higher restart counts < lower restart counts;
    • 6、按 pod 创建时间进行排序:Empty creation time pods < newer pods < older pods;
  • 3、若处于 active 状态的 pods 数小于 job 设置的并发数,则需要根据 job 的配置计算 pod 的 diff 数并进行创建,计算方法与 completionsparallelism 以及 succeeded 的 pods 数有关,计算出 diff 数后会进行批量创建,创建的 pod 数依次为 1、2、4、8……,呈指数级增长,job 创建 pod 的方式与 rs 创建 pod 是类似的,但是此处并没有限制在一个 syncLoop 中创建 pod 的上限值,创建完 pod 后会将结果记录在 job 的 expectations 中,此处并非所有的 pod 都能创建成功,若超时错误会直接忽略,因其他错误创建失败的 pod 会记录在 expectations 中, 机制的主要目的是减少不必要的 sync 操作,至于其详细的说明可以参考之前写的 ” ReplicaSetController 源码分析“ 一文;

以上就是 jobController 源码中主要的逻辑,从上文分析可以看到 jobController 的代码比较清晰,若看过前面写的几个 controller 分析会发现每个 controller 在功能实现上有很多类似的地方。