mirror of
https://github.com/kubernetes/client-go.git
synced 2025-08-01 07:20:59 +00:00
Allow for configuring MinWatchTimeout in Reflector and Informer.
Kubernetes-commit: 29e38c19b853b6cc3950541b1727395acf5eb4d3
This commit is contained in:
parent
7da319745b
commit
b9e952f4d7
14
tools/cache/controller.go
vendored
14
tools/cache/controller.go
vendored
@ -59,6 +59,12 @@ type Config struct {
|
||||
// FullResyncPeriod is the period at which ShouldResync is considered.
|
||||
FullResyncPeriod time.Duration
|
||||
|
||||
// MinWatchTimeout, if set, will define the minimum timeout for watch requests send
|
||||
// to kube-apiserver. However, values lower than 5m will not be honored to avoid
|
||||
// negative performance impact on controlplane.
|
||||
// Optional - if unset a default value of 5m will be used.
|
||||
MinWatchTimeout time.Duration
|
||||
|
||||
// ShouldResync is periodically used by the reflector to determine
|
||||
// whether to Resync the Queue. If ShouldResync is `nil` or
|
||||
// returns true, it means the reflector should proceed with the
|
||||
@ -138,6 +144,7 @@ func (c *controller) Run(stopCh <-chan struct{}) {
|
||||
c.config.Queue,
|
||||
ReflectorOptions{
|
||||
ResyncPeriod: c.config.FullResyncPeriod,
|
||||
MinWatchTimeout: c.config.MinWatchTimeout,
|
||||
TypeDescription: c.config.ObjectDescription,
|
||||
Clock: c.clock,
|
||||
},
|
||||
@ -366,6 +373,12 @@ type InformerOptions struct {
|
||||
// Optional - if unset, store resyncing is not happening periodically.
|
||||
ResyncPeriod time.Duration
|
||||
|
||||
// MinWatchTimeout, if set, will define the minimum timeout for watch requests send
|
||||
// to kube-apiserver. However, values lower than 5m will not be honored to avoid
|
||||
// negative performance impact on controlplane.
|
||||
// Optional - if unset a default value of 5m will be used.
|
||||
MinWatchTimeout time.Duration
|
||||
|
||||
// Indexers, if set, are the indexers for the received objects to optimize
|
||||
// certain queries.
|
||||
// Optional - if unset no indexes are maintained.
|
||||
@ -580,6 +593,7 @@ func newInformer(clientState Store, options InformerOptions) Controller {
|
||||
ListerWatcher: options.ListerWatcher,
|
||||
ObjectType: options.ObjectType,
|
||||
FullResyncPeriod: options.ResyncPeriod,
|
||||
MinWatchTimeout: options.MinWatchTimeout,
|
||||
RetryOnError: false,
|
||||
|
||||
Process: func(obj interface{}, isInInitialList bool) error {
|
||||
|
27
tools/cache/reflector.go
vendored
27
tools/cache/reflector.go
vendored
@ -49,6 +49,12 @@ import (
|
||||
|
||||
const defaultExpectedTypeName = "<unspecified>"
|
||||
|
||||
var (
|
||||
// We try to spread the load on apiserver by setting timeouts for
|
||||
// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
|
||||
defaultMinWatchTimeout = 5 * time.Minute
|
||||
)
|
||||
|
||||
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
|
||||
type Reflector struct {
|
||||
// name identifies this reflector. By default it will be a file:line if possible.
|
||||
@ -72,6 +78,8 @@ type Reflector struct {
|
||||
// backoff manages backoff of ListWatch
|
||||
backoffManager wait.BackoffManager
|
||||
resyncPeriod time.Duration
|
||||
// minWatchTimeout defines the minimum timeout for watch requests.
|
||||
minWatchTimeout time.Duration
|
||||
// clock allows tests to manipulate time
|
||||
clock clock.Clock
|
||||
// paginatedResult defines whether pagination should be forced for list calls.
|
||||
@ -151,12 +159,6 @@ func DefaultWatchErrorHandler(r *Reflector, err error) {
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
// We try to spread the load on apiserver by setting timeouts for
|
||||
// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
|
||||
minWatchTimeout = 5 * time.Minute
|
||||
)
|
||||
|
||||
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
|
||||
// The indexer is configured to key on namespace
|
||||
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
|
||||
@ -194,6 +196,10 @@ type ReflectorOptions struct {
|
||||
// (do not resync).
|
||||
ResyncPeriod time.Duration
|
||||
|
||||
// MinWatchTimeout, if non-zero, defines the minimum timeout for watch requests send to kube-apiserver.
|
||||
// However, values lower than 5m will not be honored to avoid negative performance impact on controlplane.
|
||||
MinWatchTimeout time.Duration
|
||||
|
||||
// Clock allows tests to control time. If unset defaults to clock.RealClock{}
|
||||
Clock clock.Clock
|
||||
}
|
||||
@ -213,9 +219,14 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S
|
||||
if reflectorClock == nil {
|
||||
reflectorClock = clock.RealClock{}
|
||||
}
|
||||
minWatchTimeout := defaultMinWatchTimeout
|
||||
if options.MinWatchTimeout > defaultMinWatchTimeout {
|
||||
minWatchTimeout = options.MinWatchTimeout
|
||||
}
|
||||
r := &Reflector{
|
||||
name: options.Name,
|
||||
resyncPeriod: options.ResyncPeriod,
|
||||
minWatchTimeout: minWatchTimeout,
|
||||
typeDescription: options.TypeDescription,
|
||||
listerWatcher: lw,
|
||||
store: store,
|
||||
@ -415,7 +426,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
|
||||
start := r.clock.Now()
|
||||
|
||||
if w == nil {
|
||||
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
|
||||
timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
|
||||
options := metav1.ListOptions{
|
||||
ResourceVersion: r.LastSyncResourceVersion(),
|
||||
// We want to avoid situations of hanging watchers. Stop any watchers that do not
|
||||
@ -642,7 +653,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
|
||||
// TODO(#115478): large "list", slow clients, slow network, p&f
|
||||
// might slow down streaming and eventually fail.
|
||||
// maybe in such a case we should retry with an increased timeout?
|
||||
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
|
||||
timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
|
||||
options := metav1.ListOptions{
|
||||
ResourceVersion: lastKnownRV,
|
||||
AllowWatchBookmarks: true,
|
||||
|
61
tools/cache/reflector_test.go
vendored
61
tools/cache/reflector_test.go
vendored
@ -1091,6 +1091,67 @@ func TestGetExpectedGVKFromObject(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchTimeout(t *testing.T) {
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
minWatchTimeout time.Duration
|
||||
expectedMinTimeoutSeconds int64
|
||||
}{
|
||||
{
|
||||
name: "no timeout",
|
||||
expectedMinTimeoutSeconds: 5 * 60,
|
||||
},
|
||||
{
|
||||
name: "small timeout not honored",
|
||||
minWatchTimeout: time.Second,
|
||||
expectedMinTimeoutSeconds: 5 * 60,
|
||||
},
|
||||
{
|
||||
name: "30m timeout",
|
||||
minWatchTimeout: 30 * time.Minute,
|
||||
expectedMinTimeoutSeconds: 30 * 60,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
s := NewStore(MetaNamespaceKeyFunc)
|
||||
var gotTimeoutSeconds int64
|
||||
|
||||
lw := &testLW{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
if options.TimeoutSeconds != nil {
|
||||
gotTimeoutSeconds = *options.TimeoutSeconds
|
||||
}
|
||||
|
||||
// Stop once the reflector begins watching since we're only interested in the list.
|
||||
close(stopCh)
|
||||
return watch.NewFake(), nil
|
||||
},
|
||||
}
|
||||
|
||||
opts := ReflectorOptions{
|
||||
MinWatchTimeout: tc.minWatchTimeout,
|
||||
}
|
||||
r := NewReflectorWithOptions(lw, &v1.Pod{}, s, opts)
|
||||
if err := r.ListAndWatch(stopCh); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
minExpected := tc.expectedMinTimeoutSeconds
|
||||
maxExpected := 2 * tc.expectedMinTimeoutSeconds
|
||||
if gotTimeoutSeconds < minExpected || gotTimeoutSeconds > maxExpected {
|
||||
t.Errorf("unexpected TimeoutSecond, got %v, expected in [%v, %v]", gotTimeoutSeconds, minExpected, maxExpected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type storeWithRV struct {
|
||||
Store
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user