diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index 65f5124f0ef..2b0796c6ac4 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -572,27 +572,26 @@ func TestReflectorFullListIfExpired(t *testing.T) { for i := 0; i < 8; i++ { pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} } - switch options.ResourceVersion { - case "0": + rvContinueLimit := func(rv, c string, l int64) metav1.ListOptions { + return metav1.ListOptions{ResourceVersion: rv, Continue: c, Limit: l} + } + switch rvContinueLimit(options.ResourceVersion, options.Continue, options.Limit) { + // initial limited list + case rvContinueLimit("0", "", 4): return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil - case "10": - switch options.Limit { - case 4: - switch options.Continue { - case "": - return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil - case "C1": - return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.") - default: - t.Fatalf("Unrecognized Continue: %s", options.Continue) - } - case 0: - return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil - default: - t.Fatalf("Unrecognized Limit: %d", options.Limit) - } + // first page of the rv=10 list + case rvContinueLimit("10", "", 4): + return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil + // second page of the above list + case rvContinueLimit("", "C1", 4): + return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.") + // rv=10 unlimited list + case rvContinueLimit("10", "", 0): + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil default: - t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion) + err := fmt.Errorf("unexpected list options: %#v", options) + t.Error(err) + return nil, err } return nil, nil }, @@ -601,25 +600,29 @@ func TestReflectorFullListIfExpired(t *testing.T) { r.WatchListPageSize = 4 // Initial list should use RV=0 - r.ListAndWatch(stopCh) + if err := r.ListAndWatch(stopCh); err != nil { + t.Fatal(err) + } results := s.List() if len(results) != 4 { t.Errorf("Expected 4 results, got %d", len(results)) } - // relist should use lastSyncResourceVersions (RV=10) and since second page of RV=10 is expired, it should full list with RV=10 + // relist should use lastSyncResourceVersions (RV=10) and since second page of that expired, it should full list with RV=10 stopCh = make(chan struct{}) - r.ListAndWatch(stopCh) + if err := r.ListAndWatch(stopCh); err != nil { + t.Fatal(err) + } results = s.List() if len(results) != 8 { t.Errorf("Expected 8 results, got %d", len(results)) } - expectedRVs := []string{"0", "10", "10", "10"} + expectedRVs := []string{"0", "10", "", "10"} if !reflect.DeepEqual(listCallRVs, expectedRVs) { - t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs) + t.Errorf("Expected series of list calls with resource versiosn of %#v but got: %#v", expectedRVs, listCallRVs) } } diff --git a/staging/src/k8s.io/client-go/tools/pager/pager.go b/staging/src/k8s.io/client-go/tools/pager/pager.go index 25d190efa68..307808be1e1 100644 --- a/staging/src/k8s.io/client-go/tools/pager/pager.go +++ b/staging/src/k8s.io/client-go/tools/pager/pager.go @@ -77,6 +77,7 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti if options.Limit == 0 { options.Limit = p.PageSize } + requestedResourceVersion := options.ResourceVersion var list *metainternalversion.List for { select { @@ -94,9 +95,11 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" { return nil, err } - // the list expired while we were processing, fall back to a full list + // the list expired while we were processing, fall back to a full list at + // the requested ResourceVersion. options.Limit = 0 options.Continue = "" + options.ResourceVersion = requestedResourceVersion return p.PageFn(ctx, options) } m, err := meta.ListAccessor(obj) @@ -129,6 +132,10 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti // set the next loop up options.Continue = m.GetContinue() + // Clear the ResourceVersion on the subsequent List calls to avoid the + // `specifying resource version is not allowed when using continue` error. + // See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143. + options.ResourceVersion = "" } } diff --git a/staging/src/k8s.io/client-go/tools/pager/pager_test.go b/staging/src/k8s.io/client-go/tools/pager/pager_test.go index 2332b53d78f..35d4449fb40 100644 --- a/staging/src/k8s.io/client-go/tools/pager/pager_test.go +++ b/staging/src/k8s.io/client-go/tools/pager/pager_test.go @@ -72,6 +72,10 @@ func (p *testPager) PagedList(ctx context.Context, options metav1.ListOptions) ( p.t.Errorf("invariant violated, expected limit %d and continue %s, got %#v", p.expectPage, expectedContinue, options) return nil, fmt.Errorf("invariant violated") } + if options.Continue != "" && options.ResourceVersion != "" { + p.t.Errorf("invariant violated, specifying resource version (%s) is not allowed when using continue (%s).", options.ResourceVersion, options.Continue) + return nil, fmt.Errorf("invariant violated") + } var list metainternalversion.List total := options.Limit if total == 0 { @@ -181,6 +185,12 @@ func TestListPager_List(t *testing.T) { args: args{}, want: list(21, "rv:20"), }, + { + name: "two pages with resourceVersion", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, + args: args{options: metav1.ListOptions{ResourceVersion: "rv:10"}}, + want: list(11, "rv:20"), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {