kube-controller-manager

1. Kube Controller Manager

kube-controller-manager runs built-in controllers.

Entrypoint:

  1. controller-manager.go starts kube-controller-manager with just three lines:
    func main() {
        command := app.NewControllerManagerCommand()
        code := cli.Run(command)
        os.Exit(code)
    }
    
  2. NewControllerManagerCommand returns a cobra.Command
  3. NewControllerInitializers defines the set of controllers to start in the controller manager. Specifically, the following controllers are registered, each with a startXXXController function of type InitFunc:
    type InitFunc func(ctx context.Context, controllerCtx ControllerContext) (controller controller.Interface, enabled bool, err error)
    
    register("endpoint", startEndpointController)
    register("endpointslice", startEndpointSliceController)
    register("endpointslicemirroring", startEndpointSliceMirroringController)
    register("replicationcontroller", startReplicationController)
    register("podgc", startPodGCController)
    register("resourcequota", startResourceQuotaController)
    register("namespace", startNamespaceController)
    register("serviceaccount", startServiceAccountController)
    register("garbagecollector", startGarbageCollectorController)
    register("daemonset", startDaemonSetController)
    register("job", startJobController)
    register("deployment", startDeploymentController)
    register("replicaset", startReplicaSetController)
    register("horizontalpodautoscaling", startHPAController)
    register("disruption", startDisruptionController)
    register("statefulset", startStatefulSetController)
    register("cronjob", startCronJobController)
    register("csrsigning", startCSRSigningController)
    register("csrapproving", startCSRApprovingController)
    register("csrcleaner", startCSRCleanerController)
    register("ttl", startTTLController)
    register("bootstrapsigner", startBootstrapSignerController)
    register("tokencleaner", startTokenCleanerController)
    register("nodeipam", startNodeIpamController)
    register("nodelifecycle", startNodeLifecycleController)
    
  4. The core function Run calls StartControllers to start the controllers registered in the previous step (the map[string]InitFunc).
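
A condensed sketch of that loop (simplified from the real StartControllers in cmd/kube-controller-manager/app/controllermanager.go, which additionally special-cases the serviceaccount-token controller and wires up health and debug handlers; error handling is abbreviated here):

// Sketch: iterate over the registered controllers and invoke each InitFunc.
func StartControllers(ctx context.Context, controllerCtx ControllerContext, controllers map[string]InitFunc) error {
    for controllerName, initFn := range controllers {
        if !controllerCtx.IsControllerEnabled(controllerName) {
            continue // disabled via the --controllers flag
        }
        ctrl, started, err := initFn(ctx, controllerCtx)
        if err != nil {
            return fmt.Errorf("error starting %q: %w", controllerName, err)
        }
        if !started || ctrl == nil {
            continue // e.g. the controller's feature gate is off
        }
        // the InitFunc has started the controller's own goroutines
    }
    return nil
}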

Each Controller:

  1. The InitFunc of each controller is defined in cmd/kube-controller-manager/app/core.go, e.g. startGarbageCollectorController:
    garbageCollector, err := garbagecollector.NewGarbageCollector(
        gcClientset,
        metadataClient,
        controllerContext.RESTMapper,
        ignoredResources,
        controllerContext.ObjectOrMetadataInformerFactory,
        controllerContext.InformersStarted,
    )
    
  2. Each controller is defined under pkg/controller. e.g. pkg/controller/garbagecollector/garbagecollector.go
    type GarbageCollector struct {
        restMapper     meta.ResettableRESTMapper
        metadataClient metadata.Interface
        // garbage collector attempts to delete the items in attemptToDelete queue when the time is ripe.
        attemptToDelete workqueue.RateLimitingInterface
        // garbage collector attempts to orphan the dependents of the items in the attemptToOrphan queue, then deletes the items.
        attemptToOrphan        workqueue.RateLimitingInterface
        dependencyGraphBuilder *GraphBuilder
        // GC caches the owners that do not exist according to the API server.
        absentOwnerCache *ReferenceCache
    
        kubeClient       clientset.Interface
        eventBroadcaster record.EventBroadcaster
    
        workerLock sync.RWMutex
    }
    

2. Controller Overview

Components:

  • Reflector: list/watches the target resource on the API server (via the ListerWatcher) and pushes the results into the Delta FIFO queue.
  • Delta FIFO queue: a producer-consumer queue of object changes (deltas); the Reflector is the producer, the informer's internal controller is the consumer.
  • Informer: Monitors the object's events and calls the registered EventHandler for each event (in a controller, the handlers usually just add an item to the WorkQueue).

    • sharedIndexInformer: Usually created for a specific resource (e.g. deploymentInformer) with NewSharedIndexInformer, and made up of the following pieces:
      1. Indexer: indexed local cache. Indexer extends Store with multiple indices and restricts each accumulator to simply hold the current object.
      2. Controller that pulls objects/notifications using the ListerWatcher and pushes them into a DeltaFIFO.
      3. sharedProcessor responsible for relaying those notifications to each of the informer's clients: the registered EventHandler is wrapped in a processorListener, which is stored in the listeners of the sharedProcessor.
      4. listerWatcher for the target resource. e.g. for deploymentInformer

        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1().Deployments(namespace).List(context.TODO(), options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)
            },
        },
        
  • Lister: Retrieves objects from the in-memory cache (the Indexer).

  • WorkQueue: Stores the items for which the reconcile loop is executed.
  • Scheme: Scheme defines methods for serializing and deserializing API objects, a type registry for converting group, version, and kind information to and from Go schemas, and mappings between Go schemas of different versions. (ref: scheme.go)
  • processNextWorkItem: Processes one item from the WorkQueue.
  • syncHandler: The reconcile loop called by processNextWorkItem (the function name differs from controller to controller).
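
These components combine into one standard pattern. The following is a minimal, self-contained sketch (illustrative only, not code from kube-controller-manager) that wires a shared informer and a rate-limited WorkQueue together, with processNextWorkItem draining the queue and syncHandler acting as the reconcile loop:

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
)

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(config)

    // Informer: a Reflector list/watches Deployments into a DeltaFIFO,
    // which keeps the Indexer (the in-memory cache behind the Lister) updated.
    factory := informers.NewSharedInformerFactory(client, 10*time.Minute)
    informer := factory.Apps().V1().Deployments().Informer()
    lister := factory.Apps().V1().Deployments().Lister()

    queue := workqueue.NewNamedRateLimitingQueue(
        workqueue.DefaultControllerRateLimiter(), "deployment-demo")

    // EventHandler: only enqueue a key; the reconcile loop does the real work.
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
                queue.Add(key)
            }
        },
        UpdateFunc: func(_, newObj interface{}) {
            if key, err := cache.MetaNamespaceKeyFunc(newObj); err == nil {
                queue.Add(key)
            }
        },
    })

    stopCh := make(chan struct{})
    factory.Start(stopCh)
    cache.WaitForCacheSync(stopCh, informer.HasSynced)

    // syncHandler: the reconcile loop; reads from the Lister, not the API server.
    syncHandler := func(key string) error {
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            return err
        }
        d, err := lister.Deployments(namespace).Get(name)
        if err != nil {
            return err // NotFound here means the object was deleted
        }
        fmt.Printf("reconciling %s/%s (ready replicas: %d)\n", namespace, name, d.Status.ReadyReplicas)
        return nil
    }

    // processNextWorkItem: drain one item from the WorkQueue.
    processNextWorkItem := func() bool {
        key, quit := queue.Get()
        if quit {
            return false
        }
        defer queue.Done(key)
        if err := syncHandler(key.(string)); err != nil {
            queue.AddRateLimited(key) // retry with backoff
            return true
        }
        queue.Forget(key)
        return true
    }
    // demo only: loops until the queue is shut down
    for processNextWorkItem() {
    }
}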

3. Built-in Controllers

3.1. EndpointsController

[Kubernetes] Reading the EndpointsController to understand Endpoints (blog post in Japanese)

3.2. GarbageCollector

![](garbagecollector.drawio.svg)

3.2.1. Components

3.2.1.1. GarbageCollector:
type GarbageCollector struct {
    restMapper     meta.ResettableRESTMapper
    metadataClient metadata.Interface
    // garbage collector attempts to delete the items in attemptToDelete queue when the time is ripe.
    attemptToDelete workqueue.RateLimitingInterface
    // garbage collector attempts to orphan the dependents of the items in the attemptToOrphan queue, then deletes the items.
    attemptToOrphan        workqueue.RateLimitingInterface
    dependencyGraphBuilder *GraphBuilder
    // GC caches the owners that do not exist according to the API server.
    absentOwnerCache *ReferenceCache

    kubeClient       clientset.Interface
    eventBroadcaster record.EventBroadcaster

    workerLock sync.RWMutex
}
  1. RestMapper: maps resources to kinds, and maps kinds and versions to interfaces
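
As an illustration of those two mappings, a hedged standalone sketch (lookUp is a hypothetical helper; KindFor and RESTMapping are the real meta.RESTMapper methods):

package example

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/meta"
    "k8s.io/apimachinery/pkg/runtime/schema"
)

// lookUp: given any meta.RESTMapper, map a resource to its kind, then the
// kind and version to the REST mapping used to build API requests.
func lookUp(restMapper meta.RESTMapper) error {
    // resource -> kind, e.g. "deployments" -> apps/v1 Deployment
    gvk, err := restMapper.KindFor(schema.GroupVersionResource{Group: "apps", Resource: "deployments"})
    if err != nil {
        return err
    }
    // kind + version -> resource and scope (namespaced or cluster-wide)
    mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
    if err != nil {
        return err
    }
    fmt.Println(gvk, mapping.Resource, mapping.Scope.Name())
    return nil
}
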
3.2.1.2. GraphBuilder

builds a graph caching the dependencies among objects.

type GraphBuilder struct {
    restMapper meta.RESTMapper

    // each monitor list/watches a resource, the results are funneled to the
    // dependencyGraphBuilder
    monitors    monitors
    monitorLock sync.RWMutex
    // informersStarted is closed after all of the controllers have been initialized and are running.
    // After that it is safe to start them here, before that it is not.
    informersStarted <-chan struct{}

    // stopCh drives shutdown. When a receive from it unblocks, monitors will shut down.
    // This channel is also protected by monitorLock.
    stopCh <-chan struct{}

    // running tracks whether Run() has been called.
    // it is protected by monitorLock.
    running bool

    eventRecorder record.EventRecorder

    metadataClient metadata.Interface
    // monitors are the producer of the graphChanges queue, graphBuilder alters
    // the in-memory graph according to the changes.
    graphChanges workqueue.RateLimitingInterface
    // uidToNode doesn't require a lock to protect, because only the
    // single-threaded GraphBuilder.processGraphChanges() reads/writes it.
    uidToNode *concurrentUIDToNode
    // GraphBuilder is the producer of attemptToDelete and attemptToOrphan, GC is the consumer.
    attemptToDelete workqueue.RateLimitingInterface
    attemptToOrphan workqueue.RateLimitingInterface
    // GraphBuilder and GC share the absentOwnerCache. Objects that are known to
    // be non-existent are added to the cache.
    absentOwnerCache *ReferenceCache
    sharedInformers  informerfactory.InformerFactory
    ignoredResources map[schema.GroupResource]struct{}
}
  1. monitors: a set of monitors, each of which runs a cache.Controller that (1) constructs and runs a Reflector, pumping objects/notifications into the Config.Queue, and (2) pops from the queue and processes items with Config.ProcessFunc.
  2. graphChanges: workqueue storing events from the informers (the input of the whole process)
  3. absentOwnerCache: objects that are known to be non-existent are added to the cache
  4. uidToNode (graph): a pointer to concurrentUIDToNode
    type concurrentUIDToNode struct {
        uidToNodeLock sync.RWMutex
        uidToNode     map[types.UID]*node
    }
    
  5. node
    type node struct {
        identity objectReference
        // dependents will be read by the orphan() routine, we need to protect it with a lock.
        dependentsLock sync.RWMutex
        // dependents are the nodes that have node.identity as a
        // metadata.ownerReference.
        dependents map[*node]struct{}
        // this is set by processGraphChanges() if the object has non-nil DeletionTimestamp
        // and has the FinalizerDeleteDependents.
        deletingDependents     bool
        deletingDependentsLock sync.RWMutex
        // this records if the object's deletionTimestamp is non-nil.
        beingDeleted     bool
        beingDeletedLock sync.RWMutex
        // this records if the object was constructed virtually and never observed via informer event
        virtual     bool
        virtualLock sync.RWMutex
        // when processing an Update event, we need to compare the updated
        // ownerReferences with the owners recorded in the graph.
        owners []metav1.OwnerReference
    }
    
    1. isObserved: !virtual
    2. virtual: in attemptToDeleteItem, if there is no corresponding object in the API server, or the latest UID is not the same as item.identity.UID, the item is added back to graphChanges with virtual = true
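
For orientation, the items flowing through graphChanges look roughly like this (a sketch only: the exact event struct in graph_builder.go differs across Kubernetes versions, so take the field set as illustrative):

package example

import "k8s.io/apimachinery/pkg/runtime/schema"

type eventType int

const (
    addEvent eventType = iota
    updateEvent
    deleteEvent
)

// event: roughly what the monitors push into graphChanges and what
// processGraphChanges pops. virtual marks events generated by the
// GarbageCollector itself (via enqueueVirtualDeleteEvent) rather than
// observed through an informer.
type event struct {
    eventType eventType
    obj       interface{}
    oldObj    interface{} // set only for updateEvent
    gvk       schema.GroupVersionKind
    virtual   bool
}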

3.2.2. Steps

NewGarbageCollector:

  1. GarbageCollector and GraphBuilder are initialized

GarbageCollector.Run():

  1. Start gc.dependencyGraphBuilder.Run() (wait until cache is synced)
    go gc.dependencyGraphBuilder.Run(ctx.Done())
    
    1. Start gb.startMonitors(), which ensures the current set of monitors is running and starts each of them:
      gb.sharedInformers.Start(gb.stopCh)
      go monitor.Run()
      
    2. Run runProcessGraphChanges every second
      wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
      
    3. runProcessGraphChanges calls processGraphChanges in a for loop.
      func (gb *GraphBuilder) runProcessGraphChanges() {
          for gb.processGraphChanges() {
          }
      }
      
    4. processGraphChanges: Get an item from graphChanges and put the corresponding node to attemptToDelete or attemptToOrphan queue.
      1. if the node exists in uidToNode but was not yet observed and is now observed -> node.markObserved() (and add potentially invalid dependents to the attemptToDelete queue) ref
      2. [addEvent or updateEvent] if not found in uidToNode, insertNode + processTransitions ref
      3. [addEvent or updateEvent] if found in uidToNode, reflect changes in ownerReferences and if being deleted, markBeingDeleted() + processTransitions ref
      4. [deleteEvent] if found
        1. if event.virtual (an event generated by the GarbageCollector) -> in some cases set removeExistingNode to false, since it is not certain the event refers to the existing node. Detail: ref
        2. if removeExistingNode, removeNode, add dependents to attemptToDelete, and add owners to attemptToDelete
  2. Start gc workers
    // gc workers
    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, gc.runAttemptToDeleteWorker, 1*time.Second)
        go wait.Until(gc.runAttemptToOrphanWorker, 1*time.Second, ctx.Done())
    }
    
    1. run runAttemptToDeleteWorker()
      func (gc *GarbageCollector) runAttemptToDeleteWorker(ctx context.Context) {
          for gc.processAttemptToDeleteWorker(ctx) {
          }
      }
      
      processAttemptToDeleteWorker:
      1. Get an item (node) from attemptToDelete queue.
        action := gc.attemptToDeleteWorker(ctx, item)
        
      2. Process it in attemptToDeleteWorker
        1. In case the node converted from the queued item is not observed (meaning it was added from an objectReference whose object was not found in the API server), forget the item if it no longer exists in the graph or has since been observed. ref
      3. Delete the item with attemptToDeleteItem
        err := gc.attemptToDeleteItem(ctx, n)
        
        1. item.isBeingDeleted & !item.isDeletingDependents -> Do nothing and return nil
        2. Get the latest object from API server
          latest, err := gc.getObject(item.identity)
          
        3. err=NotFound -> enqueueVirtualDeleteEvent(enqueue event to graphChanges with virtual=true)
          gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
          
        4. latest.GetUID() != item.identity.UID -> enqueueVirtualDeleteEvent(enqueue event to graphChanges with virtual=true) and return enqueuedVirtualDeleteEventErr
          gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
          
        5. item.isDeletingDependents() -> gc.processDeletingDependentsItem(item) and return its error
          1. If no blockingDependents -> gc.removeFinalizer(item, metav1.FinalizerDeleteDependents)
          2. For blockingDependents, if not isBeingDeleted -> gc.attemptToDelete.Add(dep)
        6. If there are no ownerReferences -> return nil
        7. Classify ownerReferences
          solid, dangling, waitingForDependentsDeletion, err := gc.classifyReferences(ctx, item, ownerReferences)
          
        8. len(solid) != 0
          1. len(dangling) == 0 && len(waitingForDependentsDeletion) == 0 -> return nil
          2. Delete owner references for dangling and waitingForDependentsDeletion (send PATCH request to API server)
        9. len(waitingForDependentsDeletion) != 0 && item.dependentsLength() != 0: for all dependents that are being deleted, send an unblockOwnerReferencesStrategicMergePatch PATCH request to the API server
          1. then delete the item with DeletePropagationForeground (API server)
        10. default
          1. hasOrphanFinalizer -> delete with DeletePropagationOrphan
          2. hasDeleteDependentsFinalizer -> delete with DeletePropagationForeground
          3. default -> delete with DeletePropagationBackground
    2. run runAttemptToOrphanWorker() every second
      1. Get owner from attemptToOrphan
      2. attemptToOrphanWorker
        1. orphanDependents(owner, dependents): remove owner references via PATCH (API server)
        2. gc.removeFinalizer(owner, metav1.FinalizerOrphanDependents)
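
The deletions above go through the metadata client with a UID precondition, so an object that was deleted and re-created under the same name is not removed by mistake. A hedged sketch of such a call (deleteWithPolicy is a hypothetical helper, not the actual gc.deleteObject):

package example

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/metadata"
)

// deleteWithPolicy deletes one object, guarding on its UID and choosing the
// propagation policy the way step 10 above describes (Orphan, Foreground,
// or the default Background).
func deleteWithPolicy(ctx context.Context, client metadata.Interface,
    gvr schema.GroupVersionResource, namespace, name string,
    uid types.UID, policy metav1.DeletionPropagation) error {
    return client.Resource(gvr).Namespace(namespace).Delete(ctx, name, metav1.DeleteOptions{
        Preconditions:     &metav1.Preconditions{UID: &uid},
        PropagationPolicy: &policy,
    })
}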

GarbageCollector.Sync keeps the set of resources to monitor up to date by periodically calling GraphBuilder.syncMonitors(resources); it is started as a separate goroutine in startGarbageCollectorController.

TestCase:

  1. TestCascadingDeletion:
    1. Pod with an ownerReference to the toBeDeletedRC ReplicationController -> deleted
    2. Pod with ownerReferences to both the remainingRC and toBeDeletedRC ReplicationControllers -> remains
    3. Pod without an ownerReference -> remains
    4. Delete the toBeDeletedRC ReplicationController
  2. TestOrphaning

    1. If deleted with DeleteOptions.propagationPolicy=Orphan, the ownerReferences are just removed without deleting the object itself.
    kubectl create deploy nginx --image=nginx --replicas=1
    deployment.apps/nginx created
    kubectl get deploy
    NAME    READY   UP-TO-DATE   AVAILABLE   AGE
    nginx   1/1     1            1           3m25s
    kubectl delete deployment nginx --cascade=orphan
    deployment.apps "nginx" deleted
    kubectl get deploy
    No resources found in default namespace.
    kubectl get rs
    NAME              DESIRED   CURRENT   READY   AGE
    nginx-76d6c9b8c   1         1         1       3m40s
    kubectl get pod
    NAME                    READY   STATUS    RESTARTS   AGE
    nginx-76d6c9b8c-jcwxv   1/1     Running   0          3m46s
    

3.2.3. Ref

  1. garbagecollector.go
  2. graph_builder.go
  3. test/integration/garbagecollector/garbage_collector_test.go
  4. Enable garbage collection of custom resources

3.2.4. Memo

  1. Delete
    1. propagationPolicy=Foreground -> The API server updates metadata.deletionTimestamp and adds the foregroundDeletion finalizer instead of removing the object itself.
      1. curl: with '{"kind":"DeleteOptions","apiVersion":"v1","propagationPolicy":"Foreground"}'
      2. kubectl: kubectl delete --cascade=foreground
    2. propagationPolicy=Background (default) -> The API server immediately deletes the object, and the garbage collector deletes its dependents in the background.
      1. curl: with '{"kind":"DeleteOptions","apiVersion":"v1","propagationPolicy":"Background"}'
      2. kubectl: kubectl delete --cascade=background (or plain kubectl delete)
    3. propagationPolicy=Orphan -> The API server deletes the object but not its dependents; the dependent objects remain as orphans.
      1. curl: with '{"kind":"DeleteOptions","apiVersion":"v1","propagationPolicy":"Orphan"}'
      2. kubectl: kubectl delete --cascade=orphan
  2. OwnerReferences: metadata.ownerReferences ties a dependent object to its owner(s); the GC builds its graph from these (see the sketch below).
  3. Finalizers: metadata.finalizers (e.g. foregroundDeletion, orphan) block the actual removal of an object until the responsible controller removes them.
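
To make OwnerReferences and their interaction with the finalizers concrete, a hedged sketch (withOwner is a hypothetical helper) that attaches an ownerReference with BlockOwnerDeletion to a dependent object; BlockOwnerDeletion only has an effect together with foreground deletion:

package example

import (
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// withOwner adds an ownerReference pointing at the given Deployment.
// BlockOwnerDeletion=true makes a foreground deletion of the owner wait for
// this dependent; Controller=true marks the managing controller. The GC
// matches owners by UID, not just by name.
func withOwner(cm *corev1.ConfigMap, owner *appsv1.Deployment) *corev1.ConfigMap {
    isTrue := true
    cm.OwnerReferences = append(cm.OwnerReferences, metav1.OwnerReference{
        APIVersion:         "apps/v1",
        Kind:               "Deployment",
        Name:               owner.Name,
        UID:                owner.UID,
        Controller:         &isTrue,
        BlockOwnerDeletion: &isTrue,
    })
    return cm
}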

3.3. NamespaceController

// NamespaceController is responsible for performing actions dependent upon a namespace phase
type NamespaceController struct {
    // lister that can list namespaces from a shared cache
    lister corelisters.NamespaceLister
    // returns true when the namespace cache is ready
    listerSynced cache.InformerSynced
    // namespaces that have been queued up for processing by workers
    queue workqueue.RateLimitingInterface
    // helper to delete all resources in the namespace when the namespace is deleted.
    namespacedResourcesDeleter deletion.NamespacedResourcesDeleterInterface
}
  1. API: https://github.com/kubernetes/api/blob/master/core/v1/types.go#L5601
  2. startNamespaceController

    namespaceController := namespacecontroller.NewNamespaceController(
        ctx,
        namespaceKubeClient,
        metadataClient,
        discoverResourcesFn,
        controllerContext.InformerFactory.Core().V1().Namespaces(),
        controllerContext.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration,
        v1.FinalizerKubernetes,
    )
    go namespaceController.Run(ctx, int(controllerContext.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs))
    
    FinalizerKubernetes
    const (
        FinalizerKubernetes FinalizerName = "kubernetes"
    )
    

  3. NewNamespaceController:

    1. Init NamespaceController
      namespaceController := &NamespaceController{
          queue:                      workqueue.NewNamedRateLimitingQueue(nsControllerRateLimiter(), "namespace"),
          namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.CoreV1().Namespaces(), metadataClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken),
      }
      
    2. Prepare event handler for namespaceInformer
      namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
          cache.ResourceEventHandlerFuncs{
              AddFunc: func(obj interface{}) {
                  namespace := obj.(*v1.Namespace)
                  namespaceController.enqueueNamespace(namespace)
              },
              UpdateFunc: func(oldObj, newObj interface{}) {
                  namespace := newObj.(*v1.Namespace)
                  namespaceController.enqueueNamespace(namespace)
              },
          },
          resyncPeriod,
      )
      
  4. Run -> worker -> workFunc
  5. workFunc:
    1. get key from queue
      key, quit := nm.queue.Get()
      
    2. sync
      err := nm.syncNamespaceFromKey(key.(string))
      
    3. syncNamespaceFromKey:
      1. get namespace
        namespace, err := nm.lister.Get(key)
        
      2. run deleter
        nm.namespacedResourcesDeleter.Delete(namespace.Name)
        
        1. Get namespace
          namespace, err := d.nsClient.Get(context.TODO(), nsName, metav1.GetOptions{})
          
        2. namespace.DeletionTimestamp == nil -> return nil
        3. Update namespace status to terminating
          namespace, err = d.retryOnConflictError(namespace, d.updateNamespaceStatusFunc)
          
          newNamespace.Status.Phase = v1.NamespaceTerminating
        4. Delete all contents in the namespace
          estimate, err := d.deleteAllContent(namespace)
          
        5. finalizeNamespace removes the specified finalizerToken and finalizes the namespace
          _, err = d.retryOnConflictError(namespace, d.finalizeNamespace)
          
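Run -> worker -> workFunc above is the standard worker pattern; a self-contained sketch of its shape (runWorker is a hypothetical helper, not the controller's actual code):

package example

import "k8s.io/client-go/util/workqueue"

// runWorker drains a rate-limited queue, calling sync for each key and
// requeueing with backoff on error - the same shape as the namespace
// controller's worker/workFunc pair (which additionally handles
// ResourcesRemainingError by requeueing after the estimated wait).
func runWorker(queue workqueue.RateLimitingInterface, sync func(key string) error) {
    for {
        key, quit := queue.Get()
        if quit {
            return
        }
        func() {
            defer queue.Done(key)
            if err := sync(key.(string)); err != nil {
                queue.AddRateLimited(key) // retry later with backoff
                return
            }
            queue.Forget(key)
        }()
    }
}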

3.4. DeploymentController

ToDo
