在分析源码前先考虑一下 replicaset 的使用场景,在平时的操作中其实我们并不会直接操作 replicaset,replicaset 也仅有几个简单的操作,创建、删除、更新等,但其地位是非常重要的,replicaset 的主要功能就是通过 add/del pod 来达到期望的状态。

    启动流程

    首先来看 replicaSetController 对象初始化以及启动的代码,在 startReplicaSetController 中有两个比较重要的变量:

    • BurstReplicas:用来控制在一个 syncLoop 过程中 rs 最多能创建的 pod 数量,设置上限值是为了避免单个 rs 影响整个系统,默认值为 500;
    • ConcurrentRSSyncs:指的是需要启动多少个 goroutine 处理 informer 队列中的对象,默认值为 5;

    下面是 replicaSetController 初始化的具体步骤,可以看到其会监听 pod 以及 rs 两个对象的事件。

    k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:109

    1. func NewReplicaSetController(......) *ReplicaSetController {
    2. ......
    3. // 1、此处调用 NewBaseController
    4. return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
    5. apps.SchemeGroupVersion.WithKind("ReplicaSet"),
    6. "replicaset_controller",
    7. "replicaset",
    8. controller.RealPodControl{
    9. KubeClient: kubeClient,
    10. Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
    11. },
    12. )
    13. }
    14. func NewBaseController(......) *ReplicaSetController {
    15. ......
    16. // 2、ReplicaSetController 初始化
    17. rsc := &ReplicaSetController{
    18. GroupVersionKind: gvk,
    19. kubeClient: kubeClient,
    20. podControl: podControl,
    21. burstReplicas: burstReplicas,
    22. // 3、expectations 的初始化
    23. expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
    24. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
    25. }
    26. // 4、rsInformer 中注册的 EventHandler
    27. rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    28. AddFunc: rsc.enqueueReplicaSet,
    29. UpdateFunc: rsc.updateRS,
    30. DeleteFunc: rsc.enqueueReplicaSet,
    31. })
    32. ......
    33. // 5、podInformer 中注册的 EventHandler
    34. podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    35. AddFunc: rsc.addPod,
    36. UpdateFunc: rsc.updatePod,
    37. DeleteFunc: rsc.deletePod,
    38. })
    39. ......
    40. return rsc
    41. }

    replicaSetController 初始化完成后会调用 Run 方法启动 5 个 goroutine 处理 informer 队列中的事件并进行 sync 操作,kube-controller-manager 中每个 controller 的启动操作都是如下所示流程。

    k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:177

    1. func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
    2. ......
    3. // 1、等待 informer 同步缓存
    4. if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
    5. return
    6. }
    7. // 2、启动 5 个 goroutine 执行 worker 方法
    8. for i := 0; i < workers; i++ {
    9. go wait.Until(rsc.worker, time.Second, stopCh)
    10. }
    11. <-stopCh
    12. }
    13. // 3、worker 方法中调用 rocessNextWorkItem
    14. func (rsc *ReplicaSetController) worker() {
    15. for rsc.processNextWorkItem() {
    16. }
    17. }
    18. func (rsc *ReplicaSetController) processNextWorkItem() bool {
    19. // 4、从队列中取出对象
    20. key, quit := rsc.queue.Get()
    21. if quit {
    22. return false
    23. }
    24. defer rsc.queue.Done(key)
    25. // 5、执行 sync 操作
    26. err := rsc.syncHandler(key.(string))
    27. ......
    28. return true
    29. }

    EventHandler

    初始化 replicaSetController 时,其中有一个 expectations 字段,这是 rs 中一个比较特殊的机制,为了说清楚 expectations,先来看一下 controller 中所注册的 eventHandler,replicaSetController 会 watch pod 和 replicaSet 两个对象,eventHandler 中注册了对这两种对象的 add、update、delete 三个操作。

    addPod
    • 1、判断 pod 是否处于删除状态;
    • 2、获取该 pod 关联的 rs 以及 rsKey,入队 rs 并更新 rsKey 的 expectations;
    • 3、若 pod 对象没体现出关联的 rs 则为孤儿 pod,遍历 rsList 查找匹配的 rs,若该 rs.Namespace == pod.Namespace 并且 rs.Spec.Selector 匹配 pod.Labels,则说明该 pod 应该与此 rs 关联,将匹配的 rs 入队;

    k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:255

    1. func (rsc *ReplicaSetController) addPod(obj interface{}) {
    2. pod := obj.(*v1.Pod)
    3. if pod.DeletionTimestamp != nil {
    4. rsc.deletePod(pod)
    5. return
    6. }
    7. // 1、获取 pod 所关联的 rs
    8. if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
    9. rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
    10. if rs == nil {
    11. return
    12. }
    13. rsKey, err := controller.KeyFunc(rs)
    14. if err != nil {
    15. return
    16. }
    17. // 2、更新 expectations,rsKey 的 add - 1
    18. rsc.expectations.CreationObserved(rsKey)
    19. rsc.enqueueReplicaSet(rs)
    20. return
    21. }
    22. rss := rsc.getPodReplicaSets(pod)
    23. if len(rss) == 0 {
    24. return
    25. }
    26. for _, rs := range rss {
    27. rsc.enqueueReplicaSet(rs)
    28. }
    29. }
    updatePod
    • 1、如果 pod label 改变或者处于删除状态,则直接删除;
    • 2、如果 pod 的 OwnerReference 发生改变,此时 oldRS 需要创建 pod,将 oldRS 入队;
    • 3、获取 pod 关联的 rs,入队 rs,若 pod 当前处于 ready 并非 available 状态,则会再次将该 rs 加入到延迟队列中,因为 pod 从 ready 到 available 状态需要触发一次 status 的更新;
    • 4、否则为孤儿 pod,遍历 rsList 查找匹配的 rs,若找到则将 rs 入队;

    k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:298

    1. func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
    2. curPod := cur.(*v1.Pod)
    3. oldPod := old.(*v1.Pod)
    4. if curPod.ResourceVersion == oldPod.ResourceVersion {
    5. return
    6. }
    7. // 1、如果 pod label 改变或者处于删除状态,则直接删除
    8. labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
    9. if curPod.DeletionTimestamp != nil {
    10. rsc.deletePod(curPod)
    11. if labelChanged {
    12. rsc.deletePod(oldPod)
    13. }
    14. return
    15. }
    16. // 2、如果 pod 的 OwnerReference 发生改变,将 oldRS 入队
    17. curControllerRef := metav1.GetControllerOf(curPod)
    18. oldControllerRef := metav1.GetControllerOf(oldPod)
    19. controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
    20. if controllerRefChanged && oldControllerRef != nil {
    21. if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
    22. rsc.enqueueReplicaSet(rs)
    23. }
    24. }
    25. if curControllerRef != nil {
    26. rs := rsc.resolveControllerRef(curPod.Namespace, curControllerRef)
    27. if rs == nil {
    28. return
    29. }
    30. if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
    31. rsc.enqueueReplicaSetAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
    32. }
    33. return
    34. }
    35. // 4、查找匹配的 rs
    36. if labelChanged || controllerRefChanged {
    37. rss := rsc.getPodReplicaSets(curPod)
    38. if len(rss) == 0 {
    39. return
    40. }
    41. for _, rs := range rss {
    42. rsc.enqueueReplicaSet(rs)
    43. }
    44. }
    45. }
    deletePod
    • 1、确认该对象是否为 pod;
    • 2、判断是否为孤儿 pod;
    • 3、获取其对应的 rs 以及 rsKey;
    • 4、更新 expectations 中 rsKey 的 del 值;
    • 5、将 rs 入队;

    k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:372

    AddRS 和 DeleteRS

    以上两个操作仅仅是将对应的 rs 入队。

    UpdateRS

    其实 updateRS 也仅仅是将对应的 rs 进行入队,不过多了一个打印日志的操作,如下所示:

    1. func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
    2. oldRS := old.(*apps.ReplicaSet)
    3. curRS := cur.(*apps.ReplicaSet)
    4. if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) {
    5. klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas))
    6. }
    7. rsc.enqueueReplicaSet(cur)
    8. }

    至于 expectations 机制会在下文进行分析。

    syncReplicaSet

    syncReplicaSet 是 controller 的核心方法,它会驱动 controller 所控制的对象达到期望状态,主要逻辑如下所示:

    • 1、根据 ns/name 获取 rs 对象;
    • 2、调用 expectations.SatisfiedExpectations 判断是否需要执行真正的 sync 操作;
    • 3、获取所有 pod list;
    • 4、根据 pod label 进行过滤获取与该 rs 关联的 pod 列表,对于其中的孤儿 pod 若与该 rs label 匹配则进行关联,若已关联的 pod 与 rs label 不匹配则解除关联关系;
    • 5、调用 manageReplicas 进行同步 pod 操作,add/del pod;
    • 6、计算 rs 当前的 status 并进行更新;
    • 7、若 rs 设置了 MinReadySeconds 字段则将该 rs 加入到延迟队列中;

    k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:562

    1. func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
    2. ......
    3. namespace, name, err := cache.SplitMetaNamespaceKey(key)
    4. if err != nil {
    5. return err
    6. }
    7. // 1、根据 ns/name 从 informer cache 中获取 rs 对象,
    8. // 若 rs 已经被删除则直接删除 expectations 中的对象
    9. rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    10. if errors.IsNotFound(err) {
    11. rsc.expectations.DeleteExpectations(key)
    12. return nil
    13. }
    14. ......
    15. // 2、判断该 rs 是否需要执行 sync 操作
    16. rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
    17. selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    18. if err != nil {
    19. ......
    20. }
    21. // 3、获取所有 pod list
    22. allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    23. ......
    24. // 4、过滤掉异常 pod,处于删除状态或者 failed 状态的 pod 都为非 active 状态
    25. filteredPods := controller.FilterActivePods(allPods)
    26. // 5、检查所有 pod,根据 pod 并进行 adopt 与 release 操作,最后获取与该 rs 关联的 pod list
    27. filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
    28. ......
    29. // 6、若需要 sync 则执行 manageReplicas 创建/删除 pod
    30. var manageReplicasErr error
    31. if rsNeedsSync && rs.DeletionTimestamp == nil {
    32. manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
    33. }
    34. rs = rs.DeepCopy()
    35. // 7、计算 rs 当前的 status
    36. newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
    37. // 8、更新 rs status
    38. updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    39. // 9、判断是否需要将 rs 加入到延迟队列中
    40. if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
    41. updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
    42. updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
    43. rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    44. }
    45. return manageReplicasErr
    46. }

    syncReplicaSet 方法中有几个重要的操作分别为:rsc.expectations.SatisfiedExpectationsrsc.manageReplicascalculateStatus,下面一一进行分析。

    SatisfiedExpectations

    该方法主要判断 rs 是否需要执行真正的同步操作,若需要 add/del pod 或者 expectations 已过期则需要进行同步操作。

    k8s.io/kubernetes/pkg/controller/controller_utils.go:181

    1. func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
    2. // 1、若该 key 存在时,判断是否满足条件或者是否超过同步周期
    3. if exp, exists, err := r.GetExpectations(controllerKey); exists {
    4. if exp.Fulfilled() {
    5. return true
    6. } else if exp.isExpired() {
    7. return true
    8. } else {
    9. return false
    10. }
    11. } else if err != nil {
    12. ......
    13. } else {
    14. // 2、该 rs 可能为新创建的,需要进行 sync
    15. ......
    16. }
    17. return true
    18. }
    19. // 3、若 add <= 0 且 del <= 0 说明本地观察到的状态已经为期望状态了
    20. func (e *ControlleeExpectations) Fulfilled() bool {
    21. return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
    22. }
    23. // 4、判断 key 是否过期,ExpectationsTimeout 默认值为 5 * time.Minute
    24. func (exp *ControlleeExpectations) isExpired() bool {
    25. return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
    26. }
    manageReplicas

    manageReplicas 是最核心的方法,它会计算 replicaSet 需要创建或者删除多少个 pod 并调用 apiserver 的接口进行操作,在此阶段仅仅是调用 apiserver 的接口进行创建,并不保证 pod 成功运行,如果在某一轮,未能成功创建的所有 Pod 对象,则不再创建剩余的 pod。一个周期内最多只能创建或删除 500 个 pod,若超过上限值未创建完成的 pod 数会在下一个 syncLoop 继续进行处理。

    该方法主要逻辑如下所示:

    • 1、计算已存在 pod 数与期望数的差异;
    • 2、如果 diff < 0 说明 rs 实际的 pod 数未达到期望值需要继续创建 pod,首先会将需要创建的 pod 数在 expectations 中进行记录,然后调用 slowStartBatch 创建所需要的 pod,slowStartBatch 以指数级增长的方式批量创建 pod,创建 pod 过程中若出现 timeout err 则忽略,若为其他 err 则终止创建操作并更新 expectations;
    • 3、如果 diff > 0 说明可能是一次缩容操作需要删除多余的 pod,如果需要删除全部的 pod 则直接进行删除,否则会通过 getPodsToDelete 方法筛选出需要删除的 pod,具体的筛选策略在下文会将到,然后并发删除这些 pod,对于删除失败操作也会记录在 expectations 中;

    slowStartBatch 中会调用 rsc.podControl.CreatePodsWithControllerRef 方法创建 pod,若创建 pod 失败会判断是否为创建超时错误,或者可能是超时后失败,但此时认为超时并不影响后续的批量创建动作,大家知道,创建 pod 操作提交到 apiserver 后会经过认证、鉴权、以及动态访问控制三个步骤,此过程有可能会超时,即使真的创建失败了,等到 expectations 过期后在下一个 syncLoop 时会重新创建。

    k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:459

    1. func (rsc *ReplicaSetController) manageReplicas(......) error {
    2. // 1、计算已存在 pod 数与期望数的差异
    3. diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    4. rsKey, err := controller.KeyFunc(rs)
    5. if err != nil {
    6. ......
    7. }
    8. 2、如果 <0,则需要创建 pod
    9. if diff < 0 {
    10. diff *= -1
    11. 3、判断需要创建的 pod 数是否超过单次 sync 上限值 500
    12. if diff > rsc.burstReplicas {
    13. diff = rsc.burstReplicas
    14. }
    15. 4、在 expectations 中进行记录,若该 key 已经存在会进行覆盖
    16. rsc.expectations.ExpectCreations(rsKey, diff)
    17. 5、调用 slowStartBatch 创建所需要的 pod
    18. successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
    19. err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
    20. // 6、若为 timeout err 则忽略
    21. if err != nil && errors.IsTimeout(err) {
    22. return nil
    23. }
    24. return err
    25. })
    26. // 7、计算未创建的 pod 数,并记录在 expectations 中
    27. // 若 pod 创建成功,informer watch 到事件后会在 addPod handler 中更新 expectations
    28. if skippedPods := diff - successfulCreations; skippedPods > 0 {
    29. rsc.expectations.CreationObserved(rsKey)
    30. }
    31. return err
    32. } else if diff > 0 {
    33. // 8、若 diff >0 说明需要删除多创建的 pod
    34. if diff > rsc.burstReplicas {
    35. diff = rsc.burstReplicas
    36. }
    37. // 9、getPodsToDelete 会按照一定的策略找出需要删除的 pod 列表
    38. podsToDelete := getPodsToDelete(filteredPods, diff)
    39. // 10、在 expectations 中进行记录,若该 key 已经存在会进行覆盖
    40. rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
    41. // 11、进行并发删除的操作
    42. errCh := make(chan error, diff)
    43. var wg sync.WaitGroup
    44. wg.Add(diff)
    45. for _, pod := range podsToDelete {
    46. go func(targetPod *v1.Pod) {
    47. defer wg.Done()
    48. if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
    49. podKey := controller.PodKey(targetPod)
    50. // 12、某次删除操作若失败会记录在 expectations 中
    51. rsc.expectations.DeletionObserved(rsKey, podKey)
    52. errCh <- err
    53. }
    54. }(pod)
    55. }
    56. wg.Wait()
    57. // 13、返回其中一条 err
    58. select {
    59. case err := <-errCh:
    60. if err != nil {
    61. return err
    62. }
    63. default:
    64. }
    65. }
    66. return nil
    67. }

    slowStartBatch 会批量创建出已计算出的 diff pod 数,创建的 pod 数依次为 1、2、4、8……,呈指数级增长,其方法如下所示:

    k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go:658

    若 diff > 0 时再删除 pod 阶段会调用getPodsToDelete 对 pod 进行筛选操作,此阶段会选出最劣质的 pod,下面是用到的 6 种筛选方法:

    • 1、判断是否绑定了 node:Unassigned < assigned;
    • 2、判断 pod phase:PodPending < PodUnknown < PodRunning;
    • 3、判断 pod 状态:Not ready < ready;
    • 4、若 pod 都为 ready,则按运行时间排序,运行时间最短会被删除:empty time < less time < more time;
    • 5、根据 pod 重启次数排序:higher restart counts < lower restart counts;
    • 6、按 pod 创建时间进行排序:Empty creation time pods < newer pods < older pods;

    上面的几个排序规则遵循互斥原则,从上到下进行匹配,符合条件则排序完成,代码如下所示:

    1. func getPodsToDelete(filteredPods []*v1.Pod, diff int) []*v1.Pod {
    2. if diff < len(filteredPods) {
    3. sort.Sort(controller.ActivePods(filteredPods))
    4. }
    5. return filteredPods[:diff]
    6. }

    k8s.io/kubernetes/pkg/controller/controller_utils.go:735

    1. type ActivePods []*v1.Pod
    2. func (s ActivePods) Len() int { return len(s) }
    3. func (s ActivePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
    4. func (s ActivePods) Less(i, j int) bool {
    5. // 1. Unassigned < assigned
    6. if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
    7. return len(s[i].Spec.NodeName) == 0
    8. }
    9. // 2. PodPending < PodUnknown < PodRunning
    10. m := map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2}
    11. if m[s[i].Status.Phase] != m[s[j].Status.Phase] {
    12. return m[s[i].Status.Phase] < m[s[j].Status.Phase]
    13. }
    14. // 3. Not ready < ready
    15. if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
    16. return !podutil.IsPodReady(s[i])
    17. }
    18. // 4. Been ready for empty time < less time < more time
    19. if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) && !podReadyTime(s[i]).Equal(podReadyTime(s[j])) {
    20. return afterOrZero(podReadyTime(s[i]), podReadyTime(s[j]))
    21. }
    22. // 5. Pods with containers with higher restart counts < lower restart counts
    23. if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
    24. return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
    25. }
    26. // 6. Empty creation time pods < newer pods < older pods
    27. if !s[i].CreationTimestamp.Equal(&s[j].CreationTimestamp) {
    28. return afterOrZero(&s[i].CreationTimestamp, &s[j].CreationTimestamp)
    29. }
    30. return false
    31. }
    calculateStatus

    calculateStatus 会通过当前 pod 的状态计算出 rs 中 status 字段值,status 字段如下所示:

    1. status:
    2. availableReplicas: 10
    3. fullyLabeledReplicas: 10
    4. observedGeneration: 1
    5. readyReplicas: 10
    6. replicas: 10

    k8s.io/kubernetes/pkg/controller/replicaset/replica_set_utils.go:85

    1. func calculateStatus(......) apps.ReplicaSetStatus {
    2. newStatus := rs.Status
    3. fullyLabeledReplicasCount := 0
    4. readyReplicasCount := 0
    5. availableReplicasCount := 0
    6. templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelectorPreValidated()
    7. for _, pod := range filteredPods {
    8. if templateLabel.Matches(labels.Set(pod.Labels)) {
    9. fullyLabeledReplicasCount++
    10. }
    11. if podutil.IsPodReady(pod) {
    12. readyReplicasCount++
    13. if podutil.IsPodAvailable(pod, rs.Spec.MinReadySeconds, metav1.Now()) {
    14. availableReplicasCount++
    15. }
    16. }
    17. }
    18. failureCond := GetCondition(rs.Status, apps.ReplicaSetReplicaFailure)
    19. if manageReplicasErr != nil && failureCond == nil {
    20. var reason string
    21. if diff := len(filteredPods) - int(*(rs.Spec.Replicas)); diff < 0 {
    22. reason = "FailedCreate"
    23. } else if diff > 0 {
    24. reason = "FailedDelete"
    25. }
    26. cond := NewReplicaSetCondition(apps.ReplicaSetReplicaFailure, v1.ConditionTrue, reason, manageReplicasErr.Error())
    27. SetCondition(&newStatus, cond)
    28. } else if manageReplicasErr == nil && failureCond != nil {
    29. RemoveCondition(&newStatus, apps.ReplicaSetReplicaFailure)
    30. }
    31. newStatus.Replicas = int32(len(filteredPods))
    32. newStatus.FullyLabeledReplicas = int32(fullyLabeledReplicasCount)
    33. newStatus.ReadyReplicas = int32(readyReplicasCount)
    34. newStatus.AvailableReplicas = int32(availableReplicasCount)
    35. return newStatus
    36. }

    expectations 机制

    通过上面的分析可知,在 rs 每次入队后进行 sync 操作时,首先需要判断该 rs 是否满足 expectations 机制,那么这个 expectations 的目的是什么?其实,rs 除了有 informer 的缓存外,还有一个本地缓存就是 expectations,expectations 会记录 rs 所有对象需要 add/del 的 pod 数量,若两者都为 0 则说明该 rs 所期望创建的 pod 或者删除的 pod 数已经被满足,若不满足则说明某次在 syncLoop 中创建或者删除 pod 时有失败的操作,则需要等待 expectations 过期后再次同步该 rs。

    通过上面对 eventHandler 的分析,再来总结一下触发 replicaSet 对象发生同步事件的条件:

    • 1、与 rs 相关的:AddRS、UpdateRS、DeleteRS;
    • 2、与 pod 相关的:AddPod、UpdatePod、DeletePod;
    • 3、informer 二级缓存的同步;

    但是所有的更新事件是否都需要执行 sync 操作?对于除 rs.Spec.Replicas 之外的更新操作其实都没必要执行 sync 操作,因为 spec 其他字段和 status 的更新都不需要创建或者删除 pod。

    在 sync 操作真正开始之前,依据 expectations 机制进行判断,确定是否要真正地启动一次 sync,因为在 eventHandler 阶段也会更新 expectations 值,从上面的 eventHandler 中可以看到在 addPod 中会调用 rsc.expectations.CreationObserved 更新 rsKey 的 expectations,将其 add 值 -1,在 deletePod 中调用 rsc.expectations.DeletionObserved 将其 del 值 -1。所以等到 sync 时,若 controllerKey(name 或者 ns/name)满足 expectations 机制则进行 sync 操作,而 updatePod 并不会修改 expectations,所以,expectations 的设计就是当需要创建或删除 pod 才会触发对应的 sync 操作,expectations 机制的目的就是减少不必要的 sync 操作。

    什么条件下 expectations 机制会满足?

    • 1、当 expectations 中不存在 rsKey 时,也就说首次创建 rs 时;
    • 2、当 expectations 中 del 以及 add 值都为 0 时,即 rs 所需要创建或者删除的 pod 数都已满足;
    • 3、当 expectations 过期时,即超过 5 分钟未进行 sync 操作;

    最后再看一下 expectations 中用到的几个方法:

    当在 syncLoop 中发现满足条件时,会执行 manageReplicas 方法,在 manageReplicas 中无论是为 rs 创建还是删除 pod 都会调用 ExpectCreations 和 ExpectDeletions 为 rsKey 创建 expectations 对象。

    总结

    本文主要从源码层面分析了 replicaSetController 的设计与实现,但是不得不说其在设计方面考虑了很多因素,文中只提到了笔者理解了或者思考后稍有了解的一些机制,至于其他设计思想还得自行阅读代码体会。

    下面以一个流程图总结下创建 rs 的主要流程。

    1. SatisfiedExpectations
    2. (expectations 中不存在
    3. rsKeyrsNeedsSync
    4. true)
    5. | 判断 add/del pod
    6. | |
    7. |
    8. | 创建 expectations 对象,
    9. | 并设置 add/del
    10. |
    11. create rs --> syncReplicaSet --> manageReplicas -->
    12. (为 rs 创建 pod) 调用 slowStartBatch 批量创建 pod/
    13. | 删除筛选出的多余 pod
    14. | |
    15. |
    16. | 更新 expectations 对象
    17. updateReplicaSetStatus

    参考:

    https://keyla.vip/k8s/3-master/controller/replica-set/