diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 210b67084c4..708018cd49b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -264,11 +264,13 @@ func (s *store) GuaranteedUpdate( key = path.Join(s.pathPrefix, key) var origState *objState + var mustCheckData bool if len(suggestion) == 1 && suggestion[0] != nil { origState, err = s.getStateFromObject(suggestion[0]) if err != nil { return err } + mustCheckData = true } else { getResp, err := s.client.KV.Get(ctx, key, s.getOps...) if err != nil { @@ -297,6 +299,21 @@ func (s *store) GuaranteedUpdate( return err } if !origState.stale && bytes.Equal(data, origState.data) { + // if we skipped the original Get in this loop, we must refresh from + // etcd in order to be sure the data in the store is equivalent to + // our desired serialization + if mustCheckData { + getResp, err := s.client.KV.Get(ctx, key, s.getOps...) + if err != nil { + return err + } + origState, err = s.getState(getResp, key, v, ignoreNotFound) + if err != nil { + return err + } + mustCheckData = false + continue + } return decode(s.codec, s.versioner, origState.data, out, origState.rev) } @@ -330,6 +347,7 @@ func (s *store) GuaranteedUpdate( return err } trace.Step("Retry value restored") + mustCheckData = false continue } putResp := txnResp.Responses[0].GetResponsePut() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index cc1d76c7789..ed9c3bf3c86 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "reflect" + "strconv" "sync" "testing" @@ -459,6 +460,41 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) { testCheckEventType(t, watch.Deleted, w) } +func TestGuaranteedUpdateChecksStoredData(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + + input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + key := "/somekey" + + // serialize input into etcd with data that would be normalized by a write - in this case, leading + // and trailing whitespace + codec := codecs.LegacyCodec(examplev1.SchemeGroupVersion) + data, err := runtime.Encode(codec, input) + if err != nil { + t.Fatal(err) + } + resp, err := store.client.Put(ctx, key, "test! "+string(data)+" ") + if err != nil { + t.Fatal(err) + } + + // this update should write the canonical value to etcd because the new serialization differs + // from the stored serialization + input.ResourceVersion = strconv.FormatInt(resp.Header.Revision, 10) + out := &example.Pod{} + err = store.GuaranteedUpdate(ctx, key, out, true, nil, + func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { + return input, nil, nil + }, input) + if err != nil { + t.Fatalf("Update failed: %v", err) + } + if out.ResourceVersion == strconv.FormatInt(resp.Header.Revision, 10) { + t.Errorf("guaranteed update should have updated the serialized data, got %#v", out) + } +} + func TestGuaranteedUpdateWithConflict(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) @@ -569,10 +605,10 @@ func TestTransformationFailure(t *testing.T) { }); !storage.IsInternalError(err) { t.Errorf("Unexpected error: %v", err) } - // GuaranteedUpdate with suggestion should not return an error if we don't change the object + // GuaranteedUpdate with suggestion should return an error if we don't change the object if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) { return input, nil, nil - }, preset[1].obj); err != nil { + }, preset[1].obj); err == nil { t.Errorf("Unexpected error: %v", err) }