内置的 kubernetes service 无法删除,其 ClusterIP 为通过 参数指定的 ip 段中的首个 ip,kubernetes endpoints 中的 ip 以及 port 可以通过 --advertise-address--secure-port 启动参数来指定。

    kubernetes service 是由 kube-apiserver 中的 bootstrap controller 进行控制的,其主要以下几个功能:

    • 创建 kubernetes service;
    • 创建 default、kube-system 和 kube-public 命名空间,如果启用了 NodeLease 特性还会创建 kube-node-lease 命名空间;
    • 提供基于 Service ClusterIP 的修复及检查功能;
    • 提供基于 Service NodePort 的修复及检查功能;

    kubernetes service 默认使用 ClusterIP 对外暴露服务,若要使用 nodePort 的方式可在 kube-apiserver 启动时通过 --kubernetes-service-node-port 参数指定对应的端口。

    bootstrap controller 的初始化以及启动是在 CreateKubeAPIServer 调用链的 InstallLegacyAPI 方法中完成的,bootstrap controller 的启停是由 apiserver 的 PostStartHookShutdownHook 进行控制的。

    k8s.io/kubernetes/pkg/master/master.go:406

    1. func (m *Master) InstallLegacyAPI(......) error {
    2. legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
    3. if err != nil {
    4. return fmt.Errorf("Error building core storage: %v", err)
    5. }
    6. // 初始化 bootstrap-controller
    7. controllerName := "bootstrap-controller"
    8. coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
    9. bootstrapController := c.NewBootstrapController(......)
    10. m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
    11. m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
    12. if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
    13. return fmt.Errorf("Error in registering group versions: %v", err)
    14. }
    15. return nil
    16. }

    postStartHooks 会在 kube-apiserver 的启动方法 prepared.Run 中调用 RunPostStartHooks 启动所有 Hook。

    NewBootstrapController

    bootstrap controller 在初始化时需要设定多个参数,主要有 PublicIP、ServiceCIDR、PublicServicePort 等。PublicIP 是通过命令行参数 --advertise-address 指定的,如果没有指定,系统会自动选出一个 global IP。PublicServicePort 通过 --secure-port 启动参数来指定(默认为 6443),ServiceCIDR 通过 --service-cluster-ip-range 参数指定(默认为 10.0.0.0/24)。

    k8s.io/kubernetes/pkg/master/controller.go:89

    1. func (c *completedConfig) NewBootstrapController(......) *Controller {
    2. // 1、获取 PublicServicePort
    3. _, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
    4. if err != nil {
    5. klog.Fatalf("failed to get listener address: %v", err)
    6. }
    7. // 2、指定需要创建的 kube-system 和 kube-public
    8. systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic}
    9. if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
    10. systemNamespaces = append(systemNamespaces, corev1.NamespaceNodeLease)
    11. }
    12. return &Controller{
    13. ......
    14. // ServiceClusterIPRegistry 是在 CreateKubeAPIServer 初始化 RESTStorage 时初始化的,是一个 etcd 实例
    15. ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator,
    16. ServiceClusterIPRange: c.ExtraConfig.ServiceIPRange,
    17. SecondaryServiceClusterIPRegistry: legacyRESTStorage.SecondaryServiceClusterIPAllocator,
    18. // SecondaryServiceClusterIPRange 需要在启用 IPv6DualStack 后才能使用
    19. SecondaryServiceClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
    20. ServiceClusterIPInterval: 3 * time.Minute,
    21. ServiceNodePortRegistry: legacyRESTStorage.ServiceNodePortAllocator,
    22. ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
    23. ServiceNodePortInterval: 3 * time.Minute,
    24. // API Server 绑定的IP,这个IP会作为kubernetes service的Endpoint的IP
    25. PublicIP: c.GenericConfig.PublicAddress,
    26. // 取 clusterIP range 中的第一个 IP
    27. ServiceIP: c.ExtraConfig.APIServerServiceIP,
    28. // 默认为 6443
    29. ServicePort: c.ExtraConfig.APIServerServicePort,
    30. ExtraServicePorts: c.ExtraConfig.ExtraServicePorts,
    31. ExtraEndpointPorts: c.ExtraConfig.ExtraEndpointPorts,
    32. // 这里为 6443
    33. PublicServicePort: publicServicePort,
    34. // 缺省是基于 ClusterIP 启动模式,这里为0
    35. KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
    36. }
    37. }

    k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/net/interface.go:323

    bootstrapController.Start

    上文已经提到了 bootstrap controller 主要的四个功能:修复 ClusterIP、修复 NodePort、更新 kubernetes service、创建系统所需要的名字空间(default、kube-system、kube-public)。bootstrap controller 在启动后首先会完成一次 ClusterIP、NodePort 和 Kubernets 服务的处理,然后异步循环运行上面的4个工作。以下是其 start 方法:

    k8s.io/kubernetes/pkg/master/controller.go:146

    1. func (c *Controller) Start() {
    2. if c.runner != nil {
    3. return
    4. }
    5. // 1、首次启动时首先从 kubernetes endpoints 中移除自身的配置,
    6. // 此时 kube-apiserver 可能处于非 ready 状态
    7. endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
    8. if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
    9. klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
    10. }
    11. // 2、初始化 repairClusterIPs 和 repairNodePorts 对象
    12. repairClusterIPs := servicecontroller.NewRepair(......)
    13. repairNodePorts := portallocatorcontroller.NewRepair(......)
    14. // 3、首先运行一次 epairClusterIPs 和 repairNodePorts,即进行初始化
    15. if err := repairClusterIPs.RunOnce(); err != nil {
    16. klog.Fatalf("Unable to perform initial IP allocation check: %v", err)
    17. }
    18. if err := repairNodePorts.RunOnce(); err != nil {
    19. klog.Fatalf("Unable to perform initial service nodePort check: %v", err)
    20. }
    21. c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
    22. }

    c.RunKubernetesNamespaces

    c.RunKubernetesNamespaces 主要功能是创建 kube-system 和 kube-public 命名空间,如果启用了 NodeLease 特性功能还会创建 kube-node-lease 命名空间,之后每隔一分钟检查一次。

    k8s.io/kubernetes/pkg/master/controller.go:199

    1. func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
    2. wait.Until(func() {
    3. for _, ns := range c.SystemNamespaces {
    4. if err := createNamespaceIfNeeded(c.NamespaceClient, ns); err != nil {
    5. runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
    6. }
    7. }
    8. }, c.SystemNamespacesInterval, ch)
    9. }

    c.RunKubernetesService

    c.RunKubernetesService 主要是检查 kubernetes service 是否处于正常状态,并定期执行同步操作。首先调用 /healthz 接口检查 apiserver 当前是否处于 ready 状态,若处于 ready 状态然后调用 c.UpdateKubernetesService 服务更新 kubernetes service 状态。

    k8s.io/kubernetes/pkg/master/controller.go:210

    c.UpdateKubernetesService

    c.UpdateKubernetesService 的主要逻辑为:

    • 1、调用 createNamespaceIfNeeded 创建 default namespace;
    • 2、调用 c.CreateOrUpdateMasterServiceIfNeeded 为 master 创建 kubernetes service;
    • 3、调用 c.EndpointReconciler.ReconcileEndpoints 更新 master 的 endpoint;

    k8s.io/kubernetes/pkg/master/controller.go:230

    1. func (c *Controller) UpdateKubernetesService(reconcile bool) error {
    2. if err := createNamespaceIfNeeded(c.NamespaceClient, metav1.NamespaceDefault); err != nil {
    3. return err
    4. }
    5. servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https", c.ExtraServicePorts)
    6. if err := c.CreateOrUpdateMasterServiceIfNeeded(kubernetesServiceName, c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
    7. return err
    8. }
    9. endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
    10. if err := c.EndpointReconciler.ReconcileEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts, reconcile); err != nil {
    11. return err
    12. }
    13. return nil
    14. }
    c.EndpointReconciler.ReconcileEndpoints

    一个集群中可能会有多个 apiserver 实例,因此需要统一管理 apiserver service 的 endpoints,c.EndpointReconciler.ReconcileEndpoints 就是用来管理 apiserver endpoints 的。一个集群中 apiserver 的所有实例会在 etcd 中的对应目录下创建 key,并定期更新这个 key 来上报自己的心跳信息,ReconcileEndpoints 会从 etcd 中获取 apiserver 的实例信息并更新 endpoint。

    k8s.io/kubernetes/pkg/master/reconcilers/lease.go:144

    1. func (r *leaseEndpointReconciler) ReconcileEndpoints(......) error {
    2. r.reconcilingLock.Lock()
    3. defer r.reconcilingLock.Unlock()
    4. if r.stopReconcilingCalled {
    5. return nil
    6. }
    7. // 更新 lease 信息
    8. if err := r.masterLeases.UpdateLease(ip.String()); err != nil {
    9. return err
    10. }
    11. return r.doReconcile(serviceName, endpointPorts, reconcilePorts)
    12. }
    13. func (r *leaseEndpointReconciler) doReconcile(......) error {
    14. // 1、获取 master 的 endpoint
    15. e, err := r.epAdapter.Get(corev1.NamespaceDefault, serviceName, metav1.GetOptions{})
    16. shouldCreate := false
    17. if err != nil {
    18. if !errors.IsNotFound(err) {
    19. return err
    20. }
    21. shouldCreate = true
    22. e = &corev1.Endpoints{
    23. ObjectMeta: metav1.ObjectMeta{
    24. Name: serviceName,
    25. Namespace: corev1.NamespaceDefault,
    26. },
    27. }
    28. }
    29. // 2、从 etcd 中获取所有的 master
    30. masterIPs, err := r.masterLeases.ListLeases()
    31. if err != nil {
    32. return err
    33. }
    34. if len(masterIPs) == 0 {
    35. return fmt.Errorf("no master IPs were listed in storage, refusing to erase all endpoints for the kubernetes service")
    36. }
    37. // 3、检查 endpoint 中 master 信息,如果与 etcd 中的不一致则进行更新
    38. formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts)
    39. if formatCorrect && ipCorrect && portsCorrect {
    40. return nil
    41. }
    42. if !formatCorrect {
    43. e.Subsets = []corev1.EndpointSubset{{
    44. Addresses: []corev1.EndpointAddress{},
    45. Ports: endpointPorts,
    46. }}
    47. }
    48. if !formatCorrect || !ipCorrect {
    49. e.Subsets[0].Addresses = make([]corev1.EndpointAddress, len(masterIPs))
    50. for ind, ip := range masterIPs {
    51. e.Subsets[0].Addresses[ind] = corev1.EndpointAddress{IP: ip}
    52. }
    53. e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
    54. if !portsCorrect {
    55. e.Subsets[0].Ports = endpointPorts
    56. }
    57. if _, err = r.epAdapter.Create(corev1.NamespaceDefault, e); errors.IsAlreadyExists(err) {
    58. err = nil
    59. }
    60. } else {
    61. _, err = r.epAdapter.Update(corev1.NamespaceDefault, e)
    62. }
    63. return err
    64. }

    repairClusterIPs.RunUntil

    repairClusterIP 主要解决的问题有:

    • 保证集群中所有的 ClusterIP 都是唯一分配的;
    • 保证分配的 ClusterIP 不会超出指定范围;
    • 确保已经分配给 service 但是因为 crash 等其他原因没有正确创建 ClusterIP;
    • 自动将旧版本的 Kubernetes services 迁移到 ipallocator 原子性模型;

    repairClusterIPs.RunUntil 其实是调用 repairClusterIPs.runOnce 来处理的,其代码中的主要逻辑如下所示:

    k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller/repair.go:134

    repairNodePorts.RunUnti

    repairNodePorts 主要是用来纠正 service 中 nodePort 的信息,保证所有的 ports 都基于 cluster 创建的,当没有与 cluster 同步时会触发告警,其最终是调用 repairNodePorts.runOnce 进行处理的,主要逻辑与 ClusterIP 的处理逻辑类似。

    k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller/repair.go:84

    1. func (c *Repair) runOnce() error {
    2. // 1、首先从 etcd 中获取已使用 nodeport 的快照
    3. err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
    4. var err error
    5. snapshot, err = c.alloc.Get()
    6. return err == nil, err
    7. })
    8. if err != nil {
    9. return fmt.Errorf("unable to refresh the port allocations: %v", err)
    10. }
    11. // 2、检查 snapshot 是否初始化
    12. if snapshot.Range == "" {
    13. snapshot.Range = c.portRange.String()
    14. }
    15. // 3、获取已分配 nodePort 信息
    16. stored, err := portallocator.NewFromSnapshot(snapshot)
    17. if err != nil {
    18. return fmt.Errorf("unable to rebuild allocator from snapshot: %v", err)
    19. }
    20. // 4、获取 service list
    21. list, err := c.serviceClient.Services(metav1.NamespaceAll).List(metav1.ListOptions{})
    22. if err != nil {
    23. return fmt.Errorf("unable to refresh the port block: %v", err)
    24. }
    25. rebuilt, err := portallocator.NewPortAllocator(c.portRange)
    26. if err != nil {
    27. return fmt.Errorf("unable to create port allocator: %v", err)
    28. }
    29. // 5、检查每个 Service ClusterIP 的 port,保证其处于正常状态
    30. for i := range list.Items {
    31. svc := &list.Items[i]
    32. ports := collectServiceNodePorts(svc)
    33. if len(ports) == 0 {
    34. continue
    35. }
    36. for _, port := range ports {
    37. switch err := rebuilt.Allocate(port); err {
    38. // 6、检查 port 是否泄漏
    39. case nil:
    40. if stored.Has(port) {
    41. stored.Release(port)
    42. } else {
    43. ......
    44. }
    45. delete(c.leaks, port)
    46. // 7、port 重复分配
    47. case portallocator.ErrAllocated:
    48. ......
    49. // 8、port 超出分配范围
    50. case err.(*portallocator.ErrNotInRange):
    51. ......
    52. // 9、port 已经分配完
    53. case portallocator.ErrFull:
    54. ......
    55. default:
    56. ......
    57. }
    58. }
    59. }
    60. // 10、检查 port 是否泄漏
    61. stored.ForEach(func(port int) {
    62. count, found := c.leaks[port]
    63. switch {
    64. case !found:
    65. ......
    66. count = numRepairsBeforeLeakCleanup - 1
    67. fallthrough
    68. case count > 0:
    69. c.leaks[port] = count - 1
    70. if err := rebuilt.Allocate(port); err != nil {
    71. runtime.HandleError(fmt.Errorf("the node port %d may have leaked, but can not be allocated: %v", port, err))
    72. }
    73. default:
    74. ......
    75. }
    76. })
    77. // 11、更新 snapshot
    78. if err := rebuilt.Snapshot(snapshot); err != nil {
    79. return fmt.Errorf("unable to snapshot the updated port allocations: %v", err)
    80. }
    81. ......
    82. return nil
    83. }

    以上就是 bootstrap controller 的主要实现。

    总结

    本文主要分析了 kube-apiserver 中 apiserver service 的实现,apiserver service 是通过 bootstrap controller 控制的,bootstrap controller 会保证 apiserver service 以及其 endpoint 处于正常状态,需要注意的是,apiserver service 的 endpoint 根据启动时指定的参数分为三种控制方式,本文仅分析了 lease 的实现方式,如果使用 master-count 方式,需要将每个 master 实例的 port、apiserver-count 等配置参数改为一致。