From 5d289bc44cf93b70f50b74a089eb36f9f3a8a50e Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Fri, 26 Jul 2024 15:26:00 +0200
Subject: [PATCH] client-go/tools/cache: add APIs with context parameter

The context is used for cancellation and to support contextual logging.
In most cases, alternative *WithContext APIs get added, except for
NewIntegerResourceVersionMutationCache where code searches indicate that
the API is not used downstream.

An API break around SharedInformer couldn't be avoided because the
alternative (keeping the interface unchanged and adding a second one with
the new method) would have been worse. controller-runtime needs to be
updated because it implements that interface in a test package. Downstream
consumers of controller-runtime will work unless they use that test
package.

Converting Kubernetes to use the other new alternatives will follow. In
the meantime, usage of the new alternatives cannot be enforced via
logcheck yet (see https://github.com/kubernetes/kubernetes/issues/126379
for the process).

Passing context through and checking it for cancellation is tricky for
event handlers. A better approach is to map the context cancellation to
the normal removal of an event handler via a helper goroutine. Thanks to
the new HandleErrorWithLogr and HandleCrashWithLogr, remembering the
logger is sufficient for handling problems at runtime.

Kubernetes-commit: 4638ba971661497b147906b8977ae206c9dd6e44
---
 examples/workqueue/main.go | 24 +-
 tools/cache/cache_test.go | 25 +++
 tools/cache/controller.go | 53 +++--
 tools/cache/controller_test.go | 74 ++++---
 tools/cache/delta_fifo.go | 25 ++-
 tools/cache/listers.go | 7 +-
 tools/cache/main_test.go | 8 -
 tools/cache/mutation_cache.go | 8 +-
 tools/cache/mutation_detector.go | 1 +
 tools/cache/processor_listener_test.go | 3 +-
 tools/cache/reflector.go | 187 ++++++++++------
 tools/cache/reflector_test.go | 207 ++++++++++--------
 tools/cache/reflector_watchlist_test.go | 23 +-
 tools/cache/shared_informer.go | 170 +++++++++++----
 tools/cache/shared_informer_test.go | 277 +++++++++++++++++++++---
 tools/cache/util_test.go | 20 ++
 16 files changed, 800 insertions(+), 312 deletions(-)
 create mode 100644 tools/cache/cache_test.go

diff --git a/examples/workqueue/main.go b/examples/workqueue/main.go
index b8825dc1..eab9daba 100644
--- a/examples/workqueue/main.go
+++ b/examples/workqueue/main.go
@@ -17,6 +17,8 @@ limitations under the License.
 package main
 
 import (
+	"context"
+	"errors"
 	"flag"
 	"fmt"
 	"time"
@@ -116,30 +118,30 @@ func (c *Controller) handleErr(err error, key string) {
 }
 
 // Run begins watching and syncing.
-func (c *Controller) Run(workers int, stopCh chan struct{}) { - defer runtime.HandleCrash() +func (c *Controller) Run(ctx context.Context, workers int) { + defer runtime.HandleCrashWithContext(ctx) // Let the workers stop when we are done defer c.queue.ShutDown() klog.Info("Starting Pod controller") - go c.informer.Run(stopCh) + go c.informer.RunWithContext(ctx) // Wait for all involved caches to be synced, before processing items from the queue is started - if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { + if !cache.WaitForNamedCacheSyncWithContext(ctx, c.informer.HasSynced) { runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) return } for i := 0; i < workers; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.UntilWithContext(ctx, c.runWorker, time.Second) } - <-stopCh + <-ctx.Done() klog.Info("Stopping Pod controller") } -func (c *Controller) runWorker() { +func (c *Controller) runWorker(ctx context.Context) { for c.processNextItem() { } } @@ -164,6 +166,8 @@ func main() { klog.Fatal(err) } + ctx := context.Background() + // create the pod watcher podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything()) @@ -211,9 +215,9 @@ func main() { }) // Now let's start the controller - stop := make(chan struct{}) - defer close(stop) - go controller.Run(1, stop) + cancelCtx, cancel := context.WithCancelCause(ctx) + defer cancel(errors.New("time to stop because main has completed")) + go controller.Run(cancelCtx, 1) // Wait forever select {} diff --git a/tools/cache/cache_test.go b/tools/cache/cache_test.go new file mode 100644 index 00000000..614ffba2 --- /dev/null +++ b/tools/cache/cache_test.go @@ -0,0 +1,25 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "k8s.io/klog/v2" +) + +func init() { + klog.InitFlags(nil) +} diff --git a/tools/cache/controller.go b/tools/cache/controller.go index e523a665..41ead09e 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "errors" "sync" "time" @@ -79,8 +80,14 @@ type Config struct { RetryOnError bool // Called whenever the ListAndWatch drops the connection with an error. + // + // Contextual logging: WatchErrorHandlerWithContext should be used instead of WatchErrorHandler in code which supports contextual logging. WatchErrorHandler WatchErrorHandler + // Called whenever the ListAndWatch drops the connection with an error + // and WatchErrorHandler is not set. + WatchErrorHandlerWithContext WatchErrorHandlerWithContext + // WatchListPageSize is the requested chunk size of initial and relist watch lists. WatchListPageSize int64 } @@ -104,12 +111,21 @@ type controller struct { // Controller is a low-level controller that is parameterized by a // Config and used in sharedIndexInformer. type Controller interface { - // Run does two things. 
One is to construct and run a Reflector + // RunWithContext does two things. One is to construct and run a Reflector // to pump objects/notifications from the Config's ListerWatcher // to the Config's Queue and possibly invoke the occasional Resync // on that Queue. The other is to repeatedly Pop from the Queue // and process with the Config's ProcessFunc. Both of these - // continue until `stopCh` is closed. + // continue until the context is canceled. + // + // It's an error to call RunWithContext more than once. + // RunWithContext blocks; call via go. + RunWithContext(ctx context.Context) + + // Run does the same as RunWithContext with a stop channel instead of + // a context. + // + // Contextual logging: RunWithcontext should be used instead of Run in code which supports contextual logging. Run(stopCh <-chan struct{}) // HasSynced delegates to the Config's Queue @@ -129,13 +145,16 @@ func New(c *Config) Controller { return ctlr } -// Run begins processing items, and will continue until a value is sent down stopCh or it is closed. -// It's an error to call Run more than once. -// Run blocks; call via go. +// Run implements [Controller.Run]. func (c *controller) Run(stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() + c.RunWithContext(wait.ContextForChannel(stopCh)) +} + +// RunWithContext implements [Controller.RunWithContext]. +func (c *controller) RunWithContext(ctx context.Context) { + defer utilruntime.HandleCrashWithContext(ctx) go func() { - <-stopCh + <-ctx.Done() c.config.Queue.Close() }() r := NewReflectorWithOptions( @@ -152,7 +171,11 @@ func (c *controller) Run(stopCh <-chan struct{}) { r.ShouldResync = c.config.ShouldResync r.WatchListPageSize = c.config.WatchListPageSize if c.config.WatchErrorHandler != nil { - r.watchErrorHandler = c.config.WatchErrorHandler + r.watchErrorHandler = func(_ context.Context, r *Reflector, err error) { + c.config.WatchErrorHandler(r, err) + } + } else if c.config.WatchErrorHandlerWithContext != nil { + r.watchErrorHandler = c.config.WatchErrorHandlerWithContext } c.reflectorMutex.Lock() @@ -161,9 +184,9 @@ func (c *controller) Run(stopCh <-chan struct{}) { var wg wait.Group - wg.StartWithChannel(stopCh, r.Run) + wg.StartWithContext(ctx, r.RunWithContext) - wait.Until(c.processLoop, time.Second, stopCh) + wait.UntilWithContext(ctx, c.processLoop, time.Second) wg.Wait() } @@ -185,13 +208,11 @@ func (c *controller) LastSyncResourceVersion() string { // TODO: Consider doing the processing in parallel. This will require a little thought // to make sure that we don't end up processing the same object multiple times // concurrently. -// -// TODO: Plumb through the stopCh here (and down to the queue) so that this can -// actually exit when the controller is stopped. Or just give up on this stuff -// ever being stoppable. Converting this whole package to use Context would -// also be helpful. -func (c *controller) processLoop() { +func (c *controller) processLoop(ctx context.Context) { for { + // TODO: Plumb through the ctx so that this can + // actually exit when the controller is stopped. Or just give up on this stuff + // ever being stoppable. obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index 232585c8..6b263bc8 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -17,6 +17,7 @@ limitations under the License. 
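As a usage sketch for the Controller changes above (the runUntilCanceled helper is hypothetical and not part of this patch): with a *Config assembled as in the Example function below, migrating a caller amounts to swapping the stop channel for a context and, optionally, setting the new WatchErrorHandlerWithContext field.

import (
	"context"

	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

// Sketch: run a low-level Controller until the caller cancels ctx.
func runUntilCanceled(ctx context.Context, cfg *cache.Config) {
	// Optional: report dropped watch connections through the contextual logger.
	cfg.WatchErrorHandlerWithContext = func(ctx context.Context, r *cache.Reflector, err error) {
		klog.FromContext(ctx).Info("watch dropped, backing off and retrying", "err", err)
	}
	ctlr := cache.New(cfg)
	go ctlr.RunWithContext(ctx)
	if !cache.WaitForNamedCacheSyncWithContext(ctx, ctlr.HasSynced) {
		return
	}
	<-ctx.Done() // RunWithContext keeps going until the context is canceled
}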
package cache import ( + "context" "fmt" "math/rand" "sync" @@ -32,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" fcache "k8s.io/client-go/tools/cache/testing" + "k8s.io/klog/v2/ktesting" fuzz "github.com/google/gofuzz" ) @@ -98,10 +100,10 @@ func Example() { }, } - // Create the controller and run it until we close stop. - stop := make(chan struct{}) - defer close(stop) - go New(cfg).Run(stop) + // Create the controller and run it until we cancel. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go New(cfg).RunWithContext(ctx) // Let's add a few objects to the source. testIDs := []string{"a-hello", "b-controller", "c-framework"} @@ -156,10 +158,10 @@ func ExampleNewInformer() { }, ) - // Run the controller and run it until we close stop. - stop := make(chan struct{}) - defer close(stop) - go controller.Run(stop) + // Run the controller and run it until we cancel. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go controller.RunWithContext(ctx) // Let's add a few objects to the source. testIDs := []string{"a-hello", "b-controller", "c-framework"} @@ -227,9 +229,10 @@ func TestHammerController(t *testing.T) { t.Errorf("Expected HasSynced() to return false before we started the controller") } - // Run the controller and run it until we close stop. - stop := make(chan struct{}) - go controller.Run(stop) + // Run the controller and run it until we cancel. + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + go controller.RunWithContext(ctx) // Let's wait for the controller to do its initial sync wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { @@ -288,7 +291,7 @@ func TestHammerController(t *testing.T) { // Let's wait for the controller to finish processing the things we just added. // TODO: look in the queue to see how many items need to be processed. time.Sleep(100 * time.Millisecond) - close(stop) + cancel() // TODO: Verify that no goroutines were leaked here and that everything shut // down cleanly. @@ -385,11 +388,13 @@ func TestUpdate(t *testing.T) { }, ) - // Run the controller and run it until we close stop. + // Run the controller and run it until we cancel. // Once Run() is called, calls to testDoneWG.Done() might start, so // all testDoneWG.Add() calls must happen before this point - stop := make(chan struct{}) - go controller.Run(stop) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go controller.RunWithContext(ctx) <-watchCh // run every test a few times, in parallel @@ -407,7 +412,6 @@ func TestUpdate(t *testing.T) { // Let's wait for the controller to process the things we just added. testDoneWG.Wait() - close(stop) } func TestPanicPropagated(t *testing.T) { @@ -427,9 +431,10 @@ func TestPanicPropagated(t *testing.T) { }, ) - // Run the controller and run it until we close stop. - stop := make(chan struct{}) - defer close(stop) + // Run the controller and run it until we cancel. + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() propagated := make(chan interface{}) go func() { @@ -438,7 +443,7 @@ func TestPanicPropagated(t *testing.T) { propagated <- r } }() - controller.Run(stop) + controller.RunWithContext(ctx) }() // Let's add a object to the source. It will trigger a panic. 
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test"}}) @@ -555,8 +560,10 @@ func TestTransformingInformer(t *testing.T) { } } - stopCh := make(chan struct{}) - go controller.Run(stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go controller.RunWithContext(ctx) verifyEvent(watch.Added, nil, expectedPod("pod1", "2")) verifyStore([]interface{}{expectedPod("pod1", "2")}) @@ -574,11 +581,17 @@ func TestTransformingInformer(t *testing.T) { source.Delete(makePod("pod1", "2")) verifyEvent(watch.Deleted, expectedPod("pod1", "2"), nil) verifyStore([]interface{}{expectedPod("pod2", "2"), expectedPod("pod3", "1")}) - - close(stopCh) } func TestTransformingInformerRace(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + // Canceled *only* when the test is done. + testCtx, cancel := context.WithCancel(ctx) + defer cancel() + // Canceled *also* during the test. + ctx, cancel = context.WithCancel(ctx) + defer cancel() + // source simulates an apiserver object endpoint. source := newFakeControllerSource(t) @@ -618,7 +631,11 @@ func TestTransformingInformerRace(t *testing.T) { type event struct{} events := make(chan event, numObjs) recordEvent := func(eventType watch.EventType, previous, current interface{}) { - events <- event{} + select { + case events <- event{}: + case <-testCtx.Done(): + // Don't block forever in the write above when test is already done. + } } checkEvents := func(count int) { for i := 0; i < count; i++ { @@ -637,8 +654,7 @@ func TestTransformingInformerRace(t *testing.T) { podTransformer, ) - stopCh := make(chan struct{}) - go controller.Run(stopCh) + go controller.RunWithContext(ctx) checkEvents(numObjs) @@ -652,7 +668,7 @@ func TestTransformingInformerRace(t *testing.T) { key := fmt.Sprintf("namespace/pod-%d", index) for { select { - case <-stopCh: + case <-ctx.Done(): return default: } @@ -674,7 +690,7 @@ func TestTransformingInformerRace(t *testing.T) { // Let resyncs to happen for some time. time.Sleep(time.Second) - close(stopCh) + cancel() wg.Wait() close(errors) for err := range errors { diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index ce74dfb6..4bb526cd 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -55,6 +55,9 @@ type DeltaFIFOOptions struct { // If set, will be called for objects before enqueueing them. Please // see the comment on TransformFunc for details. Transformer TransformFunc + + // If set, log output will go to this logger instead of klog.Background(). + Logger *klog.Logger } // DeltaFIFO is like FIFO, but differs in two ways. One is that the @@ -136,6 +139,10 @@ type DeltaFIFO struct { // Called with every object if non-nil. transformer TransformFunc + + // logger is a per-instance logger. This gets chosen when constructing + // the instance, with klog.Background() as default. + logger klog.Logger } // TransformFunc allows for transforming an object before it will be processed. @@ -253,6 +260,10 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, transformer: opts.Transformer, + logger: klog.Background(), + } + if opts.Logger != nil { + f.logger = *opts.Logger } f.cond.L = &f.lock return f @@ -487,10 +498,10 @@ func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType Del // when given a non-empty list (as it is here). // If somehow it happens anyway, deal with it but complain. 
if oldDeltas == nil { - klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj) + f.logger.Error(nil, "Impossible dedupDeltas, ignoring", "id", id, "oldDeltas", oldDeltas, "obj", obj) return nil } - klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj) + f.logger.Error(nil, "Impossible dedupDeltas, breaking invariant by storing empty Deltas", "id", id, "oldDeltas", oldDeltas, "obj", obj) f.items[id] = newDeltas return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj) } @@ -597,7 +608,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { item, ok := f.items[id] if !ok { // This should never happen - klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id) + f.logger.Error(nil, "Inconceivable! Item was in f.queue but not f.items; ignoring", "id", id) continue } delete(f.items, id) @@ -694,10 +705,10 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { deletedObj, exists, err := f.knownObjects.GetByKey(k) if err != nil { deletedObj = nil - klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) + f.logger.Error(err, "Unexpected error during lookup, placing DeleteFinalStateUnknown marker without object", "key", k) } else if !exists { deletedObj = nil - klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) + f.logger.Info("Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", k) } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { @@ -737,10 +748,10 @@ func (f *DeltaFIFO) Resync() error { func (f *DeltaFIFO) syncKeyLocked(key string) error { obj, exists, err := f.knownObjects.GetByKey(key) if err != nil { - klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key) + f.logger.Error(err, "Unexpected error during lookup, unable to queue object for sync", "key", key) return nil } else if !exists { - klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key) + f.logger.Info("Key does not exist in known objects store, unable to queue object for sync", "key", key) return nil } diff --git a/tools/cache/listers.go b/tools/cache/listers.go index a60f4494..9e050ff4 100644 --- a/tools/cache/listers.go +++ b/tools/cache/listers.go @@ -62,7 +62,12 @@ func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selec items, err := indexer.Index(NamespaceIndex, &metav1.ObjectMeta{Namespace: namespace}) if err != nil { // Ignore error; do slow search without index. - klog.Warningf("can not retrieve list of objects using index : %v", err) + // + // ListAllByNamespace is called by generated code + // (k8s.io/client-go/listers) and probably not worth converting + // to contextual logging, which would require changing all of + // those APIs. 
+ klog.TODO().Info("Warning: can not retrieve list of objects using index", "err", err) for _, m := range indexer.List() { metadata, err := meta.Accessor(m) if err != nil { diff --git a/tools/cache/main_test.go b/tools/cache/main_test.go index 12817a07..b0ed00f3 100644 --- a/tools/cache/main_test.go +++ b/tools/cache/main_test.go @@ -30,14 +30,6 @@ func TestMain(m *testing.M) { goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestFIFO_addUpdate.func1"), goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addReplace.func1"), goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addUpdate.func1"), - - // TODO: fix the following tests by adding WithContext APIs and cancellation. - goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestTransformingInformerRace.func3"), - // Created by k8s.io/client-go/tools/cache.TestReflectorListAndWatch, cannot filter on that (https://github.com/uber-go/goleak/issues/135): - goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch"), - goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).startResync"), - // ??? - goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*DeltaFIFO).Close"), } goleak.VerifyTestMain(m, options...) } diff --git a/tools/cache/mutation_cache.go b/tools/cache/mutation_cache.go index c6f953d8..6800a625 100644 --- a/tools/cache/mutation_cache.go +++ b/tools/cache/mutation_cache.go @@ -60,7 +60,7 @@ type ResourceVersionComparator interface { // If includeAdds is true, objects in the mutation cache will be returned even if they don't exist // in the underlying store. This is only safe if your use of the cache can handle mutation entries // remaining in the cache for up to ttl when mutations and deletes occur very closely in time. 
-func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache { +func NewIntegerResourceVersionMutationCache(logger klog.Logger, backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache { return &mutationCache{ backingCache: backingCache, indexer: indexer, @@ -68,6 +68,7 @@ func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, comparator: etcdObjectVersioner{}, ttl: ttl, includeAdds: includeAdds, + logger: logger, } } @@ -75,6 +76,7 @@ func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, // since you can't distinguish between, "didn't observe create" and "was deleted after create", // if the key is missing from the backing cache, we always return it as missing type mutationCache struct { + logger klog.Logger lock sync.Mutex backingCache Store indexer Indexer @@ -157,7 +159,7 @@ func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, er } elements, err := fn(updated) if err != nil { - klog.V(4).Infof("Unable to calculate an index entry for mutation cache entry %s: %v", key, err) + c.logger.V(4).Info("Unable to calculate an index entry for mutation cache entry", "key", key, "err", err) continue } for _, inIndex := range elements { @@ -204,7 +206,7 @@ func (c *mutationCache) Mutation(obj interface{}) { key, err := DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { // this is a "nice to have", so failures shouldn't do anything weird - utilruntime.HandleError(err) + utilruntime.HandleErrorWithLogger(c.logger, err, "DeletionHandlingMetaNamespaceKeyFunc") return } diff --git a/tools/cache/mutation_detector.go b/tools/cache/mutation_detector.go index b37537cb..27ea62bf 100644 --- a/tools/cache/mutation_detector.go +++ b/tools/cache/mutation_detector.go @@ -50,6 +50,7 @@ func NewCacheMutationDetector(name string) MutationDetector { if !mutationDetectionEnabled { return dummyMutationDetector{} } + //nolint:logcheck // This code shouldn't be used in production. klog.Warningln("Mutation detector is enabled, this will result in memory leakage.") return &defaultCacheMutationDetector{name: name, period: 1 * time.Second, retainDuration: 2 * time.Minute} } diff --git a/tools/cache/processor_listener_test.go b/tools/cache/processor_listener_test.go index fd658197..517258a1 100644 --- a/tools/cache/processor_listener_test.go +++ b/tools/cache/processor_listener_test.go @@ -22,6 +22,7 @@ import ( "time" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" ) const ( @@ -35,7 +36,7 @@ func BenchmarkListener(b *testing.B) { swg.Add(b.N) b.SetParallelism(concurrencyLevel) // Preallocate enough space so that benchmark does not run out of it - pl := newProcessListener(&ResourceEventHandlerFuncs{ + pl := newProcessListener(klog.Background(), &ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { swg.Done() }, diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 030b4529..f8dac4f9 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -95,7 +95,7 @@ type Reflector struct { // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion lastSyncResourceVersionMutex sync.RWMutex // Called whenever the ListAndWatch drops the connection with an error. - watchErrorHandler WatchErrorHandler + watchErrorHandler WatchErrorHandlerWithContext // WatchListPageSize is the requested chunk size of initial and resync watch lists. 
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data // (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0") @@ -150,20 +150,32 @@ type ResourceVersionUpdater interface { // should be offloaded. type WatchErrorHandler func(r *Reflector, err error) -// DefaultWatchErrorHandler is the default implementation of WatchErrorHandler -func DefaultWatchErrorHandler(r *Reflector, err error) { +// The WatchErrorHandler is called whenever ListAndWatch drops the +// connection with an error. After calling this handler, the informer +// will backoff and retry. +// +// The default implementation looks at the error type and tries to log +// the error message at an appropriate level. +// +// Implementations of this handler may display the error message in other +// ways. Implementations should return quickly - any expensive processing +// should be offloaded. +type WatchErrorHandlerWithContext func(ctx context.Context, r *Reflector, err error) + +// DefaultWatchErrorHandler is the default implementation of WatchErrorHandlerWithContext. +func DefaultWatchErrorHandler(ctx context.Context, r *Reflector, err error) { switch { case isExpiredError(err): // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already // has a semantic that it returns data at least as fresh as provided RV. // So first try to LIST with setting RV to resource version of last observed object. - klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err) + klog.FromContext(ctx).V(4).Info("Watch closed", "reflector", r.name, "type", r.typeDescription, "err", err) case err == io.EOF: // watch closed normally case err == io.ErrUnexpectedEOF: - klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.typeDescription, err) + klog.FromContext(ctx).V(1).Info("Watch closed with unexpected EOF", "reflector", r.name, "type", r.typeDescription, "err", err) default: - utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.typeDescription, err)) + utilruntime.HandleErrorWithContext(ctx, err, "Failed to watch", "reflector", r.name, "type", r.typeDescription) } } @@ -243,7 +255,7 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff. backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock), clock: reflectorClock, - watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), + watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler), expectedType: reflect.TypeOf(expectedType), } @@ -309,14 +321,24 @@ var internalPackages = []string{"client-go/tools/cache/"} // Run repeatedly uses the reflector's ListAndWatch to fetch all the // objects and subsequent deltas. // Run will exit when stopCh is closed. +// +// Contextual logging: RunWithContext should be used instead of Run in code which supports contextual logging. func (r *Reflector) Run(stopCh <-chan struct{}) { - klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name) + r.RunWithContext(wait.ContextForChannel(stopCh)) +} + +// RunWithContext repeatedly uses the reflector's ListAndWatch to fetch all the +// objects and subsequent deltas. +// Run will exit when the context is canceled. 
+func (r *Reflector) RunWithContext(ctx context.Context) { + logger := klog.FromContext(ctx) + logger.V(3).Info("Starting reflector", "type", r.typeDescription, "resyncPeriod", r.resyncPeriod, "reflector", r.name) wait.BackoffUntil(func() { - if err := r.ListAndWatch(stopCh); err != nil { - r.watchErrorHandler(r, err) + if err := r.ListAndWatchWithContext(ctx); err != nil { + r.watchErrorHandler(ctx, r, err) } - }, r.backoffManager, true, stopCh) - klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name) + }, r.backoffManager, true, ctx.Done()) + logger.V(3).Info("Stopping reflector", "type", r.typeDescription, "resyncPeriod", r.resyncPeriod, "reflector", r.name) } var ( @@ -345,21 +367,31 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { // ListAndWatch first lists all items and get the resource version at the moment of call, // and then use the resource version to watch. // It returns error if ListAndWatch didn't even try to initialize watch. +// +// Contextual logging: ListAndWatchWithContext should be used instead of ListAndWatch in code which supports contextual logging. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { - klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name) + return r.ListAndWatchWithContext(wait.ContextForChannel(stopCh)) +} + +// ListAndWatchWithContext first lists all items and get the resource version at the moment of call, +// and then use the resource version to watch. +// It returns error if ListAndWatchWithContext didn't even try to initialize watch. +func (r *Reflector) ListAndWatchWithContext(ctx context.Context) error { + logger := klog.FromContext(ctx) + logger.V(3).Info("Listing and watching", "type", r.typeDescription, "reflector", r.name) var err error var w watch.Interface useWatchList := ptr.Deref(r.UseWatchList, false) fallbackToList := !useWatchList if useWatchList { - w, err = r.watchList(stopCh) + w, err = r.watchList(ctx) if w == nil && err == nil { // stopCh was closed return nil } if err != nil { - klog.Warningf("The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking, err = %v", err) + logger.Error(err, "The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking") fallbackToList = true // ensure that we won't accidentally pass some garbage down the watch. w = nil @@ -367,20 +399,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { } if fallbackToList { - err = r.list(stopCh) + err = r.list(ctx) if err != nil { return err } } - klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name) - return r.watchWithResync(w, stopCh) + logger.V(2).Info("Caches populated", "type", r.typeDescription, "reflector", r.name) + return r.watchWithResync(ctx, w) } // startResync periodically calls r.store.Resync() method. // Note that this method is blocking and should be // called in a separate goroutine. 
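Usage sketch for the Reflector API above (the runReflector helper is hypothetical; lw and store are assumed to exist): because RunWithContext pulls its logger out of the context with klog.FromContext, callers inject a logger by wrapping the context with klog.NewContext before starting the reflector.

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

// Sketch: run a Reflector with contextual logging and cancellation.
func runReflector(ctx context.Context, logger klog.Logger, lw cache.ListerWatcher, store cache.Store) {
	// The logger travels inside the context; RunWithContext and its error
	// handling retrieve it via klog.FromContext.
	ctx = klog.NewContext(ctx, logger)
	r := cache.NewReflector(lw, &v1.Pod{}, store, 0)
	// Blocks, backing off between ListAndWatch attempts, until ctx is canceled.
	r.RunWithContext(ctx)
}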
-func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}, resyncerrc chan error) { +func (r *Reflector) startResync(ctx context.Context, resyncerrc chan error) { + logger := klog.FromContext(ctx) resyncCh, cleanup := r.resyncChan() defer func() { cleanup() // Call the last one written into cleanup @@ -388,13 +421,11 @@ func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{} for { select { case <-resyncCh: - case <-stopCh: - return - case <-cancelCh: + case <-ctx.Done(): return } if r.ShouldResync == nil || r.ShouldResync() { - klog.V(4).Infof("%s: forcing resync", r.name) + logger.V(4).Info("Forcing resync", "reflector", r.name) if err := r.store.Resync(); err != nil { resyncerrc <- err return @@ -406,16 +437,27 @@ func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{} } // watchWithResync runs watch with startResync in the background. -func (r *Reflector) watchWithResync(w watch.Interface, stopCh <-chan struct{}) error { +func (r *Reflector) watchWithResync(ctx context.Context, w watch.Interface) error { resyncerrc := make(chan error, 1) - cancelCh := make(chan struct{}) - defer close(cancelCh) - go r.startResync(stopCh, cancelCh, resyncerrc) - return r.watch(w, stopCh, resyncerrc) + cancelCtx, cancel := context.WithCancel(ctx) + // Waiting for completion of the goroutine is relevant for race detector. + // Without this, there is a race between "this function returns + code + // waiting for it" and "goroutine does something". + var wg wait.Group + defer func() { + cancel() + wg.Wait() + }() + wg.Start(func() { + r.startResync(cancelCtx, resyncerrc) + }) + return r.watch(ctx, w, resyncerrc) } // watch simply starts a watch request with the server. -func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error { +func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc chan error) error { + stopCh := ctx.Done() + logger := klog.FromContext(ctx) var err error retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock) @@ -451,7 +493,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc w, err = r.listerWatcher.Watch(options) if err != nil { if canRetry := isWatchErrorRetriable(err); canRetry { - klog.V(4).Infof("%s: watch of %v returned %v - backing off", r.name, r.typeDescription, err) + logger.V(4).Info("Watch failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err) select { case <-stopCh: return nil @@ -463,8 +505,8 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc } } - err = handleWatch(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, - r.clock, resyncerrc, stopCh) + err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, + r.clock, resyncerrc) // Ensure that watch will not be reused across iterations. w.Stop() w = nil @@ -476,9 +518,9 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already // has a semantic that it returns data at least as fresh as provided RV. // So first try to LIST with setting RV to resource version of last observed object. 
- klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err) + logger.V(4).Info("Watch closed", "reflector", r.name, "type", r.typeDescription, "err", err) case apierrors.IsTooManyRequests(err): - klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription) + logger.V(2).Info("Watch returned 429 - backing off", "reflector", r.name, "type", r.typeDescription) select { case <-stopCh: return nil @@ -486,10 +528,10 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc continue } case apierrors.IsInternalError(err) && retry.ShouldRetry(): - klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err) + logger.V(2).Info("Retrying watch after internal error", "reflector", r.name, "type", r.typeDescription, "err", err) continue default: - klog.Warningf("%s: watch of %v ended with: %v", r.name, r.typeDescription, err) + logger.Info("Warning: watch ended with error", "reflector", r.name, "type", r.typeDescription, "err", err) } } return nil @@ -499,7 +541,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc // list simply lists all items and records a resource version obtained from the server at the moment of the call. // the resource version can be used for further progress notification (aka. watch). -func (r *Reflector) list(stopCh <-chan struct{}) error { +func (r *Reflector) list(ctx context.Context) error { var resourceVersion string options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} @@ -558,7 +600,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error { close(listCh) }() select { - case <-stopCh: + case <-ctx.Done(): return nil case r := <-panicCh: panic(r) @@ -566,7 +608,6 @@ func (r *Reflector) list(stopCh <-chan struct{}) error { } initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err}) if err != nil { - klog.Warningf("%s: failed to list %v: %v", r.name, r.typeDescription, err) return fmt.Errorf("failed to list %v: %w", r.typeDescription, err) } @@ -624,7 +665,9 @@ func (r *Reflector) list(stopCh <-chan struct{}) error { // After receiving a "Bookmark" event the reflector is considered to be synchronized. // It replaces its internal store with the collected items and // reuses the current watch requests for getting further events. 
-func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { +func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) { + stopCh := ctx.Done() + logger := klog.FromContext(ctx) var w watch.Interface var err error var temporaryStore Store @@ -634,7 +677,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { // could be unified with the r.watch method isErrorRetriableWithSideEffectsFn := func(err error) bool { if canRetry := isWatchErrorRetriable(err); canRetry { - klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err) + logger.V(2).Info("watch-list failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err) <-r.backoffManager.Backoff().C() return true } @@ -681,9 +724,9 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { } return nil, err } - watchListBookmarkReceived, err := handleListWatch(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription, + watchListBookmarkReceived, err := handleListWatch(ctx, start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription, func(rv string) { resourceVersion = rv }, - r.clock, make(chan error), stopCh) + r.clock, make(chan error)) if err != nil { w.Stop() // stop and retry with clean state if errors.Is(err, errorStopRequested) { @@ -706,7 +749,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { // we utilize the temporaryStore to ensure independence from the current store implementation. // as of today, the store is implemented as a queue and will be drained by the higher-level // component as soon as it finishes replacing the content. - checkWatchListDataConsistencyIfRequested(wait.ContextForChannel(stopCh), r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List) + checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List) if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { return nil, fmt.Errorf("unable to sync watch-list result: %w", err) @@ -731,6 +774,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err // retry. If successful, the watcher will be left open after receiving the // initial set of objects, to allow watching for future events. func handleListWatch( + ctx context.Context, start time.Time, w watch.Interface, store Store, @@ -741,17 +785,17 @@ func handleListWatch( setLastSyncResourceVersion func(string), clock clock.Clock, errCh chan error, - stopCh <-chan struct{}, ) (bool, error) { exitOnWatchListBookmarkReceived := true - return handleAnyWatch(start, w, store, expectedType, expectedGVK, name, expectedTypeName, - setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh, stopCh) + return handleAnyWatch(ctx, start, w, store, expectedType, expectedGVK, name, expectedTypeName, + setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh) } // handleListWatch consumes events from w, updates the Store, and records the // last seen ResourceVersion, to allow continuing from that ResourceVersion on // retry. The watcher will always be stopped on exit. 
func handleWatch( + ctx context.Context, start time.Time, w watch.Interface, store Store, @@ -762,11 +806,10 @@ func handleWatch( setLastSyncResourceVersion func(string), clock clock.Clock, errCh chan error, - stopCh <-chan struct{}, ) error { exitOnWatchListBookmarkReceived := false - _, err := handleAnyWatch(start, w, store, expectedType, expectedGVK, name, expectedTypeName, - setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh, stopCh) + _, err := handleAnyWatch(ctx, start, w, store, expectedType, expectedGVK, name, expectedTypeName, + setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh) return err } @@ -779,7 +822,9 @@ func handleWatch( // The watcher will always be stopped, unless exitOnWatchListBookmarkReceived is // true and watchListBookmarkReceived is true. This allows the same watch stream // to be re-used by the caller to continue watching for new events. -func handleAnyWatch(start time.Time, +func handleAnyWatch( + ctx context.Context, + start time.Time, w watch.Interface, store Store, expectedType reflect.Type, @@ -790,17 +835,17 @@ func handleAnyWatch(start time.Time, exitOnWatchListBookmarkReceived bool, clock clock.Clock, errCh chan error, - stopCh <-chan struct{}, ) (bool, error) { watchListBookmarkReceived := false eventCount := 0 - initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(name, clock, start, exitOnWatchListBookmarkReceived) + logger := klog.FromContext(ctx) + initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(logger, name, clock, start, exitOnWatchListBookmarkReceived) defer initialEventsEndBookmarkWarningTicker.Stop() loop: for { select { - case <-stopCh: + case <-ctx.Done(): return watchListBookmarkReceived, errorStopRequested case err := <-errCh: return watchListBookmarkReceived, err @@ -813,19 +858,19 @@ loop: } if expectedType != nil { if e, a := expectedType, reflect.TypeOf(event.Object); e != a { - utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a)) + utilruntime.HandleErrorWithContext(ctx, nil, "Unexpected watch event object type", "reflector", name, "expectedType", e, "actualType", a) continue } } if expectedGVK != nil { if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a { - utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a)) + utilruntime.HandleErrorWithContext(ctx, nil, "Unexpected watch event object gvk", "reflector", name, "expectedGVK", e, "actualGVK", a) continue } } meta, err := meta.Accessor(event.Object) if err != nil { - utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) + utilruntime.HandleErrorWithContext(ctx, err, "Unable to understand watch event", "reflector", name, "event", event) continue } resourceVersion := meta.GetResourceVersion() @@ -833,12 +878,12 @@ loop: case watch.Added: err := store.Add(event.Object) if err != nil { - utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err)) + utilruntime.HandleErrorWithContext(ctx, err, "Unable to add watch event object to store", "reflector", name, "object", event.Object) } case watch.Modified: err := store.Update(event.Object) if err != nil { - utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err)) + utilruntime.HandleErrorWithContext(ctx, err, "Unable to update watch event object to store", 
"reflector", name, "object", event.Object) } case watch.Deleted: // TODO: Will any consumers need access to the "last known @@ -846,7 +891,7 @@ loop: // to change this. err := store.Delete(event.Object) if err != nil { - utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err)) + utilruntime.HandleErrorWithContext(ctx, err, "Unable to delete watch event object from store", "reflector", name, "object", event.Object) } case watch.Bookmark: // A `Bookmark` means watch has synced here, just update the resourceVersion @@ -854,7 +899,7 @@ loop: watchListBookmarkReceived = true } default: - utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) + utilruntime.HandleErrorWithContext(ctx, err, "Unknown watch event", "reflector", name, "event", event) } setLastSyncResourceVersion(resourceVersion) if rvu, ok := store.(ResourceVersionUpdater); ok { @@ -863,7 +908,7 @@ loop: eventCount++ if exitOnWatchListBookmarkReceived && watchListBookmarkReceived { watchDuration := clock.Since(start) - klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration) + klog.FromContext(ctx).V(4).Info("Exiting watch because received the bookmark that marks the end of initial events stream", "reflector", name, "totalItems", eventCount, "duration", watchDuration) return watchListBookmarkReceived, nil } initialEventsEndBookmarkWarningTicker.observeLastEventTimeStamp(clock.Now()) @@ -876,7 +921,7 @@ loop: if watchDuration < 1*time.Second && eventCount == 0 { return watchListBookmarkReceived, fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name) } - klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount) + klog.FromContext(ctx).V(4).Info("Watch close", "reflector", name, "type", expectedTypeName, "totalItems", eventCount) return watchListBookmarkReceived, nil } @@ -1004,8 +1049,9 @@ func wrapListFuncWithContext(listFn ListFunc) func(ctx context.Context, options // The methods exposed by this type are not thread-safe. type initialEventsEndBookmarkTicker struct { clock.Ticker - clock clock.Clock - name string + clock clock.Clock + name string + logger klog.Logger watchStart time.Time tickInterval time.Duration @@ -1019,15 +1065,15 @@ type initialEventsEndBookmarkTicker struct { // Note that the caller controls whether to call t.C() and t.Stop(). // // In practice, the reflector exits the watchHandler as soon as the bookmark event is received and calls the t.C() method. 
-func newInitialEventsEndBookmarkTicker(name string, c clock.Clock, watchStart time.Time, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { - return newInitialEventsEndBookmarkTickerInternal(name, c, watchStart, 10*time.Second, exitOnWatchListBookmarkReceived) +func newInitialEventsEndBookmarkTicker(logger klog.Logger, name string, c clock.Clock, watchStart time.Time, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { + return newInitialEventsEndBookmarkTickerInternal(logger, name, c, watchStart, 10*time.Second, exitOnWatchListBookmarkReceived) } -func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { +func newInitialEventsEndBookmarkTickerInternal(logger klog.Logger, name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { clockWithTicker, ok := c.(clock.WithTicker) if !ok || !exitOnWatchListBookmarkReceived { if exitOnWatchListBookmarkReceived { - klog.Warningf("clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested") + logger.Info("Warning: clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested") } return &initialEventsEndBookmarkTicker{ Ticker: &noopTicker{}, @@ -1038,6 +1084,7 @@ func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watch Ticker: clockWithTicker.NewTicker(tickInterval), clock: c, name: name, + logger: logger, watchStart: watchStart, tickInterval: tickInterval, } @@ -1049,7 +1096,7 @@ func (t *initialEventsEndBookmarkTicker) observeLastEventTimeStamp(lastEventObse func (t *initialEventsEndBookmarkTicker) warnIfExpired() { if err := t.produceWarningIfExpired(); err != nil { - klog.Warning(err) + t.logger.Info("Warning: event bookmark expired", "err", err) } } diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index a56fce6e..b880cfc0 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -45,6 +45,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog/v2/ktesting" "k8s.io/utils/clock" testingclock "k8s.io/utils/clock/testing" ) @@ -64,6 +65,7 @@ func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) { } func TestCloseWatchChannelOnError(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) r := NewReflector(&testLW{}, &v1.Pod{}, NewStore(MetaNamespaceKeyFunc), 0) pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} fw := watch.NewFake() @@ -75,7 +77,7 @@ func TestCloseWatchChannelOnError(t *testing.T) { return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil }, } - go r.ListAndWatch(wait.NeverStop) + go func() { assert.NoError(t, r.ListAndWatchWithContext(ctx)) }() fw.Error(pod) select { case _, ok := <-fw.ResultChan(): @@ -89,7 +91,8 @@ func TestCloseWatchChannelOnError(t *testing.T) { } func TestRunUntil(t *testing.T) { - stopCh := make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) store := NewStore(MetaNamespaceKeyFunc) r := NewReflector(&testLW{}, &v1.Pod{}, store, 0) fw := watch.NewFake() @@ -104,13 +107,13 @@ func TestRunUntil(t *testing.T) { doneCh := make(chan struct{}) go func() { defer close(doneCh) - r.Run(stopCh) + r.RunWithContext(ctx) }() // Synchronously add a 
dummy pod into the watch channel so we // know the RunUntil go routine is in the watch handler. fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}) - close(stopCh) + cancel(errors.New("done")) resultCh := fw.ResultChan() for { select { @@ -149,8 +152,9 @@ func TestReflectorResyncChan(t *testing.T) { // TestReflectorWatchStoppedBefore ensures that neither List nor Watch are // called if the stop channel is closed before Reflector.watch is called. func TestReflectorWatchStoppedBefore(t *testing.T) { - stopCh := make(chan struct{}) - close(stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + cancel(errors.New("don't run")) lw := &ListWatch{ ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) { @@ -165,14 +169,15 @@ func TestReflectorWatchStoppedBefore(t *testing.T) { } target := NewReflector(lw, &v1.Pod{}, nil, 0) - err := target.watch(nil, stopCh, nil) + err := target.watch(ctx, nil, nil) require.NoError(t, err) } // TestReflectorWatchStoppedAfter ensures that neither the watcher is stopped if // the stop channel is closed after Reflector.watch has started watching. func TestReflectorWatchStoppedAfter(t *testing.T) { - stopCh := make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) var watchers []*watch.FakeWatcher @@ -185,7 +190,7 @@ func TestReflectorWatchStoppedAfter(t *testing.T) { // Simulate the stop channel being closed after watching has started go func() { time.Sleep(10 * time.Millisecond) - close(stopCh) + cancel(errors.New("10ms timeout reached")) }() // Use a fake watcher that never sends events w := watch.NewFake() @@ -195,7 +200,7 @@ func TestReflectorWatchStoppedAfter(t *testing.T) { } target := NewReflector(lw, &v1.Pod{}, nil, 0) - err := target.watch(nil, stopCh, nil) + err := target.watch(ctx, nil, nil) require.NoError(t, err) require.Len(t, watchers, 1) require.True(t, watchers[0].IsStopped()) @@ -219,9 +224,10 @@ func BenchmarkReflectorResyncChanMany(b *testing.B) { func TestReflectorHandleWatchStoppedBefore(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) - stopCh := make(chan struct{}) - // Simulate the watch channel being closed before the watchHandler is called - close(stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + // Simulate the context being canceled before the watchHandler is called + cancel(errors.New("don't run")) var calls []string resultCh := make(chan watch.Event) fw := watch.MockWatcher{ @@ -234,7 +240,7 @@ func TestReflectorHandleWatchStoppedBefore(t *testing.T) { return resultCh }, } - err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopCh) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) if err == nil { t.Errorf("unexpected non-error") } @@ -251,7 +257,8 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) var calls []string - stopCh := make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) resultCh := make(chan watch.Event) fw := watch.MockWatcher{ StopFunc: func() { @@ -265,12 +272,12 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) { // caller, after watching has started. 
go func() { time.Sleep(10 * time.Millisecond) - close(stopCh) + cancel(errors.New("10ms timeout reached")) }() return resultCh }, } - err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopCh) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) if err == nil { t.Errorf("unexpected non-error") } @@ -285,6 +292,7 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) { func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) + _, ctx := ktesting.NewTestContext(t) var calls []string resultCh := make(chan watch.Event) fw := watch.MockWatcher{ @@ -298,7 +306,7 @@ func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { } // Simulate the result channel being closed by the producer before handleWatch is called. close(resultCh) - err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) if err == nil { t.Errorf("unexpected non-error") } @@ -313,6 +321,7 @@ func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) + _, ctx := ktesting.NewTestContext(t) var calls []string resultCh := make(chan watch.Event) fw := watch.MockWatcher{ @@ -331,7 +340,7 @@ func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) { return resultCh }, } - err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) if err == nil { t.Errorf("unexpected non-error") } @@ -348,11 +357,12 @@ func TestReflectorWatchHandler(t *testing.T) { // watching after all the events have been consumed. This avoids race // conditions which can happen if the producer calls Stop(), instead of the // consumer. 
- stopCh := make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) setLastSyncResourceVersion := func(rv string) { g.setLastSyncResourceVersion(rv) if rv == "32" { - close(stopCh) + cancel(errors.New("LastSyncResourceVersion is 32")) } } fw := watch.NewFake() @@ -365,7 +375,7 @@ func TestReflectorWatchHandler(t *testing.T) { fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}}) fw.Stop() }() - err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, g.clock, nevererrc, stopCh) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, g.clock, nevererrc) // TODO(karlkfi): Fix FakeWatcher to avoid race condition between watcher.Stop() & close(stopCh) if err != nil && !errors.Is(err, errorStopRequested) { t.Errorf("unexpected error %v", err) @@ -408,15 +418,19 @@ func TestReflectorStopWatch(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) fw := watch.NewFake() - stopWatch := make(chan struct{}) - close(stopWatch) - err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopWatch) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + cancel(errors.New("don't run")) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) if err != errorStopRequested { t.Errorf("expected stop error, got %q", err) } } func TestReflectorListAndWatch(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() createdFakes := make(chan *watch.FakeWatcher) // The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc @@ -442,7 +456,7 @@ func TestReflectorListAndWatch(t *testing.T) { } s := NewFIFO(MetaNamespaceKeyFunc) r := NewReflector(lw, &v1.Pod{}, s, 0) - go r.ListAndWatch(wait.NeverStop) + go func() { assert.NoError(t, r.ListAndWatchWithContext(ctx)) }() ids := []string{"foo", "bar", "baz", "qux", "zoo"} var fw *watch.FakeWatcher @@ -539,7 +553,8 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { } } watchRet, watchErr := item.events, item.watchErr - stopCh := make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) lw := &testLW{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if watchErr != nil { @@ -557,7 +572,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { // result channel, and wait for the consumer to stop the // watcher, to avoid race conditions. 
// TODO: Fix the FakeWatcher to separate watcher.Stop from close(resultCh) - close(stopCh) + cancel(errors.New("done")) }() return fw, nil }, @@ -566,7 +581,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { }, } r := NewReflector(lw, &v1.Pod{}, s, 0) - err := r.ListAndWatch(stopCh) + err := r.ListAndWatchWithContext(ctx) if item.listErr != nil && !errors.Is(err, item.listErr) { t.Errorf("unexpected ListAndWatch error: %v", err) } @@ -593,7 +608,8 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) { for _, test := range table { t.Run(fmt.Sprintf("%d connection failures takes at least %d ms", test.numConnFails, 1< 0 { if resyncPeriod < minimumResyncPeriod { - klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod) + logger.Info("Warning: resync period is too small. Changing it to the minimum allowed value", "resyncPeriod", resyncPeriod, "minimumResyncPeriod", minimumResyncPeriod) resyncPeriod = minimumResyncPeriod } if resyncPeriod < s.resyncCheckPeriod { if s.started { - klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod) + logger.Info("Warning: resync period is smaller than resync check period and the informer has already started. Changing it to the resync check period", "resyncPeriod", resyncPeriod, "resyncCheckPeriod", s.resyncCheckPeriod) + resyncPeriod = s.resyncCheckPeriod } else { // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners // accordingly s.resyncCheckPeriod = resyncPeriod - s.processor.resyncCheckPeriodChanged(resyncPeriod) + s.processor.resyncCheckPeriodChanged(logger, resyncPeriod) } } } - listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced) + listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced) if !s.started { return s.processor.addListener(listener), nil @@ -794,7 +877,7 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) { } } -func (p *sharedProcessor) run(stopCh <-chan struct{}) { +func (p *sharedProcessor) run(ctx context.Context) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() @@ -804,7 +887,7 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) { } p.listenersStarted = true }() - <-stopCh + <-ctx.Done() p.listenersLock.Lock() defer p.listenersLock.Unlock() @@ -844,13 +927,13 @@ func (p *sharedProcessor) shouldResync() bool { return resyncNeeded } -func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) { +func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncCheckPeriod time.Duration) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for listener := range p.listeners { resyncPeriod := determineResyncPeriod( - listener.requestedResyncPeriod, resyncCheckPeriod) + logger, listener.requestedResyncPeriod, resyncCheckPeriod) listener.setResyncPeriod(resyncPeriod) } } @@ -867,6 +950,7 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati // processorListener also keeps track of the adjusted requested resync // period of the listener. 
type processorListener struct { + logger klog.Logger nextCh chan interface{} addCh chan interface{} @@ -910,8 +994,9 @@ func (p *processorListener) HasSynced() bool { return p.syncTracker.HasSynced() } -func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener { +func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener { ret := &processorListener{ + logger: logger, nextCh: make(chan interface{}), addCh: make(chan interface{}), handler: handler, @@ -934,7 +1019,7 @@ func (p *processorListener) add(notification interface{}) { } func (p *processorListener) pop() { - defer utilruntime.HandleCrash() + defer utilruntime.HandleCrashWithLogger(p.logger) defer close(p.nextCh) // Tell .run() to stop var nextCh chan<- interface{} @@ -966,11 +1051,21 @@ func (p *processorListener) pop() { func (p *processorListener) run() { // this call blocks until the channel is closed. When a panic happens during the notification // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) - // the next notification will be attempted. This is usually better than the alternative of never + // the next notification will be attempted. This is usually better than the alternative of never // delivering again. - stopCh := make(chan struct{}) - wait.Until(func() { - for next := range p.nextCh { + // + // This only applies if utilruntime is configured to not panic, which is not the default. + sleepAfterCrash := false + for next := range p.nextCh { + if sleepAfterCrash { + // Sleep before processing the next item. + time.Sleep(time.Second) + } + func() { + // Gets reset below, but only if we get that far. + sleepAfterCrash = true + defer utilruntime.HandleCrashWithLogger(p.logger) + switch notification := next.(type) { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) @@ -982,12 +1077,11 @@ func (p *processorListener) run() { case deleteNotification: p.handler.OnDelete(notification.oldObj) default: - utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) + utilruntime.HandleErrorWithLogger(p.logger, nil, "unrecognized notification", "notificationType", fmt.Sprintf("%T", next)) } - } - // the only way to get here is if the p.nextCh is empty and closed - close(stopCh) - }, 1*time.Second, stopCh) + sleepAfterCrash = false + }() + } } // shouldResync deterimines if the listener needs a resync. 
If the listener's resyncPeriod is 0, diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index a8f5f074..d17db6bd 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "math/rand" + "runtime" "strconv" "strings" "sync" @@ -34,8 +35,13 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + fcache "k8s.io/client-go/tools/cache/testing" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" + "k8s.io/klog/v2/textlogger" testingclock "k8s.io/utils/clock/testing" ) @@ -145,10 +151,15 @@ func TestIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - stop := make(chan struct{}) - defer close(stop) - go informer.Run(stop) + var wg wait.Group + stop := make(chan struct{}) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() + WaitForCacheSync(stop, informer.HasSynced) cmpOps := cmpopts.SortSlices(func(a, b any) bool { @@ -222,10 +233,13 @@ func TestListenerResyncPeriods(t *testing.T) { informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod) listeners := []*testListener{listener1, listener2, listener3} + var wg wait.Group stop := make(chan struct{}) - defer close(stop) - - go informer.Run(stop) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() // ensure all listeners got the initial List for _, listener := range listeners { @@ -361,10 +375,14 @@ func TestSharedInformerInitializationRace(t *testing.T) { informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) listener := newTestListener("raceListener", 0) - stop := make(chan struct{}) go informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) - go informer.Run(stop) - close(stop) + var wg wait.Group + stop := make(chan struct{}) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() } // TestSharedInformerWatchDisruption simulates a watch that was closed @@ -392,10 +410,13 @@ func TestSharedInformerWatchDisruption(t *testing.T) { informer.AddEventHandlerWithResyncPeriod(listenerResync, listenerResync.resyncPeriod) listeners := []*testListener{listenerNoResync, listenerResync} + var wg wait.Group stop := make(chan struct{}) - defer close(stop) - - go informer.Run(stop) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() for _, listener := range listeners { if !listener.ok() { @@ -458,8 +479,13 @@ func TestSharedInformerErrorHandling(t *testing.T) { errCh <- err }) + var wg wait.Group stop := make(chan struct{}) - go informer.Run(stop) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() select { case err := <-errCh: @@ -469,7 +495,6 @@ func TestSharedInformerErrorHandling(t *testing.T) { case <-time.After(time.Second): t.Errorf("Timeout waiting for error handler call") } - close(stop) } // TestSharedInformerStartRace is a regression test to ensure there is no race between @@ -494,9 +519,12 @@ func TestSharedInformerStartRace(t *testing.T) { } }() - go informer.Run(stop) - - close(stop) + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() } func TestSharedInformerTransformer(t *testing.T) { @@ -522,9 +550,13 @@ func TestSharedInformerTransformer(t 
*testing.T) { listenerTransformer := newTestListener("listenerTransformer", 0, "POD1", "POD2") informer.AddEventHandler(listenerTransformer) + var wg wait.Group stop := make(chan struct{}) - go informer.Run(stop) - defer close(stop) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() if !listenerTransformer.ok() { t.Errorf("%s: expected %v, got %v", listenerTransformer.name, listenerTransformer.expectedItemNames, listenerTransformer.receivedItemNames) @@ -757,9 +789,11 @@ func TestSharedInformerHandlerAbuse(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) informerCtx, informerCancel := context.WithCancel(context.Background()) - go func() { - informer.Run(informerCtx.Done()) + var informerWg wait.Group + informerWg.StartWithChannel(informerCtx.Done(), informer.Run) + defer func() { cancel() + informerWg.Wait() }() worker := func() { @@ -881,8 +915,10 @@ func TestStateSharedInformer(t *testing.T) { t.Errorf("informer already stopped after creation") return } + var wg wait.Group stop := make(chan struct{}) - go informer.Run(stop) + wg.StartWithChannel(stop, informer.Run) + defer wg.Wait() if !listener.ok() { t.Errorf("informer did not report initial objects") close(stop) @@ -921,7 +957,9 @@ func TestAddOnStoppedSharedInformer(t *testing.T) { informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) listener := newTestListener("listener", 0, "pod1") stop := make(chan struct{}) - go informer.Run(stop) + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer wg.Wait() close(stop) err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { @@ -959,7 +997,9 @@ func TestRemoveOnStoppedSharedInformer(t *testing.T) { return } stop := make(chan struct{}) - go informer.Run(stop) + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer wg.Wait() close(stop) fmt.Println("sleeping") time.Sleep(1 * time.Second) @@ -986,9 +1026,13 @@ func TestRemoveWhileActive(t *testing.T) { handle, _ := informer.AddEventHandler(listener) stop := make(chan struct{}) - defer close(stop) + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() - go informer.Run(stop) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) if !listener.ok() { @@ -1026,7 +1070,12 @@ func TestAddWhileActive(t *testing.T) { } stop := make(chan struct{}) - defer close(stop) + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() go informer.Run(stop) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) @@ -1079,8 +1128,6 @@ func TestAddWhileActive(t *testing.T) { func TestShutdown(t *testing.T) { t.Run("no-context", func(t *testing.T) { source := newFakeControllerSource(t) - stop := make(chan struct{}) - defer close(stop) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{ @@ -1090,17 +1137,30 @@ func TestShutdown(t *testing.T) { defer func() { assert.NoError(t, informer.RemoveEventHandler(handler)) }() - go informer.Run(stop) + + var wg wait.Group + stop := make(chan struct{}) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() + require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced") }) t.Run("no-context-later", func(t *testing.T) { source := newFakeControllerSource(t) - stop := make(chan struct{}) - defer close(stop) - 
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) - go informer.Run(stop) + + var wg wait.Group + stop := make(chan struct{}) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() + require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced") handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{ @@ -1122,3 +1182,152 @@ func TestShutdown(t *testing.T) { // and it doesn't matter that nothing gets stopped or removed. }) } + +func TestEventPanics(t *testing.T) { + // timeInUTC := time.Date(2009, 12, 1, 13, 30, 40, 42000, time.UTC) + // timeString := "1201 13:30:40.000042" + // Initialized by init. + var ( + buffer threadSafeBuffer + logger klog.Logger + source *fcache.FakeControllerSource + ) + + init := func(t *testing.T) { + // Restoring state is very sensitive to ordering. All goroutines spawned + // by a test must have completed and there has to be a check that they + // have completed that is visible to the race detector. This also + // applies to all other tests! + t.Cleanup(klog.CaptureState().Restore) //nolint:logcheck // CaptureState shouldn't be used in packages with contextual logging, but here it is okay. + buffer.buffer.Reset() + logger = textlogger.NewLogger(textlogger.NewConfig( + // textlogger.FixedTime(timeInUTC), + textlogger.Output(&buffer), + )) + oldReallyCrash := utilruntime.ReallyCrash + utilruntime.ReallyCrash = false + t.Cleanup(func() { utilruntime.ReallyCrash = oldReallyCrash }) + + source = newFakeControllerSource(t) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + } + + newHandler := func(ctx context.Context) ResourceEventHandlerFuncs { + logger := klog.FromContext(ctx) + return ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + logger.Info("Add func will panic now", "pod", klog.KObj(obj.(*v1.Pod))) + panic("fake panic") + }, + } + } + _, _, panicLine, _ := runtime.Caller(0) + panicLine -= 4 + expectedLog := func(name string) string { + if name == "" { + return fmt.Sprintf(`shared_informer_test.go:%d] "Observed a panic" panic="fake panic"`, panicLine) + } + return fmt.Sprintf(`shared_informer_test.go:%d] "Observed a panic" logger=%q panic="fake panic"`, panicLine, name) + } + handler := newHandler(context.Background()) + + t.Run("simple", func(t *testing.T) { + init(t) + klog.SetLogger(logger) + stop := make(chan struct{}) + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + handle, err := informer.AddEventHandler(handler) + require.NoError(t, err) + defer func() { + assert.NoError(t, informer.RemoveEventHandler(handle)) + }() + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + assert.Eventually(t, informer.IsStopped, time.Minute, time.Millisecond, "informer has stopped") + wg.Wait() // For race detector... + }() + require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced") + + // This times out (https://github.com/kubernetes/kubernetes/issues/129024) because the + // handler never syncs when the callback panics: + // require.Eventually(t, handle.HasSynced, time.Minute, time.Millisecond, "handler has synced") + // + // Wait for a non-empty buffer instead. This implies that we have to make + // the buffer thread-safe, which wouldn't be necessary otherwise. 
+ assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Contains(t, buffer.String(), expectedLog("")) + }, time.Minute, time.Millisecond, "handler has panicked") + }) + + t.Run("many", func(t *testing.T) { + init(t) + // One pod was already created in init, add some more. + numPods := 5 + for i := 1; i < numPods; i++ { + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod%d", i+1)}}) + } + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + name1 := "fake-event-handler-1" + logger1 := klog.LoggerWithName(logger, name1) + ctx1 := klog.NewContext(ctx, logger1) + handle1, err := informer.AddEventHandlerWithOptions(newHandler(ctx1), HandlerOptions{Logger: &logger1}) + require.NoError(t, err) + defer func() { + assert.NoError(t, informer.RemoveEventHandler(handle1)) + }() + name2 := "fake-event-handler-2" + logger2 := klog.LoggerWithName(logger, name2) + ctx2 := klog.NewContext(ctx, logger2) + handle2, err := informer.AddEventHandlerWithOptions(newHandler(ctx2), HandlerOptions{Logger: &logger2}) + require.NoError(t, err) + defer func() { + assert.NoError(t, informer.RemoveEventHandler(handle2)) + }() + + start := time.Now() + var wg wait.Group + informerName := "informer" + informerLogger := klog.LoggerWithName(logger, informerName) + informerCtx := klog.NewContext(ctx, informerLogger) + wg.StartWithContext(informerCtx, informer.RunWithContext) + defer func() { + cancel() + assert.Eventually(t, informer.IsStopped, time.Minute, time.Millisecond, "informer has stopped") + wg.Wait() // For race detector... + }() + require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced") + + assert.EventuallyWithT(t, func(t *assert.CollectT) { + output := buffer.String() + expected := expectedLog(name1) + if !assert.Equal(t, numPods, numOccurrences(output, expected), "Log output should have the right number of panics for %q (search string: %q), got instead:\n%s", name1, expected, output) { + return + } + expected = expectedLog(name2) + assert.Equal(t, numPods, numOccurrences(output, expected), "Log output should have the right number of panics for %q (search string %q, got instead:\n%s", name2, expected, output) + }, 30*time.Second, time.Millisecond, "handler has panicked") + + // Both handlers should have slept for one second after each panic, + // except after the last pod event because then the input channel + // gets closed. + assert.GreaterOrEqual(t, time.Since(start), time.Duration(numPods-1)*time.Second, "Delay in processorListener.run") + }) +} + +func numOccurrences(hay, needle string) int { + count := 0 + for { + index := strings.Index(hay, needle) + if index < 0 { + return count + } + count++ + hay = hay[index+len(needle):] + } +} diff --git a/tools/cache/util_test.go b/tools/cache/util_test.go index 6d073ca7..ed19e3eb 100644 --- a/tools/cache/util_test.go +++ b/tools/cache/util_test.go @@ -17,6 +17,8 @@ limitations under the License. package cache import ( + "bytes" + "sync" "testing" fcache "k8s.io/client-go/tools/cache/testing" @@ -27,3 +29,21 @@ func newFakeControllerSource(tb testing.TB) *fcache.FakeControllerSource { tb.Cleanup(source.Shutdown) return source } + +// threadSafeBuffer is a thread-safe wrapper around bytes.Buffer. 
+type threadSafeBuffer struct { + buffer bytes.Buffer + mu sync.Mutex +} + +func (b *threadSafeBuffer) Write(p []byte) (n int, err error) { + b.mu.Lock() + defer b.mu.Unlock() + return b.buffer.Write(p) +} + +func (b *threadSafeBuffer) String() string { + b.mu.Lock() + defer b.mu.Unlock() + return b.buffer.String() +}
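
For downstream adopters, the test changes above all converge on one pattern: build a context (ktesting.NewTestContext in tests), wrap it with context.WithCancelCause so shutdown carries an explanation instead of a bare close(stopCh), start the informer via RunWithContext under a wait.Group, and wait for that group during cleanup so the race detector sees an orderly shutdown. The sketch below is illustrative only and is not part of this patch; it assumes the public cache and tools/cache/testing packages, and the handler name, pod name, and logger names are made up.

package cache_test

import (
	"context"
	"errors"
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	fcache "k8s.io/client-go/tools/cache/testing"
	"k8s.io/klog/v2"
	"k8s.io/klog/v2/ktesting"
)

// TestContextualInformerSketch is a hypothetical downstream test, not part of
// this patch, illustrating the context-based pattern the tests above use.
func TestContextualInformerSketch(t *testing.T) {
	// ktesting routes contextual log output to the test log.
	logger, ctx := ktesting.NewTestContext(t)
	// WithCancelCause lets the eventual shutdown carry a cause.
	ctx, cancel := context.WithCancelCause(ctx)

	source := fcache.NewFakeControllerSource()
	defer source.Shutdown()
	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})

	informer := cache.NewSharedInformer(source, &v1.Pod{}, 0)

	// Give the handler its own named logger via HandlerOptions.
	handlerLogger := klog.LoggerWithName(logger, "example-handler")
	handle, err := informer.AddEventHandlerWithOptions(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj any) {
			handlerLogger.Info("Observed add", "pod", klog.KObj(obj.(*v1.Pod)))
		},
	}, cache.HandlerOptions{Logger: &handlerLogger})
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := informer.RemoveEventHandler(handle); err != nil {
			t.Error(err)
		}
	}()

	// Track the informer goroutine so that its shutdown is visible to the race detector.
	var wg wait.Group
	wg.StartWithContext(ctx, informer.RunWithContext)
	defer func() {
		cancel(errors.New("test has completed"))
		wg.Wait()
	}()

	if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
		t.Fatal("informer never synced")
	}
}

The deferred cancel-then-Wait combination mirrors the test changes above and keeps the informer goroutine from outliving the test.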