ListerWatcher
type
Interface
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
Lister
Watcher
}
// Lister is any object that knows how to perform an initial list.
type Lister interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
}
// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}
ListWatch struct
// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
// DisableChunking requests no chunking for this list watcher.
DisableChunking bool
}
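As a rough illustration of how a struct holding two function fields can satisfy ListerWatcher, here is a minimal, self-contained sketch. It does not use client-go: ListOptions, Object and WatchInterface are simplified stand-ins for metav1.ListOptions, runtime.Object and watch.Interface, and newStaticListWatch is a hypothetical helper invented for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins (assumptions for this sketch, not the real client-go types).
type ListOptions struct {
	ResourceVersion string
}

type Object interface{}

type WatchInterface interface {
	Stop()
}

type ListerWatcher interface {
	List(options ListOptions) (Object, error)
	Watch(options ListOptions) (WatchInterface, error)
}

type ListFunc func(options ListOptions) (Object, error)
type WatchFunc func(options ListOptions) (WatchInterface, error)

type ListWatch struct {
	ListFunc  ListFunc
	WatchFunc WatchFunc
}

// List delegates to the configured ListFunc.
func (lw *ListWatch) List(options ListOptions) (Object, error) {
	return lw.ListFunc(options)
}

// Watch delegates to the configured WatchFunc.
func (lw *ListWatch) Watch(options ListOptions) (WatchInterface, error) {
	return lw.WatchFunc(options)
}

// Compile-time check that *ListWatch satisfies ListerWatcher.
var _ ListerWatcher = &ListWatch{}

// newStaticListWatch is a hypothetical helper: it serves a fixed list and
// refuses to watch. It exists only to exercise the delegation.
func newStaticListWatch(items []string) *ListWatch {
	return &ListWatch{
		ListFunc: func(ListOptions) (Object, error) { return items, nil },
		WatchFunc: func(ListOptions) (WatchInterface, error) {
			return nil, errors.New("watch not supported in this sketch")
		},
	}
}

func main() {
	var lw ListerWatcher = newStaticListWatch([]string{"nginx"})
	obj, _ := lw.List(ListOptions{})
	fmt.Println(obj)
	_, err := lw.Watch(ListOptions{})
	fmt.Println(err)
}
```

Because both fields are called unconditionally, leaving either one nil would panic at call time, which is why the comment on ListWatch requires both to be set.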
// Interface is watch.Interface from k8s.io/apimachinery/pkg/watch, the type returned by Watch.
type Interface interface {
// Stop stops watching. Will close the channel returned by ResultChan(). Releases
// any resources used by the watch.
Stop()
// ResultChan returns a chan which will receive all the events. If an error occurs
// or Stop() is called, the implementation will close this channel and
// release any resources used by the watch.
ResultChan() <-chan Event
}
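The Stop/ResultChan contract can be sketched with a small in-memory watcher. This is only an illustration under stated assumptions: Event is a simplified stand-in for watch.Event, and chanWatcher and newChanWatcher are hypothetical names, not part of apimachinery.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a simplified stand-in for watch.Event (assumption for this sketch).
type Event struct {
	Type   string
	Object string
}

// chanWatcher is a hypothetical in-memory watcher exposing the same two
// methods as the Interface above.
type chanWatcher struct {
	result chan Event
	done   chan struct{}
	once   sync.Once
}

func newChanWatcher(events []Event) *chanWatcher {
	w := &chanWatcher{
		result: make(chan Event),
		done:   make(chan struct{}),
	}
	go func() {
		// Only the producer goroutine closes result, so a send can never
		// race with the close.
		defer close(w.result)
		for _, e := range events {
			select {
			case w.result <- e:
			case <-w.done:
				return
			}
		}
	}()
	return w
}

// Stop signals the producer to exit; it is safe to call more than once.
func (w *chanWatcher) Stop() {
	w.once.Do(func() { close(w.done) })
}

// ResultChan returns the receive-only event channel.
func (w *chanWatcher) ResultChan() <-chan Event {
	return w.result
}

func main() {
	w := newChanWatcher([]Event{{"ADDED", "nginx"}, {"MODIFIED", "nginx"}, {"DELETED", "nginx"}})
	defer w.Stop()
	// The loop ends when the producer closes the channel.
	for e := range w.ResultChan() {
		fmt.Printf("event: %s, object: %s\n", e.Type, e.Object)
	}
}
```

Closing the result channel from the producer side, and signalling Stop through a separate done channel, is one common way to avoid sending on a closed channel; other designs are possible.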
How ListWatch is used
- Created with NewFilteredListWatchFromClient:

      func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
          listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
              optionsModifier(&options)
              return c.Get().
                  Namespace(namespace).
                  Resource(resource).
                  VersionedParams(&options, metav1.ParameterCodec).
                  Do(context.TODO()).
                  Get()
          }
          watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
              options.Watch = true
              optionsModifier(&options)
              return c.Get().
                  Namespace(namespace).
                  Resource(resource).
                  VersionedParams(&options, metav1.ParameterCodec).
                  Watch(context.TODO())
          }
          return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
      }

- Getter can be obtained from a clientset: clientset.CoreV1().RESTClient()

      podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

  1. listFunc and watchFunc are set in NewFilteredListWatchFromClient:
     1. c.Get() calls NewRequest().Verb("GET")
     1. rest.NewRequest creates and returns a request.
     1. Request.Watch
        1. get retry func
        1. in a for loop, run retry.Before, client.Do(req), and retry.After
- Then, the ListerWatcher is passed to the informer.

      indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{...

  The ListerWatcher is used to initialize the store (DeltaFIFO) with List and to keep the store up-to-date with Watch. For more details, see informer.
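The "initialize with List, keep up-to-date with Watch" idea can be sketched without client-go. The code below is a simplified model, not how the Reflector or DeltaFIFO is actually implemented: Event, List, listerWatcher, fakeLW and syncStore are all hypothetical names, and the store is a plain map from object name to resource version.

```go
package main

import "fmt"

// All types here are simplified stand-ins (assumptions), not client-go types.
type Event struct {
	Type            string // "ADDED", "MODIFIED" or "DELETED"
	Name            string
	ResourceVersion string
}

type List struct {
	ResourceVersion string
	Items           []string
}

// listerWatcher mirrors the List/Watch split, using a plain channel for events.
type listerWatcher interface {
	List() (List, error)
	Watch(resourceVersion string) (<-chan Event, error)
}

// syncStore seeds the store from List, then applies Watch events until the
// event channel is closed. It returns the last resource version it saw.
func syncStore(lw listerWatcher, store map[string]string) (string, error) {
	list, err := lw.List()
	if err != nil {
		return "", err
	}
	for _, name := range list.Items {
		store[name] = list.ResourceVersion
	}
	// Start the watch at the list's resource version so no change is missed.
	rv := list.ResourceVersion
	events, err := lw.Watch(rv)
	if err != nil {
		return rv, err
	}
	for e := range events {
		switch e.Type {
		case "ADDED", "MODIFIED":
			store[e.Name] = e.ResourceVersion
		case "DELETED":
			delete(store, e.Name)
		}
		rv = e.ResourceVersion
	}
	return rv, nil
}

// fakeLW is a hypothetical in-memory implementation used to drive the sketch.
type fakeLW struct {
	list   List
	events []Event
}

func (f fakeLW) List() (List, error) { return f.list, nil }

func (f fakeLW) Watch(string) (<-chan Event, error) {
	ch := make(chan Event, len(f.events))
	for _, e := range f.events {
		ch <- e
	}
	close(ch)
	return ch, nil
}

func main() {
	store := map[string]string{}
	rv, err := syncStore(fakeLW{
		list: List{ResourceVersion: "100", Items: []string{"nginx"}},
		events: []Event{
			{"MODIFIED", "nginx", "101"},
			{"ADDED", "redis", "102"},
			{"DELETED", "nginx", "103"},
		},
	}, store)
	fmt.Println(store, rv, err) // prints: map[redis:102] 103 <nil>
}
```

Returning the last resource version lets a caller restart the watch from that point if the channel closes, instead of listing everything again.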
Example
Code: You can receive events through w.ResultChan(). The example is a simplified version with minimal error handling.
w, err := podListWatcher.Watch(metav1.ListOptions{}) // returns watch.Interface
if err != nil {
    klog.Fatal(err)
}
defer w.Stop() // release the watch's resources when done

// ResultChan is closed on error or Stop, which ends the loop.
for event := range w.ResultChan() {
    // Named accessor so the variable does not shadow the meta package.
    accessor, err := meta.Accessor(event.Object)
    if err != nil {
        continue
    }
    klog.Infof("event: %s, resourceVersion: %s", event.Type, accessor.GetResourceVersion())
}
Run:
- Start the ListerWatcher for Pods.

      go run main.go
      I0913 07:54:29.394053 92277 main.go:57] resourceVersion: 2728

- Create a Pod

      kubectl run nginx --image=nginx

  You'll see the event logs:

      I0913 07:54:29.394239 92277 main.go:64] items: 1
      I0913 07:54:29.401483 92277 main.go:84] event: ADDED, resourceVersion: 503
      I0913 07:55:20.475959 92277 main.go:84] event: MODIFIED, resourceVersion: 2789
      I0913 07:55:38.769688 92277 main.go:84] event: MODIFIED, resourceVersion: 2812

- Patch the Pod

      kubectl patch pod nginx -p '{"metadata":{"annotations": {"key": "val"}}}' --type=merge

  You'll see MODIFIED in the logs.
- Delete the Pod

      kubectl delete pod nginx

      I0913 08:02:23.486861 92277 main.go:84] event: MODIFIED, resourceVersion: 3294
      I0913 08:02:23.929399 92277 main.go:84] event: MODIFIED, resourceVersion: 3298
      I0913 08:02:24.239499 92277 main.go:84] event: MODIFIED, resourceVersion: 3300
      I0913 08:02:24.244502 92277 main.go:84] event: DELETED, resourceVersion: 3301
Reference: example