diff --git a/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go b/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go index 374264ef07c..114abfcc9be 100644 --- a/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go +++ b/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go @@ -17,7 +17,6 @@ limitations under the License. package watch import ( - "context" "sync" "k8s.io/apimachinery/pkg/runtime" @@ -107,18 +106,15 @@ func (e *eventProcessor) stop() { // so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. // it also returns a channel you can use to wait for the informers to fully shutdown. // -// Contextual logging: NewIndexerInformerWatcherWithContext should be used instead of NewIndexerInformerWatcher in code which supports contextual logging. +// Contextual logging: NewIndexerInformerWatcherWithLogger should be used instead of NewIndexerInformerWatcher in code which supports contextual logging. func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { - return NewIndexerInformerWatcherWithContext(context.Background(), lw, objType) + return NewIndexerInformerWatcherWithLogger(klog.Background(), lw, objType) } -// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface +// NewIndexerInformerWatcherWithLogger will create an IndexerInformer and wrap it into watch.Interface // so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. // it also returns a channel you can use to wait for the informers to fully shutdown. -// -// Cancellation of the context has the same effect as calling [watch.Interface.Stop]. One or -// the other can be used. -func NewIndexerInformerWatcherWithContext(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { +func NewIndexerInformerWatcherWithLogger(logger klog.Logger, lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { ch := make(chan watch.Event) w := watch.NewProxyWatcher(ch) e := newEventProcessor(ch) @@ -155,24 +151,12 @@ func NewIndexerInformerWatcherWithContext(ctx context.Context, lw cache.ListerWa // This will get stopped, but without waiting for it. go e.run() - logger := klog.FromContext(ctx) - if ctx.Done() != nil { - go func() { - select { - case <-ctx.Done(): - // Map cancellation to Stop. The informer below only waits for that. - w.Stop() - case <-w.StopChan(): - } - }() - } - doneCh := make(chan struct{}) go func() { defer close(doneCh) defer e.stop() // Waiting for w.StopChan() is the traditional behavior which gets - // preserved here. Context cancellation is handled above. + // preserved here, with the logger added to support contextual logging. ctx := wait.ContextForChannel(w.StopChan()) ctx = klog.NewContext(ctx, logger) informer.RunWithContext(ctx) diff --git a/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go b/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go index 0a657d76a5a..e03f9612dba 100644 --- a/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go +++ b/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go @@ -18,7 +18,6 @@ package watch import ( "context" - "errors" "reflect" goruntime "runtime" "sort" @@ -40,8 +39,6 @@ import ( fakeclientset "k8s.io/client-go/kubernetes/fake" testcore "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - "k8s.io/klog/v2/ktesting" ) // TestEventProcessorExit is expected to timeout if the event processor fails @@ -467,29 +464,3 @@ func TestInformerWatcherDeletedFinalStateUnknown(t *testing.T) { t.Fatalf("expected at least 1 watch call, got %d", watchCalls) } } - -func TestInformerContext(t *testing.T) { - logger, ctx := ktesting.NewTestContext(t) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // Whatever gets called first will stop. - validateContext := func(ctx context.Context) error { - if reflect.TypeOf(logger.GetSink()) != reflect.TypeOf(klog.FromContext(ctx).GetSink()) { - t.Errorf("Expected logger %+v from context, got %+v", logger, klog.FromContext(ctx)) - } - cancel() - return errors.New("not implemented by text") - } - lw := &cache.ListWatch{ - ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { - return nil, validateContext(ctx) - }, - WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) { - return nil, validateContext(ctx) - }, - } - - _, _, _, done := NewIndexerInformerWatcherWithContext(ctx, lw, &corev1.Secret{}) - <-done -} diff --git a/staging/src/k8s.io/client-go/tools/watch/until.go b/staging/src/k8s.io/client-go/tools/watch/until.go index 844b93fb0f6..03ceaf002d2 100644 --- a/staging/src/k8s.io/client-go/tools/watch/until.go +++ b/staging/src/k8s.io/client-go/tools/watch/until.go @@ -126,7 +126,7 @@ func Until(ctx context.Context, initialResourceVersion string, watcherClient cac // The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like: // waiting for object reaching a state, "small" controllers, ... func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) { - indexer, informer, watcher, done := NewIndexerInformerWatcherWithContext(ctx, lw, objType) + indexer, informer, watcher, done := NewIndexerInformerWatcherWithLogger(klog.FromContext(ctx), lw, objType) // We need to wait for the internal informers to fully stop so it's easier to reason about // and it works with non-thread safe clients. defer func() { <-done }()