From bad1caabdeb663e603313fd2049066b2952d29ca Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 20 Dec 2024 13:55:47 +0100 Subject: [PATCH] client-go + apimachinery watch: context support The Lister and Watcher interfaces only supported methods without context, but were typically implemented with client-go API calls which need a context. New interfaces get added using the same approach as in https://github.com/kubernetes/kubernetes/pull/129109. Kubernetes-commit: 6688adae142e37114d9dfa8d94cd1d8a91fbcc13 --- rest/request.go | 3 +- tools/cache/controller_test.go | 2 +- tools/cache/listwatch.go | 174 +++++++++++++++++++++++++- tools/cache/mutation_detector_test.go | 4 +- tools/cache/reflector.go | 19 +-- tools/cache/reflector_test.go | 75 +++++------ tools/watch/informerwatcher.go | 34 ++++- tools/watch/informerwatcher_test.go | 31 +++++ tools/watch/retrywatcher.go | 103 +++++++-------- tools/watch/retrywatcher_test.go | 6 +- tools/watch/until.go | 6 +- 11 files changed, 329 insertions(+), 128 deletions(-) diff --git a/rest/request.go b/rest/request.go index 5bf5db07..1eb2f9b4 100644 --- a/rest/request.go +++ b/rest/request.go @@ -1008,7 +1008,8 @@ func (r *Request) newStreamWatcher(ctx context.Context, resp *http.Response) (wa frameReader := framer.NewFrameReader(resp.Body) watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer) - return watch.NewStreamWatcher( + return watch.NewStreamWatcherWithLogger( + klog.FromContext(ctx), restclientwatch.NewDecoder(watchEventDecoder, objectDecoder), // use 500 to indicate that the cause of the error is unknown - other error codes // are more specific to HTTP interactions, and set a reason diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index 05425792..9067fc15 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -363,7 +363,7 @@ func TestUpdate(t *testing.T) { // everything we've added has been deleted. watchCh := make(chan struct{}) _, controller := NewInformer( - &testLW{ + &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { watch, err := source.Watch(options) close(watchCh) diff --git a/tools/cache/listwatch.go b/tools/cache/listwatch.go index f5708ffe..30d30e88 100644 --- a/tools/cache/listwatch.go +++ b/tools/cache/listwatch.go @@ -27,50 +27,160 @@ import ( ) // Lister is any object that knows how to perform an initial list. +// +// Ideally, all implementations of Lister should also implement ListerWithContext. type Lister interface { // List should return a list type object; the Items field will be extracted, and the // ResourceVersion field will be used to start the watch in the right place. + // + // Deprecated: use ListerWithContext.ListWithContext instead. List(options metav1.ListOptions) (runtime.Object, error) } +// ListerWithContext is any object that knows how to perform an initial list. +type ListerWithContext interface { + // ListWithContext should return a list type object; the Items field will be extracted, and the + // ResourceVersion field will be used to start the watch in the right place. + ListWithContext(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) +} + +func ToListerWithContext(l Lister) ListerWithContext { + if l, ok := l.(ListerWithContext); ok { + return l + } + return listerWrapper{ + parent: l, + } +} + +type listerWrapper struct { + parent Lister +} + +func (l listerWrapper) ListWithContext(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + return l.parent.List(options) +} + // Watcher is any object that knows how to start a watch on a resource. +// +// Ideally, all implementations of Watcher should also implement WatcherWithContext. type Watcher interface { // Watch should begin a watch at the specified version. // // If Watch returns an error, it should handle its own cleanup, including // but not limited to calling Stop() on the watch, if one was constructed. // This allows the caller to ignore the watch, if the error is non-nil. + // + // Deprecated: use WatcherWithContext.WatchWithContext instead. Watch(options metav1.ListOptions) (watch.Interface, error) } +// WatcherWithContext is any object that knows how to start a watch on a resource. +type WatcherWithContext interface { + // WatchWithContext should begin a watch at the specified version. + // + // If Watch returns an error, it should handle its own cleanup, including + // but not limited to calling Stop() on the watch, if one was constructed. + // This allows the caller to ignore the watch, if the error is non-nil. + WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) +} + +func ToWatcherWithContext(w Watcher) WatcherWithContext { + if w, ok := w.(WatcherWithContext); ok { + return w + } + return watcherWrapper{ + parent: w, + } +} + +type watcherWrapper struct { + parent Watcher +} + +func (l watcherWrapper) WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) { + return l.parent.Watch(options) +} + // ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource. +// +// Ideally, all implementations of ListerWatcher should also implement ListerWatcherWithContext. type ListerWatcher interface { Lister Watcher } +// ListerWatcherWithContext is any object that knows how to perform an initial list and start a watch on a resource. +type ListerWatcherWithContext interface { + ListerWithContext + WatcherWithContext +} + +func ToListerWatcherWithContext(lw ListerWatcher) ListerWatcherWithContext { + if lw, ok := lw.(ListerWatcherWithContext); ok { + return lw + } + return listerWatcherWrapper{ + ListerWithContext: ToListerWithContext(lw), + WatcherWithContext: ToWatcherWithContext(lw), + } +} + +type listerWatcherWrapper struct { + ListerWithContext + WatcherWithContext +} + // ListFunc knows how to list resources +// +// Deprecated: use ListWithContextFunc instead. type ListFunc func(options metav1.ListOptions) (runtime.Object, error) +// ListWithContextFunc knows how to list resources +type ListWithContextFunc func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) + // WatchFunc knows how to watch resources +// +// Deprecated: use WatchFuncWithContext instead. type WatchFunc func(options metav1.ListOptions) (watch.Interface, error) -// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface. +// WatchFuncWithContext knows how to watch resources +type WatchFuncWithContext func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) + +// ListWatch knows how to list and watch a set of apiserver resources. +// It satisfies the ListerWatcher and ListerWatcherWithContext interfaces. // It is a convenience function for users of NewReflector, etc. -// ListFunc and WatchFunc must not be nil +// ListFunc or ListWithContextFunc must be set. Same for WatchFunc and WatchFuncWithContext. +// ListWithContextFunc and WatchFuncWithContext are preferred if +// a context is available, otherwise ListFunc and WatchFunc. +// +// NewFilteredListWatchFromClient sets all of the functions to ensure that callers +// which only know about ListFunc and WatchFunc continue to work. type ListWatch struct { - ListFunc ListFunc + // Deprecated: use ListWithContext instead. + ListFunc ListFunc + // Deprecated: use WatchWithContext instead. WatchFunc WatchFunc + + ListWithContextFunc ListWithContextFunc + WatchFuncWithContext WatchFuncWithContext + // DisableChunking requests no chunking for this list watcher. DisableChunking bool } +var ( + _ ListerWatcher = &ListWatch{} + _ ListerWatcherWithContext = &ListWatch{} +) + // Getter interface knows how to access Get method from RESTClient. type Getter interface { Get() *restclient.Request } // NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector. +// For backward compatibility, all function fields are populated. func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch { optionsModifier := func(options *metav1.ListOptions) { options.FieldSelector = fieldSelector.String() @@ -81,6 +191,7 @@ func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSe // NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier. // Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function // to apply modification to ListOptions with a field selector, a label selector, or any other desired options. +// For backward compatibility, all function fields are populated. func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch { listFunc := func(options metav1.ListOptions) (runtime.Object, error) { optionsModifier(&options) @@ -88,7 +199,7 @@ func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, Namespace(namespace). Resource(resource). VersionedParams(&options, metav1.ParameterCodec). - Do(context.TODO()). + Do(context.Background()). Get() } watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { @@ -98,19 +209,70 @@ func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, Namespace(namespace). Resource(resource). VersionedParams(&options, metav1.ParameterCodec). - Watch(context.TODO()) + Watch(context.Background()) + } + listFuncWithContext := func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + optionsModifier(&options) + return c.Get(). + Namespace(namespace). + Resource(resource). + VersionedParams(&options, metav1.ParameterCodec). + Do(ctx). + Get() + } + watchFuncWithContext := func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) { + options.Watch = true + optionsModifier(&options) + return c.Get(). + Namespace(namespace). + Resource(resource). + VersionedParams(&options, metav1.ParameterCodec). + Watch(ctx) + } + return &ListWatch{ + ListFunc: listFunc, + WatchFunc: watchFunc, + ListWithContextFunc: listFuncWithContext, + WatchFuncWithContext: watchFuncWithContext, } - return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc} } // List a set of apiserver resources +// +// Deprecated: use ListWatchWithContext.ListWithContext instead. func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) { // ListWatch is used in Reflector, which already supports pagination. // Don't paginate here to avoid duplication. + if lw.ListFunc != nil { + return lw.ListFunc(options) + } + return lw.ListWithContextFunc(context.Background(), options) +} + +// List a set of apiserver resources +func (lw *ListWatch) ListWithContext(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + // ListWatch is used in Reflector, which already supports pagination. + // Don't paginate here to avoid duplication. + if lw.ListWithContextFunc != nil { + return lw.ListWithContextFunc(ctx, options) + } return lw.ListFunc(options) } // Watch a set of apiserver resources +// +// Deprecated: use ListWatchWithContext.WatchWithContext instead. func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) { + if lw.WatchFunc != nil { + return lw.WatchFunc(options) + } + return lw.WatchFuncWithContext(context.Background(), options) +} + +// Watch a set of apiserver resources +func (lw *ListWatch) WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) { + if lw.WatchFuncWithContext != nil { + return lw.WatchFuncWithContext(ctx, options) + } return lw.WatchFunc(options) } diff --git a/tools/cache/mutation_detector_test.go b/tools/cache/mutation_detector_test.go index 589b87a0..fab9f4dd 100644 --- a/tools/cache/mutation_detector_test.go +++ b/tools/cache/mutation_detector_test.go @@ -20,7 +20,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -29,7 +29,7 @@ import ( func TestMutationDetector(t *testing.T) { fakeWatch := watch.NewFake() - lw := &testLW{ + lw := &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return fakeWatch, nil }, diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 0d054df4..8e2a8270 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -96,7 +96,7 @@ type Reflector struct { // The destination to sync up with the watch source store ReflectorStore // listerWatcher is used to perform lists and watches. - listerWatcher ListerWatcher + listerWatcher ListerWatcherWithContext // backoff manages backoff of ListWatch backoffManager wait.BackoffManager resyncPeriod time.Duration @@ -270,7 +270,7 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store R resyncPeriod: options.ResyncPeriod, minWatchTimeout: minWatchTimeout, typeDescription: options.TypeDescription, - listerWatcher: lw, + listerWatcher: ToListerWatcherWithContext(lw), store: store, // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is @@ -512,7 +512,7 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha AllowWatchBookmarks: true, } - w, err = r.listerWatcher.Watch(options) + w, err = r.listerWatcher.WatchWithContext(ctx, options) if err != nil { if canRetry := isWatchErrorRetriable(err); canRetry { logger.V(4).Info("Watch failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err) @@ -583,7 +583,7 @@ func (r *Reflector) list(ctx context.Context) error { // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first // list request will return the full response. pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { - return r.listerWatcher.List(opts) + return r.listerWatcher.ListWithContext(ctx, opts) })) switch { case r.WatchListPageSize != 0: @@ -739,7 +739,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) { } start := r.clock.Now() - w, err = r.listerWatcher.Watch(options) + w, err = r.listerWatcher.WatchWithContext(ctx, options) if err != nil { if isErrorRetriableWithSideEffectsFn(err) { continue @@ -771,7 +771,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) { // we utilize the temporaryStore to ensure independence from the current store implementation. // as of today, the store is implemented as a queue and will be drained by the higher-level // component as soon as it finishes replacing the content. - checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List) + checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, temporaryStore.List) if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { return nil, fmt.Errorf("unable to sync watch-list result: %w", err) @@ -1057,13 +1057,6 @@ func isWatchErrorRetriable(err error) bool { return false } -// wrapListFuncWithContext simply wraps ListFunction into another function that accepts a context and ignores it. -func wrapListFuncWithContext(listFn ListFunc) func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { - return func(_ context.Context, options metav1.ListOptions) (runtime.Object, error) { - return listFn(options) - } -} - // initialEventsEndBookmarkTicker a ticker that produces a warning if the bookmark event // which marks the end of the watch stream, has not been received within the defined tick interval. // diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index b880cfc0..d7614f36 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -52,24 +52,12 @@ import ( var nevererrc chan error -type testLW struct { - ListFunc func(options metav1.ListOptions) (runtime.Object, error) - WatchFunc func(options metav1.ListOptions) (watch.Interface, error) -} - -func (t *testLW) List(options metav1.ListOptions) (runtime.Object, error) { - return t.ListFunc(options) -} -func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) { - return t.WatchFunc(options) -} - func TestCloseWatchChannelOnError(t *testing.T) { _, ctx := ktesting.NewTestContext(t) - r := NewReflector(&testLW{}, &v1.Pod{}, NewStore(MetaNamespaceKeyFunc), 0) + r := NewReflector(&ListWatch{}, &v1.Pod{}, NewStore(MetaNamespaceKeyFunc), 0) pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} fw := watch.NewFake() - r.listerWatcher = &testLW{ + r.listerWatcher = &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return fw, nil }, @@ -94,9 +82,9 @@ func TestRunUntil(t *testing.T) { _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancelCause(ctx) store := NewStore(MetaNamespaceKeyFunc) - r := NewReflector(&testLW{}, &v1.Pod{}, store, 0) + r := NewReflector(&ListWatch{}, &v1.Pod{}, store, 0) fw := watch.NewFake() - r.listerWatcher = &testLW{ + r.listerWatcher = &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return fw, nil }, @@ -138,7 +126,7 @@ func TestRunUntil(t *testing.T) { func TestReflectorResyncChan(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) - g := NewReflector(&testLW{}, &v1.Pod{}, s, time.Millisecond) + g := NewReflector(&ListWatch{}, &v1.Pod{}, s, time.Millisecond) a, _ := g.resyncChan() b := time.After(wait.ForeverTestTimeout) select { @@ -208,7 +196,7 @@ func TestReflectorWatchStoppedAfter(t *testing.T) { func BenchmarkReflectorResyncChanMany(b *testing.B) { s := NewStore(MetaNamespaceKeyFunc) - g := NewReflector(&testLW{}, &v1.Pod{}, s, 25*time.Millisecond) + g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 25*time.Millisecond) // The improvement to this (calling the timer's Stop() method) makes // this benchmark about 40% faster. for i := 0; i < b.N; i++ { @@ -223,7 +211,7 @@ func BenchmarkReflectorResyncChanMany(b *testing.B) { // ResultChan is only called once and that Stop is called after ResultChan. func TestReflectorHandleWatchStoppedBefore(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) - g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) + g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancelCause(ctx) // Simulate the context being canceled before the watchHandler is called @@ -255,7 +243,7 @@ func TestReflectorHandleWatchStoppedBefore(t *testing.T) { // ResultChan is only called once and that Stop is called after ResultChan. func TestReflectorHandleWatchStoppedAfter(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) - g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) + g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) var calls []string _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancelCause(ctx) @@ -291,7 +279,7 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) { // stops when the result channel is closed before handleWatch was called. func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) - g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) + g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) _, ctx := ktesting.NewTestContext(t) var calls []string resultCh := make(chan watch.Event) @@ -320,7 +308,7 @@ func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { // stops when the result channel is closed after handleWatch has started watching. func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) - g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) + g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) _, ctx := ktesting.NewTestContext(t) var calls []string resultCh := make(chan watch.Event) @@ -352,7 +340,7 @@ func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) { func TestReflectorWatchHandler(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) - g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) + g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) // Wrap setLastSyncResourceVersion so we can tell the watchHandler to stop // watching after all the events have been consumed. This avoids race // conditions which can happen if the producer calls Stop(), instead of the @@ -416,7 +404,7 @@ func TestReflectorWatchHandler(t *testing.T) { func TestReflectorStopWatch(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) - g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) + g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) fw := watch.NewFake() _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancelCause(ctx) @@ -437,7 +425,7 @@ func TestReflectorListAndWatch(t *testing.T) { // to get called at the beginning of the watch with 1, and again with 3 when we // inject an error. expectedRVs := []string{"1", "3"} - lw := &testLW{ + lw := &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { rv := options.ResourceVersion fw := watch.NewFake() @@ -555,7 +543,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { watchRet, watchErr := item.events, item.watchErr _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancelCause(ctx) - lw := &testLW{ + lw := &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if watchErr != nil { return nil, watchErr @@ -634,7 +622,7 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) { time.Sleep(100 * time.Microsecond) } }() - lw := &testLW{ + lw := &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if connFails > 0 { connFails-- @@ -687,7 +675,7 @@ func TestBackoffOnTooManyRequests(t *testing.T) { clock := &clock.RealClock{} bm := &fakeBackoff{clock: clock} - lw := &testLW{ + lw := &ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil }, @@ -733,7 +721,7 @@ func TestNoRelistOnTooManyRequests(t *testing.T) { bm := &fakeBackoff{clock: clock} listCalls, watchCalls := 0, 0 - lw := &testLW{ + lw := &ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { listCalls++ return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil @@ -804,7 +792,7 @@ func TestRetryInternalError(t *testing.T) { counter := 0 - lw := &testLW{ + lw := &ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil }, @@ -860,7 +848,7 @@ func TestReflectorResync(t *testing.T) { }, } - lw := &testLW{ + lw := &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { fw := watch.NewFake() return fw, nil @@ -885,7 +873,7 @@ func TestReflectorWatchListPageSize(t *testing.T) { ctx, cancel := context.WithCancelCause(ctx) s := NewStore(MetaNamespaceKeyFunc) - lw := &testLW{ + lw := &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { // Stop once the reflector begins watching since we're only interested in the list. cancel(errors.New("done")) @@ -931,7 +919,7 @@ func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) { ctx, cancel := context.WithCancelCause(ctx) s := NewStore(MetaNamespaceKeyFunc) - lw := &testLW{ + lw := &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { // Stop once the reflector begins watching since we're only interested in the list. cancel(errors.New("done")) @@ -967,7 +955,7 @@ func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T) var cancel func(error) s := NewStore(MetaNamespaceKeyFunc) - lw := &testLW{ + lw := &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { // Stop once the reflector begins watching since we're only interested in the list. cancel(errors.New("done")) @@ -1024,7 +1012,7 @@ func TestReflectorResyncWithResourceVersion(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) listCallRVs := []string{} - lw := &testLW{ + lw := &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { // Stop once the reflector begins watching since we're only interested in the list. cancel(errors.New("done")) @@ -1085,7 +1073,7 @@ func TestReflectorExpiredExactResourceVersion(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) listCallRVs := []string{} - lw := &testLW{ + lw := &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { // Stop once the reflector begins watching since we're only interested in the list. cancel(errors.New("done")) @@ -1145,7 +1133,7 @@ func TestReflectorFullListIfExpired(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) listCallRVs := []string{} - lw := &testLW{ + lw := &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { // Stop once the reflector begins watching since we're only interested in the list. cancel(errors.New("done")) @@ -1220,7 +1208,7 @@ func TestReflectorFullListIfTooLarge(t *testing.T) { listCallRVs := []string{} version := 30 - lw := &testLW{ + lw := &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { // Stop once the reflector begins watching since we're only interested in the list. cancel(errors.New("done")) @@ -1394,7 +1382,7 @@ func TestWatchTimeout(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) var gotTimeoutSeconds int64 - lw := &testLW{ + lw := &ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil }, @@ -1450,7 +1438,7 @@ func TestReflectorResourceVersionUpdate(t *testing.T) { ctx, cancel := context.WithCancelCause(ctx) fw := watch.NewFake() - lw := &testLW{ + lw := &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return fw, nil }, @@ -1866,7 +1854,7 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) { } var once sync.Once - lw := &testLW{ + lw := &ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { fw := watch.NewFake() go func() { @@ -1892,6 +1880,7 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) { doneCh, stopCh := make(chan struct{}), make(chan struct{}) go func() { defer close(doneCh) + //nolint:logcheck // Intentionally uses the old API. r.Run(stopCh) }() @@ -2066,9 +2055,6 @@ func BenchmarkEachListItemWithAlloc(b *testing.B) { } func BenchmarkReflectorList(b *testing.B) { - ctx, cancel := context.WithTimeout(context.Background(), wait.ForeverTestTimeout) - defer cancel() - store := NewStore(func(obj interface{}) (string, error) { o, err := meta.Accessor(obj) if err != nil { @@ -2102,6 +2088,7 @@ func BenchmarkReflectorList(b *testing.B) { for _, tc := range tests { b.Run(tc.name, func(b *testing.B) { + _, ctx := ktesting.NewTestContext(b) sample := tc.sample() reflector := NewReflector(newPageTestLW(pageNum), &sample, store, 0) diff --git a/tools/watch/informerwatcher.go b/tools/watch/informerwatcher.go index 5e6aad5c..374264ef 100644 --- a/tools/watch/informerwatcher.go +++ b/tools/watch/informerwatcher.go @@ -17,11 +17,14 @@ limitations under the License. package watch import ( + "context" "sync" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" ) func newEventProcessor(out chan<- watch.Event) *eventProcessor { @@ -103,7 +106,19 @@ func (e *eventProcessor) stop() { // NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface // so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. // it also returns a channel you can use to wait for the informers to fully shutdown. +// +// Contextual logging: NewIndexerInformerWatcherWithContext should be used instead of NewIndexerInformerWatcher in code which supports contextual logging. func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { + return NewIndexerInformerWatcherWithContext(context.Background(), lw, objType) +} + +// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface +// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. +// it also returns a channel you can use to wait for the informers to fully shutdown. +// +// Cancellation of the context has the same effect as calling [watch.Interface.Stop]. One or +// the other can be used. +func NewIndexerInformerWatcherWithContext(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { ch := make(chan watch.Event) w := watch.NewProxyWatcher(ch) e := newEventProcessor(ch) @@ -137,13 +152,30 @@ func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) ( }, }, cache.Indexers{}) + // This will get stopped, but without waiting for it. go e.run() + logger := klog.FromContext(ctx) + if ctx.Done() != nil { + go func() { + select { + case <-ctx.Done(): + // Map cancellation to Stop. The informer below only waits for that. + w.Stop() + case <-w.StopChan(): + } + }() + } + doneCh := make(chan struct{}) go func() { defer close(doneCh) defer e.stop() - informer.Run(w.StopChan()) + // Waiting for w.StopChan() is the traditional behavior which gets + // preserved here. Context cancellation is handled above. + ctx := wait.ContextForChannel(w.StopChan()) + ctx = klog.NewContext(ctx, logger) + informer.RunWithContext(ctx) }() return indexer, informer, w, doneCh diff --git a/tools/watch/informerwatcher_test.go b/tools/watch/informerwatcher_test.go index 1f8e16c2..0a657d76 100644 --- a/tools/watch/informerwatcher_test.go +++ b/tools/watch/informerwatcher_test.go @@ -18,6 +18,7 @@ package watch import ( "context" + "errors" "reflect" goruntime "runtime" "sort" @@ -39,6 +40,8 @@ import ( fakeclientset "k8s.io/client-go/kubernetes/fake" testcore "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" ) // TestEventProcessorExit is expected to timeout if the event processor fails @@ -320,6 +323,7 @@ func TestNewInformerWatcher(t *testing.T) { return fake.CoreV1().Secrets("").Watch(context.TODO(), options) }, } + //nolint:logcheck // Intentionally uses the older API. _, _, outputWatcher, informerDoneCh := NewIndexerInformerWatcher(lw, &corev1.Secret{}) outputCh := outputWatcher.ResultChan() timeoutCh := time.After(wait.ForeverTestTimeout) @@ -413,6 +417,7 @@ func TestInformerWatcherDeletedFinalStateUnknown(t *testing.T) { return w, nil }, } + //nolint:logcheck // Intentionally uses the older API. _, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{}) defer w.Stop() @@ -462,3 +467,29 @@ func TestInformerWatcherDeletedFinalStateUnknown(t *testing.T) { t.Fatalf("expected at least 1 watch call, got %d", watchCalls) } } + +func TestInformerContext(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Whatever gets called first will stop. + validateContext := func(ctx context.Context) error { + if reflect.TypeOf(logger.GetSink()) != reflect.TypeOf(klog.FromContext(ctx).GetSink()) { + t.Errorf("Expected logger %+v from context, got %+v", logger, klog.FromContext(ctx)) + } + cancel() + return errors.New("not implemented by text") + } + lw := &cache.ListWatch{ + ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + return nil, validateContext(ctx) + }, + WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) { + return nil, validateContext(ctx) + }, + } + + _, _, _, done := NewIndexerInformerWatcherWithContext(ctx, lw, &corev1.Secret{}) + <-done +} diff --git a/tools/watch/retrywatcher.go b/tools/watch/retrywatcher.go index d36d7455..45249d8e 100644 --- a/tools/watch/retrywatcher.go +++ b/tools/watch/retrywatcher.go @@ -22,7 +22,6 @@ import ( "fmt" "io" "net/http" - "sync" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -48,23 +47,31 @@ type resourceVersionGetter interface { // Please note that this is not resilient to etcd cache not having the resource version anymore - you would need to // use Informers for that. type RetryWatcher struct { + cancel func(error) lastResourceVersion string - watcherClient cache.Watcher + watcherClient cache.WatcherWithContext resultChan chan watch.Event - stopChan chan struct{} doneChan chan struct{} minRestartDelay time.Duration - stopChanLock sync.Mutex } // NewRetryWatcher creates a new RetryWatcher. // It will make sure that watches gets restarted in case of recoverable errors. // The initialResourceVersion will be given to watch method when first called. +// +// Deprecated: use NewRetryWatcherWithContext instead. func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) { - return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second) + return NewRetryWatcherWithContext(context.Background(), initialResourceVersion, cache.ToWatcherWithContext(watcherClient)) } -func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) { +// NewRetryWatcherWithContext creates a new RetryWatcher. +// It will make sure that watches gets restarted in case of recoverable errors. +// The initialResourceVersion will be given to watch method when first called. +func NewRetryWatcherWithContext(ctx context.Context, initialResourceVersion string, watcherClient cache.WatcherWithContext) (*RetryWatcher, error) { + return newRetryWatcher(ctx, initialResourceVersion, watcherClient, 1*time.Second) +} + +func newRetryWatcher(ctx context.Context, initialResourceVersion string, watcherClient cache.WatcherWithContext, minRestartDelay time.Duration) (*RetryWatcher, error) { switch initialResourceVersion { case "", "0": // TODO: revisit this if we ever get WATCH v2 where it means start "now" @@ -74,34 +81,36 @@ func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, break } + ctx, cancel := context.WithCancelCause(ctx) + rw := &RetryWatcher{ + cancel: cancel, lastResourceVersion: initialResourceVersion, watcherClient: watcherClient, - stopChan: make(chan struct{}), doneChan: make(chan struct{}), resultChan: make(chan watch.Event, 0), minRestartDelay: minRestartDelay, } - go rw.receive() + go rw.receive(ctx) return rw, nil } -func (rw *RetryWatcher) send(event watch.Event) bool { +func (rw *RetryWatcher) send(ctx context.Context, event watch.Event) bool { // Writing to an unbuffered channel is blocking operation // and we need to check if stop wasn't requested while doing so. select { case rw.resultChan <- event: return true - case <-rw.stopChan: + case <-ctx.Done(): return false } } // doReceive returns true when it is done, false otherwise. // If it is not done the second return value holds the time to wait before calling it again. -func (rw *RetryWatcher) doReceive() (bool, time.Duration) { - watcher, err := rw.watcherClient.Watch(metav1.ListOptions{ +func (rw *RetryWatcher) doReceive(ctx context.Context) (bool, time.Duration) { + watcher, err := rw.watcherClient.WatchWithContext(ctx, metav1.ListOptions{ ResourceVersion: rw.lastResourceVersion, AllowWatchBookmarks: true, }) @@ -117,13 +126,13 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) { return false, 0 case io.ErrUnexpectedEOF: - klog.V(1).InfoS("Watch closed with unexpected EOF", "err", err) + klog.FromContext(ctx).V(1).Info("Watch closed with unexpected EOF", "err", err) return false, 0 default: msg := "Watch failed" if net.IsProbableEOF(err) || net.IsTimeout(err) { - klog.V(5).InfoS(msg, "err", err) + klog.FromContext(ctx).V(5).Info(msg, "err", err) // Retry return false, 0 } @@ -132,38 +141,38 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) { // being invalid (e.g. expired token). if apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err) { // Add more detail since the forbidden message returned by the Kubernetes API is just "unknown". - klog.ErrorS(err, msg+": ensure the client has valid credentials and watch permissions on the resource") + klog.FromContext(ctx).Error(err, msg+": ensure the client has valid credentials and watch permissions on the resource") if apiStatus, ok := err.(apierrors.APIStatus); ok { statusErr := apiStatus.Status() - sent := rw.send(watch.Event{ + sent := rw.send(ctx, watch.Event{ Type: watch.Error, Object: &statusErr, }) if !sent { // This likely means the RetryWatcher is stopping but return false so the caller to doReceive can // verify this and potentially retry. - klog.Error("Failed to send the Unauthorized or Forbidden watch event") + klog.FromContext(ctx).Error(nil, "Failed to send the Unauthorized or Forbidden watch event") return false, 0 } } else { // This should never happen since apierrors only handles apierrors.APIStatus. Still, this is an // unrecoverable error, so still allow it to return true below. - klog.ErrorS(err, msg+": encountered an unexpected Unauthorized or Forbidden error type") + klog.FromContext(ctx).Error(err, msg+": encountered an unexpected Unauthorized or Forbidden error type") } return true, 0 } - klog.ErrorS(err, msg) + klog.FromContext(ctx).Error(err, msg) // Retry return false, 0 } if watcher == nil { - klog.ErrorS(nil, "Watch returned nil watcher") + klog.FromContext(ctx).Error(nil, "Watch returned nil watcher") // Retry return false, 0 } @@ -173,12 +182,12 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) { for { select { - case <-rw.stopChan: - klog.V(4).InfoS("Stopping RetryWatcher.") + case <-ctx.Done(): + klog.FromContext(ctx).V(4).Info("Stopping RetryWatcher") return true, 0 case event, ok := <-ch: if !ok { - klog.V(4).InfoS("Failed to get event! Re-creating the watcher.", "resourceVersion", rw.lastResourceVersion) + klog.FromContext(ctx).V(4).Info("Failed to get event - re-creating the watcher", "resourceVersion", rw.lastResourceVersion) return false, 0 } @@ -187,7 +196,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) { case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark: metaObject, ok := event.Object.(resourceVersionGetter) if !ok { - _ = rw.send(watch.Event{ + _ = rw.send(ctx, watch.Event{ Type: watch.Error, Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus, }) @@ -197,7 +206,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) { resourceVersion := metaObject.GetResourceVersion() if resourceVersion == "" { - _ = rw.send(watch.Event{ + _ = rw.send(ctx, watch.Event{ Type: watch.Error, Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus, }) @@ -207,7 +216,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) { // All is fine; send the non-bookmark events and update resource version. if event.Type != watch.Bookmark { - ok = rw.send(event) + ok = rw.send(ctx, event) if !ok { return true, 0 } @@ -221,7 +230,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) { errObject := apierrors.FromObject(event.Object) statusErr, ok := errObject.(*apierrors.StatusError) if !ok { - klog.Error(fmt.Sprintf("Received an error which is not *metav1.Status but %s", dump.Pretty(event.Object))) + klog.FromContext(ctx).Error(nil, "Received an error which is not *metav1.Status", "errorObject", dump.Pretty(event.Object)) // Retry unknown errors return false, 0 } @@ -236,7 +245,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) { switch status.Code { case http.StatusGone: // Never retry RV too old errors - _ = rw.send(event) + _ = rw.send(ctx, event) return true, 0 case http.StatusGatewayTimeout, http.StatusInternalServerError: @@ -250,15 +259,15 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) { // Log here so we have a record of hitting the unexpected error // and we can whitelist some error codes if we missed any that are expected. - klog.V(5).Info(fmt.Sprintf("Retrying after unexpected error: %s", dump.Pretty(event.Object))) + klog.FromContext(ctx).V(5).Info("Retrying after unexpected error", "errorObject", dump.Pretty(event.Object)) // Retry return false, statusDelay } default: - klog.Errorf("Failed to recognize Event type %q", event.Type) - _ = rw.send(watch.Event{ + klog.FromContext(ctx).Error(nil, "Failed to recognize event", "type", event.Type) + _ = rw.send(ctx, watch.Event{ Type: watch.Error, Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus, }) @@ -270,29 +279,21 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) { } // receive reads the result from a watcher, restarting it if necessary. -func (rw *RetryWatcher) receive() { +func (rw *RetryWatcher) receive(ctx context.Context) { defer close(rw.doneChan) defer close(rw.resultChan) - klog.V(4).Info("Starting RetryWatcher.") - defer klog.V(4).Info("Stopping RetryWatcher.") + logger := klog.FromContext(ctx) + logger.V(4).Info("Starting RetryWatcher") + defer logger.V(4).Info("Stopping RetryWatcher") - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() - go func() { - select { - case <-rw.stopChan: - cancel() - return - case <-ctx.Done(): - return - } - }() // We use non sliding until so we don't introduce delays on happy path when WATCH call // timeouts or gets closed and we need to reestablish it while also avoiding hot loops. wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) { - done, retryAfter := rw.doReceive() + done, retryAfter := rw.doReceive(ctx) if done { cancel() return @@ -306,7 +307,7 @@ func (rw *RetryWatcher) receive() { case <-timer.C: } - klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion) + logger.V(4).Info("Restarting RetryWatcher", "resourceVersion", rw.lastResourceVersion) }, rw.minRestartDelay) } @@ -317,15 +318,7 @@ func (rw *RetryWatcher) ResultChan() <-chan watch.Event { // Stop implements Interface. func (rw *RetryWatcher) Stop() { - rw.stopChanLock.Lock() - defer rw.stopChanLock.Unlock() - - // Prevent closing an already closed channel to prevent a panic - select { - case <-rw.stopChan: - default: - close(rw.stopChan) - } + rw.cancel(errors.New("asked to stop")) } // Done allows the caller to be notified when Retry watcher stops. diff --git a/tools/watch/retrywatcher_test.go b/tools/watch/retrywatcher_test.go index 873ce37e..307b8da3 100644 --- a/tools/watch/retrywatcher_test.go +++ b/tools/watch/retrywatcher_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" ) func init() { @@ -54,7 +55,7 @@ func (o testObject) GetObjectKind() schema.ObjectKind { return schema.EmptyObjec func (o testObject) DeepCopyObject() runtime.Object { return o } func (o testObject) GetResourceVersion() string { return o.resourceVersion } -func withCounter(w cache.Watcher) (*uint32, cache.Watcher) { +func withCounter(w cache.Watcher) (*uint32, cache.WatcherWithContext) { var counter uint32 return &counter, &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { @@ -549,10 +550,11 @@ func TestRetryWatcher(t *testing.T) { for _, tc := range tt { tc := tc t.Run(tc.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) t.Parallel() atomicCounter, watchFunc := withCounter(tc.watchClient) - watcher, err := newRetryWatcher(tc.initialRV, watchFunc, time.Duration(0)) + watcher, err := newRetryWatcher(ctx, tc.initialRV, watchFunc, time.Duration(0)) if err != nil { t.Fatalf("failed to create a RetryWatcher: %v", err) } diff --git a/tools/watch/until.go b/tools/watch/until.go index a2474556..844b93fb 100644 --- a/tools/watch/until.go +++ b/tools/watch/until.go @@ -105,7 +105,7 @@ func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions // // The most frequent usage for Until would be a test where you want to verify exact order of events ("edges"). func Until(ctx context.Context, initialResourceVersion string, watcherClient cache.Watcher, conditions ...ConditionFunc) (*watch.Event, error) { - w, err := NewRetryWatcher(initialResourceVersion, watcherClient) + w, err := NewRetryWatcherWithContext(ctx, initialResourceVersion, cache.ToWatcherWithContext(watcherClient)) if err != nil { return nil, err } @@ -126,7 +126,7 @@ func Until(ctx context.Context, initialResourceVersion string, watcherClient cac // The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like: // waiting for object reaching a state, "small" controllers, ... func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) { - indexer, informer, watcher, done := NewIndexerInformerWatcher(lw, objType) + indexer, informer, watcher, done := NewIndexerInformerWatcherWithContext(ctx, lw, objType) // We need to wait for the internal informers to fully stop so it's easier to reason about // and it works with non-thread safe clients. defer func() { <-done }() @@ -156,7 +156,7 @@ func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime. func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { if timeout < 0 { // This should be handled in validation - klog.Errorf("Timeout for context shall not be negative!") + klog.FromContext(parent).Error(nil, "Timeout for context shall not be negative") timeout = 0 }