Skip to content

cluster

Cluster provides various methods to interact with a cluster. Cluster is initialized and stored in Manager with cluster.New.

Most of the fields in a cluster (scheme, cache, client, apiReader, recorderProvider, etc.) are used to injected to related components (Controller, EventHandlers, Sources, Predicates)

Types

1. Cluster interface

type Cluster interface {
    SetFields(interface{}) error
    GetConfig() *rest.Config
    GetScheme() *runtime.Scheme
    GetClient() client.Client
    GetFieldIndexer() client.FieldIndexer
    GetCache() cache.Cache
    GetEventRecorderFor(name string) record.EventRecorder
    GetRESTMapper() meta.RESTMapper
    GetAPIReader() client.Reader
    Start(ctx context.Context) error
}

2. cluster struct

type cluster struct {
    config *rest.Config
    scheme *runtime.Scheme // scheme is injected into Controllers, EventHandlers, Sources and Predicates.
    cache cache.Cache // injected is injected into Sources
    client client.Client // client is injected into Controllers (and EventHandlers, Sources and Predicates).
    apiReader client.Reader // apiReader is the reader that will make requests to the api server and not the cache.
    fieldIndexes client.FieldIndexer
    recorderProvider *intrec.Provider // recorderProvider is used to generate event recorders that will be injected into Controllers (and EventHandlers, Sources and Predicates).
    mapper meta.RESTMapper // mapper is used to map resources to kind, and map kind and version.
    logger logr.Logger
}

New

  1. SetOptionDefaults For more details, check below

  2. Create a mapper

    mapper, err := options.MapperProvider(config)
    

  3. Create a cache with NewCache (cache.New)

    cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
    

    For more details, read cache

  4. Create apiReader

    apiReader, err := client.New(config, clientOptions)
    

  5. Create a writeObj with NewClient (DefaultNewClient -> NewDelegatingClient)

    writeObj, err := options.NewClient(cache, config, clientOptions, options.ClientDisableCacheFor...)
    

    if options.NewClient == nil {
        options.NewClient = DefaultNewClient
    }
    
    // DefaultNewClient creates the default caching client.
    func DefaultNewClient(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
        c, err := client.New(config, options)
        if err != nil {
            return nil, err
        }
    
        return client.NewDelegatingClient(client.NewDelegatingClientInput{
            CacheReader:     cache,
            Client:          c,
            UncachedObjects: uncachedObjects,
        })
    }
    
    &delegatingClient{
        scheme: in.Client.Scheme(),
        mapper: in.Client.RESTMapper(),
        Reader: &delegatingReader{
            CacheReader:       in.CacheReader,
            ClientReader:      in.Client,
            scheme:            in.Client.Scheme(),
            uncachedGVKs:      uncachedGVKs,
            cacheUnstructured: in.CacheUnstructured,
        },
        Writer:       in.Client,
        StatusClient: in.Client,
    }
    
  6. Create a recorderProvider

    recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)
    

  7. Create cluster
    &cluster{
        config:           config,
        scheme:           options.Scheme,
        cache:            cache,
        fieldIndexes:     cache,
        client:           writeObj,
        apiReader:        apiReader,
        recorderProvider: recorderProvider,
        mapper:           mapper,
        logger:           options.Logger,
    }
    

SetOptionDefaults

name value where to use
Scheme scheme.Scheme
MapperProvider func(c *rest.Config) (meta.RESTMapper, error) {return apiutil.NewDynamicRESTMapper(c, nil)}
NewClient DefaultNewClient
NewCache cache.New
newRecorderProvider intrec.NewProvider
makeBroadcaster func() (record.EventBroadcaster, bool) {return record.NewBroadcaster(), true}
Logger logf.RuntimeLog.WithName("cluster")
  1. options.Scheme = scheme.Scheme(Use the Kubernetes client-go scheme if none is specified)
  2. MapperProvider
    options.MapperProvider = func(c *rest.Config) (meta.RESTMapper, error) {
        return apiutil.NewDynamicRESTMapper(c, nil)
    }
    
  3. options.NewClient = DefaultNewClient

    func DefaultNewClient(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
        c, err := client.New(config, options)
        if err != nil {
            return nil, err
        }
    
        return client.NewDelegatingClient(client.NewDelegatingClientInput{
            CacheReader:     cache,
            Client:          c,
            UncachedObjects: uncachedObjects,
        })
    }
    

    GetClient() returns cluster.client, a delegatingClient by default. For more details about delegatingClient you can check client

  4. options.NewCache = cache.New

  5. options.newRecorderProvider = intrec.NewProvider
  6. record.NewBroadcaster()
  7. options.Logger = logf.RuntimeLog.WithName("cluster")

SetFields

func (c *cluster) SetFields(i interface{}) error {
    if _, err := inject.ConfigInto(c.config, i); err != nil {
        return err
    }
    if _, err := inject.ClientInto(c.client, i); err != nil {
        return err
    }
    if _, err := inject.APIReaderInto(c.apiReader, i); err != nil {
        return err
    }
    if _, err := inject.SchemeInto(c.scheme, i); err != nil {
        return err
    }
    if _, err := inject.CacheInto(c.cache, i); err != nil {
        return err
    }
    if _, err := inject.MapperInto(c.mapper, i); err != nil {
        return err
    }
    return nil
}
  1. cluster.SetFields is called in manager.SetFields
  2. cluster.SetFields injects Config, Client, APIReader, Scheme, Cache and Mapper into the specified i.
  3. manager.SetFields's usage:

    1. used for reconciler passed via builder in controller

      // Inject dependencies into Reconciler
      if err := mgr.SetFields(options.Reconciler); err != nil {
          return nil, err
      }
      

    2. used for runnables added to the Manager with add function

      // Add sets dependencies on i, and adds it to the list of Runnables to start.
      func (cm *controllerManager) Add(r Runnable) error {
          cm.Lock()
          defer cm.Unlock()
          return cm.add(r)
      }
      
      func (cm *controllerManager) add(r Runnable) error {
          // Set dependencies on the object
          if err := cm.SetFields(r); err != nil {
              return err
          }
          return cm.runnables.Add(r)
      }
      

      1. Controller is passed in controller.New