Avoid thundering herd on etcd on master upgrades

Kubernetes-commit: 773d3583bcb7a2ac39c772a9861978d81db0df2f
This commit is contained in:
wojtekt
2019-12-30 13:53:28 +01:00
committed by Kubernetes Publisher
parent 21de178e1d
commit 5534e24283
2 changed files with 65 additions and 4 deletions

View File

@@ -85,7 +85,12 @@ type Reflector struct {
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// Defaults to pager.PageSize.
// If unset, for consistent reads (RV="") or reads that opt into arbitrarily old data
// (RV="0") it will default to pager.PageSize; for all other reads (RV != "" && RV != "0")
// it will turn off pagination to allow serving them from the watch cache.
// NOTE: It should be used carefully, as paginated lists are always served directly from
// etcd, which is significantly less efficient and may lead to serious performance and
// scalability problems.
WatchListPageSize int64
}
@@ -220,6 +225,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}))
if r.WatchListPageSize != 0 {
pager.PageSize = r.WatchListPageSize
} else {
// User didn't explicitly request pagination.
if options.ResourceVersion != "" && options.ResourceVersion != "0" {
// We also don't turn off pagination for ResourceVersion="0", since the watch cache
// ignores Limit in that case anyway, and if the watch cache is not enabled we
// don't introduce a regression.
// With ResourceVersion != "", we have the possibility to list from the watch cache,
// but we do that (for ResourceVersion != "0") only if Limit is unset.
// To avoid a thundering herd on etcd (e.g. on master upgrades), we explicitly
// switch off pagination to force listing from the watch cache (if enabled).
// With the existing semantics of RV (the result is at least as fresh as the provided RV),
// this is correct and doesn't lead to going back in time.
pager.PageSize = 0
}
}
list, err = pager.List(context.Background(), options)
@@ -320,7 +340,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err != nil {
switch {
case isExpiredError(err):
r.setIsLastSyncResourceVersionExpired(true)
// Don't set LastSyncResourceVersionExpired - a LIST call with ResourceVersion=RV already
// has the semantics that it returns data at least as fresh as the provided RV.
// So first try to LIST with RV set to the resource version of the last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
case err == io.EOF:
// watch closed normally
@@ -344,8 +366,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err != errorStopRequested {
switch {
case isExpiredError(err):
r.setIsLastSyncResourceVersionExpired(true)
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
// Don't set LastSyncResourceVersionExpired - a LIST call with ResourceVersion=RV already
// has the semantics that it returns data at least as fresh as the provided RV.
// So first try to LIST with RV set to the resource version of the last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}