From b37e79caadf0d6c18009cd379b8de8629a889439 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 18 Nov 2025 18:09:46 +0100 Subject: [PATCH 1/3] client-go cache: allow passing name+logger to DeltaFIFO, RealFIFO and Reflector This improves logging and enables more informative waiting for cache sync in a following commit. It addresses one klog.TODO in the Reflector. The RealFIFOOptions and InformerOptions structs get extended the same way as DeltaFIFOOptions before: a logger may be set, but it's not required. This is not an API break. That the name has to be passed separately is a bit annoying at first glance because it could also be set directly on the logger through WithName, but keeping it separate is better: - name can be set without providing a logger - name can be defaulted - less code in the caller when passing through a logger and adding the name only in the field - last but not least, extracting the name is not supported in a portable manner by logr All in-tree references in production code get updated. While at it, logging in the fifos gets updated to follow best practices: if some code encounters an abnormal situation and then continues, it should use utilruntime.HandleErrorWithLogger instead of normal error logging. Existing "logger" fields get moved to the top because that is a more common place for such a read-only field. Kubernetes-commit: 45251e5f654e6c052659d110cd721f9fbe185191 --- tools/cache/controller.go | 14 ++++++- tools/cache/delta_fifo.go | 38 ++++++++++++------- tools/cache/reflector.go | 16 ++++++-- ...eflector_data_consistency_detector_test.go | 4 +- tools/cache/shared_informer.go | 2 +- tools/cache/the_real_fifo.go | 33 +++++++++++++--- 6 files changed, 81 insertions(+), 26 deletions(-) diff --git a/tools/cache/controller.go b/tools/cache/controller.go index b4e8295ab..a5d47716f 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -27,6 +27,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" clientgofeaturegate "k8s.io/client-go/features" + "k8s.io/klog/v2" "k8s.io/utils/clock" ) @@ -395,6 +396,9 @@ func DeletionHandlingObjectToName(obj interface{}) (ObjectName, error) { // InformerOptions configure a Reflector. type InformerOptions struct { + // Logger, if not nil, is used instead of klog.Background() for logging. + Logger *klog.Logger + // ListerWatcher implements List and Watch functions for the source of the resource // the informer will be informing about. ListerWatcher ListerWatcher @@ -784,7 +788,11 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. 
- fifo := newQueueFIFO(clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider) + logger := klog.Background() + if options.Logger != nil { + logger = *options.Logger + } + fifo := newQueueFIFO(logger, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider) cfg := &Config{ Queue: fifo, @@ -806,9 +814,10 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co return New(cfg) } -func newQueueFIFO(clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) Queue { +func newQueueFIFO(logger klog.Logger, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) Queue { if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { options := RealFIFOOptions{ + Logger: &logger, KeyFunction: MetaNamespaceKeyFunc, Transformer: transform, Identifier: identifier, @@ -824,6 +833,7 @@ func newQueueFIFO(clientState Store, transform TransformFunc, identifier Informe return NewRealFIFOWithOptions(options) } else { return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + Logger: &logger, KnownObjects: clientState, EmitDeltaTypeReplaced: true, Transformer: transform, diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 059c78bd3..12157dad5 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -22,6 +22,7 @@ import ( "sync" "time" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" @@ -31,6 +32,12 @@ import ( // DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are // optional. type DeltaFIFOOptions struct { + // If set, log output will go to this logger instead of klog.Background(). + // The name of the fifo gets added automatically. + Logger *klog.Logger + + // Name can be used to override the default "DeltaFIFO" name for the new instance. + Name string // KeyFunction is used to figure out what key an object should have. (It's // exposed in the returned DeltaFIFO's KeyOf() method, with additional @@ -55,9 +62,6 @@ type DeltaFIFOOptions struct { // If set, will be called for objects before enqueueing them. Please // see the comment on TransformFunc for details. Transformer TransformFunc - - // If set, log output will go to this logger instead of klog.Background(). - Logger *klog.Logger } // DeltaFIFO is like FIFO, but differs in two ways. One is that the @@ -102,6 +106,13 @@ type DeltaFIFOOptions struct { // threads, you could end up with multiple threads processing slightly // different versions of the same object. type DeltaFIFO struct { + // logger is a per-instance logger. This gets chosen when constructing + // the instance, with klog.Background() as default. + logger klog.Logger + + // name is the name of the fifo. It is included in the logger. + name string + // lock/cond protects access to 'items' and 'queue'. lock sync.RWMutex cond sync.Cond @@ -139,10 +150,6 @@ type DeltaFIFO struct { // Called with every object if non-nil. transformer TransformFunc - - // logger is a per-instance logger. This gets chosen when constructing - // the instance, with klog.Background() as default. - logger klog.Logger } // TransformFunc allows for transforming an object before it will be processed. 
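For illustration, a minimal sketch of how a caller inside the cache
package can use the new options (not part of the diff; klog is
k8s.io/klog/v2):

	logger := klog.Background() // or a caller-provided logger
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		Logger:      &logger,          // optional, defaults to klog.Background()
		Name:        "DeltaFIFO pods", // optional, defaults to "DeltaFIFO"
		KeyFunction: MetaNamespaceKeyFunc,
	})

NewDeltaFIFOWithOptions then derives the per-instance logger via
klog.LoggerWithName(logger, name), so every log entry of the fifo
carries its name.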
@@ -263,6 +270,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { } f := &DeltaFIFO{ + logger: klog.Background(), + name: "DeltaFIFO", items: map[string]Deltas{}, queue: []string{}, keyFunc: opts.KeyFunction, @@ -270,11 +279,14 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, transformer: opts.Transformer, - logger: klog.Background(), } if opts.Logger != nil { f.logger = *opts.Logger } + if opts.Name != "" { + f.name = opts.Name + } + f.logger = klog.LoggerWithName(f.logger, f.name) f.cond.L = &f.lock return f } @@ -477,10 +489,10 @@ func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType Del // when given a non-empty list (as it is here). // If somehow it happens anyway, deal with it but complain. if oldDeltas == nil { - f.logger.Error(nil, "Impossible dedupDeltas, ignoring", "id", id, "oldDeltas", oldDeltas, "obj", obj) + utilruntime.HandleErrorWithLogger(f.logger, nil, "Impossible dedupDeltas, ignoring", "id", id, "oldDeltas", oldDeltas, "obj", obj) return nil } - f.logger.Error(nil, "Impossible dedupDeltas, breaking invariant by storing empty Deltas", "id", id, "oldDeltas", oldDeltas, "obj", obj) + utilruntime.HandleErrorWithLogger(f.logger, nil, "Impossible dedupDeltas, breaking invariant by storing empty Deltas", "id", id, "oldDeltas", oldDeltas, "obj", obj) f.items[id] = newDeltas return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj) } @@ -530,7 +542,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { item, ok := f.items[id] if !ok { // This should never happen - f.logger.Error(nil, "Inconceivable! Item was in f.queue but not f.items; ignoring", "id", id) + utilruntime.HandleErrorWithLogger(f.logger, nil, "Inconceivable! Item was in f.queue but not f.items; ignoring", "id", id) continue } delete(f.items, id) @@ -623,7 +635,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { deletedObj, exists, err := f.knownObjects.GetByKey(k) if err != nil { deletedObj = nil - f.logger.Error(err, "Unexpected error during lookup, placing DeleteFinalStateUnknown marker without object", "key", k) + utilruntime.HandleErrorWithLogger(f.logger, err, "Unexpected error during lookup, placing DeleteFinalStateUnknown marker without object", "key", k) } else if !exists { deletedObj = nil f.logger.Info("Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", k) @@ -666,7 +678,7 @@ func (f *DeltaFIFO) Resync() error { func (f *DeltaFIFO) syncKeyLocked(key string) error { obj, exists, err := f.knownObjects.GetByKey(key) if err != nil { - f.logger.Error(err, "Unexpected error during lookup, unable to queue object for sync", "key", key) + utilruntime.HandleErrorWithLogger(f.logger, err, "Unexpected error during lookup, unable to queue object for sync", "key", key) return nil } else if !exists { f.logger.Info("Key does not exist in known objects store, unable to queue object for sync", "key", key) diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 4d8d73d3b..62ea68c72 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -85,6 +85,7 @@ type TransformingStore interface { // Reflector watches a specified resource and causes all changes to be reflected in the given store. type Reflector struct { + logger klog.Logger // name identifies this reflector. 
By default, it will be a file:line if possible. name string // The name of the type we expect to place in the store. The name @@ -227,6 +228,10 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, // ReflectorOptions configures a Reflector. type ReflectorOptions struct { + // Logger, if not nil, is used instead of klog.Background() for logging. + // The name of the reflector gets added automatically. + Logger *klog.Logger + // Name is the Reflector's name. If unset/unspecified, the name defaults to the closest source_file.go:line // in the call stack that is outside this package. Name string @@ -289,6 +294,13 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store R r.name = naming.GetNameFromCallsite(internalPackages...) } + logger := klog.Background() + if options.Logger != nil { + logger = *options.Logger + } + logger = klog.LoggerWithName(logger, r.name) + r.logger = logger + if r.typeDescription == "" { r.typeDescription = getTypeDescriptionFromObject(expectedType) } @@ -299,9 +311,7 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store R r.useWatchList = clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) if r.useWatchList && watchlist.DoesClientNotSupportWatchListSemantics(lw) { - // Using klog.TODO() here because switching to a caller-provided contextual logger - // would require an API change and updating all existing call sites. - klog.TODO().V(2).Info( + r.logger.V(2).Info( "The client used to build this informer/reflector doesn't support WatchList semantics. The feature will be disabled. This is expected in unit tests but not in production. For details, see the documentation of watchlist.DoesClientNotSupportWatchListSemantics().", "feature", clientfeatures.WatchListClient, ) diff --git a/tools/cache/reflector_data_consistency_detector_test.go b/tools/cache/reflector_data_consistency_detector_test.go index 34abcb278..f6670a42c 100644 --- a/tools/cache/reflector_data_consistency_detector_test.go +++ b/tools/cache/reflector_data_consistency_detector_test.go @@ -68,12 +68,12 @@ func TestReflectorDataConsistencyDetector(t *testing.T) { } func runTestReflectorDataConsistencyDetector(t *testing.T, transformer TransformFunc) { - _, ctx := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() store := NewStore(MetaNamespaceKeyFunc) - fifo := newQueueFIFO(store, transformer, InformerNameAndResource{}, nil) + fifo := newQueueFIFO(logger, store, transformer, InformerNameAndResource{}, nil) lw := &ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index b1339316e..45daafa61 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -563,7 +563,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { s.startedLock.Lock() defer s.startedLock.Unlock() - fifo := newQueueFIFO(s.indexer, s.transform, s.identifier, s.fifoMetricsProvider) + fifo := newQueueFIFO(logger, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider) cfg := &Config{ Queue: fifo, diff --git a/tools/cache/the_real_fifo.go b/tools/cache/the_real_fifo.go index 5ac89c945..c60d2ebb8 100644 --- a/tools/cache/the_real_fifo.go +++ b/tools/cache/the_real_fifo.go @@ -23,11 +23,19 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" utiltrace 
"k8s.io/utils/trace" ) // RealFIFOOptions is the configuration parameters for RealFIFO. type RealFIFOOptions struct { + // If set, log output will go to this logger instead of klog.Background(). + // The name of the fifo gets added automatically. + Logger *klog.Logger + + // Name can be used to override the default "RealFIFO" name for the new instance. + Name string + // KeyFunction is used to figure out what key an object should have. (It's // exposed in the returned RealFIFO's keyOf() method, with additional // handling around deleted objects and queue state). @@ -78,6 +86,13 @@ var _ QueueWithBatch = &RealFIFO{} // 1. delivers notifications for items that have been deleted // 2. delivers multiple notifications per item instead of simply the most recent value type RealFIFO struct { + // logger is a per-instance logger. This gets chosen when constructing + // the instance, with klog.Background() as default. + logger klog.Logger + + // name is the name of the fifo. It is included in the logger. + name string + lock sync.RWMutex cond sync.Cond @@ -623,10 +638,10 @@ func reconcileReplacement( deletedObj, exists, err := knownObjects.GetByKey(knownKey) if err != nil { deletedObj = nil - utilruntime.HandleError(fmt.Errorf("error during lookup, placing DeleteFinalStateUnknown marker without object: key=%q, err=%w", knownKey, err)) + utilruntime.HandleErrorWithLogger(klog.TODO(), err, "Error during lookup, placing DeleteFinalStateUnknown marker without object", "key", knownKey) } else if !exists { deletedObj = nil - utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object: key=%q", knownKey)) + utilruntime.HandleErrorWithLogger(klog.TODO(), nil, "Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", knownKey) } retErr := onDelete(DeletedFinalStateUnknown{ Key: knownKey, @@ -683,10 +698,10 @@ func (f *RealFIFO) Resync() error { knownObj, exists, err := f.knownObjects.GetByKey(knownKey) if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to queue object for sync: key=%q, err=%w", knownKey, err)) + utilruntime.HandleErrorWithLogger(f.logger, err, "Unable to queue object for sync", "key", knownKey) continue } else if !exists { - utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, unable to queue object for sync: key=%q", knownKey)) + utilruntime.HandleErrorWithLogger(f.logger, nil, "Key does not exist in known objects store, unable to queue object for sync", "key", knownKey) continue } @@ -739,6 +754,8 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO { } f := &RealFIFO{ + logger: klog.Background(), + name: "RealFIFO", items: make([]Delta, 0, 10), keyFunc: opts.KeyFunction, knownObjects: opts.KnownObjects, @@ -749,7 +766,13 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO { identifier: opts.Identifier, metrics: newFIFOMetrics(opts.Identifier, opts.MetricsProvider), } - + if opts.Logger != nil { + f.logger = *opts.Logger + } + if opts.Name != "" { + f.name = opts.Name + } + f.logger = klog.LoggerWithName(f.logger, f.name) f.cond.L = &f.lock return f } From e70bc766e02bf143f7de9fd2e6a40f66a9e21ffd Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 18 Nov 2025 12:39:11 +0100 Subject: [PATCH 2/3] client-go cache: wait for cache sync via channels, better logging The main advantage is that waiting on channels creates a causal relationship between goroutines which is visible to synctest. 
When a controller running in a synctest bubble calls WaitFor from a
background goroutine of the test, the test can use synctest.Wait to
wait for completion of the cache sync, without requiring any
test-specific "has controller synced" API. Without this, the test had
to poll or otherwise wait for the controller. The polling in
WaitForCacheSync moved the virtual clock forward by a random amount,
depending on how often it had to check in wait.Poll. Now tests can be
written such that all events during a test happen at a predictable
time. This will be demonstrated in a separate commit for the
pkg/controller/devicetainteviction unit test.

The benefit for normal production is immediate continuation when the
last informer is synced (not really a problem, but still...) and, more
importantly, nicer logging thanks to the names associated with the
things that are being waited for. The caller decides whether logging
is enabled or disabled and describes what is being waited for
(typically informer caches, but maybe also event handlers or even
something else entirely, as long as it implements the DoneChecker
interface).

Before:

Waiting for caches to sync
Caches are synced

After:

Waiting for="cache and event handler sync"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.Pod"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.ResourceClaim"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.ResourceSlice"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.DeviceClass"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1alpha3.DeviceTaintRule"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.ResourceClaim + event handler k8s.io/kubernetes/pkg/controller/devicetainteviction.(*Controller).Run"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.Pod + event handler k8s.io/kubernetes/pkg/controller/devicetainteviction.(*Controller).Run"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1alpha3.DeviceTaintRule + event handler k8s.io/kubernetes/pkg/controller/devicetainteviction.(*Controller).Run"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.ResourceSlice + event handler k8s.io/kubernetes/pkg/controller/devicetainteviction.(*Controller).Run"

The "SharedIndexInformer *v1.Pod" is also how this appears in metrics.
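For example, a minimal sketch of the new call (assuming a
SharedIndexInformer in podInformer and the registration returned by
AddEventHandler in handle), replacing a previous WaitForCacheSync:

    if !cache.WaitFor(ctx, "cache and event handler sync",
        podInformer.HasSyncedChecker(), // informer store synced
        handle.HasSyncedChecker(),      // pre-sync events delivered
    ) {
        return fmt.Errorf("wait for sync: %w", context.Cause(ctx))
    }

Inside a synctest bubble, synctest.Wait observes completion of this
call directly, without the virtual clock having to advance.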
Kubernetes-commit: fdcbb6cba9a04c028b158bf66d505df7431f63fe --- tools/cache/controller.go | 48 +++- tools/cache/controller_bench_test.go | 8 +- tools/cache/controller_test.go | 10 +- tools/cache/delta_fifo.go | 46 +++- tools/cache/event_handler_name.go | 119 ++++++++++ tools/cache/event_handler_name_test.go | 87 +++++++ tools/cache/fifo.go | 50 +++- tools/cache/processor_listener_test.go | 22 +- ...eflector_data_consistency_detector_test.go | 2 +- tools/cache/shared_informer.go | 216 ++++++++++++++++-- tools/cache/shared_informer_test.go | 9 +- tools/cache/synctrack/synctrack.go | 132 +++++++++-- tools/cache/synctrack/synctrack_test.go | 195 ++++++++-------- tools/cache/the_real_fifo.go | 63 ++++- tools/cache/wait_test.go | 200 ++++++++++++++++ 15 files changed, 1040 insertions(+), 167 deletions(-) create mode 100644 tools/cache/event_handler_name.go create mode 100644 tools/cache/event_handler_name_test.go create mode 100644 tools/cache/wait_test.go diff --git a/tools/cache/controller.go b/tools/cache/controller.go index a5d47716f..75d2ca979 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -142,6 +142,11 @@ type Controller interface { // HasSynced delegates to the Config's Queue HasSynced() bool + // HasSyncedChecker enables waiting for syncing without polling. + // The returned DoneChecker can be passed to WaitFor. + // It delegates to the Config's Queue. + HasSyncedChecker() DoneChecker + // LastSyncResourceVersion delegates to the Reflector when there // is one, otherwise returns the empty string LastSyncResourceVersion() string @@ -168,11 +173,13 @@ func (c *controller) RunWithContext(ctx context.Context) { <-ctx.Done() c.config.Queue.Close() }() + logger := klog.FromContext(ctx) r := NewReflectorWithOptions( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, ReflectorOptions{ + Logger: &logger, ResyncPeriod: c.config.FullResyncPeriod, MinWatchTimeout: c.config.MinWatchTimeout, TypeDescription: c.config.ObjectDescription, @@ -206,6 +213,13 @@ func (c *controller) HasSynced() bool { return c.config.Queue.HasSynced() } +// HasSyncedChecker enables waiting for syncing without polling. +// The returned DoneChecker can be passed to [WaitFor]. +// It delegates to the Config's Queue. +func (c *controller) HasSyncedChecker() DoneChecker { + return c.config.Queue.HasSyncedChecker() +} + func (c *controller) LastSyncResourceVersion() string { c.reflectorMutex.RLock() defer c.reflectorMutex.RUnlock() @@ -591,6 +605,7 @@ func NewTransformingIndexerInformer( // Multiplexes updates in the form of a list of Deltas into a Store, and informs // a given handler of events OnUpdate, OnAdd, OnDelete func processDeltas( + logger klog.Logger, // Object which receives event notifications from the given deltas handler ResourceEventHandler, clientState Store, @@ -608,7 +623,7 @@ func processDeltas( if !ok { return fmt.Errorf("ReplacedAll did not contain ReplacedAllInfo: %T", obj) } - if err := processReplacedAllInfo(handler, info, clientState, isInInitialList, keyFunc); err != nil { + if err := processReplacedAllInfo(logger, handler, info, clientState, isInInitialList, keyFunc); err != nil { return err } case SyncAll: @@ -653,6 +668,7 @@ func processDeltas( // Returns an error if any Delta or transaction fails. For TransactionError, // only successful operations trigger callbacks. 
func processDeltasInBatch( + logger klog.Logger, handler ResourceEventHandler, clientState Store, deltas []Delta, @@ -666,7 +682,7 @@ func processDeltasInBatch( if !txnSupported { var errs []error for _, delta := range deltas { - if err := processDeltas(handler, clientState, Deltas{delta}, isInInitialList, keyFunc); err != nil { + if err := processDeltas(logger, handler, clientState, Deltas{delta}, isInInitialList, keyFunc); err != nil { errs = append(errs, err) } } @@ -731,7 +747,7 @@ func processDeltasInBatch( return nil } -func processReplacedAllInfo(handler ResourceEventHandler, info ReplacedAllInfo, clientState Store, isInInitialList bool, keyFunc KeyFunc) error { +func processReplacedAllInfo(logger klog.Logger, handler ResourceEventHandler, info ReplacedAllInfo, clientState Store, isInInitialList bool, keyFunc KeyFunc) error { var deletions []DeletedFinalStateUnknown type replacement struct { oldObj interface{} @@ -739,7 +755,7 @@ func processReplacedAllInfo(handler ResourceEventHandler, info ReplacedAllInfo, } replacements := make([]replacement, 0, len(info.Objects)) - err := reconcileReplacement(nil, clientState, info.Objects, keyFunc, + err := reconcileReplacement(logger, nil, clientState, info.Objects, keyFunc, func(obj DeletedFinalStateUnknown) error { deletions = append(deletions, obj) return nil @@ -792,7 +808,7 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co if options.Logger != nil { logger = *options.Logger } - fifo := newQueueFIFO(logger, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider) + logger, fifo := newQueueFIFO(logger, options.ObjectType, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider) cfg := &Config{ Queue: fifo, @@ -803,21 +819,30 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(options.Handler, clientState, deltas, isInInitialList, keyFunc) + // This must be the logger *of the fifo*. + return processDeltas(logger, options.Handler, clientState, deltas, isInInitialList, keyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(options.Handler, clientState, deltaList, isInInitialList, keyFunc) + // Same here. + return processDeltasInBatch(logger, options.Handler, clientState, deltaList, isInInitialList, keyFunc) }, } return New(cfg) } -func newQueueFIFO(logger klog.Logger, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) Queue { +// newQueueFIFO constructs a new FIFO, choosing between real and delta FIFO +// depending on the InOrderInformers feature gate. +// +// It returns the FIFO and the logger used by the FIFO. +// That logger includes the name used for the FIFO, +// in contrast to the logger which was passed in. 
+func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) (klog.Logger, Queue) { if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { options := RealFIFOOptions{ Logger: &logger, + Name: fmt.Sprintf("RealFIFO %T", objectType), KeyFunction: MetaNamespaceKeyFunc, Transformer: transform, Identifier: identifier, @@ -830,13 +855,16 @@ func newQueueFIFO(logger klog.Logger, clientState Store, transform TransformFunc } else { options.KnownObjects = clientState } - return NewRealFIFOWithOptions(options) + f := NewRealFIFOWithOptions(options) + return f.logger, f } else { - return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ Logger: &logger, + Name: fmt.Sprintf("DeltaFIFO %T", objectType), KnownObjects: clientState, EmitDeltaTypeReplaced: true, Transformer: transform, }) + return f.logger, f } } diff --git a/tools/cache/controller_bench_test.go b/tools/cache/controller_bench_test.go index 754ab3f12..f81a3d3f9 100644 --- a/tools/cache/controller_bench_test.go +++ b/tools/cache/controller_bench_test.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" fcache "k8s.io/client-go/tools/cache/testing" + "k8s.io/klog/v2/ktesting" ) const handlerWaitTime = time.Millisecond @@ -33,7 +34,8 @@ const handlerWaitTime = time.Millisecond func BenchmarkAddWithSlowHandlers(b *testing.B) { for _, unlockWhileProcessing := range []bool{false, true} { b.Run(fmt.Sprintf("unlockWhileProcessing=%t", unlockWhileProcessing), func(b *testing.B) { - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(b) + ctx, cancel := context.WithCancel(ctx) source := fcache.NewFakeControllerSource() b.Cleanup(func() { cancel() @@ -65,12 +67,12 @@ func BenchmarkAddWithSlowHandlers(b *testing.B) { Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(handler, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltas(logger, handler, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(handler, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltasInBatch(logger, handler, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) }, } c := New(cfg) diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index c4ee71f47..9528dc6ca 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -834,6 +834,7 @@ func TestProcessDeltasInBatch(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) mockStore := &mockTxnStore{ Store: NewStore(MetaNamespaceKeyFunc), failingObjs: tc.failingObjects, @@ -851,6 +852,7 @@ func TestProcessDeltasInBatch(t *testing.T) { }, } err := processDeltasInBatch( + logger, dummyListener, mockStore, tc.deltaList, @@ -929,12 +931,12 @@ func TestReplaceEvents(t *testing.T) { Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return 
processDeltas(fifo.logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltasInBatch(fifo.logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) }, } c := New(cfg) @@ -1066,12 +1068,12 @@ func TestResetWatch(t *testing.T) { Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltas(fifo.logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltasInBatch(fifo.logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) }, } c := New(cfg) diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 12157dad5..b57cc4728 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -126,6 +126,11 @@ type DeltaFIFO struct { // A key is in `queue` if and only if it is in `items`. queue []string + // synced is initially an open channel. It gets closed (once!) by checkSynced_locked + // as soon as the initial sync is considered complete. + synced chan struct{} + syncedClosed bool + // populated is true if the first batch of items inserted by Replace() has been populated // or Delete/Add/Update/AddIfNotPresent was called first. populated bool @@ -272,6 +277,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { f := &DeltaFIFO{ logger: klog.Background(), name: "DeltaFIFO", + synced: make(chan struct{}), items: map[string]Deltas{}, queue: []string{}, keyFunc: opts.KeyFunction, @@ -283,8 +289,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { if opts.Logger != nil { f.logger = *opts.Logger } - if opts.Name != "" { - f.name = opts.Name + if name := opts.Name; name != "" { + f.name = name } f.logger = klog.LoggerWithName(f.logger, f.name) f.cond.L = &f.lock @@ -294,6 +300,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { var ( _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue _ = TransformingStore(&DeltaFIFO{}) // DeltaFIFO implements TransformingStore to allow memory optimizations + _ = DoneChecker(&DeltaFIFO{}) // DeltaFIFO implements DoneChecker. ) var ( @@ -339,8 +346,36 @@ func (f *DeltaFIFO) HasSynced() bool { return f.hasSynced_locked() } +// HasSyncedChecker is done if an Add/Update/Delete/AddIfNotPresent are called first, +// or the first batch of items inserted by Replace() has been popped. +func (f *DeltaFIFO) HasSyncedChecker() DoneChecker { + return f +} + +// Name implements [DoneChecker.Name] +func (f *DeltaFIFO) Name() string { + return f.name +} + +// Done implements [DoneChecker.Done] +func (f *DeltaFIFO) Done() <-chan struct{} { + return f.synced +} + +// hasSynced_locked returns the result of a prior checkSynced_locked call. 
func (f *DeltaFIFO) hasSynced_locked() bool {
-	return f.populated && f.initialPopulationCount == 0
+	return f.syncedClosed
+}
+
+// checkSynced_locked checks whether the initial sync is completed.
+// It must be called whenever populated or initialPopulationCount change.
+func (f *DeltaFIFO) checkSynced_locked() {
+	synced := f.populated && f.initialPopulationCount == 0
+	if synced && !f.syncedClosed {
+		// Initial sync is complete.
+		f.syncedClosed = true
+		close(f.synced)
+	}
 }
 
 // Add inserts an item, and puts it in the queue. The item is only enqueued
@@ -349,6 +384,7 @@ func (f *DeltaFIFO) Add(obj interface{}) error {
 	f.lock.Lock()
 	defer f.lock.Unlock()
 	f.populated = true
+	f.checkSynced_locked()
 	return f.queueActionLocked(Added, obj)
 }
 
@@ -357,6 +393,7 @@ func (f *DeltaFIFO) Update(obj interface{}) error {
 	f.lock.Lock()
 	defer f.lock.Unlock()
 	f.populated = true
+	f.checkSynced_locked()
 	return f.queueActionLocked(Updated, obj)
 }
 
@@ -373,6 +410,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
 	f.lock.Lock()
 	defer f.lock.Unlock()
 	f.populated = true
+	f.checkSynced_locked()
 	if f.knownObjects == nil {
 		if _, exists := f.items[id]; !exists {
 			// Presumably, this was deleted when a relist happened.
@@ -538,6 +576,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
 			depth := len(f.queue)
 			if f.initialPopulationCount > 0 {
 				f.initialPopulationCount--
+				f.checkSynced_locked()
 			}
 			item, ok := f.items[id]
 			if !ok {
@@ -650,6 +689,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
 	if !f.populated {
 		f.populated = true
 		f.initialPopulationCount = keys.Len() + queuedDeletions
+		f.checkSynced_locked()
 	}
 
 	return nil
diff --git a/tools/cache/event_handler_name.go b/tools/cache/event_handler_name.go
new file mode 100644
index 000000000..9489f9b47
--- /dev/null
+++ b/tools/cache/event_handler_name.go
@@ -0,0 +1,119 @@
+/*
+Copyright The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+	"fmt"
+	"reflect"
+	"runtime"
+	"strings"
+)
+
+func nameForHandler(handler ResourceEventHandler) (name string) {
+	if handler == nil {
+		return ""
+	}
+	defer func() {
+		// Last resort: let Sprintf handle it.
+		if name == "" {
+			name = fmt.Sprintf("%T", handler)
+		}
+	}()
+
+	switch handler := handler.(type) {
+	case *ResourceEventHandlerFuncs:
+		return nameForHandlerFuncs(*handler)
+	case ResourceEventHandlerFuncs:
+		return nameForHandlerFuncs(handler)
+	default:
+		// We can use the fully qualified name of whatever
+		// provides the interface. We don't care whether
+		// it contains fields or methods which provide
+		// the interface methods.
+		value := reflect.ValueOf(handler)
+		if value.Type().Kind() == reflect.Interface {
+			// Probably not needed, but let's play it safe.
+			value = value.Elem()
+		}
+		if value.Type().Kind() == reflect.Pointer {
+			value = value.Elem()
+		}
+		name := value.Type().PkgPath()
+		if name != "" {
+			name += "."
+ } + if typeName := value.Type().Name(); typeName != "" { + name += typeName + } + return name + } +} + +func nameForHandlerFuncs(funcs ResourceEventHandlerFuncs) string { + return nameForFunctions(funcs.AddFunc, funcs.UpdateFunc, funcs.DeleteFunc) +} + +func nameForFunctions(fs ...any) string { + // If all functions are defined in the same place, then we + // don't care about the actual function name in + // e.g. "main.FuncName" or "main.(*Foo).FuncName-fm", instead + // we use the common qualifier. + // + // But we don't know that yet, so we also collect all names. + var qualifier string + singleQualifier := true + var names []string + for _, f := range fs { + if f == nil { + continue + } + name := nameForFunction(f) + if name == "" { + continue + } + names = append(names, name) + + newQualifier := name + index := strings.LastIndexByte(newQualifier, '.') + if index > 0 { + newQualifier = newQualifier[:index] + } + switch qualifier { + case "": + qualifier = newQualifier + case newQualifier: + // So far, so good... + default: + // Nope, different. + singleQualifier = false + } + } + + if singleQualifier { + return qualifier + } + + return strings.Join(names, "+") +} + +func nameForFunction(f any) string { + fn := runtime.FuncForPC(reflect.ValueOf(f).Pointer()) + if fn == nil { + return "" + } + return fn.Name() +} diff --git a/tools/cache/event_handler_name_test.go b/tools/cache/event_handler_name_test.go new file mode 100644 index 000000000..d17f66a08 --- /dev/null +++ b/tools/cache/event_handler_name_test.go @@ -0,0 +1,87 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "testing" +) + +type mockHandler struct{} + +func (m mockHandler) OnAdd(any, bool) {} +func (m mockHandler) OnUpdate(any, any) {} +func (m mockHandler) OnDelete(any) {} + +func TestNameForHandler(t *testing.T) { + emptyHandler := ResourceEventHandlerFuncs{} + + for name, tc := range map[string]struct { + handler ResourceEventHandler + wantName string + }{ + "mixture": { + handler: ResourceEventHandlerFuncs{ + UpdateFunc: emptyHandler.OnUpdate, + DeleteFunc: func(any) {}, + }, + wantName: "k8s.io/client-go/tools/cache.ResourceEventHandlerFuncs.OnUpdate-fm+k8s.io/client-go/tools/cache.TestNameForHandler.func1", // Testcase must come first to get func1. 
+ }, + "add": { + handler: ResourceEventHandlerFuncs{AddFunc: func(any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "update": { + handler: ResourceEventHandlerFuncs{UpdateFunc: func(any, any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "delete": { + handler: ResourceEventHandlerFuncs{DeleteFunc: func(any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "all": { + handler: ResourceEventHandlerFuncs{ + AddFunc: func(any) {}, + UpdateFunc: func(any, any) {}, + DeleteFunc: func(any) {}, + }, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "ptrToFuncs": { + handler: &ResourceEventHandlerFuncs{AddFunc: func(any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "struct": { + handler: mockHandler{}, + wantName: "k8s.io/client-go/tools/cache.mockHandler", + }, + "ptrToStruct": { + handler: &mockHandler{}, + wantName: "k8s.io/client-go/tools/cache.mockHandler", + }, + "nil": { + handler: nil, + wantName: "", + }, + } { + t.Run(name, func(t *testing.T) { + gotName := nameForHandler(tc.handler) + if gotName != tc.wantName { + t.Errorf("Got name:\n %s\nWanted name:\n %s", gotName, tc.wantName) + } + }) + } +} diff --git a/tools/cache/fifo.go b/tools/cache/fifo.go index 44d006b69..e31e2d270 100644 --- a/tools/cache/fifo.go +++ b/tools/cache/fifo.go @@ -58,6 +58,12 @@ type Queue interface { // Update, or Delete; otherwise the first batch is empty. HasSynced() bool + // HasSyncedChecker is done once the first batch of keys have all been + // popped. The first batch of keys are those of the first Replace + // operation if that happened before any Add, AddIfNotPresent, + // Update, or Delete; otherwise the first batch is empty. + HasSyncedChecker() DoneChecker + // Close the queue Close() } @@ -110,6 +116,11 @@ type FIFO struct { items map[string]interface{} queue []string + // synced is initially an open channel. It gets closed (once!) by checkSynced + // as soon as the initial sync is considered complete. + synced chan struct{} + syncedClosed bool + // populated is true if the first batch of items inserted by Replace() has been populated // or Delete/Add/Update was called first. populated bool @@ -127,7 +138,8 @@ type FIFO struct { } var ( - _ = Queue(&FIFO{}) // FIFO is a Queue + _ = Queue(&FIFO{}) // FIFO is a Queue + _ = DoneChecker(&FIFO{}) // ... and implements DoneChecker. ) // Close the queue. @@ -146,8 +158,36 @@ func (f *FIFO) HasSynced() bool { return f.hasSynced_locked() } +// HasSyncedChecker is done if an Add/Update/Delete/AddIfNotPresent are called first, +// or the first batch of items inserted by Replace() has been popped. +func (f *FIFO) HasSyncedChecker() DoneChecker { + return f +} + +// Name implements [DoneChecker.Name] +func (f *FIFO) Name() string { + return "FIFO" // FIFO doesn't seem to be used outside of a few tests, so changing the NewFIFO API to pass in a name doesn't seem worth it. +} + +// Done implements [DoneChecker.Done] +func (f *FIFO) Done() <-chan struct{} { + return f.synced +} + +// hasSynced_locked returns the result of a prior checkSynced call. func (f *FIFO) hasSynced_locked() bool { - return f.populated && f.initialPopulationCount == 0 + return f.syncedClosed +} + +// checkSynced checks whether the initial sync is completed. +// It must be called whenever populated or initialPopulationCount change +// while the mutex is still locked. 
+func (f *FIFO) checkSynced() { + synced := f.populated && f.initialPopulationCount == 0 + if synced && !f.syncedClosed { + f.syncedClosed = true + close(f.synced) + } } // Add inserts an item, and puts it in the queue. The item is only enqueued @@ -160,6 +200,7 @@ func (f *FIFO) Add(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced() if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } @@ -184,6 +225,7 @@ func (f *FIFO) Delete(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced() delete(f.items, id) return err } @@ -220,6 +262,8 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { f.queue = f.queue[1:] if f.initialPopulationCount > 0 { f.initialPopulationCount-- + // Must be done *after* process has completed. + defer f.checkSynced() } item, ok := f.items[id] if !ok { @@ -252,6 +296,7 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error { if !f.populated { f.populated = true f.initialPopulationCount = len(items) + f.checkSynced() } f.items = items @@ -290,6 +335,7 @@ func (f *FIFO) Resync() error { // process. func NewFIFO(keyFunc KeyFunc) *FIFO { f := &FIFO{ + synced: make(chan struct{}), items: map[string]interface{}{}, queue: []string{}, keyFunc: keyFunc, diff --git a/tools/cache/processor_listener_test.go b/tools/cache/processor_listener_test.go index 517258a16..cd8c5c739 100644 --- a/tools/cache/processor_listener_test.go +++ b/tools/cache/processor_listener_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "sync" "testing" "time" @@ -29,6 +30,25 @@ const ( concurrencyLevel = 5 ) +type mockSynced struct { + context.Context + cancel func() +} + +func newMockSynced(tb testing.TB, synced bool) *mockSynced { + m := &mockSynced{} + m.Context, m.cancel = context.WithCancel(context.Background()) + if synced { + m.cancel() + } + tb.Cleanup(m.cancel) + return m +} + +func (m *mockSynced) Name() string { + return "mock" +} + func BenchmarkListener(b *testing.B) { var notification addNotification @@ -40,7 +60,7 @@ func BenchmarkListener(b *testing.B) { AddFunc: func(obj interface{}) { swg.Done() }, - }, 0, 0, time.Now(), 1024*1024, func() bool { return true }) + }, 0, 0, time.Now(), 1024*1024, newMockSynced(b, true)) var wg wait.Group defer wg.Wait() // Wait for .run and .pop to stop defer close(pl.addCh) // Tell .run and .pop to stop diff --git a/tools/cache/reflector_data_consistency_detector_test.go b/tools/cache/reflector_data_consistency_detector_test.go index f6670a42c..fc9c1529e 100644 --- a/tools/cache/reflector_data_consistency_detector_test.go +++ b/tools/cache/reflector_data_consistency_detector_test.go @@ -73,7 +73,7 @@ func runTestReflectorDataConsistencyDetector(t *testing.T, transformer Transform defer cancel() store := NewStore(MetaNamespaceKeyFunc) - fifo := newQueueFIFO(logger, store, transformer, InformerNameAndResource{}, nil) + _, fifo := newQueueFIFO(logger, nil, store, transformer, InformerNameAndResource{}, nil) lw := &ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index 45daafa61..503cd2e2d 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "slices" "sync" "sync/atomic" "time" @@ -194,6 +195,14 @@ type SharedInformer interface { // For that, please call HasSynced on the handle returned by // 
AddEventHandler.
 	HasSynced() bool
+	// HasSyncedChecker is done once the shared informer's store has been
+	// informed by at least one full LIST of the authoritative state
+	// of the informer's object collection. This is unrelated to "resync".
+	//
+	// Note that this doesn't tell you if an individual handler is synced!!
+	// For that, please use HasSyncedChecker on the handle returned by
+	// AddEventHandler.
+	HasSyncedChecker() DoneChecker
 	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
 	// store. The value returned is not synchronized with access to the underlying store and is not
 	// thread-safe.
@@ -247,6 +256,10 @@ type ResourceEventHandlerRegistration interface {
 	// HasSynced reports if both the parent has synced and all pre-sync
 	// events have been delivered.
 	HasSynced() bool
+
+	// HasSyncedChecker reports if both the parent has synced and all pre-sync
+	// events have been delivered.
+	HasSyncedChecker() DoneChecker
 }
 
 // Optional configuration options for [SharedInformer.AddEventHandlerWithOptions].
@@ -309,6 +322,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O
 	return &sharedIndexInformer{
 		indexer:           NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
 		processor:         processor,
+		synced:            make(chan struct{}),
 		listerWatcher:     lw,
 		objectType:        exampleObject,
 		objectDescription: options.ObjectDescription,
@@ -414,6 +428,107 @@ func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
 	return true
 }
 
+// WaitFor waits for a set of activities to complete, like cache syncing.
+// It returns true if it was successful, false if the context was canceled
+// before all activities are completed.
+//
+// If a non-empty "what" is provided, then progress information is logged
+// while waiting ("Waiting for=<what>", "Done waiting for=<what>").
+//
+// In contrast to other WaitForCacheSync alternatives, this one doesn't
+// need polling, which makes it react immediately. When used in a synctest unit
+// test, waiting completes without moving time forward randomly, which
+// makes tests more predictable.
+func WaitFor(ctx context.Context, what string, checkers ...DoneChecker) bool {
+	logger := klog.FromContext(ctx)
+	if what != "" {
+		helper, l := logger.WithCallStackHelper()
+		logger = l
+		helper()
+		logger.Info("Waiting", "for", what)
+	}
+
+	// Check in parallel to ensure that we log "Done waiting" as soon
+	// as possible for each checker. The timing may be useful to know.
+	// We cannot log inside the goroutines because the call stack
+	// unwinding wouldn't work; instead, each goroutine just notifies
+	// the parent goroutine when its checker is done and the main
+	// goroutine then logs it.
+	var wg sync.WaitGroup
+	type result struct {
+		checker DoneChecker
+		done    bool
+	}
+	doneChecker := make(chan result)
+	for _, checker := range checkers {
+		wg.Go(func() {
+			select {
+			case <-checker.Done():
+				doneChecker <- result{checker, true}
+			case <-ctx.Done():
+				// We can end up here even when the checker is already done;
+				// select is not deterministic. Check once more without blocking
+				// before finally giving up.
+				select {
+				case <-checker.Done():
+					doneChecker <- result{checker, true}
+				default:
+					doneChecker <- result{checker, false}
+				}
+			}
+		})
+	}
+	instances := make([]string, 0, len(checkers))
+	for range len(checkers) {
+		// We are guaranteed to get exactly one result from each goroutine, so this won't block forever.
+		result := <-doneChecker
+		if result.done {
+			if what != "" {
+				logger.Info("Done waiting", "for", what, "instance", result.checker.Name())
+			}
+		} else {
+			// We don't need this information unless we are a) logging or b) debugging interactively.
+			instances = append(instances, result.checker.Name())
+		}
+	}
+	wg.Wait()
+
+	if what != "" && len(instances) > 0 {
+		slices.Sort(instances)
+		logger.Info("Timed out waiting", "for", what, "cause", context.Cause(ctx), "instances", instances)
+	}
+
+	done := len(instances) == 0
+	return done
+}
+
+// DoneChecker, in contrast to [InformerSynced], supports waiting
+// for some activity to finish without polling and has a name
+// that describes what is being waited for.
+//
+// To check for completion without blocking, use [IsDone].
+type DoneChecker interface {
+	// Name returns a string describing the entity that is being waited for.
+	//
+	// Note that this name might be computed, so callers should only
+	// get the name outside of a hot code path.
+	Name() string
+
+	// Done returns a channel that will be closed on completion
+	// of the activity.
+	Done() <-chan struct{}
+}
+
+// IsDone returns true if the activity is done, false otherwise.
+func IsDone(checker DoneChecker) bool {
+	select {
+	case <-checker.Done():
+		return true
+	default:
+		return false
+	}
+}
+
 // `*sharedIndexInformer` implements SharedIndexInformer and has three
 // main components. One is an indexed local cache, `indexer Indexer`.
 // The second main component is a Controller that pulls
@@ -431,6 +546,10 @@ type sharedIndexInformer struct {
 	indexer    Indexer
 	controller Controller
 
+	// synced gets created when creating the sharedIndexInformer.
+	// It gets closed by Run once the controller has synced.
+	synced chan struct{}
+
 	processor             *sharedProcessor
 	cacheMutationDetector MutationDetector
@@ -494,6 +613,10 @@ func (v *dummyController) HasSynced() bool {
 	return v.informer.HasSynced()
 }
 
+func (v *dummyController) HasSyncedChecker() DoneChecker {
+	return v.informer.HasSyncedChecker()
+}
+
 func (v *dummyController) LastSyncResourceVersion() string {
 	if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InformerResourceVersion) {
 		return v.informer.LastSyncResourceVersion()
@@ -563,7 +686,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
 	s.startedLock.Lock()
 	defer s.startedLock.Unlock()
 
-	fifo := newQueueFIFO(logger, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider)
+	logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider)
 
 	cfg := &Config{
 		Queue:             fifo,
@@ -573,8 +696,12 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
 		FullResyncPeriod:  s.resyncCheckPeriod,
 		ShouldResync:      s.processor.shouldResync,
 
-		Process:      s.HandleDeltas,
-		ProcessBatch: s.HandleBatchDeltas,
+		Process: func(obj interface{}, isInInitialList bool) error {
+			return s.handleDeltas(logger, obj, isInInitialList)
+		},
+		ProcessBatch: func(deltas []Delta, isInInitialList bool) error {
+			return s.handleBatchDeltas(logger, deltas, isInInitialList)
+		},
 		WatchErrorHandlerWithContext: s.watchErrorHandler,
 	}
 
@@ -594,6 +721,15 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
 	// has a RunWithContext method that we can use here.
 	wg.StartWithChannel(processorStopCtx.Done(), s.cacheMutationDetector.Run)
 	wg.StartWithContext(processorStopCtx, s.processor.run)
+	wg.Start(func() {
+		select {
+		case <-ctx.Done():
+			// We were stopped without completing the sync.
+		case <-s.controller.HasSyncedChecker().Done():
+			// Controller has synced and thus so have we.
+			close(s.synced)
+		}
+	})
 
 	defer func() {
 		s.startedLock.Lock()
@@ -610,13 +746,31 @@ func (s *sharedIndexInformer) HasStarted() bool {
 }
 
 func (s *sharedIndexInformer) HasSynced() bool {
-	s.startedLock.Lock()
-	defer s.startedLock.Unlock()
-
-	if s.controller == nil {
+	select {
+	case <-s.synced:
+		return true
+	default:
 		return false
 	}
-	return s.controller.HasSynced()
+}
+
+func (s *sharedIndexInformer) HasSyncedChecker() DoneChecker {
+	return &sharedIndexInformerDone{
+		s: s,
+	}
+}
+
+// sharedIndexInformerDone implements [DoneChecker] for a [sharedIndexInformer].
+type sharedIndexInformerDone struct {
+	s *sharedIndexInformer
+}
+
+func (sd *sharedIndexInformerDone) Name() string {
+	return fmt.Sprintf("SharedIndexInformer %T", sd.s.objectType)
+}
+
+func (sd *sharedIndexInformerDone) Done() <-chan struct{} {
+	return sd.s.synced
 }
 
 func (s *sharedIndexInformer) LastSyncResourceVersion() string {
@@ -708,7 +862,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa
 		}
 	}
 
-	listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
+	listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSyncedChecker())
 
 	if !s.started {
 		return s.processor.addListener(listener), nil
@@ -737,20 +891,20 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa
 	return handle, nil
 }
 
-func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
+func (s *sharedIndexInformer) handleDeltas(logger klog.Logger, obj interface{}, isInInitialList bool) error {
 	s.blockDeltas.Lock()
 	defer s.blockDeltas.Unlock()
 
 	if deltas, ok := obj.(Deltas); ok {
-		return processDeltas(s, s.indexer, deltas, isInInitialList, s.keyFunc)
+		return processDeltas(logger, s, s.indexer, deltas, isInInitialList, s.keyFunc)
 	}
 	return errors.New("object given as Process argument is not Deltas")
}
 
-func (s *sharedIndexInformer) HandleBatchDeltas(deltas []Delta, isInInitialList bool) error {
+func (s *sharedIndexInformer) handleBatchDeltas(logger klog.Logger, deltas []Delta, isInInitialList bool) error {
 	s.blockDeltas.Lock()
 	defer s.blockDeltas.Unlock()
 
-	return processDeltasInBatch(s, s.indexer, deltas, isInInitialList, s.keyFunc)
+	return processDeltasInBatch(logger, s, s.indexer, deltas, isInInitialList, s.keyFunc)
 }
 
 // Conforms to ResourceEventHandler
@@ -854,6 +1008,7 @@ func (p *sharedProcessor) addListener(listener *processorListener) ResourceEvent
 	p.listeners[listener] = true
 
 	if p.listenersStarted {
+		p.wg.Start(listener.watchSynced)
 		p.wg.Start(listener.run)
 		p.wg.Start(listener.pop)
 	}
@@ -925,6 +1080,7 @@ func (p *sharedProcessor) run(ctx context.Context) {
 		p.listenersLock.Lock()
 		defer p.listenersLock.Unlock()
 		for listener := range p.listeners {
+			p.wg.Start(listener.watchSynced)
 			p.wg.Start(listener.run)
 			p.wg.Start(listener.pop)
 		}
@@ -986,7 +1142,7 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncChe
 }
 
 // processorListener relays notifications from a sharedProcessor to
-// one ResourceEventHandler --- using two goroutines, two unbuffered
+// one ResourceEventHandler --- using three goroutines, two unbuffered
 // channels, and an unbounded ring buffer.
The `add(notification)`
 // function sends the given notification to `addCh`. One goroutine
 // runs `pop()`, which pumps notifications from `addCh` to `nextCh`
 // using storage in the ring buffer while `nextCh` is not keeping up.
 // Another goroutine runs `run()`, which receives notifications from
 // `nextCh` and synchronously invokes the appropriate handler method.
 //
+// The third goroutine watches the upstream "has synced" channel
+// and notifies a SingleFileTracker instance. That instance then
+// combines the upstream state and the processorListener state to
+// implement the overall "event handler has synced".
+//
 // processorListener also keeps track of the adjusted requested resync
 // period of the listener.
 type processorListener struct {
 	logger klog.Logger
 	nextCh chan interface{}
 	addCh  chan interface{}
+	done   chan struct{}
 
 	handler ResourceEventHandler
 
-	syncTracker *synctrack.SingleFileTracker
+	syncTracker       *synctrack.SingleFileTracker
+	upstreamHasSynced DoneChecker
 
 	// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
 	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
@@ -1041,13 +1204,21 @@ func (p *processorListener) HasSynced() bool {
 	return p.syncTracker.HasSynced()
 }
 
-func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
+// HasSyncedChecker is done if the source informer has synced and all
+// corresponding events have been delivered.
+func (p *processorListener) HasSyncedChecker() DoneChecker {
+	return p.syncTracker
+}
+
+func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced DoneChecker) *processorListener {
 	ret := &processorListener{
 		logger:                logger,
 		nextCh:                make(chan interface{}),
 		addCh:                 make(chan interface{}),
+		done:                  make(chan struct{}),
+		upstreamHasSynced:     hasSynced,
 		handler:               handler,
-		syncTracker:           &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
+		syncTracker:           synctrack.NewSingleFileTracker(fmt.Sprintf("%s + event handler %s", hasSynced.Name(), nameForHandler(handler))),
 		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
 		requestedResyncPeriod: requestedResyncPeriod,
 		resyncPeriod:          resyncPeriod,
@@ -1068,6 +1239,7 @@ func (p *processorListener) add(notification interface{}) {
 func (p *processorListener) pop() {
 	defer utilruntime.HandleCrashWithLogger(p.logger)
 	defer close(p.nextCh) // Tell .run() to stop
+	defer close(p.done)   // Tell .watchSynced() to stop
 
 	var nextCh chan<- interface{}
 	var notification interface{}
@@ -1131,6 +1303,16 @@ func (p *processorListener) run() {
 	}
 }
 
+func (p *processorListener) watchSynced() {
+	select {
+	case <-p.upstreamHasSynced.Done():
+		// Notify tracker that the upstream has synced.
+		p.syncTracker.UpstreamHasSynced()
+	case <-p.done:
+		// Give up waiting for sync.
+	}
+}
+
 // shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0,
 // this always returns false.
func (p *processorListener) shouldResync(now time.Time) bool { diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 0b40315a8..835e6debd 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -1134,8 +1134,13 @@ func TestAddWhileActive(t *testing.T) { return } - if !handle1.HasSynced() { - t.Error("Not synced after Run??") + select { + case <-handle1.HasSyncedChecker().Done(): + if !handle1.HasSynced() { + t.Error("Not synced after channel said we are synced??") + } + case <-time.After(10 * time.Second): + t.Error("Not synced 10 seconds after Run??") } listener2.lock.Lock() // ensure we observe it before it has synced diff --git a/tools/cache/synctrack/synctrack.go b/tools/cache/synctrack/synctrack.go index 3fa2beb6b..e941dafb8 100644 --- a/tools/cache/synctrack/synctrack.go +++ b/tools/cache/synctrack/synctrack.go @@ -20,6 +20,7 @@ limitations under the License. package synctrack import ( + "context" "sync" "sync/atomic" @@ -27,11 +28,32 @@ import ( ) // AsyncTracker helps propagate HasSynced in the face of multiple worker threads. +// The user has to monitor the upstream "has synced" +// and notify the tracker when that changes from false to true. type AsyncTracker[T comparable] struct { - UpstreamHasSynced func() bool + // name describes the instance. + name string + + // upstreamHasSynced is changed from false (initial value) to true + // when UpstreamHasSynced is called. + upstreamHasSynced atomic.Bool lock sync.Mutex waiting sets.Set[T] + + // synced gets canceled once both the tracker and upstream are synced. + // A context is convenient for this because it gives us a channel + // and handles thread-safety. + synced context.Context + cancel func() +} + +func NewAsyncTracker[T comparable](name string) *AsyncTracker[T] { + t := &AsyncTracker[T]{ + name: name, + } + t.synced, t.cancel = context.WithCancel(context.Background()) + return t } // Start should be called prior to processing each key which is part of the @@ -57,6 +79,28 @@ func (t *AsyncTracker[T]) Finished(key T) { if t.waiting != nil { t.waiting.Delete(key) } + + // Maybe synced now? + if t.upstreamHasSynced.Load() && len(t.waiting) == 0 { + // Mark as synced. + t.cancel() + } +} + +// UpstreamHasSynced needs to be called at least once as soon as +// the upstream "has synced" becomes true. It tells AsyncTracker +// that the source is synced. +// +// Must be called after handing over the initial list to Start. +func (t *AsyncTracker[T]) UpstreamHasSynced() { + // Upstream is done, but we might not be yet. + t.upstreamHasSynced.Store(true) + t.lock.Lock() + defer t.lock.Unlock() + if len(t.waiting) == 0 { + // Mark as synced. + t.cancel() + } } // HasSynced returns true if the source is synced and every key present in the @@ -64,27 +108,51 @@ func (t *AsyncTracker[T]) Finished(key T) { // itself synced until *after* it has delivered the notification for the last // key, and that notification handler must have called Start. func (t *AsyncTracker[T]) HasSynced() bool { - // Call UpstreamHasSynced first: it might take a lock, which might take - // a significant amount of time, and we can't hold our lock while - // waiting on that or a user is likely to get a deadlock. - if !t.UpstreamHasSynced() { - return false - } - t.lock.Lock() - defer t.lock.Unlock() - return t.waiting.Len() == 0 + return t.synced.Err() != nil +} + +// Done returns a channel that is closed if the source is synced and every key present in the +// initial list has been processed. 
This relies on the source not considering +// itself synced until *after* it has delivered the notification for the last +// key, and that notification handler must have called Start. +func (t *AsyncTracker[T]) Done() <-chan struct{} { + return t.synced.Done() +} + +func (t *AsyncTracker[T]) Name() string { + return t.name } // SingleFileTracker helps propagate HasSynced when events are processed in -// order (i.e. via a queue). +// order (i.e. via a queue). The user has to monitor the upstream "has synced" +// and notify the tracker when that changes from false to true. type SingleFileTracker struct { + // name describes the instance. + name string + // Important: count is used with atomic operations so it must be 64-bit // aligned, otherwise atomic operations will panic. Having it at the top of // the struct will guarantee that, even on 32-bit arches. // See https://pkg.go.dev/sync/atomic#pkg-note-BUG for more information. count int64 - UpstreamHasSynced func() bool + // upstreamHasSynced is changed from false (initial value) to true + // when UpstreamHasSynced is called. + upstreamHasSynced atomic.Bool + + // synced gets canceled once both the tracker and upstream are synced. + // A context is convenient for this because it gives us a channel + // and handles thread-safety. + synced context.Context + cancel func() +} + +func NewSingleFileTracker(name string) *SingleFileTracker { + t := &SingleFileTracker{ + name: name, + } + t.synced, t.cancel = context.WithCancel(context.Background()) + return t } // Start should be called prior to processing each key which is part of the @@ -103,6 +171,26 @@ func (t *SingleFileTracker) Finished() { if result < 0 { panic("synctrack: negative counter; this logic error means HasSynced may return incorrect value") } + + // Maybe synced now? + if result == 0 && t.upstreamHasSynced.Load() { + // Mark as synced. + t.cancel() + } +} + +// UpstreamHasSynced needs to be called at least once as soon as +// the upstream "has synced" becomes true. It tells SingleFileTracker +// that the source is synced. +// +// Must be called after handing over the initial list to Start. +func (t *SingleFileTracker) UpstreamHasSynced() { + // Upstream is done, but we might not be yet. + t.upstreamHasSynced.Store(true) + if atomic.LoadInt64(&t.count) == 0 { + // Mark as synced. + t.cancel() + } } // HasSynced returns true if the source is synced and every key present in the @@ -110,11 +198,17 @@ func (t *SingleFileTracker) Finished() { // itself synced until *after* it has delivered the notification for the last // key, and that notification handler must have called Start. func (t *SingleFileTracker) HasSynced() bool { - // Call UpstreamHasSynced first: it might take a lock, which might take - // a significant amount of time, and we don't want to then act on a - // stale count value. - if !t.UpstreamHasSynced() { - return false - } - return atomic.LoadInt64(&t.count) <= 0 + return t.synced.Err() != nil +} + +// Done returns a channel that is closed if the source is synced and every key present in the +// initial list has been processed. This relies on the source not considering +// itself synced until *after* it has delivered the notification for the last +// key, and that notification handler must have called Start. 
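
The call sequence that the trackers expect is easy to get wrong, so here is a minimal sketch against the API introduced above. It assumes a client-go tree with this patch applied; the tracker name and program structure are illustrative only:

	package main

	import (
		"fmt"

		"k8s.io/client-go/tools/cache/synctrack"
	)

	func main() {
		// The name is only used for Name() and log output.
		t := synctrack.NewSingleFileTracker("demo source + demo handler")

		// For each key of the initial list, strictly in order:
		t.Start()    // before handing the notification to the handler
		t.Finished() // after the handler has returned

		// After the last initial notification has been handed to Start:
		t.UpstreamHasSynced()

		<-t.Done() // closes once both conditions hold
		fmt.Println("has synced:", t.HasSynced())
	}
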
+func (t *SingleFileTracker) Done() <-chan struct{} { + return t.synced.Done() +} + +func (t *SingleFileTracker) Name() string { + return t.name } diff --git a/tools/cache/synctrack/synctrack_test.go b/tools/cache/synctrack/synctrack_test.go index 4cf089e22..bff8838e8 100644 --- a/tools/cache/synctrack/synctrack_test.go +++ b/tools/cache/synctrack/synctrack_test.go @@ -19,29 +19,24 @@ package synctrack import ( "strings" "sync" - "time" "testing" ) -func testSingleFileFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) { - tracker := SingleFileTracker{ - UpstreamHasSynced: upstreamHasSynced, - } - return tracker.Start, tracker.Finished, tracker.HasSynced +func testSingleFileFuncs() (upstreamHasSynced func(), start func(), finished func(), hasSynced func() bool, synced <-chan struct{}) { + tracker := NewSingleFileTracker("") + return tracker.UpstreamHasSynced, tracker.Start, tracker.Finished, tracker.HasSynced, tracker.Done() } -func testAsyncFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) { - tracker := AsyncTracker[string]{ - UpstreamHasSynced: upstreamHasSynced, - } - return func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced +func testAsyncFuncs() (upstreamHasSynced func(), start func(), finished func(), hasSynced func() bool, synced <-chan struct{}) { + tracker := NewAsyncTracker[string]("") + return tracker.UpstreamHasSynced, func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced, tracker.Done() } func TestBasicLogic(t *testing.T) { table := []struct { name string - construct func(func() bool) (func(), func(), func() bool) + construct func() (func(), func(), func(), func() bool, <-chan struct{}) }{ {"SingleFile", testSingleFileFuncs}, {"Async", testAsyncFuncs}, @@ -50,27 +45,87 @@ func TestBasicLogic(t *testing.T) { for _, entry := range table { t.Run(entry.name, func(t *testing.T) { table := []struct { - synced bool + synced bool + syncedBeforeFinish bool + start bool finish bool expectSynced bool }{ - {false, true, true, false}, - {true, true, false, false}, - {false, true, false, false}, - {true, true, true, true}, + {false, false, true, true, false}, + {true, false, true, false, false}, + {true, true, true, false, false}, + {false, false, true, false, false}, + {true, false, true, true, true}, + {true, true, true, true, true}, } for _, tt := range table { - Start, Finished, HasSynced := entry.construct(func() bool { return tt.synced }) + upstreamHasSynced, start, finished, hasSynced, synced := entry.construct() + syncedDone := func() bool { + select { + case <-synced: + return true + default: + return false + } + } + + if hasSynced() { + t.Errorf("for %#v got HasSynced() true before start (wanted false)", tt) + } + if syncedDone() { + t.Errorf("for %#v got Done() true before start (wanted false)", tt) + } + if tt.start { - Start() + start() + } + + if hasSynced() { + t.Errorf("for %#v got HasSynced() true after start (wanted false)", tt) + } + if syncedDone() { + t.Errorf("for %#v got Done() true after start (wanted false)", tt) + } + + // "upstream has synced" may occur before or after finished, but not before start. 
+				if tt.synced && tt.syncedBeforeFinish {
+					upstreamHasSynced()
+					if hasSynced() {
+						t.Errorf("for %#v got HasSynced() true after upstreamHasSynced and before finish (wanted false)", tt)
+					}
+					if syncedDone() {
+						t.Errorf("for %#v got Done() true after upstreamHasSynced and before finish (wanted false)", tt)
+					}
+				}
 
 				if tt.finish {
-					Finished()
+					finished()
 				}
-				got := HasSynced()
-				if e, a := tt.expectSynced, got; e != a {
-					t.Errorf("for %#v got %v (wanted %v)", tt, a, e)
+				if tt.synced && !tt.syncedBeforeFinish {
+					if hasSynced() {
+						t.Errorf("for %#v got HasSynced() true after finish and before upstreamHasSynced (wanted false)", tt)
+					}
+					if syncedDone() {
+						t.Errorf("for %#v got Done() true after finish and before upstreamHasSynced (wanted false)", tt)
+					}
+					upstreamHasSynced()
+				}
+				if e, a := tt.expectSynced, hasSynced(); e != a {
+					t.Errorf("for %#v got HasSynced() %v (wanted %v)", tt, a, e)
+				}
+				if e, a := tt.expectSynced, syncedDone(); e != a {
+					t.Errorf("for %#v got Done() %v (wanted %v)", tt, a, e)
+				}
+
+				select {
+				case <-synced:
+					if !tt.expectSynced {
+						t.Errorf("for %#v got done (wanted not done)", tt)
+					}
+				default:
+					if tt.expectSynced {
+						t.Errorf("for %#v got not done (wanted done)", tt)
+					}
+				}
 			}
 		})
@@ -78,7 +133,7 @@
 }
 
 func TestAsyncLocking(t *testing.T) {
-	aft := AsyncTracker[int]{UpstreamHasSynced: func() bool { return true }}
+	aft := NewAsyncTracker[int]("")
 
 	var wg sync.WaitGroup
 	for _, i := range []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
@@ -92,6 +147,7 @@
 		}(i)
 	}
 	wg.Wait()
+	aft.UpstreamHasSynced()
 	if !aft.HasSynced() {
 		t.Errorf("async tracker must have made a threading error?")
 	}
@@ -99,7 +155,7 @@
 }
 
 func TestSingleFileCounting(t *testing.T) {
-	sft := SingleFileTracker{UpstreamHasSynced: func() bool { return true }}
+	sft := NewSingleFileTracker("")
 
 	for i := 0; i < 100; i++ {
 		sft.Start()
@@ -115,6 +171,8 @@
 	}
 	sft.Finished()
 
+	sft.UpstreamHasSynced()
+
 	if !sft.HasSynced() {
 		t.Fatal("Unexpectedly not synced?")
 	}
@@ -148,24 +206,35 @@
 
 func TestSingleFile(t *testing.T) {
 	table := []struct {
-		synced       bool
+		synced            bool
+		syncedBeforeStops bool
+
 		starts       int
 		stops        int
 		expectSynced bool
 	}{
-		{false, 1, 1, false},
-		{true, 1, 0, false},
-		{false, 1, 0, false},
-		{true, 1, 1, true},
+		{false, false, 1, 1, false},
+		{true, false, 1, 0, false},
+		{true, true, 1, 0, false},
+		{false, false, 1, 0, false},
+		{true, false, 1, 1, true},
+		{true, true, 1, 1, true},
 	}
 
 	for _, tt := range table {
-		sft := SingleFileTracker{UpstreamHasSynced: func() bool { return tt.synced }}
+		sft := NewSingleFileTracker("")
 		for i := 0; i < tt.starts; i++ {
 			sft.Start()
 		}
+		// "upstream has synced" may occur before or after finished, but not before start. 
+		if tt.synced && tt.syncedBeforeStops {
+			sft.UpstreamHasSynced()
+		}
 		for i := 0; i < tt.stops; i++ {
 			sft.Finished()
 		}
+		if tt.synced && !tt.syncedBeforeStops {
+			sft.UpstreamHasSynced()
+		}
 		got := sft.HasSynced()
 		if e, a := tt.expectSynced, got; e != a {
 			t.Errorf("for %#v got %v (wanted %v)", tt, a, e)
@@ -173,67 +242,3 @@ func TestSingleFile(t *testing.T) {
 	}
 
 }
-
-func TestNoStaleValue(t *testing.T) {
-	table := []struct {
-		name      string
-		construct func(func() bool) (func(), func(), func() bool)
-	}{
-		{"SingleFile", testSingleFileFuncs},
-		{"Async", testAsyncFuncs},
-	}
-
-	for _, entry := range table {
-		t.Run(entry.name, func(t *testing.T) {
-			var lock sync.Mutex
-			upstreamHasSynced := func() bool {
-				lock.Lock()
-				defer lock.Unlock()
-				return true
-			}
-
-			Start, Finished, HasSynced := entry.construct(upstreamHasSynced)
-
-			// Ordinarily the corresponding lock would be held and you wouldn't be
-			// able to call this function at this point.
-			if !HasSynced() {
-				t.Fatal("Unexpectedly not synced??")
-			}
-
-			Start()
-			if HasSynced() {
-				t.Fatal("Unexpectedly synced??")
-			}
-			Finished()
-			if !HasSynced() {
-				t.Fatal("Unexpectedly not synced??")
-			}
-
-			// Now we will prove that if the lock is held, you can't get a false
-			// HasSynced return.
-			lock.Lock()
-
-			// This goroutine calls HasSynced
-			var wg sync.WaitGroup
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-				if HasSynced() {
-					t.Error("Unexpectedly synced??")
-				}
-			}()
-
-			// This goroutine increments + unlocks. The sleep is to bias the
-			// runtime such that the other goroutine usually wins (it needs to work
-			// in both orderings, this one is more likely to be buggy).
-			go func() {
-				time.Sleep(time.Millisecond)
-				Start()
-				lock.Unlock()
-			}()
-
-			wg.Wait()
-		})
-	}
-
-}
diff --git a/tools/cache/the_real_fifo.go b/tools/cache/the_real_fifo.go
index c60d2ebb8..4856a9baf 100644
--- a/tools/cache/the_real_fifo.go
+++ b/tools/cache/the_real_fifo.go
@@ -34,6 +34,7 @@ type RealFIFOOptions struct {
 	Logger *klog.Logger
 
 	// Name can be used to override the default "RealFIFO" name for the new instance.
+	// Optional. Used only if Identifier.Name returns an empty string.
 	Name string
 
 	// KeyFunction is used to figure out what key an object should have. (It's
@@ -98,6 +99,11 @@ type RealFIFO struct {
 
 	items []Delta
 
+	// synced is initially an open channel. It gets closed (once!) by checkSynced_locked
+	// as soon as the initial sync is considered complete.
+	synced       chan struct{}
+	syncedClosed bool
+
 	// populated is true if the first batch of items inserted by Replace() has been populated
 	// or Delete/Add/Update was called first.
 	populated bool
@@ -160,6 +166,7 @@ type SyncAllInfo struct{}
 var (
 	_ = Queue(&RealFIFO{})             // RealFIFO is a Queue
 	_ = TransformingStore(&RealFIFO{}) // RealFIFO implements TransformingStore to allow memory optimizations
+	_ = DoneChecker(&RealFIFO{})       // RealFIFO also implements DoneChecker.
 )
 
 // Close the queue.
@@ -196,11 +203,37 @@ func (f *RealFIFO) HasSynced() bool {
 	return f.hasSynced_locked()
 }
 
-// ignoring lint to reduce delta to the original for review. It's ok adjust later.
-//
-//lint:file-ignore ST1003: should not use underscores in Go names
+// HasSyncedChecker returns a checker which is done once Add/Update/Delete/AddIfNotPresent
+// has been called first, or the first batch of items inserted by Replace() has been popped. 
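
To make the intended semantics of the new synced/syncedClosed fields concrete, here is a reduced, self-contained sketch of the close-once bookkeeping. The field names follow the patch; everything else is simplified, and the real code runs under the fifo's lock:

	package main

	import "fmt"

	type fifo struct {
		populated              bool
		initialPopulationCount int
		synced                 chan struct{}
		syncedClosed           bool
	}

	// checkSynced mimics checkSynced_locked: close the channel exactly once,
	// as soon as the initial batch has been fully popped.
	func (f *fifo) checkSynced() {
		if f.populated && f.initialPopulationCount == 0 && !f.syncedClosed {
			f.syncedClosed = true // a channel must not be closed twice
			close(f.synced)
		}
	}

	func main() {
		f := &fifo{populated: true, initialPopulationCount: 2, synced: make(chan struct{})}
		for f.initialPopulationCount > 0 { // each Pop of the initial batch
			f.initialPopulationCount--
			f.checkSynced()
		}
		<-f.synced // returns immediately now
		fmt.Println("initial sync complete")
	}
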
+func (f *RealFIFO) HasSyncedChecker() DoneChecker { + return f +} + +// Name implements [DoneChecker.Name] +func (f *RealFIFO) Name() string { + return f.name +} + +// Done implements [DoneChecker.Done] +func (f *RealFIFO) Done() <-chan struct{} { + return f.synced +} + +// hasSynced_locked returns the result of a prior checkSynced_locked call. func (f *RealFIFO) hasSynced_locked() bool { - return f.populated && f.initialPopulationCount == 0 + return f.syncedClosed +} + +// checkSynced_locked checks whether the initial batch of items (set via Replace) has been delivered +// and closes the synced channel as needed. It must be called after changing f.populated and/or +// f.initialPopulationCount while the mutex is still locked. +func (f *RealFIFO) checkSynced_locked() { + synced := f.populated && f.initialPopulationCount == 0 + if synced && !f.syncedClosed { + // Initial sync is complete. + f.syncedClosed = true + close(f.synced) + } } // addToItems_locked appends to the delta list. @@ -291,6 +324,7 @@ func (f *RealFIFO) Add(obj interface{}) error { defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() retErr := f.addToItems_locked(Added, false, obj) return retErr @@ -302,6 +336,7 @@ func (f *RealFIFO) Update(obj interface{}) error { defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() retErr := f.addToItems_locked(Updated, false, obj) return retErr @@ -315,6 +350,7 @@ func (f *RealFIFO) Delete(obj interface{}) error { defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() retErr := f.addToItems_locked(Deleted, false, obj) return retErr @@ -362,6 +398,7 @@ func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) { defer func() { if f.initialPopulationCount > 0 { f.initialPopulationCount-- + f.checkSynced_locked() } }() @@ -482,7 +519,6 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc unique.Insert(id) moveDeltaToProcessList(i) } - f.items = f.items[len(deltas):] // Decrement initialPopulationCount if needed. // This is done in a defer so we only do this *after* processing is complete, @@ -490,6 +526,7 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc defer func() { if f.initialPopulationCount > 0 { f.initialPopulationCount -= len(deltas) + f.checkSynced_locked() } }() @@ -539,7 +576,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error if f.emitAtomicEvents { err = f.addReplaceToItemsLocked(newItems, resourceVersion) } else { - err = reconcileReplacement(f.items, f.knownObjects, newItems, f.keyOf, + err = reconcileReplacement(f.logger, f.items, f.knownObjects, newItems, f.keyOf, func(obj DeletedFinalStateUnknown) error { return f.addToItems_locked(Deleted, true, obj) }, @@ -554,6 +591,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error if !f.populated { f.populated = true f.initialPopulationCount = len(f.items) + f.checkSynced_locked() } return nil @@ -563,6 +601,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error // and based upon the state of the items in the queue and known objects will call onDelete and onReplace // depending upon whether the item is being deleted or replaced/added. 
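
Since reconcileReplacement may enqueue DeletedFinalStateUnknown markers whose object is nil, downstream delete handlers have to unwrap them. A small sketch of that pattern follows; cache.DeletedFinalStateUnknown is the real type, while the onDelete helper is hypothetical:

	package main

	import (
		"fmt"

		"k8s.io/client-go/tools/cache"
	)

	func onDelete(obj interface{}) {
		if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
			// Obj may be nil, as the comments above explain.
			fmt.Println("deleted, final state unknown:", tombstone.Key)
			obj = tombstone.Obj
		}
		_ = obj // handle the deletion
	}

	func main() {
		onDelete(cache.DeletedFinalStateUnknown{Key: "ns/name"})
	}
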
func reconcileReplacement( + logger klog.Logger, queuedItems []Delta, knownObjects KeyListerGetter, newItems []interface{}, @@ -638,10 +677,10 @@ func reconcileReplacement( deletedObj, exists, err := knownObjects.GetByKey(knownKey) if err != nil { deletedObj = nil - utilruntime.HandleErrorWithLogger(klog.TODO(), err, "Error during lookup, placing DeleteFinalStateUnknown marker without object", "key", knownKey) + utilruntime.HandleErrorWithLogger(logger, err, "Error during lookup, placing DeleteFinalStateUnknown marker without object", "key", knownKey) } else if !exists { deletedObj = nil - utilruntime.HandleErrorWithLogger(klog.TODO(), nil, "Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", knownKey) + utilruntime.HandleErrorWithLogger(logger, nil, "Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", knownKey) } retErr := onDelete(DeletedFinalStateUnknown{ Key: knownKey, @@ -757,6 +796,7 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO { logger: klog.Background(), name: "RealFIFO", items: make([]Delta, 0, 10), + synced: make(chan struct{}), keyFunc: opts.KeyFunction, knownObjects: opts.KnownObjects, transformer: opts.Transformer, @@ -769,8 +809,11 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO { if opts.Logger != nil { f.logger = *opts.Logger } - if opts.Name != "" { - f.name = opts.Name + if name := opts.Name; name != "" { + f.name = name + } + if name := opts.Identifier.Name(); name != "" { + f.name = name } f.logger = klog.LoggerWithName(f.logger, f.name) f.cond.L = &f.lock diff --git a/tools/cache/wait_test.go b/tools/cache/wait_test.go new file mode 100644 index 000000000..b53683689 --- /dev/null +++ b/tools/cache/wait_test.go @@ -0,0 +1,200 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "runtime" + "sync" + "testing" + "testing/synctest" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/klog/v2" + "k8s.io/klog/v2/textlogger" +) + +func init() { + // The test below is sensitive to the time zone, log output uses time.Local. + time.Local = time.UTC +} + +func TestWaitFor(t *testing.T) { + for name, tc := range map[string]struct { + what string + checkers []DoneChecker + timeout time.Duration + timeoutReason string + + expectDone bool + + // Time is predictable and starts at the synctest epoch. + // %[1]d is the pid, %[2]d the line number of the WaitFor call. 
+ expectOutput string + }{ + "empty": { + expectDone: true, + }, + "no-caches": { + what: "my-caches", + expectDone: true, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +`, + }, + "no-logging": { + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", 5*time.Second), newMockChecker("last", 0*time.Second)}, + expectDone: true, + }, + "with-logging": { + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", 5*time.Second), newMockChecker("last", 0*time.Second)}, + expectDone: true, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last" +I0101 00:00:05.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="second" +I0101 00:00:10.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="first" +`, + }, + "some-timeout": { + timeout: time.Minute, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last" +I0101 00:00:10.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="first" +I0101 00:01:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context deadline exceeded" instances=["second"] +`, + }, + "some-canceled": { + timeout: -1, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context canceled" instances=["first","second"] +`, + }, + "more": { + timeoutReason: "go fish", + timeout: 5 * time.Second, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last" +I0101 00:00:05.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="go fish" instances=["first","second"] +`, + }, + "all": { + timeout: time.Minute, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", -1), newMockChecker("second", -1), newMockChecker("last", -1)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:01:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context deadline exceeded" instances=["first","last","second"] +`, + }, + } { + t.Run(name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var buffer bytes.Buffer + logger := textlogger.NewLogger(textlogger.NewConfig(textlogger.Output(&buffer))) + ctx := klog.NewContext(context.Background(), logger) + var wg sync.WaitGroup + defer wg.Wait() + if 
tc.timeout != 0 {
+				switch tc.timeoutReason {
+				case "":
+					if tc.timeout > 0 {
+						c, cancel := context.WithTimeout(ctx, tc.timeout)
+						defer cancel()
+						ctx = c
+					} else {
+						c, cancel := context.WithCancel(ctx)
+						cancel()
+						ctx = c
+					}
+				default:
+					c, cancel := context.WithCancelCause(ctx)
+					wg.Go(func() {
+						time.Sleep(tc.timeout)
+						cancel(errors.New(tc.timeoutReason))
+					})
+					ctx = c
+				}
+			}
+			_, _, line, _ := runtime.Caller(0)
+			done := WaitFor(ctx, tc.what, tc.checkers...)
+			expectOutput := tc.expectOutput
+			if expectOutput != "" {
+				expectOutput = fmt.Sprintf(expectOutput, os.Getpid(), line+1)
+			}
+			assert.Equal(t, tc.expectDone, done, "done")
+			assert.Equal(t, expectOutput, buffer.String(), "output")
+			})
+		})
+	}
+}
+
+// newMockChecker can be created outside of a synctest bubble.
+// It constructs the channel inside when Done is first called.
+func newMockChecker(name string, delay time.Duration) DoneChecker {
+	return &mockChecker{
+		name:  name,
+		delay: delay,
+	}
+}
+
+type mockChecker struct {
+	name        string
+	delay       time.Duration
+	initialized bool
+	done        <-chan struct{}
+}
+
+func (m *mockChecker) Name() string { return m.name }
+func (m *mockChecker) Done() <-chan struct{} {
+	if !m.initialized {
+		switch {
+		case m.delay > 0:
+			// In the future.
+			ctx := context.Background()
+			// This leaks a cancel, but is hard to avoid (cannot use the parent t.Cleanup, no other way to delay calling it). Doesn't matter in a unit test.
+			//nolint:govet
+			ctx, _ = context.WithTimeout(ctx, m.delay)
+			m.done = ctx.Done()
+		case m.delay == 0:
+			// Immediately.
+			c := make(chan struct{})
+			close(c)
+			m.done = c
+		default:
+			// Never.
+			c := make(chan struct{})
+			m.done = c
+		}
+		m.initialized = true
+	}
+	return m.done
+}

From ff8083eb1d46c854723816115f475360a2082a4c Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Fri, 21 Nov 2025 16:23:44 +0100
Subject: [PATCH 3/3] client-go informers: context-aware Start + WaitForCacheSync

Passing a context to StartWithContext enables context-aware reflector
logging. This is the main remaining source of log spam (output to
stderr instead of per-test logger) in controller unit tests.

WaitForCacheSyncWithContext takes advantage of the new cache.WaitFor +
DoneChecker functionality to finish "immediately" (= no virtual time
passed) in a synctest bubble.

While at it, the return type gets improved so that a failure is easier
to handle.

Kubernetes-commit: 5ff323de791df88880f6e065f5de4b445e5c90ed
---
 informers/factory.go                   | 91 ++++++++++++++++++++------
 tools/cache/event_handler_name.go      |  4 +-
 tools/cache/event_handler_name_test.go |  6 ++
 tools/cache/shared_informer.go         | 51 ++++++++++++++-
 tools/cache/wait_test.go               | 46 +++++++++++++
 5 files changed, 175 insertions(+), 23 deletions(-)

diff --git a/informers/factory.go b/informers/factory.go
index fe7b1242e..c7c735fed 100644
--- a/informers/factory.go
+++ b/informers/factory.go
@@ -19,6 +19,7 @@ limitations under the License. 
package informers
 
 import (
+	context "context"
 	reflect "reflect"
 	sync "sync"
 	time "time"
@@ -26,6 +27,7 @@ import (
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	runtime "k8s.io/apimachinery/pkg/runtime"
 	schema "k8s.io/apimachinery/pkg/runtime/schema"
+	wait "k8s.io/apimachinery/pkg/util/wait"
 	admissionregistration "k8s.io/client-go/informers/admissionregistration"
 	apiserverinternal "k8s.io/client-go/informers/apiserverinternal"
 	apps "k8s.io/client-go/informers/apps"
@@ -158,6 +160,10 @@ func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultRes
 }
 
 func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
+	f.StartWithContext(wait.ContextForChannel(stopCh))
+}
+
+func (f *sharedInformerFactory) StartWithContext(ctx context.Context) {
 	f.lock.Lock()
 	defer f.lock.Unlock()
 
@@ -167,15 +173,9 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
 
 	for informerType, informer := range f.informers {
 		if !f.startedInformers[informerType] {
-			f.wg.Add(1)
-			// We need a new variable in each loop iteration,
-			// otherwise the goroutine would use the loop variable
-			// and that keeps changing.
-			informer := informer
-			go func() {
-				defer f.wg.Done()
-				informer.Run(stopCh)
-			}()
+			f.wg.Go(func() {
+				informer.RunWithContext(ctx)
+			})
 			f.startedInformers[informerType] = true
 		}
 	}
@@ -192,6 +192,11 @@ func (f *sharedInformerFactory) Shutdown() {
 }
 
 func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
+	result := f.WaitForCacheSyncWithContext(wait.ContextForChannel(stopCh))
+	return result.Synced
+}
+
+func (f *sharedInformerFactory) WaitForCacheSyncWithContext(ctx context.Context) cache.SyncResult {
 	informers := func() map[reflect.Type]cache.SharedIndexInformer {
 		f.lock.Lock()
 		defer f.lock.Unlock()
@@ -205,10 +210,31 @@ func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[ref
 		return informers
 	}()
 
-	res := map[reflect.Type]bool{}
-	for informType, informer := range informers {
-		res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
+	// Wait for informers to sync, without polling.
+	cacheSyncs := make([]cache.DoneChecker, 0, len(informers))
+	for _, informer := range informers {
+		cacheSyncs = append(cacheSyncs, informer.HasSyncedChecker())
 	}
+	cache.WaitFor(ctx, "" /* no logging */, cacheSyncs...)
+
+	res := cache.SyncResult{
+		Synced: make(map[reflect.Type]bool, len(informers)),
+	}
+	failed := false
+	for informType, informer := range informers {
+		hasSynced := informer.HasSynced()
+		if !hasSynced {
+			failed = true
+		}
+		res.Synced[informType] = hasSynced
+	}
+	if failed {
+		// context.Cause is more informative than ctx.Err().
+		// This must be non-nil, otherwise WaitFor wouldn't have stopped
+		// prematurely.
+		res.Err = context.Cause(ctx)
+	}
+
 	return res
 }
 
@@ -247,27 +273,46 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
 //	defer factory.WaitForStop()    // Returns immediately if nothing was started.
 //	genericInformer := factory.ForResource(resource)
 //	typedInformer := factory.SomeAPIGroup().V1().SomeType()
-//	factory.Start(ctx.Done())              // Start processing these informers.
-//	synced := factory.WaitForCacheSync(ctx.Done())
-//	for v, ok := range synced {
-//		if !ok {
-//			fmt.Fprintf(os.Stderr, "caches failed to sync: %v", v)
-//			return
-//		}
+//	handle, err := typedInformer.Informer().AddEventHandler(...) 
+//	if err != nil {
+//		return fmt.Errorf("register event handler: %w", err)
+//	}
+//	defer typedInformer.Informer().RemoveEventHandler(handle) // Avoids leaking goroutines.
+//	factory.StartWithContext(ctx) // Start processing these informers.
+//	synced := factory.WaitForCacheSyncWithContext(ctx)
+//	if err := synced.AsError(); err != nil {
+//		return err
+//	}
+//	for v := range synced.Synced {
+//		// Only if desired, log some information similar to this.
+//		fmt.Fprintf(os.Stdout, "cache synced: %s", v)
+//	}
+//
+//	// Also make sure that all of the initial cache events have been delivered.
+//	if !cache.WaitFor(ctx, "event handler sync", handle.HasSyncedChecker()) {
+//		// Must have failed because of context.
+//		return fmt.Errorf("sync event handler: %w", context.Cause(ctx))
+//	}
 //
 //	// Creating informers can also be created after Start, but then
 //	// Start must be called again:
 //	anotherGenericInformer := factory.ForResource(resource)
-//	factory.Start(ctx.Done())
+//	factory.StartWithContext(ctx)
 type SharedInformerFactory interface {
 	internalinterfaces.SharedInformerFactory
 
 	// Start initializes all requested informers. They are handled in goroutines
 	// which run until the stop channel gets closed.
 	// Warning: Start does not block. When run in a go-routine, it will race with a later WaitForCacheSync.
+	//
+	// Contextual logging: StartWithContext should be used instead of Start in code which supports contextual logging.
 	Start(stopCh <-chan struct{})
 
+	// StartWithContext initializes all requested informers. They are handled in goroutines
+	// which run until the context gets canceled.
+	// Warning: StartWithContext does not block. When run in a go-routine, it will race with a later WaitForCacheSync.
+	StartWithContext(ctx context.Context)
+
 	// Shutdown marks a factory as shutting down. At that point no new
 	// informers can be started anymore and Start will return without
 	// doing anything.
@@ -282,8 +327,14 @@ type SharedInformerFactory interface {
 
 	// WaitForCacheSync blocks until all started informers' caches were synced
 	// or the stop channel gets closed.
+	//
+	// Contextual logging: WaitForCacheSyncWithContext should be used instead of WaitForCacheSync in code which supports contextual logging. It also returns a more useful result.
 	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
 
+	// WaitForCacheSyncWithContext blocks until all started informers' caches were synced
+	// or the context gets canceled.
+	WaitForCacheSyncWithContext(ctx context.Context) cache.SyncResult
+
 	// ForResource gives generic access to a shared informer of the matching type. 
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
diff --git a/tools/cache/event_handler_name.go b/tools/cache/event_handler_name.go
index 9489f9b47..d466fc6e2 100644
--- a/tools/cache/event_handler_name.go
+++ b/tools/cache/event_handler_name.go
@@ -50,7 +50,9 @@ func nameForHandler(handler ResourceEventHandler) (name string) {
 		value = value.Elem()
 	}
 	if value.Type().Kind() == reflect.Pointer {
-		value = value.Elem()
+		if !value.IsNil() {
+			value = value.Elem()
+		}
 	}
 	name := value.Type().PkgPath()
 	if name != "" {
diff --git a/tools/cache/event_handler_name_test.go b/tools/cache/event_handler_name_test.go
index d17f66a08..9cb1434f9 100644
--- a/tools/cache/event_handler_name_test.go
+++ b/tools/cache/event_handler_name_test.go
@@ -76,6 +76,12 @@ func TestNameForHandler(t *testing.T) {
 			handler:  nil,
 			wantName: "",
 		},
+		"stored-nil": {
+			// This is a bit odd, but one unit test actually registered
+			// such an event handler and it somehow worked.
+			handler:  (*mockHandler)(nil),
+			wantName: "*cache.mockHandler",
+		},
 	} {
 		t.Run(name, func(t *testing.T) {
 			gotName := nameForHandler(tc.handler)
diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go
index 503cd2e2d..0f145b72a 100644
--- a/tools/cache/shared_informer.go
+++ b/tools/cache/shared_informer.go
@@ -20,7 +20,9 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"reflect"
 	"slices"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -433,7 +435,10 @@ func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
 // before all activities are completed.
 //
 // If a non-nil "what" is provided, then progress information is logged
-// while waiting ("Waiting", for="<what>").
+// while waiting ("Waiting", for="<what>"). Verbosity is V(0). This can
+// be made less verbose by the caller with:
+//
+//	WaitFor(klog.NewContext(ctx, klog.FromContext(ctx).V(2)), ...)
 //
 // In contrast to other WaitForCacheSync alternatives, this one here doesn't
 // need polling, which makes it react immediately. When used in a synctest unit
@@ -529,6 +534,43 @@ func IsDone(checker DoneChecker) bool {
 	}
 }
 
+// SyncResult is the result of a shared informer factory's WaitForCacheSyncWithContext.
+// Under the hood such factories use [WaitFor] to wait for all instantiated informers,
+// then provide this summary of what was synced.
+//
+// Note that the informers may have synced already before all event handlers registered with
+// those informers have synced. Code which wants to be sure that all of its state is up-to-date
+// should do its own WaitFor with the informer's HasSyncedChecker() *and* the
+// registration handle's HasSyncedChecker() results.
+type SyncResult struct {
+	// Err is nil if all informer caches were synced, otherwise it is
+	// the reason why waiting for cache syncing stopped (= context.Cause(ctx)).
+	Err error
+
+	// Synced maps each registered informer in a SharedInformerFactory to
+	// true if it has synced, false otherwise.
+	Synced map[reflect.Type]bool
+}
+
+// AsError turns a SyncResult into an error if not all caches were synced,
+// otherwise it returns nil. The error wraps context.Cause(ctx) and
+// includes information about the informers which were not synced. 
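
A short caller-side sketch of WaitFor as documented above, including the verbosity trick, may help. It assumes a client-go tree with this series applied; the checker list and all names are placeholders:

	package main

	import (
		"context"
		"time"

		"k8s.io/client-go/tools/cache"
		"k8s.io/klog/v2"
	)

	func main() {
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()

		// Log "Waiting"/"Done waiting" only at -v=2 and higher.
		ctx = klog.NewContext(ctx, klog.FromContext(ctx).V(2))

		// For example informer.HasSyncedChecker() and handle.HasSyncedChecker().
		var checkers []cache.DoneChecker
		if !cache.WaitFor(ctx, "my-caches", checkers...) {
			klog.FromContext(ctx).Error(context.Cause(ctx), "Failed to sync caches")
		}
	}
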
+func (c SyncResult) AsError() error {
+	if c.Err == nil {
+		return nil
+	}
+
+	unsynced := make([]string, 0, len(c.Synced))
+	for t, synced := range c.Synced {
+		if !synced {
+			unsynced = append(unsynced, t.String())
+		}
+	}
+	slices.Sort(unsynced)
+
+	return fmt.Errorf("failed to sync all caches: %s: %w", strings.Join(unsynced, ", "), c.Err)
+}
+
 // `*sharedIndexInformer` implements SharedIndexInformer and has three
 // main components. One is an indexed local cache, `indexer Indexer`.
 // The second main component is a Controller that pulls
@@ -888,6 +930,10 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa
 		// thread adding them and the counter is temporarily zero).
 		listener.add(addNotification{newObj: item, isInInitialList: true})
 	}
+
+	// Initial list is added, now we can allow the listener to detect that "upstream has synced".
+	s.processor.wg.Start(listener.watchSynced)
+
 	return handle, nil
 }
 
@@ -1008,7 +1054,8 @@ func (p *sharedProcessor) addListener(listener *processorListener) ResourceEvent
 	p.listeners[listener] = true
 
 	if p.listenersStarted {
-		p.wg.Start(listener.watchSynced)
+		// Not starting listener.watchSynced!
+		// The caller must first add the initial list, then start it.
 		p.wg.Start(listener.run)
 		p.wg.Start(listener.pop)
 	}
diff --git a/tools/cache/wait_test.go b/tools/cache/wait_test.go
index b53683689..f03b48dd8 100644
--- a/tools/cache/wait_test.go
+++ b/tools/cache/wait_test.go
@@ -22,6 +22,7 @@ import (
 	"errors"
 	"fmt"
 	"os"
+	"reflect"
 	"runtime"
 	"sync"
 	"testing"
@@ -198,3 +199,48 @@ func (m *mockChecker) Done() <-chan struct{} {
 	}
 	return m.done
 }
+
+func TestSyncResult(t *testing.T) {
+	for name, tc := range map[string]struct {
+		result        SyncResult
+		expectAsError string
+	}{
+		"empty": {},
+		"one": {
+			result: SyncResult{
+				Err: errors.New("my custom cancellation reason"),
+				Synced: map[reflect.Type]bool{
+					reflect.TypeFor[int]():    true,
+					reflect.TypeFor[string](): false,
+				},
+			},
+			expectAsError: "failed to sync all caches: string: my custom cancellation reason",
+		},
+		"many": {
+			result: SyncResult{
+				Err: errors.New("my custom cancellation reason"),
+				Synced: map[reflect.Type]bool{
+					reflect.TypeFor[int]():    false,
+					reflect.TypeFor[string](): false,
+				},
+			},
+			expectAsError: "failed to sync all caches: int, string: my custom cancellation reason",
+		},
+	} {
+
+		t.Run(name, func(t *testing.T) {
+			actual := tc.result.AsError()
+			switch {
+			case tc.expectAsError == "" && actual != nil:
+				t.Fatalf("expected no error, got %v", actual)
+			case tc.expectAsError != "" && actual == nil:
+				t.Fatalf("expected %q, got no error", tc.expectAsError)
+			case tc.expectAsError != "" && actual != nil && actual.Error() != tc.expectAsError:
+				t.Fatalf("expected %q, got %q", tc.expectAsError, actual.Error())
+			}
+			if tc.result.Err != nil && !errors.Is(actual, tc.result.Err) {
+				t.Errorf("actual error %+v should wrap %v but doesn't", actual, tc.result.Err)
+			}
+		})
+	}
+}
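
Putting the factory pieces together, a hedged end-to-end sketch of the new start/sync flow follows. It assumes a client-go that contains this series; the startAndSync helper and its structure are illustrative only:

	package main

	import (
		"context"
		"fmt"

		"k8s.io/client-go/informers"
		"k8s.io/client-go/kubernetes"
	)

	// startAndSync shows the context-aware start and wait sequence.
	func startAndSync(ctx context.Context, client kubernetes.Interface) error {
		factory := informers.NewSharedInformerFactory(client, 0)
		_ = factory.Core().V1().Pods().Informer() // instantiate at least one informer

		ctx, cancel := context.WithCancel(ctx)
		defer factory.Shutdown() // runs last: waits for the informer goroutines
		defer cancel()           // runs first: tells the informers to stop

		factory.StartWithContext(ctx)

		synced := factory.WaitForCacheSyncWithContext(ctx)
		if err := synced.AsError(); err != nil {
			// Wraps context.Cause(ctx) and lists the informer types that did not sync.
			return err
		}
		for typ := range synced.Synced {
			fmt.Println("synced:", typ)
		}
		return nil
	}

	func main() {}
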