From e5a4f09ab3ac15815ceb039fbc7f546266855fd6 Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Thu, 21 Mar 2019 11:25:07 -0700 Subject: [PATCH] Add resourceVersion=0 paginated list integration test for disabled and enabled watch cache --- .../apiserver/pkg/storage/cacher/cacher.go | 2 + .../k8s.io/client-go/tools/cache/reflector.go | 1 + test/integration/apiserver/apiserver_test.go | 66 ++++++++++++++++++- test/integration/framework/master_utils.go | 31 ++++++++- 4 files changed, 96 insertions(+), 4 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index e5564287d06..df027df68be 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -247,6 +247,8 @@ func NewCacherFromConfig(config Config) *Cacher { stopCh := make(chan struct{}) reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0) + // Configure reflector's pager to for an appropriate pagination chunk size for fetching data from + // storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error. reflector.WatchListPageSize = storageWatchListPageSize cacher := &Cacher{ ready: newReady(), diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index 21bba3ae35d..023fd5e8e1a 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -192,6 +192,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if r.WatchListPageSize != 0 { pager.PageSize = r.WatchListPageSize } + // Pager falls back to full list if paginated list calls fail due to an "Expired" error. list, err = pager.List(context.Background(), options) close(listCh) }() diff --git a/test/integration/apiserver/apiserver_test.go b/test/integration/apiserver/apiserver_test.go index f3fc735193b..c5f012538ea 100644 --- a/test/integration/apiserver/apiserver_test.go +++ b/test/integration/apiserver/apiserver_test.go @@ -65,8 +65,16 @@ func setup(t *testing.T, groupVersions ...schema.GroupVersion) (*httptest.Server return setupWithResources(t, groupVersions, nil) } +func setupWithOptions(t *testing.T, opts *framework.MasterConfigOptions, groupVersions ...schema.GroupVersion) (*httptest.Server, clientset.Interface, framework.CloseFunc) { + return setupWithResourcesWithOptions(t, opts, groupVersions, nil) +} + func setupWithResources(t *testing.T, groupVersions []schema.GroupVersion, resources []schema.GroupVersionResource) (*httptest.Server, clientset.Interface, framework.CloseFunc) { - masterConfig := framework.NewIntegrationTestMasterConfig() + return setupWithResourcesWithOptions(t, &framework.MasterConfigOptions{}, groupVersions, resources) +} + +func setupWithResourcesWithOptions(t *testing.T, opts *framework.MasterConfigOptions, groupVersions []schema.GroupVersion, resources []schema.GroupVersionResource) (*httptest.Server, clientset.Interface, framework.CloseFunc) { + masterConfig := framework.NewIntegrationTestMasterConfigWithOptions(opts) if len(groupVersions) > 0 || len(resources) > 0 { resourceConfig := master.DefaultAPIResourceConfigSource() resourceConfig.EnableVersions(groupVersions...) @@ -189,6 +197,62 @@ func Test202StatusCode(t *testing.T) { verifyStatusCode(t, "DELETE", s.URL+path.Join("/apis/apps/v1/namespaces", ns.Name, "replicasets", rs.Name), cascDel, 202) } +func TestListResourceVersion0(t *testing.T) { + var testcases = []struct { + name string + watchCacheEnabled bool + }{ + { + name: "watchCacheOn", + watchCacheEnabled: true, + }, + { + name: "watchCacheOff", + watchCacheEnabled: false, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIListChunking, true)() + etcdOptions := framework.DefaultEtcdOptions() + etcdOptions.EnableWatchCache = tc.watchCacheEnabled + s, clientSet, closeFn := setupWithOptions(t, &framework.MasterConfigOptions{EtcdOptions: etcdOptions}) + defer closeFn() + + ns := framework.CreateTestingNamespace("list-paging", s, t) + defer framework.DeleteTestingNamespace(ns, s, t) + + rsClient := clientSet.AppsV1().ReplicaSets(ns.Name) + + for i := 0; i < 10; i++ { + rs := newRS(ns.Name) + rs.Name = fmt.Sprintf("test-%d", i) + if _, err := rsClient.Create(rs); err != nil { + t.Fatal(err) + } + } + + pagerFn := func(opts metav1.ListOptions) (runtime.Object, error) { + return rsClient.List(opts) + } + + p := pager.New(pager.SimplePageFunc(pagerFn)) + p.PageSize = 3 + listObj, err := p.List(context.Background(), metav1.ListOptions{ResourceVersion: "0"}) + if err != nil { + t.Fatalf("Unexpected list error: %v", err) + } + items, err := meta.ExtractList(listObj) + if err != nil { + t.Fatalf("Failed to extract list from %v", listObj) + } + if len(items) != 10 { + t.Errorf("Expected list size of 10 but got %d", len(items)) + } + }) + } +} + func TestAPIListChunking(t *testing.T) { defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIListChunking, true)() s, clientSet, closeFn := setup(t) diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index b0464cdf56c..85b1b5f0120 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -242,7 +242,13 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv // NewIntegrationTestMasterConfig returns the master config appropriate for most integration tests. func NewIntegrationTestMasterConfig() *master.Config { - masterConfig := NewMasterConfig() + return NewIntegrationTestMasterConfigWithOptions(&MasterConfigOptions{}) +} + +// NewIntegrationTestMasterConfigWithOptions returns the master config appropriate for most integration tests +// configured with the provided options. +func NewIntegrationTestMasterConfigWithOptions(opts *MasterConfigOptions) *master.Config { + masterConfig := NewMasterConfigWithOptions(opts) masterConfig.GenericConfig.PublicAddress = net.ParseIP("192.168.10.4") masterConfig.ExtraConfig.APIResourceConfigSource = master.DefaultAPIResourceConfigSource() @@ -252,13 +258,32 @@ func NewIntegrationTestMasterConfig() *master.Config { return masterConfig } -// NewMasterConfig returns a basic master config. -func NewMasterConfig() *master.Config { +// MasterConfigOptions are the configurable options for a new integration test master config. +type MasterConfigOptions struct { + EtcdOptions *options.EtcdOptions +} + +// DefaultEtcdOptions are the default EtcdOptions for use with integration tests. +func DefaultEtcdOptions() *options.EtcdOptions { // This causes the integration tests to exercise the etcd // prefix code, so please don't change without ensuring // sufficient coverage in other ways. etcdOptions := options.NewEtcdOptions(storagebackend.NewDefaultConfig(uuid.New(), nil)) etcdOptions.StorageConfig.Transport.ServerList = []string{GetEtcdURL()} + return etcdOptions +} + +// NewMasterConfig returns a basic master config. +func NewMasterConfig() *master.Config { + return NewMasterConfigWithOptions(&MasterConfigOptions{}) +} + +// NewMasterConfigWithOptions returns a basic master config configured with the provided options. +func NewMasterConfigWithOptions(opts *MasterConfigOptions) *master.Config { + etcdOptions := DefaultEtcdOptions() + if opts.EtcdOptions != nil { + etcdOptions = opts.EtcdOptions + } info, _ := runtime.SerializerInfoForMediaType(legacyscheme.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON) ns := NewSingleContentTypeSerializer(legacyscheme.Scheme, info)