From f549520368f096b850e1d01a53705d921938a929 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sun, 3 Sep 2017 15:04:54 -0400 Subject: [PATCH] Disable default paging in list watches For 1.8 this will be off by default. In 1.9 it will be on by default. Add tests and rename some fields to use the `chunking` terminology. Note that the pager may be used for other things besides chunking. Kubernetes-commit: 8b571bb63bd8a9a6a37db6046a6ab35d3b047bf4 --- tools/cache/listwatch.go | 12 ++- tools/pager/BUILD | 15 +++ tools/pager/pager.go | 8 +- tools/pager/pager_test.go | 206 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 235 insertions(+), 6 deletions(-) create mode 100644 tools/pager/pager_test.go diff --git a/tools/cache/listwatch.go b/tools/cache/listwatch.go index cab48ae0..55a90b63 100644 --- a/tools/cache/listwatch.go +++ b/tools/cache/listwatch.go @@ -49,9 +49,11 @@ type WatchFunc func(options metav1.ListOptions) (watch.Interface, error) // It is a convenience function for users of NewReflector, etc. // ListFunc and WatchFunc must not be nil type ListWatch struct { - ListFunc ListFunc - WatchFunc WatchFunc - DisablePaging bool + ListFunc ListFunc + WatchFunc WatchFunc + // DisableChunking requests no chunking for this list watcher. It has no effect in Kubernetes 1.8, but in + // 1.9 will allow a controller to opt out of chunking. + DisableChunking bool } // Getter interface knows how to access Get method from RESTClient. @@ -91,7 +93,9 @@ func timeoutFromListOptions(options metav1.ListOptions) time.Duration { // List a set of apiserver resources func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) { - if !lw.DisablePaging { + // chunking will become the default for list watchers starting in Kubernetes 1.9, unless + // otherwise disabled. + if false && !lw.DisableChunking { return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options) } return lw.ListFunc(options) diff --git a/tools/pager/BUILD b/tools/pager/BUILD index fa07ba8c..e7b95c40 100644 --- a/tools/pager/BUILD +++ b/tools/pager/BUILD @@ -5,6 +5,7 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -34,3 +35,17 @@ filegroup( tags = ["automanaged"], visibility = ["//visibility:public"], ) + +go_test( + name = "go_default_test", + srcs = ["pager_test.go"], + library = ":go_default_library", + deps = [ + "//vendor/golang.org/x/net/context:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1alpha1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + ], +) diff --git a/tools/pager/pager.go b/tools/pager/pager.go index a4a04cdc..2e0874e0 100644 --- a/tools/pager/pager.go +++ b/tools/pager/pager.go @@ -52,7 +52,8 @@ type ListPager struct { } // New creates a new pager from the provided pager function using the default -// options. +// options. It will fall back to a full list if an expiration error is encountered +// as a last resort. func New(fn ListPageFunc) *ListPager { return &ListPager{ PageSize: defaultPageSize, @@ -61,9 +62,12 @@ func New(fn ListPageFunc) *ListPager { } } +// TODO: introduce other types of paging functions - such as those that retrieve from a list +// of namespaces. + // List returns a single list object, but attempts to retrieve smaller chunks from the // server to reduce the impact on the server. If the chunk attempt fails, it will load -// the full list instead. +// the full list instead. The Limit field on options, if unset, will default to the page size. func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { if options.Limit == 0 { options.Limit = p.PageSize diff --git a/tools/pager/pager_test.go b/tools/pager/pager_test.go new file mode 100644 index 00000000..6e3e9444 --- /dev/null +++ b/tools/pager/pager_test.go @@ -0,0 +1,206 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pager + +import ( + "fmt" + "reflect" + "testing" + + "golang.org/x/net/context" + "k8s.io/apimachinery/pkg/api/errors" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1alpha1 "k8s.io/apimachinery/pkg/apis/meta/v1alpha1" + "k8s.io/apimachinery/pkg/runtime" +) + +func list(count int, rv string) *metainternalversion.List { + var list metainternalversion.List + for i := 0; i < count; i++ { + list.Items = append(list.Items, &metav1alpha1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%d", i), + }, + }) + } + list.ResourceVersion = rv + return &list +} + +type testPager struct { + t *testing.T + rv string + index int + remaining int + last int + continuing bool + done bool + expectPage int64 +} + +func (p *testPager) reset() { + p.continuing = false + p.remaining += p.index + p.index = 0 + p.last = 0 + p.done = false +} + +func (p *testPager) PagedList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + if p.done { + p.t.Errorf("did not expect additional call to paged list") + return nil, fmt.Errorf("unexpected list call") + } + expectedContinue := fmt.Sprintf("%s:%d", p.rv, p.last) + if options.Limit != p.expectPage || (p.continuing && options.Continue != expectedContinue) { + p.t.Errorf("invariant violated, expected limit %d and continue %s, got %#v", p.expectPage, expectedContinue, options) + return nil, fmt.Errorf("invariant violated") + } + var list metainternalversion.List + total := options.Limit + if total == 0 { + total = int64(p.remaining) + } + for i := int64(0); i < total; i++ { + if p.remaining <= 0 { + break + } + list.Items = append(list.Items, &metav1alpha1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%d", p.index), + }, + }) + p.remaining-- + p.index++ + } + p.last = p.index + if p.remaining > 0 { + list.Continue = fmt.Sprintf("%s:%d", p.rv, p.last) + p.continuing = true + } else { + p.done = true + } + list.ResourceVersion = p.rv + return &list, nil +} + +func (p *testPager) ExpiresOnSecondPage(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + if p.continuing { + p.done = true + return nil, errors.NewResourceExpired("this list has expired") + } + return p.PagedList(ctx, options) +} + +func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + if p.continuing { + p.reset() + p.expectPage = 0 + return nil, errors.NewResourceExpired("this list has expired") + } + return p.PagedList(ctx, options) +} + +func TestListPager_List(t *testing.T) { + type fields struct { + PageSize int64 + PageFn ListPageFunc + FullListIfExpired bool + } + type args struct { + ctx context.Context + options metav1.ListOptions + } + tests := []struct { + name string + fields fields + args args + want runtime.Object + wantErr bool + isExpired bool + }{ + { + name: "empty page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(0, "rv:20"), + }, + { + name: "one page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(9, "rv:20"), + }, + { + name: "one full page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(10, "rv:20"), + }, + { + name: "two pages", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(11, "rv:20"), + }, + { + name: "three pages", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(21, "rv:20"), + }, + { + name: "expires on second page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage}, + args: args{}, + wantErr: true, + isExpired: true, + }, + { + name: "expires on second page and then lists", + fields: fields{ + FullListIfExpired: true, + PageSize: 10, + PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPageThenFullList, + }, + args: args{}, + want: list(21, "rv:20"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &ListPager{ + PageSize: tt.fields.PageSize, + PageFn: tt.fields.PageFn, + FullListIfExpired: tt.fields.FullListIfExpired, + } + got, err := p.List(tt.args.ctx, tt.args.options) + if (err != nil) != tt.wantErr { + t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.isExpired != errors.IsResourceExpired(err) { + t.Errorf("ListPager.List() error = %v, isExpired %v", err, tt.isExpired) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ListPager.List() = %v, want %v", got, tt.want) + } + }) + } +}