client-go/tools/cache: add APIs with context parameter

The context is used for cancellation and to support contextual logging.

In most cases, alternative *WithContext APIs get added. The exception is
NewIntegerResourceVersionMutationCache, which changes in place because code
searches indicate that the API is not used downstream.
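
For illustration, a minimal sketch of the resulting migration; the API names are
taken from the example diff below, while the helper name startInformer is made up:

package informerexample

import (
	"context"
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// startInformer shows the migration from the stop-channel APIs
// (Run, WaitForCacheSync) to the new context-based ones
// (RunWithContext, WaitForNamedCacheSyncWithContext).
func startInformer(ctx context.Context, informer cache.Controller) error {
	// Previously: go informer.Run(stopCh)
	go informer.RunWithContext(ctx)

	// Previously: cache.WaitForCacheSync(stopCh, informer.HasSynced)
	if !cache.WaitForNamedCacheSyncWithContext(ctx, informer.HasSynced) {
		return fmt.Errorf("timed out waiting for caches to sync")
	}
	return nil
}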

An API break around SharedInformer couldn't be avoided because the
alternative (keeping the interface unchanged and adding a second one with
the new method) would have been worse. controller-runtime needs to be updated
because it implements that interface in a test package. Downstream consumers of
controller-runtime will work unless they use that test package.
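
For downstream implementers such as test fakes, the smallest adaptation is to
forward the new context-based method to the existing stop-channel one. This
sketch assumes the new interface method is RunWithContext, mirroring Controller
in the diff below; fakeInformer is a made-up stand-in, not controller-runtime's
actual test type:

package fakecache

import "context"

// fakeInformer stands in for a test double that already implements the
// stop-channel methods of the informer interfaces; only the Run variants
// are shown here.
type fakeInformer struct{}

// Run is the pre-existing stop-channel implementation.
func (f *fakeInformer) Run(stopCh <-chan struct{}) {
	<-stopCh
}

// RunWithContext satisfies the new method by forwarding to Run;
// ctx.Done() already has the <-chan struct{} type that Run expects.
func (f *fakeInformer) RunWithContext(ctx context.Context) {
	f.Run(ctx.Done())
}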

Converting Kubernetes to use the other new alternatives will follow. In the
meantime, usage of the new alternatives cannot be enforced via logcheck
yet (see https://github.com/kubernetes/kubernetes/issues/126379 for the
process).

Passing context through and checking it for cancellation is tricky for event
handlers. A better approach is to map the context cancellation to the normal
removal of an event handler via a helper goroutine. Thanks to the new
HandleErrorWithLogger and HandleCrashWithLogger, remembering the logger is
sufficient for handling problems at runtime.
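
A minimal sketch of that approach, built only on the existing
AddEventHandler/RemoveEventHandler API; the helper name addHandlerUntilDone is
made up:

package handlerexample

import (
	"context"

	"k8s.io/client-go/tools/cache"
)

// addHandlerUntilDone registers handler and removes it again once ctx is
// canceled, so the handler itself never has to check the context.
func addHandlerUntilDone(ctx context.Context, informer cache.SharedInformer, handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
	reg, err := informer.AddEventHandler(handler)
	if err != nil {
		return nil, err
	}
	go func() {
		<-ctx.Done()
		// Removal failures are unexpected; a real implementation could report
		// them via utilruntime.HandleErrorWithContext and the remembered logger.
		_ = informer.RemoveEventHandler(reg)
	}()
	return reg, nil
}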

Kubernetes-commit: 4638ba971661497b147906b8977ae206c9dd6e44
Author: Patrick Ohly, 2024-07-26 15:26:00 +02:00 (committed by Kubernetes Publisher)
Commit: 5d289bc44c (parent: b836a27b07)
16 changed files with 800 additions and 312 deletions


@@ -17,6 +17,8 @@ limitations under the License.
 package main
 import (
+	"context"
+	"errors"
 	"flag"
 	"fmt"
 	"time"
@@ -116,30 +118,30 @@ func (c *Controller) handleErr(err error, key string) {
 }
 // Run begins watching and syncing.
-func (c *Controller) Run(workers int, stopCh chan struct{}) {
-	defer runtime.HandleCrash()
+func (c *Controller) Run(ctx context.Context, workers int) {
+	defer runtime.HandleCrashWithContext(ctx)
 	// Let the workers stop when we are done
 	defer c.queue.ShutDown()
 	klog.Info("Starting Pod controller")
-	go c.informer.Run(stopCh)
+	go c.informer.RunWithContext(ctx)
 	// Wait for all involved caches to be synced, before processing items from the queue is started
-	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
+	if !cache.WaitForNamedCacheSyncWithContext(ctx, c.informer.HasSynced) {
 		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
 		return
 	}
 	for i := 0; i < workers; i++ {
-		go wait.Until(c.runWorker, time.Second, stopCh)
+		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
 	}
-	<-stopCh
+	<-ctx.Done()
 	klog.Info("Stopping Pod controller")
 }
-func (c *Controller) runWorker() {
+func (c *Controller) runWorker(ctx context.Context) {
 	for c.processNextItem() {
 	}
 }
@@ -164,6 +166,8 @@ func main() {
 		klog.Fatal(err)
 	}
+	ctx := context.Background()
 	// create the pod watcher
 	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
@@ -211,9 +215,9 @@ func main() {
 	})
 	// Now let's start the controller
-	stop := make(chan struct{})
-	defer close(stop)
-	go controller.Run(1, stop)
+	cancelCtx, cancel := context.WithCancelCause(ctx)
+	defer cancel(errors.New("time to stop because main has completed"))
+	go controller.Run(cancelCtx, 1)
 	// Wait forever
 	select {}

tools/cache/cache_test.go (new file, 25 lines added)

@ -0,0 +1,25 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"k8s.io/klog/v2"
)
func init() {
klog.InitFlags(nil)
}


@ -17,6 +17,7 @@ limitations under the License.
package cache package cache
import ( import (
"context"
"errors" "errors"
"sync" "sync"
"time" "time"
@ -79,8 +80,14 @@ type Config struct {
RetryOnError bool RetryOnError bool
// Called whenever the ListAndWatch drops the connection with an error. // Called whenever the ListAndWatch drops the connection with an error.
//
// Contextual logging: WatchErrorHandlerWithContext should be used instead of WatchErrorHandler in code which supports contextual logging.
WatchErrorHandler WatchErrorHandler WatchErrorHandler WatchErrorHandler
// Called whenever the ListAndWatch drops the connection with an error
// and WatchErrorHandler is not set.
WatchErrorHandlerWithContext WatchErrorHandlerWithContext
// WatchListPageSize is the requested chunk size of initial and relist watch lists. // WatchListPageSize is the requested chunk size of initial and relist watch lists.
WatchListPageSize int64 WatchListPageSize int64
} }
@ -104,12 +111,21 @@ type controller struct {
// Controller is a low-level controller that is parameterized by a // Controller is a low-level controller that is parameterized by a
// Config and used in sharedIndexInformer. // Config and used in sharedIndexInformer.
type Controller interface { type Controller interface {
// Run does two things. One is to construct and run a Reflector // RunWithContext does two things. One is to construct and run a Reflector
// to pump objects/notifications from the Config's ListerWatcher // to pump objects/notifications from the Config's ListerWatcher
// to the Config's Queue and possibly invoke the occasional Resync // to the Config's Queue and possibly invoke the occasional Resync
// on that Queue. The other is to repeatedly Pop from the Queue // on that Queue. The other is to repeatedly Pop from the Queue
// and process with the Config's ProcessFunc. Both of these // and process with the Config's ProcessFunc. Both of these
// continue until `stopCh` is closed. // continue until the context is canceled.
//
// It's an error to call RunWithContext more than once.
// RunWithContext blocks; call via go.
RunWithContext(ctx context.Context)
// Run does the same as RunWithContext with a stop channel instead of
// a context.
//
// Contextual logging: RunWithContext should be used instead of Run in code which supports contextual logging.
Run(stopCh <-chan struct{}) Run(stopCh <-chan struct{})
// HasSynced delegates to the Config's Queue // HasSynced delegates to the Config's Queue
@ -129,13 +145,16 @@ func New(c *Config) Controller {
return ctlr return ctlr
} }
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed. // Run implements [Controller.Run].
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) { func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() c.RunWithContext(wait.ContextForChannel(stopCh))
}
// RunWithContext implements [Controller.RunWithContext].
func (c *controller) RunWithContext(ctx context.Context) {
defer utilruntime.HandleCrashWithContext(ctx)
go func() { go func() {
<-stopCh <-ctx.Done()
c.config.Queue.Close() c.config.Queue.Close()
}() }()
r := NewReflectorWithOptions( r := NewReflectorWithOptions(
@ -152,7 +171,11 @@ func (c *controller) Run(stopCh <-chan struct{}) {
r.ShouldResync = c.config.ShouldResync r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize r.WatchListPageSize = c.config.WatchListPageSize
if c.config.WatchErrorHandler != nil { if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler r.watchErrorHandler = func(_ context.Context, r *Reflector, err error) {
c.config.WatchErrorHandler(r, err)
}
} else if c.config.WatchErrorHandlerWithContext != nil {
r.watchErrorHandler = c.config.WatchErrorHandlerWithContext
} }
c.reflectorMutex.Lock() c.reflectorMutex.Lock()
@ -161,9 +184,9 @@ func (c *controller) Run(stopCh <-chan struct{}) {
var wg wait.Group var wg wait.Group
wg.StartWithChannel(stopCh, r.Run) wg.StartWithContext(ctx, r.RunWithContext)
wait.Until(c.processLoop, time.Second, stopCh) wait.UntilWithContext(ctx, c.processLoop, time.Second)
wg.Wait() wg.Wait()
} }
@ -185,13 +208,11 @@ func (c *controller) LastSyncResourceVersion() string {
// TODO: Consider doing the processing in parallel. This will require a little thought // TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times // to make sure that we don't end up processing the same object multiple times
// concurrently. // concurrently.
// func (c *controller) processLoop(ctx context.Context) {
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
for { for {
// TODO: Plumb through the ctx so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable.
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil { if err != nil {
if err == ErrFIFOClosed { if err == ErrFIFOClosed {


@ -17,6 +17,7 @@ limitations under the License.
package cache package cache
import ( import (
"context"
"fmt" "fmt"
"math/rand" "math/rand"
"sync" "sync"
@ -32,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
fcache "k8s.io/client-go/tools/cache/testing" fcache "k8s.io/client-go/tools/cache/testing"
"k8s.io/klog/v2/ktesting"
fuzz "github.com/google/gofuzz" fuzz "github.com/google/gofuzz"
) )
@ -98,10 +100,10 @@ func Example() {
}, },
} }
// Create the controller and run it until we close stop. // Create the controller and run it until we cancel.
stop := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background())
defer close(stop) defer cancel()
go New(cfg).Run(stop) go New(cfg).RunWithContext(ctx)
// Let's add a few objects to the source. // Let's add a few objects to the source.
testIDs := []string{"a-hello", "b-controller", "c-framework"} testIDs := []string{"a-hello", "b-controller", "c-framework"}
@ -156,10 +158,10 @@ func ExampleNewInformer() {
}, },
) )
// Run the controller and run it until we close stop. // Run the controller and run it until we cancel.
stop := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background())
defer close(stop) defer cancel()
go controller.Run(stop) go controller.RunWithContext(ctx)
// Let's add a few objects to the source. // Let's add a few objects to the source.
testIDs := []string{"a-hello", "b-controller", "c-framework"} testIDs := []string{"a-hello", "b-controller", "c-framework"}
@ -227,9 +229,10 @@ func TestHammerController(t *testing.T) {
t.Errorf("Expected HasSynced() to return false before we started the controller") t.Errorf("Expected HasSynced() to return false before we started the controller")
} }
// Run the controller and run it until we close stop. // Run the controller and run it until we cancel.
stop := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
go controller.Run(stop) ctx, cancel := context.WithCancel(ctx)
go controller.RunWithContext(ctx)
// Let's wait for the controller to do its initial sync // Let's wait for the controller to do its initial sync
wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
@ -288,7 +291,7 @@ func TestHammerController(t *testing.T) {
// Let's wait for the controller to finish processing the things we just added. // Let's wait for the controller to finish processing the things we just added.
// TODO: look in the queue to see how many items need to be processed. // TODO: look in the queue to see how many items need to be processed.
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
close(stop) cancel()
// TODO: Verify that no goroutines were leaked here and that everything shut // TODO: Verify that no goroutines were leaked here and that everything shut
// down cleanly. // down cleanly.
@ -385,11 +388,13 @@ func TestUpdate(t *testing.T) {
}, },
) )
// Run the controller and run it until we close stop. // Run the controller and run it until we cancel.
// Once Run() is called, calls to testDoneWG.Done() might start, so // Once Run() is called, calls to testDoneWG.Done() might start, so
// all testDoneWG.Add() calls must happen before this point // all testDoneWG.Add() calls must happen before this point
stop := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
go controller.Run(stop) ctx, cancel := context.WithCancel(ctx)
defer cancel()
go controller.RunWithContext(ctx)
<-watchCh <-watchCh
// run every test a few times, in parallel // run every test a few times, in parallel
@ -407,7 +412,6 @@ func TestUpdate(t *testing.T) {
// Let's wait for the controller to process the things we just added. // Let's wait for the controller to process the things we just added.
testDoneWG.Wait() testDoneWG.Wait()
close(stop)
} }
func TestPanicPropagated(t *testing.T) { func TestPanicPropagated(t *testing.T) {
@ -427,9 +431,10 @@ func TestPanicPropagated(t *testing.T) {
}, },
) )
// Run the controller and run it until we close stop. // Run the controller and run it until we cancel.
stop := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
defer close(stop) ctx, cancel := context.WithCancel(ctx)
defer cancel()
propagated := make(chan interface{}) propagated := make(chan interface{})
go func() { go func() {
@ -438,7 +443,7 @@ func TestPanicPropagated(t *testing.T) {
propagated <- r propagated <- r
} }
}() }()
controller.Run(stop) controller.RunWithContext(ctx)
}() }()
// Let's add a object to the source. It will trigger a panic. // Let's add a object to the source. It will trigger a panic.
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test"}}) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test"}})
@ -555,8 +560,10 @@ func TestTransformingInformer(t *testing.T) {
} }
} }
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
go controller.Run(stopCh) ctx, cancel := context.WithCancel(ctx)
defer cancel()
go controller.RunWithContext(ctx)
verifyEvent(watch.Added, nil, expectedPod("pod1", "2")) verifyEvent(watch.Added, nil, expectedPod("pod1", "2"))
verifyStore([]interface{}{expectedPod("pod1", "2")}) verifyStore([]interface{}{expectedPod("pod1", "2")})
@ -574,11 +581,17 @@ func TestTransformingInformer(t *testing.T) {
source.Delete(makePod("pod1", "2")) source.Delete(makePod("pod1", "2"))
verifyEvent(watch.Deleted, expectedPod("pod1", "2"), nil) verifyEvent(watch.Deleted, expectedPod("pod1", "2"), nil)
verifyStore([]interface{}{expectedPod("pod2", "2"), expectedPod("pod3", "1")}) verifyStore([]interface{}{expectedPod("pod2", "2"), expectedPod("pod3", "1")})
close(stopCh)
} }
func TestTransformingInformerRace(t *testing.T) { func TestTransformingInformerRace(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
// Canceled *only* when the test is done.
testCtx, cancel := context.WithCancel(ctx)
defer cancel()
// Canceled *also* during the test.
ctx, cancel = context.WithCancel(ctx)
defer cancel()
// source simulates an apiserver object endpoint. // source simulates an apiserver object endpoint.
source := newFakeControllerSource(t) source := newFakeControllerSource(t)
@ -618,7 +631,11 @@ func TestTransformingInformerRace(t *testing.T) {
type event struct{} type event struct{}
events := make(chan event, numObjs) events := make(chan event, numObjs)
recordEvent := func(eventType watch.EventType, previous, current interface{}) { recordEvent := func(eventType watch.EventType, previous, current interface{}) {
events <- event{} select {
case events <- event{}:
case <-testCtx.Done():
// Don't block forever in the write above when test is already done.
}
} }
checkEvents := func(count int) { checkEvents := func(count int) {
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
@ -637,8 +654,7 @@ func TestTransformingInformerRace(t *testing.T) {
podTransformer, podTransformer,
) )
stopCh := make(chan struct{}) go controller.RunWithContext(ctx)
go controller.Run(stopCh)
checkEvents(numObjs) checkEvents(numObjs)
@ -652,7 +668,7 @@ func TestTransformingInformerRace(t *testing.T) {
key := fmt.Sprintf("namespace/pod-%d", index) key := fmt.Sprintf("namespace/pod-%d", index)
for { for {
select { select {
case <-stopCh: case <-ctx.Done():
return return
default: default:
} }
@ -674,7 +690,7 @@ func TestTransformingInformerRace(t *testing.T) {
// Let resyncs to happen for some time. // Let resyncs to happen for some time.
time.Sleep(time.Second) time.Sleep(time.Second)
close(stopCh) cancel()
wg.Wait() wg.Wait()
close(errors) close(errors)
for err := range errors { for err := range errors {


@ -55,6 +55,9 @@ type DeltaFIFOOptions struct {
// If set, will be called for objects before enqueueing them. Please // If set, will be called for objects before enqueueing them. Please
// see the comment on TransformFunc for details. // see the comment on TransformFunc for details.
Transformer TransformFunc Transformer TransformFunc
// If set, log output will go to this logger instead of klog.Background().
Logger *klog.Logger
} }
// DeltaFIFO is like FIFO, but differs in two ways. One is that the // DeltaFIFO is like FIFO, but differs in two ways. One is that the
@ -136,6 +139,10 @@ type DeltaFIFO struct {
// Called with every object if non-nil. // Called with every object if non-nil.
transformer TransformFunc transformer TransformFunc
// logger is a per-instance logger. This gets chosen when constructing
// the instance, with klog.Background() as default.
logger klog.Logger
} }
// TransformFunc allows for transforming an object before it will be processed. // TransformFunc allows for transforming an object before it will be processed.
@ -253,6 +260,10 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
transformer: opts.Transformer, transformer: opts.Transformer,
logger: klog.Background(),
}
if opts.Logger != nil {
f.logger = *opts.Logger
} }
f.cond.L = &f.lock f.cond.L = &f.lock
return f return f
@ -487,10 +498,10 @@ func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType Del
// when given a non-empty list (as it is here). // when given a non-empty list (as it is here).
// If somehow it happens anyway, deal with it but complain. // If somehow it happens anyway, deal with it but complain.
if oldDeltas == nil { if oldDeltas == nil {
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj) f.logger.Error(nil, "Impossible dedupDeltas, ignoring", "id", id, "oldDeltas", oldDeltas, "obj", obj)
return nil return nil
} }
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj) f.logger.Error(nil, "Impossible dedupDeltas, breaking invariant by storing empty Deltas", "id", id, "oldDeltas", oldDeltas, "obj", obj)
f.items[id] = newDeltas f.items[id] = newDeltas
return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj) return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
} }
@ -597,7 +608,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
item, ok := f.items[id] item, ok := f.items[id]
if !ok { if !ok {
// This should never happen // This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id) f.logger.Error(nil, "Inconceivable! Item was in f.queue but not f.items; ignoring", "id", id)
continue continue
} }
delete(f.items, id) delete(f.items, id)
@ -694,10 +705,10 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
deletedObj, exists, err := f.knownObjects.GetByKey(k) deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil { if err != nil {
deletedObj = nil deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) f.logger.Error(err, "Unexpected error during lookup, placing DeleteFinalStateUnknown marker without object", "key", k)
} else if !exists { } else if !exists {
deletedObj = nil deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) f.logger.Info("Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", k)
} }
queuedDeletions++ queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
@ -737,10 +748,10 @@ func (f *DeltaFIFO) Resync() error {
func (f *DeltaFIFO) syncKeyLocked(key string) error { func (f *DeltaFIFO) syncKeyLocked(key string) error {
obj, exists, err := f.knownObjects.GetByKey(key) obj, exists, err := f.knownObjects.GetByKey(key)
if err != nil { if err != nil {
klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key) f.logger.Error(err, "Unexpected error during lookup, unable to queue object for sync", "key", key)
return nil return nil
} else if !exists { } else if !exists {
klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key) f.logger.Info("Key does not exist in known objects store, unable to queue object for sync", "key", key)
return nil return nil
} }


@ -62,7 +62,12 @@ func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selec
items, err := indexer.Index(NamespaceIndex, &metav1.ObjectMeta{Namespace: namespace}) items, err := indexer.Index(NamespaceIndex, &metav1.ObjectMeta{Namespace: namespace})
if err != nil { if err != nil {
// Ignore error; do slow search without index. // Ignore error; do slow search without index.
klog.Warningf("can not retrieve list of objects using index : %v", err) //
// ListAllByNamespace is called by generated code
// (k8s.io/client-go/listers) and probably not worth converting
// to contextual logging, which would require changing all of
// those APIs.
klog.TODO().Info("Warning: can not retrieve list of objects using index", "err", err)
for _, m := range indexer.List() { for _, m := range indexer.List() {
metadata, err := meta.Accessor(m) metadata, err := meta.Accessor(m)
if err != nil { if err != nil {


@ -30,14 +30,6 @@ func TestMain(m *testing.M) {
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestFIFO_addUpdate.func1"), goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestFIFO_addUpdate.func1"),
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addReplace.func1"), goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addReplace.func1"),
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addUpdate.func1"), goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addUpdate.func1"),
// TODO: fix the following tests by adding WithContext APIs and cancellation.
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestTransformingInformerRace.func3"),
// Created by k8s.io/client-go/tools/cache.TestReflectorListAndWatch, cannot filter on that (https://github.com/uber-go/goleak/issues/135):
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch"),
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).startResync"),
// ???
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*DeltaFIFO).Close"),
} }
goleak.VerifyTestMain(m, options...) goleak.VerifyTestMain(m, options...)
} }


@ -60,7 +60,7 @@ type ResourceVersionComparator interface {
// If includeAdds is true, objects in the mutation cache will be returned even if they don't exist // If includeAdds is true, objects in the mutation cache will be returned even if they don't exist
// in the underlying store. This is only safe if your use of the cache can handle mutation entries // in the underlying store. This is only safe if your use of the cache can handle mutation entries
// remaining in the cache for up to ttl when mutations and deletes occur very closely in time. // remaining in the cache for up to ttl when mutations and deletes occur very closely in time.
func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache { func NewIntegerResourceVersionMutationCache(logger klog.Logger, backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache {
return &mutationCache{ return &mutationCache{
backingCache: backingCache, backingCache: backingCache,
indexer: indexer, indexer: indexer,
@ -68,6 +68,7 @@ func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer,
comparator: etcdObjectVersioner{}, comparator: etcdObjectVersioner{},
ttl: ttl, ttl: ttl,
includeAdds: includeAdds, includeAdds: includeAdds,
logger: logger,
} }
} }
@ -75,6 +76,7 @@ func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer,
// since you can't distinguish between, "didn't observe create" and "was deleted after create", // since you can't distinguish between, "didn't observe create" and "was deleted after create",
// if the key is missing from the backing cache, we always return it as missing // if the key is missing from the backing cache, we always return it as missing
type mutationCache struct { type mutationCache struct {
logger klog.Logger
lock sync.Mutex lock sync.Mutex
backingCache Store backingCache Store
indexer Indexer indexer Indexer
@ -157,7 +159,7 @@ func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, er
} }
elements, err := fn(updated) elements, err := fn(updated)
if err != nil { if err != nil {
klog.V(4).Infof("Unable to calculate an index entry for mutation cache entry %s: %v", key, err) c.logger.V(4).Info("Unable to calculate an index entry for mutation cache entry", "key", key, "err", err)
continue continue
} }
for _, inIndex := range elements { for _, inIndex := range elements {
@ -204,7 +206,7 @@ func (c *mutationCache) Mutation(obj interface{}) {
key, err := DeletionHandlingMetaNamespaceKeyFunc(obj) key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil { if err != nil {
// this is a "nice to have", so failures shouldn't do anything weird // this is a "nice to have", so failures shouldn't do anything weird
utilruntime.HandleError(err) utilruntime.HandleErrorWithLogger(c.logger, err, "DeletionHandlingMetaNamespaceKeyFunc")
return return
} }


@ -50,6 +50,7 @@ func NewCacheMutationDetector(name string) MutationDetector {
if !mutationDetectionEnabled { if !mutationDetectionEnabled {
return dummyMutationDetector{} return dummyMutationDetector{}
} }
//nolint:logcheck // This code shouldn't be used in production.
klog.Warningln("Mutation detector is enabled, this will result in memory leakage.") klog.Warningln("Mutation detector is enabled, this will result in memory leakage.")
return &defaultCacheMutationDetector{name: name, period: 1 * time.Second, retainDuration: 2 * time.Minute} return &defaultCacheMutationDetector{name: name, period: 1 * time.Second, retainDuration: 2 * time.Minute}
} }


@ -22,6 +22,7 @@ import (
"time" "time"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
) )
const ( const (
@ -35,7 +36,7 @@ func BenchmarkListener(b *testing.B) {
swg.Add(b.N) swg.Add(b.N)
b.SetParallelism(concurrencyLevel) b.SetParallelism(concurrencyLevel)
// Preallocate enough space so that benchmark does not run out of it // Preallocate enough space so that benchmark does not run out of it
pl := newProcessListener(&ResourceEventHandlerFuncs{ pl := newProcessListener(klog.Background(), &ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
swg.Done() swg.Done()
}, },


@ -95,7 +95,7 @@ type Reflector struct {
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex lastSyncResourceVersionMutex sync.RWMutex
// Called whenever the ListAndWatch drops the connection with an error. // Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler watchErrorHandler WatchErrorHandlerWithContext
// WatchListPageSize is the requested chunk size of initial and resync watch lists. // WatchListPageSize is the requested chunk size of initial and resync watch lists.
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data // If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0") // (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
@ -150,20 +150,32 @@ type ResourceVersionUpdater interface {
// should be offloaded. // should be offloaded.
type WatchErrorHandler func(r *Reflector, err error) type WatchErrorHandler func(r *Reflector, err error)
// DefaultWatchErrorHandler is the default implementation of WatchErrorHandler // The WatchErrorHandler is called whenever ListAndWatch drops the
func DefaultWatchErrorHandler(r *Reflector, err error) { // connection with an error. After calling this handler, the informer
// will backoff and retry.
//
// The default implementation looks at the error type and tries to log
// the error message at an appropriate level.
//
// Implementations of this handler may display the error message in other
// ways. Implementations should return quickly - any expensive processing
// should be offloaded.
type WatchErrorHandlerWithContext func(ctx context.Context, r *Reflector, err error)
// DefaultWatchErrorHandler is the default implementation of WatchErrorHandlerWithContext.
func DefaultWatchErrorHandler(ctx context.Context, r *Reflector, err error) {
switch { switch {
case isExpiredError(err): case isExpiredError(err):
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV. // has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object. // So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err) klog.FromContext(ctx).V(4).Info("Watch closed", "reflector", r.name, "type", r.typeDescription, "err", err)
case err == io.EOF: case err == io.EOF:
// watch closed normally // watch closed normally
case err == io.ErrUnexpectedEOF: case err == io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.typeDescription, err) klog.FromContext(ctx).V(1).Info("Watch closed with unexpected EOF", "reflector", r.name, "type", r.typeDescription, "err", err)
default: default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.typeDescription, err)) utilruntime.HandleErrorWithContext(ctx, err, "Failed to watch", "reflector", r.name, "type", r.typeDescription)
} }
} }
@ -243,7 +255,7 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff. // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock), backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock),
clock: reflectorClock, clock: reflectorClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
expectedType: reflect.TypeOf(expectedType), expectedType: reflect.TypeOf(expectedType),
} }
@ -309,14 +321,24 @@ var internalPackages = []string{"client-go/tools/cache/"}
// Run repeatedly uses the reflector's ListAndWatch to fetch all the // Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas. // objects and subsequent deltas.
// Run will exit when stopCh is closed. // Run will exit when stopCh is closed.
//
// Contextual logging: RunWithContext should be used instead of Run in code which supports contextual logging.
func (r *Reflector) Run(stopCh <-chan struct{}) { func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name) r.RunWithContext(wait.ContextForChannel(stopCh))
}
// RunWithContext repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when the context is canceled.
func (r *Reflector) RunWithContext(ctx context.Context) {
logger := klog.FromContext(ctx)
logger.V(3).Info("Starting reflector", "type", r.typeDescription, "resyncPeriod", r.resyncPeriod, "reflector", r.name)
wait.BackoffUntil(func() { wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil { if err := r.ListAndWatchWithContext(ctx); err != nil {
r.watchErrorHandler(r, err) r.watchErrorHandler(ctx, r, err)
} }
}, r.backoffManager, true, stopCh) }, r.backoffManager, true, ctx.Done())
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name) logger.V(3).Info("Stopping reflector", "type", r.typeDescription, "resyncPeriod", r.resyncPeriod, "reflector", r.name)
} }
var ( var (
@ -345,21 +367,31 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
// ListAndWatch first lists all items and get the resource version at the moment of call, // ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch. // and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch. // It returns error if ListAndWatch didn't even try to initialize watch.
//
// Contextual logging: ListAndWatchWithContext should be used instead of ListAndWatch in code which supports contextual logging.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name) return r.ListAndWatchWithContext(wait.ContextForChannel(stopCh))
}
// ListAndWatchWithContext first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatchWithContext didn't even try to initialize watch.
func (r *Reflector) ListAndWatchWithContext(ctx context.Context) error {
logger := klog.FromContext(ctx)
logger.V(3).Info("Listing and watching", "type", r.typeDescription, "reflector", r.name)
var err error var err error
var w watch.Interface var w watch.Interface
useWatchList := ptr.Deref(r.UseWatchList, false) useWatchList := ptr.Deref(r.UseWatchList, false)
fallbackToList := !useWatchList fallbackToList := !useWatchList
if useWatchList { if useWatchList {
w, err = r.watchList(stopCh) w, err = r.watchList(ctx)
if w == nil && err == nil { if w == nil && err == nil {
// stopCh was closed // stopCh was closed
return nil return nil
} }
if err != nil { if err != nil {
klog.Warningf("The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking, err = %v", err) logger.Error(err, "The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking")
fallbackToList = true fallbackToList = true
// ensure that we won't accidentally pass some garbage down the watch. // ensure that we won't accidentally pass some garbage down the watch.
w = nil w = nil
@ -367,20 +399,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
} }
if fallbackToList { if fallbackToList {
err = r.list(stopCh) err = r.list(ctx)
if err != nil { if err != nil {
return err return err
} }
} }
klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name) logger.V(2).Info("Caches populated", "type", r.typeDescription, "reflector", r.name)
return r.watchWithResync(w, stopCh) return r.watchWithResync(ctx, w)
} }
// startResync periodically calls r.store.Resync() method. // startResync periodically calls r.store.Resync() method.
// Note that this method is blocking and should be // Note that this method is blocking and should be
// called in a separate goroutine. // called in a separate goroutine.
func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}, resyncerrc chan error) { func (r *Reflector) startResync(ctx context.Context, resyncerrc chan error) {
logger := klog.FromContext(ctx)
resyncCh, cleanup := r.resyncChan() resyncCh, cleanup := r.resyncChan()
defer func() { defer func() {
cleanup() // Call the last one written into cleanup cleanup() // Call the last one written into cleanup
@ -388,13 +421,11 @@ func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}
for { for {
select { select {
case <-resyncCh: case <-resyncCh:
case <-stopCh: case <-ctx.Done():
return
case <-cancelCh:
return return
} }
if r.ShouldResync == nil || r.ShouldResync() { if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name) logger.V(4).Info("Forcing resync", "reflector", r.name)
if err := r.store.Resync(); err != nil { if err := r.store.Resync(); err != nil {
resyncerrc <- err resyncerrc <- err
return return
@ -406,16 +437,27 @@ func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}
} }
// watchWithResync runs watch with startResync in the background. // watchWithResync runs watch with startResync in the background.
func (r *Reflector) watchWithResync(w watch.Interface, stopCh <-chan struct{}) error { func (r *Reflector) watchWithResync(ctx context.Context, w watch.Interface) error {
resyncerrc := make(chan error, 1) resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{}) cancelCtx, cancel := context.WithCancel(ctx)
defer close(cancelCh) // Waiting for completion of the goroutine is relevant for race detector.
go r.startResync(stopCh, cancelCh, resyncerrc) // Without this, there is a race between "this function returns + code
return r.watch(w, stopCh, resyncerrc) // waiting for it" and "goroutine does something".
var wg wait.Group
defer func() {
cancel()
wg.Wait()
}()
wg.Start(func() {
r.startResync(cancelCtx, resyncerrc)
})
return r.watch(ctx, w, resyncerrc)
} }
// watch simply starts a watch request with the server. // watch simply starts a watch request with the server.
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error { func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc chan error) error {
stopCh := ctx.Done()
logger := klog.FromContext(ctx)
var err error var err error
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock) retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
@ -451,7 +493,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
w, err = r.listerWatcher.Watch(options) w, err = r.listerWatcher.Watch(options)
if err != nil { if err != nil {
if canRetry := isWatchErrorRetriable(err); canRetry { if canRetry := isWatchErrorRetriable(err); canRetry {
klog.V(4).Infof("%s: watch of %v returned %v - backing off", r.name, r.typeDescription, err) logger.V(4).Info("Watch failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err)
select { select {
case <-stopCh: case <-stopCh:
return nil return nil
@ -463,8 +505,8 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
} }
} }
err = handleWatch(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion,
r.clock, resyncerrc, stopCh) r.clock, resyncerrc)
// Ensure that watch will not be reused across iterations. // Ensure that watch will not be reused across iterations.
w.Stop() w.Stop()
w = nil w = nil
@ -476,9 +518,9 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV. // has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object. // So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err) logger.V(4).Info("Watch closed", "reflector", r.name, "type", r.typeDescription, "err", err)
case apierrors.IsTooManyRequests(err): case apierrors.IsTooManyRequests(err):
klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription) logger.V(2).Info("Watch returned 429 - backing off", "reflector", r.name, "type", r.typeDescription)
select { select {
case <-stopCh: case <-stopCh:
return nil return nil
@ -486,10 +528,10 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
continue continue
} }
case apierrors.IsInternalError(err) && retry.ShouldRetry(): case apierrors.IsInternalError(err) && retry.ShouldRetry():
klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err) logger.V(2).Info("Retrying watch after internal error", "reflector", r.name, "type", r.typeDescription, "err", err)
continue continue
default: default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.typeDescription, err) logger.Info("Warning: watch ended with error", "reflector", r.name, "type", r.typeDescription, "err", err)
} }
} }
return nil return nil
@ -499,7 +541,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
// list simply lists all items and records a resource version obtained from the server at the moment of the call. // list simply lists all items and records a resource version obtained from the server at the moment of the call.
// the resource version can be used for further progress notification (aka. watch). // the resource version can be used for further progress notification (aka. watch).
func (r *Reflector) list(stopCh <-chan struct{}) error { func (r *Reflector) list(ctx context.Context) error {
var resourceVersion string var resourceVersion string
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
@ -558,7 +600,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
close(listCh) close(listCh)
}() }()
select { select {
case <-stopCh: case <-ctx.Done():
return nil return nil
case r := <-panicCh: case r := <-panicCh:
panic(r) panic(r)
@ -566,7 +608,6 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
} }
initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err}) initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
if err != nil { if err != nil {
klog.Warningf("%s: failed to list %v: %v", r.name, r.typeDescription, err)
return fmt.Errorf("failed to list %v: %w", r.typeDescription, err) return fmt.Errorf("failed to list %v: %w", r.typeDescription, err)
} }
@ -624,7 +665,9 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
// After receiving a "Bookmark" event the reflector is considered to be synchronized. // After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and // It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events. // reuses the current watch requests for getting further events.
func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
stopCh := ctx.Done()
logger := klog.FromContext(ctx)
var w watch.Interface var w watch.Interface
var err error var err error
var temporaryStore Store var temporaryStore Store
@ -634,7 +677,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
// could be unified with the r.watch method // could be unified with the r.watch method
isErrorRetriableWithSideEffectsFn := func(err error) bool { isErrorRetriableWithSideEffectsFn := func(err error) bool {
if canRetry := isWatchErrorRetriable(err); canRetry { if canRetry := isWatchErrorRetriable(err); canRetry {
klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err) logger.V(2).Info("watch-list failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err)
<-r.backoffManager.Backoff().C() <-r.backoffManager.Backoff().C()
return true return true
} }
@ -681,9 +724,9 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
} }
return nil, err return nil, err
} }
watchListBookmarkReceived, err := handleListWatch(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription, watchListBookmarkReceived, err := handleListWatch(ctx, start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
func(rv string) { resourceVersion = rv }, func(rv string) { resourceVersion = rv },
r.clock, make(chan error), stopCh) r.clock, make(chan error))
if err != nil { if err != nil {
w.Stop() // stop and retry with clean state w.Stop() // stop and retry with clean state
if errors.Is(err, errorStopRequested) { if errors.Is(err, errorStopRequested) {
@ -706,7 +749,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
// we utilize the temporaryStore to ensure independence from the current store implementation. // we utilize the temporaryStore to ensure independence from the current store implementation.
// as of today, the store is implemented as a queue and will be drained by the higher-level // as of today, the store is implemented as a queue and will be drained by the higher-level
// component as soon as it finishes replacing the content. // component as soon as it finishes replacing the content.
checkWatchListDataConsistencyIfRequested(wait.ContextForChannel(stopCh), r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List) checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List)
if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
return nil, fmt.Errorf("unable to sync watch-list result: %w", err) return nil, fmt.Errorf("unable to sync watch-list result: %w", err)
@ -731,6 +774,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
// retry. If successful, the watcher will be left open after receiving the // retry. If successful, the watcher will be left open after receiving the
// initial set of objects, to allow watching for future events. // initial set of objects, to allow watching for future events.
func handleListWatch( func handleListWatch(
ctx context.Context,
start time.Time, start time.Time,
w watch.Interface, w watch.Interface,
store Store, store Store,
@ -741,17 +785,17 @@ func handleListWatch(
setLastSyncResourceVersion func(string), setLastSyncResourceVersion func(string),
clock clock.Clock, clock clock.Clock,
errCh chan error, errCh chan error,
stopCh <-chan struct{},
) (bool, error) { ) (bool, error) {
exitOnWatchListBookmarkReceived := true exitOnWatchListBookmarkReceived := true
return handleAnyWatch(start, w, store, expectedType, expectedGVK, name, expectedTypeName, return handleAnyWatch(ctx, start, w, store, expectedType, expectedGVK, name, expectedTypeName,
setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh, stopCh) setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh)
} }
// handleListWatch consumes events from w, updates the Store, and records the // handleListWatch consumes events from w, updates the Store, and records the
// last seen ResourceVersion, to allow continuing from that ResourceVersion on // last seen ResourceVersion, to allow continuing from that ResourceVersion on
// retry. The watcher will always be stopped on exit. // retry. The watcher will always be stopped on exit.
func handleWatch( func handleWatch(
ctx context.Context,
start time.Time, start time.Time,
w watch.Interface, w watch.Interface,
store Store, store Store,
@ -762,11 +806,10 @@ func handleWatch(
setLastSyncResourceVersion func(string), setLastSyncResourceVersion func(string),
clock clock.Clock, clock clock.Clock,
errCh chan error, errCh chan error,
stopCh <-chan struct{},
) error { ) error {
exitOnWatchListBookmarkReceived := false exitOnWatchListBookmarkReceived := false
_, err := handleAnyWatch(start, w, store, expectedType, expectedGVK, name, expectedTypeName, _, err := handleAnyWatch(ctx, start, w, store, expectedType, expectedGVK, name, expectedTypeName,
setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh, stopCh) setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh)
return err return err
} }
@ -779,7 +822,9 @@ func handleWatch(
// The watcher will always be stopped, unless exitOnWatchListBookmarkReceived is // The watcher will always be stopped, unless exitOnWatchListBookmarkReceived is
// true and watchListBookmarkReceived is true. This allows the same watch stream // true and watchListBookmarkReceived is true. This allows the same watch stream
// to be re-used by the caller to continue watching for new events. // to be re-used by the caller to continue watching for new events.
func handleAnyWatch(start time.Time, func handleAnyWatch(
ctx context.Context,
start time.Time,
w watch.Interface, w watch.Interface,
store Store, store Store,
expectedType reflect.Type, expectedType reflect.Type,
@ -790,17 +835,17 @@ func handleAnyWatch(start time.Time,
exitOnWatchListBookmarkReceived bool, exitOnWatchListBookmarkReceived bool,
clock clock.Clock, clock clock.Clock,
errCh chan error, errCh chan error,
stopCh <-chan struct{},
) (bool, error) { ) (bool, error) {
watchListBookmarkReceived := false watchListBookmarkReceived := false
eventCount := 0 eventCount := 0
initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(name, clock, start, exitOnWatchListBookmarkReceived) logger := klog.FromContext(ctx)
initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(logger, name, clock, start, exitOnWatchListBookmarkReceived)
defer initialEventsEndBookmarkWarningTicker.Stop() defer initialEventsEndBookmarkWarningTicker.Stop()
loop: loop:
for { for {
select { select {
case <-stopCh: case <-ctx.Done():
return watchListBookmarkReceived, errorStopRequested return watchListBookmarkReceived, errorStopRequested
case err := <-errCh: case err := <-errCh:
return watchListBookmarkReceived, err return watchListBookmarkReceived, err
@ -813,19 +858,19 @@ loop:
} }
if expectedType != nil { if expectedType != nil {
if e, a := expectedType, reflect.TypeOf(event.Object); e != a { if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a)) utilruntime.HandleErrorWithContext(ctx, nil, "Unexpected watch event object type", "reflector", name, "expectedType", e, "actualType", a)
continue continue
} }
} }
if expectedGVK != nil { if expectedGVK != nil {
if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a { if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a)) utilruntime.HandleErrorWithContext(ctx, nil, "Unexpected watch event object gvk", "reflector", name, "expectedGVK", e, "actualGVK", a)
continue continue
} }
} }
meta, err := meta.Accessor(event.Object) meta, err := meta.Accessor(event.Object)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) utilruntime.HandleErrorWithContext(ctx, err, "Unable to understand watch event", "reflector", name, "event", event)
continue continue
} }
resourceVersion := meta.GetResourceVersion() resourceVersion := meta.GetResourceVersion()
@ -833,12 +878,12 @@ loop:
case watch.Added: case watch.Added:
err := store.Add(event.Object) err := store.Add(event.Object)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err)) utilruntime.HandleErrorWithContext(ctx, err, "Unable to add watch event object to store", "reflector", name, "object", event.Object)
} }
case watch.Modified: case watch.Modified:
err := store.Update(event.Object) err := store.Update(event.Object)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err)) utilruntime.HandleErrorWithContext(ctx, err, "Unable to update watch event object to store", "reflector", name, "object", event.Object)
} }
case watch.Deleted: case watch.Deleted:
// TODO: Will any consumers need access to the "last known // TODO: Will any consumers need access to the "last known
@ -846,7 +891,7 @@ loop:
// to change this. // to change this.
err := store.Delete(event.Object) err := store.Delete(event.Object)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err)) utilruntime.HandleErrorWithContext(ctx, err, "Unable to delete watch event object from store", "reflector", name, "object", event.Object)
} }
case watch.Bookmark: case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion // A `Bookmark` means watch has synced here, just update the resourceVersion
@ -854,7 +899,7 @@ loop:
watchListBookmarkReceived = true watchListBookmarkReceived = true
} }
default: default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) utilruntime.HandleErrorWithContext(ctx, err, "Unknown watch event", "reflector", name, "event", event)
} }
setLastSyncResourceVersion(resourceVersion) setLastSyncResourceVersion(resourceVersion)
if rvu, ok := store.(ResourceVersionUpdater); ok { if rvu, ok := store.(ResourceVersionUpdater); ok {
@ -863,7 +908,7 @@ loop:
eventCount++ eventCount++
if exitOnWatchListBookmarkReceived && watchListBookmarkReceived { if exitOnWatchListBookmarkReceived && watchListBookmarkReceived {
watchDuration := clock.Since(start) watchDuration := clock.Since(start)
klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration) klog.FromContext(ctx).V(4).Info("Exiting watch because received the bookmark that marks the end of initial events stream", "reflector", name, "totalItems", eventCount, "duration", watchDuration)
return watchListBookmarkReceived, nil return watchListBookmarkReceived, nil
} }
initialEventsEndBookmarkWarningTicker.observeLastEventTimeStamp(clock.Now()) initialEventsEndBookmarkWarningTicker.observeLastEventTimeStamp(clock.Now())
@ -876,7 +921,7 @@ loop:
if watchDuration < 1*time.Second && eventCount == 0 { if watchDuration < 1*time.Second && eventCount == 0 {
return watchListBookmarkReceived, fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name) return watchListBookmarkReceived, fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
} }
klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount) klog.FromContext(ctx).V(4).Info("Watch close", "reflector", name, "type", expectedTypeName, "totalItems", eventCount)
return watchListBookmarkReceived, nil return watchListBookmarkReceived, nil
} }
@ -1006,6 +1051,7 @@ type initialEventsEndBookmarkTicker struct {
clock.Ticker clock.Ticker
clock clock.Clock clock clock.Clock
name string name string
logger klog.Logger
watchStart time.Time watchStart time.Time
tickInterval time.Duration tickInterval time.Duration
@ -1019,15 +1065,15 @@ type initialEventsEndBookmarkTicker struct {
// Note that the caller controls whether to call t.C() and t.Stop(). // Note that the caller controls whether to call t.C() and t.Stop().
// //
// In practice, the reflector exits the watchHandler as soon as the bookmark event is received and calls the t.C() method. // In practice, the reflector exits the watchHandler as soon as the bookmark event is received and calls the t.C() method.
func newInitialEventsEndBookmarkTicker(name string, c clock.Clock, watchStart time.Time, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { func newInitialEventsEndBookmarkTicker(logger klog.Logger, name string, c clock.Clock, watchStart time.Time, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker {
return newInitialEventsEndBookmarkTickerInternal(name, c, watchStart, 10*time.Second, exitOnWatchListBookmarkReceived) return newInitialEventsEndBookmarkTickerInternal(logger, name, c, watchStart, 10*time.Second, exitOnWatchListBookmarkReceived)
} }
func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { func newInitialEventsEndBookmarkTickerInternal(logger klog.Logger, name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker {
clockWithTicker, ok := c.(clock.WithTicker) clockWithTicker, ok := c.(clock.WithTicker)
if !ok || !exitOnWatchListBookmarkReceived { if !ok || !exitOnWatchListBookmarkReceived {
if exitOnWatchListBookmarkReceived { if exitOnWatchListBookmarkReceived {
klog.Warningf("clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested") logger.Info("Warning: clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested")
} }
return &initialEventsEndBookmarkTicker{ return &initialEventsEndBookmarkTicker{
Ticker: &noopTicker{}, Ticker: &noopTicker{},
@ -1038,6 +1084,7 @@ func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watch
Ticker: clockWithTicker.NewTicker(tickInterval), Ticker: clockWithTicker.NewTicker(tickInterval),
clock: c, clock: c,
name: name, name: name,
logger: logger,
watchStart: watchStart, watchStart: watchStart,
tickInterval: tickInterval, tickInterval: tickInterval,
} }
@ -1049,7 +1096,7 @@ func (t *initialEventsEndBookmarkTicker) observeLastEventTimeStamp(lastEventObse
func (t *initialEventsEndBookmarkTicker) warnIfExpired() { func (t *initialEventsEndBookmarkTicker) warnIfExpired() {
if err := t.produceWarningIfExpired(); err != nil { if err := t.produceWarningIfExpired(); err != nil {
klog.Warning(err) t.logger.Info("Warning: event bookmark expired", "err", err)
} }
} }
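
To tie the reflector changes above together, here is a minimal, self-contained sketch (an illustration, not part of this commit) of driving a Reflector through the new context-based API; the logger stored in the context is what klog.FromContext(ctx) in the code above picks up, and canceling the context replaces closing the old stop channel. The package, function, and logger names are assumptions for the example.

package example

import (
	"context"
	"errors"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

// runPodReflector runs a pod Reflector until ctx is canceled. The function
// and logger names are illustrative; lw is any ListerWatcher.
func runPodReflector(ctx context.Context, lw cache.ListerWatcher) {
	logger := klog.LoggerWithName(klog.FromContext(ctx), "pod-reflector")
	ctx, cancel := context.WithCancelCause(klog.NewContext(ctx, logger))
	defer cancel(errors.New("runPodReflector returned"))

	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
	r := cache.NewReflector(lw, &v1.Pod{}, store, 0)
	r.RunWithContext(ctx) // blocks until ctx is canceled
}
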

View File

@ -45,6 +45,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2/ktesting"
"k8s.io/utils/clock" "k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing" testingclock "k8s.io/utils/clock/testing"
) )
@ -64,6 +65,7 @@ func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
} }
func TestCloseWatchChannelOnError(t *testing.T) { func TestCloseWatchChannelOnError(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
r := NewReflector(&testLW{}, &v1.Pod{}, NewStore(MetaNamespaceKeyFunc), 0) r := NewReflector(&testLW{}, &v1.Pod{}, NewStore(MetaNamespaceKeyFunc), 0)
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
fw := watch.NewFake() fw := watch.NewFake()
@ -75,7 +77,7 @@ func TestCloseWatchChannelOnError(t *testing.T) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
}, },
} }
go r.ListAndWatch(wait.NeverStop) go func() { assert.NoError(t, r.ListAndWatchWithContext(ctx)) }()
fw.Error(pod) fw.Error(pod)
select { select {
case _, ok := <-fw.ResultChan(): case _, ok := <-fw.ResultChan():
@ -89,7 +91,8 @@ func TestCloseWatchChannelOnError(t *testing.T) {
} }
func TestRunUntil(t *testing.T) { func TestRunUntil(t *testing.T) {
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
store := NewStore(MetaNamespaceKeyFunc) store := NewStore(MetaNamespaceKeyFunc)
r := NewReflector(&testLW{}, &v1.Pod{}, store, 0) r := NewReflector(&testLW{}, &v1.Pod{}, store, 0)
fw := watch.NewFake() fw := watch.NewFake()
@ -104,13 +107,13 @@ func TestRunUntil(t *testing.T) {
doneCh := make(chan struct{}) doneCh := make(chan struct{})
go func() { go func() {
defer close(doneCh) defer close(doneCh)
r.Run(stopCh) r.RunWithContext(ctx)
}() }()
// Synchronously add a dummy pod into the watch channel so we // Synchronously add a dummy pod into the watch channel so we
// know the RunUntil go routine is in the watch handler. // know the RunUntil go routine is in the watch handler.
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}) fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
close(stopCh) cancel(errors.New("done"))
resultCh := fw.ResultChan() resultCh := fw.ResultChan()
for { for {
select { select {
@ -149,8 +152,9 @@ func TestReflectorResyncChan(t *testing.T) {
// TestReflectorWatchStoppedBefore ensures that neither List nor Watch are // TestReflectorWatchStoppedBefore ensures that neither List nor Watch are
// called if the stop channel is closed before Reflector.watch is called. // called if the stop channel is closed before Reflector.watch is called.
func TestReflectorWatchStoppedBefore(t *testing.T) { func TestReflectorWatchStoppedBefore(t *testing.T) {
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
close(stopCh) ctx, cancel := context.WithCancelCause(ctx)
cancel(errors.New("don't run"))
lw := &ListWatch{ lw := &ListWatch{
ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) { ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) {
@ -165,14 +169,15 @@ func TestReflectorWatchStoppedBefore(t *testing.T) {
} }
target := NewReflector(lw, &v1.Pod{}, nil, 0) target := NewReflector(lw, &v1.Pod{}, nil, 0)
err := target.watch(nil, stopCh, nil) err := target.watch(ctx, nil, nil)
require.NoError(t, err) require.NoError(t, err)
} }
// TestReflectorWatchStoppedAfter ensures that the watcher is stopped if // TestReflectorWatchStoppedAfter ensures that the watcher is stopped if
// the stop channel is closed after Reflector.watch has started watching. // the stop channel is closed after Reflector.watch has started watching.
func TestReflectorWatchStoppedAfter(t *testing.T) { func TestReflectorWatchStoppedAfter(t *testing.T) {
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
var watchers []*watch.FakeWatcher var watchers []*watch.FakeWatcher
@ -185,7 +190,7 @@ func TestReflectorWatchStoppedAfter(t *testing.T) {
// Simulate the stop channel being closed after watching has started // Simulate the stop channel being closed after watching has started
go func() { go func() {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
close(stopCh) cancel(errors.New("10ms timeout reached"))
}() }()
// Use a fake watcher that never sends events // Use a fake watcher that never sends events
w := watch.NewFake() w := watch.NewFake()
@ -195,7 +200,7 @@ func TestReflectorWatchStoppedAfter(t *testing.T) {
} }
target := NewReflector(lw, &v1.Pod{}, nil, 0) target := NewReflector(lw, &v1.Pod{}, nil, 0)
err := target.watch(nil, stopCh, nil) err := target.watch(ctx, nil, nil)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, watchers, 1) require.Len(t, watchers, 1)
require.True(t, watchers[0].IsStopped()) require.True(t, watchers[0].IsStopped())
@ -219,9 +224,10 @@ func BenchmarkReflectorResyncChanMany(b *testing.B) {
func TestReflectorHandleWatchStoppedBefore(t *testing.T) { func TestReflectorHandleWatchStoppedBefore(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
// Simulate the watch channel being closed before the watchHandler is called ctx, cancel := context.WithCancelCause(ctx)
close(stopCh) // Simulate the context being canceled before the watchHandler is called
cancel(errors.New("don't run"))
var calls []string var calls []string
resultCh := make(chan watch.Event) resultCh := make(chan watch.Event)
fw := watch.MockWatcher{ fw := watch.MockWatcher{
@ -234,7 +240,7 @@ func TestReflectorHandleWatchStoppedBefore(t *testing.T) {
return resultCh return resultCh
}, },
} }
err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopCh) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
if err == nil { if err == nil {
t.Errorf("unexpected non-error") t.Errorf("unexpected non-error")
} }
@ -251,7 +257,8 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
var calls []string var calls []string
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
resultCh := make(chan watch.Event) resultCh := make(chan watch.Event)
fw := watch.MockWatcher{ fw := watch.MockWatcher{
StopFunc: func() { StopFunc: func() {
@ -265,12 +272,12 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
// caller, after watching has started. // caller, after watching has started.
go func() { go func() {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
close(stopCh) cancel(errors.New("10ms timeout reached"))
}() }()
return resultCh return resultCh
}, },
} }
err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopCh) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
if err == nil { if err == nil {
t.Errorf("unexpected non-error") t.Errorf("unexpected non-error")
} }
@ -285,6 +292,7 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
_, ctx := ktesting.NewTestContext(t)
var calls []string var calls []string
resultCh := make(chan watch.Event) resultCh := make(chan watch.Event)
fw := watch.MockWatcher{ fw := watch.MockWatcher{
@ -298,7 +306,7 @@ func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) {
} }
// Simulate the result channel being closed by the producer before handleWatch is called. // Simulate the result channel being closed by the producer before handleWatch is called.
close(resultCh) close(resultCh)
err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
if err == nil { if err == nil {
t.Errorf("unexpected non-error") t.Errorf("unexpected non-error")
} }
@ -313,6 +321,7 @@ func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) {
func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) { func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
_, ctx := ktesting.NewTestContext(t)
var calls []string var calls []string
resultCh := make(chan watch.Event) resultCh := make(chan watch.Event)
fw := watch.MockWatcher{ fw := watch.MockWatcher{
@ -331,7 +340,7 @@ func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) {
return resultCh return resultCh
}, },
} }
err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
if err == nil { if err == nil {
t.Errorf("unexpected non-error") t.Errorf("unexpected non-error")
} }
@ -348,11 +357,12 @@ func TestReflectorWatchHandler(t *testing.T) {
// watching after all the events have been consumed. This avoids race // watching after all the events have been consumed. This avoids race
// conditions which can happen if the producer calls Stop(), instead of the // conditions which can happen if the producer calls Stop(), instead of the
// consumer. // consumer.
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
setLastSyncResourceVersion := func(rv string) { setLastSyncResourceVersion := func(rv string) {
g.setLastSyncResourceVersion(rv) g.setLastSyncResourceVersion(rv)
if rv == "32" { if rv == "32" {
close(stopCh) cancel(errors.New("LastSyncResourceVersion is 32"))
} }
} }
fw := watch.NewFake() fw := watch.NewFake()
@ -365,7 +375,7 @@ func TestReflectorWatchHandler(t *testing.T) {
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}}) fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
fw.Stop() fw.Stop()
}() }()
err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, g.clock, nevererrc, stopCh) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, g.clock, nevererrc)
// TODO(karlkfi): Fix FakeWatcher to avoid race condition between watcher.Stop() & close(stopCh) // TODO(karlkfi): Fix FakeWatcher to avoid race condition between watcher.Stop() & close(stopCh)
if err != nil && !errors.Is(err, errorStopRequested) { if err != nil && !errors.Is(err, errorStopRequested) {
t.Errorf("unexpected error %v", err) t.Errorf("unexpected error %v", err)
@ -408,15 +418,19 @@ func TestReflectorStopWatch(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
fw := watch.NewFake() fw := watch.NewFake()
stopWatch := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
close(stopWatch) ctx, cancel := context.WithCancelCause(ctx)
err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopWatch) cancel(errors.New("don't run"))
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
if err != errorStopRequested { if err != errorStopRequested {
t.Errorf("expected stop error, got %q", err) t.Errorf("expected stop error, got %q", err)
} }
} }
func TestReflectorListAndWatch(t *testing.T) { func TestReflectorListAndWatch(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
createdFakes := make(chan *watch.FakeWatcher) createdFakes := make(chan *watch.FakeWatcher)
// The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc // The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc
@ -442,7 +456,7 @@ func TestReflectorListAndWatch(t *testing.T) {
} }
s := NewFIFO(MetaNamespaceKeyFunc) s := NewFIFO(MetaNamespaceKeyFunc)
r := NewReflector(lw, &v1.Pod{}, s, 0) r := NewReflector(lw, &v1.Pod{}, s, 0)
go r.ListAndWatch(wait.NeverStop) go func() { assert.NoError(t, r.ListAndWatchWithContext(ctx)) }()
ids := []string{"foo", "bar", "baz", "qux", "zoo"} ids := []string{"foo", "bar", "baz", "qux", "zoo"}
var fw *watch.FakeWatcher var fw *watch.FakeWatcher
@ -539,7 +553,8 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
} }
} }
watchRet, watchErr := item.events, item.watchErr watchRet, watchErr := item.events, item.watchErr
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
lw := &testLW{ lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if watchErr != nil { if watchErr != nil {
@ -557,7 +572,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
// result channel, and wait for the consumer to stop the // result channel, and wait for the consumer to stop the
// watcher, to avoid race conditions. // watcher, to avoid race conditions.
// TODO: Fix the FakeWatcher to separate watcher.Stop from close(resultCh) // TODO: Fix the FakeWatcher to separate watcher.Stop from close(resultCh)
close(stopCh) cancel(errors.New("done"))
}() }()
return fw, nil return fw, nil
}, },
@ -566,7 +581,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
}, },
} }
r := NewReflector(lw, &v1.Pod{}, s, 0) r := NewReflector(lw, &v1.Pod{}, s, 0)
err := r.ListAndWatch(stopCh) err := r.ListAndWatchWithContext(ctx)
if item.listErr != nil && !errors.Is(err, item.listErr) { if item.listErr != nil && !errors.Is(err, item.listErr) {
t.Errorf("unexpected ListAndWatch error: %v", err) t.Errorf("unexpected ListAndWatch error: %v", err)
} }
@ -593,7 +608,8 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
for _, test := range table { for _, test := range table {
t.Run(fmt.Sprintf("%d connection failures takes at least %d ms", test.numConnFails, 1<<test.numConnFails), t.Run(fmt.Sprintf("%d connection failures takes at least %d ms", test.numConnFails, 1<<test.numConnFails),
func(t *testing.T) { func(t *testing.T) {
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
connFails := test.numConnFails connFails := test.numConnFails
fakeClock := testingclock.NewFakeClock(time.Unix(0, 0)) fakeClock := testingclock.NewFakeClock(time.Unix(0, 0))
bm := wait.NewExponentialBackoffManager(time.Millisecond, maxBackoff, 100*time.Millisecond, 2.0, 1.0, fakeClock) bm := wait.NewExponentialBackoffManager(time.Millisecond, maxBackoff, 100*time.Millisecond, 2.0, 1.0, fakeClock)
@ -624,7 +640,7 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
connFails-- connFails--
return nil, syscall.ECONNREFUSED return nil, syscall.ECONNREFUSED
} }
close(stopCh) cancel(errors.New("done"))
return watch.NewFake(), nil return watch.NewFake(), nil
}, },
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
@ -637,10 +653,10 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
store: NewFIFO(MetaNamespaceKeyFunc), store: NewFIFO(MetaNamespaceKeyFunc),
backoffManager: bm, backoffManager: bm,
clock: fakeClock, clock: fakeClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
} }
start := fakeClock.Now() start := fakeClock.Now()
err := r.ListAndWatch(stopCh) err := r.ListAndWatchWithContext(ctx)
elapsed := fakeClock.Since(start) elapsed := fakeClock.Since(start)
if err != nil { if err != nil {
t.Errorf("unexpected error %v", err) t.Errorf("unexpected error %v", err)
@ -666,6 +682,7 @@ func (f *fakeBackoff) Backoff() clock.Timer {
} }
func TestBackoffOnTooManyRequests(t *testing.T) { func TestBackoffOnTooManyRequests(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
err := apierrors.NewTooManyRequests("too many requests", 1) err := apierrors.NewTooManyRequests("too many requests", 1)
clock := &clock.RealClock{} clock := &clock.RealClock{}
bm := &fakeBackoff{clock: clock} bm := &fakeBackoff{clock: clock}
@ -697,11 +714,11 @@ func TestBackoffOnTooManyRequests(t *testing.T) {
store: NewFIFO(MetaNamespaceKeyFunc), store: NewFIFO(MetaNamespaceKeyFunc),
backoffManager: bm, backoffManager: bm,
clock: clock, clock: clock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
} }
stopCh := make(chan struct{}) stopCh := make(chan struct{})
if err := r.ListAndWatch(stopCh); err != nil { if err := r.ListAndWatchWithContext(ctx); err != nil {
t.Fatal(err) t.Fatal(err)
} }
close(stopCh) close(stopCh)
@ -738,14 +755,15 @@ func TestNoRelistOnTooManyRequests(t *testing.T) {
store: NewFIFO(MetaNamespaceKeyFunc), store: NewFIFO(MetaNamespaceKeyFunc),
backoffManager: bm, backoffManager: bm,
clock: clock, clock: clock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
} }
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
if err := r.ListAndWatch(stopCh); err != nil { ctx, cancel := context.WithCancelCause(ctx)
if err := r.ListAndWatchWithContext(ctx); err != nil {
t.Fatal(err) t.Fatal(err)
} }
close(stopCh) cancel(errors.New("done"))
if listCalls != 1 { if listCalls != 1 {
t.Errorf("unexpected list calls: %d", listCalls) t.Errorf("unexpected list calls: %d", listCalls)
} }
@ -812,14 +830,15 @@ func TestRetryInternalError(t *testing.T) {
store: NewFIFO(MetaNamespaceKeyFunc), store: NewFIFO(MetaNamespaceKeyFunc),
backoffManager: bm, backoffManager: bm,
clock: fakeClock, clock: fakeClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
} }
r.MaxInternalErrorRetryDuration = tc.maxInternalDuration r.MaxInternalErrorRetryDuration = tc.maxInternalDuration
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
r.ListAndWatch(stopCh) ctx, cancel := context.WithCancelCause(ctx)
close(stopCh) require.NoError(t, r.ListAndWatchWithContext(ctx))
cancel(errors.New("done"))
if counter-1 != tc.wantRetries { if counter-1 != tc.wantRetries {
t.Errorf("%v unexpected number of retries: %d", tc, counter-1) t.Errorf("%v unexpected number of retries: %d", tc, counter-1)
@ -829,7 +848,7 @@ func TestRetryInternalError(t *testing.T) {
func TestReflectorResync(t *testing.T) { func TestReflectorResync(t *testing.T) {
iteration := 0 iteration := 0
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
rerr := errors.New("expected resync reached") rerr := errors.New("expected resync reached")
s := &FakeCustomStore{ s := &FakeCustomStore{
ResyncFunc: func() error { ResyncFunc: func() error {
@ -852,7 +871,7 @@ func TestReflectorResync(t *testing.T) {
} }
resyncPeriod := 1 * time.Millisecond resyncPeriod := 1 * time.Millisecond
r := NewReflector(lw, &v1.Pod{}, s, resyncPeriod) r := NewReflector(lw, &v1.Pod{}, s, resyncPeriod)
if err := r.ListAndWatch(stopCh); err != nil { if err := r.ListAndWatchWithContext(ctx); err != nil {
// error from Resync is not propagated up to here. // error from Resync is not propagated up to here.
t.Errorf("expected error %v", err) t.Errorf("expected error %v", err)
} }
@ -862,13 +881,14 @@ func TestReflectorResync(t *testing.T) {
} }
func TestReflectorWatchListPageSize(t *testing.T) { func TestReflectorWatchListPageSize(t *testing.T) {
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
lw := &testLW{ lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list. // Stop once the reflector begins watching since we're only interested in the list.
close(stopCh) cancel(errors.New("done"))
fw := watch.NewFake() fw := watch.NewFake()
return fw, nil return fw, nil
}, },
@ -898,7 +918,7 @@ func TestReflectorWatchListPageSize(t *testing.T) {
r.setLastSyncResourceVersion("10") r.setLastSyncResourceVersion("10")
// Set the reflector to paginate the list request in 4 item chunks. // Set the reflector to paginate the list request in 4 item chunks.
r.WatchListPageSize = 4 r.WatchListPageSize = 4
r.ListAndWatch(stopCh) require.NoError(t, r.ListAndWatchWithContext(ctx))
results := s.List() results := s.List()
if len(results) != 10 { if len(results) != 10 {
@ -907,13 +927,14 @@ func TestReflectorWatchListPageSize(t *testing.T) {
} }
func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) { func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) {
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
lw := &testLW{ lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list. // Stop once the reflector begins watching since we're only interested in the list.
close(stopCh) cancel(errors.New("done"))
fw := watch.NewFake() fw := watch.NewFake()
return fw, nil return fw, nil
}, },
@ -933,7 +954,7 @@ func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) {
} }
r := NewReflector(lw, &v1.Pod{}, s, 0) r := NewReflector(lw, &v1.Pod{}, s, 0)
r.setLastSyncResourceVersion("10") r.setLastSyncResourceVersion("10")
r.ListAndWatch(stopCh) require.NoError(t, r.ListAndWatchWithContext(ctx))
results := s.List() results := s.List()
if len(results) != 10 { if len(results) != 10 {
@ -942,13 +963,14 @@ func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) {
} }
func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T) { func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T) {
var stopCh chan struct{} _, ctx := ktesting.NewTestContext(t)
var cancel func(error)
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
lw := &testLW{ lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list. // Stop once the reflector begins watching since we're only interested in the list.
close(stopCh) cancel(errors.New("done"))
fw := watch.NewFake() fw := watch.NewFake()
return fw, nil return fw, nil
}, },
@ -977,16 +999,17 @@ func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T)
r := NewReflector(lw, &v1.Pod{}, s, 0) r := NewReflector(lw, &v1.Pod{}, s, 0)
// Initial list should initialize paginatedResult in the reflector. // Initial list should initialize paginatedResult in the reflector.
stopCh = make(chan struct{}) var cancelCtx context.Context
r.ListAndWatch(stopCh) cancelCtx, cancel = context.WithCancelCause(ctx)
require.NoError(t, r.ListAndWatchWithContext(cancelCtx))
if results := s.List(); len(results) != 10 { if results := s.List(); len(results) != 10 {
t.Errorf("Expected 10 results, got %d", len(results)) t.Errorf("Expected 10 results, got %d", len(results))
} }
// Since initial list for ResourceVersion="0" was paginated, the subsequent // Since initial list for ResourceVersion="0" was paginated, the subsequent
// ones should also be paginated. // ones should also be paginated.
stopCh = make(chan struct{}) cancelCtx, cancel = context.WithCancelCause(ctx)
r.ListAndWatch(stopCh) require.NoError(t, r.ListAndWatchWithContext(cancelCtx))
if results := s.List(); len(results) != 10 { if results := s.List(); len(results) != 10 {
t.Errorf("Expected 10 results, got %d", len(results)) t.Errorf("Expected 10 results, got %d", len(results))
} }
@ -996,14 +1019,15 @@ func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T)
// it in relist requests to prevent the reflector from traveling back in time if the relist is to an api-server or // it in relist requests to prevent the reflector from traveling back in time if the relist is to an api-server or
// etcd that is partitioned and serving older data than the reflector has already processed. // etcd that is partitioned and serving older data than the reflector has already processed.
func TestReflectorResyncWithResourceVersion(t *testing.T) { func TestReflectorResyncWithResourceVersion(t *testing.T) {
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
cancelCtx, cancel := context.WithCancelCause(ctx)
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{} listCallRVs := []string{}
lw := &testLW{ lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list. // Stop once the reflector begins watching since we're only interested in the list.
close(stopCh) cancel(errors.New("done"))
fw := watch.NewFake() fw := watch.NewFake()
return fw, nil return fw, nil
}, },
@ -1027,7 +1051,7 @@ func TestReflectorResyncWithResourceVersion(t *testing.T) {
r := NewReflector(lw, &v1.Pod{}, s, 0) r := NewReflector(lw, &v1.Pod{}, s, 0)
// Initial list should use RV=0 // Initial list should use RV=0
r.ListAndWatch(stopCh) require.NoError(t, r.ListAndWatchWithContext(cancelCtx))
results := s.List() results := s.List()
if len(results) != 4 { if len(results) != 4 {
@ -1035,8 +1059,8 @@ func TestReflectorResyncWithResourceVersion(t *testing.T) {
} }
// relist should use lastSyncResourceVersions (RV=10) // relist should use lastSyncResourceVersions (RV=10)
stopCh = make(chan struct{}) cancelCtx, cancel = context.WithCancelCause(ctx)
r.ListAndWatch(stopCh) require.NoError(t, r.ListAndWatchWithContext(cancelCtx))
results = s.List() results = s.List()
if len(results) != 8 { if len(results) != 8 {
@ -1055,14 +1079,16 @@ func TestReflectorResyncWithResourceVersion(t *testing.T) {
// (In kubernetes 1.17, or when the watch cache is enabled, the List will instead return the list that is no older than // (In kubernetes 1.17, or when the watch cache is enabled, the List will instead return the list that is no older than
// the requested ResourceVersion). // the requested ResourceVersion).
func TestReflectorExpiredExactResourceVersion(t *testing.T) { func TestReflectorExpiredExactResourceVersion(t *testing.T) {
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
var cancelCtx context.Context
var cancel func(error)
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{} listCallRVs := []string{}
lw := &testLW{ lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list. // Stop once the reflector begins watching since we're only interested in the list.
close(stopCh) cancel(errors.New("done"))
fw := watch.NewFake() fw := watch.NewFake()
return fw, nil return fw, nil
}, },
@ -1089,7 +1115,8 @@ func TestReflectorExpiredExactResourceVersion(t *testing.T) {
r := NewReflector(lw, &v1.Pod{}, s, 0) r := NewReflector(lw, &v1.Pod{}, s, 0)
// Initial list should use RV=0 // Initial list should use RV=0
r.ListAndWatch(stopCh) cancelCtx, cancel = context.WithCancelCause(ctx)
require.NoError(t, r.ListAndWatchWithContext(cancelCtx))
results := s.List() results := s.List()
if len(results) != 4 { if len(results) != 4 {
@ -1097,8 +1124,8 @@ func TestReflectorExpiredExactResourceVersion(t *testing.T) {
} }
// relist should use lastSyncResourceVersions (RV=10) and since RV=10 is expired, it should retry with RV="". // relist should use lastSyncResourceVersions (RV=10) and since RV=10 is expired, it should retry with RV="".
stopCh = make(chan struct{}) cancelCtx, cancel = context.WithCancelCause(ctx)
r.ListAndWatch(stopCh) require.NoError(t, r.ListAndWatchWithContext(cancelCtx))
results = s.List() results = s.List()
if len(results) != 8 { if len(results) != 8 {
@ -1112,14 +1139,16 @@ func TestReflectorExpiredExactResourceVersion(t *testing.T) {
} }
func TestReflectorFullListIfExpired(t *testing.T) { func TestReflectorFullListIfExpired(t *testing.T) {
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
var cancelCtx context.Context
var cancel func(error)
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{} listCallRVs := []string{}
lw := &testLW{ lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list. // Stop once the reflector begins watching since we're only interested in the list.
close(stopCh) cancel(errors.New("done"))
fw := watch.NewFake() fw := watch.NewFake()
return fw, nil return fw, nil
}, },
@ -1156,7 +1185,8 @@ func TestReflectorFullListIfExpired(t *testing.T) {
r.WatchListPageSize = 4 r.WatchListPageSize = 4
// Initial list should use RV=0 // Initial list should use RV=0
if err := r.ListAndWatch(stopCh); err != nil { cancelCtx, cancel = context.WithCancelCause(ctx)
if err := r.ListAndWatchWithContext(cancelCtx); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1166,8 +1196,8 @@ func TestReflectorFullListIfExpired(t *testing.T) {
} }
// relist should use lastSyncResourceVersions (RV=10) and since second page of that expired, it should full list with RV=10 // relist should use lastSyncResourceVersions (RV=10) and since second page of that expired, it should full list with RV=10
stopCh = make(chan struct{}) cancelCtx, cancel = context.WithCancelCause(ctx)
if err := r.ListAndWatch(stopCh); err != nil { if err := r.ListAndWatchWithContext(cancelCtx); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1183,7 +1213,9 @@ func TestReflectorFullListIfExpired(t *testing.T) {
} }
func TestReflectorFullListIfTooLarge(t *testing.T) { func TestReflectorFullListIfTooLarge(t *testing.T) {
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
var cancelCtx context.Context
var cancel func(error)
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{} listCallRVs := []string{}
version := 30 version := 30
@ -1191,7 +1223,7 @@ func TestReflectorFullListIfTooLarge(t *testing.T) {
lw := &testLW{ lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list. // Stop once the reflector begins watching since we're only interested in the list.
close(stopCh) cancel(errors.New("done"))
fw := watch.NewFake() fw := watch.NewFake()
return fw, nil return fw, nil
}, },
@ -1229,7 +1261,8 @@ func TestReflectorFullListIfTooLarge(t *testing.T) {
r := NewReflector(lw, &v1.Pod{}, s, 0) r := NewReflector(lw, &v1.Pod{}, s, 0)
// Initial list should use RV=0 // Initial list should use RV=0
if err := r.ListAndWatch(stopCh); err != nil { cancelCtx, cancel = context.WithCancelCause(ctx)
if err := r.ListAndWatchWithContext(cancelCtx); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1241,8 +1274,8 @@ func TestReflectorFullListIfTooLarge(t *testing.T) {
// done we simply try to relist from now to avoid continuous errors on relists. // done we simply try to relist from now to avoid continuous errors on relists.
for i := 1; i <= 3; i++ { for i := 1; i <= 3; i++ {
// relist twice to cover the two variants of TooLargeResourceVersion api errors // relist twice to cover the two variants of TooLargeResourceVersion api errors
stopCh = make(chan struct{}) cancelCtx, cancel = context.WithCancelCause(ctx)
if err := r.ListAndWatch(stopCh); err != nil { if err := r.ListAndWatchWithContext(cancelCtx); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
@ -1356,7 +1389,8 @@ func TestWatchTimeout(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
var gotTimeoutSeconds int64 var gotTimeoutSeconds int64
@ -1370,7 +1404,7 @@ func TestWatchTimeout(t *testing.T) {
} }
// Stop once the reflector begins watching since we're only interested in the list. // Stop once the reflector begins watching since we're only interested in the list.
close(stopCh) cancel(errors.New("done"))
return watch.NewFake(), nil return watch.NewFake(), nil
}, },
} }
@ -1379,7 +1413,7 @@ func TestWatchTimeout(t *testing.T) {
MinWatchTimeout: tc.minWatchTimeout, MinWatchTimeout: tc.minWatchTimeout,
} }
r := NewReflectorWithOptions(lw, &v1.Pod{}, s, opts) r := NewReflectorWithOptions(lw, &v1.Pod{}, s, opts)
if err := r.ListAndWatch(stopCh); err != nil { if err := r.ListAndWatchWithContext(ctx); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1412,7 +1446,8 @@ func newStoreWithRV() *storeWithRV {
func TestReflectorResourceVersionUpdate(t *testing.T) { func TestReflectorResourceVersionUpdate(t *testing.T) {
s := newStoreWithRV() s := newStoreWithRV()
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
fw := watch.NewFake() fw := watch.NewFake()
lw := &testLW{ lw := &testLW{
@ -1434,11 +1469,11 @@ func TestReflectorResourceVersionUpdate(t *testing.T) {
fw.Action(watch.Modified, makePod("20")) fw.Action(watch.Modified, makePod("20"))
fw.Action(watch.Bookmark, makePod("30")) fw.Action(watch.Bookmark, makePod("30"))
fw.Action(watch.Deleted, makePod("40")) fw.Action(watch.Deleted, makePod("40"))
close(stopCh) cancel(errors.New("done"))
}() }()
// Initial list should use RV=0 // Initial list should use RV=0
if err := r.ListAndWatch(stopCh); err != nil { if err := r.ListAndWatchWithContext(ctx); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1680,6 +1715,7 @@ func (t *TestPagingPodsLW) Watch(options metav1.ListOptions) (watch.Interface, e
} }
func TestReflectorListExtract(t *testing.T) { func TestReflectorListExtract(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
store := NewStore(func(obj interface{}) (string, error) { store := NewStore(func(obj interface{}) (string, error) {
pod, ok := obj.(*v1.Pod) pod, ok := obj.(*v1.Pod)
if !ok { if !ok {
@ -1693,8 +1729,7 @@ func TestReflectorListExtract(t *testing.T) {
reflector.WatchListPageSize = fakeItemsNum reflector.WatchListPageSize = fakeItemsNum
// execute list to fill store // execute list to fill store
stopCh := make(chan struct{}) if err := reflector.list(ctx); err != nil {
if err := reflector.list(stopCh); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -2074,7 +2109,7 @@ func BenchmarkReflectorList(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
err := reflector.list(ctx.Done()) err := reflector.list(ctx)
if err != nil { if err != nil {
b.Fatalf("reflect list: %v", err) b.Fatalf("reflect list: %v", err)
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package cache package cache
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"sort" "sort"
@ -35,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2/ktesting"
testingclock "k8s.io/utils/clock/testing" testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer" "k8s.io/utils/pointer"
"k8s.io/utils/ptr" "k8s.io/utils/ptr"
@ -51,8 +53,9 @@ func TestInitialEventsEndBookmarkTicker(t *testing.T) {
} }
t.Run("testing NoopInitialEventsEndBookmarkTicker", func(t *testing.T) { t.Run("testing NoopInitialEventsEndBookmarkTicker", func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
clock := testingclock.NewFakeClock(time.Now()) clock := testingclock.NewFakeClock(time.Now())
target := newInitialEventsEndBookmarkTickerInternal("testName", clock, clock.Now(), time.Second, false) target := newInitialEventsEndBookmarkTickerInternal(logger, "testName", clock, clock.Now(), time.Second, false)
clock.Step(30 * time.Second) clock.Step(30 * time.Second)
assertNoEvents(t, target.C()) assertNoEvents(t, target.C())
@ -73,8 +76,9 @@ func TestInitialEventsEndBookmarkTicker(t *testing.T) {
}) })
t.Run("testing InitialEventsEndBookmarkTicker backed by a fake clock", func(t *testing.T) { t.Run("testing InitialEventsEndBookmarkTicker backed by a fake clock", func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
clock := testingclock.NewFakeClock(time.Now()) clock := testingclock.NewFakeClock(time.Now())
target := newInitialEventsEndBookmarkTickerInternal("testName", clock, clock.Now(), time.Second, true) target := newInitialEventsEndBookmarkTickerInternal(logger, "testName", clock, clock.Now(), time.Second, true)
clock.Step(500 * time.Millisecond) clock.Step(500 * time.Millisecond)
assertNoEvents(t, target.C()) assertNoEvents(t, target.C())
@ -469,7 +473,8 @@ func TestWatchList(t *testing.T) {
for _, s := range scenarios { for _, s := range scenarios {
t.Run(s.name, func(t *testing.T) { t.Run(s.name, func(t *testing.T) {
scenario := s // capture as local variable scenario := s // capture as local variable
listWatcher, store, reflector, stopCh := testData() _, ctx := ktesting.NewTestContext(t)
listWatcher, store, reflector, ctx, cancel := testData(ctx)
go func() { go func() {
for i, e := range scenario.watchEvents { for i, e := range scenario.watchEvents {
listWatcher.fakeWatcher.Action(e.Type, e.Object) listWatcher.fakeWatcher.Action(e.Type, e.Object)
@ -478,7 +483,7 @@ func TestWatchList(t *testing.T) {
continue continue
} }
if i+1 == scenario.closeAfterWatchEvents { if i+1 == scenario.closeAfterWatchEvents {
close(stopCh) cancel(fmt.Errorf("done after %d watch events", i))
} }
} }
}() }()
@ -490,7 +495,7 @@ func TestWatchList(t *testing.T) {
reflector.UseWatchList = ptr.To(false) reflector.UseWatchList = ptr.To(false)
} }
err := reflector.ListAndWatch(stopCh) err := reflector.ListAndWatchWithContext(ctx)
if scenario.expectedError != nil && err == nil { if scenario.expectedError != nil && err == nil {
t.Fatalf("expected error %q, got nil", scenario.expectedError) t.Fatalf("expected error %q, got nil", scenario.expectedError)
} }
@ -567,19 +572,19 @@ func makePod(name, rv string) *v1.Pod {
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: rv, UID: types.UID(name)}} return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: rv, UID: types.UID(name)}}
} }
func testData() (*fakeListWatcher, Store, *Reflector, chan struct{}) { func testData(ctx context.Context) (*fakeListWatcher, Store, *Reflector, context.Context, func(error)) {
ctx, cancel := context.WithCancelCause(ctx)
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
stopCh := make(chan struct{})
lw := &fakeListWatcher{ lw := &fakeListWatcher{
fakeWatcher: watch.NewFake(), fakeWatcher: watch.NewFake(),
stop: func() { stop: func() {
close(stopCh) cancel(errors.New("time to stop"))
}, },
} }
r := NewReflector(lw, &v1.Pod{}, s, 0) r := NewReflector(lw, &v1.Pod{}, s, 0)
r.UseWatchList = ptr.To(true) r.UseWatchList = ptr.To(true)
return lw, s, r, stopCh return lw, s, r, ctx, cancel
} }
type fakeListWatcher struct { type fakeListWatcher struct {

View File

@ -17,6 +17,7 @@ limitations under the License.
package cache package cache
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
@ -29,6 +30,7 @@ import (
"k8s.io/client-go/tools/cache/synctrack" "k8s.io/client-go/tools/cache/synctrack"
"k8s.io/utils/buffer" "k8s.io/utils/buffer"
"k8s.io/utils/clock" "k8s.io/utils/clock"
"k8s.io/utils/ptr"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -142,6 +144,8 @@ type SharedInformer interface {
// It returns a registration handle for the handler that can be used to // It returns a registration handle for the handler that can be used to
// remove the handler again, or to tell if the handler is synced (has // remove the handler again, or to tell if the handler is synced (has
// seen every item in the initial list). // seen every item in the initial list).
//
// Contextual logging: AddEventHandlerWithOptions together with a logger in the options should be used instead of AddEventHandler in code which supports contextual logging.
AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
// AddEventHandlerWithResyncPeriod adds an event handler to the // AddEventHandlerWithResyncPeriod adds an event handler to the
// shared informer with the requested resync period; zero means // shared informer with the requested resync period; zero means
@ -159,7 +163,12 @@ type SharedInformer interface {
// be competing load and scheduling noise. // be competing load and scheduling noise.
// It returns a registration handle for the handler that can be used to remove // It returns a registration handle for the handler that can be used to remove
// the handler again and an error if the handler cannot be added. // the handler again and an error if the handler cannot be added.
//
// Contextual logging: AddEventHandlerWithOptions together with a logger in the options should be used instead of AddEventHandlerWithResyncPeriod in code which supports contextual logging.
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error)
// AddEventHandlerWithOptions is a variant of AddEventHandlerWithResyncPeriod where
// all optional parameters are passed in a struct.
AddEventHandlerWithOptions(handler ResourceEventHandler, options HandlerOptions) (ResourceEventHandlerRegistration, error)
// RemoveEventHandler removes a formerly added event handler given by // RemoveEventHandler removes a formerly added event handler given by
// its registration handle. // its registration handle.
// This function is guaranteed to be idempotent, and thread-safe. // This function is guaranteed to be idempotent, and thread-safe.
@ -170,7 +179,12 @@ type SharedInformer interface {
GetController() Controller GetController() Controller
// Run starts and runs the shared informer, returning after it stops. // Run starts and runs the shared informer, returning after it stops.
// The informer will be stopped when stopCh is closed. // The informer will be stopped when stopCh is closed.
//
// Contextual logging: RunWithContext should be used instead of Run in code which uses contextual logging.
Run(stopCh <-chan struct{}) Run(stopCh <-chan struct{})
// RunWithContext starts and runs the shared informer, returning after it stops.
// The informer will be stopped when the context is canceled.
RunWithContext(ctx context.Context)
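
As an illustration of the new method (not part of the diff; it assumes a ListerWatcher lw and the usual context and klog imports), a caller now starts a shared informer by canceling a context instead of closing a stop channel:

	// Sketch: run a pod informer until ctx is canceled (replaces informer.Run(stopCh)).
	logger := klog.LoggerWithName(klog.Background(), "pod-informer") // illustrative name
	ctx, cancel := context.WithCancel(klog.NewContext(context.Background(), logger))
	defer cancel()
	informer := cache.NewSharedIndexInformer(lw, &v1.Pod{}, 0, cache.Indexers{})
	go informer.RunWithContext(ctx)
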
// HasSynced returns true if the shared informer's store has been // HasSynced returns true if the shared informer's store has been
// informed by at least one full LIST of the authoritative state // informed by at least one full LIST of the authoritative state
// of the informer's object collection. This is unrelated to "resync". // of the informer's object collection. This is unrelated to "resync".
@ -197,8 +211,14 @@ type SharedInformer interface {
// The handler is intended for visibility, not to e.g. pause the consumers. // The handler is intended for visibility, not to e.g. pause the consumers.
// The handler should return quickly - any expensive processing should be // The handler should return quickly - any expensive processing should be
// offloaded. // offloaded.
//
// Contextual logging: SetWatchErrorHandlerWithContext should be used instead of SetWatchErrorHandler in code which supports contextual logging.
SetWatchErrorHandler(handler WatchErrorHandler) error SetWatchErrorHandler(handler WatchErrorHandler) error
// SetWatchErrorHandlerWithContext is a variant of SetWatchErrorHandler where
// the handler is passed an additional context parameter.
SetWatchErrorHandlerWithContext(handler WatchErrorHandlerWithContext) error
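
For illustration (assuming the informer and ctx from the sketch above), a contextual watch error handler can be installed before the informer starts; the handler body is only a sketch:

	// Sketch: log watch failures with the logger carried in the handler's context.
	if err := informer.SetWatchErrorHandlerWithContext(func(handlerCtx context.Context, _ *cache.Reflector, err error) {
		klog.FromContext(handlerCtx).Error(err, "watch failed, reflector will retry")
	}); err != nil {
		klog.FromContext(ctx).Error(err, "informer already started, cannot set watch error handler")
	}
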
// The TransformFunc is called for each object which is about to be stored. // The TransformFunc is called for each object which is about to be stored.
// //
// This function is intended for you to take the opportunity to // This function is intended for you to take the opportunity to
@ -228,6 +248,21 @@ type ResourceEventHandlerRegistration interface {
HasSynced() bool HasSynced() bool
} }
// Optional configuration options for [SharedInformer.AddEventHandlerWithOptions].
// May be left empty.
type HandlerOptions struct {
// Logger overrides the default klog.Background() logger.
Logger *klog.Logger
// ResyncPeriod requests a certain resync period from an informer. Zero
// means the handler does not care about resyncs. Not all informers do
// resyncs, even if requested. See
// [SharedInformer.AddEventHandlerWithResyncPeriod] for details.
//
// If nil, the default resync period of the shared informer is used.
ResyncPeriod *time.Duration
}
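
A minimal sketch of the options-based registration (assuming an existing informer; the handler, logger name, and resync period are illustrative):

	// Sketch: register a handler with its own logger and resync period.
	handlerLogger := klog.LoggerWithName(klog.Background(), "pod-event-handler")
	resync := 30 * time.Second
	reg, err := informer.AddEventHandlerWithOptions(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) { handlerLogger.Info("pod added", "obj", obj) },
	}, cache.HandlerOptions{
		Logger:       &handlerLogger,
		ResyncPeriod: &resync,
	})
	if err != nil {
		handlerLogger.Error(err, "adding event handler failed")
	}
	_ = reg // keep the registration handle to call RemoveEventHandler later
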
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer. // SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface { type SharedIndexInformer interface {
SharedInformer SharedInformer
@ -309,15 +344,38 @@ const (
// WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages // WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages
// indicating that the caller identified by name is waiting for syncs, followed by // indicating that the caller identified by name is waiting for syncs, followed by
// either a successful or failed sync. // either a successful or failed sync.
//
// Contextual logging: WaitForNamedCacheSyncWithContext should be used instead of WaitForNamedCacheSync in code which supports contextual logging.
func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
klog.Infof("Waiting for caches to sync for %s", controllerName) klog.Background().Info("Waiting for caches to sync", "controller", controllerName)
if !WaitForCacheSync(stopCh, cacheSyncs...) { if !WaitForCacheSync(stopCh, cacheSyncs...) {
utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName)) utilruntime.HandleErrorWithContext(context.Background(), nil, "Unable to sync caches", "controller", controllerName)
return false return false
} }
klog.Infof("Caches are synced for %s", controllerName) klog.Background().Info("Caches are synced", "controller", controllerName)
return true
}
// WaitForNamedCacheSyncWithContext is a wrapper around WaitForCacheSync that generates log messages
// indicating that the caller is waiting for syncs, followed by either a successful or failed sync.
//
// Contextual logging can be used to identify the caller in those log messages. The log level is zero,
// the same as in [WaitForNamedCacheSync]. If this is too verbose, then store a logger with an increased
// threshold in the context:
//
// WaitForNamedCacheSyncWithContext(klog.NewContext(ctx, logger.V(5)), ...)
func WaitForNamedCacheSyncWithContext(ctx context.Context, cacheSyncs ...InformerSynced) bool {
logger := klog.FromContext(ctx)
logger.Info("Waiting for caches to sync")
if !WaitForCacheSync(ctx.Done(), cacheSyncs...) {
utilruntime.HandleErrorWithContext(ctx, nil, "Unable to sync caches")
return false
}
logger.Info("Caches are synced")
return true return true
} }
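
For comparison with the string-based WaitForNamedCacheSync just above, a sketch of the context-based variant; the caller is identified by the logger stored in the context rather than by a name argument (the informer, ctx, and logger name are assumed):

	// Sketch: wait for the informer cache before starting workers.
	logger := klog.LoggerWithName(klog.Background(), "pod-controller")
	syncCtx := klog.NewContext(ctx, logger)
	if !cache.WaitForNamedCacheSyncWithContext(syncCtx, informer.HasSynced) {
		return // ctx was canceled before the cache synced
	}
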
@ -389,7 +447,7 @@ type sharedIndexInformer struct {
blockDeltas sync.Mutex blockDeltas sync.Mutex
// Called whenever the ListAndWatch drops the connection with an error. // Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler watchErrorHandler WatchErrorHandlerWithContext
transform TransformFunc transform TransformFunc
} }
@ -403,6 +461,9 @@ type dummyController struct {
informer *sharedIndexInformer informer *sharedIndexInformer
} }
func (v *dummyController) RunWithContext(context.Context) {
}
func (v *dummyController) Run(stopCh <-chan struct{}) { func (v *dummyController) Run(stopCh <-chan struct{}) {
} }
@ -433,6 +494,12 @@ type deleteNotification struct {
} }
func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error { func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
return s.SetWatchErrorHandlerWithContext(func(_ context.Context, r *Reflector, err error) {
handler(r, err)
})
}
func (s *sharedIndexInformer) SetWatchErrorHandlerWithContext(handler WatchErrorHandlerWithContext) error {
s.startedLock.Lock() s.startedLock.Lock()
defer s.startedLock.Unlock() defer s.startedLock.Unlock()
@ -457,10 +524,15 @@ func (s *sharedIndexInformer) SetTransform(handler TransformFunc) error {
 }

 func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
-	defer utilruntime.HandleCrash()
+	s.RunWithContext(wait.ContextForChannel(stopCh))
+}
+
+func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
+	defer utilruntime.HandleCrashWithContext(ctx)
+	logger := klog.FromContext(ctx)

 	if s.HasStarted() {
-		klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
+		logger.Info("Warning: the sharedIndexInformer has started, run more than once is not allowed")
 		return
 	}
@ -484,7 +556,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
 		ShouldResync: s.processor.shouldResync,
 		Process:      s.HandleDeltas,
-		WatchErrorHandler: s.watchErrorHandler,
+		WatchErrorHandlerWithContext: s.watchErrorHandler,
 	}

 	s.controller = New(cfg)
@ -492,20 +564,24 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
 		s.started = true
 	}()

-	// Separate stop channel because Processor should be stopped strictly after controller
-	processorStopCh := make(chan struct{})
+	// Separate stop context because Processor should be stopped strictly after controller.
+	// Cancelation in the parent context is ignored and all values are passed on,
+	// including - but not limited to - a logger.
+	processorStopCtx, stopProcessor := context.WithCancelCause(context.WithoutCancel(ctx))
 	var wg wait.Group
 	defer wg.Wait() // Wait for Processor to stop
-	defer close(processorStopCh) // Tell Processor to stop
-	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
-	wg.StartWithChannel(processorStopCh, s.processor.run)
+	defer stopProcessor(errors.New("informer is stopping")) // Tell Processor to stop
+	// TODO: extend the MutationDetector interface so that it optionally
+	// has a RunWithContext method that we can use here.
+	wg.StartWithChannel(processorStopCtx.Done(), s.cacheMutationDetector.Run)
+	wg.StartWithContext(processorStopCtx, s.processor.run)

 	defer func() {
 		s.startedLock.Lock()
 		defer s.startedLock.Unlock()
 		s.stopped = true // Don't want any new listeners
 	}()
-	s.controller.Run(stopCh)
+	s.controller.RunWithContext(ctx)
 }
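
The stop-context construction above is the interesting part: context.WithoutCancel detaches the processor from the caller's cancellation while keeping its values (notably the logger), and context.WithCancelCause lets the informer stop it later with an explicit cause. A generic sketch of the same pattern, with illustrative names only:

package example

import "context"

// startDetached runs work with a context that inherits the parent's values but
// not its cancellation; the returned function cancels it with a cause.
func startDetached(parent context.Context, work func(context.Context)) (stop context.CancelCauseFunc) {
	detachedCtx, cancel := context.WithCancelCause(context.WithoutCancel(parent))
	go work(detachedCtx)
	return cancel
}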
 func (s *sharedIndexInformer) HasStarted() bool {
@ -558,19 +634,19 @@ func (s *sharedIndexInformer) GetController() Controller {
 }

 func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) {
-	return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
+	return s.AddEventHandlerWithOptions(handler, HandlerOptions{})
 }

-func determineResyncPeriod(desired, check time.Duration) time.Duration {
+func determineResyncPeriod(logger klog.Logger, desired, check time.Duration) time.Duration {
 	if desired == 0 {
 		return desired
 	}
 	if check == 0 {
-		klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
+		logger.Info("Warning: the specified resyncPeriod is invalid because this shared informer doesn't support resyncing", "desired", desired)
 		return 0
 	}
 	if desired < check {
-		klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
+		logger.Info("Warning: the specified resyncPeriod is being increased to the minimum resyncCheckPeriod", "desired", desired, "resyncCheckPeriod", check)
 		return check
 	}
 	return desired
@ -579,6 +655,10 @@ func determineResyncPeriod(desired, check time.Duration) time.Duration {
 const minimumResyncPeriod = 1 * time.Second

 func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
+	return s.AddEventHandlerWithOptions(handler, HandlerOptions{ResyncPeriod: &resyncPeriod})
+}
+
+func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHandler, options HandlerOptions) (ResourceEventHandlerRegistration, error) {
 	s.startedLock.Lock()
 	defer s.startedLock.Unlock()
@ -586,27 +666,30 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
 		return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler)
 	}

+	logger := ptr.Deref(options.Logger, klog.Background())
+	resyncPeriod := ptr.Deref(options.ResyncPeriod, s.defaultEventHandlerResyncPeriod)
 	if resyncPeriod > 0 {
 		if resyncPeriod < minimumResyncPeriod {
-			klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
+			logger.Info("Warning: resync period is too small. Changing it to the minimum allowed value", "resyncPeriod", resyncPeriod, "minimumResyncPeriod", minimumResyncPeriod)
 			resyncPeriod = minimumResyncPeriod
 		}

 		if resyncPeriod < s.resyncCheckPeriod {
 			if s.started {
-				klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
+				logger.Info("Warning: resync period is smaller than resync check period and the informer has already started. Changing it to the resync check period", "resyncPeriod", resyncPeriod, "resyncCheckPeriod", s.resyncCheckPeriod)
 				resyncPeriod = s.resyncCheckPeriod
 			} else {
 				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
 				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
 				// accordingly
 				s.resyncCheckPeriod = resyncPeriod
-				s.processor.resyncCheckPeriodChanged(resyncPeriod)
+				s.processor.resyncCheckPeriodChanged(logger, resyncPeriod)
 			}
 		}
 	}

-	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
+	listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)

 	if !s.started {
 		return s.processor.addListener(listener), nil
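
Registration through the new options struct, as a usage sketch rather than part of the commit; the handler body, resync period, and logger name are assumptions:

package example

import (
	"time"

	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

// addExampleHandler attaches a handler with its own logger and resync period.
func addExampleHandler(informer cache.SharedInformer, logger klog.Logger) (cache.ResourceEventHandlerRegistration, error) {
	resync := 30 * time.Second
	handlerLogger := klog.LoggerWithName(logger, "example-handler")
	return informer.AddEventHandlerWithOptions(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			handlerLogger.Info("Observed add")
		},
	}, cache.HandlerOptions{
		Logger:       &handlerLogger,
		ResyncPeriod: &resync,
	})
}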
@ -794,7 +877,7 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
 	}
 }

-func (p *sharedProcessor) run(stopCh <-chan struct{}) {
+func (p *sharedProcessor) run(ctx context.Context) {
 	func() {
 		p.listenersLock.RLock()
 		defer p.listenersLock.RUnlock()
@ -804,7 +887,7 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) {
 		}
 		p.listenersStarted = true
 	}()
-	<-stopCh
+	<-ctx.Done()

 	p.listenersLock.Lock()
 	defer p.listenersLock.Unlock()
@ -844,13 +927,13 @@ func (p *sharedProcessor) shouldResync() bool {
 	return resyncNeeded
 }

-func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
+func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncCheckPeriod time.Duration) {
 	p.listenersLock.RLock()
 	defer p.listenersLock.RUnlock()

 	for listener := range p.listeners {
 		resyncPeriod := determineResyncPeriod(
-			listener.requestedResyncPeriod, resyncCheckPeriod)
+			logger, listener.requestedResyncPeriod, resyncCheckPeriod)
 		listener.setResyncPeriod(resyncPeriod)
 	}
 }
@ -867,6 +950,7 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
 // processorListener also keeps track of the adjusted requested resync
 // period of the listener.
 type processorListener struct {
+	logger klog.Logger
 	nextCh chan interface{}
 	addCh  chan interface{}
@ -910,8 +994,9 @@ func (p *processorListener) HasSynced() bool {
 	return p.syncTracker.HasSynced()
 }

-func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
+func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
 	ret := &processorListener{
+		logger:  logger,
 		nextCh:  make(chan interface{}),
 		addCh:   make(chan interface{}),
 		handler: handler,
@ -934,7 +1019,7 @@ func (p *processorListener) add(notification interface{}) {
 }

 func (p *processorListener) pop() {
-	defer utilruntime.HandleCrash()
+	defer utilruntime.HandleCrashWithLogger(p.logger)
 	defer close(p.nextCh) // Tell .run() to stop

 	var nextCh chan<- interface{}
@ -968,9 +1053,19 @@ func (p *processorListener) run() {
 	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
 	// the next notification will be attempted. This is usually better than the alternative of never
 	// delivering again.
-	stopCh := make(chan struct{})
-	wait.Until(func() {
+	//
+	// This only applies if utilruntime is configured to not panic, which is not the default.
+	sleepAfterCrash := false
 	for next := range p.nextCh {
+		if sleepAfterCrash {
+			// Sleep before processing the next item.
+			time.Sleep(time.Second)
+		}
+		func() {
+			// Gets reset below, but only if we get that far.
+			sleepAfterCrash = true
+			defer utilruntime.HandleCrashWithLogger(p.logger)
 			switch notification := next.(type) {
 			case updateNotification:
 				p.handler.OnUpdate(notification.oldObj, notification.newObj)
@ -982,12 +1077,11 @@ func (p *processorListener) run() {
 			case deleteNotification:
 				p.handler.OnDelete(notification.oldObj)
 			default:
-				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
+				utilruntime.HandleErrorWithLogger(p.logger, nil, "unrecognized notification", "notificationType", fmt.Sprintf("%T", next))
 			}
+			sleepAfterCrash = false
+		}()
 	}
-	// the only way to get here is if the p.nextCh is empty and closed
-	close(stopCh)
-	}, 1*time.Second, stopCh)
 }

 // shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,
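
The loop above replaces the old wait.Until wrapper with per-notification recovery: each item runs in its own closure, a deferred crash handler absorbs the panic (when crash handling is configured not to re-panic), and the sleepAfterCrash flag delays only the next item. A stripped-down sketch of the same control flow, with illustrative names and a plain recover instead of utilruntime:

package example

import "time"

// drain processes items one at a time; a panicking item is skipped and the
// loop backs off for a second before trying the next one.
func drain(items <-chan func(), onPanic func(recovered any)) {
	sleepAfterCrash := false
	for item := range items {
		if sleepAfterCrash {
			time.Sleep(time.Second)
		}
		func() {
			sleepAfterCrash = true // reset below only if item() returns normally
			defer func() {
				if r := recover(); r != nil {
					onPanic(r)
				}
			}()
			item()
			sleepAfterCrash = false
		}()
	}
}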

View File

@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
+	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@ -34,8 +35,13 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
+	fcache "k8s.io/client-go/tools/cache/testing"
+	"k8s.io/klog/v2"
+	"k8s.io/klog/v2/ktesting"
+	"k8s.io/klog/v2/textlogger"
 	testingclock "k8s.io/utils/clock/testing"
 )
@ -145,10 +151,15 @@ func TestIndexer(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	stop := make(chan struct{})
-	defer close(stop)
-	go informer.Run(stop)
+	var wg wait.Group
+	stop := make(chan struct{})
+	wg.StartWithChannel(stop, informer.Run)
+	defer func() {
+		close(stop)
+		wg.Wait()
+	}()

 	WaitForCacheSync(stop, informer.HasSynced)

 	cmpOps := cmpopts.SortSlices(func(a, b any) bool {
@ -222,10 +233,13 @@ func TestListenerResyncPeriods(t *testing.T) {
 	informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
 	listeners := []*testListener{listener1, listener2, listener3}

+	var wg wait.Group
 	stop := make(chan struct{})
-	defer close(stop)
-
-	go informer.Run(stop)
+	wg.StartWithChannel(stop, informer.Run)
+	defer func() {
+		close(stop)
+		wg.Wait()
+	}()

 	// ensure all listeners got the initial List
 	for _, listener := range listeners {
@ -361,10 +375,14 @@ func TestSharedInformerInitializationRace(t *testing.T) {
 	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
 	listener := newTestListener("raceListener", 0)

-	stop := make(chan struct{})
 	go informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
-	go informer.Run(stop)
+	var wg wait.Group
+	stop := make(chan struct{})
+	wg.StartWithChannel(stop, informer.Run)
+	defer func() {
 		close(stop)
+		wg.Wait()
+	}()
 }
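
The test changes in this file all repeat one pattern: start the informer through a wait.Group so the test only finishes after the informer goroutine has really returned, otherwise the fake source may be shut down while the informer still uses it and the race detector complains. A reusable sketch of that pattern, with an assumed helper name:

package example

import (
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
)

// startInformerForTest starts the informer and returns a function that stops
// it and blocks until its goroutine has returned.
func startInformerForTest(informer cache.SharedIndexInformer) (stopAndWait func()) {
	var wg wait.Group
	stop := make(chan struct{})
	wg.StartWithChannel(stop, informer.Run)
	return func() {
		close(stop)
		wg.Wait()
	}
}

A test would then simply write: defer startInformerForTest(informer)().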
 // TestSharedInformerWatchDisruption simulates a watch that was closed
@ -392,10 +410,13 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
 	informer.AddEventHandlerWithResyncPeriod(listenerResync, listenerResync.resyncPeriod)
 	listeners := []*testListener{listenerNoResync, listenerResync}

+	var wg wait.Group
 	stop := make(chan struct{})
-	defer close(stop)
-
-	go informer.Run(stop)
+	wg.StartWithChannel(stop, informer.Run)
+	defer func() {
+		close(stop)
+		wg.Wait()
+	}()

 	for _, listener := range listeners {
 		if !listener.ok() {
@ -458,8 +479,13 @@ func TestSharedInformerErrorHandling(t *testing.T) {
 		errCh <- err
 	})

+	var wg wait.Group
 	stop := make(chan struct{})
-	go informer.Run(stop)
+	wg.StartWithChannel(stop, informer.Run)
+	defer func() {
+		close(stop)
+		wg.Wait()
+	}()

 	select {
 	case err := <-errCh:
@ -469,7 +495,6 @@ func TestSharedInformerErrorHandling(t *testing.T) {
 	case <-time.After(time.Second):
 		t.Errorf("Timeout waiting for error handler call")
 	}
-	close(stop)
 }

 // TestSharedInformerStartRace is a regression test to ensure there is no race between
@ -494,9 +519,12 @@ func TestSharedInformerStartRace(t *testing.T) {
 		}
 	}()

-	go informer.Run(stop)
+	var wg wait.Group
+	wg.StartWithChannel(stop, informer.Run)
+	defer func() {
 		close(stop)
+		wg.Wait()
+	}()
 }

 func TestSharedInformerTransformer(t *testing.T) {
@ -522,9 +550,13 @@ func TestSharedInformerTransformer(t *testing.T) {
 	listenerTransformer := newTestListener("listenerTransformer", 0, "POD1", "POD2")
 	informer.AddEventHandler(listenerTransformer)

+	var wg wait.Group
 	stop := make(chan struct{})
-	go informer.Run(stop)
-	defer close(stop)
+	wg.StartWithChannel(stop, informer.Run)
+	defer func() {
+		close(stop)
+		wg.Wait()
+	}()

 	if !listenerTransformer.ok() {
 		t.Errorf("%s: expected %v, got %v", listenerTransformer.name, listenerTransformer.expectedItemNames, listenerTransformer.receivedItemNames)
@ -757,9 +789,11 @@ func TestSharedInformerHandlerAbuse(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	informerCtx, informerCancel := context.WithCancel(context.Background())

-	go func() {
-		informer.Run(informerCtx.Done())
+	var informerWg wait.Group
+	informerWg.StartWithChannel(informerCtx.Done(), informer.Run)
+	defer func() {
 		cancel()
+		informerWg.Wait()
 	}()

 	worker := func() {
@ -881,8 +915,10 @@ func TestStateSharedInformer(t *testing.T) {
 		t.Errorf("informer already stopped after creation")
 		return
 	}
+	var wg wait.Group
 	stop := make(chan struct{})
-	go informer.Run(stop)
+	wg.StartWithChannel(stop, informer.Run)
+	defer wg.Wait()
 	if !listener.ok() {
 		t.Errorf("informer did not report initial objects")
 		close(stop)
@ -921,7 +957,9 @@ func TestAddOnStoppedSharedInformer(t *testing.T) {
 	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
 	listener := newTestListener("listener", 0, "pod1")
 	stop := make(chan struct{})
-	go informer.Run(stop)
+	var wg wait.Group
+	wg.StartWithChannel(stop, informer.Run)
+	defer wg.Wait()
 	close(stop)

 	err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
@ -959,7 +997,9 @@ func TestRemoveOnStoppedSharedInformer(t *testing.T) {
 		return
 	}
 	stop := make(chan struct{})
-	go informer.Run(stop)
+	var wg wait.Group
+	wg.StartWithChannel(stop, informer.Run)
+	defer wg.Wait()
 	close(stop)
 	fmt.Println("sleeping")
 	time.Sleep(1 * time.Second)
@ -986,9 +1026,13 @@ func TestRemoveWhileActive(t *testing.T) {
 	handle, _ := informer.AddEventHandler(listener)

 	stop := make(chan struct{})
-	defer close(stop)
-
-	go informer.Run(stop)
+	var wg wait.Group
+	wg.StartWithChannel(stop, informer.Run)
+	defer func() {
+		close(stop)
+		wg.Wait()
+	}()

 	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})

 	if !listener.ok() {
@ -1026,7 +1070,12 @@ func TestAddWhileActive(t *testing.T) {
 	}

 	stop := make(chan struct{})
-	defer close(stop)
+	var wg wait.Group
+	wg.StartWithChannel(stop, informer.Run)
+	defer func() {
+		close(stop)
+		wg.Wait()
+	}()

 	go informer.Run(stop)
 	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
@ -1079,8 +1128,6 @@ func TestAddWhileActive(t *testing.T) {
 func TestShutdown(t *testing.T) {
 	t.Run("no-context", func(t *testing.T) {
 		source := newFakeControllerSource(t)
-		stop := make(chan struct{})
-		defer close(stop)

 		informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
 		handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{
@ -1090,17 +1137,30 @@ func TestShutdown(t *testing.T) {
 		defer func() {
 			assert.NoError(t, informer.RemoveEventHandler(handler))
 		}()
-		go informer.Run(stop)
+
+		var wg wait.Group
+		stop := make(chan struct{})
+		wg.StartWithChannel(stop, informer.Run)
+		defer func() {
+			close(stop)
+			wg.Wait()
+		}()
+
 		require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced")
 	})

 	t.Run("no-context-later", func(t *testing.T) {
 		source := newFakeControllerSource(t)
-		stop := make(chan struct{})
-		defer close(stop)
 		informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
-		go informer.Run(stop)
+
+		var wg wait.Group
+		stop := make(chan struct{})
+		wg.StartWithChannel(stop, informer.Run)
+		defer func() {
+			close(stop)
+			wg.Wait()
+		}()

 		require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced")

 		handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{
@ -1122,3 +1182,152 @@ func TestShutdown(t *testing.T) {
 		// and it doesn't matter that nothing gets stopped or removed.
 	})
 }
func TestEventPanics(t *testing.T) {
// timeInUTC := time.Date(2009, 12, 1, 13, 30, 40, 42000, time.UTC)
// timeString := "1201 13:30:40.000042"
// Initialized by init.
var (
buffer threadSafeBuffer
logger klog.Logger
source *fcache.FakeControllerSource
)
init := func(t *testing.T) {
// Restoring state is very sensitive to ordering. All goroutines spawned
// by a test must have completed and there has to be a check that they
// have completed that is visible to the race detector. This also
// applies to all other tests!
t.Cleanup(klog.CaptureState().Restore) //nolint:logcheck // CaptureState shouldn't be used in packages with contextual logging, but here it is okay.
buffer.buffer.Reset()
logger = textlogger.NewLogger(textlogger.NewConfig(
// textlogger.FixedTime(timeInUTC),
textlogger.Output(&buffer),
))
oldReallyCrash := utilruntime.ReallyCrash
utilruntime.ReallyCrash = false
t.Cleanup(func() { utilruntime.ReallyCrash = oldReallyCrash })
source = newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
}
newHandler := func(ctx context.Context) ResourceEventHandlerFuncs {
logger := klog.FromContext(ctx)
return ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
logger.Info("Add func will panic now", "pod", klog.KObj(obj.(*v1.Pod)))
panic("fake panic")
},
}
}
_, _, panicLine, _ := runtime.Caller(0)
panicLine -= 4
expectedLog := func(name string) string {
if name == "" {
return fmt.Sprintf(`shared_informer_test.go:%d] "Observed a panic" panic="fake panic"`, panicLine)
}
return fmt.Sprintf(`shared_informer_test.go:%d] "Observed a panic" logger=%q panic="fake panic"`, panicLine, name)
}
handler := newHandler(context.Background())
t.Run("simple", func(t *testing.T) {
init(t)
klog.SetLogger(logger)
stop := make(chan struct{})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
handle, err := informer.AddEventHandler(handler)
require.NoError(t, err)
defer func() {
assert.NoError(t, informer.RemoveEventHandler(handle))
}()
var wg wait.Group
wg.StartWithChannel(stop, informer.Run)
defer func() {
close(stop)
assert.Eventually(t, informer.IsStopped, time.Minute, time.Millisecond, "informer has stopped")
wg.Wait() // For race detector...
}()
require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced")
// This times out (https://github.com/kubernetes/kubernetes/issues/129024) because the
// handler never syncs when the callback panics:
// require.Eventually(t, handle.HasSynced, time.Minute, time.Millisecond, "handler has synced")
//
// Wait for a non-empty buffer instead. This implies that we have to make
// the buffer thread-safe, which wouldn't be necessary otherwise.
assert.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Contains(t, buffer.String(), expectedLog(""))
}, time.Minute, time.Millisecond, "handler has panicked")
})
t.Run("many", func(t *testing.T) {
init(t)
// One pod was already created in init, add some more.
numPods := 5
for i := 1; i < numPods; i++ {
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod%d", i+1)}})
}
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
name1 := "fake-event-handler-1"
logger1 := klog.LoggerWithName(logger, name1)
ctx1 := klog.NewContext(ctx, logger1)
handle1, err := informer.AddEventHandlerWithOptions(newHandler(ctx1), HandlerOptions{Logger: &logger1})
require.NoError(t, err)
defer func() {
assert.NoError(t, informer.RemoveEventHandler(handle1))
}()
name2 := "fake-event-handler-2"
logger2 := klog.LoggerWithName(logger, name2)
ctx2 := klog.NewContext(ctx, logger2)
handle2, err := informer.AddEventHandlerWithOptions(newHandler(ctx2), HandlerOptions{Logger: &logger2})
require.NoError(t, err)
defer func() {
assert.NoError(t, informer.RemoveEventHandler(handle2))
}()
start := time.Now()
var wg wait.Group
informerName := "informer"
informerLogger := klog.LoggerWithName(logger, informerName)
informerCtx := klog.NewContext(ctx, informerLogger)
wg.StartWithContext(informerCtx, informer.RunWithContext)
defer func() {
cancel()
assert.Eventually(t, informer.IsStopped, time.Minute, time.Millisecond, "informer has stopped")
wg.Wait() // For race detector...
}()
require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced")
assert.EventuallyWithT(t, func(t *assert.CollectT) {
output := buffer.String()
expected := expectedLog(name1)
if !assert.Equal(t, numPods, numOccurrences(output, expected), "Log output should have the right number of panics for %q (search string: %q), got instead:\n%s", name1, expected, output) {
return
}
expected = expectedLog(name2)
assert.Equal(t, numPods, numOccurrences(output, expected), "Log output should have the right number of panics for %q (search string %q, got instead:\n%s", name2, expected, output)
}, 30*time.Second, time.Millisecond, "handler has panicked")
// Both handlers should have slept for one second after each panic,
// except after the last pod event because then the input channel
// gets closed.
assert.GreaterOrEqual(t, time.Since(start), time.Duration(numPods-1)*time.Second, "Delay in processorListener.run")
})
}
func numOccurrences(hay, needle string) int {
count := 0
for {
index := strings.Index(hay, needle)
if index < 0 {
return count
}
count++
hay = hay[index+len(needle):]
}
}
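
Side note, not a change request: for a non-empty needle and the non-overlapping counting this helper performs, the standard library gives the same result. A minimal equivalent, assuming overlap handling is not needed:

package example

import "strings"

func numOccurrencesStd(hay, needle string) int {
	return strings.Count(hay, needle)
}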

View File

@ -17,6 +17,8 @@ limitations under the License.
 package cache

 import (
+	"bytes"
+	"sync"
 	"testing"

 	fcache "k8s.io/client-go/tools/cache/testing"
@ -27,3 +29,21 @@ func newFakeControllerSource(tb testing.TB) *fcache.FakeControllerSource {
 	tb.Cleanup(source.Shutdown)
 	return source
 }
// threadSafeBuffer is a thread-safe wrapper around bytes.Buffer.
type threadSafeBuffer struct {
buffer bytes.Buffer
mu sync.Mutex
}
func (b *threadSafeBuffer) Write(p []byte) (n int, err error) {
b.mu.Lock()
defer b.mu.Unlock()
return b.buffer.Write(p)
}
func (b *threadSafeBuffer) String() string {
b.mu.Lock()
defer b.mu.Unlock()
return b.buffer.String()
}
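
A small sketch of wiring this buffer into a logger, mirroring the TestEventPanics setup above; the function name is an assumption, and it relies on the same k8s.io/klog/v2 and k8s.io/klog/v2/textlogger imports used in that test:

// newCapturingLogger returns a logger whose output can be inspected safely
// from other goroutines via the returned buffer.
func newCapturingLogger() (*threadSafeBuffer, klog.Logger) {
	var buf threadSafeBuffer
	logger := textlogger.NewLogger(textlogger.NewConfig(textlogger.Output(&buf)))
	return &buf, logger
}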