cluster
Cluster provides various methods to interact with a cluster. Cluster is initialized and stored in Manager with cluster.New.
Most of the fields in a cluster (scheme, cache, client, apiReader, recorderProvider, etc.) are injected into related components (Controller, EventHandlers, Sources, Predicates).
Types
1. Cluster interface
// Cluster provides various methods to interact with a cluster.
type Cluster interface {
	// SetFields injects the cluster's dependencies (config, client, API
	// reader, scheme, cache and REST mapper) into the given object.
	SetFields(interface{}) error
	// GetConfig returns the REST config associated with the cluster.
	GetConfig() *rest.Config
	// GetScheme returns the runtime scheme associated with the cluster.
	GetScheme() *runtime.Scheme
	// GetClient returns the cluster's client; with the default options this
	// is a delegating client that reads through the cache.
	GetClient() client.Client
	// GetFieldIndexer returns the field indexer associated with the cluster.
	GetFieldIndexer() client.FieldIndexer
	// GetCache returns the cache associated with the cluster.
	GetCache() cache.Cache
	// GetEventRecorderFor returns an event recorder for the given name.
	GetEventRecorderFor(name string) record.EventRecorder
	// GetRESTMapper returns the REST mapper used to map resources to kinds.
	GetRESTMapper() meta.RESTMapper
	// GetAPIReader returns a reader that makes requests to the API server
	// rather than to the cache.
	GetAPIReader() client.Reader
	// Start runs the cluster with the given context.
	// NOTE(review): blocking and shutdown semantics are not shown here — confirm against the implementation.
	Start(ctx context.Context) error
}
2. cluster struct
// cluster is the struct built by New; it holds the dependencies that are
// handed out to (and injected into) other components.
type cluster struct {
	config *rest.Config // config is the REST config passed to New; SetFields injects it.
	scheme *runtime.Scheme // scheme is injected into Controllers, EventHandlers, Sources and Predicates.
	cache cache.Cache // cache is injected into Sources.
	client client.Client // client is injected into Controllers (and EventHandlers, Sources and Predicates).
	apiReader client.Reader // apiReader is the reader that will make requests to the api server and not the cache.
	fieldIndexes client.FieldIndexer // fieldIndexes is set to the cache by New.
	recorderProvider *intrec.Provider // recorderProvider is used to generate event recorders that will be injected into Controllers (and EventHandlers, Sources and Predicates).
	mapper meta.RESTMapper // mapper is used to map resources to kind, and map kind and version.
	logger logr.Logger // logger is taken from options.Logger by New.
}
New
-
Call `SetOptionDefaults`. For more details, see the SetOptionDefaults section below.
-
Create a `mapper`:
`mapper, err := options.MapperProvider(config)`
-
Create a `cache` with `NewCache` (`cache.New`):
`cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})`
For more details, see the cache documentation.
-
Create an `apiReader`:
`apiReader, err := client.New(config, clientOptions)`
-
Create a `writeObj` with `NewClient` (`DefaultNewClient` -> `NewDelegatingClient`):
`writeObj, err := options.NewClient(cache, config, clientOptions, options.ClientDisableCacheFor...)`

`NewClient` falls back to `DefaultNewClient` when it is not set:

```go
if options.NewClient == nil {
	options.NewClient = DefaultNewClient
}
```

```go
// DefaultNewClient creates the default caching client.
func DefaultNewClient(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
	c, err := client.New(config, options)
	if err != nil {
		return nil, err
	}
	return client.NewDelegatingClient(client.NewDelegatingClientInput{
		CacheReader:     cache,
		Client:          c,
		UncachedObjects: uncachedObjects,
	})
}
```

The resulting `delegatingClient`:

```go
&delegatingClient{
	scheme: in.Client.Scheme(),
	mapper: in.Client.RESTMapper(),
	Reader: &delegatingReader{
		CacheReader:       in.CacheReader,
		ClientReader:      in.Client,
		scheme:            in.Client.Scheme(),
		uncachedGVKs:      uncachedGVKs,
		cacheUnstructured: in.CacheUnstructured,
	},
	Writer:       in.Client,
	StatusClient: in.Client,
}
```
-
Create a `recorderProvider`:
`recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)`
- Create the cluster:

```go
&cluster{
	config:           config,
	scheme:           options.Scheme,
	cache:            cache,
	fieldIndexes:     cache,
	client:           writeObj,
	apiReader:        apiReader,
	recorderProvider: recorderProvider,
	mapper:           mapper,
	logger:           options.Logger,
}
```
SetOptionDefaults
| name | value | where to use |
|---|---|---|
| Scheme | `scheme.Scheme` | |
| MapperProvider | `func(c *rest.Config) (meta.RESTMapper, error) {return apiutil.NewDynamicRESTMapper(c, nil)}` | |
| NewClient | `DefaultNewClient` | |
| NewCache | `cache.New` | |
| newRecorderProvider | `intrec.NewProvider` | |
| makeBroadcaster | `func() (record.EventBroadcaster, bool) {return record.NewBroadcaster(), true}` | |
| Logger | `logf.RuntimeLog.WithName("cluster")` | |
- Scheme: `options.Scheme = scheme.Scheme` (use the Kubernetes client-go scheme if none is specified)
- MapperProvider:

  ```go
  options.MapperProvider = func(c *rest.Config) (meta.RESTMapper, error) {
  	return apiutil.NewDynamicRESTMapper(c, nil)
  }
  ```
- NewClient: `options.NewClient = DefaultNewClient`

  ```go
  func DefaultNewClient(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
  	c, err := client.New(config, options)
  	if err != nil {
  		return nil, err
  	}
  	return client.NewDelegatingClient(client.NewDelegatingClientInput{
  		CacheReader:     cache,
  		Client:          c,
  		UncachedObjects: uncachedObjects,
  	})
  }
  ```

  `GetClient()` returns `cluster.client`, a `delegatingClient` by default. For more details about `delegatingClient`, see the client documentation.
- NewCache: `options.NewCache = cache.New`
- newRecorderProvider: `options.newRecorderProvider = intrec.NewProvider`
- makeBroadcaster: `record.NewBroadcaster()`
- Logger: `options.Logger = logf.RuntimeLog.WithName("cluster")`
SetFields
// SetFields injects the cluster's config, client, API reader, scheme, cache
// and REST mapper into i, in that order. It stops at, and returns, the first
// error reported by an injection step; it returns nil when every step succeeds.
func (c *cluster) SetFields(i interface{}) error {
	// Each step wraps one inject helper and discards its first return value,
	// keeping only the error. The slice order is the injection order.
	steps := []func() error{
		func() error { _, err := inject.ConfigInto(c.config, i); return err },
		func() error { _, err := inject.ClientInto(c.client, i); return err },
		func() error { _, err := inject.APIReaderInto(c.apiReader, i); return err },
		func() error { _, err := inject.SchemeInto(c.scheme, i); return err },
		func() error { _, err := inject.CacheInto(c.cache, i); return err },
		func() error { _, err := inject.MapperInto(c.mapper, i); return err },
	}

	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}
- `cluster.SetFields` is called in `manager.SetFields`.
- `cluster.SetFields` injects `Config`, `Client`, `APIReader`, `Scheme`, `Cache` and `Mapper` into the specified `i`.
- `manager.SetFields`'s usage:
  - Used for the reconciler passed via the builder in controller:

    ```go
    // Inject dependencies into Reconciler
    if err := mgr.SetFields(options.Reconciler); err != nil {
    	return nil, err
    }
    ```
  - Used for runnables added to the Manager with the `add` function:

    ```go
    // Add sets dependencies on i, and adds it to the list of Runnables to start.
    func (cm *controllerManager) Add(r Runnable) error {
    	cm.Lock()
    	defer cm.Unlock()
    	return cm.add(r)
    }

    func (cm *controllerManager) add(r Runnable) error {
    	// Set dependencies on the object
    	if err := cm.SetFields(r); err != nil {
    		return err
    	}
    	return cm.runnables.Add(r)
    }
    ```
    - Controller is passed in `controller.New`
-