From 773d3583bcb7a2ac39c772a9861978d81db0df2f Mon Sep 17 00:00:00 2001 From: wojtekt Date: Mon, 30 Dec 2019 13:53:28 +0100 Subject: [PATCH] Avoid thundering herd on etcd on masters upgrade --- .../k8s.io/client-go/tools/cache/reflector.go | 32 ++++++++++++++-- .../client-go/tools/cache/reflector_test.go | 37 +++++++++++++++++++ 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index 32ef798a236..2a1af4e9ee2 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -85,7 +85,12 @@ type Reflector struct { // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion lastSyncResourceVersionMutex sync.RWMutex // WatchListPageSize is the requested chunk size of initial and resync watch lists. - // Defaults to pager.PageSize. + // If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data + // (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0") + // it will turn off pagination to allow serving them from watch cache. + // NOTE: It should be used carefully as paginated lists are always served directly from + // etcd, which is significantly less efficient and may lead to serious performance and + // scalability problems. WatchListPageSize int64 } @@ -220,6 +225,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { })) if r.WatchListPageSize != 0 { pager.PageSize = r.WatchListPageSize + } else { + // User didn't explicitly request pagination. + if options.ResourceVersion != "" && options.ResourceVersion != "0" { + // We also don't turn off pagination for ResourceVersion="0", since watch cache + // is ignoring Limit in that case anyway, and if watchcache is not enabled we + // don't introduce regression. 
+ + // With ResourceVersion != "", we have the possibility to list from the watch cache, + // but we do that (for ResourceVersion != "0") only if Limit is unset. + // To avoid a thundering herd on etcd (e.g. on master upgrades), we explicitly + // switch off pagination to force listing from the watch cache (if enabled). + // With the existing semantics of RV (result is at least as fresh as provided RV), + // this is correct and doesn't lead to going back in time. + pager.PageSize = 0 + } } list, err = pager.List(context.Background(), options) @@ -320,7 +340,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if err != nil { switch { case isExpiredError(err): - r.setIsLastSyncResourceVersionExpired(true) + // Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already + // has the semantics that it returns data at least as fresh as provided RV. + // So first try to LIST with RV set to the resource version of the last observed object. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) case err == io.EOF: // watch closed normally @@ -344,8 +366,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if err != errorStopRequested { switch { case isExpiredError(err): - r.setIsLastSyncResourceVersionExpired(true) - klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) + // Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already + // has the semantics that it returns data at least as fresh as provided RV. + // So first try to LIST with RV set to the resource version of the last observed object. 
+ klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) default: klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) } diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index 9f6aaa07668..973403ec130 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -425,6 +425,8 @@ func TestReflectorWatchListPageSize(t *testing.T) { }, } r := NewReflector(lw, &v1.Pod{}, s, 0) + // Set resource version to test pagination also for not consistent reads. + r.setLastSyncResourceVersion("10") // Set the reflector to paginate the list request in 4 item chunks. r.WatchListPageSize = 4 r.ListAndWatch(stopCh) @@ -435,6 +437,41 @@ func TestReflectorWatchListPageSize(t *testing.T) { } } +func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) { + stopCh := make(chan struct{}) + s := NewStore(MetaNamespaceKeyFunc) + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Stop once the reflector begins watching since we're only interested in the list. 
+ close(stopCh) + fw := watch.NewFake() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if options.ResourceVersion != "10" { + t.Fatalf("Expected ResourceVersion: \"10\", got: %s", options.ResourceVersion) + } + if options.Limit != 0 { + t.Fatalf("Expected list Limit of 0 but got %d", options.Limit) + } + pods := make([]v1.Pod, 10) + for i := 0; i < 10; i++ { + pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} + } + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods}, nil + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + r.setLastSyncResourceVersion("10") + r.ListAndWatch(stopCh) + + results := s.List() + if len(results) != 10 { + t.Errorf("Expected 10 results, got %d", len(results)) + } +} + // TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends // it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or // etcd that is partitioned and serving older data than the reflector has already processed.