From 461c3701f0915acbf49c339f5321fa86879a963e Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 17 Jun 2017 10:07:20 -0400 Subject: [PATCH] Do not persist SelfLink into etcd storage This behavior regressed in an earlier release. Clearing the self link ensures that a new version is always written and reduces the size of the stored object by a small amount. Add tests to verify that Create and Update result in no SelfLink stored in etcd. --- .../test/integration/registration_test.go | 4 +- .../pkg/storage/etcd/api_object_versioner.go | 11 ++++ .../apiserver/pkg/storage/etcd/etcd_helper.go | 5 +- .../apiserver/pkg/storage/etcd3/store.go | 18 ++--- .../apiserver/pkg/storage/etcd3/store_test.go | 66 +++++++++++++++---- .../apiserver/pkg/storage/interfaces.go | 7 +- 6 files changed, 85 insertions(+), 26 deletions(-) diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/registration_test.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/registration_test.go index def79f6630d..4fe6471873a 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/registration_test.go +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/registration_test.go @@ -389,7 +389,7 @@ func TestEtcdStorage(t *testing.T) { Metadata: Metadata{ Name: "noxus.mygroup.example.com", Namespace: "", - SelfLink: "/apis/apiextensions.k8s.io/v1beta1/customresourcedefinitions/noxus.mygroup.example.com", + SelfLink: "", }, }, }, @@ -414,7 +414,7 @@ func TestEtcdStorage(t *testing.T) { Metadata: Metadata{ Name: "curlets.mygroup.example.com", Namespace: "", - SelfLink: "/apis/apiextensions.k8s.io/v1beta1/customresourcedefinitions/curlets.mygroup.example.com", + SelfLink: "", }, }, }, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/api_object_versioner.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/api_object_versioner.go index f961e2b3eaa..5b8583a2b57 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/api_object_versioner.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/api_object_versioner.go @@ -56,6 +56,17 @@ func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint6 return nil } +// PrepareObjectForStorage clears resource version and self link prior to writing to etcd. +func (a APIObjectVersioner) PrepareObjectForStorage(obj runtime.Object) error { + accessor, err := meta.Accessor(obj) + if err != nil { + return err + } + accessor.SetResourceVersion("") + accessor.SetSelfLink("") + return nil +} + // ObjectResourceVersion implements Versioner func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) { accessor, err := meta.Accessor(obj) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go index c2c513f503f..d1d7a846c18 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go @@ -128,6 +128,9 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 { return errors.New("resourceVersion may not be set on objects to be created") } + if err := h.versioner.PrepareObjectForStorage(obj); err != nil { + return fmt.Errorf("PrepareObjectForStorage returned an error: %v", err) + } trace.Step("Version checked") startTime := time.Now() @@ -530,7 +533,7 @@ func (h *etcdHelper) GuaranteedUpdate( } // Since update object may have a resourceVersion set, we need to clear it here. - if err := h.versioner.UpdateObject(ret, 0); err != nil { + if err := h.versioner.PrepareObjectForStorage(ret); err != nil { return errors.New("resourceVersion cannot be set on objects store in etcd") } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 708018cd49b..d00a26d8922 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -147,6 +147,9 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 { return errors.New("resourceVersion should not be set on objects to be created") } + if err := s.versioner.PrepareObjectForStorage(obj); err != nil { + return fmt.Errorf("PrepareObjectForStorage failed: %v", err) + } data, err := runtime.Encode(s.codec, obj) if err != nil { return err @@ -486,8 +489,8 @@ func (s *store) getStateFromObject(obj runtime.Object) (*objState, error) { // Compute the serialized form - for that we need to temporarily clean // its resource version field (those are not stored in etcd). - if err := s.versioner.UpdateObject(obj, 0); err != nil { - return nil, errors.New("resourceVersion cannot be set on objects store in etcd") + if err := s.versioner.PrepareObjectForStorage(obj); err != nil { + return nil, fmt.Errorf("PrepareObjectForStorage failed: %v", err) } state.data, err = runtime.Encode(s.codec, obj) if err != nil { @@ -503,15 +506,8 @@ func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtim return nil, 0, err } - version, err := s.versioner.ObjectResourceVersion(ret) - if err != nil { - return nil, 0, err - } - if version != 0 { - // We cannot store object with resourceVersion in etcd. We need to reset it. - if err := s.versioner.UpdateObject(ret, 0); err != nil { - return nil, 0, fmt.Errorf("UpdateObject failed: %v", err) - } + if err := s.versioner.PrepareObjectForStorage(ret); err != nil { + return nil, 0, fmt.Errorf("PrepareObjectForStorage failed: %v", err) } var ttl uint64 if ttlPtr != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index ed9c3bf3c86..eb123f8554a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -38,6 +38,7 @@ import ( storagetests "k8s.io/apiserver/pkg/storage/tests" "k8s.io/apiserver/pkg/storage/value" + "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/integration" "golang.org/x/net/context" ) @@ -45,6 +46,8 @@ import ( var scheme = runtime.NewScheme() var codecs = serializer.NewCodecFactory(scheme) +const defaultTestPrefix = "test!" + func init() { metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion) example.AddToScheme(scheme) @@ -84,7 +87,7 @@ func TestCreate(t *testing.T) { key := "/testkey" out := &example.Pod{} - obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", SelfLink: "testlink"}} // verify that kv pair is empty before set getResp, err := etcdClient.KV.Get(ctx, key) @@ -106,15 +109,32 @@ func TestCreate(t *testing.T) { if out.ResourceVersion == "" { t.Errorf("output should have non-empty resource version") } + if out.SelfLink != "" { + t.Errorf("output should have empty self link") + } - // verify that kv pair is not empty after set - getResp, err = etcdClient.KV.Get(ctx, key) + checkStorageInvariants(ctx, t, etcdClient, store, key) +} + +func checkStorageInvariants(ctx context.Context, t *testing.T, etcdClient *clientv3.Client, store *store, key string) { + getResp, err := etcdClient.KV.Get(ctx, key) if err != nil { t.Fatalf("etcdClient.KV.Get failed: %v", err) } if len(getResp.Kvs) == 0 { t.Fatalf("expecting non empty result on key: %s", key) } + decoded, err := runtime.Decode(store.codec, getResp.Kvs[0].Value[len(defaultTestPrefix):]) + if err != nil { + t.Fatalf("expecting successful decode of object from %v\n%v", err, string(getResp.Kvs[0].Value)) + } + obj := decoded.(*example.Pod) + if obj.ResourceVersion != "" { + t.Errorf("stored object should have empty resource version") + } + if obj.SelfLink != "" { + t.Errorf("stored output should have empty self link") + } } func TestCreateWithTTL(t *testing.T) { @@ -316,7 +336,8 @@ func TestGetToList(t *testing.T) { func TestGuaranteedUpdate(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) - key, storeObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) + etcdClient := cluster.RandClient() + key := "/testkey" tests := []struct { key string @@ -326,6 +347,7 @@ func TestGuaranteedUpdate(t *testing.T) { expectInvalidObjErr bool expectNoUpdate bool transformStale bool + hasSelfLink bool }{{ // GuaranteedUpdate on non-existing key with ignoreNotFound=false key: "/non-existing", ignoreNotFound: false, @@ -354,6 +376,14 @@ func TestGuaranteedUpdate(t *testing.T) { expectNotFoundErr: false, expectInvalidObjErr: false, expectNoUpdate: true, + }, { // GuaranteedUpdate with same data AND a self link + key: key, + ignoreNotFound: false, + precondition: nil, + expectNotFoundErr: false, + expectInvalidObjErr: false, + expectNoUpdate: true, + hasSelfLink: true, }, { // GuaranteedUpdate with same data but stale key: key, ignoreNotFound: false, @@ -379,6 +409,8 @@ func TestGuaranteedUpdate(t *testing.T) { }} for i, tt := range tests { + key, storeObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) + out := &example.Pod{} name := fmt.Sprintf("foo-%d", i) if tt.expectNoUpdate { @@ -399,6 +431,9 @@ func TestGuaranteedUpdate(t *testing.T) { } } pod := *storeObj + if tt.hasSelfLink { + pod.SelfLink = "testlink" + } pod.Name = name return &pod, nil })) @@ -422,6 +457,13 @@ func TestGuaranteedUpdate(t *testing.T) { if out.ObjectMeta.Name != name { t.Errorf("#%d: pod name want=%s, get=%s", i, name, out.ObjectMeta.Name) } + if out.SelfLink != "" { + t.Errorf("#%d: selflink should not be set", i) + } + + // verify that kv pair is not empty after set and that the underlying data matches expectations + checkStorageInvariants(ctx, t, etcdClient, store, key) + switch tt.expectNoUpdate { case true: if version != out.ResourceVersion { @@ -432,7 +474,6 @@ func TestGuaranteedUpdate(t *testing.T) { t.Errorf("#%d: expect version change, but get the same version=%s", i, version) } } - storeObj = out } } @@ -546,7 +587,7 @@ func TestTransformationFailure(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) + store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() preset := []struct { @@ -626,7 +667,7 @@ func TestList(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) + store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // Setup storage with the following structure: @@ -714,7 +755,7 @@ func TestList(t *testing.T) { func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) + store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() return ctx, store, cluster } @@ -724,9 +765,12 @@ func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { func testPropogateStore(ctx context.Context, t *testing.T, store *store, obj *example.Pod) (string, *example.Pod) { // Setup store with a key and grab the output for returning. key := "/testkey" + err := store.unconditionalDelete(ctx, key, &example.Pod{}) + if err != nil && !storage.IsNotFound(err) { + t.Fatal("Cleanup failed: %v", err) + } setOutput := &example.Pod{} - err := store.Create(ctx, key, obj, setOutput, 0) - if err != nil { + if err := store.Create(ctx, key, obj, setOutput, 0); err != nil { t.Fatalf("Set failed: %v", err) } return key, setOutput @@ -736,7 +780,7 @@ func TestPrefix(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - transformer := prefixTransformer{prefix: []byte("test!")} + transformer := prefixTransformer{prefix: []byte(defaultTestPrefix)} testcases := map[string]string{ "custom/prefix": "/custom/prefix", "/custom//prefix//": "/custom/prefix", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go index 74abfdc3e0c..94fb58f40b2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go @@ -26,7 +26,9 @@ import ( ) // Versioner abstracts setting and retrieving metadata fields from database response -// onto the object ot list. +// onto the object ot list. It is required to maintain storage invariants - updating an +// object twice with the same data except for the ResourceVersion and SelfLink must be +// a no-op. type Versioner interface { // UpdateObject sets storage metadata into an API object. Returns an error if the object // cannot be updated correctly. May return nil if the requested object does not need metadata @@ -36,6 +38,9 @@ type Versioner interface { // cannot be updated correctly. May return nil if the requested object does not need metadata // from database. UpdateList(obj runtime.Object, resourceVersion uint64) error + // PrepareObjectForStorage should set SelfLink and ResourceVersion to the empty value. Should + // return an error if the specified object cannot be updated. + PrepareObjectForStorage(obj runtime.Object) error // ObjectResourceVersion returns the resource version (for persistence) of the specified object. // Should return an error if the specified object does not have a persistable version. ObjectResourceVersion(obj runtime.Object) (uint64, error)