From 5534e24283d00a68e21fccb807b710fdb0e092ef Mon Sep 17 00:00:00 2001 From: wojtekt Date: Mon, 30 Dec 2019 13:53:28 +0100 Subject: [PATCH 1/2] Avoid thundering herd on etcd on masters upgrade Kubernetes-commit: 773d3583bcb7a2ac39c772a9861978d81db0df2f --- tools/cache/reflector.go | 32 ++++++++++++++++++++++++++---- tools/cache/reflector_test.go | 37 +++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 32ef798a..2a1af4e9 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -85,7 +85,12 @@ type Reflector struct { // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion lastSyncResourceVersionMutex sync.RWMutex // WatchListPageSize is the requested chunk size of initial and resync watch lists. - // Defaults to pager.PageSize. + // If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data + // (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0") + // it will turn off pagination to allow serving them from watch cache. + // NOTE: It should be used carefully as paginated lists are always served directly from + // etcd, which is significantly less efficient and may lead to serious performance and + // scalability problems. WatchListPageSize int64 } @@ -220,6 +225,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { })) if r.WatchListPageSize != 0 { pager.PageSize = r.WatchListPageSize + } else { + // User didn't explicitly request pagination. + if options.ResourceVersion != "" && options.ResourceVersion != "0" { + // We also don't turn off pagination for ResourceVersion="0", since watch cache + // is ignoring Limit in that case anyway, and if watchcache is not enabled we + // don't introduce regression. + + // With ResourceVersion != "", we have a possibility to list from watch cache, + // but we do that (for ResourceVersion != "0") only if Limit is unset. + // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly + // switch off pagination to force listing from watch cache (if enabled). + // With the existing semantic of RV (result is at least as fresh as provided RV), + // this is correct and doesn't lead to going back in time. + pager.PageSize = 0 + } } list, err = pager.List(context.Background(), options) @@ -320,7 +340,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if err != nil { switch { case isExpiredError(err): - r.setIsLastSyncResourceVersionExpired(true) + // Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already + // has a semantic that it returns data at least as fresh as provided RV. + // So first try to LIST with setting RV to resource version of last observed object. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) case err == io.EOF: // watch closed normally @@ -344,8 +366,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if err != errorStopRequested { switch { case isExpiredError(err): - r.setIsLastSyncResourceVersionExpired(true) - klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) + // Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already + // has a semantic that it returns data at least as fresh as provided RV. + // So first try to LIST with setting RV to resource version of last observed object. + klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) default: klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) } diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index 9f6aaa07..973403ec 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -425,6 +425,8 @@ func TestReflectorWatchListPageSize(t *testing.T) { }, } r := NewReflector(lw, &v1.Pod{}, s, 0) + // Set resource version to test pagination also for not consistent reads. + r.setLastSyncResourceVersion("10") // Set the reflector to paginate the list request in 4 item chunks. r.WatchListPageSize = 4 r.ListAndWatch(stopCh) @@ -435,6 +437,41 @@ func TestReflectorWatchListPageSize(t *testing.T) { } } +func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) { + stopCh := make(chan struct{}) + s := NewStore(MetaNamespaceKeyFunc) + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Stop once the reflector begins watching since we're only interested in the list. + close(stopCh) + fw := watch.NewFake() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if options.ResourceVersion != "10" { + t.Fatalf("Expected ResourceVersion: \"10\", got: %s", options.ResourceVersion) + } + if options.Limit != 0 { + t.Fatalf("Expected list Limit of 0 but got %d", options.Limit) + } + pods := make([]v1.Pod, 10) + for i := 0; i < 10; i++ { + pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} + } + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods}, nil + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + r.setLastSyncResourceVersion("10") + r.ListAndWatch(stopCh) + + results := s.List() + if len(results) != 10 { + t.Errorf("Expected 10 results, got %d", len(results)) + } +} + // TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends // it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or // etcd that is partitioned and serving older data than the reflector has already processed. From 0e2dbbf70a15dd8619c1bc7658f8df63c8300c0a Mon Sep 17 00:00:00 2001 From: wojtekt Date: Tue, 28 Jan 2020 09:39:27 +0100 Subject: [PATCH 2/2] Switch pager to return whether the result was paginated Kubernetes-commit: 5dcf08c1993718e94cf6537af50e0a411dd2878c --- tools/cache/reflector.go | 57 ++++++++++++++++++++--------- tools/cache/reflector_test.go | 51 ++++++++++++++++++++++++++ tools/pager/pager.go | 21 ++++++----- tools/pager/pager_test.go | 67 +++++++++++++++++++++-------------- 4 files changed, 144 insertions(+), 52 deletions(-) diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 2a1af4e9..d0c3190a 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -75,6 +75,9 @@ type Reflector struct { ShouldResync func() bool // clock allows tests to manipulate time clock clock.Clock + // paginatedResult defines whether pagination should be forced for list calls. + // It is set based on the result of the initial list call. + paginatedResult bool // lastSyncResourceVersion is the resource version token last // observed when doing a sync with the underlying store // it is thread safe, but not synchronized with the underlying store @@ -209,6 +212,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name}) defer initTrace.LogIfLong(10 * time.Second) var list runtime.Object + var paginatedResult bool var err error listCh := make(chan struct{}, 1) panicCh := make(chan interface{}, 1) @@ -223,26 +227,30 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { return r.listerWatcher.List(opts) })) - if r.WatchListPageSize != 0 { + switch { + case r.WatchListPageSize != 0: pager.PageSize = r.WatchListPageSize - } else { + case r.paginatedResult: + // We got a paginated result initially. Assume this resource and server honor + // paging requests (i.e. watch cache is probably disabled) and leave the default + // pager size set. + case options.ResourceVersion != "" && options.ResourceVersion != "0": // User didn't explicitly request pagination. - if options.ResourceVersion != "" && options.ResourceVersion != "0" { - // We also don't turn off pagination for ResourceVersion="0", since watch cache - // is ignoring Limit in that case anyway, and if watchcache is not enabled we - // don't introduce regression. - - // With ResourceVersion != "", we have a possibility to list from watch cache, - // but we do that (for ResourceVersion != "0") only if Limit is unset. - // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly - // switch off pagination to force listing from watch cache (if enabled). - // With the existing semantic of RV (result is at least as fresh as provided RV), - // this is correct and doesn't lead to going back in time. - pager.PageSize = 0 - } + // + // With ResourceVersion != "", we have a possibility to list from watch cache, + // but we do that (for ResourceVersion != "0") only if Limit is unset. + // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly + // switch off pagination to force listing from watch cache (if enabled). + // With the existing semantic of RV (result is at least as fresh as provided RV), + // this is correct and doesn't lead to going back in time. + // + // We also don't turn off pagination for ResourceVersion="0", since watch cache + // is ignoring Limit in that case anyway, and if watch cache is not enabled + // we don't introduce regression. + pager.PageSize = 0 } - list, err = pager.List(context.Background(), options) + list, paginatedResult, err = pager.List(context.Background(), options) if isExpiredError(err) { r.setIsLastSyncResourceVersionExpired(true) // Retry immediately if the resource version used to list is expired. @@ -250,7 +258,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // continuation pages, but the pager might not be enabled, or the full list might fail because the // resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all // to recover and ensure the reflector makes forward progress. - list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) + list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) } close(listCh) }() @@ -264,6 +272,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if err != nil { return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err) } + + // We check if the list was paginated and if so set the paginatedResult based on that. + // However, we want to do that only for the initial list (which is the only case + // when we set ResourceVersion="0"). The reasoning behind it is that later, in some + // situations we may force listing directly from etcd (by setting ResourceVersion="") + // which will return paginated result, even if watch cache is enabled. However, in + // that case, we still want to prefer sending requests to watch cache if possible. + // + // Paginated result returned for request with ResourceVersion="0" mean that watch + // cache is disabled and there are a lot of objects of a given type. In such case, + // there is no need to prefer listing from watch cache. + if options.ResourceVersion == "0" && paginatedResult { + r.paginatedResult = true + } + r.setIsLastSyncResourceVersionExpired(false) // list was successful initTrace.Step("Objects listed") listMetaInterface, err := meta.ListAccessor(list) diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index 973403ec..59b66256 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -472,6 +472,57 @@ func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) { } } +func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T) { + var stopCh chan struct{} + s := NewStore(MetaNamespaceKeyFunc) + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Stop once the reflector begins watching since we're only interested in the list. + close(stopCh) + fw := watch.NewFake() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + // Check that default pager limit is set. + if options.Limit != 500 { + t.Fatalf("Expected list Limit of 500 but got %d", options.Limit) + } + pods := make([]v1.Pod, 10) + for i := 0; i < 10; i++ { + pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} + } + switch options.Continue { + case "": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil + case "C1": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil + case "C2": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil + default: + t.Fatalf("Unrecognized continue: %s", options.Continue) + } + return nil, nil + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + + // Initial list should initialize paginatedResult in the reflector. + stopCh = make(chan struct{}) + r.ListAndWatch(stopCh) + if results := s.List(); len(results) != 10 { + t.Errorf("Expected 10 results, got %d", len(results)) + } + + // Since initial list for ResourceVersion="0" was paginated, the subsequent + // ones should also be paginated. + stopCh = make(chan struct{}) + r.ListAndWatch(stopCh) + if results := s.List(); len(results) != 10 { + t.Errorf("Expected 10 results, got %d", len(results)) + } +} + // TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends // it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or // etcd that is partitioned and serving older data than the reflector has already processed. diff --git a/tools/pager/pager.go b/tools/pager/pager.go index 307808be..f6c6a012 100644 --- a/tools/pager/pager.go +++ b/tools/pager/pager.go @@ -73,16 +73,18 @@ func New(fn ListPageFunc) *ListPager { // List returns a single list object, but attempts to retrieve smaller chunks from the // server to reduce the impact on the server. If the chunk attempt fails, it will load // the full list instead. The Limit field on options, if unset, will default to the page size. -func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { +func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) { if options.Limit == 0 { options.Limit = p.PageSize } requestedResourceVersion := options.ResourceVersion var list *metainternalversion.List + paginatedResult := false + for { select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, paginatedResult, ctx.Err() default: } @@ -93,23 +95,24 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti // failing when the resource versions is established by the first page request falls out of the compaction // during the subsequent list requests). if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" { - return nil, err + return nil, paginatedResult, err } // the list expired while we were processing, fall back to a full list at // the requested ResourceVersion. options.Limit = 0 options.Continue = "" options.ResourceVersion = requestedResourceVersion - return p.PageFn(ctx, options) + result, err := p.PageFn(ctx, options) + return result, paginatedResult, err } m, err := meta.ListAccessor(obj) if err != nil { - return nil, fmt.Errorf("returned object must be a list: %v", err) + return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err) } // exit early and return the object we got if we haven't processed any pages if len(m.GetContinue()) == 0 && list == nil { - return obj, nil + return obj, paginatedResult, nil } // initialize the list and fill its contents @@ -122,12 +125,12 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti list.Items = append(list.Items, obj) return nil }); err != nil { - return nil, err + return nil, paginatedResult, err } // if we have no more items, return the list if len(m.GetContinue()) == 0 { - return list, nil + return list, paginatedResult, nil } // set the next loop up @@ -136,6 +139,8 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti // `specifying resource version is not allowed when using continue` error. // See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143. options.ResourceVersion = "" + // At this point, result is already paginated. + paginatedResult = true } } diff --git a/tools/pager/pager_test.go b/tools/pager/pager_test.go index 35d4449f..86ec9f6e 100644 --- a/tools/pager/pager_test.go +++ b/tools/pager/pager_test.go @@ -120,6 +120,7 @@ func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options } return p.PagedList(ctx, options) } + func TestListPager_List(t *testing.T) { type fields struct { PageSize int64 @@ -135,43 +136,50 @@ func TestListPager_List(t *testing.T) { fields fields args args want runtime.Object + wantPaged bool wantErr bool isExpired bool }{ { - name: "empty page", - fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList}, - args: args{}, - want: list(0, "rv:20"), + name: "empty page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(0, "rv:20"), + wantPaged: false, }, { - name: "one page", - fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList}, - args: args{}, - want: list(9, "rv:20"), + name: "one page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(9, "rv:20"), + wantPaged: false, }, { - name: "one full page", - fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList}, - args: args{}, - want: list(10, "rv:20"), + name: "one full page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(10, "rv:20"), + wantPaged: false, }, { - name: "two pages", - fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, - args: args{}, - want: list(11, "rv:20"), + name: "two pages", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(11, "rv:20"), + wantPaged: true, }, { - name: "three pages", - fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList}, - args: args{}, - want: list(21, "rv:20"), + name: "three pages", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(21, "rv:20"), + wantPaged: true, }, { name: "expires on second page", fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage}, args: args{}, + wantPaged: true, wantErr: true, isExpired: true, }, @@ -182,14 +190,16 @@ func TestListPager_List(t *testing.T) { PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPageThenFullList, }, - args: args{}, - want: list(21, "rv:20"), + args: args{}, + want: list(21, "rv:20"), + wantPaged: true, }, { - name: "two pages with resourceVersion", - fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, - args: args{options: metav1.ListOptions{ResourceVersion: "rv:10"}}, - want: list(11, "rv:20"), + name: "two pages with resourceVersion", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, + args: args{options: metav1.ListOptions{ResourceVersion: "rv:10"}}, + want: list(11, "rv:20"), + wantPaged: true, }, } for _, tt := range tests { @@ -203,7 +213,7 @@ func TestListPager_List(t *testing.T) { if ctx == nil { ctx = context.Background() } - got, err := p.List(ctx, tt.args.options) + got, paginatedResult, err := p.List(ctx, tt.args.options) if (err != nil) != tt.wantErr { t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr) return @@ -212,6 +222,9 @@ func TestListPager_List(t *testing.T) { t.Errorf("ListPager.List() error = %v, isExpired %v", err, tt.isExpired) return } + if tt.wantPaged != paginatedResult { + t.Errorf("paginatedResult = %t, want %t", paginatedResult, tt.wantPaged) + } if !reflect.DeepEqual(got, tt.want) { t.Errorf("ListPager.List() = %v, want %v", got, tt.want) }