Merge pull request #87795 from zhan849/harry/reflector-backoff

add exponential backoff with cap and reset in reflector during retrying ListWatch

Kubernetes-commit: f9250c4f951a65d6eb65b6aceb1ac1f33598c14a
This commit is contained in:
Kubernetes Publisher 2020-02-11 17:06:21 -08:00
commit bf78b1a1f3

View File

@ -66,10 +66,10 @@ type Reflector struct {
store Store store Store
// listerWatcher is used to perform lists and watches. // listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher listerWatcher ListerWatcher
// period controls timing between an unsuccessful watch ending and
// the beginning of the next list. // backoff manages backoff of ListWatch
period time.Duration backoffManager wait.BackoffManager
// The period at which ShouldResync is invoked
resyncPeriod time.Duration resyncPeriod time.Duration
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked // ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
ShouldResync func() bool ShouldResync func() bool
@ -127,13 +127,17 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn
// NewNamedReflector same as NewReflector, but with a specified name for logging // NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
realClock := &clock.RealClock{}
r := &Reflector{ r := &Reflector{
name: name, name: name,
listerWatcher: lw, listerWatcher: lw,
store: store, store: store,
period: time.Second, // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
resyncPeriod: resyncPeriod, // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
clock: &clock.RealClock{}, // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
resyncPeriod: resyncPeriod,
clock: realClock,
} }
r.setExpectedType(expectedType) r.setExpectedType(expectedType)
return r return r
@ -168,12 +172,13 @@ var internalPackages = []string{"client-go/tools/cache/"}
// objects and subsequent deltas. // objects and subsequent deltas.
// Run will exit when stopCh is closed. // Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) { func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.Until(func() { wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil { if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err) utilruntime.HandleError(err)
} }
}, r.period, stopCh) }, r.backoffManager, true, stopCh)
klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
} }
var ( var (