From 7b48f37a96c8c89d9ce188011248a9631f3a04d6 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 17 Jul 2017 14:31:11 -0400 Subject: [PATCH 1/3] Add a new paging utility for client side ranging Kubernetes-commit: fb68d1d3a7bfb69f3884db6d360816fb2e7eda1e --- tools/pager/BUILD | 36 +++++++++++++++ tools/pager/pager.go | 103 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+) create mode 100644 tools/pager/BUILD create mode 100644 tools/pager/pager.go diff --git a/tools/pager/BUILD b/tools/pager/BUILD new file mode 100644 index 00000000..fa07ba8c --- /dev/null +++ b/tools/pager/BUILD @@ -0,0 +1,36 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["pager.go"], + tags = ["automanaged"], + deps = [ + "//vendor/golang.org/x/net/context:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/tools/pager/pager.go b/tools/pager/pager.go new file mode 100644 index 00000000..57d9632d --- /dev/null +++ b/tools/pager/pager.go @@ -0,0 +1,103 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pager + +import ( + "fmt" + + "golang.org/x/net/context" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +const defaultPageSize = 500 + +type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) + +func SimplePageFunc(fn func(opts metav1.ListOptions) (runtime.Object, error)) ListPageFunc { + return func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return fn(opts) + } +} + +type ListPager struct { + PageSize int64 + PageFn ListPageFunc + + FullListIfExpired bool +} + +func New(fn ListPageFunc) *ListPager { + return &ListPager{ + PageSize: defaultPageSize, + PageFn: fn, + FullListIfExpired: true, + } +} + +func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + if options.Limit == 0 { + options.Limit = p.PageSize + } + var list *metainternalversion.List + for { + obj, err := p.PageFn(ctx, options) + if err != nil { + if !errors.IsResourceExpired(err) || !p.FullListIfExpired { + return nil, err + } + // the list expired while we were processing, fall back to a full list + options.Limit = 0 + options.Continue = "" + return p.PageFn(ctx, options) + } + m, err := meta.ListAccessor(obj) + if err != nil { + return nil, fmt.Errorf("returned object must be a list: %v", err) + } + + // exit early and return the object we got if we haven't processed any pages + if len(m.GetContinue()) == 0 && list == nil { + return obj, nil + } + + // initialize the list and fill its contents + if list == nil { + list = &metainternalversion.List{Items: make([]runtime.Object, 0, options.Limit+1)} + list.ResourceVersion = m.GetResourceVersion() + list.SelfLink = m.GetSelfLink() + } + if err := meta.EachListItem(obj, func(obj runtime.Object) error { + list.Items = append(list.Items, obj) + return nil + }); err != nil { + return nil, err + } + + // if we have no more items, return the list + if len(m.GetContinue()) == 0 { + return list, nil + } + + // set the next loop up + options.Continue = m.GetContinue() + } +} From 2b76a1826e598223e2deac1ee549e1e22803fc6b Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 17 Jul 2017 22:53:14 -0400 Subject: [PATCH 2/3] Enable paging for all list watchers Kubernetes-commit: 500b130ff0a2c744b21cfb8e6d09e94b707dec61 --- tools/cache/BUILD | 2 ++ tools/cache/listwatch.go | 11 +++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/tools/cache/BUILD b/tools/cache/BUILD index e79cce62..3531eba7 100644 --- a/tools/cache/BUILD +++ b/tools/cache/BUILD @@ -63,6 +63,7 @@ go_library( ], deps = [ "//vendor/github.com/golang/glog:go_default_library", + "//vendor/golang.org/x/net/context:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -79,6 +80,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/tools/pager:go_default_library", ], ) diff --git a/tools/cache/listwatch.go b/tools/cache/listwatch.go index 4c976533..cab48ae0 100644 --- a/tools/cache/listwatch.go +++ b/tools/cache/listwatch.go @@ -19,12 +19,15 @@ package cache import ( "time" + "golang.org/x/net/context" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/pager" ) // ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource. @@ -46,8 +49,9 @@ type WatchFunc func(options metav1.ListOptions) (watch.Interface, error) // It is a convenience function for users of NewReflector, etc. // ListFunc and WatchFunc must not be nil type ListWatch struct { - ListFunc ListFunc - WatchFunc WatchFunc + ListFunc ListFunc + WatchFunc WatchFunc + DisablePaging bool } // Getter interface knows how to access Get method from RESTClient. @@ -87,6 +91,9 @@ func timeoutFromListOptions(options metav1.ListOptions) time.Duration { // List a set of apiserver resources func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) { + if !lw.DisablePaging { + return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options) + } return lw.ListFunc(options) } From 6adf847055c0f0630e536d9bd3aebd0818e2e4b3 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 10 Aug 2017 22:31:51 -0400 Subject: [PATCH 3/3] Server side implementation of paging for etcd3 Add a feature gate in the apiserver to control whether paging can be used. Add controls to the storage factory that allow it to be disabled per resource. Use a JSON encoded continuation token that can be versioned. Create a 410 error if the continuation token is expired. Adds GetContinue() to ListMeta. Kubernetes-commit: 8952a0cb722b77459cf2701632a30f5b264f5aba --- tools/pager/pager.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tools/pager/pager.go b/tools/pager/pager.go index 57d9632d..a4a04cdc 100644 --- a/tools/pager/pager.go +++ b/tools/pager/pager.go @@ -30,14 +30,20 @@ import ( const defaultPageSize = 500 +// ListPageFunc returns a list object for the given list options. type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) +// SimplePageFunc adapts a context-less list function into one that accepts a context. func SimplePageFunc(fn func(opts metav1.ListOptions) (runtime.Object, error)) ListPageFunc { return func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return fn(opts) } } +// ListPager assists client code in breaking large list queries into multiple +// smaller chunks of PageSize or smaller. PageFn is expected to accept a +// metav1.ListOptions that supports paging and return a list. The pager does +// not alter the field or label selectors on the initial options list. type ListPager struct { PageSize int64 PageFn ListPageFunc @@ -45,6 +51,8 @@ type ListPager struct { FullListIfExpired bool } +// New creates a new pager from the provided pager function using the default +// options. func New(fn ListPageFunc) *ListPager { return &ListPager{ PageSize: defaultPageSize, @@ -53,6 +61,9 @@ func New(fn ListPageFunc) *ListPager { } } +// List returns a single list object, but attempts to retrieve smaller chunks from the +// server to reduce the impact on the server. If the chunk attempt fails, it will load +// the full list instead. func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { if options.Limit == 0 { options.Limit = p.PageSize