mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-06 19:52:42 +00:00
GuaranteedUpdate must write if stored data is not canonical
An optimization added to the GuaranteedUpdate loop changed the comparison of the current objects serialization against the stored data, instead comparing to the in memory object, which defeated the mechanism we use to migrate stored data. This commit preserves that optimization but correctly verifies the in memory serialization against the on disk serialization by fetching the latest serialized data. Since most updates are not no-ops, this should not regress the performance of the normal path.
This commit is contained in:
@@ -264,11 +264,13 @@ func (s *store) GuaranteedUpdate(
|
||||
key = path.Join(s.pathPrefix, key)
|
||||
|
||||
var origState *objState
|
||||
var mustCheckData bool
|
||||
if len(suggestion) == 1 && suggestion[0] != nil {
|
||||
origState, err = s.getStateFromObject(suggestion[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mustCheckData = true
|
||||
} else {
|
||||
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
||||
if err != nil {
|
||||
@@ -297,6 +299,21 @@ func (s *store) GuaranteedUpdate(
|
||||
return err
|
||||
}
|
||||
if !origState.stale && bytes.Equal(data, origState.data) {
|
||||
// if we skipped the original Get in this loop, we must refresh from
|
||||
// etcd in order to be sure the data in the store is equivalent to
|
||||
// our desired serialization
|
||||
if mustCheckData {
|
||||
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
origState, err = s.getState(getResp, key, v, ignoreNotFound)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mustCheckData = false
|
||||
continue
|
||||
}
|
||||
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
|
||||
}
|
||||
|
||||
@@ -330,6 +347,7 @@ func (s *store) GuaranteedUpdate(
|
||||
return err
|
||||
}
|
||||
trace.Step("Retry value restored")
|
||||
mustCheckData = false
|
||||
continue
|
||||
}
|
||||
putResp := txnResp.Responses[0].GetResponsePut()
|
||||
|
@@ -20,6 +20,7 @@ import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
@@ -459,6 +460,41 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) {
|
||||
testCheckEventType(t, watch.Deleted, w)
|
||||
}
|
||||
|
||||
func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
|
||||
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
key := "/somekey"
|
||||
|
||||
// serialize input into etcd with data that would be normalized by a write - in this case, leading
|
||||
// and trailing whitespace
|
||||
codec := codecs.LegacyCodec(examplev1.SchemeGroupVersion)
|
||||
data, err := runtime.Encode(codec, input)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp, err := store.client.Put(ctx, key, "test! "+string(data)+" ")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// this update should write the canonical value to etcd because the new serialization differs
|
||||
// from the stored serialization
|
||||
input.ResourceVersion = strconv.FormatInt(resp.Header.Revision, 10)
|
||||
out := &example.Pod{}
|
||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil,
|
||||
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||
return input, nil, nil
|
||||
}, input)
|
||||
if err != nil {
|
||||
t.Fatalf("Update failed: %v", err)
|
||||
}
|
||||
if out.ResourceVersion == strconv.FormatInt(resp.Header.Revision, 10) {
|
||||
t.Errorf("guaranteed update should have updated the serialized data, got %#v", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGuaranteedUpdateWithConflict(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
@@ -569,10 +605,10 @@ func TestTransformationFailure(t *testing.T) {
|
||||
}); !storage.IsInternalError(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
// GuaranteedUpdate with suggestion should not return an error if we don't change the object
|
||||
// GuaranteedUpdate with suggestion should return an error if we don't change the object
|
||||
if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) {
|
||||
return input, nil, nil
|
||||
}, preset[1].obj); err != nil {
|
||||
}, preset[1].obj); err == nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user