From fbd65a265a47ffe081aaac2f794a55034333d11d Mon Sep 17 00:00:00 2001 From: wojtekt Date: Mon, 31 Aug 2020 11:58:45 +0200 Subject: [PATCH] Pipe newFunc to etcd3 storage layer --- pkg/controlplane/instance.go | 2 +- pkg/registry/core/pod/rest/log_test.go | 2 +- .../core/service/allocator/storage/storage.go | 2 +- .../ipallocator/storage/storage_test.go | 2 +- .../portallocator/storage/storage_test.go | 2 +- .../registry/generic/registry/dryrun_test.go | 2 +- .../generic/registry/storage_factory.go | 2 +- .../registry/generic/registry/store_test.go | 9 +++++--- .../pkg/registry/generic/storage_decorator.go | 6 ++--- .../apiserver/pkg/storage/etcd3/store.go | 6 ++--- .../apiserver/pkg/storage/etcd3/store_test.go | 22 +++++++++++-------- .../pkg/storage/etcd3/watcher_test.go | 4 ++-- .../pkg/storage/storagebackend/factory/BUILD | 1 + .../storage/storagebackend/factory/etcd3.go | 5 +++-- .../storage/storagebackend/factory/factory.go | 5 +++-- .../storagebackend/factory/tls_test.go | 2 +- .../pkg/storage/tests/cacher_test.go | 9 +++++--- 17 files changed, 48 insertions(+), 35 deletions(-) diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index 4acd4edad99..02e5ac5d203 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -258,7 +258,7 @@ func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler { if err != nil { klog.Fatalf("Error determining service IP ranges: %v", err) } - leaseStorage, _, err := storagefactory.Create(*config) + leaseStorage, _, err := storagefactory.Create(*config, nil) if err != nil { klog.Fatalf("Error creating storage factory: %v", err) } diff --git a/pkg/registry/core/pod/rest/log_test.go b/pkg/registry/core/pod/rest/log_test.go index 158037adb46..b47717b46be 100644 --- a/pkg/registry/core/pod/rest/log_test.go +++ b/pkg/registry/core/pod/rest/log_test.go @@ -30,7 +30,7 @@ import ( func TestPodLogValidates(t *testing.T) { config, server := registrytest.NewEtcdStorage(t, "") defer server.Terminate(t) - s, destroyFunc, err := generic.NewRawStorage(config) + s, destroyFunc, err := generic.NewRawStorage(config, nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } diff --git a/pkg/registry/core/service/allocator/storage/storage.go b/pkg/registry/core/service/allocator/storage/storage.go index 19c96164f51..08f6c188db7 100644 --- a/pkg/registry/core/service/allocator/storage/storage.go +++ b/pkg/registry/core/service/allocator/storage/storage.go @@ -62,7 +62,7 @@ var _ rangeallocation.RangeRegistry = &Etcd{} // NewEtcd returns an allocator that is backed by Etcd and can manage // persisting the snapshot state of allocation after each allocation is made. func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.GroupResource, config *storagebackend.Config) (*Etcd, error) { - storage, d, err := generic.NewRawStorage(config) + storage, d, err := generic.NewRawStorage(config, nil) if err != nil { return nil, err } diff --git a/pkg/registry/core/service/ipallocator/storage/storage_test.go b/pkg/registry/core/service/ipallocator/storage/storage_test.go index 763680d81c1..eb9bbafc2d1 100644 --- a/pkg/registry/core/service/ipallocator/storage/storage_test.go +++ b/pkg/registry/core/service/ipallocator/storage/storage_test.go @@ -54,7 +54,7 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, ipallocator.Interfa if err != nil { t.Fatalf("unexpected error creating etcd: %v", err) } - s, d, err := generic.NewRawStorage(etcdStorage) + s, d, err := generic.NewRawStorage(etcdStorage, nil) if err != nil { t.Fatalf("Couldn't create storage: %v", err) } diff --git a/pkg/registry/core/service/portallocator/storage/storage_test.go b/pkg/registry/core/service/portallocator/storage/storage_test.go index a86c1ddf4c2..144f2e8e9ca 100644 --- a/pkg/registry/core/service/portallocator/storage/storage_test.go +++ b/pkg/registry/core/service/portallocator/storage/storage_test.go @@ -57,7 +57,7 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, portallocator.Inter if err != nil { t.Fatalf("unexpected error creating etcd: %v", err) } - s, d, err := generic.NewRawStorage(etcdStorage) + s, d, err := generic.NewRawStorage(etcdStorage, nil) if err != nil { t.Fatalf("Couldn't create storage: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun_test.go index 2703ef0df97..25f8e96f5da 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun_test.go @@ -38,7 +38,7 @@ import ( func NewDryRunnableTestStorage(t *testing.T) (DryRunnableStorage, func()) { server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion) - s, destroy, err := factory.Create(*sc) + s, destroy, err := factory.Create(*sc, nil) if err != nil { t.Fatalf("Error creating storage: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go index 75d315bbf56..8c31bf8195c 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go @@ -45,7 +45,7 @@ func StorageWithCacher() generic.StorageDecorator { triggerFuncs storage.IndexerFuncs, indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) { - s, d, err := generic.NewRawStorage(storageConfig) + s, d, err := generic.NewRawStorage(storageConfig, newFunc) if err != nil { return s, d, err } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go index fd1979bb625..d177d28972f 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go @@ -1601,8 +1601,11 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) strategy := &testRESTStrategy{scheme, names.SimpleNameGenerator, true, false, true} + newFunc := func() runtime.Object { return &example.Pod{} } + newListFunc := func() runtime.Object { return &example.PodList{} } + sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion) - s, dFunc, err := factory.Create(*sc) + s, dFunc, err := factory.Create(*sc, newFunc) if err != nil { t.Fatalf("Error creating storage: %v", err) } @@ -1617,8 +1620,8 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE ResourcePrefix: podPrefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) }, GetAttrsFunc: getPodAttrs, - NewFunc: func() runtime.Object { return &example.Pod{} }, - NewListFunc: func() runtime.Object { return &example.PodList{} }, + NewFunc: newFunc, + NewListFunc: newListFunc, Codec: sc.Codec, } cacher, err := cacherstorage.NewCacherFromConfig(config) diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go index 223b630fc53..e0ca2df04c7 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go @@ -47,12 +47,12 @@ func UndecoratedStorage( getAttrsFunc storage.AttrFunc, trigger storage.IndexerFuncs, indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) { - return NewRawStorage(config) + return NewRawStorage(config, newFunc) } // NewRawStorage creates the low level kv storage. This is a work-around for current // two layer of same storage interface. // TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method. -func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) { - return factory.Create(*config) +func NewRawStorage(config *storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc, error) { + return factory.Create(*config, newFunc) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 14643dad913..8aa966a6669 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -83,11 +83,11 @@ type objState struct { } // New returns an etcd3 implementation of storage.Interface. -func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface { - return newStore(c, pagingEnabled, codec, prefix, transformer) +func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface { + return newStore(c, newFunc, pagingEnabled, codec, prefix, transformer) } -func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store { +func newStore(c *clientv3.Client, _ func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store { versioner := APIObjectVersioner{} result := &store{ client: c, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index db60282d45c..3526cc1ca5d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -99,6 +99,10 @@ func (p *prefixTransformer) resetReads() { p.reads = 0 } +func newPod() runtime.Object { + return &example.Pod{} +} + func TestCreate(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) @@ -818,7 +822,7 @@ func TestTransformationFailure(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() preset := []struct { @@ -895,8 +899,8 @@ func TestList(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) - disablePagingStore := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + disablePagingStore := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // Setup storage with the following structure: @@ -1394,7 +1398,7 @@ func TestListContinuation(t *testing.T) { etcdClient := cluster.RandClient() recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder - store := newStore(etcdClient, true, codec, "", transformer) + store := newStore(etcdClient, newPod, true, codec, "", transformer) ctx := context.Background() // Setup storage with the following structure: @@ -1556,7 +1560,7 @@ func TestListContinuationWithFilter(t *testing.T) { etcdClient := cluster.RandClient() recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder - store := newStore(etcdClient, true, codec, "", transformer) + store := newStore(etcdClient, newPod, true, codec, "", transformer) ctx := context.Background() preset := []struct { @@ -1659,7 +1663,7 @@ func TestListInconsistentContinuation(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // Setup storage with the following structure: @@ -1804,7 +1808,7 @@ func TestListInconsistentContinuation(t *testing.T) { func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // As 30s is the default timeout for testing in glboal configuration, // we cannot wait longer than that in a single time: change it to 10 @@ -1844,7 +1848,7 @@ func TestPrefix(t *testing.T) { "/registry": "/registry", } for configuredPrefix, effectivePrefix := range testcases { - store := newStore(cluster.RandClient(), true, codec, configuredPrefix, transformer) + store := newStore(cluster.RandClient(), nil, true, codec, configuredPrefix, transformer) if store.pathPrefix != effectivePrefix { t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix) } @@ -2011,7 +2015,7 @@ func TestConsistentList(t *testing.T) { transformer := &fancyTransformer{ transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)}, } - store := newStore(cluster.RandClient(), true, codec, "", transformer) + store := newStore(cluster.RandClient(), newPod, true, codec, "", transformer) transformer.store = store for i := 0; i < 5; i++ { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index ad3209ccd47..4a1dda94673 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -225,13 +225,13 @@ func TestWatchError(t *testing.T) { codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}) + invalidStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")}) ctx := context.Background() w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } - validStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}) + validStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")}) validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD index b7cdcfe31b8..c797ce2c7f9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD @@ -34,6 +34,7 @@ go_library( importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory", importpath = "k8s.io/apiserver/pkg/storage/storagebackend/factory", deps = [ + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index 26067633e58..3d1f2f150fa 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -31,6 +31,7 @@ import ( "go.etcd.io/etcd/pkg/transport" "google.golang.org/grpc" + "k8s.io/apimachinery/pkg/runtime" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server/egressselector" @@ -217,7 +218,7 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration }, nil } -func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { +func newETCD3Storage(c storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) { stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval) if err != nil { return nil, nil, err @@ -249,7 +250,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e if transformer == nil { transformer = value.IdentityTransformer } - return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil + return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging), destroyFunc, nil } // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go index a1dc6c0fa89..1e8a8cdb0f0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go @@ -19,6 +19,7 @@ package factory import ( "fmt" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/storagebackend" ) @@ -27,12 +28,12 @@ import ( type DestroyFunc func() // Create creates a storage backend based on given config. -func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { +func Create(c storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) { switch c.Type { case "etcd2": return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type) case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: - return newETCD3Storage(c) + return newETCD3Storage(c, newFunc) default: return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go index 829a8af730b..4bc49172324 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go @@ -75,7 +75,7 @@ func TestTLSConnection(t *testing.T) { }, Codec: codec, } - storage, destroyFunc, err := newETCD3Storage(cfg) + storage, destroyFunc, err := newETCD3Storage(cfg, nil) defer destroyFunc() if err != nil { t.Fatal(err) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index b9de03e15cc..0c4275e6af4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -99,9 +99,12 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha return source } +func newPod() runtime.Object { return &example.Pod{} } +func newPodList() runtime.Object { return &example.PodList{} } + func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) - storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer, true) + storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true) return server, storage } @@ -118,8 +121,8 @@ func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstor ResourcePrefix: prefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, GetAttrsFunc: GetAttrs, - NewFunc: func() runtime.Object { return &example.Pod{} }, - NewListFunc: func() runtime.Object { return &example.PodList{} }, + NewFunc: newPod, + NewListFunc: newPodList, Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), Clock: clock, }