From 84723c2d3ef5ff5d30aadd1ad72068bf2254358c Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Thu, 14 Mar 2019 13:26:19 -0700 Subject: [PATCH] Paginate List call performed by Reflector's ListAndWatch call --- .../apiserver/pkg/storage/cacher/cacher.go | 24 +++++- .../apiserver/pkg/storage/etcd3/store.go | 4 + .../pkg/storage/tests/cacher_test.go | 82 +++++++++++++++++++ .../k8s.io/client-go/tools/cache/reflector.go | 15 +++- .../client-go/tools/cache/reflector_test.go | 45 +++++++++- 5 files changed, 164 insertions(+), 6 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index d20b41a42b1..e5564287d06 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -56,6 +56,12 @@ var ( emptyFunc = func() {} ) +const ( + // storageWatchListPageSize is the cacher's request chunk size of + // initial and resync watch lists to storage. + storageWatchListPageSize = int64(10000) +) + func init() { prometheus.MustRegister(initCounter) } @@ -229,7 +235,7 @@ type Cacher struct { // given configuration. func NewCacherFromConfig(config Config) *Cacher { watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner) - listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) + listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) reflectorName := "storage/cacher.go:" + config.ResourcePrefix obj := config.NewFunc() @@ -240,12 +246,14 @@ func NewCacherFromConfig(config Config) *Cacher { } stopCh := make(chan struct{}) + reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0) + reflector.WatchListPageSize = storageWatchListPageSize cacher := &Cacher{ ready: newReady(), storage: config.Storage, objectType: reflect.TypeOf(obj), watchCache: watchCache, - reflector: cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0), + reflector: reflector, versioner: config.Versioner, triggerFunc: config.TriggerPublisherFunc, watcherIdx: 0, @@ -816,7 +824,8 @@ type cacherListerWatcher struct { newListFunc func() runtime.Object } -func newCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher { +// NewCacherListerWatcher returns a storage.Interface backed ListerWatcher. +func NewCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher { return &cacherListerWatcher{ storage: storage, resourcePrefix: resourcePrefix, @@ -827,7 +836,14 @@ func newCacherListerWatcher(storage storage.Interface, resourcePrefix string, ne // Implements cache.ListerWatcher interface. func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) { list := lw.newListFunc() - if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", storage.Everything, list); err != nil { + pred := storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: options.Limit, + Continue: options.Continue, + } + + if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", pred, list); err != nil { return nil, err } return list, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 0667a407c02..34e91361768 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -384,6 +384,8 @@ func (s *store) GuaranteedUpdate( // GetToList implements storage.Interface.GetToList. func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { + trace := utiltrace.New(fmt.Sprintf("GetToList etcd3: key=%v, resourceVersion=%s, limit: %d, continue: %s", key, resourceVersion, pred.Limit, pred.Continue)) + defer trace.LogIfLong(500 * time.Millisecond) listPtr, err := meta.GetItemsPtr(listObj) if err != nil { return err @@ -487,6 +489,8 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error // List implements storage.Interface.List. func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { + trace := utiltrace.New(fmt.Sprintf("List etcd3: key=%v, resourceVersion=%s, limit: %d, continue: %s", key, resourceVersion, pred.Limit, pred.Continue)) + defer trace.LogIfLong(500 * time.Millisecond) listPtr, err := meta.GetItemsPtr(listObj) if err != nil { return err diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index beface7a19f..619d505305e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -692,3 +692,85 @@ func TestRandomWatchDeliver(t *testing.T) { watched++ } } + +func TestCacherListerWatcher(t *testing.T) { + prefix := "pods" + fn := func() runtime.Object { return &example.PodList{} } + server, store := newEtcdTestStorage(t, prefix) + defer server.Terminate(t) + + podFoo := makeTestPod("foo") + podBar := makeTestPod("bar") + podBaz := makeTestPod("baz") + + _ = updatePod(t, store, podFoo, nil) + _ = updatePod(t, store, podBar, nil) + _ = updatePod(t, store, podBaz, nil) + + lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn) + + obj, err := lw.List(metav1.ListOptions{}) + if err != nil { + t.Fatalf("List failed: %v", err) + } + pl, ok := obj.(*example.PodList) + if !ok { + t.Fatalf("Expected PodList but got %t", pl) + } + if len(pl.Items) != 3 { + t.Errorf("Expected PodList of length 3 but got %d", len(pl.Items)) + } +} + +func TestCacherListerWatcherPagination(t *testing.T) { + prefix := "pods" + fn := func() runtime.Object { return &example.PodList{} } + server, store := newEtcdTestStorage(t, prefix) + defer server.Terminate(t) + + podFoo := makeTestPod("foo") + podBar := makeTestPod("bar") + podBaz := makeTestPod("baz") + + _ = updatePod(t, store, podFoo, nil) + _ = updatePod(t, store, podBar, nil) + _ = updatePod(t, store, podBaz, nil) + + lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn) + + obj1, err := lw.List(metav1.ListOptions{Limit: 2}) + if err != nil { + t.Fatalf("List failed: %v", err) + } + limit1, ok := obj1.(*example.PodList) + if !ok { + t.Fatalf("Expected PodList but got %t", limit1) + } + if len(limit1.Items) != 2 { + t.Errorf("Expected PodList of length 2 but got %d", len(limit1.Items)) + } + if limit1.Continue == "" { + t.Errorf("Expected list to have Continue but got none") + } + obj2, err := lw.List(metav1.ListOptions{Limit: 2, Continue: limit1.Continue}) + if err != nil { + t.Fatalf("List failed: %v", err) + } + limit2, ok := obj2.(*example.PodList) + if !ok { + t.Fatalf("Expected PodList but got %t", limit2) + } + if limit2.Continue != "" { + t.Errorf("Expected list not to have Continue, but got %s", limit1.Continue) + } + + if limit1.Items[0].Name != podBar.Name { + t.Errorf("Expected list1.Items[0] to be %s but got %s", podBar.Name, limit1.Items[0].Name) + } + if limit1.Items[1].Name != podBaz.Name { + t.Errorf("Expected list1.Items[1] to be %s but got %s", podBaz.Name, limit1.Items[1].Name) + } + if limit2.Items[0].Name != podFoo.Name { + t.Errorf("Expected list2.Items[0] to be %s but got %s", podFoo.Name, limit2.Items[0].Name) + } +} diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index a629fd4e8da..21bba3ae35d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "errors" "fmt" "io" @@ -38,6 +39,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/pager" "k8s.io/klog" "k8s.io/utils/trace" ) @@ -68,6 +70,9 @@ type Reflector struct { lastSyncResourceVersion string // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion lastSyncResourceVersionMutex sync.RWMutex + // WatchListPageSize is the requested chunk size of initial and resync watch lists. + // Defaults to pager.PageSize. + WatchListPageSize int64 } var ( @@ -179,7 +184,15 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { panicCh <- r } }() - list, err = r.listerWatcher.List(options) + // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first + // list request will return the full response. + pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { + return r.listerWatcher.List(opts) + })) + if r.WatchListPageSize != 0 { + pager.PageSize = r.WatchListPageSize + } + list, err = pager.List(context.Background(), options) close(listCh) }() select { diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index bb06059f7e1..caa0723231f 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -24,7 +24,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -387,3 +387,46 @@ func TestReflectorResync(t *testing.T) { t.Errorf("exactly 2 iterations were expected, got: %v", iteration) } } + +func TestReflectorWatchListPageSize(t *testing.T) { + stopCh := make(chan struct{}) + s := NewStore(MetaNamespaceKeyFunc) + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Stop once the reflector begins watching since we're only interested in the list. + close(stopCh) + fw := watch.NewFake() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if options.Limit != 4 { + t.Fatalf("Expected list Limit of 4 but got %d", options.Limit) + } + pods := make([]v1.Pod, 10) + for i := 0; i < 10; i++ { + pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} + } + switch options.Continue { + case "": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil + case "C1": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil + case "C2": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil + default: + t.Fatalf("Unrecognized continue: %s", options.Continue) + } + return nil, nil + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + // Set the reflector to paginate the list request in 4 item chunks. + r.WatchListPageSize = 4 + r.ListAndWatch(stopCh) + + results := s.List() + if len(results) != 10 { + t.Errorf("Expected 10 results, got %d", len(results)) + } +}