Switch pager to return whether the result was paginated

Kubernetes-commit: 5dcf08c1993718e94cf6537af50e0a411dd2878c
This commit is contained in:
wojtekt 2020-01-28 09:39:27 +01:00 committed by Kubernetes Publisher
parent 5534e24283
commit 0e2dbbf70a
4 changed files with 144 additions and 52 deletions

View File

@ -75,6 +75,9 @@ type Reflector struct {
ShouldResync func() bool ShouldResync func() bool
// clock allows tests to manipulate time // clock allows tests to manipulate time
clock clock.Clock clock clock.Clock
// paginatedResult defines whether pagination should be forced for list calls.
// It is set based on the result of the initial list call.
paginatedResult bool
// lastSyncResourceVersion is the resource version token last // lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store // observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store // it is thread safe, but not synchronized with the underlying store
@ -209,6 +212,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name}) initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
defer initTrace.LogIfLong(10 * time.Second) defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object var list runtime.Object
var paginatedResult bool
var err error var err error
listCh := make(chan struct{}, 1) listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1) panicCh := make(chan interface{}, 1)
@ -223,26 +227,30 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts) return r.listerWatcher.List(opts)
})) }))
if r.WatchListPageSize != 0 { switch {
case r.WatchListPageSize != 0:
pager.PageSize = r.WatchListPageSize pager.PageSize = r.WatchListPageSize
} else { case r.paginatedResult:
// We got a paginated result initially. Assume this resource and server honor
// paging requests (i.e. watch cache is probably disabled) and leave the default
// pager size set.
case options.ResourceVersion != "" && options.ResourceVersion != "0":
// User didn't explicitly request pagination. // User didn't explicitly request pagination.
if options.ResourceVersion != "" && options.ResourceVersion != "0" { //
// We also don't turn off pagination for ResourceVersion="0", since watch cache // With ResourceVersion != "", we have a possibility to list from watch cache,
// is ignoring Limit in that case anyway, and if watchcache is not enabled we // but we do that (for ResourceVersion != "0") only if Limit is unset.
// don't introduce regression. // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
// switch off pagination to force listing from watch cache (if enabled).
// With ResourceVersion != "", we have a possibility to list from watch cache, // With the existing semantic of RV (result is at least as fresh as provided RV),
// but we do that (for ResourceVersion != "0") only if Limit is unset. // this is correct and doesn't lead to going back in time.
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly //
// switch off pagination to force listing from watch cache (if enabled). // We also don't turn off pagination for ResourceVersion="0", since watch cache
// With the existing semantic of RV (result is at least as fresh as provided RV), // is ignoring Limit in that case anyway, and if watch cache is not enabled
// this is correct and doesn't lead to going back in time. // we don't introduce regression.
pager.PageSize = 0 pager.PageSize = 0
}
} }
list, err = pager.List(context.Background(), options) list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) { if isExpiredError(err) {
r.setIsLastSyncResourceVersionExpired(true) r.setIsLastSyncResourceVersionExpired(true)
// Retry immediately if the resource version used to list is expired. // Retry immediately if the resource version used to list is expired.
@ -250,7 +258,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// continuation pages, but the pager might not be enabled, or the full list might fail because the // continuation pages, but the pager might not be enabled, or the full list might fail because the
// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all // resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
// to recover and ensure the reflector makes forward progress. // to recover and ensure the reflector makes forward progress.
list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
} }
close(listCh) close(listCh)
}() }()
@ -264,6 +272,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err != nil { if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err) return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
} }
// We check if the list was paginated and if so set the paginatedResult based on that.
// However, we want to do that only for the initial list (which is the only case
// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
// situations we may force listing directly from etcd (by setting ResourceVersion="")
// which will return paginated result, even if watch cache is enabled. However, in
// that case, we still want to prefer sending requests to watch cache if possible.
//
// Paginated result returned for request with ResourceVersion="0" mean that watch
// cache is disabled and there are a lot of objects of a given type. In such case,
// there is no need to prefer listing from watch cache.
if options.ResourceVersion == "0" && paginatedResult {
r.paginatedResult = true
}
r.setIsLastSyncResourceVersionExpired(false) // list was successful r.setIsLastSyncResourceVersionExpired(false) // list was successful
initTrace.Step("Objects listed") initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list) listMetaInterface, err := meta.ListAccessor(list)

View File

@ -472,6 +472,57 @@ func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) {
} }
} }
func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T) {
var stopCh chan struct{}
s := NewStore(MetaNamespaceKeyFunc)
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
close(stopCh)
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
// Check that default pager limit is set.
if options.Limit != 500 {
t.Fatalf("Expected list Limit of 500 but got %d", options.Limit)
}
pods := make([]v1.Pod, 10)
for i := 0; i < 10; i++ {
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
}
switch options.Continue {
case "":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil
case "C1":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil
case "C2":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil
default:
t.Fatalf("Unrecognized continue: %s", options.Continue)
}
return nil, nil
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
// Initial list should initialize paginatedResult in the reflector.
stopCh = make(chan struct{})
r.ListAndWatch(stopCh)
if results := s.List(); len(results) != 10 {
t.Errorf("Expected 10 results, got %d", len(results))
}
// Since initial list for ResourceVersion="0" was paginated, the subsequent
// ones should also be paginated.
stopCh = make(chan struct{})
r.ListAndWatch(stopCh)
if results := s.List(); len(results) != 10 {
t.Errorf("Expected 10 results, got %d", len(results))
}
}
// TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends // TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends
// it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or // it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or
// etcd that is partitioned and serving older data than the reflector has already processed. // etcd that is partitioned and serving older data than the reflector has already processed.

View File

@ -73,16 +73,18 @@ func New(fn ListPageFunc) *ListPager {
// List returns a single list object, but attempts to retrieve smaller chunks from the // List returns a single list object, but attempts to retrieve smaller chunks from the
// server to reduce the impact on the server. If the chunk attempt fails, it will load // server to reduce the impact on the server. If the chunk attempt fails, it will load
// the full list instead. The Limit field on options, if unset, will default to the page size. // the full list instead. The Limit field on options, if unset, will default to the page size.
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
if options.Limit == 0 { if options.Limit == 0 {
options.Limit = p.PageSize options.Limit = p.PageSize
} }
requestedResourceVersion := options.ResourceVersion requestedResourceVersion := options.ResourceVersion
var list *metainternalversion.List var list *metainternalversion.List
paginatedResult := false
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return nil, paginatedResult, ctx.Err()
default: default:
} }
@ -93,23 +95,24 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
// failing when the resource versions is established by the first page request falls out of the compaction // failing when the resource versions is established by the first page request falls out of the compaction
// during the subsequent list requests). // during the subsequent list requests).
if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" { if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
return nil, err return nil, paginatedResult, err
} }
// the list expired while we were processing, fall back to a full list at // the list expired while we were processing, fall back to a full list at
// the requested ResourceVersion. // the requested ResourceVersion.
options.Limit = 0 options.Limit = 0
options.Continue = "" options.Continue = ""
options.ResourceVersion = requestedResourceVersion options.ResourceVersion = requestedResourceVersion
return p.PageFn(ctx, options) result, err := p.PageFn(ctx, options)
return result, paginatedResult, err
} }
m, err := meta.ListAccessor(obj) m, err := meta.ListAccessor(obj)
if err != nil { if err != nil {
return nil, fmt.Errorf("returned object must be a list: %v", err) return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err)
} }
// exit early and return the object we got if we haven't processed any pages // exit early and return the object we got if we haven't processed any pages
if len(m.GetContinue()) == 0 && list == nil { if len(m.GetContinue()) == 0 && list == nil {
return obj, nil return obj, paginatedResult, nil
} }
// initialize the list and fill its contents // initialize the list and fill its contents
@ -122,12 +125,12 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
list.Items = append(list.Items, obj) list.Items = append(list.Items, obj)
return nil return nil
}); err != nil { }); err != nil {
return nil, err return nil, paginatedResult, err
} }
// if we have no more items, return the list // if we have no more items, return the list
if len(m.GetContinue()) == 0 { if len(m.GetContinue()) == 0 {
return list, nil return list, paginatedResult, nil
} }
// set the next loop up // set the next loop up
@ -136,6 +139,8 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
// `specifying resource version is not allowed when using continue` error. // `specifying resource version is not allowed when using continue` error.
// See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143. // See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143.
options.ResourceVersion = "" options.ResourceVersion = ""
// At this point, result is already paginated.
paginatedResult = true
} }
} }

View File

@ -120,6 +120,7 @@ func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options
} }
return p.PagedList(ctx, options) return p.PagedList(ctx, options)
} }
func TestListPager_List(t *testing.T) { func TestListPager_List(t *testing.T) {
type fields struct { type fields struct {
PageSize int64 PageSize int64
@ -135,43 +136,50 @@ func TestListPager_List(t *testing.T) {
fields fields fields fields
args args args args
want runtime.Object want runtime.Object
wantPaged bool
wantErr bool wantErr bool
isExpired bool isExpired bool
}{ }{
{ {
name: "empty page", name: "empty page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList}, fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
args: args{}, args: args{},
want: list(0, "rv:20"), want: list(0, "rv:20"),
wantPaged: false,
}, },
{ {
name: "one page", name: "one page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList}, fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
args: args{}, args: args{},
want: list(9, "rv:20"), want: list(9, "rv:20"),
wantPaged: false,
}, },
{ {
name: "one full page", name: "one full page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList}, fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
args: args{}, args: args{},
want: list(10, "rv:20"), want: list(10, "rv:20"),
wantPaged: false,
}, },
{ {
name: "two pages", name: "two pages",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
args: args{}, args: args{},
want: list(11, "rv:20"), want: list(11, "rv:20"),
wantPaged: true,
}, },
{ {
name: "three pages", name: "three pages",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList}, fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
args: args{}, args: args{},
want: list(21, "rv:20"), want: list(21, "rv:20"),
wantPaged: true,
}, },
{ {
name: "expires on second page", name: "expires on second page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage}, fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage},
args: args{}, args: args{},
wantPaged: true,
wantErr: true, wantErr: true,
isExpired: true, isExpired: true,
}, },
@ -182,14 +190,16 @@ func TestListPager_List(t *testing.T) {
PageSize: 10, PageSize: 10,
PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPageThenFullList, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPageThenFullList,
}, },
args: args{}, args: args{},
want: list(21, "rv:20"), want: list(21, "rv:20"),
wantPaged: true,
}, },
{ {
name: "two pages with resourceVersion", name: "two pages with resourceVersion",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
args: args{options: metav1.ListOptions{ResourceVersion: "rv:10"}}, args: args{options: metav1.ListOptions{ResourceVersion: "rv:10"}},
want: list(11, "rv:20"), want: list(11, "rv:20"),
wantPaged: true,
}, },
} }
for _, tt := range tests { for _, tt := range tests {
@ -203,7 +213,7 @@ func TestListPager_List(t *testing.T) {
if ctx == nil { if ctx == nil {
ctx = context.Background() ctx = context.Background()
} }
got, err := p.List(ctx, tt.args.options) got, paginatedResult, err := p.List(ctx, tt.args.options)
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr)
return return
@ -212,6 +222,9 @@ func TestListPager_List(t *testing.T) {
t.Errorf("ListPager.List() error = %v, isExpired %v", err, tt.isExpired) t.Errorf("ListPager.List() error = %v, isExpired %v", err, tt.isExpired)
return return
} }
if tt.wantPaged != paginatedResult {
t.Errorf("paginatedResult = %t, want %t", paginatedResult, tt.wantPaged)
}
if !reflect.DeepEqual(got, tt.want) { if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ListPager.List() = %v, want %v", got, tt.want) t.Errorf("ListPager.List() = %v, want %v", got, tt.want)
} }