diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index ff654adec57..21eca5b2435 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -332,13 +332,16 @@ func (s *store) GuaranteedUpdate( if err != nil { return err } + mustCheckData = false if !bytes.Equal(data, origState.data) { // original data changed, restart loop - mustCheckData = false continue } } - return decode(s.codec, s.versioner, origState.data, out, origState.rev) + // recheck that the data from etcd is not stale before short-circuiting a write + if !origState.stale { + return decode(s.codec, s.versioner, origState.data, out, origState.rev) + } } newData, err := s.transformer.TransformToStorage(data, transformContext) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index d7c7f0f10d3..b6666aa9a38 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -526,6 +526,8 @@ func TestGuaranteedUpdateChecksStoredData(t *testing.T) { t.Fatal(err) } + store.transformer = prefixTransformer{prefix: []byte(defaultTestPrefix)} + // this update should write the canonical value to etcd because the new serialization differs // from the stored serialization input.ResourceVersion = strconv.FormatInt(resp.Header.Revision, 10) @@ -540,6 +542,36 @@ func TestGuaranteedUpdateChecksStoredData(t *testing.T) { if out.ResourceVersion == strconv.FormatInt(resp.Header.Revision, 10) { t.Errorf("guaranteed update should have updated the serialized data, got %#v", out) } + + lastVersion := out.ResourceVersion + + // this update should not write to etcd because the input matches the stored data + input = out + out = &example.Pod{} + err = store.GuaranteedUpdate(ctx, key, out, true, nil, + func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { + return input, nil, nil + }, input) + if err != nil { + t.Fatalf("Update failed: %v", err) + } + if out.ResourceVersion != lastVersion { + t.Errorf("guaranteed update should have short-circuited write, got %#v", out) + } + + store.transformer = prefixTransformer{prefix: []byte(defaultTestPrefix), stale: true} + + // this update should write to etcd because the transformer reported stale + err = store.GuaranteedUpdate(ctx, key, out, true, nil, + func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { + return input, nil, nil + }, input) + if err != nil { + t.Fatalf("Update failed: %v", err) + } + if out.ResourceVersion == lastVersion { + t.Errorf("guaranteed update should have written to etcd when transformer reported stale, got %#v", out) + } } func TestGuaranteedUpdateWithConflict(t *testing.T) {