在前面的文章中已经介绍过 kubelet 的架构以及启动流程,本章会继续介绍 kubelet 中的核心功能,kubelet 中包含数十个 manager 以及对 CNI、CRI、CSI 的调用。每个 manager 的功能各不相同,manager 之间也会有依赖关系,本文会介绍比较简单的 statusManager。

    statusManager 的主要功能是将 pod 状态信息同步到 apiserver,statusManage 并不会主动监控 pod 的状态,而是提供接口供其他 manager 进行调用。

    statusManager 的初始化

    kubelet 在启动流程时会在 方法中初始化其核心组件,包括各种 manager。

    k8s.io/kubernetes/pkg/kubelet/kubelet.go:335

    NewManager 是用来初始化 statusManager 对象的,其中参数的功能如下所示:

    • kubeClient:用于和 apiserver 交互;
    • podManager:负责内存中 pod 的维护;
    • podStatuses:statusManager 的 cache,保存 pod 与状态的对应关系;
    • podStatusesChannel:当其他组件调用 statusManager 更新 pod 状态时,会将 pod 的状态信息发送到podStatusesChannel 中;
    • apiStatusVersions:维护最新的 pod status 版本号,每更新一次会加1;
    • podDeletionSafety:删除 pod 的接口;

    k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:118

    1. func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider) Manager {
    2. return &manager{
    3. kubeClient: kubeClient,
    4. podManager: podManager,
    5. podStatuses: make(map[types.UID]versionedPodStatus),
    6. podStatusChannel: make(chan podStatusSyncRequest, 1000),
    7. apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64),
    8. podDeletionSafety: podDeletionSafety,
    9. }
    10. }

    在初始化完成后,kubelet 会在 Run 方法中会以 goroutine 的方式启动 statusManager。

    k8s.io/kubernetes/pkg/kubelet/kubelet.go:1398

    1. func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    2. ......
    3. kl.statusManager.Start()
    4. ......
    5. }

    statusManager 的代码主要在 k8s.io/kubernetes/pkg/kubelet/status/ 目录中,其对外暴露的接口有以下几个:

    1. type Manager interface {
    2. // 一个 interface 用来暴露给其他组件获取 pod status 的
    3. PodStatusProvider
    4. // 启动 statusManager 的方法
    5. Start()
    6. // 设置 pod 的状态并会触发一个状态同步操作
    7. SetPodStatus(pod *v1.Pod, status v1.PodStatus)
    8. // 设置 pod .status.containerStatuses 中 container 是否为 ready 状态并触发状态同步操作
    9. SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
    10. // 设置 pod .status.containerStatuses 中 container 是否为 started 状态并触发状态同步操作
    11. SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool)
    12. // 将 pod .status.containerStatuses 和 .status.initContainerStatuses 中 container 的 state 置为 Terminated 状态并触发状态同步操作
    13. TerminatePod(pod *v1.Pod)
    14. // 从 statusManager 缓存 podStatuses 中删除对应的 pod
    15. RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
    16. }

    pod 对应的 status 字段如下所示:

    1. status:
    2. conditions:
    3. ......
    4. containerStatuses:
    5. - containerID: containerd://64e9d88459b38e90c2a4b4d87db5acd180c820c855a55aabe38e4e11b9b83576
    6. image: docker.io/library/nginx:1.9
    7. imageID: sha256:f568d3158b1e871b713cb33aca5a9377bc21a1f644addf41368393d28c35e894
    8. lastState: {}
    9. name: nginx-pod
    10. ready: true
    11. restartCount: 0
    12. started: true
    13. state:
    14. running:
    15. startedAt: "2019-12-15T16:13:29Z"
    16. podIP: 10.15.225.15
    17. ......

    然后继续看 statusManager 的启动方法 start, 其主要逻辑为:

    • 1、设置定时器,syncPeriod 默认为 10s;
    • 2、启动 wait.Forever goroutine 同步 pod 的状态,有两种同步方式,第一种是当监听到某个 pod 状态改变时会调用 m.syncPod 进行同步,第二种是当触发定时器时调用 m.syncBatch 进行批量同步;

    k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:147

    syncPod

    • 1、调用 m.needsUpdate 判断是否需要同步状态,若 apiStatusVersions 中的 status 版本号小于当前接收到的 status 版本号或者 apistatusVersions 中不存在该 status 版本号则需要同步,若不需要同步则继续检查 pod 是否处于删除状态,若处于删除状态调用 m.podDeletionSafety.PodResourcesAreReclaimed 将 pod 完全删除;
    • 2、从 apiserver 获取 pod 的 oldStatus;
    • 3、检查 pod oldStatuscurrentStatus 的 uid 是否相等,若不相等则说明 pod 被重建过;
    • 4、调用 statusutil.PatchPodStatus 同步 pod 最新的 status 至 apiserver,并将返回的 pod 作为 newPod;
    • 5、检查 newPod 是否处于 terminated 状态,若处于 terminated 状态则调用 apiserver 接口进行删除并从 cache 中清除,删除后 pod 会进行重建;

    k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:514

    1. func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
    2. // 1、判断是否需要同步状态
    3. if !m.needsUpdate(uid, status) {
    4. klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
    5. return
    6. }
    7. // 2、获取 pod 的 oldStatus
    8. pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(status.podName, metav1.GetOptions{})
    9. if errors.IsNotFound(err) {
    10. return
    11. }
    12. if err != nil {
    13. return
    14. }
    15. translatedUID := m.podManager.TranslatePodUID(pod.UID)
    16. // 3、检查 pod UID 是否已经改变
    17. if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) {
    18. return
    19. }
    20. // 4、同步 pod 最新的 status 至 apiserver
    21. oldStatus := pod.Status.DeepCopy()
    22. newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, *oldStatus, mergePodStatus(*oldStatus, status.status))
    23. if err != nil {
    24. return
    25. }
    26. pod = newPod
    27. m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
    28. // 5、若 newPod 处于 terminated 状态则调用 apiserver 删除该 pod,删除后 pod 会重建
    29. if m.canBeDeleted(pod, status.status) {
    30. deleteOptions := metav1.NewDeleteOptions(0)
    31. deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
    32. err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
    33. if err != nil {
    34. return
    35. }
    36. // 6、从 cache 中清除
    37. m.deletePodStatus(uid)
    38. }
    39. }
    needsUpdate

    needsUpdate 方法主要是检查 pod 的状态是否需要更新,以及检查当 pod 处于 terminated 状态时保证 pod 被完全删除。

    k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:570

    1. func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
    2. latest, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(uid)]
    3. if !ok || latest < status.version {
    4. return true
    5. }
    6. pod, ok := m.podManager.GetPodByUID(uid)
    7. if !ok {
    8. return false
    9. }
    10. return m.canBeDeleted(pod, status.status)
    11. }
    12. func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus) bool {
    13. if pod.DeletionTimestamp == nil || kubepod.IsMirrorPod(pod) {
    14. return false
    15. }
    16. // 此处说明 pod 已经处于删除状态了
    17. }
    PodResourcesAreReclaimed

    PodResourcesAreReclaimed 检查 pod 在 node 上占用的所有资源是否已经被回收,其主要逻辑为:

    • 1、检查 pod 中的所有 container 是否都处于非 running 状态;
    • 2、从 podCache 中获取 podStatus,通过 podStatus 检查 pod 中的 container 是否已被完全删除;
    • 3、检查 pod 的 volume 是否被清理;
    • 4、检查 pod 的 cgroup 是否被清理;
    • 5、若以上几个检查项都通过说明在 kubelet 端 pod 已被完全删除;

    k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:900

    1. func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
    2. // 1、检查 pod 中的所有 container 是否都处于非 running 状态
    3. if !notRunning(status.ContainerStatuses) {
    4. return false
    5. }
    6. runtimeStatus, err := kl.podCache.Get(pod.UID)
    7. if err != nil {
    8. return false
    9. }
    10. if len(runtimeStatus.ContainerStatuses) > 0 {
    11. var statusStr string
    12. for _, status := range runtimeStatus.ContainerStatuses {
    13. statusStr += fmt.Sprintf("%+v ", *status)
    14. }
    15. return false
    16. }
    17. // 3、检查 pod 的 volume 是否被清理
    18. if kl.podVolumesExist(pod.UID) && !kl.keepTerminatedPodVolumes {
    19. return false
    20. }
    21. // 4、检查 pod 的 cgroup 是否被清理
    22. if kl.kubeletConfiguration.CgroupsPerQOS {
    23. pcm := kl.containerManager.NewPodContainerManager()
    24. if pcm.Exists(pod) {
    25. return false
    26. }
    27. }
    28. return true
    29. }

    syncBatch

    syncBatch 是定期将 statusManager 缓存 podStatuses 中的数据同步到 apiserver 的方法,主要逻辑为:

    • 1、调用 m.podManager.GetUIDTranslations 从 podManager 中获取 mirrorPod uid 与 staticPod uid 的对应关系;
    • 2、从 apiStatusVersions 中清理已经不存在的 pod,遍历 apiStatusVersions,检查 podStatuses 以及 mirrorToPod 中是否存在该对应的 pod,若不存在则从 apiStatusVersions 中删除;
    • 3、遍历 podStatuses,首先调用 needsUpdate 检查 pod 的状态是否与 apiStatusVersions 中的一致,然后调用 needsReconcile 检查 pod 的状态是否与 podManager 中的一致,若不一致则将需要同步的 pod 加入到 updatedStatuses 列表中;
    • 4、遍历 updatedStatuses 列表,调用 m.syncPod 方法同步状态;

    syncBatch 主要是将 statusManage cache 中的数据与 apiStatusVersions 和 podManager 中的数据进行对比是否一致,若不一致则以 statusManage cache 中的数据为准同步至 apiserver。

    k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:469

    1. func (m *manager) syncBatch() {
    2. var updatedStatuses []podStatusSyncRequest
    3. // 1、获取 mirrorPod 与 staticPod 的对应关系
    4. podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
    5. func() {
    6. m.podStatusesLock.RLock()
    7. defer m.podStatusesLock.RUnlock()
    8. // 2、从 apiStatusVersions 中清理已经不存在的 pod
    9. for uid := range m.apiStatusVersions {
    10. _, hasPod := m.podStatuses[types.UID(uid)]
    11. _, hasMirror := mirrorToPod[uid]
    12. if !hasPod && !hasMirror {
    13. delete(m.apiStatusVersions, uid)
    14. }
    15. }
    16. // 3、遍历 podStatuses,将需要同步状态的 pod 加入到 updatedStatuses 列表中
    17. for uid, status := range m.podStatuses {
    18. syncedUID := kubetypes.MirrorPodUID(uid)
    19. if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok {
    20. if mirrorUID == "" {
    21. continue
    22. }
    23. syncedUID = mirrorUID
    24. }
    25. if m.needsUpdate(types.UID(syncedUID), status) {
    26. updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
    27. } else if m.needsReconcile(uid, status.status) {
    28. delete(m.apiStatusVersions, syncedUID)
    29. updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
    30. }
    31. }
    32. }()
    33. // 4、调用 m.syncPod 同步 pod 状态
    34. for _, update := range updatedStatuses {
    35. m.syncPod(update.podUID, update.status)
    36. }
    37. }

    syncBatch 中主要调用了两个方法 needsUpdateneedsReconcileneedsUpdate 在上文中已经介绍过了,下面介绍 needsReconcile 方法的主要逻辑。

    needsReconcile

    needsReconcile 对比当前 pod 的状态与 podManager 中的状态是否一致,podManager 中保存了 node 上 pod 的 object,podManager 中的数据与 apiserver 是一致的,needsReconcile 主要逻辑为:

    • 1、通过 uid 从 podManager 中获取 pod 对象;
    • 2、检查 pod 是否为 static pod,若为 static pod 则获取其对应的 mirrorPod;
    • 3、格式化 pod status subResource;
    • 4、检查 podManager 中的 status 与 statusManager cache 中的 status 是否一致,若不一致则以 statusManager 为准进行同步;

    k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:598

    以上就是 statusManager 同步 pod status 的主要逻辑,下面再了解一下 statusManager 对其他组件暴露的方法。

    SetPodStatus

    SetPodStatus 是为 pod 设置 status subResource 并会触发同步操作,主要逻辑为:

    • 1、检查 pod.Status.Conditions 中的类型是否为 kubelet 创建的,kubelet 会创建 ContainersReadyInitializedReadyPodScheduledUnschedulable 五种类型的 conditions;
    • 2、调用 m.updateStatusInternal 触发更新操作;
    1. func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
    2. m.podStatusesLock.Lock()
    3. defer m.podStatusesLock.Unlock()
    4. for _, c := range pod.Status.Conditions {
    5. if !kubetypes.PodConditionByKubelet(c.Type) {
    6. klog.Errorf("Kubelet is trying to update pod condition %q for pod %q. "+
    7. "But it is not owned by kubelet.", string(c.Type), format.Pod(pod))
    8. }
    9. }
    10. status = *status.DeepCopy()
    11. m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil)
    12. }
    updateStatusInternal

    statusManager 对外暴露的方法中触发状态同步的操作都是由 updateStatusInternal 完成的,updateStatusInternal 会更新 statusManager 的 cache 并会将 newStatus 发送到 m.podStatusChannel 中,然后 statusManager 会调用 syncPod 方法同步到 apiserver。

    • 1、从 cache 中获取 oldStatus;
    • 2、检查 ContainerStatusesInitContainerStatuses 是否合法;
    • 3、为 status 设置 ContainersReadyPodReadyPodInitializedPodScheduled conditions;
    • 4、设置 status 的 StartTime
    • 5、格式化 status;
    • 6、将 newStatus 添加到 statusManager 的 cache podStatuses 中;
    • 7、将 newStatus 发送到 m.podStatusChannel 中;

    k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:362

    1. func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {
    2. var oldStatus v1.PodStatus
    3. // 1、从 cache 中获取 oldStatus
    4. cachedStatus, isCached := m.podStatuses[pod.UID]
    5. if isCached {
    6. oldStatus = cachedStatus.status
    7. } else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
    8. oldStatus = mirrorPod.Status
    9. } else {
    10. oldStatus = pod.Status
    11. }
    12. // 2、检查 ContainerStatuses 和 InitContainerStatuses 是否合法
    13. if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
    14. return false
    15. }
    16. if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
    17. klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
    18. return false
    19. }
    20. // 3、为 status 设置 ContainersReady、PodReady、PodInitialized、PodScheduled conditions
    21. updateLastTransitionTime(&status, &oldStatus, v1.ContainersReady)
    22. updateLastTransitionTime(&status, &oldStatus, v1.PodReady)
    23. updateLastTransitionTime(&status, &oldStatus, v1.PodInitialized)
    24. updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled)
    25. if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() {
    26. status.StartTime = oldStatus.StartTime
    27. } else if status.StartTime.IsZero() {
    28. now := metav1.Now()
    29. status.StartTime = &now
    30. }
    31. // 5、格式化 status
    32. normalizeStatus(pod, &status)
    33. if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
    34. return false
    35. }
    36. // 6、将 newStatus 添加到 statusManager 的 cache podStatuses 中
    37. newStatus := versionedPodStatus{
    38. status: status,
    39. version: cachedStatus.version + 1,
    40. podName: pod.Name,
    41. podNamespace: pod.Namespace,
    42. }
    43. m.podStatuses[pod.UID] = newStatus
    44. // 7、将 newStatus 发送到 m.podStatusChannel 中
    45. select {
    46. case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
    47. return true
    48. default:
    49. return false
    50. }
    51. }

    SetPodStatus 方法主要会用在 kubelet 的主 syncLoop 中,并在 syncPod 方法中创建 pod 时使用。

    SetContainerReadiness

    SetContainerReadiness 方法会设置 pod status subResource 中 container 是否为 ready 状态,其主要逻辑为:

    • 1、获取 pod 对象;
    • 2、从 m.podStatuses 获取 oldStatus;
    • 3、通过 containerID 从 pod 中获取 containerStatus;
    • 4、若 container status 为 Ready 直接返回,此时该 container 状态无须更新;
    • 5、添加 PodReadyCondition 和 ContainersReadyCondition;
    • 6、调用 m.updateStatusInternal 触发同步操作;

    k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:198

    1. func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
    2. m.podStatusesLock.Lock()
    3. defer m.podStatusesLock.Unlock()
    4. // 1、获取 pod 对象
    5. pod, ok := m.podManager.GetPodByUID(podUID)
    6. if !ok {
    7. return
    8. }
    9. // 2、从 m.podStatuses 获取 oldStatus
    10. oldStatus, found := m.podStatuses[pod.UID]
    11. if !found {
    12. return
    13. }
    14. // 3、通过 containerID 从 pod 中获取 containerStatus
    15. containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
    16. if !ok {
    17. return
    18. }
    19. // 4、若 container status 为 Ready 直接返回,此时该 container 状态无须更新
    20. if containerStatus.Ready == ready {
    21. return
    22. }
    23. status := *oldStatus.status.DeepCopy()
    24. containerStatus, _, _ = findContainerStatus(&status, containerID.String())
    25. containerStatus.Ready = ready
    26. updateConditionFunc := func(conditionType v1.PodConditionType, condition v1.PodCondition) {
    27. conditionIndex := -1
    28. for i, condition := range status.Conditions {
    29. if condition.Type == conditionType {
    30. conditionIndex = i
    31. break
    32. }
    33. }
    34. if conditionIndex != -1 {
    35. status.Conditions[conditionIndex] = condition
    36. } else {
    37. status.Conditions = append(status.Conditions, condition)
    38. }
    39. }
    40. // 5、添加 PodReadyCondition 和 ContainersReadyCondition
    41. updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(&pod.Spec, status.Conditions, status.ContainerStatuses, status.Phase))
    42. updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase))
    43. // 6、调用 m.updateStatusInternal 触发同步操作
    44. m.updateStatusInternal(pod, status, false)
    45. }

    SetContainerReadiness 方法主要被用在 proberManager 中,关于 proberManager 的功能会在后文说明。

    SetContainerStartup

    SetContainerStartup 方法会设置 pod status subResource 中 container 是否为 started 状态,主要逻辑为:

    • 1、通过 podUID 从 podManager 中获取 pod 对象;
    • 2、从 statusManager 中获取 pod 的 oldStatus;
    • 3、检查要更新的 container 是否存在;
    • 4、检查目标 container 的 started 状态是否已为期望值;
    • 5、设置目标 container 的 started 状态;
    • 6、调用 m.updateStatusInternal 触发同步操作;

    k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:255

    1. func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) {
    2. m.podStatusesLock.Lock()
    3. defer m.podStatusesLock.Unlock()
    4. // 1、通过 podUID 从 podManager 中获取 pod 对象
    5. pod, ok := m.podManager.GetPodByUID(podUID)
    6. if !ok {
    7. return
    8. }
    9. // 2、从 statusManager 中获取 pod 的 oldStatus
    10. oldStatus, found := m.podStatuses[pod.UID]
    11. if !found {
    12. return
    13. }
    14. // 3、检查要更新的 container 是否存在
    15. containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
    16. if !ok {
    17. klog.Warningf("Container startup changed for unknown container: %q - %q",
    18. format.Pod(pod), containerID.String())
    19. return
    20. }
    21. // 4、检查目标 container 的 started 状态是否已为期望值
    22. if containerStatus.Started != nil && *containerStatus.Started == started {
    23. return
    24. }
    25. // 5、设置目标 container 的 started 状态
    26. status := *oldStatus.status.DeepCopy()
    27. containerStatus, _, _ = findContainerStatus(&status, containerID.String())
    28. containerStatus.Started = &started
    29. // 6、触发同步操作
    30. m.updateStatusInternal(pod, status, false)
    31. }

    SetContainerStartup 方法也是主要被用在 proberManager 中。

    TerminatePod

    TerminatePod 方法的主要逻辑是把 pod .status.containerStatuses.status.initContainerStatuses 中 container 的 state 置为 Terminated 状态并触发状态同步操作。

    k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:312

    TerminatePod 方法主要会用在 kubelet 的主 syncLoop 中。

    RemoveOrphanedStatuses

    RemoveOrphanedStatuses 的主要逻辑是从 statusManager 缓存 podStatuses 中删除对应的 pod。

    1. func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
    2. m.podStatusesLock.Lock()
    3. defer m.podStatusesLock.Unlock()
    4. for key := range m.podStatuses {
    5. if _, ok := podUIDs[key]; !ok {
    6. klog.V(5).Infof("Removing %q from status map.", key)
    7. delete(m.podStatuses, key)
    8. }

    总结

    本文主要介绍了 statusManager 的功能以及使用,其功能其实非常简单,当 pod 状态改变时 statusManager 会将状态同步到 apiserver,statusManager 也提供了多个接口供其他组件调用,当其他组件需要改变 pod 的状态时会将 pod 的 status 信息发送到 statusManager 进行同步。