Merge pull request #124245 from wojtek-t/informer_options

Allow for configuring MinWatchTimeout in reflector

Kubernetes-commit: 32a3cbc51dd1d572ba3d6eba66df333e1454dd58
This commit is contained in:
Kubernetes Publisher 2024-04-18 03:24:57 -07:00
commit 3407442ff2
5 changed files with 193 additions and 39 deletions

4
go.mod
View File

@ -24,8 +24,8 @@ require (
golang.org/x/term v0.18.0 golang.org/x/term v0.18.0
golang.org/x/time v0.3.0 golang.org/x/time v0.3.0
google.golang.org/protobuf v1.33.0 google.golang.org/protobuf v1.33.0
k8s.io/api v0.0.0-20240418013359-a819b1d9bd16 k8s.io/api v0.0.0-20240418093414-76b6c7ce02a4
k8s.io/apimachinery v0.0.0-20240418093209-8c36da9e60f7 k8s.io/apimachinery v0.0.0-20240418133208-ea31e5150286
k8s.io/klog/v2 v2.120.1 k8s.io/klog/v2 v2.120.1
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340
k8s.io/utils v0.0.0-20230726121419-3b25d923346b k8s.io/utils v0.0.0-20230726121419-3b25d923346b

8
go.sum
View File

@ -153,10 +153,10 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.0.0-20240418013359-a819b1d9bd16 h1:DXJla1ulezom5N0QIRlZetozcxtRPdS7U+muHDJuiO4= k8s.io/api v0.0.0-20240418093414-76b6c7ce02a4 h1:Aoz6y1eO+GKFC27cTyO+VJkbaEyQsGec2IhAYYaN7IU=
k8s.io/api v0.0.0-20240418013359-a819b1d9bd16/go.mod h1:a1YU16kjsAapUzg1LYaOqTnbMlo87NXy9bSeWjRmfoo= k8s.io/api v0.0.0-20240418093414-76b6c7ce02a4/go.mod h1:sbi+6EMV/95qn9kNfIj4HSNs4jcBqkcBxEAD+AKHUHY=
k8s.io/apimachinery v0.0.0-20240418093209-8c36da9e60f7 h1:SydhMcp6AJkjqqVcd0o0uz7ntTcs/QyIgIHAFYfIm7E= k8s.io/apimachinery v0.0.0-20240418133208-ea31e5150286 h1:pGpFsAFMSxtFe98HpTcgDgblsARQhckNfASAgCmlXS4=
k8s.io/apimachinery v0.0.0-20240418093209-8c36da9e60f7/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= k8s.io/apimachinery v0.0.0-20240418133208-ea31e5150286/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc=
k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw=
k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag= k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag=

View File

@ -59,6 +59,12 @@ type Config struct {
// FullResyncPeriod is the period at which ShouldResync is considered. // FullResyncPeriod is the period at which ShouldResync is considered.
FullResyncPeriod time.Duration FullResyncPeriod time.Duration
// MinWatchTimeout, if set, will define the minimum timeout for watch requests send
// to kube-apiserver. However, values lower than 5m will not be honored to avoid
// negative performance impact on controlplane.
// Optional - if unset a default value of 5m will be used.
MinWatchTimeout time.Duration
// ShouldResync is periodically used by the reflector to determine // ShouldResync is periodically used by the reflector to determine
// whether to Resync the Queue. If ShouldResync is `nil` or // whether to Resync the Queue. If ShouldResync is `nil` or
// returns true, it means the reflector should proceed with the // returns true, it means the reflector should proceed with the
@ -138,6 +144,7 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.config.Queue, c.config.Queue,
ReflectorOptions{ ReflectorOptions{
ResyncPeriod: c.config.FullResyncPeriod, ResyncPeriod: c.config.FullResyncPeriod,
MinWatchTimeout: c.config.MinWatchTimeout,
TypeDescription: c.config.ObjectDescription, TypeDescription: c.config.ObjectDescription,
Clock: c.clock, Clock: c.clock,
}, },
@ -346,6 +353,58 @@ func DeletionHandlingObjectToName(obj interface{}) (ObjectName, error) {
return ObjectToName(obj) return ObjectToName(obj)
} }
// InformerOptions configure a Reflector.
type InformerOptions struct {
// ListerWatcher implements List and Watch functions for the source of the resource
// the informer will be informing about.
ListerWatcher ListerWatcher
// ObjectType is an object of the type that informer is expected to receive.
ObjectType runtime.Object
// Handler defines functions that should called on object mutations.
Handler ResourceEventHandler
// ResyncPeriod is the underlying Reflector's resync period. If non-zero, the store
// is re-synced with that frequency - Modify events are delivered even if objects
// didn't change.
// This is useful for synchronizing objects that configure external resources
// (e.g. configure cloud provider functionalities).
// Optional - if unset, store resyncing is not happening periodically.
ResyncPeriod time.Duration
// MinWatchTimeout, if set, will define the minimum timeout for watch requests send
// to kube-apiserver. However, values lower than 5m will not be honored to avoid
// negative performance impact on controlplane.
// Optional - if unset a default value of 5m will be used.
MinWatchTimeout time.Duration
// Indexers, if set, are the indexers for the received objects to optimize
// certain queries.
// Optional - if unset no indexes are maintained.
Indexers Indexers
// Transform function, if set, will be called on all objects before they will be
// put into the Store and corresponding Add/Modify/Delete handlers will be invoked
// for them.
// Optional - if unset no additional transforming is happening.
Transform TransformFunc
}
// NewInformerWithOptions returns a Store and a controller for populating the store
// while also providing event notifications. You should only used the returned
// Store for Get/List operations; Add/Modify/Deletes will cause the event
// notifications to be faulty.
func NewInformerWithOptions(options InformerOptions) (Store, Controller) {
var clientState Store
if options.Indexers == nil {
clientState = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
} else {
clientState = NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers)
}
return clientState, newInformer(clientState, options)
}
// NewInformer returns a Store and a controller for populating the store // NewInformer returns a Store and a controller for populating the store
// while also providing event notifications. You should only used the returned // while also providing event notifications. You should only used the returned
// Store for Get/List operations; Add/Modify/Deletes will cause the event // Store for Get/List operations; Add/Modify/Deletes will cause the event
@ -360,6 +419,8 @@ func DeletionHandlingObjectToName(obj interface{}) (ObjectName, error) {
// long as possible (until the upstream source closes the watch or times out, // long as possible (until the upstream source closes the watch or times out,
// or you stop the controller). // or you stop the controller).
// - h is the object you want notifications sent to. // - h is the object you want notifications sent to.
//
// Deprecated: Use NewInformerWithOptions instead.
func NewInformer( func NewInformer(
lw ListerWatcher, lw ListerWatcher,
objType runtime.Object, objType runtime.Object,
@ -369,7 +430,13 @@ func NewInformer(
// This will hold the client state, as we know it. // This will hold the client state, as we know it.
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc) clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil) options := InformerOptions{
ListerWatcher: lw,
ObjectType: objType,
Handler: h,
ResyncPeriod: resyncPeriod,
}
return clientState, newInformer(clientState, options)
} }
// NewIndexerInformer returns an Indexer and a Controller for populating the index // NewIndexerInformer returns an Indexer and a Controller for populating the index
@ -387,6 +454,8 @@ func NewInformer(
// or you stop the controller). // or you stop the controller).
// - h is the object you want notifications sent to. // - h is the object you want notifications sent to.
// - indexers is the indexer for the received object type. // - indexers is the indexer for the received object type.
//
// Deprecated: Use NewInformerWithOptions instead.
func NewIndexerInformer( func NewIndexerInformer(
lw ListerWatcher, lw ListerWatcher,
objType runtime.Object, objType runtime.Object,
@ -397,7 +466,14 @@ func NewIndexerInformer(
// This will hold the client state, as we know it. // This will hold the client state, as we know it.
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil) options := InformerOptions{
ListerWatcher: lw,
ObjectType: objType,
Handler: h,
ResyncPeriod: resyncPeriod,
Indexers: indexers,
}
return clientState, newInformer(clientState, options)
} }
// NewTransformingInformer returns a Store and a controller for populating // NewTransformingInformer returns a Store and a controller for populating
@ -407,6 +483,8 @@ func NewIndexerInformer(
// The given transform function will be called on all objects before they will // The given transform function will be called on all objects before they will
// put into the Store and corresponding Add/Modify/Delete handlers will // put into the Store and corresponding Add/Modify/Delete handlers will
// be invoked for them. // be invoked for them.
//
// Deprecated: Use NewInformerWithOptions instead.
func NewTransformingInformer( func NewTransformingInformer(
lw ListerWatcher, lw ListerWatcher,
objType runtime.Object, objType runtime.Object,
@ -417,7 +495,14 @@ func NewTransformingInformer(
// This will hold the client state, as we know it. // This will hold the client state, as we know it.
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc) clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer) options := InformerOptions{
ListerWatcher: lw,
ObjectType: objType,
Handler: h,
ResyncPeriod: resyncPeriod,
Transform: transformer,
}
return clientState, newInformer(clientState, options)
} }
// NewTransformingIndexerInformer returns an Indexer and a controller for // NewTransformingIndexerInformer returns an Indexer and a controller for
@ -427,6 +512,8 @@ func NewTransformingInformer(
// The given transform function will be called on all objects before they will // The given transform function will be called on all objects before they will
// be put into the Index and corresponding Add/Modify/Delete handlers will // be put into the Index and corresponding Add/Modify/Delete handlers will
// be invoked for them. // be invoked for them.
//
// Deprecated: Use NewInformerWithOptions instead.
func NewTransformingIndexerInformer( func NewTransformingIndexerInformer(
lw ListerWatcher, lw ListerWatcher,
objType runtime.Object, objType runtime.Object,
@ -438,7 +525,15 @@ func NewTransformingIndexerInformer(
// This will hold the client state, as we know it. // This will hold the client state, as we know it.
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer) options := InformerOptions{
ListerWatcher: lw,
ObjectType: objType,
Handler: h,
ResyncPeriod: resyncPeriod,
Indexers: indexers,
Transform: transformer,
}
return clientState, newInformer(clientState, options)
} }
// Multiplexes updates in the form of a list of Deltas into a Store, and informs // Multiplexes updates in the form of a list of Deltas into a Store, and informs
@ -481,42 +576,29 @@ func processDeltas(
// providing event notifications. // providing event notifications.
// //
// Parameters // Parameters
// - lw is list and watch functions for the source of the resource you want to
// be informed of.
// - objType is an object of the type that you expect to receive.
// - resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
// calls, even if nothing changed). Otherwise, re-list will be delayed as
// long as possible (until the upstream source closes the watch or times out,
// or you stop the controller).
// - h is the object you want notifications sent to.
// - clientState is the store you want to populate // - clientState is the store you want to populate
func newInformer( // - options contain the options to configure the controller
lw ListerWatcher, func newInformer(clientState Store, options InformerOptions) Controller {
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
transformer TransformFunc,
) Controller {
// This will hold incoming changes. Note how we pass clientState in as a // This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set // KeyLister, that way resync operations will result in the correct set
// of update/delete deltas. // of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState, KnownObjects: clientState,
EmitDeltaTypeReplaced: true, EmitDeltaTypeReplaced: true,
Transformer: transformer, Transformer: options.Transform,
}) })
cfg := &Config{ cfg := &Config{
Queue: fifo, Queue: fifo,
ListerWatcher: lw, ListerWatcher: options.ListerWatcher,
ObjectType: objType, ObjectType: options.ObjectType,
FullResyncPeriod: resyncPeriod, FullResyncPeriod: options.ResyncPeriod,
MinWatchTimeout: options.MinWatchTimeout,
RetryOnError: false, RetryOnError: false,
Process: func(obj interface{}, isInInitialList bool) error { Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok { if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, deltas, isInInitialList) return processDeltas(options.Handler, clientState, deltas, isInInitialList)
} }
return errors.New("object given as Process argument is not Deltas") return errors.New("object given as Process argument is not Deltas")
}, },

View File

@ -49,6 +49,12 @@ import (
const defaultExpectedTypeName = "<unspecified>" const defaultExpectedTypeName = "<unspecified>"
var (
// We try to spread the load on apiserver by setting timeouts for
// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
defaultMinWatchTimeout = 5 * time.Minute
)
// Reflector watches a specified resource and causes all changes to be reflected in the given store. // Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct { type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible. // name identifies this reflector. By default it will be a file:line if possible.
@ -72,6 +78,8 @@ type Reflector struct {
// backoff manages backoff of ListWatch // backoff manages backoff of ListWatch
backoffManager wait.BackoffManager backoffManager wait.BackoffManager
resyncPeriod time.Duration resyncPeriod time.Duration
// minWatchTimeout defines the minimum timeout for watch requests.
minWatchTimeout time.Duration
// clock allows tests to manipulate time // clock allows tests to manipulate time
clock clock.Clock clock clock.Clock
// paginatedResult defines whether pagination should be forced for list calls. // paginatedResult defines whether pagination should be forced for list calls.
@ -151,12 +159,6 @@ func DefaultWatchErrorHandler(r *Reflector, err error) {
} }
} }
var (
// We try to spread the load on apiserver by setting timeouts for
// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
minWatchTimeout = 5 * time.Minute
)
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
// The indexer is configured to key on namespace // The indexer is configured to key on namespace
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) { func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
@ -194,6 +196,10 @@ type ReflectorOptions struct {
// (do not resync). // (do not resync).
ResyncPeriod time.Duration ResyncPeriod time.Duration
// MinWatchTimeout, if non-zero, defines the minimum timeout for watch requests send to kube-apiserver.
// However, values lower than 5m will not be honored to avoid negative performance impact on controlplane.
MinWatchTimeout time.Duration
// Clock allows tests to control time. If unset defaults to clock.RealClock{} // Clock allows tests to control time. If unset defaults to clock.RealClock{}
Clock clock.Clock Clock clock.Clock
} }
@ -213,9 +219,14 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S
if reflectorClock == nil { if reflectorClock == nil {
reflectorClock = clock.RealClock{} reflectorClock = clock.RealClock{}
} }
minWatchTimeout := defaultMinWatchTimeout
if options.MinWatchTimeout > defaultMinWatchTimeout {
minWatchTimeout = options.MinWatchTimeout
}
r := &Reflector{ r := &Reflector{
name: options.Name, name: options.Name,
resyncPeriod: options.ResyncPeriod, resyncPeriod: options.ResyncPeriod,
minWatchTimeout: minWatchTimeout,
typeDescription: options.TypeDescription, typeDescription: options.TypeDescription,
listerWatcher: lw, listerWatcher: lw,
store: store, store: store,
@ -415,7 +426,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
start := r.clock.Now() start := r.clock.Now()
if w == nil { if w == nil {
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{ options := metav1.ListOptions{
ResourceVersion: r.LastSyncResourceVersion(), ResourceVersion: r.LastSyncResourceVersion(),
// We want to avoid situations of hanging watchers. Stop any watchers that do not // We want to avoid situations of hanging watchers. Stop any watchers that do not
@ -642,7 +653,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
// TODO(#115478): large "list", slow clients, slow network, p&f // TODO(#115478): large "list", slow clients, slow network, p&f
// might slow down streaming and eventually fail. // might slow down streaming and eventually fail.
// maybe in such a case we should retry with an increased timeout? // maybe in such a case we should retry with an increased timeout?
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{ options := metav1.ListOptions{
ResourceVersion: lastKnownRV, ResourceVersion: lastKnownRV,
AllowWatchBookmarks: true, AllowWatchBookmarks: true,

View File

@ -1091,6 +1091,67 @@ func TestGetExpectedGVKFromObject(t *testing.T) {
} }
} }
func TestWatchTimeout(t *testing.T) {
testCases := []struct {
name string
minWatchTimeout time.Duration
expectedMinTimeoutSeconds int64
}{
{
name: "no timeout",
expectedMinTimeoutSeconds: 5 * 60,
},
{
name: "small timeout not honored",
minWatchTimeout: time.Second,
expectedMinTimeoutSeconds: 5 * 60,
},
{
name: "30m timeout",
minWatchTimeout: 30 * time.Minute,
expectedMinTimeoutSeconds: 30 * 60,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
stopCh := make(chan struct{})
s := NewStore(MetaNamespaceKeyFunc)
var gotTimeoutSeconds int64
lw := &testLW{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if options.TimeoutSeconds != nil {
gotTimeoutSeconds = *options.TimeoutSeconds
}
// Stop once the reflector begins watching since we're only interested in the list.
close(stopCh)
return watch.NewFake(), nil
},
}
opts := ReflectorOptions{
MinWatchTimeout: tc.minWatchTimeout,
}
r := NewReflectorWithOptions(lw, &v1.Pod{}, s, opts)
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}
minExpected := tc.expectedMinTimeoutSeconds
maxExpected := 2 * tc.expectedMinTimeoutSeconds
if gotTimeoutSeconds < minExpected || gotTimeoutSeconds > maxExpected {
t.Errorf("unexpected TimeoutSecond, got %v, expected in [%v, %v]", gotTimeoutSeconds, minExpected, maxExpected)
}
})
}
}
type storeWithRV struct { type storeWithRV struct {
Store Store