client-go + apimachinery watch: context support

The Lister and Watcher interfaces only supported methods without context, but
were typically implemented with client-go API calls which need a context. New
interfaces get added using the same approach as in
https://github.com/kubernetes/kubernetes/pull/129109.

Kubernetes-commit: 6688adae142e37114d9dfa8d94cd1d8a91fbcc13
This commit is contained in:
Patrick Ohly 2024-12-20 13:55:47 +01:00 committed by Kubernetes Publisher
parent 362c5e8de9
commit bad1caabde
11 changed files with 329 additions and 128 deletions

View File

@ -1008,7 +1008,8 @@ func (r *Request) newStreamWatcher(ctx context.Context, resp *http.Response) (wa
frameReader := framer.NewFrameReader(resp.Body) frameReader := framer.NewFrameReader(resp.Body)
watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer) watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
return watch.NewStreamWatcher( return watch.NewStreamWatcherWithLogger(
klog.FromContext(ctx),
restclientwatch.NewDecoder(watchEventDecoder, objectDecoder), restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
// use 500 to indicate that the cause of the error is unknown - other error codes // use 500 to indicate that the cause of the error is unknown - other error codes
// are more specific to HTTP interactions, and set a reason // are more specific to HTTP interactions, and set a reason

View File

@ -363,7 +363,7 @@ func TestUpdate(t *testing.T) {
// everything we've added has been deleted. // everything we've added has been deleted.
watchCh := make(chan struct{}) watchCh := make(chan struct{})
_, controller := NewInformer( _, controller := NewInformer(
&testLW{ &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
watch, err := source.Watch(options) watch, err := source.Watch(options)
close(watchCh) close(watchCh)

View File

@ -27,50 +27,160 @@ import (
) )
// Lister is any object that knows how to perform an initial list. // Lister is any object that knows how to perform an initial list.
//
// Ideally, all implementations of Lister should also implement ListerWithContext.
type Lister interface { type Lister interface {
// List should return a list type object; the Items field will be extracted, and the // List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place. // ResourceVersion field will be used to start the watch in the right place.
//
// Deprecated: use ListerWithContext.ListWithContext instead.
List(options metav1.ListOptions) (runtime.Object, error) List(options metav1.ListOptions) (runtime.Object, error)
} }
// ListerWithContext is any object that knows how to perform an initial list.
type ListerWithContext interface {
// ListWithContext should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
ListWithContext(ctx context.Context, options metav1.ListOptions) (runtime.Object, error)
}
func ToListerWithContext(l Lister) ListerWithContext {
if l, ok := l.(ListerWithContext); ok {
return l
}
return listerWrapper{
parent: l,
}
}
type listerWrapper struct {
parent Lister
}
func (l listerWrapper) ListWithContext(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
return l.parent.List(options)
}
// Watcher is any object that knows how to start a watch on a resource. // Watcher is any object that knows how to start a watch on a resource.
//
// Ideally, all implementations of Watcher should also implement WatcherWithContext.
type Watcher interface { type Watcher interface {
// Watch should begin a watch at the specified version. // Watch should begin a watch at the specified version.
// //
// If Watch returns an error, it should handle its own cleanup, including // If Watch returns an error, it should handle its own cleanup, including
// but not limited to calling Stop() on the watch, if one was constructed. // but not limited to calling Stop() on the watch, if one was constructed.
// This allows the caller to ignore the watch, if the error is non-nil. // This allows the caller to ignore the watch, if the error is non-nil.
//
// Deprecated: use WatcherWithContext.WatchWithContext instead.
Watch(options metav1.ListOptions) (watch.Interface, error) Watch(options metav1.ListOptions) (watch.Interface, error)
} }
// WatcherWithContext is any object that knows how to start a watch on a resource.
type WatcherWithContext interface {
// WatchWithContext should begin a watch at the specified version.
//
// If Watch returns an error, it should handle its own cleanup, including
// but not limited to calling Stop() on the watch, if one was constructed.
// This allows the caller to ignore the watch, if the error is non-nil.
WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error)
}
func ToWatcherWithContext(w Watcher) WatcherWithContext {
if w, ok := w.(WatcherWithContext); ok {
return w
}
return watcherWrapper{
parent: w,
}
}
type watcherWrapper struct {
parent Watcher
}
func (l watcherWrapper) WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
return l.parent.Watch(options)
}
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource. // ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
//
// Ideally, all implementations of ListerWatcher should also implement ListerWatcherWithContext.
type ListerWatcher interface { type ListerWatcher interface {
Lister Lister
Watcher Watcher
} }
// ListerWatcherWithContext is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcherWithContext interface {
ListerWithContext
WatcherWithContext
}
func ToListerWatcherWithContext(lw ListerWatcher) ListerWatcherWithContext {
if lw, ok := lw.(ListerWatcherWithContext); ok {
return lw
}
return listerWatcherWrapper{
ListerWithContext: ToListerWithContext(lw),
WatcherWithContext: ToWatcherWithContext(lw),
}
}
type listerWatcherWrapper struct {
ListerWithContext
WatcherWithContext
}
// ListFunc knows how to list resources // ListFunc knows how to list resources
//
// Deprecated: use ListWithContextFunc instead.
type ListFunc func(options metav1.ListOptions) (runtime.Object, error) type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
// ListWithContextFunc knows how to list resources
type ListWithContextFunc func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error)
// WatchFunc knows how to watch resources // WatchFunc knows how to watch resources
//
// Deprecated: use WatchFuncWithContext instead.
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error) type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface. // WatchFuncWithContext knows how to watch resources
type WatchFuncWithContext func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error)
// ListWatch knows how to list and watch a set of apiserver resources.
// It satisfies the ListerWatcher and ListerWatcherWithContext interfaces.
// It is a convenience function for users of NewReflector, etc. // It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil // ListFunc or ListWithContextFunc must be set. Same for WatchFunc and WatchFuncWithContext.
// ListWithContextFunc and WatchFuncWithContext are preferred if
// a context is available, otherwise ListFunc and WatchFunc.
//
// NewFilteredListWatchFromClient sets all of the functions to ensure that callers
// which only know about ListFunc and WatchFunc continue to work.
type ListWatch struct { type ListWatch struct {
ListFunc ListFunc // Deprecated: use ListWithContext instead.
ListFunc ListFunc
// Deprecated: use WatchWithContext instead.
WatchFunc WatchFunc WatchFunc WatchFunc
ListWithContextFunc ListWithContextFunc
WatchFuncWithContext WatchFuncWithContext
// DisableChunking requests no chunking for this list watcher. // DisableChunking requests no chunking for this list watcher.
DisableChunking bool DisableChunking bool
} }
var (
_ ListerWatcher = &ListWatch{}
_ ListerWatcherWithContext = &ListWatch{}
)
// Getter interface knows how to access Get method from RESTClient. // Getter interface knows how to access Get method from RESTClient.
type Getter interface { type Getter interface {
Get() *restclient.Request Get() *restclient.Request
} }
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector. // NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
// For backward compatibility, all function fields are populated.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch { func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
optionsModifier := func(options *metav1.ListOptions) { optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String() options.FieldSelector = fieldSelector.String()
@ -81,6 +191,7 @@ func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSe
// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier. // NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function // Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
// to apply modification to ListOptions with a field selector, a label selector, or any other desired options. // to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
// For backward compatibility, all function fields are populated.
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch { func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) { listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options) optionsModifier(&options)
@ -88,7 +199,7 @@ func NewFilteredListWatchFromClient(c Getter, resource string, namespace string,
Namespace(namespace). Namespace(namespace).
Resource(resource). Resource(resource).
VersionedParams(&options, metav1.ParameterCodec). VersionedParams(&options, metav1.ParameterCodec).
Do(context.TODO()). Do(context.Background()).
Get() Get()
} }
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
@ -98,19 +209,70 @@ func NewFilteredListWatchFromClient(c Getter, resource string, namespace string,
Namespace(namespace). Namespace(namespace).
Resource(resource). Resource(resource).
VersionedParams(&options, metav1.ParameterCodec). VersionedParams(&options, metav1.ParameterCodec).
Watch(context.TODO()) Watch(context.Background())
}
listFuncWithContext := func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do(ctx).
Get()
}
watchFuncWithContext := func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch(ctx)
}
return &ListWatch{
ListFunc: listFunc,
WatchFunc: watchFunc,
ListWithContextFunc: listFuncWithContext,
WatchFuncWithContext: watchFuncWithContext,
} }
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
} }
// List a set of apiserver resources // List a set of apiserver resources
//
// Deprecated: use ListWatchWithContext.ListWithContext instead.
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) { func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
// ListWatch is used in Reflector, which already supports pagination. // ListWatch is used in Reflector, which already supports pagination.
// Don't paginate here to avoid duplication. // Don't paginate here to avoid duplication.
if lw.ListFunc != nil {
return lw.ListFunc(options)
}
return lw.ListWithContextFunc(context.Background(), options)
}
// List a set of apiserver resources
func (lw *ListWatch) ListWithContext(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
// ListWatch is used in Reflector, which already supports pagination.
// Don't paginate here to avoid duplication.
if lw.ListWithContextFunc != nil {
return lw.ListWithContextFunc(ctx, options)
}
return lw.ListFunc(options) return lw.ListFunc(options)
} }
// Watch a set of apiserver resources // Watch a set of apiserver resources
//
// Deprecated: use ListWatchWithContext.WatchWithContext instead.
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) { func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
if lw.WatchFunc != nil {
return lw.WatchFunc(options)
}
return lw.WatchFuncWithContext(context.Background(), options)
}
// Watch a set of apiserver resources
func (lw *ListWatch) WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
if lw.WatchFuncWithContext != nil {
return lw.WatchFuncWithContext(ctx, options)
}
return lw.WatchFunc(options) return lw.WatchFunc(options)
} }

View File

@ -20,7 +20,7 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -29,7 +29,7 @@ import (
func TestMutationDetector(t *testing.T) { func TestMutationDetector(t *testing.T) {
fakeWatch := watch.NewFake() fakeWatch := watch.NewFake()
lw := &testLW{ lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return fakeWatch, nil return fakeWatch, nil
}, },

View File

@ -96,7 +96,7 @@ type Reflector struct {
// The destination to sync up with the watch source // The destination to sync up with the watch source
store ReflectorStore store ReflectorStore
// listerWatcher is used to perform lists and watches. // listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher listerWatcher ListerWatcherWithContext
// backoff manages backoff of ListWatch // backoff manages backoff of ListWatch
backoffManager wait.BackoffManager backoffManager wait.BackoffManager
resyncPeriod time.Duration resyncPeriod time.Duration
@ -270,7 +270,7 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store R
resyncPeriod: options.ResyncPeriod, resyncPeriod: options.ResyncPeriod,
minWatchTimeout: minWatchTimeout, minWatchTimeout: minWatchTimeout,
typeDescription: options.TypeDescription, typeDescription: options.TypeDescription,
listerWatcher: lw, listerWatcher: ToListerWatcherWithContext(lw),
store: store, store: store,
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
@ -512,7 +512,7 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha
AllowWatchBookmarks: true, AllowWatchBookmarks: true,
} }
w, err = r.listerWatcher.Watch(options) w, err = r.listerWatcher.WatchWithContext(ctx, options)
if err != nil { if err != nil {
if canRetry := isWatchErrorRetriable(err); canRetry { if canRetry := isWatchErrorRetriable(err); canRetry {
logger.V(4).Info("Watch failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err) logger.V(4).Info("Watch failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err)
@ -583,7 +583,7 @@ func (r *Reflector) list(ctx context.Context) error {
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response. // list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts) return r.listerWatcher.ListWithContext(ctx, opts)
})) }))
switch { switch {
case r.WatchListPageSize != 0: case r.WatchListPageSize != 0:
@ -739,7 +739,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
} }
start := r.clock.Now() start := r.clock.Now()
w, err = r.listerWatcher.Watch(options) w, err = r.listerWatcher.WatchWithContext(ctx, options)
if err != nil { if err != nil {
if isErrorRetriableWithSideEffectsFn(err) { if isErrorRetriableWithSideEffectsFn(err) {
continue continue
@ -771,7 +771,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
// we utilize the temporaryStore to ensure independence from the current store implementation. // we utilize the temporaryStore to ensure independence from the current store implementation.
// as of today, the store is implemented as a queue and will be drained by the higher-level // as of today, the store is implemented as a queue and will be drained by the higher-level
// component as soon as it finishes replacing the content. // component as soon as it finishes replacing the content.
checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List) checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, temporaryStore.List)
if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
return nil, fmt.Errorf("unable to sync watch-list result: %w", err) return nil, fmt.Errorf("unable to sync watch-list result: %w", err)
@ -1057,13 +1057,6 @@ func isWatchErrorRetriable(err error) bool {
return false return false
} }
// wrapListFuncWithContext simply wraps ListFunction into another function that accepts a context and ignores it.
func wrapListFuncWithContext(listFn ListFunc) func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
return func(_ context.Context, options metav1.ListOptions) (runtime.Object, error) {
return listFn(options)
}
}
// initialEventsEndBookmarkTicker a ticker that produces a warning if the bookmark event // initialEventsEndBookmarkTicker a ticker that produces a warning if the bookmark event
// which marks the end of the watch stream, has not been received within the defined tick interval. // which marks the end of the watch stream, has not been received within the defined tick interval.
// //

View File

@ -52,24 +52,12 @@ import (
var nevererrc chan error var nevererrc chan error
type testLW struct {
ListFunc func(options metav1.ListOptions) (runtime.Object, error)
WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
}
func (t *testLW) List(options metav1.ListOptions) (runtime.Object, error) {
return t.ListFunc(options)
}
func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
return t.WatchFunc(options)
}
func TestCloseWatchChannelOnError(t *testing.T) { func TestCloseWatchChannelOnError(t *testing.T) {
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
r := NewReflector(&testLW{}, &v1.Pod{}, NewStore(MetaNamespaceKeyFunc), 0) r := NewReflector(&ListWatch{}, &v1.Pod{}, NewStore(MetaNamespaceKeyFunc), 0)
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
fw := watch.NewFake() fw := watch.NewFake()
r.listerWatcher = &testLW{ r.listerWatcher = &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return fw, nil return fw, nil
}, },
@ -94,9 +82,9 @@ func TestRunUntil(t *testing.T) {
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
store := NewStore(MetaNamespaceKeyFunc) store := NewStore(MetaNamespaceKeyFunc)
r := NewReflector(&testLW{}, &v1.Pod{}, store, 0) r := NewReflector(&ListWatch{}, &v1.Pod{}, store, 0)
fw := watch.NewFake() fw := watch.NewFake()
r.listerWatcher = &testLW{ r.listerWatcher = &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return fw, nil return fw, nil
}, },
@ -138,7 +126,7 @@ func TestRunUntil(t *testing.T) {
func TestReflectorResyncChan(t *testing.T) { func TestReflectorResyncChan(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, time.Millisecond) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, time.Millisecond)
a, _ := g.resyncChan() a, _ := g.resyncChan()
b := time.After(wait.ForeverTestTimeout) b := time.After(wait.ForeverTestTimeout)
select { select {
@ -208,7 +196,7 @@ func TestReflectorWatchStoppedAfter(t *testing.T) {
func BenchmarkReflectorResyncChanMany(b *testing.B) { func BenchmarkReflectorResyncChanMany(b *testing.B) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 25*time.Millisecond) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 25*time.Millisecond)
// The improvement to this (calling the timer's Stop() method) makes // The improvement to this (calling the timer's Stop() method) makes
// this benchmark about 40% faster. // this benchmark about 40% faster.
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
@ -223,7 +211,7 @@ func BenchmarkReflectorResyncChanMany(b *testing.B) {
// ResultChan is only called once and that Stop is called after ResultChan. // ResultChan is only called once and that Stop is called after ResultChan.
func TestReflectorHandleWatchStoppedBefore(t *testing.T) { func TestReflectorHandleWatchStoppedBefore(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
// Simulate the context being canceled before the watchHandler is called // Simulate the context being canceled before the watchHandler is called
@ -255,7 +243,7 @@ func TestReflectorHandleWatchStoppedBefore(t *testing.T) {
// ResultChan is only called once and that Stop is called after ResultChan. // ResultChan is only called once and that Stop is called after ResultChan.
func TestReflectorHandleWatchStoppedAfter(t *testing.T) { func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
var calls []string var calls []string
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
@ -291,7 +279,7 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
// stops when the result channel is closed before handleWatch was called. // stops when the result channel is closed before handleWatch was called.
func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
var calls []string var calls []string
resultCh := make(chan watch.Event) resultCh := make(chan watch.Event)
@ -320,7 +308,7 @@ func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) {
// stops when the result channel is closed after handleWatch has started watching. // stops when the result channel is closed after handleWatch has started watching.
func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) { func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
var calls []string var calls []string
resultCh := make(chan watch.Event) resultCh := make(chan watch.Event)
@ -352,7 +340,7 @@ func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) {
func TestReflectorWatchHandler(t *testing.T) { func TestReflectorWatchHandler(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
// Wrap setLastSyncResourceVersion so we can tell the watchHandler to stop // Wrap setLastSyncResourceVersion so we can tell the watchHandler to stop
// watching after all the events have been consumed. This avoids race // watching after all the events have been consumed. This avoids race
// conditions which can happen if the producer calls Stop(), instead of the // conditions which can happen if the producer calls Stop(), instead of the
@ -416,7 +404,7 @@ func TestReflectorWatchHandler(t *testing.T) {
func TestReflectorStopWatch(t *testing.T) { func TestReflectorStopWatch(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
fw := watch.NewFake() fw := watch.NewFake()
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
@ -437,7 +425,7 @@ func TestReflectorListAndWatch(t *testing.T) {
// to get called at the beginning of the watch with 1, and again with 3 when we // to get called at the beginning of the watch with 1, and again with 3 when we
// inject an error. // inject an error.
expectedRVs := []string{"1", "3"} expectedRVs := []string{"1", "3"}
lw := &testLW{ lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
rv := options.ResourceVersion rv := options.ResourceVersion
fw := watch.NewFake() fw := watch.NewFake()
@ -555,7 +543,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
watchRet, watchErr := item.events, item.watchErr watchRet, watchErr := item.events, item.watchErr
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
lw := &testLW{ lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if watchErr != nil { if watchErr != nil {
return nil, watchErr return nil, watchErr
@ -634,7 +622,7 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
time.Sleep(100 * time.Microsecond) time.Sleep(100 * time.Microsecond)
} }
}() }()
lw := &testLW{ lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if connFails > 0 { if connFails > 0 {
connFails-- connFails--
@ -687,7 +675,7 @@ func TestBackoffOnTooManyRequests(t *testing.T) {
clock := &clock.RealClock{} clock := &clock.RealClock{}
bm := &fakeBackoff{clock: clock} bm := &fakeBackoff{clock: clock}
lw := &testLW{ lw := &ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
}, },
@ -733,7 +721,7 @@ func TestNoRelistOnTooManyRequests(t *testing.T) {
bm := &fakeBackoff{clock: clock} bm := &fakeBackoff{clock: clock}
listCalls, watchCalls := 0, 0 listCalls, watchCalls := 0, 0
lw := &testLW{ lw := &ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listCalls++ listCalls++
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
@ -804,7 +792,7 @@ func TestRetryInternalError(t *testing.T) {
counter := 0 counter := 0
lw := &testLW{ lw := &ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
}, },
@ -860,7 +848,7 @@ func TestReflectorResync(t *testing.T) {
}, },
} }
lw := &testLW{ lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake() fw := watch.NewFake()
return fw, nil return fw, nil
@ -885,7 +873,7 @@ func TestReflectorWatchListPageSize(t *testing.T) {
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
lw := &testLW{ lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list. // Stop once the reflector begins watching since we're only interested in the list.
cancel(errors.New("done")) cancel(errors.New("done"))
@ -931,7 +919,7 @@ func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) {
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
lw := &testLW{ lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list. // Stop once the reflector begins watching since we're only interested in the list.
cancel(errors.New("done")) cancel(errors.New("done"))
@ -967,7 +955,7 @@ func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T)
var cancel func(error) var cancel func(error)
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
lw := &testLW{ lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list. // Stop once the reflector begins watching since we're only interested in the list.
cancel(errors.New("done")) cancel(errors.New("done"))
@ -1024,7 +1012,7 @@ func TestReflectorResyncWithResourceVersion(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{} listCallRVs := []string{}
lw := &testLW{ lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list. // Stop once the reflector begins watching since we're only interested in the list.
cancel(errors.New("done")) cancel(errors.New("done"))
@ -1085,7 +1073,7 @@ func TestReflectorExpiredExactResourceVersion(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{} listCallRVs := []string{}
lw := &testLW{ lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list. // Stop once the reflector begins watching since we're only interested in the list.
cancel(errors.New("done")) cancel(errors.New("done"))
@ -1145,7 +1133,7 @@ func TestReflectorFullListIfExpired(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{} listCallRVs := []string{}
lw := &testLW{ lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list. // Stop once the reflector begins watching since we're only interested in the list.
cancel(errors.New("done")) cancel(errors.New("done"))
@ -1220,7 +1208,7 @@ func TestReflectorFullListIfTooLarge(t *testing.T) {
listCallRVs := []string{} listCallRVs := []string{}
version := 30 version := 30
lw := &testLW{ lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list. // Stop once the reflector begins watching since we're only interested in the list.
cancel(errors.New("done")) cancel(errors.New("done"))
@ -1394,7 +1382,7 @@ func TestWatchTimeout(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
var gotTimeoutSeconds int64 var gotTimeoutSeconds int64
lw := &testLW{ lw := &ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
}, },
@ -1450,7 +1438,7 @@ func TestReflectorResourceVersionUpdate(t *testing.T) {
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
fw := watch.NewFake() fw := watch.NewFake()
lw := &testLW{ lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return fw, nil return fw, nil
}, },
@ -1866,7 +1854,7 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
} }
var once sync.Once var once sync.Once
lw := &testLW{ lw := &ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake() fw := watch.NewFake()
go func() { go func() {
@ -1892,6 +1880,7 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
doneCh, stopCh := make(chan struct{}), make(chan struct{}) doneCh, stopCh := make(chan struct{}), make(chan struct{})
go func() { go func() {
defer close(doneCh) defer close(doneCh)
//nolint:logcheck // Intentionally uses the old API.
r.Run(stopCh) r.Run(stopCh)
}() }()
@ -2066,9 +2055,6 @@ func BenchmarkEachListItemWithAlloc(b *testing.B) {
} }
func BenchmarkReflectorList(b *testing.B) { func BenchmarkReflectorList(b *testing.B) {
ctx, cancel := context.WithTimeout(context.Background(), wait.ForeverTestTimeout)
defer cancel()
store := NewStore(func(obj interface{}) (string, error) { store := NewStore(func(obj interface{}) (string, error) {
o, err := meta.Accessor(obj) o, err := meta.Accessor(obj)
if err != nil { if err != nil {
@ -2102,6 +2088,7 @@ func BenchmarkReflectorList(b *testing.B) {
for _, tc := range tests { for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) { b.Run(tc.name, func(b *testing.B) {
_, ctx := ktesting.NewTestContext(b)
sample := tc.sample() sample := tc.sample()
reflector := NewReflector(newPageTestLW(pageNum), &sample, store, 0) reflector := NewReflector(newPageTestLW(pageNum), &sample, store, 0)

View File

@ -17,11 +17,14 @@ limitations under the License.
package watch package watch
import ( import (
"context"
"sync" "sync"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
) )
func newEventProcessor(out chan<- watch.Event) *eventProcessor { func newEventProcessor(out chan<- watch.Event) *eventProcessor {
@ -103,7 +106,19 @@ func (e *eventProcessor) stop() {
// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface // NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. // so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
// it also returns a channel you can use to wait for the informers to fully shutdown. // it also returns a channel you can use to wait for the informers to fully shutdown.
//
// Contextual logging: NewIndexerInformerWatcherWithContext should be used instead of NewIndexerInformerWatcher in code which supports contextual logging.
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
return NewIndexerInformerWatcherWithContext(context.Background(), lw, objType)
}
// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
// it also returns a channel you can use to wait for the informers to fully shutdown.
//
// Cancellation of the context has the same effect as calling [watch.Interface.Stop]. One or
// the other can be used.
func NewIndexerInformerWatcherWithContext(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
ch := make(chan watch.Event) ch := make(chan watch.Event)
w := watch.NewProxyWatcher(ch) w := watch.NewProxyWatcher(ch)
e := newEventProcessor(ch) e := newEventProcessor(ch)
@ -137,13 +152,30 @@ func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (
}, },
}, cache.Indexers{}) }, cache.Indexers{})
// This will get stopped, but without waiting for it.
go e.run() go e.run()
logger := klog.FromContext(ctx)
if ctx.Done() != nil {
go func() {
select {
case <-ctx.Done():
// Map cancellation to Stop. The informer below only waits for that.
w.Stop()
case <-w.StopChan():
}
}()
}
doneCh := make(chan struct{}) doneCh := make(chan struct{})
go func() { go func() {
defer close(doneCh) defer close(doneCh)
defer e.stop() defer e.stop()
informer.Run(w.StopChan()) // Waiting for w.StopChan() is the traditional behavior which gets
// preserved here. Context cancellation is handled above.
ctx := wait.ContextForChannel(w.StopChan())
ctx = klog.NewContext(ctx, logger)
informer.RunWithContext(ctx)
}() }()
return indexer, informer, w, doneCh return indexer, informer, w, doneCh

View File

@ -18,6 +18,7 @@ package watch
import ( import (
"context" "context"
"errors"
"reflect" "reflect"
goruntime "runtime" goruntime "runtime"
"sort" "sort"
@ -39,6 +40,8 @@ import (
fakeclientset "k8s.io/client-go/kubernetes/fake" fakeclientset "k8s.io/client-go/kubernetes/fake"
testcore "k8s.io/client-go/testing" testcore "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
) )
// TestEventProcessorExit is expected to timeout if the event processor fails // TestEventProcessorExit is expected to timeout if the event processor fails
@ -320,6 +323,7 @@ func TestNewInformerWatcher(t *testing.T) {
return fake.CoreV1().Secrets("").Watch(context.TODO(), options) return fake.CoreV1().Secrets("").Watch(context.TODO(), options)
}, },
} }
//nolint:logcheck // Intentionally uses the older API.
_, _, outputWatcher, informerDoneCh := NewIndexerInformerWatcher(lw, &corev1.Secret{}) _, _, outputWatcher, informerDoneCh := NewIndexerInformerWatcher(lw, &corev1.Secret{})
outputCh := outputWatcher.ResultChan() outputCh := outputWatcher.ResultChan()
timeoutCh := time.After(wait.ForeverTestTimeout) timeoutCh := time.After(wait.ForeverTestTimeout)
@ -413,6 +417,7 @@ func TestInformerWatcherDeletedFinalStateUnknown(t *testing.T) {
return w, nil return w, nil
}, },
} }
//nolint:logcheck // Intentionally uses the older API.
_, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{}) _, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{})
defer w.Stop() defer w.Stop()
@ -462,3 +467,29 @@ func TestInformerWatcherDeletedFinalStateUnknown(t *testing.T) {
t.Fatalf("expected at least 1 watch call, got %d", watchCalls) t.Fatalf("expected at least 1 watch call, got %d", watchCalls)
} }
} }
func TestInformerContext(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Whatever gets called first will stop.
validateContext := func(ctx context.Context) error {
if reflect.TypeOf(logger.GetSink()) != reflect.TypeOf(klog.FromContext(ctx).GetSink()) {
t.Errorf("Expected logger %+v from context, got %+v", logger, klog.FromContext(ctx))
}
cancel()
return errors.New("not implemented by text")
}
lw := &cache.ListWatch{
ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
return nil, validateContext(ctx)
},
WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
return nil, validateContext(ctx)
},
}
_, _, _, done := NewIndexerInformerWatcherWithContext(ctx, lw, &corev1.Secret{})
<-done
}

View File

@ -22,7 +22,6 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"sync"
"time" "time"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -48,23 +47,31 @@ type resourceVersionGetter interface {
// Please note that this is not resilient to etcd cache not having the resource version anymore - you would need to // Please note that this is not resilient to etcd cache not having the resource version anymore - you would need to
// use Informers for that. // use Informers for that.
type RetryWatcher struct { type RetryWatcher struct {
cancel func(error)
lastResourceVersion string lastResourceVersion string
watcherClient cache.Watcher watcherClient cache.WatcherWithContext
resultChan chan watch.Event resultChan chan watch.Event
stopChan chan struct{}
doneChan chan struct{} doneChan chan struct{}
minRestartDelay time.Duration minRestartDelay time.Duration
stopChanLock sync.Mutex
} }
// NewRetryWatcher creates a new RetryWatcher. // NewRetryWatcher creates a new RetryWatcher.
// It will make sure that watches gets restarted in case of recoverable errors. // It will make sure that watches gets restarted in case of recoverable errors.
// The initialResourceVersion will be given to watch method when first called. // The initialResourceVersion will be given to watch method when first called.
//
// Deprecated: use NewRetryWatcherWithContext instead.
func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) { func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) {
return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second) return NewRetryWatcherWithContext(context.Background(), initialResourceVersion, cache.ToWatcherWithContext(watcherClient))
} }
func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) { // NewRetryWatcherWithContext creates a new RetryWatcher.
// It will make sure that watches gets restarted in case of recoverable errors.
// The initialResourceVersion will be given to watch method when first called.
func NewRetryWatcherWithContext(ctx context.Context, initialResourceVersion string, watcherClient cache.WatcherWithContext) (*RetryWatcher, error) {
return newRetryWatcher(ctx, initialResourceVersion, watcherClient, 1*time.Second)
}
func newRetryWatcher(ctx context.Context, initialResourceVersion string, watcherClient cache.WatcherWithContext, minRestartDelay time.Duration) (*RetryWatcher, error) {
switch initialResourceVersion { switch initialResourceVersion {
case "", "0": case "", "0":
// TODO: revisit this if we ever get WATCH v2 where it means start "now" // TODO: revisit this if we ever get WATCH v2 where it means start "now"
@ -74,34 +81,36 @@ func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher,
break break
} }
ctx, cancel := context.WithCancelCause(ctx)
rw := &RetryWatcher{ rw := &RetryWatcher{
cancel: cancel,
lastResourceVersion: initialResourceVersion, lastResourceVersion: initialResourceVersion,
watcherClient: watcherClient, watcherClient: watcherClient,
stopChan: make(chan struct{}),
doneChan: make(chan struct{}), doneChan: make(chan struct{}),
resultChan: make(chan watch.Event, 0), resultChan: make(chan watch.Event, 0),
minRestartDelay: minRestartDelay, minRestartDelay: minRestartDelay,
} }
go rw.receive() go rw.receive(ctx)
return rw, nil return rw, nil
} }
func (rw *RetryWatcher) send(event watch.Event) bool { func (rw *RetryWatcher) send(ctx context.Context, event watch.Event) bool {
// Writing to an unbuffered channel is blocking operation // Writing to an unbuffered channel is blocking operation
// and we need to check if stop wasn't requested while doing so. // and we need to check if stop wasn't requested while doing so.
select { select {
case rw.resultChan <- event: case rw.resultChan <- event:
return true return true
case <-rw.stopChan: case <-ctx.Done():
return false return false
} }
} }
// doReceive returns true when it is done, false otherwise. // doReceive returns true when it is done, false otherwise.
// If it is not done the second return value holds the time to wait before calling it again. // If it is not done the second return value holds the time to wait before calling it again.
func (rw *RetryWatcher) doReceive() (bool, time.Duration) { func (rw *RetryWatcher) doReceive(ctx context.Context) (bool, time.Duration) {
watcher, err := rw.watcherClient.Watch(metav1.ListOptions{ watcher, err := rw.watcherClient.WatchWithContext(ctx, metav1.ListOptions{
ResourceVersion: rw.lastResourceVersion, ResourceVersion: rw.lastResourceVersion,
AllowWatchBookmarks: true, AllowWatchBookmarks: true,
}) })
@ -117,13 +126,13 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
return false, 0 return false, 0
case io.ErrUnexpectedEOF: case io.ErrUnexpectedEOF:
klog.V(1).InfoS("Watch closed with unexpected EOF", "err", err) klog.FromContext(ctx).V(1).Info("Watch closed with unexpected EOF", "err", err)
return false, 0 return false, 0
default: default:
msg := "Watch failed" msg := "Watch failed"
if net.IsProbableEOF(err) || net.IsTimeout(err) { if net.IsProbableEOF(err) || net.IsTimeout(err) {
klog.V(5).InfoS(msg, "err", err) klog.FromContext(ctx).V(5).Info(msg, "err", err)
// Retry // Retry
return false, 0 return false, 0
} }
@ -132,38 +141,38 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
// being invalid (e.g. expired token). // being invalid (e.g. expired token).
if apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err) { if apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err) {
// Add more detail since the forbidden message returned by the Kubernetes API is just "unknown". // Add more detail since the forbidden message returned by the Kubernetes API is just "unknown".
klog.ErrorS(err, msg+": ensure the client has valid credentials and watch permissions on the resource") klog.FromContext(ctx).Error(err, msg+": ensure the client has valid credentials and watch permissions on the resource")
if apiStatus, ok := err.(apierrors.APIStatus); ok { if apiStatus, ok := err.(apierrors.APIStatus); ok {
statusErr := apiStatus.Status() statusErr := apiStatus.Status()
sent := rw.send(watch.Event{ sent := rw.send(ctx, watch.Event{
Type: watch.Error, Type: watch.Error,
Object: &statusErr, Object: &statusErr,
}) })
if !sent { if !sent {
// This likely means the RetryWatcher is stopping but return false so the caller to doReceive can // This likely means the RetryWatcher is stopping but return false so the caller to doReceive can
// verify this and potentially retry. // verify this and potentially retry.
klog.Error("Failed to send the Unauthorized or Forbidden watch event") klog.FromContext(ctx).Error(nil, "Failed to send the Unauthorized or Forbidden watch event")
return false, 0 return false, 0
} }
} else { } else {
// This should never happen since apierrors only handles apierrors.APIStatus. Still, this is an // This should never happen since apierrors only handles apierrors.APIStatus. Still, this is an
// unrecoverable error, so still allow it to return true below. // unrecoverable error, so still allow it to return true below.
klog.ErrorS(err, msg+": encountered an unexpected Unauthorized or Forbidden error type") klog.FromContext(ctx).Error(err, msg+": encountered an unexpected Unauthorized or Forbidden error type")
} }
return true, 0 return true, 0
} }
klog.ErrorS(err, msg) klog.FromContext(ctx).Error(err, msg)
// Retry // Retry
return false, 0 return false, 0
} }
if watcher == nil { if watcher == nil {
klog.ErrorS(nil, "Watch returned nil watcher") klog.FromContext(ctx).Error(nil, "Watch returned nil watcher")
// Retry // Retry
return false, 0 return false, 0
} }
@ -173,12 +182,12 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
for { for {
select { select {
case <-rw.stopChan: case <-ctx.Done():
klog.V(4).InfoS("Stopping RetryWatcher.") klog.FromContext(ctx).V(4).Info("Stopping RetryWatcher")
return true, 0 return true, 0
case event, ok := <-ch: case event, ok := <-ch:
if !ok { if !ok {
klog.V(4).InfoS("Failed to get event! Re-creating the watcher.", "resourceVersion", rw.lastResourceVersion) klog.FromContext(ctx).V(4).Info("Failed to get event - re-creating the watcher", "resourceVersion", rw.lastResourceVersion)
return false, 0 return false, 0
} }
@ -187,7 +196,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark: case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
metaObject, ok := event.Object.(resourceVersionGetter) metaObject, ok := event.Object.(resourceVersionGetter)
if !ok { if !ok {
_ = rw.send(watch.Event{ _ = rw.send(ctx, watch.Event{
Type: watch.Error, Type: watch.Error,
Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus, Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus,
}) })
@ -197,7 +206,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
resourceVersion := metaObject.GetResourceVersion() resourceVersion := metaObject.GetResourceVersion()
if resourceVersion == "" { if resourceVersion == "" {
_ = rw.send(watch.Event{ _ = rw.send(ctx, watch.Event{
Type: watch.Error, Type: watch.Error,
Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus, Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus,
}) })
@ -207,7 +216,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
// All is fine; send the non-bookmark events and update resource version. // All is fine; send the non-bookmark events and update resource version.
if event.Type != watch.Bookmark { if event.Type != watch.Bookmark {
ok = rw.send(event) ok = rw.send(ctx, event)
if !ok { if !ok {
return true, 0 return true, 0
} }
@ -221,7 +230,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
errObject := apierrors.FromObject(event.Object) errObject := apierrors.FromObject(event.Object)
statusErr, ok := errObject.(*apierrors.StatusError) statusErr, ok := errObject.(*apierrors.StatusError)
if !ok { if !ok {
klog.Error(fmt.Sprintf("Received an error which is not *metav1.Status but %s", dump.Pretty(event.Object))) klog.FromContext(ctx).Error(nil, "Received an error which is not *metav1.Status", "errorObject", dump.Pretty(event.Object))
// Retry unknown errors // Retry unknown errors
return false, 0 return false, 0
} }
@ -236,7 +245,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
switch status.Code { switch status.Code {
case http.StatusGone: case http.StatusGone:
// Never retry RV too old errors // Never retry RV too old errors
_ = rw.send(event) _ = rw.send(ctx, event)
return true, 0 return true, 0
case http.StatusGatewayTimeout, http.StatusInternalServerError: case http.StatusGatewayTimeout, http.StatusInternalServerError:
@ -250,15 +259,15 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
// Log here so we have a record of hitting the unexpected error // Log here so we have a record of hitting the unexpected error
// and we can whitelist some error codes if we missed any that are expected. // and we can whitelist some error codes if we missed any that are expected.
klog.V(5).Info(fmt.Sprintf("Retrying after unexpected error: %s", dump.Pretty(event.Object))) klog.FromContext(ctx).V(5).Info("Retrying after unexpected error", "errorObject", dump.Pretty(event.Object))
// Retry // Retry
return false, statusDelay return false, statusDelay
} }
default: default:
klog.Errorf("Failed to recognize Event type %q", event.Type) klog.FromContext(ctx).Error(nil, "Failed to recognize event", "type", event.Type)
_ = rw.send(watch.Event{ _ = rw.send(ctx, watch.Event{
Type: watch.Error, Type: watch.Error,
Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus, Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus,
}) })
@ -270,29 +279,21 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
} }
// receive reads the result from a watcher, restarting it if necessary. // receive reads the result from a watcher, restarting it if necessary.
func (rw *RetryWatcher) receive() { func (rw *RetryWatcher) receive(ctx context.Context) {
defer close(rw.doneChan) defer close(rw.doneChan)
defer close(rw.resultChan) defer close(rw.resultChan)
klog.V(4).Info("Starting RetryWatcher.") logger := klog.FromContext(ctx)
defer klog.V(4).Info("Stopping RetryWatcher.") logger.V(4).Info("Starting RetryWatcher")
defer logger.V(4).Info("Stopping RetryWatcher")
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
go func() {
select {
case <-rw.stopChan:
cancel()
return
case <-ctx.Done():
return
}
}()
// We use non sliding until so we don't introduce delays on happy path when WATCH call // We use non sliding until so we don't introduce delays on happy path when WATCH call
// timeouts or gets closed and we need to reestablish it while also avoiding hot loops. // timeouts or gets closed and we need to reestablish it while also avoiding hot loops.
wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) { wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
done, retryAfter := rw.doReceive() done, retryAfter := rw.doReceive(ctx)
if done { if done {
cancel() cancel()
return return
@ -306,7 +307,7 @@ func (rw *RetryWatcher) receive() {
case <-timer.C: case <-timer.C:
} }
klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion) logger.V(4).Info("Restarting RetryWatcher", "resourceVersion", rw.lastResourceVersion)
}, rw.minRestartDelay) }, rw.minRestartDelay)
} }
@ -317,15 +318,7 @@ func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
// Stop implements Interface. // Stop implements Interface.
func (rw *RetryWatcher) Stop() { func (rw *RetryWatcher) Stop() {
rw.stopChanLock.Lock() rw.cancel(errors.New("asked to stop"))
defer rw.stopChanLock.Unlock()
// Prevent closing an already closed channel to prevent a panic
select {
case <-rw.stopChan:
default:
close(rw.stopChan)
}
} }
// Done allows the caller to be notified when Retry watcher stops. // Done allows the caller to be notified when Retry watcher stops.

View File

@ -37,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
) )
func init() { func init() {
@ -54,7 +55,7 @@ func (o testObject) GetObjectKind() schema.ObjectKind { return schema.EmptyObjec
func (o testObject) DeepCopyObject() runtime.Object { return o } func (o testObject) DeepCopyObject() runtime.Object { return o }
func (o testObject) GetResourceVersion() string { return o.resourceVersion } func (o testObject) GetResourceVersion() string { return o.resourceVersion }
func withCounter(w cache.Watcher) (*uint32, cache.Watcher) { func withCounter(w cache.Watcher) (*uint32, cache.WatcherWithContext) {
var counter uint32 var counter uint32
return &counter, &cache.ListWatch{ return &counter, &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
@ -549,10 +550,11 @@ func TestRetryWatcher(t *testing.T) {
for _, tc := range tt { for _, tc := range tt {
tc := tc tc := tc
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
t.Parallel() t.Parallel()
atomicCounter, watchFunc := withCounter(tc.watchClient) atomicCounter, watchFunc := withCounter(tc.watchClient)
watcher, err := newRetryWatcher(tc.initialRV, watchFunc, time.Duration(0)) watcher, err := newRetryWatcher(ctx, tc.initialRV, watchFunc, time.Duration(0))
if err != nil { if err != nil {
t.Fatalf("failed to create a RetryWatcher: %v", err) t.Fatalf("failed to create a RetryWatcher: %v", err)
} }

View File

@ -105,7 +105,7 @@ func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions
// //
// The most frequent usage for Until would be a test where you want to verify exact order of events ("edges"). // The most frequent usage for Until would be a test where you want to verify exact order of events ("edges").
func Until(ctx context.Context, initialResourceVersion string, watcherClient cache.Watcher, conditions ...ConditionFunc) (*watch.Event, error) { func Until(ctx context.Context, initialResourceVersion string, watcherClient cache.Watcher, conditions ...ConditionFunc) (*watch.Event, error) {
w, err := NewRetryWatcher(initialResourceVersion, watcherClient) w, err := NewRetryWatcherWithContext(ctx, initialResourceVersion, cache.ToWatcherWithContext(watcherClient))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -126,7 +126,7 @@ func Until(ctx context.Context, initialResourceVersion string, watcherClient cac
// The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like: // The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like:
// waiting for object reaching a state, "small" controllers, ... // waiting for object reaching a state, "small" controllers, ...
func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) { func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) {
indexer, informer, watcher, done := NewIndexerInformerWatcher(lw, objType) indexer, informer, watcher, done := NewIndexerInformerWatcherWithContext(ctx, lw, objType)
// We need to wait for the internal informers to fully stop so it's easier to reason about // We need to wait for the internal informers to fully stop so it's easier to reason about
// and it works with non-thread safe clients. // and it works with non-thread safe clients.
defer func() { <-done }() defer func() { <-done }()
@ -156,7 +156,7 @@ func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.
func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
if timeout < 0 { if timeout < 0 {
// This should be handled in validation // This should be handled in validation
klog.Errorf("Timeout for context shall not be negative!") klog.FromContext(parent).Error(nil, "Timeout for context shall not be negative")
timeout = 0 timeout = 0
} }