for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}
The concrete type of informer here is sharedIndexInformer, so the call above invokes the Run method defined in tools/cache/shared_informer.go of the client-go library:
    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh)
}
for {
    // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
    select {
    case <-stopCh:
        return nil
    default:
    }
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            if f.IsClosed() {
                return nil, ErrFIFOClosed
            }
            f.cond.Wait()
        }
        id := f.queue[0]
        f.queue = f.queue[1:]
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // Item may have been deleted subsequently.
            continue
        }
        delete(f.items, id)
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}
Pop is the consumer-side method: it removes the oldest resource object from the head of the DeltaFIFO. Pop must be passed a process function, the callback that receives and handles the popped object.
When the queue is empty, Pop blocks on f.cond.Wait and only resumes after a cond.Broadcast is received. If the queue is not empty, it takes the resource object at the head of f.queue and passes it to the process callback, where the upstream consumer handles it.
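This Wait/Broadcast dance is the standard sync.Cond producer/consumer pattern. Below is a minimal, self-contained sketch of that pattern (illustrative only, not client-go code; the blockingFIFO type is invented for this example) showing how a producer's Broadcast wakes a consumer blocked in Pop:

package main

import (
    "fmt"
    "sync"
)

// blockingFIFO is a toy queue that mimics how DeltaFIFO blocks its consumer:
// Pop waits on a condition variable until Add broadcasts that data has arrived.
type blockingFIFO struct {
    lock  sync.Mutex
    cond  *sync.Cond
    queue []string
}

func newBlockingFIFO() *blockingFIFO {
    f := &blockingFIFO{}
    f.cond = sync.NewCond(&f.lock)
    return f
}

func (f *blockingFIFO) Add(item string) {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.queue = append(f.queue, item)
    f.cond.Broadcast() // wake up any consumer blocked in Pop
}

// Pop blocks until an item is available, then hands it to the process callback,
// mirroring the shape of DeltaFIFO.Pop(process PopProcessFunc).
func (f *blockingFIFO) Pop(process func(item string) error) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    for len(f.queue) == 0 {
        f.cond.Wait() // releases the lock while waiting, re-acquires it on wake-up
    }
    item := f.queue[0]
    f.queue = f.queue[1:]
    return process(item)
}

func main() {
    f := newBlockingFIFO()
    go f.Add("default/nginx")
    _ = f.Pop(func(item string) error {
        fmt.Println("processed:", item)
        return nil
    })
}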
This is where the Process method was first specified as s.HandleDeltas, and the control loop in client-go/tools/cache/controller.go calls Pop repeatedly with it:
func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == ErrFIFOClosed {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}
// client-go/tools/cache/shared_informer.go
func (p *processorListener) run() {
    stopCh := make(chan struct{})
    wait.Until(func() {
        for next := range p.nextCh {
            switch notification := next.(type) {
            case updateNotification:
                p.handler.OnUpdate(notification.oldObj, notification.newObj)
            case addNotification:
                p.handler.OnAdd(notification.newObj)
            case deleteNotification:
                p.handler.OnDelete(notification.oldObj)
            default:
                utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
            }
        }
        // the only way to get here is if the p.nextCh is empty and closed
        close(stopCh)
    }, 1*time.Second, stopCh)
}
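The OnAdd / OnUpdate / OnDelete calls above land in whatever ResourceEventHandler was registered on the shared informer via AddEventHandler. As a rough sketch (the kubeconfig path, resync period, and handler bodies here are placeholder assumptions, not taken from the original text), wiring up such a handler looks like this:

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Build a client from a kubeconfig file (path is a placeholder).
    config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    // The SharedInformerFactory is what f.informers / f.Start iterate over.
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    informer := factory.Apps().V1().Deployments().Informer()

    // These funcs are the handlers that processorListener.run dispatches to.
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            fmt.Println("deployment added")
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            fmt.Println("deployment updated")
        },
        DeleteFunc: func(obj interface{}) {
            fmt.Println("deployment deleted")
        },
    })

    stopCh := make(chan struct{})
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)
    <-stopCh // block until the process is terminated
}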
Control Logic: Deployment as an Example
7) Enqueue Object Key
The queueing logic here is fairly simple; it lives in controller.go (the controller):
func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
    key, err := controller.KeyFunc(deployment)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
        return
    }

    dc.queue.Add(key)
}
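controller.KeyFunc is the generic cache key function (cache.DeletionHandlingMetaNamespaceKeyFunc), so the string added to the queue is simply the object's "namespace/name". A small hedged sketch of what that key looks like (the Deployment name below is invented):

package main

import (
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    // For live (non-tombstone) objects, DeletionHandlingMetaNamespaceKeyFunc falls
    // back to MetaNamespaceKeyFunc, which produces "<namespace>/<name>".
    d := &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx"},
    }
    key, err := cache.MetaNamespaceKeyFunc(d)
    if err != nil {
        panic(err)
    }
    fmt.Println(key) // prints: default/nginx
}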
The queue used here is a rate-limiting work queue:
type DeploymentController struct {
    ...
    // Deployments that need to be synced
    queue workqueue.RateLimitingInterface
}
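Keys from this rate-limiting queue are drained by worker goroutines. The following is a simplified sketch of that standard worker loop (the real logic lives in pkg/controller/deployment/deployment_controller.go; error handling and metrics are abridged here), showing how Get/Done/Forget/AddRateLimited cooperate:

// Simplified worker loop: each worker repeatedly pulls a key off the queue
// and runs the sync handler for it.
func (dc *DeploymentController) worker() {
    for dc.processNextWorkItem() {
    }
}

func (dc *DeploymentController) processNextWorkItem() bool {
    key, quit := dc.queue.Get()
    if quit {
        return false
    }
    defer dc.queue.Done(key)

    err := dc.syncHandler(key.(string)) // syncHandler is dc.syncDeployment
    if err == nil {
        // Success: clear the rate limiter's retry history for this key.
        dc.queue.Forget(key)
        return true
    }

    // Failure: re-enqueue with rate-limited backoff so the key is retried later.
    utilruntime.HandleError(fmt.Errorf("syncing deployment %q failed: %v", key, err))
    dc.queue.AddRateLimited(key)
    return true
}

The sync handler wired into this loop is syncDeployment, whose (abridged) source follows: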
func (dc *DeploymentController) syncDeployment(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
        return err
    }

    startTime := time.Now()
    klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
    defer func() {
        klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
    }()

    deployment, err := dc.dLister.Deployments(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
        return nil
    }
    if err != nil {
        return err
    }

    // Deep-copy otherwise we are mutating our cache.
    // TODO: Deep-copy only when needed.
    d := deployment.DeepCopy()

    everything := metav1.LabelSelector{}
    if reflect.DeepEqual(d.Spec.Selector, &everything) {
        // A deployment must carry a non-empty selector.
        dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
        if d.Status.ObservedGeneration < d.Generation {
            d.Status.ObservedGeneration = d.Generation
            dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
        }
        return nil
    }

    // List ReplicaSets owned by this Deployment, while reconciling ControllerRef
    // through adoption/orphaning.
    // Get the ReplicaSets controlled by this Deployment.
    rsList, err := dc.getReplicaSetsForDeployment(d)
    if err != nil {
        return err
    }

    // List all Pods owned by this Deployment, grouped by their ReplicaSet.
    // Current uses of the podMap are:
    //
    // * check if a Pod is labeled correctly with the pod-template-hash label.
    // * check that no old Pods are running in the middle of Recreate Deployments.
    podMap, err := dc.getPodMapForDeployment(d, rsList)
    if err != nil {
        return err
    }

    if d.DeletionTimestamp != nil {
        return dc.syncStatusOnly(d, rsList)
    }

    // Update deployment conditions with an Unknown condition when pausing/resuming
    // a deployment. In this way, we can be sure that we won't timeout when a user
    // resumes a Deployment with a set progressDeadlineSeconds.
    if err = dc.checkPausedConditions(d); err != nil {
        return err
    }

    if d.Spec.Paused {
        return dc.sync(d, rsList)
    }

    // rollback is not re-entrant in case the underlying replica sets are updated with a new
    // revision so we should ensure that we won't proceed to update replica sets until we
    // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
    // Check whether this event is a rollback. Once a rollback has begun it must not be
    // re-entered while the underlying ReplicaSets could be updated to a new revision,
    // until the deployment has cleaned up its rollback spec in subsequent enqueues.
    if getRollbackTo(d) != nil {
        return dc.rollback(d, rsList)
    }