From efd3490076c391823095b4c8bd6e847ae18eb012 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Tue, 29 Sep 2020 15:06:33 +0200 Subject: [PATCH] Create TransformingInformer TransformingInfomer is like a regular Informer, but allows for applying custom transform functions on the objects received via list/watch API calls. --- .../client-go/tools/cache/controller.go | 77 ++++++++--- .../client-go/tools/cache/controller_test.go | 123 ++++++++++++++++++ 2 files changed, 182 insertions(+), 18 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index 3491c0116a2..3f406490bef 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -322,7 +322,7 @@ func NewInformer( // This will hold the client state, as we know it. clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc) - return clientState, newInformer(lw, objType, resyncPeriod, h, clientState) + return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil) } // NewIndexerInformer returns an Indexer and a Controller for populating the index @@ -351,7 +351,38 @@ func NewIndexerInformer( // This will hold the client state, as we know it. clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) - return clientState, newInformer(lw, objType, resyncPeriod, h, clientState) + return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil) +} + +// TransformFunc allows for transforming an object before it will be processed +// and put into the controller cache and before the corresponding handlers will +// be called on it. +// TransformFunc (similarly to ResourceEventHandler functions) should be able +// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown +// +// The most common usage pattern is to clean-up some parts of the object to +// reduce component memory usage if a given component doesn't care about them. +// given controller doesn't care for them +type TransformFunc func(interface{}) (interface{}, error) + +// NewTransformingInformer returns a Store and a controller for populating +// the store while also providing event notifications. You should only used +// the returned Store for Get/List operations; Add/Modify/Deletes will cause +// the event notifications to be faulty. +// The given transform function will be called on all objects before they will +// put put into the Store and corresponding Add/Modify/Delete handlers will +// be invokved for them. +func NewTransformingInformer( + lw ListerWatcher, + objType runtime.Object, + resyncPeriod time.Duration, + h ResourceEventHandler, + transformer TransformFunc, +) (Store, Controller) { + // This will hold the client state, as we know it. + clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc) + + return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer) } // newInformer returns a controller for populating the store while also @@ -374,6 +405,7 @@ func newInformer( resyncPeriod time.Duration, h ResourceEventHandler, clientState Store, + transformer TransformFunc, ) Controller { // This will hold incoming changes. Note how we pass clientState in as a // KeyLister, that way resync operations will result in the correct set @@ -393,24 +425,33 @@ func newInformer( Process: func(obj interface{}) error { // from oldest to newest for _, d := range obj.(Deltas) { - switch d.Type { - case Sync, Replaced, Added, Updated: - if old, exists, err := clientState.Get(d.Object); err == nil && exists { - if err := clientState.Update(d.Object); err != nil { - return err - } - h.OnUpdate(old, d.Object) - } else { - if err := clientState.Add(d.Object); err != nil { - return err - } - h.OnAdd(d.Object) - } - case Deleted: - if err := clientState.Delete(d.Object); err != nil { + obj := d.Object + if transformer != nil { + var err error + obj, err = transformer(obj) + if err != nil { return err } - h.OnDelete(d.Object) + } + + switch d.Type { + case Sync, Replaced, Added, Updated: + if old, exists, err := clientState.Get(obj); err == nil && exists { + if err := clientState.Update(obj); err != nil { + return err + } + h.OnUpdate(old, obj) + } else { + if err := clientState.Add(obj); err != nil { + return err + } + h.OnAdd(obj) + } + case Deleted: + if err := clientState.Delete(obj); err != nil { + return err + } + h.OnDelete(obj) } } return nil diff --git a/staging/src/k8s.io/client-go/tools/cache/controller_test.go b/staging/src/k8s.io/client-go/tools/cache/controller_test.go index 1603b9d7e4f..cf42478e057 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller_test.go @@ -24,6 +24,7 @@ import ( "time" "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -451,3 +452,125 @@ func TestPanicPropagated(t *testing.T) { t.Errorf("timeout: the panic failed to propagate from the controller run method!") } } + +func TestTransformingInformer(t *testing.T) { + // source simulates an apiserver object endpoint. + source := fcache.NewFakeControllerSource() + + makePod := func(name, generation string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "namespace", + Labels: map[string]string{"generation": generation}, + }, + Spec: v1.PodSpec{ + Hostname: "hostname", + Subdomain: "subdomain", + }, + } + } + expectedPod := func(name, generation string) *v1.Pod { + pod := makePod(name, generation) + pod.Spec.Hostname = "new-hostname" + pod.Spec.Subdomain = "" + pod.Spec.NodeName = "nodename" + return pod + } + + source.Add(makePod("pod1", "1")) + source.Modify(makePod("pod1", "2")) + + type event struct { + eventType watch.EventType + previous interface{} + current interface{} + } + events := make(chan event, 10) + recordEvent := func(eventType watch.EventType, previous, current interface{}) { + events <- event{eventType: eventType, previous: previous, current: current} + } + verifyEvent := func(eventType watch.EventType, previous, current interface{}) { + select { + case event := <-events: + if event.eventType != eventType { + t.Errorf("expected type %v, got %v", eventType, event.eventType) + } + if !apiequality.Semantic.DeepEqual(event.previous, previous) { + t.Errorf("expected previous object %#v, got %#v", previous, event.previous) + } + if !apiequality.Semantic.DeepEqual(event.current, current) { + t.Errorf("expected object %#v, got %#v", current, event.current) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("failed to get event") + } + } + + podTransformer := func(obj interface{}) (interface{}, error) { + pod, ok := obj.(*v1.Pod) + if !ok { + return nil, fmt.Errorf("unexpected object type: %T", obj) + } + pod.Spec.Hostname = "new-hostname" + pod.Spec.Subdomain = "" + pod.Spec.NodeName = "nodename" + + // Clear out ResourceVersion to simplify comparisons. + pod.ResourceVersion = "" + + return pod, nil + } + + store, controller := NewTransformingInformer( + source, + &v1.Pod{}, + 0, + ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { recordEvent(watch.Added, nil, obj) }, + UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) }, + DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) }, + }, + podTransformer, + ) + + verifyStore := func(expectedItems []interface{}) { + items := store.List() + if len(items) != len(expectedItems) { + t.Errorf("unexpected items %v, expected %v", items, expectedItems) + } + for _, expectedItem := range expectedItems { + found := false + for _, item := range items { + if apiequality.Semantic.DeepEqual(item, expectedItem) { + found = true + } + } + if !found { + t.Errorf("expected item %v not found in %v", expectedItem, items) + } + } + } + + stopCh := make(chan struct{}) + go controller.Run(stopCh) + + verifyEvent(watch.Added, nil, expectedPod("pod1", "2")) + verifyStore([]interface{}{expectedPod("pod1", "2")}) + + source.Add(makePod("pod2", "1")) + verifyEvent(watch.Added, nil, expectedPod("pod2", "1")) + verifyStore([]interface{}{expectedPod("pod1", "2"), expectedPod("pod2", "1")}) + + source.Add(makePod("pod3", "1")) + verifyEvent(watch.Added, nil, expectedPod("pod3", "1")) + + source.Modify(makePod("pod2", "2")) + verifyEvent(watch.Modified, expectedPod("pod2", "1"), expectedPod("pod2", "2")) + + source.Delete(makePod("pod1", "2")) + verifyEvent(watch.Deleted, expectedPod("pod1", "2"), nil) + verifyStore([]interface{}{expectedPod("pod2", "2"), expectedPod("pod3", "1")}) + + close(stopCh) +}