Kubelet Source Code Analysis (Part 2): NewMainKubelet

This article analyzes the NewMainKubelet part of the kubelet.

1. NewMainKubelet

NewMainKubelet is mainly used to initialize and construct the Kubelet struct; refer to the Kubelet struct definition for the full list of fields.

1.1. PodConfig

If the dependencies do not already carry a PodConfig, NewMainKubelet builds one via makePodSourceConfig:

```go
if kubeDeps.PodConfig == nil {
    var err error
    kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
    if err != nil {
        return nil, err
    }
}
```
1.1.1. makePodSourceConfig

```go
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
    ...
    // source of all configuration
    cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

    // define file config source
    if kubeCfg.StaticPodPath != "" {
        glog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath)
        config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
    }

    // define url config source
    if kubeCfg.StaticPodURL != "" {
        glog.Infof("Adding pod url %q with HTTP header %v", kubeCfg.StaticPodURL, manifestURLHeader)
        config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
    }

    // Restore from the checkpoint path
    // NOTE: This MUST happen before creating the apiserver source
    // below, or the checkpoint would override the source of truth.
    ...
    if kubeDeps.KubeClient != nil {
        glog.Infof("Watching apiserver")
        if updatechannel == nil {
            updatechannel = cfg.Channel(kubetypes.ApiserverSource)
        }
        config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
    }
    return cfg, nil
}
```

1.1.2. NewPodConfig

```go
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
    updates := make(chan kubetypes.PodUpdate, 50)
    storage := newPodStorage(updates, mode, recorder)
    podConfig := &PodConfig{
        pods:    storage,
        mux:     config.NewMux(storage),
        updates: updates,
        sources: sets.String{},
    }
    return podConfig
}
```
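PodConfig is the demultiplexer for all pod sources: each source (file, HTTP, apiserver) gets its own channel via Channel(), and the mux merges them into the single updates stream that the sync loop consumes. Below is a minimal, self-contained sketch of that merge pattern with simplified stand-in types; it is an illustration, not the actual config.Mux implementation:

```go
package main

import (
    "fmt"
    "sync"
)

// podUpdate is a simplified stand-in for kubetypes.PodUpdate.
type podUpdate struct {
    Source string
    Pods   []string
}

// mux merges several per-source channels into one updates stream.
type mux struct {
    mu      sync.Mutex
    updates chan podUpdate
    sources map[string]chan podUpdate
}

func newMux() *mux {
    return &mux{
        updates: make(chan podUpdate, 50),
        sources: make(map[string]chan podUpdate),
    }
}

// channel returns the write side for a named source, creating it on
// first use (analogous to PodConfig.Channel(kubetypes.FileSource)).
func (m *mux) channel(source string) chan<- podUpdate {
    m.mu.Lock()
    defer m.mu.Unlock()
    if ch, ok := m.sources[source]; ok {
        return ch
    }
    ch := make(chan podUpdate)
    m.sources[source] = ch
    // Each source gets a goroutine that forwards its updates (after
    // normalization, in the real code) into the merged stream.
    go func() {
        for u := range ch {
            m.updates <- u
        }
    }()
    return ch
}

func main() {
    m := newMux()
    file := m.channel("file")
    api := m.channel("api")
    file <- podUpdate{Source: "file", Pods: []string{"static-pod"}}
    api <- podUpdate{Source: "api", Pods: []string{"nginx"}}
    for i := 0; i < 2; i++ {
        fmt.Println(<-m.updates)
    }
}
```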

1.1.3. NewSourceApiserver

```go
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
    // List-watch only the pods bound to this node.
    lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
    newSourceApiserverFromLW(lw, updates)
}
```

1.2. serviceLister and nodeLister

`serviceLister` and `nodeLister` watch for changes to the `service` and `node` lists, respectively, via the List-Watch mechanism.

1.2.1. serviceLister

```go
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
if kubeDeps.KubeClient != nil {
    serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
    r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
    go r.Run(wait.NeverStop)
}
serviceLister := corelisters.NewServiceLister(serviceIndexer)
```
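The Reflector keeps the local indexer in sync with the apiserver, so later code reads services from the cache instead of hitting the API. A hedged usage sketch (not from the kubelet source; assumes the k8s.io/apimachinery/pkg/labels import):

```go
// List all services from the local cache; labels.Everything() matches all.
services, err := serviceLister.List(labels.Everything())
if err != nil {
    glog.Errorf("failed to list services from cache: %v", err)
}
glog.V(4).Infof("cached services: %d", len(services))
```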

1.2.2. nodeLister

```go
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
if kubeDeps.KubeClient != nil {
    fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
    nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
    r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
    go r.Run(wait.NeverStop)
}
nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}
```
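Because the ListWatch is restricted by a field selector to this node's name, the cache holds at most the kubelet's own Node object. A hedged sketch of reading it back (GetNodeInfo is the accessor on the scheduler's CachedNodeInfo of this era; shown for illustration only):

```go
// Fetch this kubelet's Node object from the local cache.
node, err := nodeInfo.GetNodeInfo(string(nodeName))
if err != nil {
    glog.Errorf("failed to get node %q from cache: %v", nodeName, err)
} else {
    glog.V(4).Infof("node labels: %v", node.Labels)
}
```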

1.3.1. containerRefManager

```go
containerRefManager := kubecontainer.NewRefManager()
```

1.3.2. oomWatcher

```go
oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder)
```

1.3.4. secretManager and configMapManager

```go
var secretManager secret.Manager
var configMapManager configmap.Manager
switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
case kubeletconfiginternal.WatchChangeDetectionStrategy:
    secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
    configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
    secretManager = secret.NewCachingSecretManager(
        kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
    configMapManager = configmap.NewCachingConfigMapManager(
        kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
case kubeletconfiginternal.GetChangeDetectionStrategy:
    secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
    configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
default:
    return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
}

klet.secretManager = secretManager
klet.configMapManager = configMapManager
```

1.3.5. livenessManager

```go
klet.livenessManager = proberesults.NewManager()
```
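The liveness manager is a results cache: the prober writes probe outcomes into it and the kubelet sync loop consumes them. A hedged sketch of the read side (Updates and Failure come from the proberesults package; the real consumption lives inside the sync loop):

```go
// A failed liveness result triggers a pod sync so the container can be
// restarted according to its restart policy.
for update := range klet.livenessManager.Updates() {
    if update.Result == proberesults.Failure {
        glog.Infof("container %v failed liveness probe", update.ContainerID)
    }
}
```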

1.3.6. podManager

```go
// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)
```

1.3.7. resourceAnalyzer

```go
klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration)
```

1.3.8. containerGC

```go
// setup containerGC
containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
if err != nil {
    return nil, err
}
klet.containerGC = containerGC
klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
```
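The policy above only sets thresholds; the periodic collection loop is started later (in StartGarbageCollection). A hedged sketch of that loop (ContainerGCPeriod is the kubelet's GC interval constant):

```go
// Periodically delete dead containers according to the GC policy.
go wait.Until(func() {
    if err := kl.containerGC.GarbageCollect(); err != nil {
        glog.Errorf("Container garbage collection failed: %v", err)
    }
}, ContainerGCPeriod, wait.NeverStop)
```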

1.3.9. imageManager

```go
// setup imageManager
imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
if err != nil {
    return nil, fmt.Errorf("failed to initialize image manager: %v", err)
}
klet.imageManager = imageManager
```

1.3.10. statusManager

```go
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
```

1.3.11. probeManager

```go
klet.probeManager = prober.NewManager(
    klet.statusManager,
    klet.livenessManager,
    klet.runner,
    containerRefManager,
    kubeDeps.Recorder)
```

1.3.13. volumePluginMgr

```go
klet.volumePluginMgr, err =
    NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
if err != nil {
    return nil, err
}
if klet.enablePluginsWatcher {
    klet.pluginWatcher = pluginwatcher.NewWatcher(klet.getPluginsDir())
}
```

1.3.14. volumeManager

```go
// setup volumeManager
klet.volumeManager = volumemanager.NewVolumeManager(
    kubeCfg.EnableControllerAttachDetach,
    nodeName,
    klet.podManager,
    klet.statusManager,
    klet.kubeClient,
    klet.volumePluginMgr,
    klet.containerRuntime,
    kubeDeps.Mounter,
    klet.getPodsDir(),
    kubeDeps.Recorder,
    experimentalCheckNodeCapabilitiesBeforeMount,
    keepTerminatedPodVolumes)
```
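Constructing the volumeManager only wires up its dependencies; its reconciliation loop, which attaches/mounts desired volumes and unmounts/detaches stale ones, is started later from Kubelet.Run. A hedged sketch of that call:

```go
// From Kubelet.Run: reconcile desired vs. actually mounted volumes.
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
```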

1.3.15. evictionManager

```go
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)
klet.evictionManager = evictionManager
klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
```

1.4. containerRuntime

Currently pods use only two kinds of runtime, `docker` and `remote`; `rkt` has been deprecated.

```go
if containerRuntime == "rkt" {
    glog.Fatalln("rktnetes has been deprecated in favor of rktlet. Please see https://github.com/kubernetes-incubator/rktlet for more information.")
}
```

When the runtime is `docker`, the docker-specific setup below is executed.

```go
switch containerRuntime {
case kubetypes.DockerContainerRuntime:
    // Create and start the CRI shim running as a grpc server.
    ...
    // The unix socket for kubelet <-> dockershim communication.
    ...
    // Create dockerLegacyService when the logging driver is not supported.
    ...
case kubetypes.RemoteContainerRuntime:
    // No-op.
    break
default:
    return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
}
```

1.4.1. NewDockerService

```go
// Create and start the CRI shim running as a grpc server.
streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
    &pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
if err != nil {
    return nil, err
}
if crOptions.RedirectContainerStreaming {
    klet.criHandler = ds
}
```

1.4.2. NewDockerServer

```go
// The unix socket for kubelet <-> dockershim communication.
glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
    remoteRuntimeEndpoint,
    remoteImageEndpoint)
glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
if err := server.Start(); err != nil {
    return nil, err
}
```

1.4.3. DockerServer.Start

```go
// Start starts the dockershim grpc server.
func (s *DockerServer) Start() error {
    // Start the internal service.
    if err := s.service.Start(); err != nil {
        glog.Errorf("Unable to start docker service")
        return err
    }
    glog.V(2).Infof("Start dockershim grpc server")
    l, err := util.CreateListener(s.endpoint)
    if err != nil {
        return fmt.Errorf("failed to listen on %q: %v", s.endpoint, err)
    }
    // Create the grpc server and register runtime and image services.
    s.server = grpc.NewServer(
        grpc.MaxRecvMsgSize(maxMsgSize),
        grpc.MaxSendMsgSize(maxMsgSize),
    )
    runtimeapi.RegisterRuntimeServiceServer(s.server, s.service)
    runtimeapi.RegisterImageServiceServer(s.server, s.service)
    go func() {
        if err := s.server.Serve(l); err != nil {
            glog.Fatalf("Failed to serve connections: %v", err)
        }
    }()
    return nil
}
```
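Once Start returns, the dockershim gRPC server is serving the CRI on the unix socket, and the kubelet (or any CRI client) connects to it like a remote runtime. Below is a hedged, self-contained sketch of dialing the endpoint and issuing a Version call; the socket path and dial options are assumptions for illustration, and the runtimeapi import path is the CRI v1alpha2 API of this Kubernetes generation:

```go
package main

import (
    "context"
    "fmt"
    "net"
    "time"

    "google.golang.org/grpc"
    runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)

func main() {
    // Assumed default dockershim endpoint; the real value comes from
    // --container-runtime-endpoint.
    const endpoint = "/var/run/dockershim.sock"
    conn, err := grpc.Dial(endpoint, grpc.WithInsecure(),
        grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
            return net.DialTimeout("unix", addr, timeout)
        }))
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    // Ask the runtime for its name and version over the CRI.
    client := runtimeapi.NewRuntimeServiceClient(conn)
    resp, err := client.Version(context.Background(), &runtimeapi.VersionRequest{})
    if err != nil {
        panic(err)
    }
    fmt.Printf("runtime: %s %s\n", resp.RuntimeName, resp.RuntimeVersion)
}
```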

1.5. podWorkers

1.5.1. PodWorkers interface

```go
// PodWorkers is an abstract interface for testability.
type PodWorkers interface {
    UpdatePod(options *UpdatePodOptions)
    ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
    ForgetWorker(uid types.UID)
}
```

`podWorker` is mainly used to process and sync pod events; the interface has three methods: `UpdatePod`, `ForgetNonExistingPodWorkers`, and `ForgetWorker`. The core design is sketched below.
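Each pod gets its own goroutine and update channel, so updates for one pod are handled serially while different pods are processed in parallel. A minimal self-contained sketch of that pattern with simplified types (not the kubelet implementation, which additionally tracks in-flight work and queues the latest pending update):

```go
package main

import (
    "fmt"
    "sync"
)

type podUID string

// podWorkers serializes updates per pod while letting different pods
// be processed concurrently.
type podWorkers struct {
    mu      sync.Mutex
    workers map[podUID]chan string // one update channel per pod
    wg      sync.WaitGroup
}

func newPodWorkers() *podWorkers {
    return &podWorkers{workers: make(map[podUID]chan string)}
}

// UpdatePod routes an update to the pod's worker goroutine, creating
// the worker on first use.
func (p *podWorkers) UpdatePod(uid podUID, update string) {
    p.mu.Lock()
    ch, ok := p.workers[uid]
    if !ok {
        ch = make(chan string, 1)
        p.workers[uid] = ch
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for u := range ch {
                // Sync the pod; in the kubelet this calls syncPod.
                fmt.Printf("pod %s: syncing %q\n", uid, u)
            }
        }()
    }
    p.mu.Unlock()
    ch <- update
}

// ForgetWorker stops and removes the worker for a deleted pod.
func (p *podWorkers) ForgetWorker(uid podUID) {
    p.mu.Lock()
    if ch, ok := p.workers[uid]; ok {
        close(ch)
        delete(p.workers, uid)
    }
    p.mu.Unlock()
}

func main() {
    pw := newPodWorkers()
    pw.UpdatePod("uid-1", "create")
    pw.UpdatePod("uid-1", "update")
    pw.ForgetWorker("uid-1")
    pw.wg.Wait()
}
```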

2. Summary

  1. NewMainKubelet is mainly used to construct the Kubelet struct. Besides the necessary configuration and clients (e.g. kubeClient, csiClient), the struct chiefly aggregates the various managers that handle different tasks.
  2. The core managers are:

    • oomWatcher: watches whether pods run out of memory (OOM).
    • podManager: manages the pod lifecycle, including add, delete, update, and query operations on pods.
    • containerGC: garbage-collects dead containers.
    • imageManager: garbage-collects container images.
    • statusManager: syncs pod status to the apiserver and also acts as a status cache.
    • volumeManager: performs attach/detach and mount/unmount operations on pod volumes.
    • evictionManager: keeps the node stable, evicting pods when necessary (for example, when resources run short).
  3. NewMainKubelet also sets up `serviceLister` and `nodeLister` to watch the `service` and `node` lists for changes.
  4. The containerRuntime used by the kubelet is currently mainly `docker`; `rkt` has been deprecated. NewMainKubelet starts the dockershim grpc server to perform docker-related operations.
  5. It also builds the podWorkers to handle pod update logic.