diff --git a/informers/factory.go b/informers/factory.go index fe7b1242e..c7c735fed 100644 --- a/informers/factory.go +++ b/informers/factory.go @@ -19,6 +19,7 @@ limitations under the License. package informers import ( + context "context" reflect "reflect" sync "sync" time "time" @@ -26,6 +27,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" schema "k8s.io/apimachinery/pkg/runtime/schema" + wait "k8s.io/apimachinery/pkg/util/wait" admissionregistration "k8s.io/client-go/informers/admissionregistration" apiserverinternal "k8s.io/client-go/informers/apiserverinternal" apps "k8s.io/client-go/informers/apps" @@ -158,6 +160,10 @@ func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultRes } func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { + f.StartWithContext(wait.ContextForChannel(stopCh)) +} + +func (f *sharedInformerFactory) StartWithContext(ctx context.Context) { f.lock.Lock() defer f.lock.Unlock() @@ -167,15 +173,9 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { for informerType, informer := range f.informers { if !f.startedInformers[informerType] { - f.wg.Add(1) - // We need a new variable in each loop iteration, - // otherwise the goroutine would use the loop variable - // and that keeps changing. - informer := informer - go func() { - defer f.wg.Done() - informer.Run(stopCh) - }() + f.wg.Go(func() { + informer.RunWithContext(ctx) + }) f.startedInformers[informerType] = true } } @@ -192,6 +192,11 @@ func (f *sharedInformerFactory) Shutdown() { } func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { + result := f.WaitForCacheSyncWithContext(wait.ContextForChannel(stopCh)) + return result.Synced +} + +func (f *sharedInformerFactory) WaitForCacheSyncWithContext(ctx context.Context) cache.SyncResult { informers := func() map[reflect.Type]cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() @@ -205,10 +210,31 @@ func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[ref return informers }() - res := map[reflect.Type]bool{} - for informType, informer := range informers { - res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) + // Wait for informers to sync, without polling. + cacheSyncs := make([]cache.DoneChecker, 0, len(informers)) + for _, informer := range informers { + cacheSyncs = append(cacheSyncs, informer.HasSyncedChecker()) } + cache.WaitFor(ctx, "" /* no logging */, cacheSyncs...) + + res := cache.SyncResult{ + Synced: make(map[reflect.Type]bool, len(informers)), + } + failed := false + for informType, informer := range informers { + hasSynced := informer.HasSynced() + if !hasSynced { + failed = true + } + res.Synced[informType] = hasSynced + } + if failed { + // context.Cause is more informative than ctx.Err(). + // This must be non-nil, otherwise WaitFor wouldn't have stopped + // prematurely. + res.Err = context.Cause(ctx) + } + return res } @@ -247,27 +273,46 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal // defer factory.WaitForStop() // Returns immediately if nothing was started. // genericInformer := factory.ForResource(resource) // typedInformer := factory.SomeAPIGroup().V1().SomeType() -// factory.Start(ctx.Done()) // Start processing these informers. 
-// synced := factory.WaitForCacheSync(ctx.Done()) -// for v, ok := range synced { -// if !ok { -// fmt.Fprintf(os.Stderr, "caches failed to sync: %v", v) -// return -// } +// handle, err := typeInformer.Informer().AddEventHandler(...) +// if err != nil { +// return fmt.Errorf("register event handler: %v", err) +// } +// defer typeInformer.Informer().RemoveEventHandler(handle) // Avoids leaking goroutines. +// factory.StartWithContext(ctx) // Start processing these informers. +// synced := factory.WaitForCacheSyncWithContext(ctx) +// if err := synced.AsError(); err != nil { +// return err +// } +// for v := range synced { +// // Only if desired log some information similar to this. +// fmt.Fprintf(os.Stdout, "cache synced: %s", v) +// } +// +// // Also make sure that all of the initial cache events have been delivered. +// if !WaitFor(ctx, "event handler sync", handle.HasSyncedChecker()) { +// // Must have failed because of context. +// return fmt.Errorf("sync event handler: %w", context.Cause(ctx)) // } // // // Creating informers can also be created after Start, but then // // Start must be called again: // anotherGenericInformer := factory.ForResource(resource) -// factory.Start(ctx.Done()) +// factory.StartWithContext(ctx) type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory // Start initializes all requested informers. They are handled in goroutines // which run until the stop channel gets closed. // Warning: Start does not block. When run in a go-routine, it will race with a later WaitForCacheSync. + // + // Contextual logging: StartWithContext should be used instead of Start in code which supports contextual logging. Start(stopCh <-chan struct{}) + // StartWithContext initializes all requested informers. They are handled in goroutines + // which run until the context gets canceled. + // Warning: StartWithContext does not block. When run in a go-routine, it will race with a later WaitForCacheSync. + StartWithContext(ctx context.Context) + // Shutdown marks a factory as shutting down. At that point no new // informers can be started anymore and Start will return without // doing anything. @@ -282,8 +327,14 @@ type SharedInformerFactory interface { // WaitForCacheSync blocks until all started informers' caches were synced // or the stop channel gets closed. + // + // Contextual logging: WaitForCacheSync should be used instead of WaitForCacheSync in code which supports contextual logging. It also returns a more useful result. WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool + // WaitForCacheSyncWithContext blocks until all started informers' caches were synced + // or the context gets canceled. + WaitForCacheSyncWithContext(ctx context.Context) cache.SyncResult + // ForResource gives generic access to a shared informer of the matching type. ForResource(resource schema.GroupVersionResource) (GenericInformer, error) diff --git a/tools/cache/controller.go b/tools/cache/controller.go index b4e8295ab..75d2ca979 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -27,6 +27,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" clientgofeaturegate "k8s.io/client-go/features" + "k8s.io/klog/v2" "k8s.io/utils/clock" ) @@ -141,6 +142,11 @@ type Controller interface { // HasSynced delegates to the Config's Queue HasSynced() bool + // HasSyncedChecker enables waiting for syncing without polling. + // The returned DoneChecker can be passed to WaitFor. + // It delegates to the Config's Queue. 
+ HasSyncedChecker() DoneChecker + // LastSyncResourceVersion delegates to the Reflector when there // is one, otherwise returns the empty string LastSyncResourceVersion() string @@ -167,11 +173,13 @@ func (c *controller) RunWithContext(ctx context.Context) { <-ctx.Done() c.config.Queue.Close() }() + logger := klog.FromContext(ctx) r := NewReflectorWithOptions( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, ReflectorOptions{ + Logger: &logger, ResyncPeriod: c.config.FullResyncPeriod, MinWatchTimeout: c.config.MinWatchTimeout, TypeDescription: c.config.ObjectDescription, @@ -205,6 +213,13 @@ func (c *controller) HasSynced() bool { return c.config.Queue.HasSynced() } +// HasSyncedChecker enables waiting for syncing without polling. +// The returned DoneChecker can be passed to [WaitFor]. +// It delegates to the Config's Queue. +func (c *controller) HasSyncedChecker() DoneChecker { + return c.config.Queue.HasSyncedChecker() +} + func (c *controller) LastSyncResourceVersion() string { c.reflectorMutex.RLock() defer c.reflectorMutex.RUnlock() @@ -395,6 +410,9 @@ func DeletionHandlingObjectToName(obj interface{}) (ObjectName, error) { // InformerOptions configure a Reflector. type InformerOptions struct { + // Logger, if not nil, is used instead of klog.Background() for logging. + Logger *klog.Logger + // ListerWatcher implements List and Watch functions for the source of the resource // the informer will be informing about. ListerWatcher ListerWatcher @@ -587,6 +605,7 @@ func NewTransformingIndexerInformer( // Multiplexes updates in the form of a list of Deltas into a Store, and informs // a given handler of events OnUpdate, OnAdd, OnDelete func processDeltas( + logger klog.Logger, // Object which receives event notifications from the given deltas handler ResourceEventHandler, clientState Store, @@ -604,7 +623,7 @@ func processDeltas( if !ok { return fmt.Errorf("ReplacedAll did not contain ReplacedAllInfo: %T", obj) } - if err := processReplacedAllInfo(handler, info, clientState, isInInitialList, keyFunc); err != nil { + if err := processReplacedAllInfo(logger, handler, info, clientState, isInInitialList, keyFunc); err != nil { return err } case SyncAll: @@ -649,6 +668,7 @@ func processDeltas( // Returns an error if any Delta or transaction fails. For TransactionError, // only successful operations trigger callbacks. 
func processDeltasInBatch( + logger klog.Logger, handler ResourceEventHandler, clientState Store, deltas []Delta, @@ -662,7 +682,7 @@ func processDeltasInBatch( if !txnSupported { var errs []error for _, delta := range deltas { - if err := processDeltas(handler, clientState, Deltas{delta}, isInInitialList, keyFunc); err != nil { + if err := processDeltas(logger, handler, clientState, Deltas{delta}, isInInitialList, keyFunc); err != nil { errs = append(errs, err) } } @@ -727,7 +747,7 @@ func processDeltasInBatch( return nil } -func processReplacedAllInfo(handler ResourceEventHandler, info ReplacedAllInfo, clientState Store, isInInitialList bool, keyFunc KeyFunc) error { +func processReplacedAllInfo(logger klog.Logger, handler ResourceEventHandler, info ReplacedAllInfo, clientState Store, isInInitialList bool, keyFunc KeyFunc) error { var deletions []DeletedFinalStateUnknown type replacement struct { oldObj interface{} @@ -735,7 +755,7 @@ func processReplacedAllInfo(handler ResourceEventHandler, info ReplacedAllInfo, } replacements := make([]replacement, 0, len(info.Objects)) - err := reconcileReplacement(nil, clientState, info.Objects, keyFunc, + err := reconcileReplacement(logger, nil, clientState, info.Objects, keyFunc, func(obj DeletedFinalStateUnknown) error { deletions = append(deletions, obj) return nil @@ -784,7 +804,11 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. - fifo := newQueueFIFO(clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider) + logger := klog.Background() + if options.Logger != nil { + logger = *options.Logger + } + logger, fifo := newQueueFIFO(logger, options.ObjectType, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider) cfg := &Config{ Queue: fifo, @@ -795,20 +819,30 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(options.Handler, clientState, deltas, isInInitialList, keyFunc) + // This must be the logger *of the fifo*. + return processDeltas(logger, options.Handler, clientState, deltas, isInInitialList, keyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(options.Handler, clientState, deltaList, isInInitialList, keyFunc) + // Same here. + return processDeltasInBatch(logger, options.Handler, clientState, deltaList, isInInitialList, keyFunc) }, } return New(cfg) } -func newQueueFIFO(clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) Queue { +// newQueueFIFO constructs a new FIFO, choosing between real and delta FIFO +// depending on the InOrderInformers feature gate. +// +// It returns the FIFO and the logger used by the FIFO. +// That logger includes the name used for the FIFO, +// in contrast to the logger which was passed in. 
+func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) (klog.Logger, Queue) { if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { options := RealFIFOOptions{ + Logger: &logger, + Name: fmt.Sprintf("RealFIFO %T", objectType), KeyFunction: MetaNamespaceKeyFunc, Transformer: transform, Identifier: identifier, @@ -821,12 +855,16 @@ func newQueueFIFO(clientState Store, transform TransformFunc, identifier Informe } else { options.KnownObjects = clientState } - return NewRealFIFOWithOptions(options) + f := NewRealFIFOWithOptions(options) + return f.logger, f } else { - return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + Logger: &logger, + Name: fmt.Sprintf("DeltaFIFO %T", objectType), KnownObjects: clientState, EmitDeltaTypeReplaced: true, Transformer: transform, }) + return f.logger, f } } diff --git a/tools/cache/controller_bench_test.go b/tools/cache/controller_bench_test.go index 754ab3f12..f81a3d3f9 100644 --- a/tools/cache/controller_bench_test.go +++ b/tools/cache/controller_bench_test.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" fcache "k8s.io/client-go/tools/cache/testing" + "k8s.io/klog/v2/ktesting" ) const handlerWaitTime = time.Millisecond @@ -33,7 +34,8 @@ const handlerWaitTime = time.Millisecond func BenchmarkAddWithSlowHandlers(b *testing.B) { for _, unlockWhileProcessing := range []bool{false, true} { b.Run(fmt.Sprintf("unlockWhileProcessing=%t", unlockWhileProcessing), func(b *testing.B) { - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(b) + ctx, cancel := context.WithCancel(ctx) source := fcache.NewFakeControllerSource() b.Cleanup(func() { cancel() @@ -65,12 +67,12 @@ func BenchmarkAddWithSlowHandlers(b *testing.B) { Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(handler, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltas(logger, handler, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(handler, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltasInBatch(logger, handler, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) }, } c := New(cfg) diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index c4ee71f47..9528dc6ca 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -834,6 +834,7 @@ func TestProcessDeltasInBatch(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) mockStore := &mockTxnStore{ Store: NewStore(MetaNamespaceKeyFunc), failingObjs: tc.failingObjects, @@ -851,6 +852,7 @@ func TestProcessDeltasInBatch(t *testing.T) { }, } err := processDeltasInBatch( + logger, dummyListener, mockStore, tc.deltaList, @@ -929,12 +931,12 @@ func TestReplaceEvents(t *testing.T) { Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return 
processDeltas(fifo.logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltasInBatch(fifo.logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) }, } c := New(cfg) @@ -1066,12 +1068,12 @@ func TestResetWatch(t *testing.T) { Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltas(fifo.logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltasInBatch(fifo.logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) }, } c := New(cfg) diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 059c78bd3..b57cc4728 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -22,6 +22,7 @@ import ( "sync" "time" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" @@ -31,6 +32,12 @@ import ( // DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are // optional. type DeltaFIFOOptions struct { + // If set, log output will go to this logger instead of klog.Background(). + // The name of the fifo gets added automatically. + Logger *klog.Logger + + // Name can be used to override the default "DeltaFIFO" name for the new instance. + Name string // KeyFunction is used to figure out what key an object should have. (It's // exposed in the returned DeltaFIFO's KeyOf() method, with additional @@ -55,9 +62,6 @@ type DeltaFIFOOptions struct { // If set, will be called for objects before enqueueing them. Please // see the comment on TransformFunc for details. Transformer TransformFunc - - // If set, log output will go to this logger instead of klog.Background(). - Logger *klog.Logger } // DeltaFIFO is like FIFO, but differs in two ways. One is that the @@ -102,6 +106,13 @@ type DeltaFIFOOptions struct { // threads, you could end up with multiple threads processing slightly // different versions of the same object. type DeltaFIFO struct { + // logger is a per-instance logger. This gets chosen when constructing + // the instance, with klog.Background() as default. + logger klog.Logger + + // name is the name of the fifo. It is included in the logger. + name string + // lock/cond protects access to 'items' and 'queue'. lock sync.RWMutex cond sync.Cond @@ -115,6 +126,11 @@ type DeltaFIFO struct { // A key is in `queue` if and only if it is in `items`. queue []string + // synced is initially an open channel. It gets closed (once!) by checkSynced_locked + // as soon as the initial sync is considered complete. + synced chan struct{} + syncedClosed bool + // populated is true if the first batch of items inserted by Replace() has been populated // or Delete/Add/Update/AddIfNotPresent was called first. 
populated bool @@ -139,10 +155,6 @@ type DeltaFIFO struct { // Called with every object if non-nil. transformer TransformFunc - - // logger is a per-instance logger. This gets chosen when constructing - // the instance, with klog.Background() as default. - logger klog.Logger } // TransformFunc allows for transforming an object before it will be processed. @@ -263,6 +275,9 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { } f := &DeltaFIFO{ + logger: klog.Background(), + name: "DeltaFIFO", + synced: make(chan struct{}), items: map[string]Deltas{}, queue: []string{}, keyFunc: opts.KeyFunction, @@ -270,11 +285,14 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, transformer: opts.Transformer, - logger: klog.Background(), } if opts.Logger != nil { f.logger = *opts.Logger } + if name := opts.Name; name != "" { + f.name = name + } + f.logger = klog.LoggerWithName(f.logger, f.name) f.cond.L = &f.lock return f } @@ -282,6 +300,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { var ( _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue _ = TransformingStore(&DeltaFIFO{}) // DeltaFIFO implements TransformingStore to allow memory optimizations + _ = DoneChecker(&DeltaFIFO{}) // DeltaFIFO implements DoneChecker. ) var ( @@ -327,8 +346,36 @@ func (f *DeltaFIFO) HasSynced() bool { return f.hasSynced_locked() } +// HasSyncedChecker is done if an Add/Update/Delete/AddIfNotPresent are called first, +// or the first batch of items inserted by Replace() has been popped. +func (f *DeltaFIFO) HasSyncedChecker() DoneChecker { + return f +} + +// Name implements [DoneChecker.Name] +func (f *DeltaFIFO) Name() string { + return f.name +} + +// Done implements [DoneChecker.Done] +func (f *DeltaFIFO) Done() <-chan struct{} { + return f.synced +} + +// hasSynced_locked returns the result of a prior checkSynced_locked call. func (f *DeltaFIFO) hasSynced_locked() bool { - return f.populated && f.initialPopulationCount == 0 + return f.syncedClosed +} + +// checkSynced_locked checks whether the initial is completed. +// It must be called whenever populated or initialPopulationCount change. +func (f *DeltaFIFO) checkSynced_locked() { + synced := f.populated && f.initialPopulationCount == 0 + if synced && !f.syncedClosed { + // Initial sync is complete. + f.syncedClosed = true + close(f.synced) + } } // Add inserts an item, and puts it in the queue. The item is only enqueued @@ -337,6 +384,7 @@ func (f *DeltaFIFO) Add(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() return f.queueActionLocked(Added, obj) } @@ -345,6 +393,7 @@ func (f *DeltaFIFO) Update(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() return f.queueActionLocked(Updated, obj) } @@ -361,6 +410,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() if f.knownObjects == nil { if _, exists := f.items[id]; !exists { // Presumably, this was deleted when a relist happened. @@ -477,10 +527,10 @@ func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType Del // when given a non-empty list (as it is here). // If somehow it happens anyway, deal with it but complain. 
if oldDeltas == nil { - f.logger.Error(nil, "Impossible dedupDeltas, ignoring", "id", id, "oldDeltas", oldDeltas, "obj", obj) + utilruntime.HandleErrorWithLogger(f.logger, nil, "Impossible dedupDeltas, ignoring", "id", id, "oldDeltas", oldDeltas, "obj", obj) return nil } - f.logger.Error(nil, "Impossible dedupDeltas, breaking invariant by storing empty Deltas", "id", id, "oldDeltas", oldDeltas, "obj", obj) + utilruntime.HandleErrorWithLogger(f.logger, nil, "Impossible dedupDeltas, breaking invariant by storing empty Deltas", "id", id, "oldDeltas", oldDeltas, "obj", obj) f.items[id] = newDeltas return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj) } @@ -526,11 +576,12 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { depth := len(f.queue) if f.initialPopulationCount > 0 { f.initialPopulationCount-- + f.checkSynced_locked() } item, ok := f.items[id] if !ok { // This should never happen - f.logger.Error(nil, "Inconceivable! Item was in f.queue but not f.items; ignoring", "id", id) + utilruntime.HandleErrorWithLogger(f.logger, nil, "Inconceivable! Item was in f.queue but not f.items; ignoring", "id", id) continue } delete(f.items, id) @@ -623,7 +674,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { deletedObj, exists, err := f.knownObjects.GetByKey(k) if err != nil { deletedObj = nil - f.logger.Error(err, "Unexpected error during lookup, placing DeleteFinalStateUnknown marker without object", "key", k) + utilruntime.HandleErrorWithLogger(f.logger, err, "Unexpected error during lookup, placing DeleteFinalStateUnknown marker without object", "key", k) } else if !exists { deletedObj = nil f.logger.Info("Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", k) @@ -638,6 +689,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { if !f.populated { f.populated = true f.initialPopulationCount = keys.Len() + queuedDeletions + f.checkSynced_locked() } return nil @@ -666,7 +718,7 @@ func (f *DeltaFIFO) Resync() error { func (f *DeltaFIFO) syncKeyLocked(key string) error { obj, exists, err := f.knownObjects.GetByKey(key) if err != nil { - f.logger.Error(err, "Unexpected error during lookup, unable to queue object for sync", "key", key) + utilruntime.HandleErrorWithLogger(f.logger, err, "Unexpected error during lookup, unable to queue object for sync", "key", key) return nil } else if !exists { f.logger.Info("Key does not exist in known objects store, unable to queue object for sync", "key", key) diff --git a/tools/cache/event_handler_name.go b/tools/cache/event_handler_name.go new file mode 100644 index 000000000..d466fc6e2 --- /dev/null +++ b/tools/cache/event_handler_name.go @@ -0,0 +1,121 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cache + +import ( + "fmt" + "reflect" + "runtime" + "strings" +) + +func nameForHandler(handler ResourceEventHandler) (name string) { + defer func() { + // Last resort: let Sprintf handle it. + if name == "" { + name = fmt.Sprintf("%T", handler) + } + }() + + if handler == nil { + return "" + } + switch handler := handler.(type) { + case *ResourceEventHandlerFuncs: + return nameForHandlerFuncs(*handler) + case ResourceEventHandlerFuncs: + return nameForHandlerFuncs(handler) + default: + // We can use the fully qualified name of whatever + // provides the interface. We don't care whether + // it contains fields or methods which provide + // the interface methods. + value := reflect.ValueOf(handler) + if value.Type().Kind() == reflect.Interface { + // Probably not needed, but let's play it safe. + value = value.Elem() + } + if value.Type().Kind() == reflect.Pointer { + if !value.IsNil() { + value = value.Elem() + } + } + name := value.Type().PkgPath() + if name != "" { + name += "." + } + if typeName := value.Type().Name(); typeName != "" { + name += typeName + } + return name + } +} + +func nameForHandlerFuncs(funcs ResourceEventHandlerFuncs) string { + return nameForFunctions(funcs.AddFunc, funcs.UpdateFunc, funcs.DeleteFunc) +} + +func nameForFunctions(fs ...any) string { + // If all functions are defined in the same place, then we + // don't care about the actual function name in + // e.g. "main.FuncName" or "main.(*Foo).FuncName-fm", instead + // we use the common qualifier. + // + // But we don't know that yet, so we also collect all names. + var qualifier string + singleQualifier := true + var names []string + for _, f := range fs { + if f == nil { + continue + } + name := nameForFunction(f) + if name == "" { + continue + } + names = append(names, name) + + newQualifier := name + index := strings.LastIndexByte(newQualifier, '.') + if index > 0 { + newQualifier = newQualifier[:index] + } + switch qualifier { + case "": + qualifier = newQualifier + case newQualifier: + // So far, so good... + default: + // Nope, different. + singleQualifier = false + } + } + + if singleQualifier { + return qualifier + } + + return strings.Join(names, "+") +} + +func nameForFunction(f any) string { + fn := runtime.FuncForPC(reflect.ValueOf(f).Pointer()) + if fn == nil { + return "" + } + return fn.Name() +} diff --git a/tools/cache/event_handler_name_test.go b/tools/cache/event_handler_name_test.go new file mode 100644 index 000000000..9cb1434f9 --- /dev/null +++ b/tools/cache/event_handler_name_test.go @@ -0,0 +1,93 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cache + +import ( + "testing" +) + +type mockHandler struct{} + +func (m mockHandler) OnAdd(any, bool) {} +func (m mockHandler) OnUpdate(any, any) {} +func (m mockHandler) OnDelete(any) {} + +func TestNameForHandler(t *testing.T) { + emptyHandler := ResourceEventHandlerFuncs{} + + for name, tc := range map[string]struct { + handler ResourceEventHandler + wantName string + }{ + "mixture": { + handler: ResourceEventHandlerFuncs{ + UpdateFunc: emptyHandler.OnUpdate, + DeleteFunc: func(any) {}, + }, + wantName: "k8s.io/client-go/tools/cache.ResourceEventHandlerFuncs.OnUpdate-fm+k8s.io/client-go/tools/cache.TestNameForHandler.func1", // Testcase must come first to get func1. + }, + "add": { + handler: ResourceEventHandlerFuncs{AddFunc: func(any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "update": { + handler: ResourceEventHandlerFuncs{UpdateFunc: func(any, any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "delete": { + handler: ResourceEventHandlerFuncs{DeleteFunc: func(any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "all": { + handler: ResourceEventHandlerFuncs{ + AddFunc: func(any) {}, + UpdateFunc: func(any, any) {}, + DeleteFunc: func(any) {}, + }, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "ptrToFuncs": { + handler: &ResourceEventHandlerFuncs{AddFunc: func(any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "struct": { + handler: mockHandler{}, + wantName: "k8s.io/client-go/tools/cache.mockHandler", + }, + "ptrToStruct": { + handler: &mockHandler{}, + wantName: "k8s.io/client-go/tools/cache.mockHandler", + }, + "nil": { + handler: nil, + wantName: "", + }, + "stored-nil": { + // This is a bit odd, but one unit test actually registered + // such an event handler and it somehow worked. + handler: (*mockHandler)(nil), + wantName: "*cache.mockHandler", + }, + } { + t.Run(name, func(t *testing.T) { + gotName := nameForHandler(tc.handler) + if gotName != tc.wantName { + t.Errorf("Got name:\n %s\nWanted name:\n %s", gotName, tc.wantName) + } + }) + } +} diff --git a/tools/cache/fifo.go b/tools/cache/fifo.go index 44d006b69..e31e2d270 100644 --- a/tools/cache/fifo.go +++ b/tools/cache/fifo.go @@ -58,6 +58,12 @@ type Queue interface { // Update, or Delete; otherwise the first batch is empty. HasSynced() bool + // HasSyncedChecker is done once the first batch of keys have all been + // popped. The first batch of keys are those of the first Replace + // operation if that happened before any Add, AddIfNotPresent, + // Update, or Delete; otherwise the first batch is empty. + HasSyncedChecker() DoneChecker + // Close the queue Close() } @@ -110,6 +116,11 @@ type FIFO struct { items map[string]interface{} queue []string + // synced is initially an open channel. It gets closed (once!) by checkSynced + // as soon as the initial sync is considered complete. + synced chan struct{} + syncedClosed bool + // populated is true if the first batch of items inserted by Replace() has been populated // or Delete/Add/Update was called first. populated bool @@ -127,7 +138,8 @@ type FIFO struct { } var ( - _ = Queue(&FIFO{}) // FIFO is a Queue + _ = Queue(&FIFO{}) // FIFO is a Queue + _ = DoneChecker(&FIFO{}) // ... and implements DoneChecker. ) // Close the queue. 
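The Queue interface and FIFO changes above (like the earlier DeltaFIFO changes) expose the close-once "synced" channel through the new DoneChecker interface. As a minimal sketch of the consumer side — the package name, the waitForChecker helper, and its error wrapping are illustrative assumptions, not part of this patch — a caller can block on a single checker without polling:

    package example

    import (
        "context"
        "fmt"

        "k8s.io/client-go/tools/cache"
    )

    // waitForChecker blocks until the checker's Done channel is closed or ctx is
    // canceled. It mirrors what cache.WaitFor does for one DoneChecker, minus logging.
    func waitForChecker(ctx context.Context, checker cache.DoneChecker) error {
        select {
        case <-checker.Done():
            return nil
        case <-ctx.Done():
            return fmt.Errorf("%s did not sync: %w", checker.Name(), context.Cause(ctx))
        }
    }

cache.IsDone remains available for a non-blocking check of the same channel.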
@@ -146,8 +158,36 @@ func (f *FIFO) HasSynced() bool { return f.hasSynced_locked() } +// HasSyncedChecker is done if an Add/Update/Delete/AddIfNotPresent are called first, +// or the first batch of items inserted by Replace() has been popped. +func (f *FIFO) HasSyncedChecker() DoneChecker { + return f +} + +// Name implements [DoneChecker.Name] +func (f *FIFO) Name() string { + return "FIFO" // FIFO doesn't seem to be used outside of a few tests, so changing the NewFIFO API to pass in a name doesn't seem worth it. +} + +// Done implements [DoneChecker.Done] +func (f *FIFO) Done() <-chan struct{} { + return f.synced +} + +// hasSynced_locked returns the result of a prior checkSynced call. func (f *FIFO) hasSynced_locked() bool { - return f.populated && f.initialPopulationCount == 0 + return f.syncedClosed +} + +// checkSynced checks whether the initial sync is completed. +// It must be called whenever populated or initialPopulationCount change +// while the mutex is still locked. +func (f *FIFO) checkSynced() { + synced := f.populated && f.initialPopulationCount == 0 + if synced && !f.syncedClosed { + f.syncedClosed = true + close(f.synced) + } } // Add inserts an item, and puts it in the queue. The item is only enqueued @@ -160,6 +200,7 @@ func (f *FIFO) Add(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced() if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } @@ -184,6 +225,7 @@ func (f *FIFO) Delete(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced() delete(f.items, id) return err } @@ -220,6 +262,8 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { f.queue = f.queue[1:] if f.initialPopulationCount > 0 { f.initialPopulationCount-- + // Must be done *after* process has completed. + defer f.checkSynced() } item, ok := f.items[id] if !ok { @@ -252,6 +296,7 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error { if !f.populated { f.populated = true f.initialPopulationCount = len(items) + f.checkSynced() } f.items = items @@ -290,6 +335,7 @@ func (f *FIFO) Resync() error { // process. func NewFIFO(keyFunc KeyFunc) *FIFO { f := &FIFO{ + synced: make(chan struct{}), items: map[string]interface{}{}, queue: []string{}, keyFunc: keyFunc, diff --git a/tools/cache/processor_listener_test.go b/tools/cache/processor_listener_test.go index 517258a16..cd8c5c739 100644 --- a/tools/cache/processor_listener_test.go +++ b/tools/cache/processor_listener_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package cache import ( + "context" "sync" "testing" "time" @@ -29,6 +30,25 @@ const ( concurrencyLevel = 5 ) +type mockSynced struct { + context.Context + cancel func() +} + +func newMockSynced(tb testing.TB, synced bool) *mockSynced { + m := &mockSynced{} + m.Context, m.cancel = context.WithCancel(context.Background()) + if synced { + m.cancel() + } + tb.Cleanup(m.cancel) + return m +} + +func (m *mockSynced) Name() string { + return "mock" +} + func BenchmarkListener(b *testing.B) { var notification addNotification @@ -40,7 +60,7 @@ func BenchmarkListener(b *testing.B) { AddFunc: func(obj interface{}) { swg.Done() }, - }, 0, 0, time.Now(), 1024*1024, func() bool { return true }) + }, 0, 0, time.Now(), 1024*1024, newMockSynced(b, true)) var wg wait.Group defer wg.Wait() // Wait for .run and .pop to stop defer close(pl.addCh) // Tell .run and .pop to stop diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 4d8d73d3b..62ea68c72 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -85,6 +85,7 @@ type TransformingStore interface { // Reflector watches a specified resource and causes all changes to be reflected in the given store. type Reflector struct { + logger klog.Logger // name identifies this reflector. By default, it will be a file:line if possible. name string // The name of the type we expect to place in the store. The name @@ -227,6 +228,10 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, // ReflectorOptions configures a Reflector. type ReflectorOptions struct { + // Logger, if not nil, is used instead of klog.Background() for logging. + // The name of the reflector gets added automatically. + Logger *klog.Logger + // Name is the Reflector's name. If unset/unspecified, the name defaults to the closest source_file.go:line // in the call stack that is outside this package. Name string @@ -289,6 +294,13 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store R r.name = naming.GetNameFromCallsite(internalPackages...) } + logger := klog.Background() + if options.Logger != nil { + logger = *options.Logger + } + logger = klog.LoggerWithName(logger, r.name) + r.logger = logger + if r.typeDescription == "" { r.typeDescription = getTypeDescriptionFromObject(expectedType) } @@ -299,9 +311,7 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store R r.useWatchList = clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) if r.useWatchList && watchlist.DoesClientNotSupportWatchListSemantics(lw) { - // Using klog.TODO() here because switching to a caller-provided contextual logger - // would require an API change and updating all existing call sites. - klog.TODO().V(2).Info( + r.logger.V(2).Info( "The client used to build this informer/reflector doesn't support WatchList semantics. The feature will be disabled. This is expected in unit tests but not in production. 
For details, see the documentation of watchlist.DoesClientNotSupportWatchListSemantics().", "feature", clientfeatures.WatchListClient, ) diff --git a/tools/cache/reflector_data_consistency_detector_test.go b/tools/cache/reflector_data_consistency_detector_test.go index 34abcb278..fc9c1529e 100644 --- a/tools/cache/reflector_data_consistency_detector_test.go +++ b/tools/cache/reflector_data_consistency_detector_test.go @@ -68,12 +68,12 @@ func TestReflectorDataConsistencyDetector(t *testing.T) { } func runTestReflectorDataConsistencyDetector(t *testing.T, transformer TransformFunc) { - _, ctx := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() store := NewStore(MetaNamespaceKeyFunc) - fifo := newQueueFIFO(store, transformer, InformerNameAndResource{}, nil) + _, fifo := newQueueFIFO(logger, nil, store, transformer, InformerNameAndResource{}, nil) lw := &ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index b1339316e..0f145b72a 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -20,6 +20,9 @@ import ( "context" "errors" "fmt" + "reflect" + "slices" + "strings" "sync" "sync/atomic" "time" @@ -194,6 +197,14 @@ type SharedInformer interface { // For that, please call HasSynced on the handle returned by // AddEventHandler. HasSynced() bool + // HasSyncedChecker completes if the shared informer's store has been + // informed by at least one full LIST of the authoritative state + // of the informer's object collection. This is unrelated to "resync". + // + // Note that this doesn't tell you if an individual handler is synced!! + // For that, please use HasSyncedChecker on the handle returned by + // AddEventHandler. + HasSyncedChecker() DoneChecker // LastSyncResourceVersion is the resource version observed when last synced with the underlying // store. The value returned is not synchronized with access to the underlying store and is not // thread-safe. @@ -247,6 +258,10 @@ type ResourceEventHandlerRegistration interface { // HasSynced reports if both the parent has synced and all pre-sync // events have been delivered. HasSynced() bool + + // HasSyncedChecker reports if both the parent has synced and all pre-sync + // events have been delivered. + HasSyncedChecker() DoneChecker } // Optional configuration options for [SharedInformer.AddEventHandlerWithOptions]. @@ -309,6 +324,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O return &sharedIndexInformer{ indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers), processor: processor, + synced: make(chan struct{}), listerWatcher: lw, objectType: exampleObject, objectDescription: options.ObjectDescription, @@ -414,6 +430,147 @@ func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool return true } +// WaitFor waits for a set of activities to complete, like cache syncing. +// It returns true if it was successful, false if the context was canceled +// before all activities are completed. +// +// If a non-nil "what" is provided, then progress information is logged +// while waiting ("Waiting", for=""). Verbosity is V(0). This can +// be made less verbose by the caller with: +// +// WaitFor(klog.NewContext(ctx, klog.FromContext(ctx).V(2)), ...) 
+// +// In contrast to other WaitForCacheSync alternatives, this one here doesn't +// need polling, which makes it react immediately. When used in a synctest unit +// test, waiting completes without moving time forward randomly, which +// makes tests more predictable. +func WaitFor(ctx context.Context, what string, checkers ...DoneChecker) bool { + logger := klog.FromContext(ctx) + if what != "" { + helper, l := logger.WithCallStackHelper() + logger = l + helper() + logger.Info("Waiting", "for", what) + } + + // Check in parallel to ensure that we log "Done waiting" as soon + // as possible for each checker. The timing may be useful to know. + // We cannot log inside the goroutine, the stack unwinding wouldn't + // work, so instead each goroutine just notifies the parent + // goroutine when it's checker is done and the main goroutine then + // logs it. + var wg sync.WaitGroup + type result struct { + checker DoneChecker + done bool + } + doneChecker := make(chan result) + for _, checker := range checkers { + wg.Go(func() { + select { + case <-checker.Done(): + doneChecker <- result{checker, true} + case <-ctx.Done(): + // We can end up here even when the checker is already done, + // select is not deterministic. Check once more without blocking + // before finally giving up. + select { + case <-checker.Done(): + doneChecker <- result{checker, true} + default: + doneChecker <- result{checker, false} + } + } + }) + } + instances := make([]string, 0, len(checkers)) + for range len(checkers) { + // We are guaranteed to get exactly one result from each goroutine, so this won't block forever. + result := <-doneChecker + if result.done { + if what != "" { + logger.Info("Done waiting", "for", what, "instance", result.checker.Name()) + } + } else { + // We don't need this information unless we are a) logging or b) debugging interactively. + instances = append(instances, result.checker.Name()) + } + } + wg.Wait() + + if what != "" && len(instances) > 0 { + slices.Sort(instances) + logger.Info("Timed out waiting", "for", what, "cause", context.Cause(ctx), "instances", instances) + } + + done := len(instances) == 0 + return done +} + +// DoneChecker, in contrast to [InformerSynced], supports waiting +// for some activity to finish without polling and has a name +// that describes itself. +// +// To check for completion without blocking, use [IsDone]. +type DoneChecker interface { + // Name returns a string describing the entity that is being waited for. + // + // Note that this name might be computed, so callers should only + // get the name outside of a hot code path. + Name() string + + // Done returns a channel that will be closed on completion + // of the activity. + Done() <-chan struct{} +} + +// IsDone returns true if the activity is done, false otherwise. +func IsDone(checker DoneChecker) bool { + select { + case <-checker.Done(): + return true + default: + return false + } +} + +// SyncResult is the result of a shared informer factory's WaitForCacheSyncWithContext. +// Under the hood such factories use [WaitFor] to wait for all instantiated informers, +// then provide this summary of what was synced. +// +// Note that the informers may have synced already before all event handlers registered with +// those informers have synced. Code which wants to be sure that all of its state is up-to-date +// should do its own WaitFor with the informer's HasSyncedChecker() *and* the +// registration handle's HasSyncChecker() results. 
+type SyncResult struct { + // Err is nil if all informer caches were synced, otherwise it is + // the reason why waiting for cache syncing stopped (= context.Cause(ctx)). + Err error + + // Synced maps each registered informer in a SharedInformerFactory to + // true if it has synced, false otherwise. + Synced map[reflect.Type]bool +} + +// AsError turns a SyncResult into an error if not all caches were synced, +// otherwise it returns nil. The error wraps context.Cause(ctx) and +// includes information about the informers which were not synced. +func (c SyncResult) AsError() error { + if c.Err == nil { + return nil + } + + unsynced := make([]string, 0, len(c.Synced)) + for t, synced := range c.Synced { + if !synced { + unsynced = append(unsynced, t.String()) + } + } + slices.Sort(unsynced) + + return fmt.Errorf("failed to sync all caches: %s: %w", strings.Join(unsynced, ", "), c.Err) +} + // `*sharedIndexInformer` implements SharedIndexInformer and has three // main components. One is an indexed local cache, `indexer Indexer`. // The second main component is a Controller that pulls @@ -431,6 +588,10 @@ type sharedIndexInformer struct { indexer Indexer controller Controller + // synced gets created when creating the sharedIndexInformer. + // It gets closed when Run detects that the processor created + synced chan struct{} + processor *sharedProcessor cacheMutationDetector MutationDetector @@ -494,6 +655,10 @@ func (v *dummyController) HasSynced() bool { return v.informer.HasSynced() } +func (v *dummyController) HasSyncedChecker() DoneChecker { + return v.informer.HasSyncedChecker() +} + func (v *dummyController) LastSyncResourceVersion() string { if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InformerResourceVersion) { return v.informer.LastSyncResourceVersion() @@ -563,7 +728,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { s.startedLock.Lock() defer s.startedLock.Unlock() - fifo := newQueueFIFO(s.indexer, s.transform, s.identifier, s.fifoMetricsProvider) + logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider) cfg := &Config{ Queue: fifo, @@ -573,8 +738,12 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { FullResyncPeriod: s.resyncCheckPeriod, ShouldResync: s.processor.shouldResync, - Process: s.HandleDeltas, - ProcessBatch: s.HandleBatchDeltas, + Process: func(obj interface{}, isInInitialList bool) error { + return s.handleDeltas(logger, obj, isInInitialList) + }, + ProcessBatch: func(deltas []Delta, isInInitialList bool) error { + return s.handleBatchDeltas(logger, deltas, isInInitialList) + }, WatchErrorHandlerWithContext: s.watchErrorHandler, } @@ -594,6 +763,15 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { // has a RunWithContext method that we can use here. wg.StartWithChannel(processorStopCtx.Done(), s.cacheMutationDetector.Run) wg.StartWithContext(processorStopCtx, s.processor.run) + wg.Start(func() { + select { + case <-ctx.Done(): + // We were stopped without completing the sync. + case <-s.controller.HasSyncedChecker().Done(): + // Controller has synced and thus so have we. 
+ close(s.synced) + } + }) defer func() { s.startedLock.Lock() @@ -610,13 +788,31 @@ func (s *sharedIndexInformer) HasStarted() bool { } func (s *sharedIndexInformer) HasSynced() bool { - s.startedLock.Lock() - defer s.startedLock.Unlock() - - if s.controller == nil { + select { + case <-s.synced: + return true + default: return false } - return s.controller.HasSynced() +} + +func (s *sharedIndexInformer) HasSyncedChecker() DoneChecker { + return &sharedIndexInformerDone{ + s: s, + } +} + +// sharedIndexInformerDone implements [NamedCacheSync] for a [sharedIndexInformer]. +type sharedIndexInformerDone struct { + s *sharedIndexInformer +} + +func (sd *sharedIndexInformerDone) Name() string { + return fmt.Sprintf("SharedIndexInformer %T", sd.s.objectType) +} + +func (sd *sharedIndexInformerDone) Done() <-chan struct{} { + return sd.s.synced } func (s *sharedIndexInformer) LastSyncResourceVersion() string { @@ -708,7 +904,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa } } - listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced) + listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSyncedChecker()) if !s.started { return s.processor.addListener(listener), nil @@ -734,23 +930,27 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa // thread adding them and the counter is temporarily zero). listener.add(addNotification{newObj: item, isInInitialList: true}) } + + // Initial list is added, now we can allow the listener to detect that "upstream has synced". + s.processor.wg.Start(listener.watchSynced) + return handle, nil } -func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error { +func (s *sharedIndexInformer) handleDeltas(logger klog.Logger, obj interface{}, isInInitialList bool) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() if deltas, ok := obj.(Deltas); ok { - return processDeltas(s, s.indexer, deltas, isInInitialList, s.keyFunc) + return processDeltas(logger, s, s.indexer, deltas, isInInitialList, s.keyFunc) } return errors.New("object given as Process argument is not Deltas") } -func (s *sharedIndexInformer) HandleBatchDeltas(deltas []Delta, isInInitialList bool) error { +func (s *sharedIndexInformer) handleBatchDeltas(logger klog.Logger, deltas []Delta, isInInitialList bool) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() - return processDeltasInBatch(s, s.indexer, deltas, isInInitialList, s.keyFunc) + return processDeltasInBatch(logger, s, s.indexer, deltas, isInInitialList, s.keyFunc) } // Conforms to ResourceEventHandler @@ -854,6 +1054,8 @@ func (p *sharedProcessor) addListener(listener *processorListener) ResourceEvent p.listeners[listener] = true if p.listenersStarted { + // Not starting listener.watchSynced! + // The caller must first add the initial list, then start it. 
p.wg.Start(listener.run) p.wg.Start(listener.pop) } @@ -925,6 +1127,7 @@ func (p *sharedProcessor) run(ctx context.Context) { p.listenersLock.Lock() defer p.listenersLock.Unlock() for listener := range p.listeners { + p.wg.Start(listener.watchSynced) p.wg.Start(listener.run) p.wg.Start(listener.pop) } @@ -986,7 +1189,7 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncChe } // processorListener relays notifications from a sharedProcessor to -// one ResourceEventHandler --- using two goroutines, two unbuffered +// one ResourceEventHandler --- using three goroutines, two unbuffered // channels, and an unbounded ring buffer. The `add(notification)` // function sends the given notification to `addCh`. One goroutine // runs `pop()`, which pumps notifications from `addCh` to `nextCh` @@ -994,16 +1197,23 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncChe // Another goroutine runs `run()`, which receives notifications from // `nextCh` and synchronously invokes the appropriate handler method. // +// The third goroutine watches the upstream "has synced" channel +// and notifies a SingleFileTracker instance. That instance then +// combines the upstream state and the processListener state to +// implement the overall "event handler has synced". +// // processorListener also keeps track of the adjusted requested resync // period of the listener. type processorListener struct { logger klog.Logger nextCh chan interface{} addCh chan interface{} + done chan struct{} handler ResourceEventHandler - syncTracker *synctrack.SingleFileTracker + syncTracker *synctrack.SingleFileTracker + upstreamHasSynced DoneChecker // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed. // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications @@ -1041,13 +1251,21 @@ func (p *processorListener) HasSynced() bool { return p.syncTracker.HasSynced() } -func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener { +// HasNamedSync is done if the source informer has synced, and all +// corresponding events have been delivered. 
+func (p *processorListener) HasSyncedChecker() DoneChecker { + return p.syncTracker +} + +func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced DoneChecker) *processorListener { ret := &processorListener{ logger: logger, nextCh: make(chan interface{}), addCh: make(chan interface{}), + done: make(chan struct{}), + upstreamHasSynced: hasSynced, handler: handler, - syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced}, + syncTracker: synctrack.NewSingleFileTracker(fmt.Sprintf("%s + event handler %s", hasSynced.Name(), nameForHandler(handler))), pendingNotifications: *buffer.NewRingGrowing(bufferSize), requestedResyncPeriod: requestedResyncPeriod, resyncPeriod: resyncPeriod, @@ -1068,6 +1286,7 @@ func (p *processorListener) add(notification interface{}) { func (p *processorListener) pop() { defer utilruntime.HandleCrashWithLogger(p.logger) defer close(p.nextCh) // Tell .run() to stop + defer close(p.done) // Tell .watchSynced() to stop var nextCh chan<- interface{} var notification interface{} @@ -1131,6 +1350,16 @@ func (p *processorListener) run() { } } +func (p *processorListener) watchSynced() { + select { + case <-p.upstreamHasSynced.Done(): + // Notify tracker that the upstream has synced. + p.syncTracker.UpstreamHasSynced() + case <-p.done: + // Give up waiting for sync. + } +} + // shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0, // this always returns false. func (p *processorListener) shouldResync(now time.Time) bool { diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 0b40315a8..835e6debd 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -1134,8 +1134,13 @@ func TestAddWhileActive(t *testing.T) { return } - if !handle1.HasSynced() { - t.Error("Not synced after Run??") + select { + case <-handle1.HasSyncedChecker().Done(): + if !handle1.HasSynced() { + t.Error("Not synced after channel said we are synced??") + } + case <-time.After(10 * time.Second): + t.Error("Not synced 10 seconds after Run??") } listener2.lock.Lock() // ensure we observe it before it has synced diff --git a/tools/cache/synctrack/synctrack.go b/tools/cache/synctrack/synctrack.go index 3fa2beb6b..e941dafb8 100644 --- a/tools/cache/synctrack/synctrack.go +++ b/tools/cache/synctrack/synctrack.go @@ -20,6 +20,7 @@ limitations under the License. package synctrack import ( + "context" "sync" "sync/atomic" @@ -27,11 +28,32 @@ import ( ) // AsyncTracker helps propagate HasSynced in the face of multiple worker threads. +// The user has to monitor the upstream "has synced" +// and notify the tracker when that changes from false to true. type AsyncTracker[T comparable] struct { - UpstreamHasSynced func() bool + // name describes the instance. + name string + + // upstreamHasSynced is changed from false (initial value) to true + // when UpstreamHasSynced is called. + upstreamHasSynced atomic.Bool lock sync.Mutex waiting sets.Set[T] + + // synced gets canceled once both the tracker and upstream are synced. + // A context is convenient for this because it gives us a channel + // and handles thread-safety. 
+ synced context.Context + cancel func() +} + +func NewAsyncTracker[T comparable](name string) *AsyncTracker[T] { + t := &AsyncTracker[T]{ + name: name, + } + t.synced, t.cancel = context.WithCancel(context.Background()) + return t } // Start should be called prior to processing each key which is part of the @@ -57,6 +79,28 @@ func (t *AsyncTracker[T]) Finished(key T) { if t.waiting != nil { t.waiting.Delete(key) } + + // Maybe synced now? + if t.upstreamHasSynced.Load() && len(t.waiting) == 0 { + // Mark as synced. + t.cancel() + } +} + +// UpstreamHasSynced needs to be called at least once as soon as +// the upstream "has synced" becomes true. It tells AsyncTracker +// that the source is synced. +// +// Must be called after handing over the initial list to Start. +func (t *AsyncTracker[T]) UpstreamHasSynced() { + // Upstream is done, but we might not be yet. + t.upstreamHasSynced.Store(true) + t.lock.Lock() + defer t.lock.Unlock() + if len(t.waiting) == 0 { + // Mark as synced. + t.cancel() + } } // HasSynced returns true if the source is synced and every key present in the @@ -64,27 +108,51 @@ func (t *AsyncTracker[T]) Finished(key T) { // itself synced until *after* it has delivered the notification for the last // key, and that notification handler must have called Start. func (t *AsyncTracker[T]) HasSynced() bool { - // Call UpstreamHasSynced first: it might take a lock, which might take - // a significant amount of time, and we can't hold our lock while - // waiting on that or a user is likely to get a deadlock. - if !t.UpstreamHasSynced() { - return false - } - t.lock.Lock() - defer t.lock.Unlock() - return t.waiting.Len() == 0 + return t.synced.Err() != nil +} + +// Done returns a channel that is closed if the source is synced and every key present in the +// initial list has been processed. This relies on the source not considering +// itself synced until *after* it has delivered the notification for the last +// key, and that notification handler must have called Start. +func (t *AsyncTracker[T]) Done() <-chan struct{} { + return t.synced.Done() +} + +func (t *AsyncTracker[T]) Name() string { + return t.name } // SingleFileTracker helps propagate HasSynced when events are processed in -// order (i.e. via a queue). +// order (i.e. via a queue). The user has to monitor the upstream "has synced" +// and notify the tracker when that changes from false to true. type SingleFileTracker struct { + // name describes the instance. + name string + // Important: count is used with atomic operations so it must be 64-bit // aligned, otherwise atomic operations will panic. Having it at the top of // the struct will guarantee that, even on 32-bit arches. // See https://pkg.go.dev/sync/atomic#pkg-note-BUG for more information. count int64 - UpstreamHasSynced func() bool + // upstreamHasSynced is changed from false (initial value) to true + // when UpstreamHasSynced is called. + upstreamHasSynced atomic.Bool + + // synced gets canceled once both the tracker and upstream are synced. + // A context is convenient for this because it gives us a channel + // and handles thread-safety. 
+ synced context.Context + cancel func() +} + +func NewSingleFileTracker(name string) *SingleFileTracker { + t := &SingleFileTracker{ + name: name, + } + t.synced, t.cancel = context.WithCancel(context.Background()) + return t } // Start should be called prior to processing each key which is part of the @@ -103,6 +171,26 @@ func (t *SingleFileTracker) Finished() { if result < 0 { panic("synctrack: negative counter; this logic error means HasSynced may return incorrect value") } + + // Maybe synced now? + if result == 0 && t.upstreamHasSynced.Load() { + // Mark as synced. + t.cancel() + } +} + +// UpstreamHasSynced needs to be called at least once as soon as +// the upstream "has synced" becomes true. It tells SingleFileTracker +// that the source is synced. +// +// Must be called after handing over the initial list to Start. +func (t *SingleFileTracker) UpstreamHasSynced() { + // Upstream is done, but we might not be yet. + t.upstreamHasSynced.Store(true) + if atomic.LoadInt64(&t.count) == 0 { + // Mark as synced. + t.cancel() + } } // HasSynced returns true if the source is synced and every key present in the @@ -110,11 +198,17 @@ func (t *SingleFileTracker) Finished() { // itself synced until *after* it has delivered the notification for the last // key, and that notification handler must have called Start. func (t *SingleFileTracker) HasSynced() bool { - // Call UpstreamHasSynced first: it might take a lock, which might take - // a significant amount of time, and we don't want to then act on a - // stale count value. - if !t.UpstreamHasSynced() { - return false - } - return atomic.LoadInt64(&t.count) <= 0 + return t.synced.Err() != nil +} + +// Done returns a channel that is closed if the source is synced and every key present in the +// initial list has been processed. This relies on the source not considering +// itself synced until *after* it has delivered the notification for the last +// key, and that notification handler must have called Start. 
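+//
+// A rough, illustrative lifecycle sketch (process and item are hypothetical
+// placeholders for the caller's own work):
+//
+//	t := NewSingleFileTracker("example")
+//	t.Start()             // before handling a key from the initial list
+//	process(item)         // the caller's own processing
+//	t.Finished()          // after that key has been handled
+//	t.UpstreamHasSynced() // once the source reports that it has synced
+//	<-t.Done()            // closed once both conditions hold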
+func (t *SingleFileTracker) Done() <-chan struct{} { + return t.synced.Done() +} + +func (t *SingleFileTracker) Name() string { + return t.name } diff --git a/tools/cache/synctrack/synctrack_test.go b/tools/cache/synctrack/synctrack_test.go index 4cf089e22..bff8838e8 100644 --- a/tools/cache/synctrack/synctrack_test.go +++ b/tools/cache/synctrack/synctrack_test.go @@ -19,29 +19,24 @@ package synctrack import ( "strings" "sync" - "time" "testing" ) -func testSingleFileFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) { - tracker := SingleFileTracker{ - UpstreamHasSynced: upstreamHasSynced, - } - return tracker.Start, tracker.Finished, tracker.HasSynced +func testSingleFileFuncs() (upstreamHasSynced func(), start func(), finished func(), hasSynced func() bool, synced <-chan struct{}) { + tracker := NewSingleFileTracker("") + return tracker.UpstreamHasSynced, tracker.Start, tracker.Finished, tracker.HasSynced, tracker.Done() } -func testAsyncFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) { - tracker := AsyncTracker[string]{ - UpstreamHasSynced: upstreamHasSynced, - } - return func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced +func testAsyncFuncs() (upstreamHasSynced func(), start func(), finished func(), hasSynced func() bool, synced <-chan struct{}) { + tracker := NewAsyncTracker[string]("") + return tracker.UpstreamHasSynced, func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced, tracker.Done() } func TestBasicLogic(t *testing.T) { table := []struct { name string - construct func(func() bool) (func(), func(), func() bool) + construct func() (func(), func(), func(), func() bool, <-chan struct{}) }{ {"SingleFile", testSingleFileFuncs}, {"Async", testAsyncFuncs}, @@ -50,27 +45,87 @@ func TestBasicLogic(t *testing.T) { for _, entry := range table { t.Run(entry.name, func(t *testing.T) { table := []struct { - synced bool + synced bool + syncedBeforeFinish bool + start bool finish bool expectSynced bool }{ - {false, true, true, false}, - {true, true, false, false}, - {false, true, false, false}, - {true, true, true, true}, + {false, false, true, true, false}, + {true, false, true, false, false}, + {true, true, true, false, false}, + {false, false, true, false, false}, + {true, false, true, true, true}, + {true, true, true, true, true}, } for _, tt := range table { - Start, Finished, HasSynced := entry.construct(func() bool { return tt.synced }) + upstreamHasSynced, start, finished, hasSynced, synced := entry.construct() + syncedDone := func() bool { + select { + case <-synced: + return true + default: + return false + } + } + + if hasSynced() { + t.Errorf("for %#v got HasSynced() true before start (wanted false)", tt) + } + if syncedDone() { + t.Errorf("for %#v got Done() true before start (wanted false)", tt) + } + if tt.start { - Start() + start() + } + + if hasSynced() { + t.Errorf("for %#v got HasSynced() true after start (wanted false)", tt) + } + if syncedDone() { + t.Errorf("for %#v got Done() true after start (wanted false)", tt) + } + + // "upstream has synced" may occur before or after finished, but not before start. 
+ if tt.synced && tt.syncedBeforeFinish { + upstreamHasSynced() + if hasSynced() { + t.Errorf("for %#v got HasSynced() true after upstreamHasSynced and before finish (wanted false)", tt) + } + if syncedDone() { + t.Errorf("for %#v got Done() true after upstreamHasSynced and before finish (wanted false)", tt) + } } if tt.finish { - Finished() + finished() } - got := HasSynced() - if e, a := tt.expectSynced, got; e != a { - t.Errorf("for %#v got %v (wanted %v)", tt, a, e) + if tt.synced && !tt.syncedBeforeFinish { + if hasSynced() { + t.Errorf("for %#v got HasSynced() true after finish and before upstreamHasSynced (wanted false)", tt) + } + if syncedDone() { + t.Errorf("for %#v got Done() true after finish and before upstreamHasSynced (wanted false)", tt) + } + upstreamHasSynced() + } + if e, a := tt.expectSynced, hasSynced(); e != a { + t.Errorf("for %#v got HasSynced() %v (wanted %v)", tt, a, e) + } + if e, a := tt.expectSynced, syncedDone(); e != a { + t.Errorf("for %#v got Done() %v (wanted %v)", tt, a, e) + } + + select { + case <-synced: + if !tt.expectSynced { + t.Errorf("for %#v got done (wanted not done)", tt) + } + default: + if tt.expectSynced { + t.Errorf("for %#v got done (wanted not done)", tt) + } } } }) @@ -78,7 +133,7 @@ func TestBasicLogic(t *testing.T) { } func TestAsyncLocking(t *testing.T) { - aft := AsyncTracker[int]{UpstreamHasSynced: func() bool { return true }} + aft := NewAsyncTracker[int]("") var wg sync.WaitGroup for _, i := range []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} { @@ -92,6 +147,7 @@ func TestAsyncLocking(t *testing.T) { }(i) } wg.Wait() + aft.UpstreamHasSynced() if !aft.HasSynced() { t.Errorf("async tracker must have made a threading error?") } @@ -99,7 +155,7 @@ func TestAsyncLocking(t *testing.T) { } func TestSingleFileCounting(t *testing.T) { - sft := SingleFileTracker{UpstreamHasSynced: func() bool { return true }} + sft := NewSingleFileTracker("") for i := 0; i < 100; i++ { sft.Start() @@ -115,6 +171,8 @@ func TestSingleFileCounting(t *testing.T) { } sft.Finished() + sft.UpstreamHasSynced() + if !sft.HasSynced() { t.Fatal("Unexpectedly not synced?") } @@ -148,24 +206,35 @@ func TestSingleFileCounting(t *testing.T) { func TestSingleFile(t *testing.T) { table := []struct { - synced bool + synced bool + syncedBeforeStops bool + starts int stops int expectSynced bool }{ - {false, 1, 1, false}, - {true, 1, 0, false}, - {false, 1, 0, false}, - {true, 1, 1, true}, + {false, false, 1, 1, false}, + {true, false, 1, 0, false}, + {true, true, 1, 0, false}, + {false, false, 1, 0, false}, + {true, false, 1, 1, true}, + {true, true, 1, 1, true}, } for _, tt := range table { - sft := SingleFileTracker{UpstreamHasSynced: func() bool { return tt.synced }} + sft := NewSingleFileTracker("") for i := 0; i < tt.starts; i++ { sft.Start() } + // "upstream has synced" may occur before or after finished, but not before start. 
+ if tt.synced && tt.syncedBeforeStops { + sft.UpstreamHasSynced() + } for i := 0; i < tt.stops; i++ { sft.Finished() } + if tt.synced && !tt.syncedBeforeStops { + sft.UpstreamHasSynced() + } got := sft.HasSynced() if e, a := tt.expectSynced, got; e != a { t.Errorf("for %#v got %v (wanted %v)", tt, a, e) @@ -173,67 +242,3 @@ func TestSingleFile(t *testing.T) { } } - -func TestNoStaleValue(t *testing.T) { - table := []struct { - name string - construct func(func() bool) (func(), func(), func() bool) - }{ - {"SingleFile", testSingleFileFuncs}, - {"Async", testAsyncFuncs}, - } - - for _, entry := range table { - t.Run(entry.name, func(t *testing.T) { - var lock sync.Mutex - upstreamHasSynced := func() bool { - lock.Lock() - defer lock.Unlock() - return true - } - - Start, Finished, HasSynced := entry.construct(upstreamHasSynced) - - // Ordinarily the corresponding lock would be held and you wouldn't be - // able to call this function at this point. - if !HasSynced() { - t.Fatal("Unexpectedly not synced??") - } - - Start() - if HasSynced() { - t.Fatal("Unexpectedly synced??") - } - Finished() - if !HasSynced() { - t.Fatal("Unexpectedly not synced??") - } - - // Now we will prove that if the lock is held, you can't get a false - // HasSynced return. - lock.Lock() - - // This goroutine calls HasSynced - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - if HasSynced() { - t.Error("Unexpectedly synced??") - } - }() - - // This goroutine increments + unlocks. The sleep is to bias the - // runtime such that the other goroutine usually wins (it needs to work - // in both orderings, this one is more likely to be buggy). - go func() { - time.Sleep(time.Millisecond) - Start() - lock.Unlock() - }() - - wg.Wait() - }) - } - -} diff --git a/tools/cache/the_real_fifo.go b/tools/cache/the_real_fifo.go index 5ac89c945..4856a9baf 100644 --- a/tools/cache/the_real_fifo.go +++ b/tools/cache/the_real_fifo.go @@ -23,11 +23,20 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" utiltrace "k8s.io/utils/trace" ) // RealFIFOOptions is the configuration parameters for RealFIFO. type RealFIFOOptions struct { + // If set, log output will go to this logger instead of klog.Background(). + // The name of the fifo gets added automatically. + Logger *klog.Logger + + // Name can be used to override the default "RealFIFO" name for the new instance. + // Optional. Used only if Identifier.Name returns an empty string. + Name string + // KeyFunction is used to figure out what key an object should have. (It's // exposed in the returned RealFIFO's keyOf() method, with additional // handling around deleted objects and queue state). @@ -78,11 +87,23 @@ var _ QueueWithBatch = &RealFIFO{} // 1. delivers notifications for items that have been deleted // 2. delivers multiple notifications per item instead of simply the most recent value type RealFIFO struct { + // logger is a per-instance logger. This gets chosen when constructing + // the instance, with klog.Background() as default. + logger klog.Logger + + // name is the name of the fifo. It is included in the logger. + name string + lock sync.RWMutex cond sync.Cond items []Delta + // synced is initially an open channel. It gets closed (once!) by checkSynced_locked + // as soon as the initial sync is considered complete. + synced chan struct{} + syncedClosed bool + // populated is true if the first batch of items inserted by Replace() has been populated // or Delete/Add/Update was called first. 
populated bool
@@ -145,6 +166,7 @@ type SyncAllInfo struct{}
var (
_ = Queue(&RealFIFO{}) // RealFIFO is a Queue
_ = TransformingStore(&RealFIFO{}) // RealFIFO implements TransformingStore to allow memory optimizations
+ _ = DoneChecker(&RealFIFO{}) // RealFIFO implements DoneChecker.
)
// Close the queue.
@@ -181,11 +203,37 @@ func (f *RealFIFO) HasSynced() bool {
return f.hasSynced_locked()
}
-// ignoring lint to reduce delta to the original for review. It's ok adjust later.
-
-//lint:file-ignore ST1003: should not use underscores in Go names
+// HasSyncedChecker is done if Add/Update/Delete/AddIfNotPresent was called first,
+// or the first batch of items inserted by Replace() has been popped.
+func (f *RealFIFO) HasSyncedChecker() DoneChecker {
+ return f
+}
+
+// Name implements [DoneChecker.Name]
+func (f *RealFIFO) Name() string {
+ return f.name
+}
+
+// Done implements [DoneChecker.Done]
+func (f *RealFIFO) Done() <-chan struct{} {
+ return f.synced
+}
+
+// hasSynced_locked returns the result of a prior checkSynced_locked call.
func (f *RealFIFO) hasSynced_locked() bool {
- return f.populated && f.initialPopulationCount == 0
+ return f.syncedClosed
+}
+
+// checkSynced_locked checks whether the initial batch of items (set via Replace) has been delivered
+// and closes the synced channel as needed. It must be called after changing f.populated and/or
+// f.initialPopulationCount while the mutex is still locked.
+func (f *RealFIFO) checkSynced_locked() {
+ synced := f.populated && f.initialPopulationCount == 0
+ if synced && !f.syncedClosed {
+ // Initial sync is complete.
+ f.syncedClosed = true
+ close(f.synced)
+ }
}
// addToItems_locked appends to the delta list.
@@ -276,6 +324,7 @@ func (f *RealFIFO) Add(obj interface{}) error {
defer f.lock.Unlock()
f.populated = true
+ f.checkSynced_locked()
retErr := f.addToItems_locked(Added, false, obj)
return retErr
@@ -287,6 +336,7 @@ func (f *RealFIFO) Update(obj interface{}) error {
defer f.lock.Unlock()
f.populated = true
+ f.checkSynced_locked()
retErr := f.addToItems_locked(Updated, false, obj)
return retErr
@@ -300,6 +350,7 @@ func (f *RealFIFO) Delete(obj interface{}) error {
defer f.lock.Unlock()
f.populated = true
+ f.checkSynced_locked()
retErr := f.addToItems_locked(Deleted, false, obj)
return retErr
@@ -347,6 +398,7 @@ func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) {
defer func() {
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
+ f.checkSynced_locked()
}
}()
@@ -467,7 +519,6 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc
unique.Insert(id)
moveDeltaToProcessList(i)
}
- f.items = f.items[len(deltas):]
// Decrement initialPopulationCount if needed.
// This is done in a defer so we only do this *after* processing is complete, @@ -475,6 +526,7 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc defer func() { if f.initialPopulationCount > 0 { f.initialPopulationCount -= len(deltas) + f.checkSynced_locked() } }() @@ -524,7 +576,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error if f.emitAtomicEvents { err = f.addReplaceToItemsLocked(newItems, resourceVersion) } else { - err = reconcileReplacement(f.items, f.knownObjects, newItems, f.keyOf, + err = reconcileReplacement(f.logger, f.items, f.knownObjects, newItems, f.keyOf, func(obj DeletedFinalStateUnknown) error { return f.addToItems_locked(Deleted, true, obj) }, @@ -539,6 +591,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error if !f.populated { f.populated = true f.initialPopulationCount = len(f.items) + f.checkSynced_locked() } return nil @@ -548,6 +601,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error // and based upon the state of the items in the queue and known objects will call onDelete and onReplace // depending upon whether the item is being deleted or replaced/added. func reconcileReplacement( + logger klog.Logger, queuedItems []Delta, knownObjects KeyListerGetter, newItems []interface{}, @@ -623,10 +677,10 @@ func reconcileReplacement( deletedObj, exists, err := knownObjects.GetByKey(knownKey) if err != nil { deletedObj = nil - utilruntime.HandleError(fmt.Errorf("error during lookup, placing DeleteFinalStateUnknown marker without object: key=%q, err=%w", knownKey, err)) + utilruntime.HandleErrorWithLogger(logger, err, "Error during lookup, placing DeleteFinalStateUnknown marker without object", "key", knownKey) } else if !exists { deletedObj = nil - utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object: key=%q", knownKey)) + utilruntime.HandleErrorWithLogger(logger, nil, "Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", knownKey) } retErr := onDelete(DeletedFinalStateUnknown{ Key: knownKey, @@ -683,10 +737,10 @@ func (f *RealFIFO) Resync() error { knownObj, exists, err := f.knownObjects.GetByKey(knownKey) if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to queue object for sync: key=%q, err=%w", knownKey, err)) + utilruntime.HandleErrorWithLogger(f.logger, err, "Unable to queue object for sync", "key", knownKey) continue } else if !exists { - utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, unable to queue object for sync: key=%q", knownKey)) + utilruntime.HandleErrorWithLogger(f.logger, nil, "Key does not exist in known objects store, unable to queue object for sync", "key", knownKey) continue } @@ -739,7 +793,10 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO { } f := &RealFIFO{ + logger: klog.Background(), + name: "RealFIFO", items: make([]Delta, 0, 10), + synced: make(chan struct{}), keyFunc: opts.KeyFunction, knownObjects: opts.KnownObjects, transformer: opts.Transformer, @@ -749,7 +806,16 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO { identifier: opts.Identifier, metrics: newFIFOMetrics(opts.Identifier, opts.MetricsProvider), } - + if opts.Logger != nil { + f.logger = *opts.Logger + } + if name := opts.Name; name != "" { + f.name = name + } + if name := opts.Identifier.Name(); name != "" { + f.name = name + } + f.logger = 
klog.LoggerWithName(f.logger, f.name) f.cond.L = &f.lock return f } diff --git a/tools/cache/wait_test.go b/tools/cache/wait_test.go new file mode 100644 index 000000000..f03b48dd8 --- /dev/null +++ b/tools/cache/wait_test.go @@ -0,0 +1,246 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "reflect" + "runtime" + "sync" + "testing" + "testing/synctest" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/klog/v2" + "k8s.io/klog/v2/textlogger" +) + +func init() { + // The test below is sensitive to the time zone, log output uses time.Local. + time.Local = time.UTC +} + +func TestWaitFor(t *testing.T) { + for name, tc := range map[string]struct { + what string + checkers []DoneChecker + timeout time.Duration + timeoutReason string + + expectDone bool + + // Time is predictable and starts at the synctest epoch. + // %[1]d is the pid, %[2]d the line number of the WaitFor call. + expectOutput string + }{ + "empty": { + expectDone: true, + }, + "no-caches": { + what: "my-caches", + expectDone: true, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +`, + }, + "no-logging": { + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", 5*time.Second), newMockChecker("last", 0*time.Second)}, + expectDone: true, + }, + "with-logging": { + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", 5*time.Second), newMockChecker("last", 0*time.Second)}, + expectDone: true, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last" +I0101 00:00:05.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="second" +I0101 00:00:10.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="first" +`, + }, + "some-timeout": { + timeout: time.Minute, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last" +I0101 00:00:10.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="first" +I0101 00:01:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context deadline exceeded" instances=["second"] +`, + }, + "some-canceled": { + timeout: -1, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" 
for="my-caches" instance="last" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context canceled" instances=["first","second"] +`, + }, + "more": { + timeoutReason: "go fish", + timeout: 5 * time.Second, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last" +I0101 00:00:05.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="go fish" instances=["first","second"] +`, + }, + "all": { + timeout: time.Minute, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", -1), newMockChecker("second", -1), newMockChecker("last", -1)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:01:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context deadline exceeded" instances=["first","last","second"] +`, + }, + } { + t.Run(name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var buffer bytes.Buffer + logger := textlogger.NewLogger(textlogger.NewConfig(textlogger.Output(&buffer))) + ctx := klog.NewContext(context.Background(), logger) + var wg sync.WaitGroup + defer wg.Wait() + if tc.timeout != 0 { + switch tc.timeoutReason { + case "": + if tc.timeout > 0 { + c, cancel := context.WithTimeout(ctx, tc.timeout) + defer cancel() + ctx = c + } else { + c, cancel := context.WithCancel(ctx) + cancel() + ctx = c + } + default: + c, cancel := context.WithCancelCause(ctx) + wg.Go(func() { + time.Sleep(tc.timeout) + cancel(errors.New(tc.timeoutReason)) + }) + ctx = c + } + } + _, _, line, _ := runtime.Caller(0) + done := WaitFor(ctx, tc.what, tc.checkers...) + expectOutput := tc.expectOutput + if expectOutput != "" { + expectOutput = fmt.Sprintf(expectOutput, os.Getpid(), line+1) + } + assert.Equal(t, tc.expectDone, done, "done") + assert.Equal(t, expectOutput, buffer.String(), "output") + }) + }) + } +} + +// newMockChecker can be created outside of a synctest bubble. +// It constructs the channel inside when Done is first called. +func newMockChecker(name string, delay time.Duration) DoneChecker { + return &mockChecker{ + name: name, + delay: delay, + } +} + +type mockChecker struct { + name string + delay time.Duration + initialized bool + done <-chan struct{} +} + +func (m *mockChecker) Name() string { return m.name } +func (m *mockChecker) Done() <-chan struct{} { + if !m.initialized { + switch { + case m.delay > 0: + // In the future. + ctx := context.Background() + // This leaks a cancel, but is hard to avoid (cannot use the parent t.Cleanup, no other way to delay calling it). Doesn't matter in a unit test. + //nolint:govet + ctx, _ = context.WithTimeout(ctx, m.delay) + m.done = ctx.Done() + case m.delay == 0: + // Immediately. + c := make(chan struct{}) + close(c) + m.done = c + default: + // Never. 
+ c := make(chan struct{})
+ m.done = c
+ }
+ m.initialized = true
+ }
+ return m.done
+}
+
+func TestSyncResult(t *testing.T) {
+ for name, tc := range map[string]struct {
+ result SyncResult
+ expectAsError string
+ }{
+ "empty": {},
+ "one": {
+ result: SyncResult{
+ Err: errors.New("my custom cancellation reason"),
+ Synced: map[reflect.Type]bool{
+ reflect.TypeFor[int](): true,
+ reflect.TypeFor[string](): false,
+ },
+ },
+ expectAsError: "failed to sync all caches: string: my custom cancellation reason",
+ },
+ "many": {
+ result: SyncResult{
+ Err: errors.New("my custom cancellation reason"),
+ Synced: map[reflect.Type]bool{
+ reflect.TypeFor[int](): false,
+ reflect.TypeFor[string](): false,
+ },
+ },
+ expectAsError: "failed to sync all caches: int, string: my custom cancellation reason",
+ },
+ } {
+
+ t.Run(name, func(t *testing.T) {
+ actual := tc.result.AsError()
+ switch {
+ case tc.expectAsError == "" && actual != nil:
+ t.Fatalf("expected no error, got %v", actual)
+ case tc.expectAsError != "" && actual == nil:
+ t.Fatalf("expected %q, got no error", tc.expectAsError)
+ case tc.expectAsError != "" && actual != nil && actual.Error() != tc.expectAsError:
+ t.Fatalf("expected %q, got %q", tc.expectAsError, actual.Error())
+ }
+ if tc.result.Err != nil && !errors.Is(actual, tc.result.Err) {
+ t.Errorf("actual error %+v should wrap %v but doesn't", actual, tc.result.Err)
+ }
+ })
+ }
+}
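For completeness, a minimal, self-contained sketch of how the AsyncTracker API introduced above might be driven when keys from the initial list are handled by concurrent workers. This is illustrative only and assumes the change is applied; process and the example keys are hypothetical placeholders for the caller's own work.

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache/synctrack"
)

// process is a placeholder for the caller's own per-key work.
func process(key string) {
	fmt.Println("processed", key)
}

func main() {
	tracker := synctrack.NewAsyncTracker[string]("example")

	// Hypothetical initial list handed over by the upstream source.
	for _, key := range []string{"a", "b", "c"} {
		// Start must be called while the initial list is handed over,
		// i.e. before UpstreamHasSynced is called.
		tracker.Start(key)
		go func(key string) {
			defer tracker.Finished(key) // after the key has been processed
			process(key)
		}(key)
	}

	// The upstream source reports that the initial list is complete.
	tracker.UpstreamHasSynced()

	// Done is closed once every Start has a matching Finished and
	// UpstreamHasSynced has been called.
	<-tracker.Done()
	fmt.Println("synced:", tracker.HasSynced())
}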