From d858f18787fe3ef437d803a1548a0296f4eca14d Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Thu, 14 Mar 2019 13:26:19 -0700 Subject: [PATCH] Paginate List call performed by Reflector's ListAndWatch call Kubernetes-commit: 84723c2d3ef5ff5d30aadd1ad72068bf2254358c --- tools/cache/reflector.go | 15 +++++++++++- tools/cache/reflector_test.go | 45 ++++++++++++++++++++++++++++++++++- 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index a629fd4e..21bba3ae 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "errors" "fmt" "io" @@ -38,6 +39,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/pager" "k8s.io/klog" "k8s.io/utils/trace" ) @@ -68,6 +70,9 @@ type Reflector struct { lastSyncResourceVersion string // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion lastSyncResourceVersionMutex sync.RWMutex + // WatchListPageSize is the requested chunk size of initial and resync watch lists. + // Defaults to pager.PageSize. + WatchListPageSize int64 } var ( @@ -179,7 +184,15 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { panicCh <- r } }() - list, err = r.listerWatcher.List(options) + // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first + // list request will return the full response. + pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { + return r.listerWatcher.List(opts) + })) + if r.WatchListPageSize != 0 { + pager.PageSize = r.WatchListPageSize + } + list, err = pager.List(context.Background(), options) close(listCh) }() select { diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index bb06059f..caa07232 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -24,7 +24,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -387,3 +387,46 @@ func TestReflectorResync(t *testing.T) { t.Errorf("exactly 2 iterations were expected, got: %v", iteration) } } + +func TestReflectorWatchListPageSize(t *testing.T) { + stopCh := make(chan struct{}) + s := NewStore(MetaNamespaceKeyFunc) + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Stop once the reflector begins watching since we're only interested in the list. + close(stopCh) + fw := watch.NewFake() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if options.Limit != 4 { + t.Fatalf("Expected list Limit of 4 but got %d", options.Limit) + } + pods := make([]v1.Pod, 10) + for i := 0; i < 10; i++ { + pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} + } + switch options.Continue { + case "": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil + case "C1": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil + case "C2": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil + default: + t.Fatalf("Unrecognized continue: %s", options.Continue) + } + return nil, nil + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + // Set the reflector to paginate the list request in 4 item chunks. + r.WatchListPageSize = 4 + r.ListAndWatch(stopCh) + + results := s.List() + if len(results) != 10 { + t.Errorf("Expected 10 results, got %d", len(results)) + } +}