Custom Controller 之 Informer (一)

    Informer 在很多组件的源码中可以看到,尤其是 kube-controller-manager (写这篇文章时我已经基本写完 kube-scheduler 的源码分析,着手写 kube-controller-manager 了,鉴于 controlelr 和 client-go 关联比较大,跳过来先讲讲典型的控制器工作流程中涉及到的 client-go 部分).

    Informer 是 client-go 中一个比较核心的工具,通过 Informer(实际我们用到的都不是单纯的 informer,而是组合了各种工具的 sharedInformerFactory) 我们可以轻松 List/Get 某个资源对象,可以监听资源对象的各种事件(比如创建和删除)然后触发回调函数,让我们能够在各种事件发生的时候能够作出相应的逻辑处理。举个例字,当 pod 数量变化的时候 deployment 是不是需要判断自己名下的 pod 数量是否还和预期的一样?如果少了是不是要考虑创建?

    架构概览

    自定义控制器的工作流程基本如下图所示,我们今天要分析图中上半部分的逻辑。(图片来自)

    我们开发自定义控制器的时候用到的“机制”主要定义在 client-go 的 tool/cache下:

    1556075198766

    先关注一下第一幅图中涉及到的一些 components:

    • Reflector: Reflector 类型定义在 cache 包中(tools/cache/reflector.go:47),它的作用是向 apiserver watch 特定的资源类型。这个功能通过其绑定的 ListAndWatch 方法实现。Watch 的资源可以是 in-build 的资源也可以是 custom 的资源。当 Reflector 通过 watch API 接收到存在新的资源对象实例的通知后,它使用相应的 list API 获取新创建的资源对象,然后 put 进 Delta Fifo 队列。这个步骤在 watchHandler 函数(tools/cache/reflector.go:268)中完成。
    • Informer: 一个定义在 cache 包中的基础 controller(tools/cache/controller.go:75) (一个 informer) 从 Delta Fifo 队列中 pop 出来资源对象实例(这个功能在 processLoop 中实现(tools/cache/controller.go:148))。这个 base controller 做的工作是保存这个对象用于后续检索处理用的,然后触发我们自己的控制器来处理这个对象。
    • Indexer: Indexer 提供的是 objects 之上的检索能力。Indexer 也定义在 cache 包中(tools/cache/index.go:27). 一个典型的检索使用方式是基于一个对象的 labels 创建索引。Indexer 可以基于各种索引函数维护索引。Indexer 使用一个线程安全的 store 来存储对象和其对应的 key. 还有一个默认函数 MetaNamespaceKeyFunc(tools/cache/store.go:76) 可以生成对象的 key,类似 <namespace>/<name> 格式来关联对应的对象。

    自定义控制器相关模块

    • Informer reference: 这是一个知道如何处理自定义资源对象的 Informer 实例的引用。自定义控制器需要创建合适的 Informer.
    • Indexer reference: 这是一个知道如何处理自定义资源对象的 Indexer 实例的引用. 自定义控制器代码需要创建这个引用对象,然后用于检索资源对象用于后续的处理。

    Base controller 提供了 NewIndexerInformer(tools/cache/controller.go:345) 函数来创建 Informer 和 Indexer. 在代码里我们可以直接调用这个函数或者使用工厂方法来创建 informer.

    • Resource Event Handlers: 这是一个回调函数,在 Informer 想要分发一个对象给控制器的时候会调用这个函数。典型的用法是写一个函数来获取分发过来的对象的 key,将 key 放入队列中等待进一步的处理。
    • Work queue: 这个队列是在自己的控制器代码中创建的,用来解耦一个对象的分发和处理过程。Resource event handler 函数会被写成提取分发来的对象的 key,然后将这个 key 添加到 work queue 里面。
    • Process Item 这是我们在自己代码中实现的用来处理 work queue 中拿到的 items 的函数。这里可以有一个或多个函数来处理具体的过程,这个函数的典型用法是使用 Indexer 索引或者一个 Listing wrapper 来根据相应的 key 检索对象。

    下面我们根据图中这几个步骤来跟源码。

    Reflector 会监视特定的资源,将变化写入给定的存储中,也就是 Delta FIFO queue.

    Reflector 对象定义

    Reflector 的中文含义是反射器,我们先看一下类型定义:

    !FILENAME tools/cache/reflector.go:47

    中主要就 Reflector 这个 struct 和相关的一些函数:

    ListAndWatch 首先 list 所有 items,获取当前的资源版本信息,然后使用这个版本信息来 watch(也就是从这个版本开始的所有资源变化会被关注)。我们看一下这里的 ListAndWatch 方法主要逻辑:

    !FILENAME tools/cache/reflector.go:168

    1. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    2. // list 资源
    3. list, err := r.listerWatcher.List(options)
    4. // 提取 items
    5. items, err := meta.ExtractList(list)
    6. // 更新存储(Delta FIFO)中的 items
    7. if err := r.syncWith(items, resourceVersion); err != nil {
    8. return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
    9. }
    10. r.setLastSyncResourceVersion(resourceVersion)
    11. // ……
    12. for {
    13. select {
    14. case <-stopCh:
    15. return nil
    16. default:
    17. }
    18. timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
    19. options = metav1.ListOptions{
    20. ResourceVersion: resourceVersion,
    21. TimeoutSeconds: &timeoutSeconds,
    22. }
    23. r.metrics.numberOfWatches.Inc()
    24. // 开始 watch
    25. w, err := r.listerWatcher.Watch(options)
    26. // ……
    27. // w 交给 watchHandler 处理,这里的逻辑后面分析
    28. if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
    29. if err != errorStopRequested {
    30. klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
    31. }
    32. return nil
    33. }
    34. }
    35. }

    第二步:watchHandler - add obj to delta fifo

    前面讲到 ListAndWatch 函数的最后一步逻辑是 watchHandler,在 ListAndWatch 中先是更新了 Delta FIFO 中的 item,然后 watch 资源对象,最后交给 watchHandler 处理,所以 watchHandler 基本可以猜到是将有变化的资源添加到 Delta FIFO 中,我们具体来看。

    !FILENAME tools/cache/reflector.go:287

    1. func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    2. // ……
    3. loop:
    4. // 这里进入一个无限循环
    5. for {
    6. select {
    7. case <-stopCh:
    8. return errorStopRequested
    9. case err := <-errc:
    10. return err
    11. // watch 返回值中的一个 channel
    12. case event, ok := <-w.ResultChan():
    13. // ……
    14. newResourceVersion := meta.GetResourceVersion()
    15. // 根据事件类型处理,有 Added Modified Deleted 3种
    16. // 3 种事件分别对应 store 中的增改删操作
    17. switch event.Type {
    18. case watch.Added:
    19. err := r.store.Add(event.Object)
    20. case watch.Modified:
    21. err := r.store.Update(event.Object)
    22. case watch.Deleted:
    23. err := r.store.Delete(event.Object)
    24. default:
    25. utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
    26. }
    27. *resourceVersion = newResourceVersion
    28. r.setLastSyncResourceVersion(newResourceVersion)
    29. eventCount++
    30. }
    31. }
    32. // ……
    33. return nil
    34. }

    先看 Controller 是什么

    Controller

    Informer 会实现 Controller 接口,这个接口长这样:

    !FILENAME tools/cache/controller.go:82

    1. type Controller interface {
    2. Run(stopCh <-chan struct{})
    3. HasSynced() bool
    4. LastSyncResourceVersion() string
    5. }

    和这个 Controller 对应的有一个基础 controller 实现:

    !FILENAME tools/cache/controller.go:75

    1. type controller struct {
    2. config Config
    3. reflector *Reflector
    4. reflectorMutex sync.RWMutex
    5. clock clock.Clock
    6. }

    controller 类型结构如下:

    1556088003902

    !FILENAME tools/cache/controller.go:100

    1. func (c *controller) Run(stopCh <-chan struct{}) {
    2. defer utilruntime.HandleCrash()
    3. go func() {
    4. <-stopCh
    5. c.config.Queue.Close()
    6. }()
    7. // 内部 Reflector 创建
    8. r := NewReflector(
    9. c.config.ListerWatcher,
    10. c.config.ObjectType,
    11. c.config.Queue,
    12. c.config.FullResyncPeriod,
    13. )
    14. r.clock = c.clock
    15. c.reflectorMutex.Lock()
    16. c.reflector = r
    17. c.reflectorMutex.Unlock()
    18. var wg wait.Group
    19. defer wg.Wait()
    20. wg.StartWithChannel(stopCh, r.Run)
    21. // 循环调用 processLoop
    22. wait.Until(c.processLoop, time.Second, stopCh)
    23. }

    processLoop

    !FILENAME tools/cache/controller.go:148

    1. func (c *controller) processLoop() {
    2. for {
    3. // 主要逻辑
    4. obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    5. // 异常处理
    6. }
    7. }

    这里的 Queue 就是 Delta FIFO,Pop 是个阻塞方法,内部实现时会逐个 pop 队列中的数据,交给 PopProcessFunc 处理。我们先不看 Pop 的实现,关注一下 PopProcessFunc 是如何处理 Pop 中从队列拿出来的 item 的。

    PopProcessFunc 是一个类型,如下:

    type PopProcessFunc func(interface{}) error

    所以这里只是一个类型转换,我们关注就行:

    !FILENAME tools/cache/controller.go:367

    1. Process: func(obj interface{}) error {
    2. for _, d := range obj.(Deltas) {
    3. switch d.Type {
    4. // 更新、添加、同步、删除等操作
    5. case Sync, Added, Updated:
    6. if old, exists, err := clientState.Get(d.Object); err == nil && exists {
    7. if err := clientState.Update(d.Object); err != nil {
    8. return err
    9. }
    10. h.OnUpdate(old, d.Object)
    11. } else {
    12. if err := clientState.Add(d.Object); err != nil {
    13. return err
    14. }
    15. h.OnAdd(d.Object)
    16. }
    17. case Deleted:
    18. if err := clientState.Delete(d.Object); err != nil {
    19. return err
    20. }
    21. h.OnDelete(d.Object)
    22. }
    23. }
    24. return nil
    25. },

    这里涉及到2个点:

    • clientState
    • ResourceEventHandler (h)

    我们后面会一一分析到。

    前面说到 clientState,这个变量的初始化是clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)

    NewIndexer 代码如下:

    !FILENAME tools/cache/store.go:239

    !FILENAME tools/cache/index.go:27

    1. type Indexer interface {
    2. Store
    3. Index(indexName string, obj interface{}) ([]interface{}, error)
    4. IndexKeys(indexName, indexKey string) ([]string, error)
    5. ListIndexFuncValues(indexName string) []string
    6. ByIndex(indexName, indexKey string) ([]interface{}, error)
    7. GetIndexers() Indexers
    8. AddIndexers(newIndexers Indexers) error
    9. }

    顺带看一下 NewThreadSafeStore()

    !FILENAME tools/cache/thread_safe_store.go:298

    1. func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
    2. return &threadSafeMap{
    3. items: map[string]interface{}{},
    4. indexers: indexers,
    5. indices: indices,
    6. }
    7. }

    然后关注一下 Process 中的err := clientState.Add(d.Object)的 Add() 方法:

    !FILENAME tools/cache/store.go:123

    1. func (c *cache) Add(obj interface{}) error {
    2. // 计算key;一般是namespace/name
    3. key, err := c.keyFunc(obj)
    4. if err != nil {
    5. return KeyError{obj, err}
    6. }
    7. // Add
    8. c.cacheStorage.Add(key, obj)
    9. return nil
    10. }

    cacheStorage 是一个 ThreadSafeStore 实例,这个 Add() 代码如下:

    !FILENAME tools/cache/thread_safe_store.go:68

    1. func (c *threadSafeMap) Add(key string, obj interface{}) {
    2. c.lock.Lock()
    3. defer c.lock.Unlock()
    4. // 拿出 old obj
    5. oldObject := c.items[key]
    6. // 写入 new obj
    7. c.items[key] = obj
    8. // 更新索引,有一堆逻辑
    9. c.updateIndices(oldObject, obj, key)
    10. }

    这块逻辑先分析到这里,后面关注 threadSafeMap 实现的时候再继续深入。

    第六步:Dispatch Event Handler functions

    我们先看一个接口 SharedInformer

    sharedIndexInformer

    !FILENAME tools/cache/shared_informer.go:43

    1. type SharedInformer interface {
    2. AddEventHandler(handler ResourceEventHandler)
    3. AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
    4. GetStore() Store
    5. GetController() Controller
    6. Run(stopCh <-chan struct{})
    7. HasSynced() bool
    8. LastSyncResourceVersion() string
    9. }

    SharedInformer 有一个共享的 data cache,能够分发 changes 通知到缓存,到通过 AddEventHandler 注册了的 listerners. 当你接收到一个通知,缓存的内容能够保证至少和通知中的一样新。

    再看一下 SharedIndexInformer 接口:

    !FILENAME tools/cache/shared_informer.go:66

    1. type SharedIndexInformer interface {
    2. SharedInformer
    3. // AddIndexers add indexers to the informer before it starts.
    4. AddIndexers(indexers Indexers) error
    5. GetIndexer() Indexer
    6. }

    !FILENAME tools/cache/shared_informer.go:127

    1. type sharedIndexInformer struct {
    2. indexer Indexer
    3. controller Controller
    4. processor *sharedProcessor
    5. cacheMutationDetector CacheMutationDetector
    6. listerWatcher ListerWatcher
    7. objectType runtime.Object
    8. resyncCheckPeriod time.Duration
    9. defaultEventHandlerResyncPeriod time.Duration
    10. clock clock.Clock
    11. started, stopped bool
    12. startedLock sync.Mutex
    13. blockDeltas sync.Mutex
    14. }

    这个类型内包了很多我们前面看到过的对象,indexer、controller、listeratcher 都不陌生,我们看这里的 processor 是做什么的:

    sharedProcessor

    类型定义如下:

    !FILENAME tools/cache/shared_informer.go:375

    这里的重点明显是 listeners 属性了,我们继续看 listeners 的类型中的 processorListener:

    processorListener

    !FILENAME tools/cache/shared_informer.go:466

    1. type processorListener struct {
    2. nextCh chan interface{}
    3. addCh chan interface{}
    4. handler ResourceEventHandler
    5. // 一个 ring buffer,保存未分发的通知
    6. pendingNotifications buffer.RingGrowing
    7. // ……
    8. }

    processorListener 主要有2个方法:

    • run()
    • pop()

    processorListener.run()

    先看一下这个 run 做了什么:

    !FILENAME tools/cache/shared_informer.go:540

    1. func (p *processorListener) run() {
    2. stopCh := make(chan struct{})
    3. wait.Until(func() { // 一分钟执行一次这个 func()
    4. // 一分钟内的又有几次重试
    5. err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
    6. // 等待信号 nextCh
    7. for next := range p.nextCh {
    8. // notification 是 next 的实际类型
    9. switch notification := next.(type) {
    10. case updateNotification:
    11. p.handler.OnUpdate(notification.oldObj, notification.newObj)
    12. // add
    13. case addNotification:
    14. p.handler.OnAdd(notification.newObj)
    15. // delete
    16. case deleteNotification:
    17. p.handler.OnDelete(notification.oldObj)
    18. default:
    19. utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
    20. }
    21. }
    22. })
    23. if err == nil {
    24. close(stopCh)
    25. }
    26. }, 1*time.Minute, stopCh)
    27. }

    这个 run 过程不复杂,等待信号然后调用 handler 的增删改方法做对应的处理逻辑。case 里的 Notification 再看一眼:

    !FILENAME tools/cache/shared_informer.go:176

    1. type updateNotification struct {
    2. oldObj interface{}
    3. newObj interface{}
    4. }
    5. type addNotification struct {
    6. newObj interface{}
    7. }
    8. type deleteNotification struct {
    9. oldObj interface{}
    10. }

    另外注意到for next := range p.nextCh是下面的 case 执行的前提,也就是说触发点是 p.nextCh,我们接着看 pop 过程(这里的逻辑不简单,可能得多花点精力)

    processorListener.pop()

    !FILENAME tools/cache/shared_informer.go:510

    1. func (p *processorListener) pop() {
    2. defer utilruntime.HandleCrash()
    3. defer close(p.nextCh) // Tell .run() to stop
    4. // 这个 chan 是没有初始化的
    5. var nextCh chan<- interface{}
    6. // 可以接收任意类型,其实是对应前面提到的 addNotification 等
    7. var notification interface{}
    8. // for 循环套 select 是比较常规的写法
    9. for {
    10. select {
    11. //第一遍执行到这里的时候由于 nexth 没有初始化,所以这里会阻塞(和notification有没有值没有关系,notification哪怕是nil也可以写入 chan interface{} 类型的 channel)
    12. case nextCh <- notification:
    13. var ok bool
    14. // 第二次循环,下面一个case运行过之后才有这里的逻辑
    15. notification, ok = p.pendingNotifications.ReadOne()
    16. if !ok {
    17. // 将 channel 指向 nil 相当于初始化的逆操作,会使得这个 case 条件阻塞
    18. nextCh = nil
    19. }
    20. // 这里是 for 首次执行逻辑的入口
    21. case notificationToAdd, ok := <-p.addCh:
    22. if !ok {
    23. return
    24. }
    25. // 如果是 nil,也就是第一个通知过来的时候,这时不需要用到缓存(和下面else相对)
    26. if notification == nil {
    27. // 赋值给 notification,这样上面一个 case 在接下来的一轮循化中就可以读到了
    28. notification = notificationToAdd
    29. // 相当于复制引用,nextCh 就指向了 p.nextCh,使得上面 case 写 channel 的时候本质上操作了 p.nextCh,从而 run 能够读到 p.nextCh 中的信号
    30. nextCh = p.nextCh
    31. } else {
    32. // 处理到这里的时候,其实第一个 case 已经有了首个 notification,这里的逻辑是一下子来了太多 notification 就往 pendingNotifications 缓存,在第一个 case 中 有对应的 ReadOne()操作
    33. p.pendingNotifications.WriteOne(notificationToAdd)
    34. }
    35. }
    36. }
    37. }

    这里的 pop 逻辑的入口是<-p.addCh,我们继续向上找一下这个 addCh 的来源:

    processorListener.add()

    !FILENAME tools/cache/shared_informer.go:506

    1. func (p *processorListener) add(notification interface{}) {
    2. p.addCh <- notification
    3. }

    这个 add() 方法又在哪里被调用呢?

    sharedProcessor.distribute()

    !FILENAME tools/cache/shared_informer.go:400

    1. func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    2. p.listenersLock.RLock()
    3. defer p.listenersLock.RUnlock()
    4. if sync {
    5. for _, listener := range p.syncingListeners {
    6. listener.add(obj)
    7. }
    8. } else {
    9. for _, listener := range p.listeners {
    10. listener.add(obj)
    11. }
    12. }
    13. }

    这个方法逻辑比较简洁,分发对象。我们继续看哪里进入的 distribute:

    !FILENAME tools/cache/shared_informer.go:344

    1. func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    2. s.blockDeltas.Lock()
    3. defer s.blockDeltas.Unlock()
    4. // from oldest to newest
    5. for _, d := range obj.(Deltas) {
    6. switch d.Type { // 根据 DeltaType 选择 case
    7. case Sync, Added, Updated:
    8. isSync := d.Type == Sync
    9. s.cacheMutationDetector.AddObject(d.Object)
    10. if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
    11. // indexer 更新的是本地 store
    12. if err := s.indexer.Update(d.Object); err != nil {
    13. return err
    14. }
    15. // 前面分析的 distribute;update
    16. s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
    17. } else {
    18. if err := s.indexer.Add(d.Object); err != nil {
    19. return err
    20. }
    21. // 前面分析的 distribute;add
    22. s.processor.distribute(addNotification{newObj: d.Object}, isSync)
    23. }
    24. case Deleted:
    25. if err := s.indexer.Delete(d.Object); err != nil {
    26. return err
    27. }
    28. // 前面分析的 distribute;delete
    29. s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    30. }
    31. }
    32. return nil
    33. }

    继续往前看代码逻辑。

    sharedIndexInformer.Run()

    !FILENAME tools/cache/shared_informer.go:189

    看到这里已经挺和谐了,在 sharedIndexInformer 的 Run() 方法中先是创建一个 DeltaFIFO,然后和 lw 一起初始化 cfg,利用 cfg 创建 controller,最后 Run 这个 controller,也就是最基础的 informer.

    在这段代码里我们还注意到有一步是s.processor.run,我们看一下这个 run 的逻辑。

    sharedProcessor.run()

    !FILENAME tools/cache/shared_informer.go:415

    1. func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    2. func() {
    3. p.listenersLock.RLock()
    4. defer p.listenersLock.RUnlock()
    5. for _, listener := range p.listeners {
    6. // 前面详细讲过 listener.run
    7. p.wg.Start(listener.run)
    8. // 前面详细讲过 listener.pop
    9. p.wg.Start(listener.pop)
    10. }
    11. p.listenersStarted = true
    12. }()
    13. <-stopCh

    撇开细节,可以看到这里调用了内部所有 listener 的 run() 和 pop() 方法,和前面的分析呼应上了。