From 84723c2d3ef5ff5d30aadd1ad72068bf2254358c Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Thu, 14 Mar 2019 13:26:19 -0700 Subject: [PATCH 1/2] Paginate List call performed by Reflector's ListAndWatch call --- .../apiserver/pkg/storage/cacher/cacher.go | 24 +++++- .../apiserver/pkg/storage/etcd3/store.go | 4 + .../pkg/storage/tests/cacher_test.go | 82 +++++++++++++++++++ .../k8s.io/client-go/tools/cache/reflector.go | 15 +++- .../client-go/tools/cache/reflector_test.go | 45 +++++++++- 5 files changed, 164 insertions(+), 6 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index d20b41a42b1..e5564287d06 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -56,6 +56,12 @@ var ( emptyFunc = func() {} ) +const ( + // storageWatchListPageSize is the cacher's request chunk size of + // initial and resync watch lists to storage. + storageWatchListPageSize = int64(10000) +) + func init() { prometheus.MustRegister(initCounter) } @@ -229,7 +235,7 @@ type Cacher struct { // given configuration. func NewCacherFromConfig(config Config) *Cacher { watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner) - listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) + listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) reflectorName := "storage/cacher.go:" + config.ResourcePrefix obj := config.NewFunc() @@ -240,12 +246,14 @@ func NewCacherFromConfig(config Config) *Cacher { } stopCh := make(chan struct{}) + reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0) + reflector.WatchListPageSize = storageWatchListPageSize cacher := &Cacher{ ready: newReady(), storage: config.Storage, objectType: reflect.TypeOf(obj), watchCache: watchCache, - reflector: cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0), + reflector: reflector, versioner: config.Versioner, triggerFunc: config.TriggerPublisherFunc, watcherIdx: 0, @@ -816,7 +824,8 @@ type cacherListerWatcher struct { newListFunc func() runtime.Object } -func newCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher { +// NewCacherListerWatcher returns a storage.Interface backed ListerWatcher. +func NewCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher { return &cacherListerWatcher{ storage: storage, resourcePrefix: resourcePrefix, @@ -827,7 +836,14 @@ func newCacherListerWatcher(storage storage.Interface, resourcePrefix string, ne // Implements cache.ListerWatcher interface. func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) { list := lw.newListFunc() - if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", storage.Everything, list); err != nil { + pred := storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: options.Limit, + Continue: options.Continue, + } + + if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", pred, list); err != nil { return nil, err } return list, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 0667a407c02..34e91361768 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -384,6 +384,8 @@ func (s *store) GuaranteedUpdate( // GetToList implements storage.Interface.GetToList. func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { + trace := utiltrace.New(fmt.Sprintf("GetToList etcd3: key=%v, resourceVersion=%s, limit: %d, continue: %s", key, resourceVersion, pred.Limit, pred.Continue)) + defer trace.LogIfLong(500 * time.Millisecond) listPtr, err := meta.GetItemsPtr(listObj) if err != nil { return err @@ -487,6 +489,8 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error // List implements storage.Interface.List. func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { + trace := utiltrace.New(fmt.Sprintf("List etcd3: key=%v, resourceVersion=%s, limit: %d, continue: %s", key, resourceVersion, pred.Limit, pred.Continue)) + defer trace.LogIfLong(500 * time.Millisecond) listPtr, err := meta.GetItemsPtr(listObj) if err != nil { return err diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index beface7a19f..619d505305e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -692,3 +692,85 @@ func TestRandomWatchDeliver(t *testing.T) { watched++ } } + +func TestCacherListerWatcher(t *testing.T) { + prefix := "pods" + fn := func() runtime.Object { return &example.PodList{} } + server, store := newEtcdTestStorage(t, prefix) + defer server.Terminate(t) + + podFoo := makeTestPod("foo") + podBar := makeTestPod("bar") + podBaz := makeTestPod("baz") + + _ = updatePod(t, store, podFoo, nil) + _ = updatePod(t, store, podBar, nil) + _ = updatePod(t, store, podBaz, nil) + + lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn) + + obj, err := lw.List(metav1.ListOptions{}) + if err != nil { + t.Fatalf("List failed: %v", err) + } + pl, ok := obj.(*example.PodList) + if !ok { + t.Fatalf("Expected PodList but got %t", pl) + } + if len(pl.Items) != 3 { + t.Errorf("Expected PodList of length 3 but got %d", len(pl.Items)) + } +} + +func TestCacherListerWatcherPagination(t *testing.T) { + prefix := "pods" + fn := func() runtime.Object { return &example.PodList{} } + server, store := newEtcdTestStorage(t, prefix) + defer server.Terminate(t) + + podFoo := makeTestPod("foo") + podBar := makeTestPod("bar") + podBaz := makeTestPod("baz") + + _ = updatePod(t, store, podFoo, nil) + _ = updatePod(t, store, podBar, nil) + _ = updatePod(t, store, podBaz, nil) + + lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn) + + obj1, err := lw.List(metav1.ListOptions{Limit: 2}) + if err != nil { + t.Fatalf("List failed: %v", err) + } + limit1, ok := obj1.(*example.PodList) + if !ok { + t.Fatalf("Expected PodList but got %t", limit1) + } + if len(limit1.Items) != 2 { + t.Errorf("Expected PodList of length 2 but got %d", len(limit1.Items)) + } + if limit1.Continue == "" { + t.Errorf("Expected list to have Continue but got none") + } + obj2, err := lw.List(metav1.ListOptions{Limit: 2, Continue: limit1.Continue}) + if err != nil { + t.Fatalf("List failed: %v", err) + } + limit2, ok := obj2.(*example.PodList) + if !ok { + t.Fatalf("Expected PodList but got %t", limit2) + } + if limit2.Continue != "" { + t.Errorf("Expected list not to have Continue, but got %s", limit1.Continue) + } + + if limit1.Items[0].Name != podBar.Name { + t.Errorf("Expected list1.Items[0] to be %s but got %s", podBar.Name, limit1.Items[0].Name) + } + if limit1.Items[1].Name != podBaz.Name { + t.Errorf("Expected list1.Items[1] to be %s but got %s", podBaz.Name, limit1.Items[1].Name) + } + if limit2.Items[0].Name != podFoo.Name { + t.Errorf("Expected list2.Items[0] to be %s but got %s", podFoo.Name, limit2.Items[0].Name) + } +} diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index a629fd4e8da..21bba3ae35d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "errors" "fmt" "io" @@ -38,6 +39,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/pager" "k8s.io/klog" "k8s.io/utils/trace" ) @@ -68,6 +70,9 @@ type Reflector struct { lastSyncResourceVersion string // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion lastSyncResourceVersionMutex sync.RWMutex + // WatchListPageSize is the requested chunk size of initial and resync watch lists. + // Defaults to pager.PageSize. + WatchListPageSize int64 } var ( @@ -179,7 +184,15 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { panicCh <- r } }() - list, err = r.listerWatcher.List(options) + // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first + // list request will return the full response. + pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { + return r.listerWatcher.List(opts) + })) + if r.WatchListPageSize != 0 { + pager.PageSize = r.WatchListPageSize + } + list, err = pager.List(context.Background(), options) close(listCh) }() select { diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index bb06059f7e1..caa0723231f 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -24,7 +24,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -387,3 +387,46 @@ func TestReflectorResync(t *testing.T) { t.Errorf("exactly 2 iterations were expected, got: %v", iteration) } } + +func TestReflectorWatchListPageSize(t *testing.T) { + stopCh := make(chan struct{}) + s := NewStore(MetaNamespaceKeyFunc) + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + // Stop once the reflector begins watching since we're only interested in the list. + close(stopCh) + fw := watch.NewFake() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if options.Limit != 4 { + t.Fatalf("Expected list Limit of 4 but got %d", options.Limit) + } + pods := make([]v1.Pod, 10) + for i := 0; i < 10; i++ { + pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} + } + switch options.Continue { + case "": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil + case "C1": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil + case "C2": + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil + default: + t.Fatalf("Unrecognized continue: %s", options.Continue) + } + return nil, nil + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + // Set the reflector to paginate the list request in 4 item chunks. + r.WatchListPageSize = 4 + r.ListAndWatch(stopCh) + + results := s.List() + if len(results) != 10 { + t.Errorf("Expected 10 results, got %d", len(results)) + } +} From e5a4f09ab3ac15815ceb039fbc7f546266855fd6 Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Thu, 21 Mar 2019 11:25:07 -0700 Subject: [PATCH 2/2] Add resourceVersion=0 paginated list integration test for disabled and enabled watch cache --- .../apiserver/pkg/storage/cacher/cacher.go | 2 + .../k8s.io/client-go/tools/cache/reflector.go | 1 + test/integration/apiserver/apiserver_test.go | 66 ++++++++++++++++++- test/integration/framework/master_utils.go | 31 ++++++++- 4 files changed, 96 insertions(+), 4 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index e5564287d06..df027df68be 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -247,6 +247,8 @@ func NewCacherFromConfig(config Config) *Cacher { stopCh := make(chan struct{}) reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0) + // Configure reflector's pager to for an appropriate pagination chunk size for fetching data from + // storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error. reflector.WatchListPageSize = storageWatchListPageSize cacher := &Cacher{ ready: newReady(), diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index 21bba3ae35d..023fd5e8e1a 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -192,6 +192,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if r.WatchListPageSize != 0 { pager.PageSize = r.WatchListPageSize } + // Pager falls back to full list if paginated list calls fail due to an "Expired" error. list, err = pager.List(context.Background(), options) close(listCh) }() diff --git a/test/integration/apiserver/apiserver_test.go b/test/integration/apiserver/apiserver_test.go index f3fc735193b..c5f012538ea 100644 --- a/test/integration/apiserver/apiserver_test.go +++ b/test/integration/apiserver/apiserver_test.go @@ -65,8 +65,16 @@ func setup(t *testing.T, groupVersions ...schema.GroupVersion) (*httptest.Server return setupWithResources(t, groupVersions, nil) } +func setupWithOptions(t *testing.T, opts *framework.MasterConfigOptions, groupVersions ...schema.GroupVersion) (*httptest.Server, clientset.Interface, framework.CloseFunc) { + return setupWithResourcesWithOptions(t, opts, groupVersions, nil) +} + func setupWithResources(t *testing.T, groupVersions []schema.GroupVersion, resources []schema.GroupVersionResource) (*httptest.Server, clientset.Interface, framework.CloseFunc) { - masterConfig := framework.NewIntegrationTestMasterConfig() + return setupWithResourcesWithOptions(t, &framework.MasterConfigOptions{}, groupVersions, resources) +} + +func setupWithResourcesWithOptions(t *testing.T, opts *framework.MasterConfigOptions, groupVersions []schema.GroupVersion, resources []schema.GroupVersionResource) (*httptest.Server, clientset.Interface, framework.CloseFunc) { + masterConfig := framework.NewIntegrationTestMasterConfigWithOptions(opts) if len(groupVersions) > 0 || len(resources) > 0 { resourceConfig := master.DefaultAPIResourceConfigSource() resourceConfig.EnableVersions(groupVersions...) @@ -189,6 +197,62 @@ func Test202StatusCode(t *testing.T) { verifyStatusCode(t, "DELETE", s.URL+path.Join("/apis/apps/v1/namespaces", ns.Name, "replicasets", rs.Name), cascDel, 202) } +func TestListResourceVersion0(t *testing.T) { + var testcases = []struct { + name string + watchCacheEnabled bool + }{ + { + name: "watchCacheOn", + watchCacheEnabled: true, + }, + { + name: "watchCacheOff", + watchCacheEnabled: false, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIListChunking, true)() + etcdOptions := framework.DefaultEtcdOptions() + etcdOptions.EnableWatchCache = tc.watchCacheEnabled + s, clientSet, closeFn := setupWithOptions(t, &framework.MasterConfigOptions{EtcdOptions: etcdOptions}) + defer closeFn() + + ns := framework.CreateTestingNamespace("list-paging", s, t) + defer framework.DeleteTestingNamespace(ns, s, t) + + rsClient := clientSet.AppsV1().ReplicaSets(ns.Name) + + for i := 0; i < 10; i++ { + rs := newRS(ns.Name) + rs.Name = fmt.Sprintf("test-%d", i) + if _, err := rsClient.Create(rs); err != nil { + t.Fatal(err) + } + } + + pagerFn := func(opts metav1.ListOptions) (runtime.Object, error) { + return rsClient.List(opts) + } + + p := pager.New(pager.SimplePageFunc(pagerFn)) + p.PageSize = 3 + listObj, err := p.List(context.Background(), metav1.ListOptions{ResourceVersion: "0"}) + if err != nil { + t.Fatalf("Unexpected list error: %v", err) + } + items, err := meta.ExtractList(listObj) + if err != nil { + t.Fatalf("Failed to extract list from %v", listObj) + } + if len(items) != 10 { + t.Errorf("Expected list size of 10 but got %d", len(items)) + } + }) + } +} + func TestAPIListChunking(t *testing.T) { defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIListChunking, true)() s, clientSet, closeFn := setup(t) diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index b0464cdf56c..85b1b5f0120 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -242,7 +242,13 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv // NewIntegrationTestMasterConfig returns the master config appropriate for most integration tests. func NewIntegrationTestMasterConfig() *master.Config { - masterConfig := NewMasterConfig() + return NewIntegrationTestMasterConfigWithOptions(&MasterConfigOptions{}) +} + +// NewIntegrationTestMasterConfigWithOptions returns the master config appropriate for most integration tests +// configured with the provided options. +func NewIntegrationTestMasterConfigWithOptions(opts *MasterConfigOptions) *master.Config { + masterConfig := NewMasterConfigWithOptions(opts) masterConfig.GenericConfig.PublicAddress = net.ParseIP("192.168.10.4") masterConfig.ExtraConfig.APIResourceConfigSource = master.DefaultAPIResourceConfigSource() @@ -252,13 +258,32 @@ func NewIntegrationTestMasterConfig() *master.Config { return masterConfig } -// NewMasterConfig returns a basic master config. -func NewMasterConfig() *master.Config { +// MasterConfigOptions are the configurable options for a new integration test master config. +type MasterConfigOptions struct { + EtcdOptions *options.EtcdOptions +} + +// DefaultEtcdOptions are the default EtcdOptions for use with integration tests. +func DefaultEtcdOptions() *options.EtcdOptions { // This causes the integration tests to exercise the etcd // prefix code, so please don't change without ensuring // sufficient coverage in other ways. etcdOptions := options.NewEtcdOptions(storagebackend.NewDefaultConfig(uuid.New(), nil)) etcdOptions.StorageConfig.Transport.ServerList = []string{GetEtcdURL()} + return etcdOptions +} + +// NewMasterConfig returns a basic master config. +func NewMasterConfig() *master.Config { + return NewMasterConfigWithOptions(&MasterConfigOptions{}) +} + +// NewMasterConfigWithOptions returns a basic master config configured with the provided options. +func NewMasterConfigWithOptions(opts *MasterConfigOptions) *master.Config { + etcdOptions := DefaultEtcdOptions() + if opts.EtcdOptions != nil { + etcdOptions = opts.EtcdOptions + } info, _ := runtime.SerializerInfoForMediaType(legacyscheme.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON) ns := NewSingleContentTypeSerializer(legacyscheme.Scheme, info)