diff --git a/examples/workqueue/main.go b/examples/workqueue/main.go index b8825dc1..eab9daba 100644 --- a/examples/workqueue/main.go +++ b/examples/workqueue/main.go @@ -17,6 +17,8 @@ limitations under the License. package main import ( + "context" + "errors" "flag" "fmt" "time" @@ -116,30 +118,30 @@ func (c *Controller) handleErr(err error, key string) { } // Run begins watching and syncing. -func (c *Controller) Run(workers int, stopCh chan struct{}) { - defer runtime.HandleCrash() +func (c *Controller) Run(ctx context.Context, workers int) { + defer runtime.HandleCrashWithContext(ctx) // Let the workers stop when we are done defer c.queue.ShutDown() klog.Info("Starting Pod controller") - go c.informer.Run(stopCh) + go c.informer.RunWithContext(ctx) // Wait for all involved caches to be synced, before processing items from the queue is started - if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { + if !cache.WaitForNamedCacheSyncWithContext(ctx, c.informer.HasSynced) { runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) return } for i := 0; i < workers; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.UntilWithContext(ctx, c.runWorker, time.Second) } - <-stopCh + <-ctx.Done() klog.Info("Stopping Pod controller") } -func (c *Controller) runWorker() { +func (c *Controller) runWorker(ctx context.Context) { for c.processNextItem() { } } @@ -164,6 +166,8 @@ func main() { klog.Fatal(err) } + ctx := context.Background() + // create the pod watcher podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything()) @@ -211,9 +215,9 @@ func main() { }) // Now let's start the controller - stop := make(chan struct{}) - defer close(stop) - go controller.Run(1, stop) + cancelCtx, cancel := context.WithCancelCause(ctx) + defer cancel(errors.New("time to stop because main has completed")) + go controller.Run(cancelCtx, 1) // Wait forever select {} diff --git a/tools/cache/cache_test.go b/tools/cache/cache_test.go new file mode 100644 index 00000000..614ffba2 --- /dev/null +++ b/tools/cache/cache_test.go @@ -0,0 +1,25 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "k8s.io/klog/v2" +) + +func init() { + klog.InitFlags(nil) +} diff --git a/tools/cache/controller.go b/tools/cache/controller.go index e523a665..41ead09e 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "errors" "sync" "time" @@ -79,8 +80,14 @@ type Config struct { RetryOnError bool // Called whenever the ListAndWatch drops the connection with an error. + // + // Contextual logging: WatchErrorHandlerWithContext should be used instead of WatchErrorHandler in code which supports contextual logging. WatchErrorHandler WatchErrorHandler + // Called whenever the ListAndWatch drops the connection with an error + // and WatchErrorHandler is not set. + WatchErrorHandlerWithContext WatchErrorHandlerWithContext + // WatchListPageSize is the requested chunk size of initial and relist watch lists. WatchListPageSize int64 } @@ -104,12 +111,21 @@ type controller struct { // Controller is a low-level controller that is parameterized by a // Config and used in sharedIndexInformer. type Controller interface { - // Run does two things. One is to construct and run a Reflector + // RunWithContext does two things. One is to construct and run a Reflector // to pump objects/notifications from the Config's ListerWatcher // to the Config's Queue and possibly invoke the occasional Resync // on that Queue. The other is to repeatedly Pop from the Queue // and process with the Config's ProcessFunc. Both of these - // continue until `stopCh` is closed. + // continue until the context is canceled. + // + // It's an error to call RunWithContext more than once. + // RunWithContext blocks; call via go. + RunWithContext(ctx context.Context) + + // Run does the same as RunWithContext with a stop channel instead of + // a context. + // + // Contextual logging: RunWithcontext should be used instead of Run in code which supports contextual logging. Run(stopCh <-chan struct{}) // HasSynced delegates to the Config's Queue @@ -129,13 +145,16 @@ func New(c *Config) Controller { return ctlr } -// Run begins processing items, and will continue until a value is sent down stopCh or it is closed. -// It's an error to call Run more than once. -// Run blocks; call via go. +// Run implements [Controller.Run]. func (c *controller) Run(stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() + c.RunWithContext(wait.ContextForChannel(stopCh)) +} + +// RunWithContext implements [Controller.RunWithContext]. +func (c *controller) RunWithContext(ctx context.Context) { + defer utilruntime.HandleCrashWithContext(ctx) go func() { - <-stopCh + <-ctx.Done() c.config.Queue.Close() }() r := NewReflectorWithOptions( @@ -152,7 +171,11 @@ func (c *controller) Run(stopCh <-chan struct{}) { r.ShouldResync = c.config.ShouldResync r.WatchListPageSize = c.config.WatchListPageSize if c.config.WatchErrorHandler != nil { - r.watchErrorHandler = c.config.WatchErrorHandler + r.watchErrorHandler = func(_ context.Context, r *Reflector, err error) { + c.config.WatchErrorHandler(r, err) + } + } else if c.config.WatchErrorHandlerWithContext != nil { + r.watchErrorHandler = c.config.WatchErrorHandlerWithContext } c.reflectorMutex.Lock() @@ -161,9 +184,9 @@ func (c *controller) Run(stopCh <-chan struct{}) { var wg wait.Group - wg.StartWithChannel(stopCh, r.Run) + wg.StartWithContext(ctx, r.RunWithContext) - wait.Until(c.processLoop, time.Second, stopCh) + wait.UntilWithContext(ctx, c.processLoop, time.Second) wg.Wait() } @@ -185,13 +208,11 @@ func (c *controller) LastSyncResourceVersion() string { // TODO: Consider doing the processing in parallel. This will require a little thought // to make sure that we don't end up processing the same object multiple times // concurrently. -// -// TODO: Plumb through the stopCh here (and down to the queue) so that this can -// actually exit when the controller is stopped. Or just give up on this stuff -// ever being stoppable. Converting this whole package to use Context would -// also be helpful. -func (c *controller) processLoop() { +func (c *controller) processLoop(ctx context.Context) { for { + // TODO: Plumb through the ctx so that this can + // actually exit when the controller is stopped. Or just give up on this stuff + // ever being stoppable. obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index 232585c8..6b263bc8 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "fmt" "math/rand" "sync" @@ -32,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" fcache "k8s.io/client-go/tools/cache/testing" + "k8s.io/klog/v2/ktesting" fuzz "github.com/google/gofuzz" ) @@ -98,10 +100,10 @@ func Example() { }, } - // Create the controller and run it until we close stop. - stop := make(chan struct{}) - defer close(stop) - go New(cfg).Run(stop) + // Create the controller and run it until we cancel. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go New(cfg).RunWithContext(ctx) // Let's add a few objects to the source. testIDs := []string{"a-hello", "b-controller", "c-framework"} @@ -156,10 +158,10 @@ func ExampleNewInformer() { }, ) - // Run the controller and run it until we close stop. - stop := make(chan struct{}) - defer close(stop) - go controller.Run(stop) + // Run the controller and run it until we cancel. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go controller.RunWithContext(ctx) // Let's add a few objects to the source. testIDs := []string{"a-hello", "b-controller", "c-framework"} @@ -227,9 +229,10 @@ func TestHammerController(t *testing.T) { t.Errorf("Expected HasSynced() to return false before we started the controller") } - // Run the controller and run it until we close stop. - stop := make(chan struct{}) - go controller.Run(stop) + // Run the controller and run it until we cancel. + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + go controller.RunWithContext(ctx) // Let's wait for the controller to do its initial sync wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { @@ -288,7 +291,7 @@ func TestHammerController(t *testing.T) { // Let's wait for the controller to finish processing the things we just added. // TODO: look in the queue to see how many items need to be processed. time.Sleep(100 * time.Millisecond) - close(stop) + cancel() // TODO: Verify that no goroutines were leaked here and that everything shut // down cleanly. @@ -385,11 +388,13 @@ func TestUpdate(t *testing.T) { }, ) - // Run the controller and run it until we close stop. + // Run the controller and run it until we cancel. // Once Run() is called, calls to testDoneWG.Done() might start, so // all testDoneWG.Add() calls must happen before this point - stop := make(chan struct{}) - go controller.Run(stop) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go controller.RunWithContext(ctx) <-watchCh // run every test a few times, in parallel @@ -407,7 +412,6 @@ func TestUpdate(t *testing.T) { // Let's wait for the controller to process the things we just added. testDoneWG.Wait() - close(stop) } func TestPanicPropagated(t *testing.T) { @@ -427,9 +431,10 @@ func TestPanicPropagated(t *testing.T) { }, ) - // Run the controller and run it until we close stop. - stop := make(chan struct{}) - defer close(stop) + // Run the controller and run it until we cancel. + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() propagated := make(chan interface{}) go func() { @@ -438,7 +443,7 @@ func TestPanicPropagated(t *testing.T) { propagated <- r } }() - controller.Run(stop) + controller.RunWithContext(ctx) }() // Let's add a object to the source. It will trigger a panic. source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test"}}) @@ -555,8 +560,10 @@ func TestTransformingInformer(t *testing.T) { } } - stopCh := make(chan struct{}) - go controller.Run(stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go controller.RunWithContext(ctx) verifyEvent(watch.Added, nil, expectedPod("pod1", "2")) verifyStore([]interface{}{expectedPod("pod1", "2")}) @@ -574,11 +581,17 @@ func TestTransformingInformer(t *testing.T) { source.Delete(makePod("pod1", "2")) verifyEvent(watch.Deleted, expectedPod("pod1", "2"), nil) verifyStore([]interface{}{expectedPod("pod2", "2"), expectedPod("pod3", "1")}) - - close(stopCh) } func TestTransformingInformerRace(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + // Canceled *only* when the test is done. + testCtx, cancel := context.WithCancel(ctx) + defer cancel() + // Canceled *also* during the test. + ctx, cancel = context.WithCancel(ctx) + defer cancel() + // source simulates an apiserver object endpoint. source := newFakeControllerSource(t) @@ -618,7 +631,11 @@ func TestTransformingInformerRace(t *testing.T) { type event struct{} events := make(chan event, numObjs) recordEvent := func(eventType watch.EventType, previous, current interface{}) { - events <- event{} + select { + case events <- event{}: + case <-testCtx.Done(): + // Don't block forever in the write above when test is already done. + } } checkEvents := func(count int) { for i := 0; i < count; i++ { @@ -637,8 +654,7 @@ func TestTransformingInformerRace(t *testing.T) { podTransformer, ) - stopCh := make(chan struct{}) - go controller.Run(stopCh) + go controller.RunWithContext(ctx) checkEvents(numObjs) @@ -652,7 +668,7 @@ func TestTransformingInformerRace(t *testing.T) { key := fmt.Sprintf("namespace/pod-%d", index) for { select { - case <-stopCh: + case <-ctx.Done(): return default: } @@ -674,7 +690,7 @@ func TestTransformingInformerRace(t *testing.T) { // Let resyncs to happen for some time. time.Sleep(time.Second) - close(stopCh) + cancel() wg.Wait() close(errors) for err := range errors { diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index ce74dfb6..4bb526cd 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -55,6 +55,9 @@ type DeltaFIFOOptions struct { // If set, will be called for objects before enqueueing them. Please // see the comment on TransformFunc for details. Transformer TransformFunc + + // If set, log output will go to this logger instead of klog.Background(). + Logger *klog.Logger } // DeltaFIFO is like FIFO, but differs in two ways. One is that the @@ -136,6 +139,10 @@ type DeltaFIFO struct { // Called with every object if non-nil. transformer TransformFunc + + // logger is a per-instance logger. This gets chosen when constructing + // the instance, with klog.Background() as default. + logger klog.Logger } // TransformFunc allows for transforming an object before it will be processed. @@ -253,6 +260,10 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, transformer: opts.Transformer, + logger: klog.Background(), + } + if opts.Logger != nil { + f.logger = *opts.Logger } f.cond.L = &f.lock return f @@ -487,10 +498,10 @@ func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType Del // when given a non-empty list (as it is here). // If somehow it happens anyway, deal with it but complain. if oldDeltas == nil { - klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj) + f.logger.Error(nil, "Impossible dedupDeltas, ignoring", "id", id, "oldDeltas", oldDeltas, "obj", obj) return nil } - klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj) + f.logger.Error(nil, "Impossible dedupDeltas, breaking invariant by storing empty Deltas", "id", id, "oldDeltas", oldDeltas, "obj", obj) f.items[id] = newDeltas return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj) } @@ -597,7 +608,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { item, ok := f.items[id] if !ok { // This should never happen - klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id) + f.logger.Error(nil, "Inconceivable! Item was in f.queue but not f.items; ignoring", "id", id) continue } delete(f.items, id) @@ -694,10 +705,10 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { deletedObj, exists, err := f.knownObjects.GetByKey(k) if err != nil { deletedObj = nil - klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) + f.logger.Error(err, "Unexpected error during lookup, placing DeleteFinalStateUnknown marker without object", "key", k) } else if !exists { deletedObj = nil - klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) + f.logger.Info("Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", k) } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { @@ -737,10 +748,10 @@ func (f *DeltaFIFO) Resync() error { func (f *DeltaFIFO) syncKeyLocked(key string) error { obj, exists, err := f.knownObjects.GetByKey(key) if err != nil { - klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key) + f.logger.Error(err, "Unexpected error during lookup, unable to queue object for sync", "key", key) return nil } else if !exists { - klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key) + f.logger.Info("Key does not exist in known objects store, unable to queue object for sync", "key", key) return nil } diff --git a/tools/cache/listers.go b/tools/cache/listers.go index a60f4494..9e050ff4 100644 --- a/tools/cache/listers.go +++ b/tools/cache/listers.go @@ -62,7 +62,12 @@ func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selec items, err := indexer.Index(NamespaceIndex, &metav1.ObjectMeta{Namespace: namespace}) if err != nil { // Ignore error; do slow search without index. - klog.Warningf("can not retrieve list of objects using index : %v", err) + // + // ListAllByNamespace is called by generated code + // (k8s.io/client-go/listers) and probably not worth converting + // to contextual logging, which would require changing all of + // those APIs. + klog.TODO().Info("Warning: can not retrieve list of objects using index", "err", err) for _, m := range indexer.List() { metadata, err := meta.Accessor(m) if err != nil { diff --git a/tools/cache/main_test.go b/tools/cache/main_test.go index 12817a07..b0ed00f3 100644 --- a/tools/cache/main_test.go +++ b/tools/cache/main_test.go @@ -30,14 +30,6 @@ func TestMain(m *testing.M) { goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestFIFO_addUpdate.func1"), goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addReplace.func1"), goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addUpdate.func1"), - - // TODO: fix the following tests by adding WithContext APIs and cancellation. - goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestTransformingInformerRace.func3"), - // Created by k8s.io/client-go/tools/cache.TestReflectorListAndWatch, cannot filter on that (https://github.com/uber-go/goleak/issues/135): - goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch"), - goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).startResync"), - // ??? - goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*DeltaFIFO).Close"), } goleak.VerifyTestMain(m, options...) } diff --git a/tools/cache/mutation_cache.go b/tools/cache/mutation_cache.go index c6f953d8..6800a625 100644 --- a/tools/cache/mutation_cache.go +++ b/tools/cache/mutation_cache.go @@ -60,7 +60,7 @@ type ResourceVersionComparator interface { // If includeAdds is true, objects in the mutation cache will be returned even if they don't exist // in the underlying store. This is only safe if your use of the cache can handle mutation entries // remaining in the cache for up to ttl when mutations and deletes occur very closely in time. -func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache { +func NewIntegerResourceVersionMutationCache(logger klog.Logger, backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache { return &mutationCache{ backingCache: backingCache, indexer: indexer, @@ -68,6 +68,7 @@ func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, comparator: etcdObjectVersioner{}, ttl: ttl, includeAdds: includeAdds, + logger: logger, } } @@ -75,6 +76,7 @@ func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, // since you can't distinguish between, "didn't observe create" and "was deleted after create", // if the key is missing from the backing cache, we always return it as missing type mutationCache struct { + logger klog.Logger lock sync.Mutex backingCache Store indexer Indexer @@ -157,7 +159,7 @@ func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, er } elements, err := fn(updated) if err != nil { - klog.V(4).Infof("Unable to calculate an index entry for mutation cache entry %s: %v", key, err) + c.logger.V(4).Info("Unable to calculate an index entry for mutation cache entry", "key", key, "err", err) continue } for _, inIndex := range elements { @@ -204,7 +206,7 @@ func (c *mutationCache) Mutation(obj interface{}) { key, err := DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { // this is a "nice to have", so failures shouldn't do anything weird - utilruntime.HandleError(err) + utilruntime.HandleErrorWithLogger(c.logger, err, "DeletionHandlingMetaNamespaceKeyFunc") return } diff --git a/tools/cache/mutation_detector.go b/tools/cache/mutation_detector.go index b37537cb..27ea62bf 100644 --- a/tools/cache/mutation_detector.go +++ b/tools/cache/mutation_detector.go @@ -50,6 +50,7 @@ func NewCacheMutationDetector(name string) MutationDetector { if !mutationDetectionEnabled { return dummyMutationDetector{} } + //nolint:logcheck // This code shouldn't be used in production. klog.Warningln("Mutation detector is enabled, this will result in memory leakage.") return &defaultCacheMutationDetector{name: name, period: 1 * time.Second, retainDuration: 2 * time.Minute} } diff --git a/tools/cache/processor_listener_test.go b/tools/cache/processor_listener_test.go index fd658197..517258a1 100644 --- a/tools/cache/processor_listener_test.go +++ b/tools/cache/processor_listener_test.go @@ -22,6 +22,7 @@ import ( "time" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" ) const ( @@ -35,7 +36,7 @@ func BenchmarkListener(b *testing.B) { swg.Add(b.N) b.SetParallelism(concurrencyLevel) // Preallocate enough space so that benchmark does not run out of it - pl := newProcessListener(&ResourceEventHandlerFuncs{ + pl := newProcessListener(klog.Background(), &ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { swg.Done() }, diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 030b4529..f8dac4f9 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -95,7 +95,7 @@ type Reflector struct { // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion lastSyncResourceVersionMutex sync.RWMutex // Called whenever the ListAndWatch drops the connection with an error. - watchErrorHandler WatchErrorHandler + watchErrorHandler WatchErrorHandlerWithContext // WatchListPageSize is the requested chunk size of initial and resync watch lists. // If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data // (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0") @@ -150,20 +150,32 @@ type ResourceVersionUpdater interface { // should be offloaded. type WatchErrorHandler func(r *Reflector, err error) -// DefaultWatchErrorHandler is the default implementation of WatchErrorHandler -func DefaultWatchErrorHandler(r *Reflector, err error) { +// The WatchErrorHandler is called whenever ListAndWatch drops the +// connection with an error. After calling this handler, the informer +// will backoff and retry. +// +// The default implementation looks at the error type and tries to log +// the error message at an appropriate level. +// +// Implementations of this handler may display the error message in other +// ways. Implementations should return quickly - any expensive processing +// should be offloaded. +type WatchErrorHandlerWithContext func(ctx context.Context, r *Reflector, err error) + +// DefaultWatchErrorHandler is the default implementation of WatchErrorHandlerWithContext. +func DefaultWatchErrorHandler(ctx context.Context, r *Reflector, err error) { switch { case isExpiredError(err): // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already // has a semantic that it returns data at least as fresh as provided RV. // So first try to LIST with setting RV to resource version of last observed object. - klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err) + klog.FromContext(ctx).V(4).Info("Watch closed", "reflector", r.name, "type", r.typeDescription, "err", err) case err == io.EOF: // watch closed normally case err == io.ErrUnexpectedEOF: - klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.typeDescription, err) + klog.FromContext(ctx).V(1).Info("Watch closed with unexpected EOF", "reflector", r.name, "type", r.typeDescription, "err", err) default: - utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.typeDescription, err)) + utilruntime.HandleErrorWithContext(ctx, err, "Failed to watch", "reflector", r.name, "type", r.typeDescription) } } @@ -243,7 +255,7 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff. backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock), clock: reflectorClock, - watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), + watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler), expectedType: reflect.TypeOf(expectedType), } @@ -309,14 +321,24 @@ var internalPackages = []string{"client-go/tools/cache/"} // Run repeatedly uses the reflector's ListAndWatch to fetch all the // objects and subsequent deltas. // Run will exit when stopCh is closed. +// +// Contextual logging: RunWithContext should be used instead of Run in code which supports contextual logging. func (r *Reflector) Run(stopCh <-chan struct{}) { - klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name) + r.RunWithContext(wait.ContextForChannel(stopCh)) +} + +// RunWithContext repeatedly uses the reflector's ListAndWatch to fetch all the +// objects and subsequent deltas. +// Run will exit when the context is canceled. +func (r *Reflector) RunWithContext(ctx context.Context) { + logger := klog.FromContext(ctx) + logger.V(3).Info("Starting reflector", "type", r.typeDescription, "resyncPeriod", r.resyncPeriod, "reflector", r.name) wait.BackoffUntil(func() { - if err := r.ListAndWatch(stopCh); err != nil { - r.watchErrorHandler(r, err) + if err := r.ListAndWatchWithContext(ctx); err != nil { + r.watchErrorHandler(ctx, r, err) } - }, r.backoffManager, true, stopCh) - klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name) + }, r.backoffManager, true, ctx.Done()) + logger.V(3).Info("Stopping reflector", "type", r.typeDescription, "resyncPeriod", r.resyncPeriod, "reflector", r.name) } var ( @@ -345,21 +367,31 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { // ListAndWatch first lists all items and get the resource version at the moment of call, // and then use the resource version to watch. // It returns error if ListAndWatch didn't even try to initialize watch. +// +// Contextual logging: ListAndWatchWithContext should be used instead of ListAndWatch in code which supports contextual logging. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { - klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name) + return r.ListAndWatchWithContext(wait.ContextForChannel(stopCh)) +} + +// ListAndWatchWithContext first lists all items and get the resource version at the moment of call, +// and then use the resource version to watch. +// It returns error if ListAndWatchWithContext didn't even try to initialize watch. +func (r *Reflector) ListAndWatchWithContext(ctx context.Context) error { + logger := klog.FromContext(ctx) + logger.V(3).Info("Listing and watching", "type", r.typeDescription, "reflector", r.name) var err error var w watch.Interface useWatchList := ptr.Deref(r.UseWatchList, false) fallbackToList := !useWatchList if useWatchList { - w, err = r.watchList(stopCh) + w, err = r.watchList(ctx) if w == nil && err == nil { // stopCh was closed return nil } if err != nil { - klog.Warningf("The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking, err = %v", err) + logger.Error(err, "The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking") fallbackToList = true // ensure that we won't accidentally pass some garbage down the watch. w = nil @@ -367,20 +399,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { } if fallbackToList { - err = r.list(stopCh) + err = r.list(ctx) if err != nil { return err } } - klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name) - return r.watchWithResync(w, stopCh) + logger.V(2).Info("Caches populated", "type", r.typeDescription, "reflector", r.name) + return r.watchWithResync(ctx, w) } // startResync periodically calls r.store.Resync() method. // Note that this method is blocking and should be // called in a separate goroutine. -func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}, resyncerrc chan error) { +func (r *Reflector) startResync(ctx context.Context, resyncerrc chan error) { + logger := klog.FromContext(ctx) resyncCh, cleanup := r.resyncChan() defer func() { cleanup() // Call the last one written into cleanup @@ -388,13 +421,11 @@ func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{} for { select { case <-resyncCh: - case <-stopCh: - return - case <-cancelCh: + case <-ctx.Done(): return } if r.ShouldResync == nil || r.ShouldResync() { - klog.V(4).Infof("%s: forcing resync", r.name) + logger.V(4).Info("Forcing resync", "reflector", r.name) if err := r.store.Resync(); err != nil { resyncerrc <- err return @@ -406,16 +437,27 @@ func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{} } // watchWithResync runs watch with startResync in the background. -func (r *Reflector) watchWithResync(w watch.Interface, stopCh <-chan struct{}) error { +func (r *Reflector) watchWithResync(ctx context.Context, w watch.Interface) error { resyncerrc := make(chan error, 1) - cancelCh := make(chan struct{}) - defer close(cancelCh) - go r.startResync(stopCh, cancelCh, resyncerrc) - return r.watch(w, stopCh, resyncerrc) + cancelCtx, cancel := context.WithCancel(ctx) + // Waiting for completion of the goroutine is relevant for race detector. + // Without this, there is a race between "this function returns + code + // waiting for it" and "goroutine does something". + var wg wait.Group + defer func() { + cancel() + wg.Wait() + }() + wg.Start(func() { + r.startResync(cancelCtx, resyncerrc) + }) + return r.watch(ctx, w, resyncerrc) } // watch simply starts a watch request with the server. -func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error { +func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc chan error) error { + stopCh := ctx.Done() + logger := klog.FromContext(ctx) var err error retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock) @@ -451,7 +493,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc w, err = r.listerWatcher.Watch(options) if err != nil { if canRetry := isWatchErrorRetriable(err); canRetry { - klog.V(4).Infof("%s: watch of %v returned %v - backing off", r.name, r.typeDescription, err) + logger.V(4).Info("Watch failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err) select { case <-stopCh: return nil @@ -463,8 +505,8 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc } } - err = handleWatch(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, - r.clock, resyncerrc, stopCh) + err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, + r.clock, resyncerrc) // Ensure that watch will not be reused across iterations. w.Stop() w = nil @@ -476,9 +518,9 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already // has a semantic that it returns data at least as fresh as provided RV. // So first try to LIST with setting RV to resource version of last observed object. - klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err) + logger.V(4).Info("Watch closed", "reflector", r.name, "type", r.typeDescription, "err", err) case apierrors.IsTooManyRequests(err): - klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription) + logger.V(2).Info("Watch returned 429 - backing off", "reflector", r.name, "type", r.typeDescription) select { case <-stopCh: return nil @@ -486,10 +528,10 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc continue } case apierrors.IsInternalError(err) && retry.ShouldRetry(): - klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err) + logger.V(2).Info("Retrying watch after internal error", "reflector", r.name, "type", r.typeDescription, "err", err) continue default: - klog.Warningf("%s: watch of %v ended with: %v", r.name, r.typeDescription, err) + logger.Info("Warning: watch ended with error", "reflector", r.name, "type", r.typeDescription, "err", err) } } return nil @@ -499,7 +541,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc // list simply lists all items and records a resource version obtained from the server at the moment of the call. // the resource version can be used for further progress notification (aka. watch). -func (r *Reflector) list(stopCh <-chan struct{}) error { +func (r *Reflector) list(ctx context.Context) error { var resourceVersion string options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} @@ -558,7 +600,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error { close(listCh) }() select { - case <-stopCh: + case <-ctx.Done(): return nil case r := <-panicCh: panic(r) @@ -566,7 +608,6 @@ func (r *Reflector) list(stopCh <-chan struct{}) error { } initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err}) if err != nil { - klog.Warningf("%s: failed to list %v: %v", r.name, r.typeDescription, err) return fmt.Errorf("failed to list %v: %w", r.typeDescription, err) } @@ -624,7 +665,9 @@ func (r *Reflector) list(stopCh <-chan struct{}) error { // After receiving a "Bookmark" event the reflector is considered to be synchronized. // It replaces its internal store with the collected items and // reuses the current watch requests for getting further events. -func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { +func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) { + stopCh := ctx.Done() + logger := klog.FromContext(ctx) var w watch.Interface var err error var temporaryStore Store @@ -634,7 +677,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { // could be unified with the r.watch method isErrorRetriableWithSideEffectsFn := func(err error) bool { if canRetry := isWatchErrorRetriable(err); canRetry { - klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err) + logger.V(2).Info("watch-list failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err) <-r.backoffManager.Backoff().C() return true } @@ -681,9 +724,9 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { } return nil, err } - watchListBookmarkReceived, err := handleListWatch(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription, + watchListBookmarkReceived, err := handleListWatch(ctx, start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription, func(rv string) { resourceVersion = rv }, - r.clock, make(chan error), stopCh) + r.clock, make(chan error)) if err != nil { w.Stop() // stop and retry with clean state if errors.Is(err, errorStopRequested) { @@ -706,7 +749,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { // we utilize the temporaryStore to ensure independence from the current store implementation. // as of today, the store is implemented as a queue and will be drained by the higher-level // component as soon as it finishes replacing the content. - checkWatchListDataConsistencyIfRequested(wait.ContextForChannel(stopCh), r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List) + checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List) if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { return nil, fmt.Errorf("unable to sync watch-list result: %w", err) @@ -731,6 +774,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err // retry. If successful, the watcher will be left open after receiving the // initial set of objects, to allow watching for future events. func handleListWatch( + ctx context.Context, start time.Time, w watch.Interface, store Store, @@ -741,17 +785,17 @@ func handleListWatch( setLastSyncResourceVersion func(string), clock clock.Clock, errCh chan error, - stopCh <-chan struct{}, ) (bool, error) { exitOnWatchListBookmarkReceived := true - return handleAnyWatch(start, w, store, expectedType, expectedGVK, name, expectedTypeName, - setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh, stopCh) + return handleAnyWatch(ctx, start, w, store, expectedType, expectedGVK, name, expectedTypeName, + setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh) } // handleListWatch consumes events from w, updates the Store, and records the // last seen ResourceVersion, to allow continuing from that ResourceVersion on // retry. The watcher will always be stopped on exit. func handleWatch( + ctx context.Context, start time.Time, w watch.Interface, store Store, @@ -762,11 +806,10 @@ func handleWatch( setLastSyncResourceVersion func(string), clock clock.Clock, errCh chan error, - stopCh <-chan struct{}, ) error { exitOnWatchListBookmarkReceived := false - _, err := handleAnyWatch(start, w, store, expectedType, expectedGVK, name, expectedTypeName, - setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh, stopCh) + _, err := handleAnyWatch(ctx, start, w, store, expectedType, expectedGVK, name, expectedTypeName, + setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh) return err } @@ -779,7 +822,9 @@ func handleWatch( // The watcher will always be stopped, unless exitOnWatchListBookmarkReceived is // true and watchListBookmarkReceived is true. This allows the same watch stream // to be re-used by the caller to continue watching for new events. -func handleAnyWatch(start time.Time, +func handleAnyWatch( + ctx context.Context, + start time.Time, w watch.Interface, store Store, expectedType reflect.Type, @@ -790,17 +835,17 @@ func handleAnyWatch(start time.Time, exitOnWatchListBookmarkReceived bool, clock clock.Clock, errCh chan error, - stopCh <-chan struct{}, ) (bool, error) { watchListBookmarkReceived := false eventCount := 0 - initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(name, clock, start, exitOnWatchListBookmarkReceived) + logger := klog.FromContext(ctx) + initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(logger, name, clock, start, exitOnWatchListBookmarkReceived) defer initialEventsEndBookmarkWarningTicker.Stop() loop: for { select { - case <-stopCh: + case <-ctx.Done(): return watchListBookmarkReceived, errorStopRequested case err := <-errCh: return watchListBookmarkReceived, err @@ -813,19 +858,19 @@ loop: } if expectedType != nil { if e, a := expectedType, reflect.TypeOf(event.Object); e != a { - utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a)) + utilruntime.HandleErrorWithContext(ctx, nil, "Unexpected watch event object type", "reflector", name, "expectedType", e, "actualType", a) continue } } if expectedGVK != nil { if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a { - utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a)) + utilruntime.HandleErrorWithContext(ctx, nil, "Unexpected watch event object gvk", "reflector", name, "expectedGVK", e, "actualGVK", a) continue } } meta, err := meta.Accessor(event.Object) if err != nil { - utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) + utilruntime.HandleErrorWithContext(ctx, err, "Unable to understand watch event", "reflector", name, "event", event) continue } resourceVersion := meta.GetResourceVersion() @@ -833,12 +878,12 @@ loop: case watch.Added: err := store.Add(event.Object) if err != nil { - utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err)) + utilruntime.HandleErrorWithContext(ctx, err, "Unable to add watch event object to store", "reflector", name, "object", event.Object) } case watch.Modified: err := store.Update(event.Object) if err != nil { - utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err)) + utilruntime.HandleErrorWithContext(ctx, err, "Unable to update watch event object to store", "reflector", name, "object", event.Object) } case watch.Deleted: // TODO: Will any consumers need access to the "last known @@ -846,7 +891,7 @@ loop: // to change this. err := store.Delete(event.Object) if err != nil { - utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err)) + utilruntime.HandleErrorWithContext(ctx, err, "Unable to delete watch event object from store", "reflector", name, "object", event.Object) } case watch.Bookmark: // A `Bookmark` means watch has synced here, just update the resourceVersion @@ -854,7 +899,7 @@ loop: watchListBookmarkReceived = true } default: - utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) + utilruntime.HandleErrorWithContext(ctx, err, "Unknown watch event", "reflector", name, "event", event) } setLastSyncResourceVersion(resourceVersion) if rvu, ok := store.(ResourceVersionUpdater); ok { @@ -863,7 +908,7 @@ loop: eventCount++ if exitOnWatchListBookmarkReceived && watchListBookmarkReceived { watchDuration := clock.Since(start) - klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration) + klog.FromContext(ctx).V(4).Info("Exiting watch because received the bookmark that marks the end of initial events stream", "reflector", name, "totalItems", eventCount, "duration", watchDuration) return watchListBookmarkReceived, nil } initialEventsEndBookmarkWarningTicker.observeLastEventTimeStamp(clock.Now()) @@ -876,7 +921,7 @@ loop: if watchDuration < 1*time.Second && eventCount == 0 { return watchListBookmarkReceived, fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name) } - klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount) + klog.FromContext(ctx).V(4).Info("Watch close", "reflector", name, "type", expectedTypeName, "totalItems", eventCount) return watchListBookmarkReceived, nil } @@ -1004,8 +1049,9 @@ func wrapListFuncWithContext(listFn ListFunc) func(ctx context.Context, options // The methods exposed by this type are not thread-safe. type initialEventsEndBookmarkTicker struct { clock.Ticker - clock clock.Clock - name string + clock clock.Clock + name string + logger klog.Logger watchStart time.Time tickInterval time.Duration @@ -1019,15 +1065,15 @@ type initialEventsEndBookmarkTicker struct { // Note that the caller controls whether to call t.C() and t.Stop(). // // In practice, the reflector exits the watchHandler as soon as the bookmark event is received and calls the t.C() method. -func newInitialEventsEndBookmarkTicker(name string, c clock.Clock, watchStart time.Time, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { - return newInitialEventsEndBookmarkTickerInternal(name, c, watchStart, 10*time.Second, exitOnWatchListBookmarkReceived) +func newInitialEventsEndBookmarkTicker(logger klog.Logger, name string, c clock.Clock, watchStart time.Time, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { + return newInitialEventsEndBookmarkTickerInternal(logger, name, c, watchStart, 10*time.Second, exitOnWatchListBookmarkReceived) } -func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { +func newInitialEventsEndBookmarkTickerInternal(logger klog.Logger, name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { clockWithTicker, ok := c.(clock.WithTicker) if !ok || !exitOnWatchListBookmarkReceived { if exitOnWatchListBookmarkReceived { - klog.Warningf("clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested") + logger.Info("Warning: clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested") } return &initialEventsEndBookmarkTicker{ Ticker: &noopTicker{}, @@ -1038,6 +1084,7 @@ func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watch Ticker: clockWithTicker.NewTicker(tickInterval), clock: c, name: name, + logger: logger, watchStart: watchStart, tickInterval: tickInterval, } @@ -1049,7 +1096,7 @@ func (t *initialEventsEndBookmarkTicker) observeLastEventTimeStamp(lastEventObse func (t *initialEventsEndBookmarkTicker) warnIfExpired() { if err := t.produceWarningIfExpired(); err != nil { - klog.Warning(err) + t.logger.Info("Warning: event bookmark expired", "err", err) } } diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index a56fce6e..b880cfc0 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -45,6 +45,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog/v2/ktesting" "k8s.io/utils/clock" testingclock "k8s.io/utils/clock/testing" ) @@ -64,6 +65,7 @@ func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) { } func TestCloseWatchChannelOnError(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) r := NewReflector(&testLW{}, &v1.Pod{}, NewStore(MetaNamespaceKeyFunc), 0) pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} fw := watch.NewFake() @@ -75,7 +77,7 @@ func TestCloseWatchChannelOnError(t *testing.T) { return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil }, } - go r.ListAndWatch(wait.NeverStop) + go func() { assert.NoError(t, r.ListAndWatchWithContext(ctx)) }() fw.Error(pod) select { case _, ok := <-fw.ResultChan(): @@ -89,7 +91,8 @@ func TestCloseWatchChannelOnError(t *testing.T) { } func TestRunUntil(t *testing.T) { - stopCh := make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) store := NewStore(MetaNamespaceKeyFunc) r := NewReflector(&testLW{}, &v1.Pod{}, store, 0) fw := watch.NewFake() @@ -104,13 +107,13 @@ func TestRunUntil(t *testing.T) { doneCh := make(chan struct{}) go func() { defer close(doneCh) - r.Run(stopCh) + r.RunWithContext(ctx) }() // Synchronously add a dummy pod into the watch channel so we // know the RunUntil go routine is in the watch handler. fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}) - close(stopCh) + cancel(errors.New("done")) resultCh := fw.ResultChan() for { select { @@ -149,8 +152,9 @@ func TestReflectorResyncChan(t *testing.T) { // TestReflectorWatchStoppedBefore ensures that neither List nor Watch are // called if the stop channel is closed before Reflector.watch is called. func TestReflectorWatchStoppedBefore(t *testing.T) { - stopCh := make(chan struct{}) - close(stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + cancel(errors.New("don't run")) lw := &ListWatch{ ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) { @@ -165,14 +169,15 @@ func TestReflectorWatchStoppedBefore(t *testing.T) { } target := NewReflector(lw, &v1.Pod{}, nil, 0) - err := target.watch(nil, stopCh, nil) + err := target.watch(ctx, nil, nil) require.NoError(t, err) } // TestReflectorWatchStoppedAfter ensures that neither the watcher is stopped if // the stop channel is closed after Reflector.watch has started watching. func TestReflectorWatchStoppedAfter(t *testing.T) { - stopCh := make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) var watchers []*watch.FakeWatcher @@ -185,7 +190,7 @@ func TestReflectorWatchStoppedAfter(t *testing.T) { // Simulate the stop channel being closed after watching has started go func() { time.Sleep(10 * time.Millisecond) - close(stopCh) + cancel(errors.New("10ms timeout reached")) }() // Use a fake watcher that never sends events w := watch.NewFake() @@ -195,7 +200,7 @@ func TestReflectorWatchStoppedAfter(t *testing.T) { } target := NewReflector(lw, &v1.Pod{}, nil, 0) - err := target.watch(nil, stopCh, nil) + err := target.watch(ctx, nil, nil) require.NoError(t, err) require.Len(t, watchers, 1) require.True(t, watchers[0].IsStopped()) @@ -219,9 +224,10 @@ func BenchmarkReflectorResyncChanMany(b *testing.B) { func TestReflectorHandleWatchStoppedBefore(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) - stopCh := make(chan struct{}) - // Simulate the watch channel being closed before the watchHandler is called - close(stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + // Simulate the context being canceled before the watchHandler is called + cancel(errors.New("don't run")) var calls []string resultCh := make(chan watch.Event) fw := watch.MockWatcher{ @@ -234,7 +240,7 @@ func TestReflectorHandleWatchStoppedBefore(t *testing.T) { return resultCh }, } - err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopCh) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) if err == nil { t.Errorf("unexpected non-error") } @@ -251,7 +257,8 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) var calls []string - stopCh := make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) resultCh := make(chan watch.Event) fw := watch.MockWatcher{ StopFunc: func() { @@ -265,12 +272,12 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) { // caller, after watching has started. go func() { time.Sleep(10 * time.Millisecond) - close(stopCh) + cancel(errors.New("10ms timeout reached")) }() return resultCh }, } - err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopCh) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) if err == nil { t.Errorf("unexpected non-error") } @@ -285,6 +292,7 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) { func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) + _, ctx := ktesting.NewTestContext(t) var calls []string resultCh := make(chan watch.Event) fw := watch.MockWatcher{ @@ -298,7 +306,7 @@ func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { } // Simulate the result channel being closed by the producer before handleWatch is called. close(resultCh) - err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) if err == nil { t.Errorf("unexpected non-error") } @@ -313,6 +321,7 @@ func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) + _, ctx := ktesting.NewTestContext(t) var calls []string resultCh := make(chan watch.Event) fw := watch.MockWatcher{ @@ -331,7 +340,7 @@ func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) { return resultCh }, } - err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) if err == nil { t.Errorf("unexpected non-error") } @@ -348,11 +357,12 @@ func TestReflectorWatchHandler(t *testing.T) { // watching after all the events have been consumed. This avoids race // conditions which can happen if the producer calls Stop(), instead of the // consumer. - stopCh := make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) setLastSyncResourceVersion := func(rv string) { g.setLastSyncResourceVersion(rv) if rv == "32" { - close(stopCh) + cancel(errors.New("LastSyncResourceVersion is 32")) } } fw := watch.NewFake() @@ -365,7 +375,7 @@ func TestReflectorWatchHandler(t *testing.T) { fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}}) fw.Stop() }() - err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, g.clock, nevererrc, stopCh) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, g.clock, nevererrc) // TODO(karlkfi): Fix FakeWatcher to avoid race condition between watcher.Stop() & close(stopCh) if err != nil && !errors.Is(err, errorStopRequested) { t.Errorf("unexpected error %v", err) @@ -408,15 +418,19 @@ func TestReflectorStopWatch(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) fw := watch.NewFake() - stopWatch := make(chan struct{}) - close(stopWatch) - err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopWatch) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + cancel(errors.New("don't run")) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) if err != errorStopRequested { t.Errorf("expected stop error, got %q", err) } } func TestReflectorListAndWatch(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() createdFakes := make(chan *watch.FakeWatcher) // The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc @@ -442,7 +456,7 @@ func TestReflectorListAndWatch(t *testing.T) { } s := NewFIFO(MetaNamespaceKeyFunc) r := NewReflector(lw, &v1.Pod{}, s, 0) - go r.ListAndWatch(wait.NeverStop) + go func() { assert.NoError(t, r.ListAndWatchWithContext(ctx)) }() ids := []string{"foo", "bar", "baz", "qux", "zoo"} var fw *watch.FakeWatcher @@ -539,7 +553,8 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { } } watchRet, watchErr := item.events, item.watchErr - stopCh := make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) lw := &testLW{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if watchErr != nil { @@ -557,7 +572,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { // result channel, and wait for the consumer to stop the // watcher, to avoid race conditions. // TODO: Fix the FakeWatcher to separate watcher.Stop from close(resultCh) - close(stopCh) + cancel(errors.New("done")) }() return fw, nil }, @@ -566,7 +581,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { }, } r := NewReflector(lw, &v1.Pod{}, s, 0) - err := r.ListAndWatch(stopCh) + err := r.ListAndWatchWithContext(ctx) if item.listErr != nil && !errors.Is(err, item.listErr) { t.Errorf("unexpected ListAndWatch error: %v", err) } @@ -593,7 +608,8 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) { for _, test := range table { t.Run(fmt.Sprintf("%d connection failures takes at least %d ms", test.numConnFails, 1< 0 { if resyncPeriod < minimumResyncPeriod { - klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod) + logger.Info("Warning: resync period is too small. Changing it to the minimum allowed value", "resyncPeriod", resyncPeriod, "minimumResyncPeriod", minimumResyncPeriod) resyncPeriod = minimumResyncPeriod } if resyncPeriod < s.resyncCheckPeriod { if s.started { - klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod) + logger.Info("Warning: resync period is smaller than resync check period and the informer has already started. Changing it to the resync check period", "resyncPeriod", resyncPeriod, "resyncCheckPeriod", s.resyncCheckPeriod) + resyncPeriod = s.resyncCheckPeriod } else { // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners // accordingly s.resyncCheckPeriod = resyncPeriod - s.processor.resyncCheckPeriodChanged(resyncPeriod) + s.processor.resyncCheckPeriodChanged(logger, resyncPeriod) } } } - listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced) + listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced) if !s.started { return s.processor.addListener(listener), nil @@ -794,7 +877,7 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) { } } -func (p *sharedProcessor) run(stopCh <-chan struct{}) { +func (p *sharedProcessor) run(ctx context.Context) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() @@ -804,7 +887,7 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) { } p.listenersStarted = true }() - <-stopCh + <-ctx.Done() p.listenersLock.Lock() defer p.listenersLock.Unlock() @@ -844,13 +927,13 @@ func (p *sharedProcessor) shouldResync() bool { return resyncNeeded } -func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) { +func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncCheckPeriod time.Duration) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for listener := range p.listeners { resyncPeriod := determineResyncPeriod( - listener.requestedResyncPeriod, resyncCheckPeriod) + logger, listener.requestedResyncPeriod, resyncCheckPeriod) listener.setResyncPeriod(resyncPeriod) } } @@ -867,6 +950,7 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati // processorListener also keeps track of the adjusted requested resync // period of the listener. type processorListener struct { + logger klog.Logger nextCh chan interface{} addCh chan interface{} @@ -910,8 +994,9 @@ func (p *processorListener) HasSynced() bool { return p.syncTracker.HasSynced() } -func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener { +func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener { ret := &processorListener{ + logger: logger, nextCh: make(chan interface{}), addCh: make(chan interface{}), handler: handler, @@ -934,7 +1019,7 @@ func (p *processorListener) add(notification interface{}) { } func (p *processorListener) pop() { - defer utilruntime.HandleCrash() + defer utilruntime.HandleCrashWithLogger(p.logger) defer close(p.nextCh) // Tell .run() to stop var nextCh chan<- interface{} @@ -966,11 +1051,21 @@ func (p *processorListener) pop() { func (p *processorListener) run() { // this call blocks until the channel is closed. When a panic happens during the notification // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) - // the next notification will be attempted. This is usually better than the alternative of never + // the next notification will be attempted. This is usually better than the alternative of never // delivering again. - stopCh := make(chan struct{}) - wait.Until(func() { - for next := range p.nextCh { + // + // This only applies if utilruntime is configured to not panic, which is not the default. + sleepAfterCrash := false + for next := range p.nextCh { + if sleepAfterCrash { + // Sleep before processing the next item. + time.Sleep(time.Second) + } + func() { + // Gets reset below, but only if we get that far. + sleepAfterCrash = true + defer utilruntime.HandleCrashWithLogger(p.logger) + switch notification := next.(type) { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) @@ -982,12 +1077,11 @@ func (p *processorListener) run() { case deleteNotification: p.handler.OnDelete(notification.oldObj) default: - utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) + utilruntime.HandleErrorWithLogger(p.logger, nil, "unrecognized notification", "notificationType", fmt.Sprintf("%T", next)) } - } - // the only way to get here is if the p.nextCh is empty and closed - close(stopCh) - }, 1*time.Second, stopCh) + sleepAfterCrash = false + }() + } } // shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0, diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index a8f5f074..d17db6bd 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "math/rand" + "runtime" "strconv" "strings" "sync" @@ -34,8 +35,13 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + fcache "k8s.io/client-go/tools/cache/testing" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" + "k8s.io/klog/v2/textlogger" testingclock "k8s.io/utils/clock/testing" ) @@ -145,10 +151,15 @@ func TestIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - stop := make(chan struct{}) - defer close(stop) - go informer.Run(stop) + var wg wait.Group + stop := make(chan struct{}) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() + WaitForCacheSync(stop, informer.HasSynced) cmpOps := cmpopts.SortSlices(func(a, b any) bool { @@ -222,10 +233,13 @@ func TestListenerResyncPeriods(t *testing.T) { informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod) listeners := []*testListener{listener1, listener2, listener3} + var wg wait.Group stop := make(chan struct{}) - defer close(stop) - - go informer.Run(stop) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() // ensure all listeners got the initial List for _, listener := range listeners { @@ -361,10 +375,14 @@ func TestSharedInformerInitializationRace(t *testing.T) { informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) listener := newTestListener("raceListener", 0) - stop := make(chan struct{}) go informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod) - go informer.Run(stop) - close(stop) + var wg wait.Group + stop := make(chan struct{}) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() } // TestSharedInformerWatchDisruption simulates a watch that was closed @@ -392,10 +410,13 @@ func TestSharedInformerWatchDisruption(t *testing.T) { informer.AddEventHandlerWithResyncPeriod(listenerResync, listenerResync.resyncPeriod) listeners := []*testListener{listenerNoResync, listenerResync} + var wg wait.Group stop := make(chan struct{}) - defer close(stop) - - go informer.Run(stop) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() for _, listener := range listeners { if !listener.ok() { @@ -458,8 +479,13 @@ func TestSharedInformerErrorHandling(t *testing.T) { errCh <- err }) + var wg wait.Group stop := make(chan struct{}) - go informer.Run(stop) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() select { case err := <-errCh: @@ -469,7 +495,6 @@ func TestSharedInformerErrorHandling(t *testing.T) { case <-time.After(time.Second): t.Errorf("Timeout waiting for error handler call") } - close(stop) } // TestSharedInformerStartRace is a regression test to ensure there is no race between @@ -494,9 +519,12 @@ func TestSharedInformerStartRace(t *testing.T) { } }() - go informer.Run(stop) - - close(stop) + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() } func TestSharedInformerTransformer(t *testing.T) { @@ -522,9 +550,13 @@ func TestSharedInformerTransformer(t *testing.T) { listenerTransformer := newTestListener("listenerTransformer", 0, "POD1", "POD2") informer.AddEventHandler(listenerTransformer) + var wg wait.Group stop := make(chan struct{}) - go informer.Run(stop) - defer close(stop) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() if !listenerTransformer.ok() { t.Errorf("%s: expected %v, got %v", listenerTransformer.name, listenerTransformer.expectedItemNames, listenerTransformer.receivedItemNames) @@ -757,9 +789,11 @@ func TestSharedInformerHandlerAbuse(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) informerCtx, informerCancel := context.WithCancel(context.Background()) - go func() { - informer.Run(informerCtx.Done()) + var informerWg wait.Group + informerWg.StartWithChannel(informerCtx.Done(), informer.Run) + defer func() { cancel() + informerWg.Wait() }() worker := func() { @@ -881,8 +915,10 @@ func TestStateSharedInformer(t *testing.T) { t.Errorf("informer already stopped after creation") return } + var wg wait.Group stop := make(chan struct{}) - go informer.Run(stop) + wg.StartWithChannel(stop, informer.Run) + defer wg.Wait() if !listener.ok() { t.Errorf("informer did not report initial objects") close(stop) @@ -921,7 +957,9 @@ func TestAddOnStoppedSharedInformer(t *testing.T) { informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) listener := newTestListener("listener", 0, "pod1") stop := make(chan struct{}) - go informer.Run(stop) + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer wg.Wait() close(stop) err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { @@ -959,7 +997,9 @@ func TestRemoveOnStoppedSharedInformer(t *testing.T) { return } stop := make(chan struct{}) - go informer.Run(stop) + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer wg.Wait() close(stop) fmt.Println("sleeping") time.Sleep(1 * time.Second) @@ -986,9 +1026,13 @@ func TestRemoveWhileActive(t *testing.T) { handle, _ := informer.AddEventHandler(listener) stop := make(chan struct{}) - defer close(stop) + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() - go informer.Run(stop) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) if !listener.ok() { @@ -1026,7 +1070,12 @@ func TestAddWhileActive(t *testing.T) { } stop := make(chan struct{}) - defer close(stop) + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() go informer.Run(stop) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) @@ -1079,8 +1128,6 @@ func TestAddWhileActive(t *testing.T) { func TestShutdown(t *testing.T) { t.Run("no-context", func(t *testing.T) { source := newFakeControllerSource(t) - stop := make(chan struct{}) - defer close(stop) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{ @@ -1090,17 +1137,30 @@ func TestShutdown(t *testing.T) { defer func() { assert.NoError(t, informer.RemoveEventHandler(handler)) }() - go informer.Run(stop) + + var wg wait.Group + stop := make(chan struct{}) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() + require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced") }) t.Run("no-context-later", func(t *testing.T) { source := newFakeControllerSource(t) - stop := make(chan struct{}) - defer close(stop) - informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) - go informer.Run(stop) + + var wg wait.Group + stop := make(chan struct{}) + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + wg.Wait() + }() + require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced") handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{ @@ -1122,3 +1182,152 @@ func TestShutdown(t *testing.T) { // and it doesn't matter that nothing gets stopped or removed. }) } + +func TestEventPanics(t *testing.T) { + // timeInUTC := time.Date(2009, 12, 1, 13, 30, 40, 42000, time.UTC) + // timeString := "1201 13:30:40.000042" + // Initialized by init. + var ( + buffer threadSafeBuffer + logger klog.Logger + source *fcache.FakeControllerSource + ) + + init := func(t *testing.T) { + // Restoring state is very sensitive to ordering. All goroutines spawned + // by a test must have completed and there has to be a check that they + // have completed that is visible to the race detector. This also + // applies to all other tests! + t.Cleanup(klog.CaptureState().Restore) //nolint:logcheck // CaptureState shouldn't be used in packages with contextual logging, but here it is okay. + buffer.buffer.Reset() + logger = textlogger.NewLogger(textlogger.NewConfig( + // textlogger.FixedTime(timeInUTC), + textlogger.Output(&buffer), + )) + oldReallyCrash := utilruntime.ReallyCrash + utilruntime.ReallyCrash = false + t.Cleanup(func() { utilruntime.ReallyCrash = oldReallyCrash }) + + source = newFakeControllerSource(t) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) + } + + newHandler := func(ctx context.Context) ResourceEventHandlerFuncs { + logger := klog.FromContext(ctx) + return ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + logger.Info("Add func will panic now", "pod", klog.KObj(obj.(*v1.Pod))) + panic("fake panic") + }, + } + } + _, _, panicLine, _ := runtime.Caller(0) + panicLine -= 4 + expectedLog := func(name string) string { + if name == "" { + return fmt.Sprintf(`shared_informer_test.go:%d] "Observed a panic" panic="fake panic"`, panicLine) + } + return fmt.Sprintf(`shared_informer_test.go:%d] "Observed a panic" logger=%q panic="fake panic"`, panicLine, name) + } + handler := newHandler(context.Background()) + + t.Run("simple", func(t *testing.T) { + init(t) + klog.SetLogger(logger) + stop := make(chan struct{}) + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + handle, err := informer.AddEventHandler(handler) + require.NoError(t, err) + defer func() { + assert.NoError(t, informer.RemoveEventHandler(handle)) + }() + var wg wait.Group + wg.StartWithChannel(stop, informer.Run) + defer func() { + close(stop) + assert.Eventually(t, informer.IsStopped, time.Minute, time.Millisecond, "informer has stopped") + wg.Wait() // For race detector... + }() + require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced") + + // This times out (https://github.com/kubernetes/kubernetes/issues/129024) because the + // handler never syncs when the callback panics: + // require.Eventually(t, handle.HasSynced, time.Minute, time.Millisecond, "handler has synced") + // + // Wait for a non-empty buffer instead. This implies that we have to make + // the buffer thread-safe, which wouldn't be necessary otherwise. + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Contains(t, buffer.String(), expectedLog("")) + }, time.Minute, time.Millisecond, "handler has panicked") + }) + + t.Run("many", func(t *testing.T) { + init(t) + // One pod was already created in init, add some more. + numPods := 5 + for i := 1; i < numPods; i++ { + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod%d", i+1)}}) + } + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + name1 := "fake-event-handler-1" + logger1 := klog.LoggerWithName(logger, name1) + ctx1 := klog.NewContext(ctx, logger1) + handle1, err := informer.AddEventHandlerWithOptions(newHandler(ctx1), HandlerOptions{Logger: &logger1}) + require.NoError(t, err) + defer func() { + assert.NoError(t, informer.RemoveEventHandler(handle1)) + }() + name2 := "fake-event-handler-2" + logger2 := klog.LoggerWithName(logger, name2) + ctx2 := klog.NewContext(ctx, logger2) + handle2, err := informer.AddEventHandlerWithOptions(newHandler(ctx2), HandlerOptions{Logger: &logger2}) + require.NoError(t, err) + defer func() { + assert.NoError(t, informer.RemoveEventHandler(handle2)) + }() + + start := time.Now() + var wg wait.Group + informerName := "informer" + informerLogger := klog.LoggerWithName(logger, informerName) + informerCtx := klog.NewContext(ctx, informerLogger) + wg.StartWithContext(informerCtx, informer.RunWithContext) + defer func() { + cancel() + assert.Eventually(t, informer.IsStopped, time.Minute, time.Millisecond, "informer has stopped") + wg.Wait() // For race detector... + }() + require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced") + + assert.EventuallyWithT(t, func(t *assert.CollectT) { + output := buffer.String() + expected := expectedLog(name1) + if !assert.Equal(t, numPods, numOccurrences(output, expected), "Log output should have the right number of panics for %q (search string: %q), got instead:\n%s", name1, expected, output) { + return + } + expected = expectedLog(name2) + assert.Equal(t, numPods, numOccurrences(output, expected), "Log output should have the right number of panics for %q (search string %q, got instead:\n%s", name2, expected, output) + }, 30*time.Second, time.Millisecond, "handler has panicked") + + // Both handlers should have slept for one second after each panic, + // except after the last pod event because then the input channel + // gets closed. + assert.GreaterOrEqual(t, time.Since(start), time.Duration(numPods-1)*time.Second, "Delay in processorListener.run") + }) +} + +func numOccurrences(hay, needle string) int { + count := 0 + for { + index := strings.Index(hay, needle) + if index < 0 { + return count + } + count++ + hay = hay[index+len(needle):] + } +} diff --git a/tools/cache/util_test.go b/tools/cache/util_test.go index 6d073ca7..ed19e3eb 100644 --- a/tools/cache/util_test.go +++ b/tools/cache/util_test.go @@ -17,6 +17,8 @@ limitations under the License. package cache import ( + "bytes" + "sync" "testing" fcache "k8s.io/client-go/tools/cache/testing" @@ -27,3 +29,21 @@ func newFakeControllerSource(tb testing.TB) *fcache.FakeControllerSource { tb.Cleanup(source.Shutdown) return source } + +// threadSafeBuffer is a thread-safe wrapper around bytes.Buffer. +type threadSafeBuffer struct { + buffer bytes.Buffer + mu sync.Mutex +} + +func (b *threadSafeBuffer) Write(p []byte) (n int, err error) { + b.mu.Lock() + defer b.mu.Unlock() + return b.buffer.Write(p) +} + +func (b *threadSafeBuffer) String() string { + b.mu.Lock() + defer b.mu.Unlock() + return b.buffer.String() +}