5.1. Create Controller
5.1.1. Overview
- Define `Controller` struct with `sampleclientset` and `foosSynced`:

  ```go
  type Controller struct {
      sampleclientset clientset.Interface
      foosSynced      cache.InformerSynced // a function that can be used to determine if an informer has synced
  }
  ```

- Define `NewController` function:
  - Create `Controller` with the arguments `sampleclientset` and `fooInformer`, which will be passed in `main.go`.
  - Add event handlers for `AddFunc` and `DeleteFunc` to the informer.
  - Return the controller.
- Define `Run`, which will be called in `main.go`:
  - Wait until the cache is synced.
  - Run `c.runWorker` repeatedly every second until the stop channel is closed.
- Define `runWorker`: do nothing for now.
5.1.2. Implement

controller.go:

```go
package main

import (
	"fmt"
	"time"

	clientset "github.com/nakamasato/sample-controller/pkg/generated/clientset/versioned"
	informers "github.com/nakamasato/sample-controller/pkg/generated/informers/externalversions/example.com/v1alpha1"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

type Controller struct {
	// sampleclientset is a clientset for our own API group
	sampleclientset clientset.Interface

	foosSynced cache.InformerSynced // cache is synced for foo
}

func NewController(sampleclientset clientset.Interface, fooInformer informers.FooInformer) *Controller {
	controller := &Controller{
		sampleclientset: sampleclientset,
		foosSynced:      fooInformer.Informer().HasSynced,
	}

	_, err := fooInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    controller.handleAdd,
			DeleteFunc: controller.handleDelete,
		},
	)
	if err != nil {
		// Fatalf flushes the logs and exits the process
		klog.Fatalf("error occurred when adding event handler %s", err.Error())
	}
	return controller
}

func (c *Controller) Run(stopCh chan struct{}) error {
	if ok := cache.WaitForCacheSync(stopCh, c.foosSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	go wait.Until(c.runWorker, time.Second, stopCh)
	<-stopCh
	return nil
}

func (c *Controller) runWorker() {
	klog.Info("runWorker is called")
}

func (c *Controller) handleAdd(obj interface{}) {
	klog.Info("handleAdd is called")
}

func (c *Controller) handleDelete(obj interface{}) {
	klog.Info("handleDelete is called")
}
```
main.go:

```diff
 import (
-	"context"
 	"flag"
 	"path/filepath"
+	"time"

 	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/client-go/util/homedir"
 	"k8s.io/klog/v2"

-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	clientset "github.com/nakamasato/sample-controller/pkg/generated/clientset/versioned"
+	informers "github.com/nakamasato/sample-controller/pkg/generated/informers/externalversions"
 )

 func main() {
@@ -34,9 +34,11 @@ func main() {
 		klog.Fatalf("Error building example clientset: %s", err.Error())
 	}

-	foos, err := exampleClient.ExampleV1alpha1().Foos("").List(context.Background(), metav1.ListOptions{})
-	if err != nil {
-		klog.Fatalf("listing foos %s", err.Error())
+	exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
+	stopCh := make(chan struct{})
+	controller := NewController(exampleClient, exampleInformerFactory.Example().V1alpha1().Foos())
+	exampleInformerFactory.Start(stopCh)
+	if err = controller.Run(stopCh); err != nil {
+		klog.Fatalf("error occurred when running controller %s", err.Error())
 	}
-	klog.Infof("length of foos is %d", len(foos.Items))
 }
```
In `exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)`, the second argument specifies the ResyncPeriod, which defines the interval of resync. (The resync operation consists of delivering to the handler an update notification for every object in the informer's local cache.) For more detail, please read `NewSharedIndexInformer`.
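As an aside, a resync does not mean the objects have changed: on each resync the informer re-delivers every cached object to `UpdateFunc`, with old and new carrying the same `ResourceVersion`. A handler that only cares about real changes can filter those deliveries out. A minimal sketch (this `UpdateFunc` is not part of the controller at this stage, which only registers `AddFunc`/`DeleteFunc`; the same pattern is applied to Deployments in section 5.6):

```go
handler := cache.ResourceEventHandlerFuncs{
	UpdateFunc: func(old, new interface{}) {
		oldFoo := old.(*samplev1alpha1.Foo)
		newFoo := new.(*samplev1alpha1.Foo)
		if oldFoo.ResourceVersion == newFoo.ResourceVersion {
			return // resync-induced update: the object did not actually change
		}
		// handle a real update here
	},
}
```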
The resulting main.go:

```go
package main

import (
	"flag"
	"path/filepath"
	"time"

	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"k8s.io/klog/v2"

	clientset "github.com/nakamasato/sample-controller/pkg/generated/clientset/versioned"
	informers "github.com/nakamasato/sample-controller/pkg/generated/informers/externalversions"
)

func main() {
	klog.InitFlags(nil)

	var kubeconfig *string

	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional)")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to kubeconfig file")
	}
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		klog.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	exampleClient, err := clientset.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Error building example clientset: %s", err.Error())
	}

	exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
	stopCh := make(chan struct{})
	controller := NewController(exampleClient, exampleInformerFactory.Example().V1alpha1().Foos())
	exampleInformerFactory.Start(stopCh)
	if err = controller.Run(stopCh); err != nil {
		klog.Fatalf("error occurred when running controller %s", err.Error())
	}
}
```
5.1.3. Run and check

Run the controller.

```sh
go run .
```

Create and delete the CR.

```sh
kubectl apply -f config/sample/foo.yaml
kubectl delete -f config/sample/foo.yaml
```

Check the controller logs.

```
2022/07/18 06:36:35 handleAdd is called
2022/07/18 06:36:40 handleDelete is called
```
5.2. Fetch Foo object
5.2.1. Overview
Implement the following logic:

- Add `workqueue` and `listers` to the `Controller` struct.
- Add the key of the triggered object to the `workqueue` in `handleAdd` and `handleDelete` (e.g. `Foo` -> `<namespace>/<name>`; the key round trip is sketched after this list).
- Get a `workqueue` item (`<namespace>/<name>`).
- Get the `Foo` resource with the namespace and name from the `lister`.
- Forget the item from the `workqueue`.
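To illustrate the key format: `cache.MetaNamespaceKeyFunc` (used in `enqueueFoo` below) converts an object into a `<namespace>/<name>` string, and `cache.SplitMetaNamespaceKey` (used in `processNextWorkItem`) reverses it. A standalone sketch (a `Pod` stands in for `Foo` here; any type with `ObjectMeta` works):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "foo-sample"}}

	// object -> "<namespace>/<name>"
	key, _ := cache.MetaNamespaceKeyFunc(pod)
	fmt.Println(key) // default/foo-sample

	// "<namespace>/<name>" -> namespace, name
	ns, name, _ := cache.SplitMetaNamespaceKey(key)
	fmt.Println(ns, name) // default foo-sample
}
```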
5.2.2. Implement
Add `workqueue` and `listers`.

```go
import (
	...
	"k8s.io/client-go/util/workqueue"
	...
	listers "github.com/nakamasato/sample-controller/pkg/generated/listers/example.com/v1alpha1"
)
```

```diff
 type Controller struct {
 	sampleclientset clientset.Interface
 	foosSynced      cache.InformerSynced
+	foosLister      listers.FooLister
+	workqueue       workqueue.RateLimitingInterface
 }
```

```diff
 controller := &Controller{
 	sampleclientset: sampleclientset,
 	foosSynced:      fooInformer.Informer().HasSynced,
+	foosLister:      fooInformer.Lister(),
+	workqueue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "foo"),
 }
```

Define `enqueueFoo` to convert a Foo resource into a `namespace/name` string before putting it into the workqueue, and call it from the handlers.

```go
func (c *Controller) handleAdd(obj interface{}) {
	klog.Info("handleAdd is called")
	c.enqueueFoo(obj)
}

func (c *Controller) handleDelete(obj interface{}) {
	klog.Info("handleDelete is called")
	c.enqueueFoo(obj)
}

// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Foo.
func (c *Controller) enqueueFoo(obj interface{}) {
	var key string
	var err error
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		klog.Errorf("failed to get key from the cache %s", err.Error())
		return
	}
	c.workqueue.Add(key)
}
```
Create a `processNextWorkItem` function that processes one item from the workqueue.

```go
func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()

	if shutdown {
		return false
	}

	// wrap this block in a func to use defer c.workqueue.Done
	err := func(obj interface{}) error {
		// call Done to tell workqueue that the item was finished processing
		defer c.workqueue.Done(obj)
		var key string
		var ok bool

		if key, ok = obj.(string); !ok {
			// As the item in the workqueue is actually invalid, we call
			// Forget here else we'd go into a loop of attempting to
			// process a work item that is invalid.
			c.workqueue.Forget(obj)
			klog.Errorf("expected string in workqueue but got %#v", obj)
			return nil
		}

		ns, name, err := cache.SplitMetaNamespaceKey(key)
		if err != nil {
			klog.Errorf("failed to split key into namespace and name %s", err.Error())
			return err
		}

		// temporary main logic
		foo, err := c.foosLister.Foos(ns).Get(name)
		if err != nil {
			klog.Errorf("failed to get foo resource from lister %s", err.Error())
			return err
		}
		klog.Infof("Got foo %+v", foo.Spec)

		// Forget the queue item as it's successfully processed and
		// the item will not be requeued.
		c.workqueue.Forget(obj)
		klog.Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		return true
	}

	return true
}
```
Call `processNextWorkItem` in `runWorker`.

```go
func (c *Controller) runWorker() {
	for c.processNextWorkItem() {
	}
}
```
5.2.3. Run and check

Run the controller.

```sh
go run .
```

Create and delete the CR.

```sh
kubectl apply -f config/sample/foo.yaml
kubectl delete -f config/sample/foo.yaml
```

Check the controller logs.

```
2022/07/18 07:46:42 handleAdd is called
2022/07/18 07:46:42 Got foo {DeploymentName:foo-sample Replicas:0x1400030194c}
2022/07/18 07:46:42 Successfully synced 'default/foo-sample'
2022/07/18 07:46:49 handleDelete is called
2022/07/18 07:46:49 failed to get foo resource from lister foo.example.com "foo-sample" not found
```
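Note that `Replicas` is printed as a pointer address (`0x…`) because `Foo.Spec.Replicas` is a `*int32`. To log the value itself, dereference it after a nil check; a sketch of an alternative log line for the temporary main logic:

```go
// Replicas is a *int32, so dereference it (guarding against nil) to log the value.
if foo.Spec.Replicas != nil {
	klog.Infof("Got foo %s with %d replicas", foo.Name, *foo.Spec.Replicas)
}
```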
5.3. Create/Delete Deployment for Foo resource
5.3.1. Overview
In this section, we'll add logic to create a `Deployment` for the `Foo` resource.

The logic to implement is:

- Add `clientset`, `informer`, and `lister` for `Deployment` to `Controller`.
- Initialize the `clientset` and `informer` in `main.go` and pass them to the Controller when initializing it.
- In the `syncHandler` func, add logic to create a Deployment if there doesn't exist a Deployment with the name `foo.spec.deploymentName` in the same namespace as the Foo object.
- Set `OwnerReferences` for the new Deployment in `newDeployment` so that the Deployment will be cleaned up when its owner, the `Foo` object, is deleted (the shape of the generated owner reference is sketched after this list).
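For orientation, `metav1.NewControllerRef(foo, gvk)`, used in `newDeployment` below, produces an owner reference roughly like the following (a sketch; the `UID` is taken from the live `Foo` object):

```go
// Roughly what metav1.NewControllerRef builds; Controller and
// BlockOwnerDeletion are both set to true for a controller reference.
isTrue := true
ref := metav1.OwnerReference{
	APIVersion:         "example.com/v1alpha1", // gvk.GroupVersion().String()
	Kind:               "Foo",                  // gvk.Kind
	Name:               foo.GetName(),
	UID:                foo.GetUID(),
	Controller:         &isTrue,
	BlockOwnerDeletion: &isTrue,
}
```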
5.3.2. Implement
Import the necessary packages.

```go
import (
	...
	appsinformers "k8s.io/client-go/informers/apps/v1"
	"k8s.io/client-go/kubernetes"
	appslisters "k8s.io/client-go/listers/apps/v1"
)
```
Add fields (`kubeclientset`, `deploymentsLister`, and `deploymentsSynced`) to `Controller`.

```diff
 type Controller struct {
+	// kubeclientset is a standard kubernetes clientset
+	kubeclientset kubernetes.Interface
 	// sampleclientset is a clientset for our own API group
 	sampleclientset clientset.Interface

+	deploymentsLister appslisters.DeploymentLister
+	deploymentsSynced cache.InformerSynced
+
 	foosLister listers.FooLister
 	foosSynced cache.InformerSynced // cache is synced for foo

 	workqueue workqueue.RateLimitingInterface
 }
```
Update `NewController` as follows:

```diff
-func NewController(sampleclientset clientset.Interface, fooInformer informers.FooInformer) *Controller {
+func NewController(
+	kubeclientset kubernetes.Interface,
+	sampleclientset clientset.Interface,
+	deploymentInformer appsinformers.DeploymentInformer,
+	fooInformer informers.FooInformer) *Controller {
 	controller := &Controller{
-		sampleclientset: sampleclientset,
-		foosSynced:      fooInformer.Informer().HasSynced,
-		foosLister:      fooInformer.Lister(),
-		workqueue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "foo"),
+		kubeclientset:     kubeclientset,
+		sampleclientset:   sampleclientset,
+		deploymentsLister: deploymentInformer.Lister(),
+		deploymentsSynced: deploymentInformer.Informer().HasSynced,
+		foosLister:        fooInformer.Lister(),
+		foosSynced:        fooInformer.Informer().HasSynced,
+		workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "foo"),
 	}
```
Update `main.go` to pass the added arguments to `NewController`.

```diff
 import (
 	...
+	kubeinformers "k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
 	...
 )
```

```diff
 func main() {
 	...
+	kubeClient, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		klog.Errorf("getting kubernetes client set %s", err.Error())
+	}
+
 	exampleClient, err := clientset.NewForConfig(config)
 	if err != nil {
 		klog.Errorf("getting client set %s", err.Error())
 	}

+	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
 	exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
 	stopCh := make(chan struct{})
-	controller := NewController(exampleClient, exampleInformerFactory.Example().V1alpha1().Foos())
+	controller := NewController(
+		kubeClient,
+		exampleClient,
+		kubeInformerFactory.Apps().V1().Deployments(),
+		exampleInformerFactory.Example().V1alpha1().Foos(),
+	)
+	kubeInformerFactory.Start(stopCh)
 	...
 }
```
Create `syncHandler` and `newDeployment` in `controller.go`.

```go
func (c *Controller) syncHandler(key string) error {
	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		klog.Errorf("failed to split key into namespace and name %s", err.Error())
		return err
	}

	foo, err := c.foosLister.Foos(ns).Get(name)
	if err != nil {
		klog.Errorf("failed to get foo resource from lister %s", err.Error())
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}

	deploymentName := foo.Spec.DeploymentName
	if deploymentName == "" {
		klog.Errorf("deploymentName must be specified %s", key)
		return nil
	}
	deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
	if errors.IsNotFound(err) {
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{})
	}

	if err != nil {
		return err
	}

	klog.Infof("deployment %s is valid", deployment.Name)
	return nil
}

func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
	labels := map[string]string{
		"app":        "nginx",
		"controller": foo.Name,
	}
	return &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      foo.Spec.DeploymentName,
			Namespace: foo.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(foo, samplev1alpha1.SchemeGroupVersion.WithKind("Foo")),
			},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: foo.Spec.Replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "nginx",
							Image: "nginx:latest",
						},
					},
				},
			},
		},
	}
}
```
Add the necessary imports.

```go
import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	samplev1alpha1 "github.com/nakamasato/sample-controller/pkg/apis/example.com/v1alpha1"
)
```
Update `processNextWorkItem` to call `syncHandler` for the main logic.

```diff
@@ -77,20 +99,12 @@ func (c *Controller) processNextWorkItem() bool {
 			return nil
 		}

-		ns, name, err := cache.SplitMetaNamespaceKey(key)
-		if err != nil {
-			klog.Errorf("failed to split key into namespace and name %s", err.Error())
-			return err
+		if err := c.syncHandler(key); err != nil {
+			// Put the item back on the workqueue to handle any transient errors.
+			c.workqueue.AddRateLimited(key)
+			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
 		}

-		// temporary main logic
-		foo, err := c.foosLister.Foos(ns).Get(name)
-		if err != nil {
-			klog.Errorf("failed to get foo resource from lister %s", err.Error())
-			return err
-		}
-		klog.Infof("Got foo %+v", foo.Spec)
-
 		// Forget the queue item as it's successfully processed and
 		// the item will not be requeued.
 		c.workqueue.Forget(obj)
```
Delete the `handleDelete` function, as deletion is covered by `ownerReferences` (details mentioned in the next step).

```diff
 fooInformer.Informer().AddEventHandler(
 	cache.ResourceEventHandlerFuncs{
 		AddFunc:    controller.handleAdd,
-		DeleteFunc: controller.handleDelete,
 	},
 )
```

```diff
-func (c *Controller) handleDelete(obj interface{}) {
-	klog.Info("handleDelete is called")
-	c.enqueueFoo(obj)
-}
```
5.3.3. Run and check
Run the controller.

```sh
go run .
```

Create a `Foo` resource.

```sh
kubectl apply -f config/sample/foo.yaml
```

Check the `Deployment`:

```sh
kubectl get deploy
NAME         READY   UP-TO-DATE   AVAILABLE   AGE
foo-sample   0/1     1            0           3s
```

Check sample-controller's logs:

```
2022/07/18 09:33:31 handleAdd is called
2022/07/18 09:33:31 deployment foo-sample is valid
2022/07/18 09:33:31 Successfully synced 'default/foo-sample'
```

Delete the `Foo` resource.

```sh
kubectl delete -f config/sample/foo.yaml
```

Check the `Deployment`:

```sh
kubectl get deploy
No resources found in default namespace.
```
The `Deployment` is deleted when the corresponding `Foo` is deleted, thanks to `OwnerReference`'s cascading deletion feature:

> Kubernetes checks for and deletes objects that no longer have owner references, like the pods left behind when you delete a ReplicaSet. When you delete an object, you can control whether Kubernetes deletes the object's dependents automatically, in a process called cascading deletion.
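If you ever need different propagation behavior, it can be chosen per delete request. A sketch (not part of the tutorial code), assuming the `exampleClient` from `main.go` and the usual `context`/`metav1` imports:

```go
// Foreground cascading deletion: dependents (here, the Deployment) are
// deleted before the Foo object itself disappears.
policy := metav1.DeletePropagationForeground
err := exampleClient.ExampleV1alpha1().Foos("default").Delete(
	context.TODO(),
	"foo-sample",
	metav1.DeleteOptions{PropagationPolicy: &policy},
)
```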
5.4. Check and update Deployment if necessary
5.4.1. Overview
What needs to be done:

- In `syncHandler`:
  - Check if the found `Deployment` is managed by the sample-controller (the ownership check is sketched after this list).
  - Check if the found `Deployment`'s `replicas` matches the `replicas` specified in the `Foo` resource.
- In `NewController`:
  - Set `UpdateFunc` as an event handler for the informer, so that `syncHandler` is called when the `Foo` resource is updated.
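The ownership check relies on `metav1.IsControlledBy`, which in essence compares the object's controller owner reference against the owner's UID. A simplified sketch of what the upstream helper does:

```go
// Simplified: an object is "controlled by" an owner when its controller
// ownerReference (the one with Controller: true) carries the owner's UID.
func isControlledBy(obj metav1.Object, owner metav1.Object) bool {
	if ref := metav1.GetControllerOf(obj); ref != nil {
		return ref.UID == owner.GetUID()
	}
	return false
}
```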
5.4.2. Implement
Add a constant before `type Controller struct`.

```go
const (
	// MessageResourceExists is the message used for Events when a resource
	// fails to sync due to a Deployment already existing
	MessageResourceExists = "Resource %q already exists and is not managed by Foo"
)
```
Update `syncHandler`:

Remove the log line `klog.Infof("deployment %s is valid", deployment.Name)`.

Check if the `Deployment` is managed by the controller.

```go
// If the Deployment is not controlled by this Foo resource, we should log
// a warning to the event recorder and return error msg.
if !metav1.IsControlledBy(deployment, foo) {
	msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
	klog.Info(msg)
	return fmt.Errorf("%s", msg)
}
```
Check the replicas and update the `Deployment` object if the replicas in `Deployment` and `Foo` differ.

```go
// If the number of replicas on the Foo resource is specified, and the
// number does not equal the current desired replicas on the Deployment, we
// should update the Deployment resource.
if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
	klog.Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
	deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{})
}

// If an error occurs during Update, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
	return err
}
```
Update the event handlers in `NewController`:

```diff
 _, err := fooInformer.Informer().AddEventHandler(
 	cache.ResourceEventHandlerFuncs{
-		AddFunc:    controller.handleAdd,
+		AddFunc: controller.enqueueFoo,
+		UpdateFunc: func(old, new interface{}) {
+			controller.enqueueFoo(new)
+		},
 	},
 )
```
Remove the now-unused `handleAdd` function.

```diff
-func (c *Controller) handleAdd(obj interface{}) {
-	klog.Info("handleAdd is called")
-	c.enqueueFoo(obj)
-}
```
5.4.3. Run and check: sample-controller updates replicas

Run the controller.

```sh
go run .
```

Apply the `Foo` resource.

```sh
kubectl apply -f config/sample/foo.yaml
```

```sh
kubectl get deploy
NAME         READY   UP-TO-DATE   AVAILABLE   AGE
foo-sample   1/1     1            1           3h41m
```

Increase Foo's replicas to 2.

```sh
kubectl patch foo foo-sample -p '{"spec":{"replicas": 2}}' --type=merge
```

Logs:

```
2022/07/19 09:55:00 Foo foo-sample replicas: 2, deployment replicas: 1
2022/07/19 09:55:00 Successfully synced 'default/foo-sample'
```

The replicas of the Deployment increased.

```sh
kubectl get deploy
NAME         READY   UP-TO-DATE   AVAILABLE   AGE
foo-sample   2/2     2            2           3h42m
```

Delete the `Foo` resource.

```sh
kubectl delete -f config/sample/foo.yaml
```
5.4.4. Run and check: sample-controller won't update a Deployment that is not managed by the controller

Run the controller.

```sh
go run .
```

Create a `Deployment` named `foo-sample`.

```sh
kubectl create deployment foo-sample --image=nginx
```

Apply the `Foo` resource with the name `foo-sample`.

```sh
kubectl apply -f config/sample/foo.yaml
```

Log:

```
2022/07/19 09:58:27 Resource "foo-sample" already exists and is not managed by Foo
```

Clean up.

```sh
kubectl delete -f config/sample/foo.yaml
kubectl delete deploy foo-sample
```
5.5. Update Foo status
5.5.1. Overview
- Enable the `status` subresource in the `CustomResourceDefinition`.
- Store the Deployment's `availableReplicas` in the Foo object's status with the `UpdateStatus` function (the assumed shape of the Foo types is sketched after this list).
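This assumes the Foo types generated earlier in the tutorial look roughly like the following (field names inferred from how they are used below; treat this as a sketch, not the exact source):

```go
type FooSpec struct {
	DeploymentName string `json:"deploymentName"`
	Replicas       *int32 `json:"replicas"`
}

type FooStatus struct {
	AvailableReplicas int32 `json:"availableReplicas"`
}

type Foo struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   FooSpec   `json:"spec"`
	Status FooStatus `json:"status"`
}
```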
5.5.2. Implement
Create the `updateFooStatus` function.

```go
func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1.Deployment) error {
	// NEVER modify objects from the store. It's a read-only, local cache.
	// You can use DeepCopy() to make a deep copy of the original object and modify this copy,
	// or create a copy manually for better performance.
	fooCopy := foo.DeepCopy()
	fooCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas
	// If the CustomResourceSubresources feature gate is not enabled,
	// we must use Update instead of UpdateStatus to update the Status block of the Foo resource.
	// UpdateStatus will not allow changes to the Spec of the resource,
	// which is ideal for ensuring nothing other than resource status has been updated.
	_, err := c.sampleclientset.ExampleV1alpha1().Foos(foo.Namespace).UpdateStatus(context.TODO(), fooCopy, metav1.UpdateOptions{})
	return err
}
```
Add the logic at the end of `syncHandler`.

```go
func (c *Controller) syncHandler(key string) error {
	...
	// Finally, we update the status block of the Foo resource to reflect the
	// current state of the world
	err = c.updateFooStatus(foo, deployment)
	if err != nil {
		klog.Errorf("failed to update Foo status for %s", foo.Name)
		return err
	}
	return nil
}
```
Add `subresources` to the `CustomResourceDefinition` in `config/crd/foos.yaml`.

```yaml
subresources:
  status: {}
```

For more details, see subresources.
5.5.3. Run and check

Apply the changes to the CRD.

```sh
kubectl apply -f config/crd/foos.yaml
```

Run the controller.

```sh
go run .
```

Apply `Foo`.

```sh
kubectl apply -f config/sample/foo.yaml
```

Check the status (not updated immediately; this will be fixed in the next section):

```sh
kubectl get foo foo-sample -o jsonpath='{.status}'
{"availableReplicas":0}%
```

Currently, the informer only monitors the `Foo` resource, so it cannot capture updates of `Deployment.status.availableReplicas`.

Check the status again after a while:

```sh
kubectl get foo foo-sample -o jsonpath='{.status}'
{"availableReplicas":1}%
```

Delete `Foo`.

```sh
kubectl delete -f config/sample/foo.yaml
```
5.6. Capture the update of Deployment
5.6.1. Overview
In the previous section, `status.availableReplicas` was not updated immediately. This is because sample-controller only monitors our custom resource `Foo`. In this section, we'll enable the controller to capture changes of the Deployments controlled by `Foo`.
5.6.2. Implement
Add the `handleObject` function.

```go
// handleObject will take any resource implementing metav1.Object and attempt
// to find the Foo resource that 'owns' it. It does this by looking at the
// objects metadata.ownerReferences field for an appropriate OwnerReference.
// It then enqueues that Foo resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
func (c *Controller) handleObject(obj interface{}) {
	var object metav1.Object
	var ok bool
	if object, ok = obj.(metav1.Object); !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			return
		}
		object, ok = tombstone.Obj.(metav1.Object)
		if !ok {
			return
		}
		klog.Infof("Recovered deleted object '%s' from tombstone", object.GetName())
	}
	klog.Infof("Processing object: %s", object.GetName())
	if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
		// If this object is not owned by a Foo, we should not do anything more
		// with it.
		if ownerRef.Kind != "Foo" {
			return
		}

		foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
		if err != nil {
			klog.Errorf("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
			return
		}

		c.enqueueFoo(foo)
		return
	}
}
```
When a `Deployment` managed by `Foo` is added/updated/deleted, `handleObject` gets the corresponding `Foo` and puts its key (`namespace/name`) into the workqueue.

Add the event handlers to `deploymentInformer` in `NewController`.

```go
// Set up an event handler for when Deployment resources change. This
// handler will lookup the owner of the given Deployment, and if it is
// owned by a Foo resource then the handler will enqueue that Foo resource for
// processing. This way, we don't need to implement custom logic for
// handling Deployment resources. More info on this pattern:
// https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md
_, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: controller.handleObject,
	UpdateFunc: func(old, new interface{}) {
		newDepl := new.(*appsv1.Deployment)
		oldDepl := old.(*appsv1.Deployment)
		if newDepl.ResourceVersion == oldDepl.ResourceVersion {
			// Periodic resync will send update events for all known Deployments.
			// Two different versions of the same Deployment will always have different RVs.
			return
		}
		controller.handleObject(new)
	},
	DeleteFunc: controller.handleObject,
})
if err != nil {
	// Fatalf flushes the logs and exits the process
	klog.Fatalf("error occurred when adding event handler %s", err.Error())
}
```
5.6.3. Run and check
- Run the controller.

  ```sh
  go run .
  ```

- Create the Foo resource.

  ```sh
  kubectl apply -f config/sample/foo.yaml
  ```

- Check Foo's status (updated immediately).

  ```sh
  kubectl get foo foo-sample -o jsonpath='{.status}'
  {"availableReplicas":1}
  ```

- Check again by changing the replicas.

  ```sh
  kubectl patch foo foo-sample -p '{"spec":{"replicas": 2}}' --type=merge
  ```

- Check Foo's status (updated immediately).

  ```sh
  kubectl get foo foo-sample -o jsonpath='{.status}'
  {"availableReplicas":2}
  ```

- Delete `Foo`.

  ```sh
  kubectl delete -f config/sample/foo.yaml
  ```
5.7. Create events for Foo resource
5.7.1. Overview
- Add a `record.EventRecorder` to `Controller`.
- Emit an event with `c.recorder.Event` in `syncHandler` (the relevant part of the `EventRecorder` interface is excerpted after this list).
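For context, the relevant part of the `record.EventRecorder` interface from `k8s.io/client-go/tools/record` (abbreviated):

```go
type EventRecorder interface {
	// Event constructs an Event from the given object, type (Normal/Warning),
	// reason, and human-readable message.
	Event(object runtime.Object, eventtype, reason, message string)
	// Eventf is like Event, but with Sprintf-style message formatting.
	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
	// ...
}
```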
5.7.2. Implement
Add the necessary packages.

```diff
@@ -13,20 +13,34 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	appsinformers "k8s.io/client-go/informers/apps/v1"
 	"k8s.io/client-go/kubernetes"
+	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	appslisters "k8s.io/client-go/listers/apps/v1"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"

 	samplev1alpha1 "github.com/nakamasato/sample-controller/pkg/apis/example.com/v1alpha1"
 	clientset "github.com/nakamasato/sample-controller/pkg/generated/clientset/versioned"
+	"github.com/nakamasato/sample-controller/pkg/generated/clientset/versioned/scheme"
 	informers "github.com/nakamasato/sample-controller/pkg/generated/informers/externalversions/example.com/v1alpha1"
 	listers "github.com/nakamasato/sample-controller/pkg/generated/listers/example.com/v1alpha1"
```
Add an event recorder (`recorder`) to `Controller`.

```diff
 type Controller struct {
@@ -43,6 +57,10 @@ type Controller struct {
 	// queue
 	workqueue workqueue.RateLimitingInterface
+
+	// recorder is an event recorder for recording Event resources to the
+	// Kubernetes API.
+	recorder record.EventRecorder
 }
```
Initialize the `eventBroadcaster` and `recorder` in `NewController`.

```diff
 func NewController(
@@ -50,6 +68,11 @@ func NewController(
 	sampleclientset clientset.Interface,
 	deploymentInformer appsinformers.DeploymentInformer,
 	fooInformer informers.FooInformer) *Controller {
+
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartStructuredLogging(0)
+	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
 	controller := &Controller{
 		kubeclientset:   kubeclientset,
 		sampleclientset: sampleclientset,
@@ -58,6 +81,7 @@ func NewController(
 		foosLister:      fooInformer.Lister(),
 		foosSynced:      fooInformer.Informer().HasSynced,
 		workqueue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "foo"),
+		recorder:        recorder,
 	}
```
Define constants.

```diff
+const controllerAgentName = "sample-controller"
+
 const (
+	// SuccessSynced is used as part of the Event 'reason' when a Foo is synced
+	SuccessSynced = "Synced"
+	// ErrResourceExists is used as part of the Event 'reason' when a Foo fails
+	// to sync due to a Deployment of the same name already existing.
+	ErrResourceExists = "ErrResourceExists"
+
 	// MessageResourceExists is the message used for Events when a resource
 	// fails to sync due to a Deployment already existing
 	MessageResourceExists = "Resource %q already exists and is not managed by Foo"
+	// MessageResourceSynced is the message used for an Event fired when a Foo
+	// is synced successfully
+	MessageResourceSynced = "Foo synced successfully"
 )
```
Record events in `syncHandler`.

```diff
@@ -199,6 +223,7 @@ func (c *Controller) syncHandler(key string) error {
 	// a warning to the event recorder and return error msg.
 	if !metav1.IsControlledBy(deployment, foo) {
 		msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
+		c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
 		klog.Info(msg)
 		return fmt.Errorf("%s", msg)
 	}
@@ -228,6 +253,7 @@ func (c *Controller) syncHandler(key string) error {
 		return err
 	}

+	c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
 	return nil
 }
```
Run `go mod tidy`.
5.7.3. Run and check
- Run the controller.

  ```sh
  go run .
  ```

- Apply `Foo`.

  ```sh
  kubectl apply -f config/sample/foo.yaml
  ```

- Check the `event`.

  ```sh
  kubectl get event --field-selector involvedObject.kind=Foo
  LAST SEEN   TYPE     REASON   OBJECT           MESSAGE
  22s         Normal   Synced   foo/foo-sample   Foo synced successfully
  ```

- Delete `Foo`.

  ```sh
  kubectl delete -f config/sample/foo.yaml
  ```