From 474fc8c5234000bce666a6b02f7ffbb295ef135f Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Wed, 17 Aug 2022 15:49:26 -0400 Subject: [PATCH 1/2] Reflector: support logging Unstructured type Add an annotation that can be added to the exampleType passed to NewReflector to indicate the expected type for the Reflector. This is useful for types such as unstuctured.Unstructured, which, when used with a dynamic informer, do not have their TypeMeta filled in. Signed-off-by: Andy Goldstein --- .../dynamic/dynamicinformer/informer.go | 9 +- .../client-go/tools/cache/controller.go | 14 +- .../k8s.io/client-go/tools/cache/reflector.go | 133 ++++++++++++------ .../client-go/tools/cache/reflector_test.go | 96 +++++++++---- .../client-go/tools/cache/shared_informer.go | 73 +++++++--- 5 files changed, 228 insertions(+), 97 deletions(-) diff --git a/staging/src/k8s.io/client-go/dynamic/dynamicinformer/informer.go b/staging/src/k8s.io/client-go/dynamic/dynamicinformer/informer.go index 40878b400f6..9785613d2c5 100644 --- a/staging/src/k8s.io/client-go/dynamic/dynamicinformer/informer.go +++ b/staging/src/k8s.io/client-go/dynamic/dynamicinformer/informer.go @@ -120,7 +120,7 @@ func (f *dynamicSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer { return &dynamicInformer{ gvr: gvr, - informer: cache.NewSharedIndexInformer( + informer: cache.NewSharedIndexInformerWithOptions( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { @@ -136,8 +136,11 @@ func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersio }, }, &unstructured.Unstructured{}, - resyncPeriod, - indexers, + cache.SharedIndexInformerOptions{ + ResyncPeriod: resyncPeriod, + Indexers: indexers, + ObjectDescription: gvr.String(), + }, ), } } diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index 0762da3befa..57b15fea1c9 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -50,11 +50,12 @@ type Config struct { Process ProcessFunc // ObjectType is an example object of the type this controller is - // expected to handle. Only the type needs to be right, except - // that when that is `unstructured.Unstructured` the object's - // `"apiVersion"` and `"kind"` must also be right. + // expected to handle. ObjectType runtime.Object + // ObjectDescription is the description to use when logging type-specific information about this controller. + ObjectDescription string + // FullResyncPeriod is the period at which ShouldResync is considered. FullResyncPeriod time.Duration @@ -131,11 +132,14 @@ func (c *controller) Run(stopCh <-chan struct{}) { <-stopCh c.config.Queue.Close() }() - r := NewReflector( + r := NewReflectorWithOptions( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, - c.config.FullResyncPeriod, + ReflectorOptions{ + ResyncPeriod: c.config.FullResyncPeriod, + TypeDescription: c.config.ObjectDescription, + }, ) r.ShouldResync = c.config.ShouldResync r.WatchListPageSize = c.config.WatchListPageSize diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index 9cd476be8a0..dc3af67ab53 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -54,7 +54,7 @@ type Reflector struct { // will be the stringification of expectedGVK if provided, and the // stringification of expectedType otherwise. It is for display // only, and should not be used for parsing or comparison. - expectedTypeName string + typeDescription string // An example object of the type we expect to place in the store. // Only the type needs to be right, except that when that is // `unstructured.Unstructured` the object's `"apiVersion"` and @@ -131,13 +131,13 @@ func DefaultWatchErrorHandler(r *Reflector, err error) { // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already // has a semantic that it returns data at least as fresh as provided RV. // So first try to LIST with setting RV to resource version of last observed object. - klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) + klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err) case err == io.EOF: // watch closed normally case err == io.ErrUnexpectedEOF: - klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err) + klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.typeDescription, err) default: - utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err)) + utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.typeDescription, err)) } } @@ -155,7 +155,37 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa return indexer, reflector } -// NewReflector creates a new Reflector object which will keep the +// NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack +// that is outside this package. See NewReflectorWithOptions for further information. +func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { + return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod}) +} + +// NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further +// information. +func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { + return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{Name: name, ResyncPeriod: resyncPeriod}) +} + +// ReflectorOptions configures a Reflector. +type ReflectorOptions struct { + // Name is the Reflector's name. If unset/unspecified, the name defaults to the closest source_file.go:line + // in the call stack that is outside this package. + Name string + + // TypeDescription is the Reflector's type description. If unset/unspecified, the type description is defaulted + // using the following rules: if the expectedType passed to NewReflectorWithOptions was nil, the type description is + // "". If the expectedType is an instance of *unstructured.Unstructured and its apiVersion and kind fields + // are set, the type description is the string encoding of those. Otherwise, the type description is set to the + // go type of expectedType.. + TypeDescription string + + // ResyncPeriod is the Reflector's resync period. If unset/unspecified, the resync period defaults to 0 + // (do not resync). + ResyncPeriod time.Duration +} + +// NewReflectorWithOptions creates a new Reflector object which will keep the // given store up to date with the server's contents for the given // resource. Reflector promises to only put things in the store that // have the type of expectedType, unless expectedType is nil. If @@ -165,49 +195,70 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa // "yes". This enables you to use reflectors to periodically process // everything as well as incrementally processing the things that // change. -func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { - return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod) -} - -// NewNamedReflector same as NewReflector, but with a specified name for logging -func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { +func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector { realClock := &clock.RealClock{} r := &Reflector{ - name: name, - listerWatcher: lw, - store: store, + name: options.Name, + resyncPeriod: options.ResyncPeriod, + typeDescription: options.TypeDescription, + listerWatcher: lw, + store: store, // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff. backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), - resyncPeriod: resyncPeriod, clock: realClock, watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), + expectedType: reflect.TypeOf(expectedType), } - r.setExpectedType(expectedType) + + if r.name == "" { + r.name = naming.GetNameFromCallsite(internalPackages...) + } + + r.setTypeDescription(expectedType) + r.setExpectedGVK(expectedType) + return r } -func (r *Reflector) setExpectedType(expectedType interface{}) { - r.expectedType = reflect.TypeOf(expectedType) - if r.expectedType == nil { - r.expectedTypeName = defaultExpectedTypeName +func (r *Reflector) setTypeDescription(expectedType interface{}) { + if r.typeDescription != "" { return } - r.expectedTypeName = r.expectedType.String() - - if obj, ok := expectedType.(*unstructured.Unstructured); ok { - // Use gvk to check that watch event objects are of the desired type. - gvk := obj.GroupVersionKind() - if gvk.Empty() { - klog.V(4).Infof("Reflector from %s configured with expectedType of *unstructured.Unstructured with empty GroupVersionKind.", r.name) - return - } - r.expectedGVK = &gvk - r.expectedTypeName = gvk.String() + if expectedType == nil { + r.typeDescription = defaultExpectedTypeName + } else { + r.typeDescription = reflect.TypeOf(expectedType).String() } + + obj, ok := expectedType.(*unstructured.Unstructured) + if !ok { + return + } + + gvk := obj.GroupVersionKind() + if gvk.Empty() { + return + } + + r.typeDescription = gvk.String() +} + +func (r *Reflector) setExpectedGVK(expectedType interface{}) { + obj, ok := expectedType.(*unstructured.Unstructured) + if !ok { + return + } + + gvk := obj.GroupVersionKind() + if gvk.Empty() { + return + } + + r.expectedGVK = &gvk } // internalPackages are packages that ignored when creating a default reflector name. These packages are in the common @@ -218,13 +269,13 @@ var internalPackages = []string{"client-go/tools/cache/"} // objects and subsequent deltas. // Run will exit when stopCh is closed. func (r *Reflector) Run(stopCh <-chan struct{}) { - klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) + klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name) wait.BackoffUntil(func() { if err := r.ListAndWatch(stopCh); err != nil { r.watchErrorHandler(r, err) } }, r.backoffManager, true, stopCh) - klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) + klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name) } var ( @@ -254,7 +305,7 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { // and then use the resource version to watch. // It returns error if ListAndWatch didn't even try to initialize watch. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { - klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name) + klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name) err := r.list(stopCh) if err != nil { @@ -326,7 +377,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { return err } - err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh) + err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh) retry.After(err) if err != nil { if err != errorStopRequested { @@ -335,16 +386,16 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already // has a semantic that it returns data at least as fresh as provided RV. // So first try to LIST with setting RV to resource version of last observed object. - klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) + klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err) case apierrors.IsTooManyRequests(err): - klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName) + klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription) <-r.initConnBackoffManager.Backoff().C() continue case apierrors.IsInternalError(err) && retry.ShouldRetry(): - klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.expectedTypeName, err) + klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err) continue default: - klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) + klog.Warningf("%s: watch of %v ended with: %v", r.name, r.typeDescription, err) } } return nil @@ -421,8 +472,8 @@ func (r *Reflector) list(stopCh <-chan struct{}) error { } initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err}) if err != nil { - klog.Warningf("%s: failed to list %v: %v", r.name, r.expectedTypeName, err) - return fmt.Errorf("failed to list %v: %w", r.expectedTypeName, err) + klog.Warningf("%s: failed to list %v: %v", r.name, r.typeDescription, err) + return fmt.Errorf("failed to list %v: %w", r.typeDescription, err) } // We check if the list was paginated and if so set the paginatedResult based on that. diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index eeb4affb6b1..c6cfece6f9a 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -138,7 +138,7 @@ func TestReflectorWatchHandlerError(t *testing.T) { go func() { fw.Stop() }() - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) + err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) if err == nil { t.Errorf("unexpected non-error") } @@ -157,7 +157,7 @@ func TestReflectorWatchHandler(t *testing.T) { fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}}) fw.Stop() }() - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) + err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) if err != nil { t.Errorf("unexpected error %v", err) } @@ -205,7 +205,7 @@ func TestReflectorStopWatch(t *testing.T) { fw := watch.NewFake() stopWatch := make(chan struct{}, 1) stopWatch <- struct{}{} - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, nevererrc, stopWatch) + err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopWatch) if err != errorStopRequested { t.Errorf("expected stop error, got %q", err) } @@ -979,7 +979,7 @@ func TestReflectorFullListIfTooLarge(t *testing.T) { } } -func TestReflectorSetExpectedType(t *testing.T) { +func TestReflectorSetTypeDescription(t *testing.T) { obj := &unstructured.Unstructured{} gvk := schema.GroupVersionKind{ Group: "mygroup", @@ -987,42 +987,86 @@ func TestReflectorSetExpectedType(t *testing.T) { Kind: "MyKind", } obj.SetGroupVersionKind(gvk) + testCases := map[string]struct { - inputType interface{} - expectedTypeName string - expectedType reflect.Type - expectedGVK *schema.GroupVersionKind + inputType interface{} + customDescription string + expectedTypeDescription string }{ "Nil type": { - expectedTypeName: defaultExpectedTypeName, + expectedTypeDescription: defaultExpectedTypeName, }, "Normal type": { - inputType: &v1.Pod{}, - expectedTypeName: "*v1.Pod", - expectedType: reflect.TypeOf(&v1.Pod{}), + inputType: &v1.Pod{}, + expectedTypeDescription: "*v1.Pod", + }, + "Normal type with custom description": { + inputType: &v1.Pod{}, + customDescription: "foo", + expectedTypeDescription: "foo", }, "Unstructured type without GVK": { - inputType: &unstructured.Unstructured{}, - expectedTypeName: "*unstructured.Unstructured", - expectedType: reflect.TypeOf(&unstructured.Unstructured{}), + inputType: &unstructured.Unstructured{}, + expectedTypeDescription: "*unstructured.Unstructured", + }, + "Unstructured type without GVK, with custom description": { + inputType: &unstructured.Unstructured{}, + customDescription: "foo", + expectedTypeDescription: "foo", }, "Unstructured type with GVK": { - inputType: obj, - expectedTypeName: gvk.String(), - expectedType: reflect.TypeOf(&unstructured.Unstructured{}), - expectedGVK: &gvk, + inputType: obj, + expectedTypeDescription: gvk.String(), + }, + "Unstructured type with GVK, with custom type description": { + inputType: obj, + customDescription: "foo", + expectedTypeDescription: "foo", + }, + } + for testName, tc := range testCases { + t.Run(testName, func(t *testing.T) { + r := &Reflector{ + typeDescription: tc.customDescription, + } + + r.setTypeDescription(tc.inputType) + if tc.expectedTypeDescription != r.typeDescription { + t.Fatalf("Expected typeDescription %v, got %v", tc.expectedTypeDescription, r.typeDescription) + } + }) + } +} + +func TestReflectorSetExpectedGVK(t *testing.T) { + obj := &unstructured.Unstructured{} + gvk := schema.GroupVersionKind{ + Group: "mygroup", + Version: "v1", + Kind: "MyKind", + } + obj.SetGroupVersionKind(gvk) + + testCases := map[string]struct { + inputType interface{} + expectedGVK *schema.GroupVersionKind + }{ + "Nil type": {}, + "Some non Unstructured type": { + inputType: &v1.Pod{}, + }, + "Unstructured type without GVK": { + inputType: &unstructured.Unstructured{}, + }, + "Unstructured type with GVK": { + inputType: obj, + expectedGVK: &gvk, }, } for testName, tc := range testCases { t.Run(testName, func(t *testing.T) { r := &Reflector{} - r.setExpectedType(tc.inputType) - if tc.expectedType != r.expectedType { - t.Fatalf("Expected expectedType %v, got %v", tc.expectedType, r.expectedType) - } - if tc.expectedTypeName != r.expectedTypeName { - t.Fatalf("Expected expectedTypeName %v, got %v", tc.expectedTypeName, r.expectedTypeName) - } + r.setExpectedGVK(tc.inputType) gvkNotEqual := (tc.expectedGVK == nil) != (r.expectedGVK == nil) if tc.expectedGVK != nil && r.expectedGVK != nil { gvkNotEqual = *tc.expectedGVK != *r.expectedGVK diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index f5c7316a1d7..0e39f2a7eea 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -223,14 +223,26 @@ type SharedIndexInformer interface { GetIndexer() Indexer } -// NewSharedInformer creates a new instance for the listwatcher. +// NewSharedInformer creates a new instance for the ListerWatcher. See NewSharedIndexInformerWithOptions for full details. func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer { return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{}) } -// NewSharedIndexInformer creates a new instance for the listwatcher. -// The created informer will not do resyncs if the given -// defaultEventHandlerResyncPeriod is zero. Otherwise: for each +// NewSharedIndexInformer creates a new instance for the ListerWatcher and specified Indexers. See +// NewSharedIndexInformerWithOptions for full details. +func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { + return NewSharedIndexInformerWithOptions( + lw, + exampleObject, + SharedIndexInformerOptions{ + ResyncPeriod: defaultEventHandlerResyncPeriod, + Indexers: indexers, + }, + ) +} + +// NewSharedIndexInformerWithOptions creates a new instance for the ListerWatcher. +// The created informer will not do resyncs if options.ResyncPeriod is zero. Otherwise: for each // handler that with a non-zero requested resync period, whether added // before or after the informer starts, the nominal resync period is // the requested resync period rounded up to a multiple of the @@ -238,21 +250,36 @@ func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEv // checking period is established when the informer starts running, // and is the maximum of (a) the minimum of the resync periods // requested before the informer starts and the -// defaultEventHandlerResyncPeriod given here and (b) the constant +// options.ResyncPeriod given here and (b) the constant // `minimumResyncPeriod` defined in this file. -func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { +func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer { realClock := &clock.RealClock{} - sharedIndexInformer := &sharedIndexInformer{ + + return &sharedIndexInformer{ + indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers), processor: &sharedProcessor{clock: realClock}, - indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: exampleObject, - resyncCheckPeriod: defaultEventHandlerResyncPeriod, - defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, - cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), + objectDescription: options.ObjectDescription, + resyncCheckPeriod: options.ResyncPeriod, + defaultEventHandlerResyncPeriod: options.ResyncPeriod, clock: realClock, + cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), } - return sharedIndexInformer +} + +// SharedIndexInformerOptions configures a sharedIndexInformer. +type SharedIndexInformerOptions struct { + // ResyncPeriod is the default event handler resync period and resync check + // period. If unset/unspecified, these are defaulted to 0 (do not resync). + ResyncPeriod time.Duration + + // Indexers is the sharedIndexInformer's indexers. If unset/unspecified, no indexers are configured. + Indexers Indexers + + // ObjectDescription is the sharedIndexInformer's object description. This is passed through to the + // underlying Reflector's type description. + ObjectDescription string } // InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced. @@ -326,12 +353,13 @@ type sharedIndexInformer struct { listerWatcher ListerWatcher - // objectType is an example object of the type this informer is - // expected to handle. Only the type needs to be right, except - // that when that is `unstructured.Unstructured` the object's - // `"apiVersion"` and `"kind"` must also be right. + // objectType is an example object of the type this informer is expected to handle. If set, an event + // with an object with a mismatching type is dropped instead of being delivered to listeners. objectType runtime.Object + // objectDescription is the description of this informer's objects. This typically defaults to + objectDescription string + // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call // shouldResync to check if any of our listeners need a resync. resyncCheckPeriod time.Duration @@ -425,12 +453,13 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { }) cfg := &Config{ - Queue: fifo, - ListerWatcher: s.listerWatcher, - ObjectType: s.objectType, - FullResyncPeriod: s.resyncCheckPeriod, - RetryOnError: false, - ShouldResync: s.processor.shouldResync, + Queue: fifo, + ListerWatcher: s.listerWatcher, + ObjectType: s.objectType, + ObjectDescription: s.objectDescription, + FullResyncPeriod: s.resyncCheckPeriod, + RetryOnError: false, + ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, WatchErrorHandler: s.watchErrorHandler, From 784ec157e67c86bc3383b326bbfe8ee70737aa4d Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Fri, 2 Dec 2022 12:39:58 -0500 Subject: [PATCH 2/2] reflector: refactor setting typeDescription & expectedGVK Signed-off-by: Andy Goldstein --- .../k8s.io/client-go/tools/cache/reflector.go | 35 ++++++++-------- .../client-go/tools/cache/reflector_test.go | 41 +++++-------------- 2 files changed, 28 insertions(+), 48 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index dc3af67ab53..7325ae5f2d6 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -217,48 +217,49 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S r.name = naming.GetNameFromCallsite(internalPackages...) } - r.setTypeDescription(expectedType) - r.setExpectedGVK(expectedType) + if r.typeDescription == "" { + r.typeDescription = getTypeDescriptionFromObject(expectedType) + } + + if r.expectedGVK == nil { + r.expectedGVK = getExpectedGVKFromObject(expectedType) + } return r } -func (r *Reflector) setTypeDescription(expectedType interface{}) { - if r.typeDescription != "" { - return +func getTypeDescriptionFromObject(expectedType interface{}) string { + if expectedType == nil { + return defaultExpectedTypeName } - if expectedType == nil { - r.typeDescription = defaultExpectedTypeName - } else { - r.typeDescription = reflect.TypeOf(expectedType).String() - } + reflectDescription := reflect.TypeOf(expectedType).String() obj, ok := expectedType.(*unstructured.Unstructured) if !ok { - return + return reflectDescription } gvk := obj.GroupVersionKind() if gvk.Empty() { - return + return reflectDescription } - r.typeDescription = gvk.String() + return gvk.String() } -func (r *Reflector) setExpectedGVK(expectedType interface{}) { +func getExpectedGVKFromObject(expectedType interface{}) *schema.GroupVersionKind { obj, ok := expectedType.(*unstructured.Unstructured) if !ok { - return + return nil } gvk := obj.GroupVersionKind() if gvk.Empty() { - return + return nil } - r.expectedGVK = &gvk + return &gvk } // internalPackages are packages that ignored when creating a default reflector name. These packages are in the common diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index c6cfece6f9a..fe2b052b8ba 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -979,7 +979,7 @@ func TestReflectorFullListIfTooLarge(t *testing.T) { } } -func TestReflectorSetTypeDescription(t *testing.T) { +func TestGetTypeDescriptionFromObject(t *testing.T) { obj := &unstructured.Unstructured{} gvk := schema.GroupVersionKind{ Group: "mygroup", @@ -990,7 +990,6 @@ func TestReflectorSetTypeDescription(t *testing.T) { testCases := map[string]struct { inputType interface{} - customDescription string expectedTypeDescription string }{ "Nil type": { @@ -1000,45 +999,26 @@ func TestReflectorSetTypeDescription(t *testing.T) { inputType: &v1.Pod{}, expectedTypeDescription: "*v1.Pod", }, - "Normal type with custom description": { - inputType: &v1.Pod{}, - customDescription: "foo", - expectedTypeDescription: "foo", - }, "Unstructured type without GVK": { inputType: &unstructured.Unstructured{}, expectedTypeDescription: "*unstructured.Unstructured", }, - "Unstructured type without GVK, with custom description": { - inputType: &unstructured.Unstructured{}, - customDescription: "foo", - expectedTypeDescription: "foo", - }, "Unstructured type with GVK": { inputType: obj, expectedTypeDescription: gvk.String(), }, - "Unstructured type with GVK, with custom type description": { - inputType: obj, - customDescription: "foo", - expectedTypeDescription: "foo", - }, } for testName, tc := range testCases { t.Run(testName, func(t *testing.T) { - r := &Reflector{ - typeDescription: tc.customDescription, - } - - r.setTypeDescription(tc.inputType) - if tc.expectedTypeDescription != r.typeDescription { - t.Fatalf("Expected typeDescription %v, got %v", tc.expectedTypeDescription, r.typeDescription) + typeDescription := getTypeDescriptionFromObject(tc.inputType) + if tc.expectedTypeDescription != typeDescription { + t.Fatalf("Expected typeDescription %v, got %v", tc.expectedTypeDescription, typeDescription) } }) } } -func TestReflectorSetExpectedGVK(t *testing.T) { +func TestGetExpectedGVKFromObject(t *testing.T) { obj := &unstructured.Unstructured{} gvk := schema.GroupVersionKind{ Group: "mygroup", @@ -1065,14 +1045,13 @@ func TestReflectorSetExpectedGVK(t *testing.T) { } for testName, tc := range testCases { t.Run(testName, func(t *testing.T) { - r := &Reflector{} - r.setExpectedGVK(tc.inputType) - gvkNotEqual := (tc.expectedGVK == nil) != (r.expectedGVK == nil) - if tc.expectedGVK != nil && r.expectedGVK != nil { - gvkNotEqual = *tc.expectedGVK != *r.expectedGVK + expectedGVK := getExpectedGVKFromObject(tc.inputType) + gvkNotEqual := (tc.expectedGVK == nil) != (expectedGVK == nil) + if tc.expectedGVK != nil && expectedGVK != nil { + gvkNotEqual = *tc.expectedGVK != *expectedGVK } if gvkNotEqual { - t.Fatalf("Expected expectedGVK %v, got %v", tc.expectedGVK, r.expectedGVK) + t.Fatalf("Expected expectedGVK %v, got %v", tc.expectedGVK, expectedGVK) } }) }