Controller Source Code Analysis

A look at the Controller from the source-code level. (Kubernetes 1.19)

Below is a widely circulated diagram of the Controller's workflow; it is clear and easy to follow.

(Figure: Controller workflow diagram, Untitled.png)

The dividing line splits the whole controller workflow into an upper half and a lower half:

The upper half interacts with the api server: it watches resource state and delivers resource change events to the Resource Event Handler. This logic is needed by every Kubernetes component, so it is packaged as the client-go library.

The lower half is the controller's handling of those events. This is where the controller reconciles the actual state of a resource with its desired state, and where the main control logic lives; later in this article the Deployment controller is used as the example.

This article aims to go deeper and examine the Controller's concrete logic and implementation at the source-code level.


Starting a controller

Ignoring the implementation details for now, let's start from the entry point and see how a controller is started.

The built-in controllers are registered in cmd/kube-controller-manager/app/controllermanager.go:

// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc. This allows for structured downstream composition and subdivision.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
controllers["endpointslice"] = startEndpointSliceController
controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
controllers["replicationcontroller"] = startReplicationController
controllers["podgc"] = startPodGCController
controllers["resourcequota"] = startResourceQuotaController
controllers["namespace"] = startNamespaceController
controllers["serviceaccount"] = startServiceAccountController
controllers["garbagecollector"] = startGarbageCollectorController
controllers["daemonset"] = startDaemonSetController
controllers["job"] = startJobController
controllers["deployment"] = startDeploymentController
controllers["replicaset"] = startReplicaSetController
controllers["horizontalpodautoscaling"] = startHPAController
controllers["disruption"] = startDisruptionController
controllers["statefulset"] = startStatefulSetController
controllers["cronjob"] = startCronJobController
controllers["csrsigning"] = startCSRSigningController
controllers["csrapproving"] = startCSRApprovingController
controllers["csrcleaner"] = startCSRCleanerController
controllers["ttl"] = startTTLController
controllers["bootstrapsigner"] = startBootstrapSignerController
controllers["tokencleaner"] = startTokenCleanerController
controllers["nodeipam"] = startNodeIpamController
controllers["nodelifecycle"] = startNodeLifecycleController
if loopMode == IncludeCloudLoops {
controllers["service"] = startServiceController
controllers["route"] = startRouteController
controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
// TODO: volume controller into the IncludeCloudLoops only set.
}
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
controllers["attachdetach"] = startAttachDetachController
controllers["persistentvolume-expander"] = startVolumeExpandController
controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
controllers["pvc-protection"] = startPVCProtectionController
controllers["pv-protection"] = startPVProtectionController
controllers["ttl-after-finished"] = startTTLAfterFinishedController
controllers["root-ca-cert-publisher"] = startRootCACertPublisher
controllers["ephemeral-volume"] = startEphemeralVolumeController
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) &&
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
controllers["storage-version-gc"] = startStorageVersionGCController
}

return controllers
}

The initialization functions (InitFunc) live in the individual files under cmd/kube-controller-manager/app/. Taking Deployment as an example:

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
return nil, false, nil
}
dc, err := deployment.NewDeploymentController(
ctx.InformerFactory.Apps().V1().Deployments(),
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("deployment-controller"),
)
if err != nil {
return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
}
go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
return nil, true, nil
}

Note the dc.Run() call: the first argument is the number of workers, and the second is a stop channel (a channel of empty structs) used to signal the controller to shut down.
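
For context, Run follows the standard controller pattern. The sketch below is a simplified, hypothetical reconstruction (the myController type and field names are made up; the real DeploymentController also waits on several listers, logs, and records metrics), showing how the worker count and the stop channel are used:

package controllers

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// myController is a hypothetical stand-in for a controller such as DeploymentController.
type myController struct {
	queue          workqueue.RateLimitingInterface
	informerSynced cache.InformerSynced
}

// Run blocks until stopCh is closed: it waits for the informer cache to finish its
// initial List, then starts `workers` goroutines that keep draining the workqueue.
func (c *myController) Run(workers int, stopCh <-chan struct{}) {
	defer c.queue.ShutDown()

	if !cache.WaitForCacheSync(stopCh, c.informerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(c.worker, time.Second, stopCh)
	}

	<-stopCh
}

// worker loops until the queue is shut down (compare section "8) Get Key" below).
func (c *myController) worker() {
	for c.processNextWorkItem() {
	}
}

func (c *myController) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)
	// a real controller would call its syncHandler(key) here
	return true
}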

A CRD controller is started in the same way:

func main() {
...
mydemoInformerFactory := informers.NewSharedInformerFactory(mydemoClient, time.Second*30)

// pass the Mydemo informer (crddemo group) to the custom controller
controller := NewController(kubeClient, mydemoClient,
mydemoInformerFactory.Crddemo().V1().Mydemos())

go mydemoInformerFactory.Start(stopCh)

if err = controller.Run(2, stopCh); err != nil {
glog.Fatalf("Error running controller: %s", err.Error())
}
}

The ctx.InformerFactory used by the built-in controllers plays the same role as mydemoInformerFactory here; both are wrappers around informers.SharedInformerFactory.
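
For comparison, building such a factory for built-in resources looks roughly like this (a sketch with assumed names such as newFactory and the kubeconfig handling; kube-controller-manager constructs its clients differently in practice):

package controllers

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// newFactory builds a SharedInformerFactory for built-in resources; it plays the same
// role for the Deployment controller that mydemoInformerFactory plays for the CRD above.
func newFactory(kubeconfig string) (informers.SharedInformerFactory, error) {
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return nil, err
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
	// Same 30s resync period as the CRD example above.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	// Informers are created lazily; asking for one registers it with the factory so
	// that factory.Start will actually run it.
	_ = factory.Apps().V1().Deployments().Informer()
	return factory, nil
}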


Building a controller

So what does it take to build a controller? First, look at what a controller has to do:

  • watch the api server for changes to the objects of interest (Client → Reflector List & Watch);
  • sync the object-change events into a local cache (Indexer);
  • based on the event type, trigger the registered ResourceEventHandler to run the business logic (Informer).

For convenience the diagram is repeated here; we will follow the numbered steps in the figure and work through the controller code step by step.

(Figure: Controller workflow diagram, Untitled.png)


Client-go

In the Kubernetes project, all interaction with the api server is wrapped up in client-go. Below, the flow is analyzed using a CRD controller as the example:

1) List & Watch

The client watches a specified Kubernetes resource mainly through the Reflector. Initializing a Reflector requires a ListerWatcher, an object that implements the List and Watch methods.
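
For built-in resources, client-go also provides a helper that builds such a ListerWatcher straight from a REST client. A small sketch (the podListWatch wrapper is made up for illustration):

package controllers

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// podListWatch returns a ListerWatcher for Pods in all namespaces; a Reflector created
// with it will use the underlying REST client for both List and Watch.
func podListWatch(clientset kubernetes.Interface) *cache.ListWatch {
	return cache.NewListWatchFromClient(
		clientset.CoreV1().RESTClient(), // REST client for the core/v1 API group
		"pods",                          // resource name
		metav1.NamespaceAll,             // no namespace filter
		fields.Everything(),             // no field selector
	)
}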

The call chain starts in main.go:

go mydemoInformerFactory.Start(stopCh)

This calls the Start method in pkg/client/informers/externalversions/factory.go, which starts every informer held by the factory:

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()

for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}

The concrete type of each informer is sharedIndexInformer, so what runs here is the Run method in tools/cache/shared_informer.go of client-go:

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})

cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,

Process: s.HandleDeltas,
}

func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()

s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()

// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)

defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
s.controller.Run(stopCh)
}

Note that this controller is not the controller in the diagram; it is a lower-level control loop inside sharedIndexInformer. As soon as it runs (controller.Run), it creates a Reflector directly:

func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock

c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()

var wg wait.Group
defer wg.Wait()

wg.StartWithChannel(stopCh, r.Run)

wait.Until(c.processLoop, time.Second, stopCh)
}

As mentioned above, creating a Reflector requires List and Watch implementations; these are supplied when the SharedIndexInformer is created:

func NewFilteredMydemoInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CrddemoV1().Mydemos(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CrddemoV1().Mydemos(namespace).Watch(context.TODO(), options)
},
},
&crddemov1.Mydemo{},
resyncPeriod,
indexers,
)
}

The ListFunc and WatchFunc passed in simply wrap the client's List/Watch methods, both of which are produced by the code generator.

Note also that, alongside the Reflector, a DeltaFIFO is created as the implementation of the Store interface and wired into the controller's Config as its queue:

// client-go/tools/cache/shared_informer.go
// the DeltaFIFO is created in sharedIndexInformer.Run
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})

cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,

Process: s.HandleDeltas,
}
// client-go/tools/cache/controller.go
// the Reflector is created in controller.Run
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)

2) Add Object

  1. When the program first runs, it performs a full List to fetch all objects of the resource and load them into the DeltaFIFO. This logic lives in Reflector.ListAndWatch in client-go/tools/cache/reflector.go; the code is fairly long, so the key steps are shown in the flow chart below:

    (Flow chart)

    List calls the ListFunc supplied when the Informer was initialized, and r.syncWith pushes the listed objects into the DeltaFIFO by calling its Replace method.

  2. It also has to watch for changes to those objects. The Watch is implemented over a long-lived HTTP connection to the api server using chunked transfer encoding. The Watch-related part of the Reflector looks like this:

    for {
    // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
    select {
    case <-stopCh:
    return nil
    default:
    }

    timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
    options = metav1.ListOptions{
    ResourceVersion: resourceVersion,
    TimeoutSeconds: &timeoutSeconds,
    AllowWatchBookmarks: true,
    }

    w, err := r.listerWatcher.Watch(options)
    if err != nil {
    ...
    }

    if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
    if err != errorStopRequested {
    ...
    }
    return nil
    }
    }

    r.listerWatcher.Watch likewise calls the WatchFunc supplied to the Informer, and r.watchHandler processes the change events: when an Added, Updated, or Deleted event arrives, it is written into the DeltaFIFO (see the sketch after this list).

    Every object handed to the DeltaFIFO goes through the methods of r.store, i.e. the abstract Store interface, implemented here by DeltaFIFO; if you wanted a different queue, you could write your own and pass it in when initializing the SharedInformer.
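
The sketch below captures what watchHandler does conceptually. It is a simplified reconstruction, not the actual client-go code; error handling, bookmark feature checks, and metrics are left out, and the handleWatchEvents name is invented:

package controllers

import (
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
)

// handleWatchEvents drains the watch channel and turns each event into a store
// (DeltaFIFO) operation, remembering the last seen resourceVersion so that the next
// Watch call can resume where this one stopped.
func handleWatchEvents(w watch.Interface, store cache.Store, resourceVersion *string) error {
	for event := range w.ResultChan() {
		m, err := meta.Accessor(event.Object)
		if err != nil {
			continue // e.g. a watch.Error event carries a Status, not a regular object
		}
		switch event.Type {
		case watch.Added:
			if err := store.Add(event.Object); err != nil {
				return err
			}
		case watch.Modified:
			if err := store.Update(event.Object); err != nil {
				return err
			}
		case watch.Deleted:
			if err := store.Delete(event.Object); err != nil {
				return err
			}
		case watch.Bookmark:
			// bookmarks only advance the resourceVersion; nothing is stored
		}
		*resourceVersion = m.GetResourceVersion()
	}
	return nil
}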


3) Pop Object

As noted above, the Reflector adds objects to the DeltaFIFO through the methods on r.store. Let's now look at the DeltaFIFO itself: its data structure and how enqueue and dequeue are implemented.

DeltaFIFO's Add, Update, and Delete methods all go through queueActionLocked(), which is the heart of the implementation:

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}

newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)

if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
delete(f.items, id)
}
return nil
}

The structure of the DeltaFIFO is shown in the figure below. For every incoming obj, KeyOf computes its key and the object is stored in items under that key; items is a map whose keys are these object keys and whose values are Deltas, a slice of {action type, object} pairs.

Before the new Deltas slice is written back into items, dedupDeltas removes duplicate deltas. Once newDeltas is in place, f.cond.Broadcast() wakes every downstream goroutine blocked waiting for data.

(Figure: DeltaFIFO structure, -2.png)
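
By default the key is computed by MetaNamespaceKeyFunc, i.e. "<namespace>/<name>" (or just "<name>" for cluster-scoped objects). A tiny sketch of the key side of this (the exampleKey helper is made up):

package controllers

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

// exampleKey shows the object key that DeltaFIFO's KeyOf (and the Indexer) use by
// default: the same "<namespace>/<name>" string that f.items is keyed by.
func exampleKey() {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx-0"}}
	key, _ := cache.MetaNamespaceKeyFunc(pod)
	fmt.Println(key) // prints "default/nginx-0"
}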

That was the "producer" side of the DeltaFIFO; now let's see how the downstream goroutine "consumes" the Deltas.

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
if f.IsClosed() {
return nil, ErrFIFOClosed
}

f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}

Pop is the consumer: it removes the oldest object from the head of the DeltaFIFO. Pop must be given a process function, the callback that receives and handles the object.

When the queue is empty, f.cond.Wait() blocks until a cond.Broadcast arrives. If the queue is not empty, the item at the head of f.queue is taken out and passed to the process callback, where the upper-level consumer handles it.

So when does the upper-level consumer supply that processing function? Back in the sharedIndexInformer.Run code shown above:

// client-go/tools/cache/shared_informer.go
// the DeltaFIFO is created in sharedIndexInformer.Run
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})

cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,

Process: s.HandleDeltas,
}

This is where Process is first set to s.HandleDeltas; meanwhile the control loop in client-go/tools/cache/controller.go calls Pop over and over:

func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}

HandleDeltas is that process callback: when the operation type is Added, Updated, or Deleted, it stores the object in the Indexer and hands it to the distribute function, which fans it out to the SharedInformer's listeners:

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}

isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}

Clearly, s.indexer.Add here corresponds to step 4 in the diagram, and distribute corresponds to step 6 (this is where the flow splits).


4) Add Object

The Indexer itself is a wrapper around a thread-safe cache, threadSafeMap (client-go/tools/cache/thread_safe_store.go).

The thread-safe store is structured as follows:

type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}

// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}

The items field holds the resource objects: the key is computed by the keyFunc (MetaNamespaceKeyFunc by default), and the value is the object itself.

The relationship between indexers and indices is as follows:

type Indexers map[string]IndexFunc

type IndexFunc func(obj interface{}) ([]string, error)

type Indices map[string]Index

type Index map[string]sets.String
  • Indexers maps an indexer name to an IndexFunc. Defining different indexers enables different kinds of queries, e.g. by user or by a particular label;
  • IndexFunc is the index function: it takes an object and returns the list of index values computed from it;
  • Indices maps an indexer name to its Index, i.e. the cached index data;
  • Index is that cached data, a key/value structure from index value to a set of object keys (sets.String is a set built on a map, which provides de-duplication).
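
A small sketch of how these pieces fit together (the "byApp" index name and its IndexFunc are invented for illustration):

package controllers

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

// exampleIndexer builds an Indexer with a custom "byApp" index that groups Pods by
// their "app" label; ByIndex consults Indices["byApp"]["nginx"], a set of object keys,
// and then looks those keys up in items.
func exampleIndexer() {
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		"byApp": func(obj interface{}) ([]string, error) {
			pod := obj.(*corev1.Pod)
			return []string{pod.Labels["app"]}, nil
		},
	})

	_ = indexer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Namespace: "default",
		Name:      "nginx-0",
		Labels:    map[string]string{"app": "nginx"},
	}})

	pods, _ := indexer.ByIndex("byApp", "nginx")
	fmt.Println(len(pods)) // 1
}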

5) Store Object & Key

This can be grouped with step 4): both are essentially implemented inside threadSafeMap.


6) Dispatch Event Handler functions (Send Object to Custom Controller)

Event handling in the SharedInformer relies on processorListener; the reference chain is sharedIndexInformer → sharedProcessor → processorListener → handler.

type sharedIndexInformer struct {
...
processor *sharedProcessor
...
}

In sharedIndexInformer's HandleDeltas, different distribute calls are made depending on the operation type (abridged from the full method above):

case Sync, Replaced, Added, Updated:
// (the indexer update and the isSync computation are omitted here; see the full HandleDeltas above)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}

The event is passed to the processorListener as a Notification, and processorListener invokes a different callback depending on the notification type:

// client-go/tools/cache/shared_informer.go
func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}
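
The handler invoked here is whatever was registered on the informer via AddEventHandler. For a typical custom controller each callback simply turns the object into a key and pushes it onto the workqueue; a minimal sketch (registerHandlers and its wiring are assumed, not taken from the CRD example above):

package controllers

import (
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// registerHandlers wires a ResourceEventHandler onto an informer; the OnAdd/OnUpdate/
// OnDelete calls made by processorListener.run end up in these funcs.
func registerHandlers(informer cache.SharedIndexInformer, queue workqueue.RateLimitingInterface) {
	enqueue := func(obj interface{}) {
		// DeletionHandlingMetaNamespaceKeyFunc also copes with tombstones on delete.
		if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
			queue.Add(key)
		}
	}
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: enqueue,
		UpdateFunc: func(oldObj, newObj interface{}) {
			enqueue(newObj)
		},
		DeleteFunc: enqueue,
	})
}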

Control logic: the Deployment controller as an example

7) Enqueue Object Key

The enqueue logic here is straightforward; it lives in the controller itself (pkg/controller/deployment/deployment_controller.go):

func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
key, err := controller.KeyFunc(deployment)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
return
}

dc.queue.Add(key)
}

The queue here is a rate-limiting workqueue:

type DeploymentController struct {
...
// Deployments that need to be synced
queue workqueue.RateLimitingInterface
}

Other queue implementations can also be found under client-go/util/workqueue/.
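
Constructing such a queue is a one-liner; the default controller rate limiter combines a per-item exponential backoff with an overall token bucket (a sketch, with the newDeploymentQueue wrapper made up for illustration):

package controllers

import "k8s.io/client-go/util/workqueue"

// newDeploymentQueue builds the kind of rate-limiting queue the Deployment controller
// uses; DefaultControllerRateLimiter is per-item exponential backoff (5ms up to 1000s)
// combined with an overall 10 qps / burst 100 token bucket.
func newDeploymentQueue() workqueue.RateLimitingInterface {
	return workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(),
		"deployment",
	)
}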


8) Get Key

pkg/controller/deployment/deployment_controller.go

func (dc *DeploymentController) worker() {
// keep draining the workqueue until processNextWorkItem returns false
for dc.processNextWorkItem() {
}
}

func (dc *DeploymentController) processNextWorkItem() bool {
// take the next key from the head of the queue
key, quit := dc.queue.Get()
if quit {
return false
}
defer dc.queue.Done(key)
// hand the key to the sync handler
err := dc.syncHandler(key.(string))
dc.handleErr(err, key)

return true
}

The Deployment controller's worker keeps calling processNextWorkItem, which fetches the next key from the workqueue. If one is available, syncHandler (bound to syncDeployment, shown in full after the short sketch below) processes it and handleErr decides whether it should be retried; once the queue has been shut down, the worker exits.
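
A simplified sketch of handleErr's logic (the retry budget of 15 matches what the Deployment controller uses, but treat the exact numbers and names here as approximations; the retryingController type is hypothetical):

package controllers

import "k8s.io/client-go/util/workqueue"

const maxRetries = 15 // retry budget before a key is dropped

type retryingController struct {
	queue workqueue.RateLimitingInterface
}

// handleErr: a nil error clears the key's backoff history; an error re-enqueues the key
// with rate limiting until the retry budget is exhausted, after which the key is dropped
// (it will come back on the next resync or object change).
func (c *retryingController) handleErr(err error, key interface{}) {
	if err == nil {
		c.queue.Forget(key)
		return
	}
	if c.queue.NumRequeues(key) < maxRetries {
		c.queue.AddRateLimited(key)
		return
	}
	c.queue.Forget(key)
}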

func (dc *DeploymentController) syncDeployment(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
return err
}

startTime := time.Now()
klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
defer func() {
klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
}()

deployment, err := dc.dLister.Deployments(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
return nil
}
if err != nil {
return err
}

// Deep-copy otherwise we are mutating our cache.
// TODO: Deep-copy only when needed.
d := deployment.DeepCopy()

everything := metav1.LabelSelector{}
if reflect.DeepEqual(d.Spec.Selector, &everything) {
// a deployment must have a non-empty selector
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
if d.Status.ObservedGeneration < d.Generation {
d.Status.ObservedGeneration = d.Generation
dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
}
return nil
}

// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
// through adoption/orphaning.
// fetch the ReplicaSets controlled by this Deployment
rsList, err := dc.getReplicaSetsForDeployment(d)
if err != nil {
return err
}
// List all Pods owned by this Deployment, grouped by their ReplicaSet.
// Current uses of the podMap are:
//
// * check if a Pod is labeled correctly with the pod-template-hash label.
// * check that no old Pods are running in the middle of Recreate Deployments.
podMap, err := dc.getPodMapForDeployment(d, rsList)
if err != nil {
return err
}

if d.DeletionTimestamp != nil {
return dc.syncStatusOnly(d, rsList)
}

// Update deployment conditions with an Unknown condition when pausing/resuming
// a deployment. In this way, we can be sure that we won't timeout when a user
// resumes a Deployment with a set progressDeadlineSeconds.
if err = dc.checkPausedConditions(d); err != nil {
return err
}

if d.Spec.Paused {
return dc.sync(d, rsList)
}

// rollback is not re-entrant in case the underlying replica sets are updated with a new
// revision so we should ensure that we won't proceed to update replica sets until we
// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
// check whether this sync needs to handle a rollback
if getRollbackTo(d) != nil {
return dc.rollback(d, rsList)
}

// check whether this sync is purely a scaling event
scalingEvent, err := dc.isScalingEvent(d, rsList)
if err != nil {
return err
}
if scalingEvent {
return dc.sync(d, rsList)
}

// the update strategy determines how to roll out
switch d.Spec.Strategy.Type {
case apps.RecreateDeploymentStrategyType:
return dc.rolloutRecreate(d, rsList, podMap)
case apps.RollingUpdateDeploymentStrategyType:
return dc.rolloutRolling(d, rsList)
}
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}

syncDeployment consults the deployment, replicaset, and pod Listers in turn to obtain the current state, and chooses different control logic depending on that state. The concrete control logic is skipped for now; it corresponds to Handle Object in the diagram.

How Deployment implements rolling updates