diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 2a1af4e9..d0c3190a 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -75,6 +75,9 @@ type Reflector struct { ShouldResync func() bool // clock allows tests to manipulate time clock clock.Clock + // paginatedResult defines whether pagination should be forced for list calls. + // It is set based on the result of the initial list call. + paginatedResult bool // lastSyncResourceVersion is the resource version token last // observed when doing a sync with the underlying store // it is thread safe, but not synchronized with the underlying store @@ -209,6 +212,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name}) defer initTrace.LogIfLong(10 * time.Second) var list runtime.Object + var paginatedResult bool var err error listCh := make(chan struct{}, 1) panicCh := make(chan interface{}, 1) @@ -223,26 +227,30 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { return r.listerWatcher.List(opts) })) - if r.WatchListPageSize != 0 { + switch { + case r.WatchListPageSize != 0: pager.PageSize = r.WatchListPageSize - } else { + case r.paginatedResult: + // We got a paginated result initially. Assume this resource and server honor + // paging requests (i.e. watch cache is probably disabled) and leave the default + // pager size set. + case options.ResourceVersion != "" && options.ResourceVersion != "0": // User didn't explicitly request pagination. - if options.ResourceVersion != "" && options.ResourceVersion != "0" { - // We also don't turn off pagination for ResourceVersion="0", since watch cache - // is ignoring Limit in that case anyway, and if watchcache is not enabled we - // don't introduce regression. - - // With ResourceVersion != "", we have a possibility to list from watch cache, - // but we do that (for ResourceVersion != "0") only if Limit is unset. - // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly - // switch off pagination to force listing from watch cache (if enabled). - // With the existing semantic of RV (result is at least as fresh as provided RV), - // this is correct and doesn't lead to going back in time. - pager.PageSize = 0 - } + // + // With ResourceVersion != "", we have a possibility to list from watch cache, + // but we do that (for ResourceVersion != "0") only if Limit is unset. + // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly + // switch off pagination to force listing from watch cache (if enabled). + // With the existing semantic of RV (result is at least as fresh as provided RV), + // this is correct and doesn't lead to going back in time. + // + // We also don't turn off pagination for ResourceVersion="0", since watch cache + // is ignoring Limit in that case anyway, and if watch cache is not enabled + // we don't introduce regression. + pager.PageSize = 0 } - list, err = pager.List(context.Background(), options) + list, paginatedResult, err = pager.List(context.Background(), options) if isExpiredError(err) { r.setIsLastSyncResourceVersionExpired(true) // Retry immediately if the resource version used to list is expired. @@ -250,7 +258,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // continuation pages, but the pager might not be enabled, or the full list might fail because the // resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all // to recover and ensure the reflector makes forward progress. - list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) + list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) } close(listCh) }() @@ -264,6 +272,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if err != nil { return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err) } + + // We check if the list was paginated and if so set the paginatedResult based on that. + // However, we want to do that only for the initial list (which is the only case + // when we set ResourceVersion="0"). The reasoning behind it is that later, in some + // situations we may force listing directly from etcd (by setting ResourceVersion="") + // which will return paginated result, even if watch cache is enabled. However, in + // that case, we still want to prefer sending requests to watch cache if possible. + // + // Paginated result returned for request with ResourceVersion="0" mean that watch + // cache is disabled and there are a lot of objects of a given type. In such case, + // there is no need to prefer listing from watch cache. + if options.ResourceVersion == "0" && paginatedResult { + r.paginatedResult = true + } + r.setIsLastSyncResourceVersionExpired(false) // list was successful initTrace.Step("Objects listed") listMetaInterface, err := meta.ListAccessor(list) diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index 973403ec..59b66256 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -472,6 +472,57 @@ func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) { } } +func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T) { + var stopCh chan struct{} + s := NewStore(MetaNamespaceKeyFunc) + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Stop once the reflector begins watching since we're only interested in the list. + close(stopCh) + fw := watch.NewFake() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + // Check that default pager limit is set. + if options.Limit != 500 { + t.Fatalf("Expected list Limit of 500 but got %d", options.Limit) + } + pods := make([]v1.Pod, 10) + for i := 0; i < 10; i++ { + pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} + } + switch options.Continue { + case "": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil + case "C1": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil + case "C2": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil + default: + t.Fatalf("Unrecognized continue: %s", options.Continue) + } + return nil, nil + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + + // Initial list should initialize paginatedResult in the reflector. + stopCh = make(chan struct{}) + r.ListAndWatch(stopCh) + if results := s.List(); len(results) != 10 { + t.Errorf("Expected 10 results, got %d", len(results)) + } + + // Since initial list for ResourceVersion="0" was paginated, the subsequent + // ones should also be paginated. + stopCh = make(chan struct{}) + r.ListAndWatch(stopCh) + if results := s.List(); len(results) != 10 { + t.Errorf("Expected 10 results, got %d", len(results)) + } +} + // TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends // it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or // etcd that is partitioned and serving older data than the reflector has already processed. diff --git a/tools/pager/pager.go b/tools/pager/pager.go index 307808be..f6c6a012 100644 --- a/tools/pager/pager.go +++ b/tools/pager/pager.go @@ -73,16 +73,18 @@ func New(fn ListPageFunc) *ListPager { // List returns a single list object, but attempts to retrieve smaller chunks from the // server to reduce the impact on the server. If the chunk attempt fails, it will load // the full list instead. The Limit field on options, if unset, will default to the page size. -func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { +func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) { if options.Limit == 0 { options.Limit = p.PageSize } requestedResourceVersion := options.ResourceVersion var list *metainternalversion.List + paginatedResult := false + for { select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, paginatedResult, ctx.Err() default: } @@ -93,23 +95,24 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti // failing when the resource versions is established by the first page request falls out of the compaction // during the subsequent list requests). if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" { - return nil, err + return nil, paginatedResult, err } // the list expired while we were processing, fall back to a full list at // the requested ResourceVersion. options.Limit = 0 options.Continue = "" options.ResourceVersion = requestedResourceVersion - return p.PageFn(ctx, options) + result, err := p.PageFn(ctx, options) + return result, paginatedResult, err } m, err := meta.ListAccessor(obj) if err != nil { - return nil, fmt.Errorf("returned object must be a list: %v", err) + return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err) } // exit early and return the object we got if we haven't processed any pages if len(m.GetContinue()) == 0 && list == nil { - return obj, nil + return obj, paginatedResult, nil } // initialize the list and fill its contents @@ -122,12 +125,12 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti list.Items = append(list.Items, obj) return nil }); err != nil { - return nil, err + return nil, paginatedResult, err } // if we have no more items, return the list if len(m.GetContinue()) == 0 { - return list, nil + return list, paginatedResult, nil } // set the next loop up @@ -136,6 +139,8 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti // `specifying resource version is not allowed when using continue` error. // See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143. options.ResourceVersion = "" + // At this point, result is already paginated. + paginatedResult = true } } diff --git a/tools/pager/pager_test.go b/tools/pager/pager_test.go index 35d4449f..86ec9f6e 100644 --- a/tools/pager/pager_test.go +++ b/tools/pager/pager_test.go @@ -120,6 +120,7 @@ func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options } return p.PagedList(ctx, options) } + func TestListPager_List(t *testing.T) { type fields struct { PageSize int64 @@ -135,43 +136,50 @@ func TestListPager_List(t *testing.T) { fields fields args args want runtime.Object + wantPaged bool wantErr bool isExpired bool }{ { - name: "empty page", - fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList}, - args: args{}, - want: list(0, "rv:20"), + name: "empty page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(0, "rv:20"), + wantPaged: false, }, { - name: "one page", - fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList}, - args: args{}, - want: list(9, "rv:20"), + name: "one page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(9, "rv:20"), + wantPaged: false, }, { - name: "one full page", - fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList}, - args: args{}, - want: list(10, "rv:20"), + name: "one full page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(10, "rv:20"), + wantPaged: false, }, { - name: "two pages", - fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, - args: args{}, - want: list(11, "rv:20"), + name: "two pages", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(11, "rv:20"), + wantPaged: true, }, { - name: "three pages", - fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList}, - args: args{}, - want: list(21, "rv:20"), + name: "three pages", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(21, "rv:20"), + wantPaged: true, }, { name: "expires on second page", fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage}, args: args{}, + wantPaged: true, wantErr: true, isExpired: true, }, @@ -182,14 +190,16 @@ func TestListPager_List(t *testing.T) { PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPageThenFullList, }, - args: args{}, - want: list(21, "rv:20"), + args: args{}, + want: list(21, "rv:20"), + wantPaged: true, }, { - name: "two pages with resourceVersion", - fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, - args: args{options: metav1.ListOptions{ResourceVersion: "rv:10"}}, - want: list(11, "rv:20"), + name: "two pages with resourceVersion", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, + args: args{options: metav1.ListOptions{ResourceVersion: "rv:10"}}, + want: list(11, "rv:20"), + wantPaged: true, }, } for _, tt := range tests { @@ -203,7 +213,7 @@ func TestListPager_List(t *testing.T) { if ctx == nil { ctx = context.Background() } - got, err := p.List(ctx, tt.args.options) + got, paginatedResult, err := p.List(ctx, tt.args.options) if (err != nil) != tt.wantErr { t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr) return @@ -212,6 +222,9 @@ func TestListPager_List(t *testing.T) { t.Errorf("ListPager.List() error = %v, isExpired %v", err, tt.isExpired) return } + if tt.wantPaged != paginatedResult { + t.Errorf("paginatedResult = %t, want %t", paginatedResult, tt.wantPaged) + } if !reflect.DeepEqual(got, tt.want) { t.Errorf("ListPager.List() = %v, want %v", got, tt.want) }