diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index c6f6f52fab3..e523a66522d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -59,6 +59,12 @@ type Config struct { // FullResyncPeriod is the period at which ShouldResync is considered. FullResyncPeriod time.Duration + // MinWatchTimeout, if set, will define the minimum timeout for watch requests send + // to kube-apiserver. However, values lower than 5m will not be honored to avoid + // negative performance impact on controlplane. + // Optional - if unset a default value of 5m will be used. + MinWatchTimeout time.Duration + // ShouldResync is periodically used by the reflector to determine // whether to Resync the Queue. If ShouldResync is `nil` or // returns true, it means the reflector should proceed with the @@ -138,6 +144,7 @@ func (c *controller) Run(stopCh <-chan struct{}) { c.config.Queue, ReflectorOptions{ ResyncPeriod: c.config.FullResyncPeriod, + MinWatchTimeout: c.config.MinWatchTimeout, TypeDescription: c.config.ObjectDescription, Clock: c.clock, }, @@ -366,6 +373,12 @@ type InformerOptions struct { // Optional - if unset, store resyncing is not happening periodically. ResyncPeriod time.Duration + // MinWatchTimeout, if set, will define the minimum timeout for watch requests send + // to kube-apiserver. However, values lower than 5m will not be honored to avoid + // negative performance impact on controlplane. + // Optional - if unset a default value of 5m will be used. + MinWatchTimeout time.Duration + // Indexers, if set, are the indexers for the received objects to optimize // certain queries. // Optional - if unset no indexes are maintained. @@ -580,6 +593,7 @@ func newInformer(clientState Store, options InformerOptions) Controller { ListerWatcher: options.ListerWatcher, ObjectType: options.ObjectType, FullResyncPeriod: options.ResyncPeriod, + MinWatchTimeout: options.MinWatchTimeout, RetryOnError: false, Process: func(obj interface{}, isInInitialList bool) error { diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index f733e244ccc..31fa9dbb50c 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -49,6 +49,12 @@ import ( const defaultExpectedTypeName = "" +var ( + // We try to spread the load on apiserver by setting timeouts for + // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout]. + defaultMinWatchTimeout = 5 * time.Minute +) + // Reflector watches a specified resource and causes all changes to be reflected in the given store. type Reflector struct { // name identifies this reflector. By default it will be a file:line if possible. @@ -72,6 +78,8 @@ type Reflector struct { // backoff manages backoff of ListWatch backoffManager wait.BackoffManager resyncPeriod time.Duration + // minWatchTimeout defines the minimum timeout for watch requests. + minWatchTimeout time.Duration // clock allows tests to manipulate time clock clock.Clock // paginatedResult defines whether pagination should be forced for list calls. @@ -151,12 +159,6 @@ func DefaultWatchErrorHandler(r *Reflector, err error) { } } -var ( - // We try to spread the load on apiserver by setting timeouts for - // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout]. - minWatchTimeout = 5 * time.Minute -) - // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector // The indexer is configured to key on namespace func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) { @@ -194,6 +196,10 @@ type ReflectorOptions struct { // (do not resync). ResyncPeriod time.Duration + // MinWatchTimeout, if non-zero, defines the minimum timeout for watch requests send to kube-apiserver. + // However, values lower than 5m will not be honored to avoid negative performance impact on controlplane. + MinWatchTimeout time.Duration + // Clock allows tests to control time. If unset defaults to clock.RealClock{} Clock clock.Clock } @@ -213,9 +219,14 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S if reflectorClock == nil { reflectorClock = clock.RealClock{} } + minWatchTimeout := defaultMinWatchTimeout + if options.MinWatchTimeout > defaultMinWatchTimeout { + minWatchTimeout = options.MinWatchTimeout + } r := &Reflector{ name: options.Name, resyncPeriod: options.ResyncPeriod, + minWatchTimeout: minWatchTimeout, typeDescription: options.TypeDescription, listerWatcher: lw, store: store, @@ -415,7 +426,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc start := r.clock.Now() if w == nil { - timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) + timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) options := metav1.ListOptions{ ResourceVersion: r.LastSyncResourceVersion(), // We want to avoid situations of hanging watchers. Stop any watchers that do not @@ -642,7 +653,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { // TODO(#115478): large "list", slow clients, slow network, p&f // might slow down streaming and eventually fail. // maybe in such a case we should retry with an increased timeout? - timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) + timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) options := metav1.ListOptions{ ResourceVersion: lastKnownRV, AllowWatchBookmarks: true, diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index 611357b7d51..84a8d2697f2 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -1091,6 +1091,67 @@ func TestGetExpectedGVKFromObject(t *testing.T) { } } +func TestWatchTimeout(t *testing.T) { + + testCases := []struct { + name string + minWatchTimeout time.Duration + expectedMinTimeoutSeconds int64 + }{ + { + name: "no timeout", + expectedMinTimeoutSeconds: 5 * 60, + }, + { + name: "small timeout not honored", + minWatchTimeout: time.Second, + expectedMinTimeoutSeconds: 5 * 60, + }, + { + name: "30m timeout", + minWatchTimeout: 30 * time.Minute, + expectedMinTimeoutSeconds: 30 * 60, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + stopCh := make(chan struct{}) + s := NewStore(MetaNamespaceKeyFunc) + var gotTimeoutSeconds int64 + + lw := &testLW{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + if options.TimeoutSeconds != nil { + gotTimeoutSeconds = *options.TimeoutSeconds + } + + // Stop once the reflector begins watching since we're only interested in the list. + close(stopCh) + return watch.NewFake(), nil + }, + } + + opts := ReflectorOptions{ + MinWatchTimeout: tc.minWatchTimeout, + } + r := NewReflectorWithOptions(lw, &v1.Pod{}, s, opts) + if err := r.ListAndWatch(stopCh); err != nil { + t.Fatal(err) + } + + minExpected := tc.expectedMinTimeoutSeconds + maxExpected := 2 * tc.expectedMinTimeoutSeconds + if gotTimeoutSeconds < minExpected || gotTimeoutSeconds > maxExpected { + t.Errorf("unexpected TimeoutSecond, got %v, expected in [%v, %v]", gotTimeoutSeconds, minExpected, maxExpected) + } + }) + } +} + type storeWithRV struct { Store