From b851614adfe2b39941d518485480ff527fa4f0c1 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 1 Jul 2017 23:50:40 -0400 Subject: [PATCH] GuaranteedUpdate must write if stored data is not canonical An optimization added to the GuaranteedUpdate loop changed the comparison of the current objects serialization against the stored data, instead comparing to the in memory object, which defeated the mechanism we use to migrate stored data. This commit preserves that optimization but correctly verifies the in memory serialization against the on disk serialization by fetching the latest serialized data. Since most updates are not no-ops, this should not regress the performance of the normal path. --- .../apiserver/pkg/storage/etcd3/store.go | 18 +++++++++ .../apiserver/pkg/storage/etcd3/store_test.go | 40 ++++++++++++++++++- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 210b67084c4..708018cd49b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -264,11 +264,13 @@ func (s *store) GuaranteedUpdate( key = path.Join(s.pathPrefix, key) var origState *objState + var mustCheckData bool if len(suggestion) == 1 && suggestion[0] != nil { origState, err = s.getStateFromObject(suggestion[0]) if err != nil { return err } + mustCheckData = true } else { getResp, err := s.client.KV.Get(ctx, key, s.getOps...) if err != nil { @@ -297,6 +299,21 @@ func (s *store) GuaranteedUpdate( return err } if !origState.stale && bytes.Equal(data, origState.data) { + // if we skipped the original Get in this loop, we must refresh from + // etcd in order to be sure the data in the store is equivalent to + // our desired serialization + if mustCheckData { + getResp, err := s.client.KV.Get(ctx, key, s.getOps...) + if err != nil { + return err + } + origState, err = s.getState(getResp, key, v, ignoreNotFound) + if err != nil { + return err + } + mustCheckData = false + continue + } return decode(s.codec, s.versioner, origState.data, out, origState.rev) } @@ -330,6 +347,7 @@ func (s *store) GuaranteedUpdate( return err } trace.Step("Retry value restored") + mustCheckData = false continue } putResp := txnResp.Responses[0].GetResponsePut() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index cc1d76c7789..ed9c3bf3c86 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "reflect" + "strconv" "sync" "testing" @@ -459,6 +460,41 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) { testCheckEventType(t, watch.Deleted, w) } +func TestGuaranteedUpdateChecksStoredData(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + + input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + key := "/somekey" + + // serialize input into etcd with data that would be normalized by a write - in this case, leading + // and trailing whitespace + codec := codecs.LegacyCodec(examplev1.SchemeGroupVersion) + data, err := runtime.Encode(codec, input) + if err != nil { + t.Fatal(err) + } + resp, err := store.client.Put(ctx, key, "test! "+string(data)+" ") + if err != nil { + t.Fatal(err) + } + + // this update should write the canonical value to etcd because the new serialization differs + // from the stored serialization + input.ResourceVersion = strconv.FormatInt(resp.Header.Revision, 10) + out := &example.Pod{} + err = store.GuaranteedUpdate(ctx, key, out, true, nil, + func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { + return input, nil, nil + }, input) + if err != nil { + t.Fatalf("Update failed: %v", err) + } + if out.ResourceVersion == strconv.FormatInt(resp.Header.Revision, 10) { + t.Errorf("guaranteed update should have updated the serialized data, got %#v", out) + } +} + func TestGuaranteedUpdateWithConflict(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) @@ -569,10 +605,10 @@ func TestTransformationFailure(t *testing.T) { }); !storage.IsInternalError(err) { t.Errorf("Unexpected error: %v", err) } - // GuaranteedUpdate with suggestion should not return an error if we don't change the object + // GuaranteedUpdate with suggestion should return an error if we don't change the object if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) { return input, nil, nil - }, preset[1].obj); err != nil { + }, preset[1].obj); err == nil { t.Errorf("Unexpected error: %v", err) }