diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 51e0a4659..cf4f0b0c1 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -442,7 +442,7 @@ func NewInformerWithOptions(options InformerOptions) (Store, Controller) { } else { clientState = NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers) } - return clientState, newInformer(clientState, options) + return clientState, newInformer(clientState, options, DeletionHandlingMetaNamespaceKeyFunc) } // NewInformer returns a Store and a controller for populating the store @@ -476,7 +476,7 @@ func NewInformer( Handler: h, ResyncPeriod: resyncPeriod, } - return clientState, newInformer(clientState, options) + return clientState, newInformer(clientState, options, DeletionHandlingMetaNamespaceKeyFunc) } // NewIndexerInformer returns an Indexer and a Controller for populating the index @@ -513,7 +513,7 @@ func NewIndexerInformer( ResyncPeriod: resyncPeriod, Indexers: indexers, } - return clientState, newInformer(clientState, options) + return clientState, newInformer(clientState, options, DeletionHandlingMetaNamespaceKeyFunc) } // NewTransformingInformer returns a Store and a controller for populating @@ -542,7 +542,7 @@ func NewTransformingInformer( ResyncPeriod: resyncPeriod, Transform: transformer, } - return clientState, newInformer(clientState, options) + return clientState, newInformer(clientState, options, DeletionHandlingMetaNamespaceKeyFunc) } // NewTransformingIndexerInformer returns an Indexer and a controller for @@ -573,7 +573,7 @@ func NewTransformingIndexerInformer( Indexers: indexers, Transform: transformer, } - return clientState, newInformer(clientState, options) + return clientState, newInformer(clientState, options, DeletionHandlingMetaNamespaceKeyFunc) } // Multiplexes updates in the form of a list of Deltas into a Store, and informs @@ -584,6 +584,7 @@ func processDeltas( clientState Store, deltas Deltas, isInInitialList bool, + keyFunc KeyFunc, ) error { // from oldest to newest for _, d := range deltas { @@ -626,6 +627,7 @@ func processDeltasInBatch( clientState Store, deltas []Delta, isInInitialList bool, + keyFunc KeyFunc, ) error { // from oldest to newest txns := make([]Transaction, 0) @@ -634,7 +636,7 @@ func processDeltasInBatch( if !txnSupported { var errs []error for _, delta := range deltas { - if err := processDeltas(handler, clientState, Deltas{delta}, isInInitialList); err != nil { + if err := processDeltas(handler, clientState, Deltas{delta}, isInInitialList, keyFunc); err != nil { errs = append(errs, err) } } @@ -703,7 +705,7 @@ func processDeltasInBatch( // Parameters // - clientState is the store you want to populate // - options contain the options to configure the controller -func newInformer(clientState Store, options InformerOptions) Controller { +func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Controller { // This will hold incoming changes. Note how we pass clientState in as a // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. @@ -719,12 +721,12 @@ func newInformer(clientState Store, options InformerOptions) Controller { Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(options.Handler, clientState, deltas, isInInitialList) + return processDeltas(options.Handler, clientState, deltas, isInInitialList, keyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(options.Handler, clientState, deltaList, isInInitialList) + return processDeltasInBatch(options.Handler, clientState, deltaList, isInInitialList, keyFunc) }, } return New(cfg) diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index 1456e5eaa..d12678643 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -853,7 +853,8 @@ func TestProcessDeltasInBatch(t *testing.T) { dummyListener, mockStore, tc.deltaList, - true) + true, + DeletionHandlingMetaNamespaceKeyFunc) if tc.assertErr != nil { assert.True(t, tc.assertErr(err)) } @@ -923,12 +924,12 @@ func TestReplaceEvents(t *testing.T) { Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(recorder, store, deltas, isInInitialList) + return processDeltas(recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(recorder, store, deltaList, isInInitialList) + return processDeltasInBatch(recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) }, } diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index 8973a33e8..bf58fe678 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -312,6 +312,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O defaultEventHandlerResyncPeriod: options.ResyncPeriod, clock: realClock, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), + keyFunc: DeletionHandlingMetaNamespaceKeyFunc, } } @@ -449,6 +450,9 @@ type sharedIndexInformer struct { watchErrorHandler WatchErrorHandlerWithContext transform TransformFunc + + // keyFunc is called when processing deltas by the underlying process function. + keyFunc KeyFunc } // dummyController hides the fact that a SharedInformer is different from a dedicated one @@ -718,7 +722,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool defer s.blockDeltas.Unlock() if deltas, ok := obj.(Deltas); ok { - return processDeltas(s, s.indexer, deltas, isInInitialList) + return processDeltas(s, s.indexer, deltas, isInInitialList, s.keyFunc) } return errors.New("object given as Process argument is not Deltas") } @@ -726,7 +730,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool func (s *sharedIndexInformer) HandleBatchDeltas(deltas []Delta, isInInitialList bool) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() - return processDeltasInBatch(s, s.indexer, deltas, isInInitialList) + return processDeltasInBatch(s, s.indexer, deltas, isInInitialList, s.keyFunc) } // Conforms to ResourceEventHandler