Merge pull request #85272 from mm4tt/pager_fix

pager.go: don't set ResourceVersion on subsequent List calls

Kubernetes-commit: 452c8c9ad303933123ad1f0b208bc935315e8a67
This commit is contained in:
Kubernetes Publisher 2019-11-15 04:51:42 -08:00
commit 050872ffe7
6 changed files with 53 additions and 33 deletions

4
Godeps/Godeps.json generated
View File

@ -348,11 +348,11 @@
},
{
"ImportPath": "k8s.io/api",
"Rev": "6cd33094d465"
"Rev": "bbc9463b57e5"
},
{
"ImportPath": "k8s.io/apimachinery",
"Rev": "67a48e0c9266"
"Rev": "3c7067801da2"
},
{
"ImportPath": "k8s.io/gengo",

8
go.mod
View File

@ -28,8 +28,8 @@ require (
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
google.golang.org/appengine v1.5.0 // indirect
k8s.io/api v0.0.0-20191114215726-6cd33094d465
k8s.io/apimachinery v0.0.0-20191114215425-67a48e0c9266
k8s.io/api v0.0.0-20191115135540-bbc9463b57e5
k8s.io/apimachinery v0.0.0-20191115015347-3c7067801da2
k8s.io/klog v1.0.0
k8s.io/utils v0.0.0-20191030222137-2b95a09bc58d
sigs.k8s.io/yaml v1.1.0
@ -38,6 +38,6 @@ require (
replace (
golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // pinned to release-branch.go1.13
golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 // pinned to release-branch.go1.13
k8s.io/api => k8s.io/api v0.0.0-20191114215726-6cd33094d465
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20191114215425-67a48e0c9266
k8s.io/api => k8s.io/api v0.0.0-20191115135540-bbc9463b57e5
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20191115015347-3c7067801da2
)

4
go.sum
View File

@ -191,8 +191,8 @@ gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.0.0-20191114215726-6cd33094d465/go.mod h1:TnwOuwwOmtUq5BythlhmbJs+lwQGEsSJ6N4S4ODl2m8=
k8s.io/apimachinery v0.0.0-20191114215425-67a48e0c9266/go.mod h1:dXFS2zaQR8fyzuvRdJDHw2Aerij/yVGJSre0bZQSVJA=
k8s.io/api v0.0.0-20191115135540-bbc9463b57e5/go.mod h1:iA/8arsvelvo4IDqIhX4IbjTEKBGgvsf2OraTuRtLFU=
k8s.io/apimachinery v0.0.0-20191115015347-3c7067801da2/go.mod h1:dXFS2zaQR8fyzuvRdJDHw2Aerij/yVGJSre0bZQSVJA=
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=

View File

@ -572,27 +572,26 @@ func TestReflectorFullListIfExpired(t *testing.T) {
for i := 0; i < 8; i++ {
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
}
switch options.ResourceVersion {
case "0":
rvContinueLimit := func(rv, c string, l int64) metav1.ListOptions {
return metav1.ListOptions{ResourceVersion: rv, Continue: c, Limit: l}
}
switch rvContinueLimit(options.ResourceVersion, options.Continue, options.Limit) {
// initial limited list
case rvContinueLimit("0", "", 4):
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
case "10":
switch options.Limit {
case 4:
switch options.Continue {
case "":
return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil
case "C1":
return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.")
default:
t.Fatalf("Unrecognized Continue: %s", options.Continue)
}
case 0:
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
default:
t.Fatalf("Unrecognized Limit: %d", options.Limit)
}
// first page of the rv=10 list
case rvContinueLimit("10", "", 4):
return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil
// second page of the above list
case rvContinueLimit("", "C1", 4):
return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.")
// rv=10 unlimited list
case rvContinueLimit("10", "", 0):
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
default:
t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
err := fmt.Errorf("unexpected list options: %#v", options)
t.Error(err)
return nil, err
}
return nil, nil
},
@ -601,25 +600,29 @@ func TestReflectorFullListIfExpired(t *testing.T) {
r.WatchListPageSize = 4
// Initial list should use RV=0
r.ListAndWatch(stopCh)
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}
results := s.List()
if len(results) != 4 {
t.Errorf("Expected 4 results, got %d", len(results))
}
// relist should use lastSyncResourceVersions (RV=10) and since second page of RV=10 is expired, it should full list with RV=10
// relist should use lastSyncResourceVersions (RV=10) and since second page of that expired, it should full list with RV=10
stopCh = make(chan struct{})
r.ListAndWatch(stopCh)
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}
results = s.List()
if len(results) != 8 {
t.Errorf("Expected 8 results, got %d", len(results))
}
expectedRVs := []string{"0", "10", "10", "10"}
expectedRVs := []string{"0", "10", "", "10"}
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
t.Errorf("Expected series of list calls with resource versiosn of %#v but got: %#v", expectedRVs, listCallRVs)
}
}

View File

@ -77,6 +77,7 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
if options.Limit == 0 {
options.Limit = p.PageSize
}
requestedResourceVersion := options.ResourceVersion
var list *metainternalversion.List
for {
select {
@ -94,9 +95,11 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
return nil, err
}
// the list expired while we were processing, fall back to a full list
// the list expired while we were processing, fall back to a full list at
// the requested ResourceVersion.
options.Limit = 0
options.Continue = ""
options.ResourceVersion = requestedResourceVersion
return p.PageFn(ctx, options)
}
m, err := meta.ListAccessor(obj)
@ -129,6 +132,10 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
// set the next loop up
options.Continue = m.GetContinue()
// Clear the ResourceVersion on the subsequent List calls to avoid the
// `specifying resource version is not allowed when using continue` error.
// See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143.
options.ResourceVersion = ""
}
}

View File

@ -72,6 +72,10 @@ func (p *testPager) PagedList(ctx context.Context, options metav1.ListOptions) (
p.t.Errorf("invariant violated, expected limit %d and continue %s, got %#v", p.expectPage, expectedContinue, options)
return nil, fmt.Errorf("invariant violated")
}
if options.Continue != "" && options.ResourceVersion != "" {
p.t.Errorf("invariant violated, specifying resource version (%s) is not allowed when using continue (%s).", options.ResourceVersion, options.Continue)
return nil, fmt.Errorf("invariant violated")
}
var list metainternalversion.List
total := options.Limit
if total == 0 {
@ -181,6 +185,12 @@ func TestListPager_List(t *testing.T) {
args: args{},
want: list(21, "rv:20"),
},
{
name: "two pages with resourceVersion",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
args: args{options: metav1.ListOptions{ResourceVersion: "rv:10"}},
want: list(11, "rv:20"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {