From fbd65a265a47ffe081aaac2f794a55034333d11d Mon Sep 17 00:00:00 2001 From: wojtekt Date: Mon, 31 Aug 2020 11:58:45 +0200 Subject: [PATCH 1/5] Pipe newFunc to etcd3 storage layer --- pkg/controlplane/instance.go | 2 +- pkg/registry/core/pod/rest/log_test.go | 2 +- .../core/service/allocator/storage/storage.go | 2 +- .../ipallocator/storage/storage_test.go | 2 +- .../portallocator/storage/storage_test.go | 2 +- .../registry/generic/registry/dryrun_test.go | 2 +- .../generic/registry/storage_factory.go | 2 +- .../registry/generic/registry/store_test.go | 9 +++++--- .../pkg/registry/generic/storage_decorator.go | 6 ++--- .../apiserver/pkg/storage/etcd3/store.go | 6 ++--- .../apiserver/pkg/storage/etcd3/store_test.go | 22 +++++++++++-------- .../pkg/storage/etcd3/watcher_test.go | 4 ++-- .../pkg/storage/storagebackend/factory/BUILD | 1 + .../storage/storagebackend/factory/etcd3.go | 5 +++-- .../storage/storagebackend/factory/factory.go | 5 +++-- .../storagebackend/factory/tls_test.go | 2 +- .../pkg/storage/tests/cacher_test.go | 9 +++++--- 17 files changed, 48 insertions(+), 35 deletions(-) diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index 4acd4edad99..02e5ac5d203 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -258,7 +258,7 @@ func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler { if err != nil { klog.Fatalf("Error determining service IP ranges: %v", err) } - leaseStorage, _, err := storagefactory.Create(*config) + leaseStorage, _, err := storagefactory.Create(*config, nil) if err != nil { klog.Fatalf("Error creating storage factory: %v", err) } diff --git a/pkg/registry/core/pod/rest/log_test.go b/pkg/registry/core/pod/rest/log_test.go index 158037adb46..b47717b46be 100644 --- a/pkg/registry/core/pod/rest/log_test.go +++ b/pkg/registry/core/pod/rest/log_test.go @@ -30,7 +30,7 @@ import ( func TestPodLogValidates(t *testing.T) { config, server := registrytest.NewEtcdStorage(t, "") defer server.Terminate(t) - s, destroyFunc, err := generic.NewRawStorage(config) + s, destroyFunc, err := generic.NewRawStorage(config, nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } diff --git a/pkg/registry/core/service/allocator/storage/storage.go b/pkg/registry/core/service/allocator/storage/storage.go index 19c96164f51..08f6c188db7 100644 --- a/pkg/registry/core/service/allocator/storage/storage.go +++ b/pkg/registry/core/service/allocator/storage/storage.go @@ -62,7 +62,7 @@ var _ rangeallocation.RangeRegistry = &Etcd{} // NewEtcd returns an allocator that is backed by Etcd and can manage // persisting the snapshot state of allocation after each allocation is made. func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.GroupResource, config *storagebackend.Config) (*Etcd, error) { - storage, d, err := generic.NewRawStorage(config) + storage, d, err := generic.NewRawStorage(config, nil) if err != nil { return nil, err } diff --git a/pkg/registry/core/service/ipallocator/storage/storage_test.go b/pkg/registry/core/service/ipallocator/storage/storage_test.go index 763680d81c1..eb9bbafc2d1 100644 --- a/pkg/registry/core/service/ipallocator/storage/storage_test.go +++ b/pkg/registry/core/service/ipallocator/storage/storage_test.go @@ -54,7 +54,7 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, ipallocator.Interfa if err != nil { t.Fatalf("unexpected error creating etcd: %v", err) } - s, d, err := generic.NewRawStorage(etcdStorage) + s, d, err := generic.NewRawStorage(etcdStorage, nil) if err != nil { t.Fatalf("Couldn't create storage: %v", err) } diff --git a/pkg/registry/core/service/portallocator/storage/storage_test.go b/pkg/registry/core/service/portallocator/storage/storage_test.go index a86c1ddf4c2..144f2e8e9ca 100644 --- a/pkg/registry/core/service/portallocator/storage/storage_test.go +++ b/pkg/registry/core/service/portallocator/storage/storage_test.go @@ -57,7 +57,7 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, portallocator.Inter if err != nil { t.Fatalf("unexpected error creating etcd: %v", err) } - s, d, err := generic.NewRawStorage(etcdStorage) + s, d, err := generic.NewRawStorage(etcdStorage, nil) if err != nil { t.Fatalf("Couldn't create storage: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun_test.go index 2703ef0df97..25f8e96f5da 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun_test.go @@ -38,7 +38,7 @@ import ( func NewDryRunnableTestStorage(t *testing.T) (DryRunnableStorage, func()) { server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion) - s, destroy, err := factory.Create(*sc) + s, destroy, err := factory.Create(*sc, nil) if err != nil { t.Fatalf("Error creating storage: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go index 75d315bbf56..8c31bf8195c 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go @@ -45,7 +45,7 @@ func StorageWithCacher() generic.StorageDecorator { triggerFuncs storage.IndexerFuncs, indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) { - s, d, err := generic.NewRawStorage(storageConfig) + s, d, err := generic.NewRawStorage(storageConfig, newFunc) if err != nil { return s, d, err } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go index fd1979bb625..d177d28972f 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go @@ -1601,8 +1601,11 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) strategy := &testRESTStrategy{scheme, names.SimpleNameGenerator, true, false, true} + newFunc := func() runtime.Object { return &example.Pod{} } + newListFunc := func() runtime.Object { return &example.PodList{} } + sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion) - s, dFunc, err := factory.Create(*sc) + s, dFunc, err := factory.Create(*sc, newFunc) if err != nil { t.Fatalf("Error creating storage: %v", err) } @@ -1617,8 +1620,8 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE ResourcePrefix: podPrefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) }, GetAttrsFunc: getPodAttrs, - NewFunc: func() runtime.Object { return &example.Pod{} }, - NewListFunc: func() runtime.Object { return &example.PodList{} }, + NewFunc: newFunc, + NewListFunc: newListFunc, Codec: sc.Codec, } cacher, err := cacherstorage.NewCacherFromConfig(config) diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go index 223b630fc53..e0ca2df04c7 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go @@ -47,12 +47,12 @@ func UndecoratedStorage( getAttrsFunc storage.AttrFunc, trigger storage.IndexerFuncs, indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) { - return NewRawStorage(config) + return NewRawStorage(config, newFunc) } // NewRawStorage creates the low level kv storage. This is a work-around for current // two layer of same storage interface. // TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method. -func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) { - return factory.Create(*config) +func NewRawStorage(config *storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc, error) { + return factory.Create(*config, newFunc) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 14643dad913..8aa966a6669 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -83,11 +83,11 @@ type objState struct { } // New returns an etcd3 implementation of storage.Interface. -func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface { - return newStore(c, pagingEnabled, codec, prefix, transformer) +func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface { + return newStore(c, newFunc, pagingEnabled, codec, prefix, transformer) } -func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store { +func newStore(c *clientv3.Client, _ func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store { versioner := APIObjectVersioner{} result := &store{ client: c, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index db60282d45c..3526cc1ca5d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -99,6 +99,10 @@ func (p *prefixTransformer) resetReads() { p.reads = 0 } +func newPod() runtime.Object { + return &example.Pod{} +} + func TestCreate(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) @@ -818,7 +822,7 @@ func TestTransformationFailure(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() preset := []struct { @@ -895,8 +899,8 @@ func TestList(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) - disablePagingStore := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + disablePagingStore := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // Setup storage with the following structure: @@ -1394,7 +1398,7 @@ func TestListContinuation(t *testing.T) { etcdClient := cluster.RandClient() recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder - store := newStore(etcdClient, true, codec, "", transformer) + store := newStore(etcdClient, newPod, true, codec, "", transformer) ctx := context.Background() // Setup storage with the following structure: @@ -1556,7 +1560,7 @@ func TestListContinuationWithFilter(t *testing.T) { etcdClient := cluster.RandClient() recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder - store := newStore(etcdClient, true, codec, "", transformer) + store := newStore(etcdClient, newPod, true, codec, "", transformer) ctx := context.Background() preset := []struct { @@ -1659,7 +1663,7 @@ func TestListInconsistentContinuation(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // Setup storage with the following structure: @@ -1804,7 +1808,7 @@ func TestListInconsistentContinuation(t *testing.T) { func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // As 30s is the default timeout for testing in glboal configuration, // we cannot wait longer than that in a single time: change it to 10 @@ -1844,7 +1848,7 @@ func TestPrefix(t *testing.T) { "/registry": "/registry", } for configuredPrefix, effectivePrefix := range testcases { - store := newStore(cluster.RandClient(), true, codec, configuredPrefix, transformer) + store := newStore(cluster.RandClient(), nil, true, codec, configuredPrefix, transformer) if store.pathPrefix != effectivePrefix { t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix) } @@ -2011,7 +2015,7 @@ func TestConsistentList(t *testing.T) { transformer := &fancyTransformer{ transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)}, } - store := newStore(cluster.RandClient(), true, codec, "", transformer) + store := newStore(cluster.RandClient(), newPod, true, codec, "", transformer) transformer.store = store for i := 0; i < 5; i++ { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index ad3209ccd47..4a1dda94673 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -225,13 +225,13 @@ func TestWatchError(t *testing.T) { codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}) + invalidStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")}) ctx := context.Background() w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } - validStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}) + validStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")}) validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD index b7cdcfe31b8..c797ce2c7f9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD @@ -34,6 +34,7 @@ go_library( importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory", importpath = "k8s.io/apiserver/pkg/storage/storagebackend/factory", deps = [ + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index 26067633e58..3d1f2f150fa 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -31,6 +31,7 @@ import ( "go.etcd.io/etcd/pkg/transport" "google.golang.org/grpc" + "k8s.io/apimachinery/pkg/runtime" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server/egressselector" @@ -217,7 +218,7 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration }, nil } -func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { +func newETCD3Storage(c storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) { stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval) if err != nil { return nil, nil, err @@ -249,7 +250,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e if transformer == nil { transformer = value.IdentityTransformer } - return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil + return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging), destroyFunc, nil } // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go index a1dc6c0fa89..1e8a8cdb0f0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go @@ -19,6 +19,7 @@ package factory import ( "fmt" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/storagebackend" ) @@ -27,12 +28,12 @@ import ( type DestroyFunc func() // Create creates a storage backend based on given config. -func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { +func Create(c storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) { switch c.Type { case "etcd2": return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type) case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: - return newETCD3Storage(c) + return newETCD3Storage(c, newFunc) default: return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go index 829a8af730b..4bc49172324 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go @@ -75,7 +75,7 @@ func TestTLSConnection(t *testing.T) { }, Codec: codec, } - storage, destroyFunc, err := newETCD3Storage(cfg) + storage, destroyFunc, err := newETCD3Storage(cfg, nil) defer destroyFunc() if err != nil { t.Fatal(err) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index b9de03e15cc..0c4275e6af4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -99,9 +99,12 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha return source } +func newPod() runtime.Object { return &example.Pod{} } +func newPodList() runtime.Object { return &example.PodList{} } + func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) - storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer, true) + storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true) return server, storage } @@ -118,8 +121,8 @@ func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstor ResourcePrefix: prefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, GetAttrsFunc: GetAttrs, - NewFunc: func() runtime.Object { return &example.Pod{} }, - NewListFunc: func() runtime.Object { return &example.PodList{} }, + NewFunc: newPod, + NewListFunc: newPodList, Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), Clock: clock, } From 56e72841b6005740453828a9f4f7a9a1b9a831f6 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Mon, 31 Aug 2020 12:40:41 +0200 Subject: [PATCH 2/5] Implement etcd3 progress-notify feature in etcd3 layer --- .../apiserver/pkg/storage/etcd3/event.go | 20 +++++--- .../pkg/storage/etcd3/metrics/metrics.go | 14 ++++++ .../apiserver/pkg/storage/etcd3/store.go | 6 +-- .../apiserver/pkg/storage/etcd3/store_test.go | 1 - .../apiserver/pkg/storage/etcd3/watcher.go | 50 +++++++++++++++++-- .../pkg/storage/etcd3/watcher_test.go | 35 ++++++++++++- .../apiserver/pkg/storage/interfaces.go | 3 ++ 7 files changed, 112 insertions(+), 17 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/event.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/event.go index c4e1f8032b0..83e52c0646c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/event.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/event.go @@ -23,12 +23,13 @@ import ( ) type event struct { - key string - value []byte - prevValue []byte - rev int64 - isDeleted bool - isCreated bool + key string + value []byte + prevValue []byte + rev int64 + isDeleted bool + isCreated bool + isProgressNotify bool } // parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event. @@ -61,3 +62,10 @@ func parseEvent(e *clientv3.Event) (*event, error) { } return ret, nil } + +func progressNotifyEvent(rev int64) *event { + return &event{ + rev: rev, + isProgressNotify: true, + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go index 4ba0b14fd18..1f001406a71 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go @@ -61,6 +61,14 @@ var ( }, []string{"endpoint"}, ) + etcdBookmarkCounts = compbasemetrics.NewGaugeVec( + &compbasemetrics.GaugeOpts{ + Name: "etcd_bookmark_counts", + Help: "Number of etcd bookmarks (progress notify events) split by kind.", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"resource"}, + ) ) var registerMetrics sync.Once @@ -72,6 +80,7 @@ func Register() { legacyregistry.MustRegister(etcdRequestLatency) legacyregistry.MustRegister(objectCounts) legacyregistry.MustRegister(dbTotalSize) + legacyregistry.MustRegister(etcdBookmarkCounts) }) } @@ -85,6 +94,11 @@ func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) { etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime)) } +// RecordEtcdBookmark updates the etcd_bookmark_counts metric. +func RecordEtcdBookmark(resource string) { + etcdBookmarkCounts.WithLabelValues(resource).Inc() +} + // Reset resets the etcd_request_duration_seconds metric. func Reset() { etcdRequestLatency.Reset() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 8aa966a6669..147fa748594 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -87,7 +87,7 @@ func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, return newStore(c, newFunc, pagingEnabled, codec, prefix, transformer) } -func newStore(c *clientv3.Client, _ func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store { +func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store { versioner := APIObjectVersioner{} result := &store{ client: c, @@ -99,7 +99,7 @@ func newStore(c *clientv3.Client, _ func() runtime.Object, pagingEnabled bool, c // no-op for default prefix of '/registry'. // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/' pathPrefix: path.Join("/", prefix), - watcher: newWatcher(c, codec, versioner, transformer), + watcher: newWatcher(c, codec, newFunc, versioner, transformer), leaseManager: newDefaultLeaseManager(c), } return result @@ -776,7 +776,7 @@ func (s *store) watch(ctx context.Context, key string, opts storage.ListOptions, return nil, err } key = path.Join(s.pathPrefix, key) - return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.Predicate) + return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.ProgressNotify, opts.Predicate) } func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 3526cc1ca5d..661d9782e39 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -2077,5 +2077,4 @@ func TestConsistentList(t *testing.T) { if !reflect.DeepEqual(result3, result4) { t.Errorf("inconsistent lists: %#v, %#v", result3, result4) } - } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index 793495f3e08..bd87382e83f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "os" + "reflect" "strconv" "strings" "sync" @@ -29,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/value" "go.etcd.io/etcd/clientv3" @@ -68,6 +70,8 @@ func TestOnlySetFatalOnDecodeError(b bool) { type watcher struct { client *clientv3.Client codec runtime.Codec + newFunc func() runtime.Object + objectType string versioner storage.Versioner transformer value.Transformer } @@ -78,6 +82,7 @@ type watchChan struct { key string initialRev int64 recursive bool + progressNotify bool internalPred storage.SelectionPredicate ctx context.Context cancel context.CancelFunc @@ -86,13 +91,20 @@ type watchChan struct { errChan chan error } -func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer value.Transformer) *watcher { - return &watcher{ +func newWatcher(client *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher { + res := &watcher{ client: client, codec: codec, + newFunc: newFunc, versioner: versioner, transformer: transformer, } + if newFunc == nil { + res.objectType = "" + } else { + res.objectType = reflect.TypeOf(newFunc()).String() + } + return res } // Watch watches on a key and returns a watch.Interface that transfers relevant notifications. @@ -102,21 +114,22 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage. // If recursive is false, it watches on given key. // If recursive is true, it watches any children and directories under the key, excluding the root key itself. // pred must be non-nil. Only if pred matches the change, it will be returned. -func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) (watch.Interface, error) { +func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) (watch.Interface, error) { if recursive && !strings.HasSuffix(key, "/") { key += "/" } - wc := w.createWatchChan(ctx, key, rev, recursive, pred) + wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, pred) go wc.run() return wc, nil } -func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) *watchChan { +func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan { wc := &watchChan{ watcher: w, key: key, initialRev: rev, recursive: recursive, + progressNotify: progressNotify, internalPred: pred, incomingEventChan: make(chan *event, incomingBufSize), resultChan: make(chan watch.Event, outgoingBufSize), @@ -223,6 +236,9 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) { if wc.recursive { opts = append(opts, clientv3.WithPrefix()) } + if wc.progressNotify { + opts = append(opts, clientv3.WithProgressNotify()) + } wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...) for wres := range wch { if wres.Err() != nil { @@ -232,6 +248,12 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) { wc.sendError(err) return } + if wres.IsProgressNotify() { + wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision())) + metrics.RecordEtcdBookmark(wc.watcher.objectType) + continue + } + for _, e := range wres.Events { parsedEvent, err := parseEvent(e) if err != nil { @@ -299,6 +321,19 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { } switch { + case e.isProgressNotify: + if wc.watcher.newFunc == nil { + return nil + } + object := wc.watcher.newFunc() + if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil { + klog.Errorf("failed to propagate object version: %v", err) + return nil + } + res = &watch.Event{ + Type: watch.Bookmark, + Object: object, + } case e.isDeleted: if !wc.filter(oldObj) { return nil @@ -376,6 +411,11 @@ func (wc *watchChan) sendEvent(e *event) { } func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) { + if e.isProgressNotify { + // progressNotify events doesn't contain neither current nor previous object version, + return nil, nil, nil + } + if !e.isDeleted { data, _, err := wc.watcher.transformer.TransformFromStorage(e.value, authenticatedDataString(e.key)) if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 4a1dda94673..e706edf3d6a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -246,7 +246,7 @@ func TestWatchContextCancel(t *testing.T) { cancel() // When we watch with a canceled context, we should detect that it's context canceled. // We won't take it as error and also close the watcher. - w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.Everything) + w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, false, storage.Everything) if err != nil { t.Fatal(err) } @@ -265,7 +265,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { origCtx, store, cluster := testSetup(t) defer cluster.Terminate(t) ctx, cancel := context.WithCancel(origCtx) - w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything) + w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything) // make resutlChan and errChan blocking to ensure ordering. w.resultChan = make(chan watch.Event) w.errChan = make(chan error) @@ -314,6 +314,37 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { } } +func TestProgressNotify(t *testing.T) { + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + clusterConfig := &integration.ClusterConfig{ + Size: 1, + WatchProgressNotifyInterval: time.Second, + } + cluster := integration.NewClusterV3(t, clusterConfig) + defer cluster.Terminate(t) + store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + ctx := context.Background() + + key := "/somekey" + input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}} + out := &example.Pod{} + if err := store.Create(ctx, key, input, out, 0); err != nil { + t.Fatalf("Create failed: %v", err) + } + + opts := storage.ListOptions{ + ResourceVersion: out.ResourceVersion, + Predicate: storage.Everything, + ProgressNotify: true, + } + w, err := store.Watch(ctx, key, opts) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + result := &example.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: out.ResourceVersion}} + testCheckResult(t, 0, watch.Bookmark, w, result) +} + type testWatchStruct struct { obj *example.Pod expectEvent bool diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go index fb30020803e..6249b5cc5fd 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go @@ -269,4 +269,7 @@ type ListOptions struct { ResourceVersionMatch metav1.ResourceVersionMatch // Predicate provides the selection rules for the list operation. Predicate SelectionPredicate + // ProgressNotify determines whether storage-originated bookmark (progress notify) events should + // be delivered to the users. The option is ignored for non-watch requests. + ProgressNotify bool } From 4af1328bb8a3b3eb2289bbbe624480548dd39cdc Mon Sep 17 00:00:00 2001 From: wojtekt Date: Mon, 31 Aug 2020 15:58:16 +0200 Subject: [PATCH 3/5] Allow tracking resource version for reflector store --- .../k8s.io/client-go/tools/cache/reflector.go | 12 ++++ .../client-go/tools/cache/reflector_test.go | 56 +++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index 9c9a758fa29..360d7304b7f 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -101,6 +101,15 @@ type Reflector struct { watchErrorHandler WatchErrorHandler } +// ResourceVersionUpdater is an interface that allows store implementation to +// track the current resource version of the reflector. This is especially +// important if storage bookmarks are enabled. +type ResourceVersionUpdater interface { + // UpdateResourceVersion is called each time current resource version of the reflector + // is updated. + UpdateResourceVersion(resourceVersion string) +} + // The WatchErrorHandler is called whenever ListAndWatch drops the // connection with an error. After calling this handler, the informer // will backoff and retry. @@ -507,6 +516,9 @@ loop: } *resourceVersion = newResourceVersion r.setLastSyncResourceVersion(newResourceVersion) + if rvu, ok := r.store.(ResourceVersionUpdater); ok { + rvu.UpdateResourceVersion(newResourceVersion) + } eventCount++ } } diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index 9260d5a698d..6a67e59cc74 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -910,3 +910,59 @@ func TestReflectorSetExpectedType(t *testing.T) { }) } } + +type storeWithRV struct { + Store + + // resourceVersions tracks values passed by UpdateResourceVersion + resourceVersions []string +} + +func (s *storeWithRV) UpdateResourceVersion(resourceVersion string) { + s.resourceVersions = append(s.resourceVersions, resourceVersion) +} + +func newStoreWithRV() *storeWithRV { + return &storeWithRV{ + Store: NewStore(MetaNamespaceKeyFunc), + } +} + +func TestReflectorResourceVersionUpdate(t *testing.T) { + s := newStoreWithRV() + + stopCh := make(chan struct{}) + fw := watch.NewFake() + + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + + makePod := func(rv string) *v1.Pod { + return &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: rv}} + } + + go func() { + fw.Action(watch.Added, makePod("10")) + fw.Action(watch.Modified, makePod("20")) + fw.Action(watch.Bookmark, makePod("30")) + fw.Action(watch.Deleted, makePod("40")) + close(stopCh) + }() + + // Initial list should use RV=0 + if err := r.ListAndWatch(stopCh); err != nil { + t.Fatal(err) + } + + expectedRVs := []string{"10", "20", "30", "40"} + if !reflect.DeepEqual(s.resourceVersions, expectedRVs) { + t.Errorf("Expected series of resource version updates of %#v but got: %#v", expectedRVs, s.resourceVersions) + } +} From a94fb5369d5e77b3fcafd1296bac072a1d6e13fe Mon Sep 17 00:00:00 2001 From: wojtekt Date: Mon, 31 Aug 2020 13:15:36 +0200 Subject: [PATCH 4/5] Enable progress notify events in watchcache --- .../apiserver/pkg/features/kube_features.go | 35 +++++++++++-------- .../apiserver/pkg/storage/cacher/cacher.go | 9 ++++- .../pkg/storage/cacher/watch_cache.go | 23 ++++++++++++ 3 files changed, 52 insertions(+), 15 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index ccb9e24ffae..dac08af0604 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -138,6 +138,12 @@ const ( // // Allows sending warning headers in API responses. WarningHeaders featuregate.Feature = "WarningHeaders" + + // owner: @wojtek-t + // alpha: v1.20 + // + // Allows for updating watchcache resource version with progress notify events. + EfficientWatchResumption featuregate.Feature = "EfficientWatchResumption" ) func init() { @@ -148,18 +154,19 @@ func init() { // To add a new feature, define a key for it above and add it here. The features will be // available throughout Kubernetes binaries. var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ - StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated}, - ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta}, - AdvancedAuditing: {Default: true, PreRelease: featuregate.GA}, - APIResponseCompression: {Default: true, PreRelease: featuregate.Beta}, - APIListChunking: {Default: true, PreRelease: featuregate.Beta}, - DryRun: {Default: true, PreRelease: featuregate.GA}, - RemainingItemCount: {Default: true, PreRelease: featuregate.Beta}, - ServerSideApply: {Default: true, PreRelease: featuregate.Beta}, - StorageVersionHash: {Default: true, PreRelease: featuregate.Beta}, - WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, - APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha}, - RemoveSelfLink: {Default: true, PreRelease: featuregate.Beta}, - SelectorIndex: {Default: true, PreRelease: featuregate.Beta}, - WarningHeaders: {Default: true, PreRelease: featuregate.Beta}, + StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated}, + ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta}, + AdvancedAuditing: {Default: true, PreRelease: featuregate.GA}, + APIResponseCompression: {Default: true, PreRelease: featuregate.Beta}, + APIListChunking: {Default: true, PreRelease: featuregate.Beta}, + DryRun: {Default: true, PreRelease: featuregate.GA}, + RemainingItemCount: {Default: true, PreRelease: featuregate.Beta}, + ServerSideApply: {Default: true, PreRelease: featuregate.Beta}, + StorageVersionHash: {Default: true, PreRelease: featuregate.Beta}, + WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, + APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha}, + RemoveSelfLink: {Default: true, PreRelease: featuregate.Beta}, + SelectorIndex: {Default: true, PreRelease: featuregate.Beta}, + WarningHeaders: {Default: true, PreRelease: featuregate.Beta}, + EfficientWatchResumption: {Default: false, PreRelease: featuregate.Alpha}, } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 1b6432b8c36..83874945961 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -1098,7 +1098,14 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, // Implements cache.ListerWatcher interface. func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) { - return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: storage.Everything}) + opts := storage.ListOptions{ + ResourceVersion: options.ResourceVersion, + Predicate: storage.Everything, + } + if utilfeature.DefaultFeatureGate.Enabled(features.EfficientWatchResumption) { + opts.ProgressNotify = true + } + return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, opts) } // errWatcher implements watch.Interface to return a single error diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 1e91733e143..1035d4a700a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -381,6 +381,29 @@ func (w *watchCache) doCacheResizeLocked(capacity int) { w.capacity = capacity } +func (w *watchCache) UpdateResourceVersion(resourceVersion string) { + rv, err := w.versioner.ParseResourceVersion(resourceVersion) + if err != nil { + klog.Errorf("Couldn't parse resourceVersion: %v", err) + return + } + + w.Lock() + defer w.Unlock() + w.resourceVersion = rv + + // Don't dispatch bookmarks coming from the storage layer. + // They can be very frequent (even to the level of subseconds) + // to allow efficient watch resumption on kube-apiserver restarts, + // and propagating them down may overload the whole system. + // + // TODO: If at some point we decide the performance and scalability + // footprint is acceptable, this is the place to hook them in. + // However, we then need to check if this was called as a result + // of a bookmark event or regular Add/Update/Delete operation by + // checking if resourceVersion here has changed. +} + // List returns list of pointers to objects. func (w *watchCache) List() []interface{} { return w.store.List() From af61e8fbdf3d83e2b287721fe0d57dd4a2234450 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Tue, 1 Sep 2020 11:00:31 +0200 Subject: [PATCH 5/5] Test watchcache being updated in multietcd setup --- test/integration/apiserver/BUILD | 2 + test/integration/apiserver/watchcache_test.go | 170 ++++++++++++++++++ test/integration/framework/etcd.go | 47 +++-- 3 files changed, 201 insertions(+), 18 deletions(-) create mode 100644 test/integration/apiserver/watchcache_test.go diff --git a/test/integration/apiserver/BUILD b/test/integration/apiserver/BUILD index c67227c5f31..a31fe9aa444 100644 --- a/test/integration/apiserver/BUILD +++ b/test/integration/apiserver/BUILD @@ -15,6 +15,7 @@ go_test( "max_request_body_bytes_test.go", "patch_test.go", "print_test.go", + "watchcache_test.go", ], rundir = ".", tags = [ @@ -25,6 +26,7 @@ go_test( "//cmd/kube-apiserver/app/options:go_default_library", "//pkg/api/legacyscheme:go_default_library", "//pkg/controlplane:go_default_library", + "//pkg/controlplane/reconcilers:go_default_library", "//pkg/features:go_default_library", "//pkg/printers:go_default_library", "//pkg/printers/internalversion:go_default_library", diff --git a/test/integration/apiserver/watchcache_test.go b/test/integration/apiserver/watchcache_test.go new file mode 100644 index 00000000000..f29c8d6cd9d --- /dev/null +++ b/test/integration/apiserver/watchcache_test.go @@ -0,0 +1,170 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "context" + "fmt" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + clientset "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/controlplane/reconcilers" + "k8s.io/kubernetes/test/integration/framework" +) + +// setup create kube-apiserver backed up by two separate etcds, +// with one of them containing events and the other all other objects. +func multiEtcdSetup(t testing.TB) (clientset.Interface, framework.CloseFunc) { + etcdArgs := []string{"--experimental-watch-progress-notify-interval", "1s"} + etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs) + if err != nil { + t.Fatalf("Couldn't start etcd: %v", err) + } + + etcd1URL, stopEtcd1, err := framework.RunCustomEtcd("etcd_watchcache1", etcdArgs) + if err != nil { + t.Fatalf("Couldn't start etcd: %v", err) + } + + etcdOptions := framework.DefaultEtcdOptions() + // Overwrite etcd setup to our custom etcd instances. + etcdOptions.StorageConfig.Transport.ServerList = []string{etcd0URL} + etcdOptions.EtcdServersOverrides = []string{fmt.Sprintf("/events#%s", etcd1URL)} + etcdOptions.EnableWatchCache = true + + opts := framework.MasterConfigOptions{EtcdOptions: etcdOptions} + masterConfig := framework.NewIntegrationTestMasterConfigWithOptions(&opts) + // Switch off endpoints reconciler to avoid unnecessary operations. + masterConfig.ExtraConfig.EndpointReconcilerType = reconcilers.NoneEndpointReconcilerType + _, s, stopMaster := framework.RunAMaster(masterConfig) + + closeFn := func() { + stopMaster() + stopEtcd1() + stopEtcd0() + } + + clientSet, err := clientset.NewForConfig(&restclient.Config{Host: s.URL, QPS: -1}) + if err != nil { + t.Fatalf("Error in create clientset: %v", err) + } + + // Wait for apiserver to be stabilized. + // Everything but default service creation is checked in RunAMaster above by + // waiting for post start hooks, so we just wait for default service to exist. + // TODO(wojtek-t): Figure out less fragile way. + ctx := context.Background() + if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + _, err := clientSet.CoreV1().Services("default").Get(ctx, "kubernetes", metav1.GetOptions{}) + return err == nil, nil + }); err != nil { + t.Fatalf("Failed to wait for kubernetes service: %v:", err) + } + return clientSet, closeFn +} + +func TestWatchCacheUpdatedByEtcd(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EfficientWatchResumption, true)() + + c, closeFn := multiEtcdSetup(t) + defer closeFn() + + ctx := context.Background() + + makeConfigMap := func(name string) *v1.ConfigMap { + return &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: name}} + } + makeSecret := func(name string) *v1.Secret { + return &v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: name}} + } + makeEvent := func(name string) *v1.Event { + return &v1.Event{ObjectMeta: metav1.ObjectMeta{Name: name}} + } + + cm, err := c.CoreV1().ConfigMaps("default").Create(ctx, makeConfigMap("name"), metav1.CreateOptions{}) + if err != nil { + t.Errorf("Couldn't create configmap: %v", err) + } + ev, err := c.CoreV1().Events("default").Create(ctx, makeEvent("name"), metav1.CreateOptions{}) + if err != nil { + t.Errorf("Couldn't create event: %v", err) + } + + listOptions := metav1.ListOptions{ + ResourceVersion: "0", + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + } + + // Wait until listing from cache returns resource version of corresponding + // resources (being the last updates). + t.Logf("Waiting for configmaps watchcache synced to %s", cm.ResourceVersion) + if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + res, err := c.CoreV1().ConfigMaps("default").List(ctx, listOptions) + if err != nil { + return false, nil + } + return res.ResourceVersion == cm.ResourceVersion, nil + }); err != nil { + t.Errorf("Failed to wait for configmaps watchcache synced: %v", err) + } + t.Logf("Waiting for events watchcache synced to %s", ev.ResourceVersion) + if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + res, err := c.CoreV1().Events("default").List(ctx, listOptions) + if err != nil { + return false, nil + } + return res.ResourceVersion == ev.ResourceVersion, nil + }); err != nil { + t.Errorf("Failed to wait for events watchcache synced: %v", err) + } + + // Create a secret, that is stored in the same etcd as configmap, but + // different than events. + se, err := c.CoreV1().Secrets("default").Create(ctx, makeSecret("name"), metav1.CreateOptions{}) + if err != nil { + t.Errorf("Couldn't create secret: %v", err) + } + + t.Logf("Waiting for configmaps watchcache synced to %s", se.ResourceVersion) + if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + res, err := c.CoreV1().ConfigMaps("default").List(ctx, listOptions) + if err != nil { + return false, nil + } + return res.ResourceVersion == se.ResourceVersion, nil + }); err != nil { + t.Errorf("Failed to wait for configmaps watchcache synced: %v", err) + } + t.Logf("Waiting for events watchcache NOT synced to %s", se.ResourceVersion) + if err := wait.Poll(100*time.Millisecond, 5*time.Second, func() (bool, error) { + res, err := c.CoreV1().Events("default").List(ctx, listOptions) + if err != nil { + return false, nil + } + return res.ResourceVersion == se.ResourceVersion, nil + }); err == nil || err != wait.ErrWaitTimeout { + t.Errorf("Events watchcache unexpected synced: %v", err) + } +} diff --git a/test/integration/framework/etcd.go b/test/integration/framework/etcd.go index b6d2ae8ce6b..4be970e21b3 100644 --- a/test/integration/framework/etcd.go +++ b/test/integration/framework/etcd.go @@ -84,41 +84,54 @@ func startEtcd() (func(), error) { } klog.V(1).Infof("could not connect to etcd: %v", err) + currentURL, stop, err := RunCustomEtcd("integration_test_etcd_data", nil) + if err != nil { + return nil, err + } + + etcdURL = currentURL + os.Setenv("KUBE_INTEGRATION_ETCD_URL", etcdURL) + + return stop, nil +} + +// RunCustomEtcd starts a custom etcd instance for test purposes. +func RunCustomEtcd(dataDir string, customFlags []string) (url string, stopFn func(), err error) { // TODO: Check for valid etcd version. etcdPath, err := getEtcdPath() if err != nil { fmt.Fprintf(os.Stderr, installEtcd) - return nil, fmt.Errorf("could not find etcd in PATH: %v", err) + return "", nil, fmt.Errorf("could not find etcd in PATH: %v", err) } etcdPort, err := getAvailablePort() if err != nil { - return nil, fmt.Errorf("could not get a port: %v", err) + return "", nil, fmt.Errorf("could not get a port: %v", err) } - etcdURL = fmt.Sprintf("http://127.0.0.1:%d", etcdPort) + customURL := fmt.Sprintf("http://127.0.0.1:%d", etcdPort) - klog.Infof("starting etcd on %s", etcdURL) + klog.Infof("starting etcd on %s", customURL) - etcdDataDir, err := ioutil.TempDir(os.TempDir(), "integration_test_etcd_data") + etcdDataDir, err := ioutil.TempDir(os.TempDir(), dataDir) if err != nil { - return nil, fmt.Errorf("unable to make temp etcd data dir: %v", err) + return "", nil, fmt.Errorf("unable to make temp etcd data dir %s: %v", dataDir, err) } klog.Infof("storing etcd data in: %v", etcdDataDir) ctx, cancel := context.WithCancel(context.Background()) - cmd := exec.CommandContext( - ctx, - etcdPath, + args := []string{ "--data-dir", etcdDataDir, "--listen-client-urls", - GetEtcdURL(), + customURL, "--advertise-client-urls", - GetEtcdURL(), + customURL, "--listen-peer-urls", "http://127.0.0.1:0", "--log-package-levels", "*=NOTICE", // set to INFO or DEBUG for more logs - ) + } + args = append(args, customFlags...) + cmd := exec.CommandContext(ctx, etcdPath, args...) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr stop := func() { @@ -136,14 +149,14 @@ func startEtcd() (func(), error) { clientv3.SetLogger(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, os.Stderr)) if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("failed to run etcd: %v", err) + return "", nil, fmt.Errorf("failed to run etcd: %v", err) } var i int32 = 1 const pollCount = int32(300) for i <= pollCount { - conn, err = net.DialTimeout("tcp", strings.TrimPrefix(etcdURL, "http://"), 1*time.Second) + conn, err := net.DialTimeout("tcp", strings.TrimPrefix(customURL, "http://"), 1*time.Second) if err == nil { conn.Close() break @@ -151,16 +164,14 @@ func startEtcd() (func(), error) { if i == pollCount { stop() - return nil, fmt.Errorf("could not start etcd") + return "", nil, fmt.Errorf("could not start etcd") } time.Sleep(100 * time.Millisecond) i = i + 1 } - os.Setenv("KUBE_INTEGRATION_ETCD_URL", etcdURL) - - return stop, nil + return customURL, stop, nil } // EtcdMain starts an etcd instance before running tests.