client-go watch: NewIndexerInformerWatcherWithContext -> WithLogger

The ability to automatically stop on context cancellation was new functionality
that added complexity and wasn't really used in Kubernetes. If someone wants
this, they can add it outside of the function.

A *WithLogger variant avoids the complexity and is consistent with
NewStreamWatcherWithLogger over in apimachinery.

Kubernetes-commit: 1a8d8c9b4a33daf9330434e1ad544ef3571722a3
This commit is contained in:
Patrick Ohly 2025-01-31 10:18:04 +01:00 committed by Kubernetes Publisher
parent e8a7cb0e18
commit 57bc261e52
3 changed files with 6 additions and 51 deletions

View File

@ -17,7 +17,6 @@ limitations under the License.
package watch
import (
"context"
"sync"
"k8s.io/apimachinery/pkg/runtime"
@ -107,18 +106,15 @@ func (e *eventProcessor) stop() {
// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
// it also returns a channel you can use to wait for the informers to fully shutdown.
//
// Contextual logging: NewIndexerInformerWatcherWithContext should be used instead of NewIndexerInformerWatcher in code which supports contextual logging.
// Contextual logging: NewIndexerInformerWatcherWithLogger should be used instead of NewIndexerInformerWatcher in code which supports contextual logging.
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
return NewIndexerInformerWatcherWithContext(context.Background(), lw, objType)
return NewIndexerInformerWatcherWithLogger(klog.Background(), lw, objType)
}
// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
// NewIndexerInformerWatcherWithLogger will create an IndexerInformer and wrap it into watch.Interface
// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
// it also returns a channel you can use to wait for the informers to fully shutdown.
//
// Cancellation of the context has the same effect as calling [watch.Interface.Stop]. One or
// the other can be used.
func NewIndexerInformerWatcherWithContext(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
func NewIndexerInformerWatcherWithLogger(logger klog.Logger, lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
ch := make(chan watch.Event)
w := watch.NewProxyWatcher(ch)
e := newEventProcessor(ch)
@ -155,24 +151,12 @@ func NewIndexerInformerWatcherWithContext(ctx context.Context, lw cache.ListerWa
// This will get stopped, but without waiting for it.
go e.run()
logger := klog.FromContext(ctx)
if ctx.Done() != nil {
go func() {
select {
case <-ctx.Done():
// Map cancellation to Stop. The informer below only waits for that.
w.Stop()
case <-w.StopChan():
}
}()
}
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
defer e.stop()
// Waiting for w.StopChan() is the traditional behavior which gets
// preserved here. Context cancellation is handled above.
// preserved here, with the logger added to support contextual logging.
ctx := wait.ContextForChannel(w.StopChan())
ctx = klog.NewContext(ctx, logger)
informer.RunWithContext(ctx)

View File

@ -18,7 +18,6 @@ package watch
import (
"context"
"errors"
"reflect"
goruntime "runtime"
"sort"
@ -40,8 +39,6 @@ import (
fakeclientset "k8s.io/client-go/kubernetes/fake"
testcore "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
)
// TestEventProcessorExit is expected to timeout if the event processor fails
@ -467,29 +464,3 @@ func TestInformerWatcherDeletedFinalStateUnknown(t *testing.T) {
t.Fatalf("expected at least 1 watch call, got %d", watchCalls)
}
}
func TestInformerContext(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Whatever gets called first will stop.
validateContext := func(ctx context.Context) error {
if reflect.TypeOf(logger.GetSink()) != reflect.TypeOf(klog.FromContext(ctx).GetSink()) {
t.Errorf("Expected logger %+v from context, got %+v", logger, klog.FromContext(ctx))
}
cancel()
return errors.New("not implemented by text")
}
lw := &cache.ListWatch{
ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
return nil, validateContext(ctx)
},
WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
return nil, validateContext(ctx)
},
}
_, _, _, done := NewIndexerInformerWatcherWithContext(ctx, lw, &corev1.Secret{})
<-done
}

View File

@ -126,7 +126,7 @@ func Until(ctx context.Context, initialResourceVersion string, watcherClient cac
// The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like:
// waiting for object reaching a state, "small" controllers, ...
func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) {
indexer, informer, watcher, done := NewIndexerInformerWatcherWithContext(ctx, lw, objType)
indexer, informer, watcher, done := NewIndexerInformerWatcherWithLogger(klog.FromContext(ctx), lw, objType)
// We need to wait for the internal informers to fully stop so it's easier to reason about
// and it works with non-thread safe clients.
defer func() { <-done }()