From 57bc261e52e57b3972493701c7b77f98467ec58b Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 31 Jan 2025 10:18:04 +0100 Subject: [PATCH] client-go watch: NewIndexerInformerWatcherWithContext -> WithLogger The ability to automatically stop on context cancellation was new functionality that adds complexity and wasn't really used in Kubernetes. If someone wants this, they can add it outside of the function. A *WithLogger variant avoids the complexity and is consistent with NewStreamWatcherWithLogger over in apimachinery. Kubernetes-commit: 1a8d8c9b4a33daf9330434e1ad544ef3571722a3 --- tools/watch/informerwatcher.go | 26 +++++--------------------- tools/watch/informerwatcher_test.go | 29 ----------------------------- tools/watch/until.go | 2 +- 3 files changed, 6 insertions(+), 51 deletions(-) diff --git a/tools/watch/informerwatcher.go b/tools/watch/informerwatcher.go index 374264ef..114abfcc 100644 --- a/tools/watch/informerwatcher.go +++ b/tools/watch/informerwatcher.go @@ -17,7 +17,6 @@ limitations under the License. package watch import ( - "context" "sync" "k8s.io/apimachinery/pkg/runtime" @@ -107,18 +106,15 @@ func (e *eventProcessor) stop() { // so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. // it also returns a channel you can use to wait for the informers to fully shutdown. // -// Contextual logging: NewIndexerInformerWatcherWithContext should be used instead of NewIndexerInformerWatcher in code which supports contextual logging. +// Contextual logging: NewIndexerInformerWatcherWithLogger should be used instead of NewIndexerInformerWatcher in code which supports contextual logging. func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { - return NewIndexerInformerWatcherWithContext(context.Background(), lw, objType) + return NewIndexerInformerWatcherWithLogger(klog.Background(), lw, objType) } -// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface +// NewIndexerInformerWatcherWithLogger will create an IndexerInformer and wrap it into watch.Interface // so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. // it also returns a channel you can use to wait for the informers to fully shutdown. -// -// Cancellation of the context has the same effect as calling [watch.Interface.Stop]. One or -// the other can be used. -func NewIndexerInformerWatcherWithContext(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { +func NewIndexerInformerWatcherWithLogger(logger klog.Logger, lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { ch := make(chan watch.Event) w := watch.NewProxyWatcher(ch) e := newEventProcessor(ch) @@ -155,24 +151,12 @@ func NewIndexerInformerWatcherWithContext(ctx context.Context, lw cache.ListerWa // This will get stopped, but without waiting for it. go e.run() - logger := klog.FromContext(ctx) - if ctx.Done() != nil { - go func() { - select { - case <-ctx.Done(): - // Map cancellation to Stop. The informer below only waits for that. - w.Stop() - case <-w.StopChan(): - } - }() - } - doneCh := make(chan struct{}) go func() { defer close(doneCh) defer e.stop() // Waiting for w.StopChan() is the traditional behavior which gets - // preserved here. Context cancellation is handled above. + // preserved here, with the logger added to support contextual logging. ctx := wait.ContextForChannel(w.StopChan()) ctx = klog.NewContext(ctx, logger) informer.RunWithContext(ctx) diff --git a/tools/watch/informerwatcher_test.go b/tools/watch/informerwatcher_test.go index 0a657d76..e03f9612 100644 --- a/tools/watch/informerwatcher_test.go +++ b/tools/watch/informerwatcher_test.go @@ -18,7 +18,6 @@ package watch import ( "context" - "errors" "reflect" goruntime "runtime" "sort" @@ -40,8 +39,6 @@ import ( fakeclientset "k8s.io/client-go/kubernetes/fake" testcore "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - "k8s.io/klog/v2/ktesting" ) // TestEventProcessorExit is expected to timeout if the event processor fails @@ -467,29 +464,3 @@ func TestInformerWatcherDeletedFinalStateUnknown(t *testing.T) { t.Fatalf("expected at least 1 watch call, got %d", watchCalls) } } - -func TestInformerContext(t *testing.T) { - logger, ctx := ktesting.NewTestContext(t) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // Whatever gets called first will stop. - validateContext := func(ctx context.Context) error { - if reflect.TypeOf(logger.GetSink()) != reflect.TypeOf(klog.FromContext(ctx).GetSink()) { - t.Errorf("Expected logger %+v from context, got %+v", logger, klog.FromContext(ctx)) - } - cancel() - return errors.New("not implemented by text") - } - lw := &cache.ListWatch{ - ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { - return nil, validateContext(ctx) - }, - WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) { - return nil, validateContext(ctx) - }, - } - - _, _, _, done := NewIndexerInformerWatcherWithContext(ctx, lw, &corev1.Secret{}) - <-done -} diff --git a/tools/watch/until.go b/tools/watch/until.go index 844b93fb..03ceaf00 100644 --- a/tools/watch/until.go +++ b/tools/watch/until.go @@ -126,7 +126,7 @@ func Until(ctx context.Context, initialResourceVersion string, watcherClient cac // The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like: // waiting for object reaching a state, "small" controllers, ... func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) { - indexer, informer, watcher, done := NewIndexerInformerWatcherWithContext(ctx, lw, objType) + indexer, informer, watcher, done := NewIndexerInformerWatcherWithLogger(klog.FromContext(ctx), lw, objType) // We need to wait for the internal informers to fully stop so it's easier to reason about // and it works with non-thread safe clients. defer func() { <-done }()