diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD index 5cdfd13c143..ad5b06d819c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD @@ -19,6 +19,7 @@ go_test( "//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes:go_default_library", "//vendor/github.com/coreos/etcd/integration:go_default_library", "//vendor/golang.org/x/net/context:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/testing:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 69c715745cc..e5a67f043ec 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -32,6 +32,7 @@ import ( "github.com/golang/glog" "golang.org/x/net/context" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" @@ -271,6 +272,14 @@ func (s *store) GuaranteedUpdate( } key = path.Join(s.pathPrefix, key) + getCurrentState := func() (*objState, error) { + getResp, err := s.client.KV.Get(ctx, key, s.getOps...) + if err != nil { + return nil, err + } + return s.getState(getResp, key, v, ignoreNotFound) + } + var origState *objState var mustCheckData bool if len(suggestion) == 1 && suggestion[0] != nil { @@ -280,11 +289,7 @@ func (s *store) GuaranteedUpdate( } mustCheckData = true } else { - getResp, err := s.client.KV.Get(ctx, key, s.getOps...) - if err != nil { - return err - } - origState, err = s.getState(getResp, key, v, ignoreNotFound) + origState, err = getCurrentState() if err != nil { return err } @@ -299,6 +304,18 @@ func (s *store) GuaranteedUpdate( ret, ttl, err := s.updateState(origState, tryUpdate) if err != nil { + // It's possible we were working with stale data + if mustCheckData && apierrors.IsConflict(err) { + // Actually fetch + origState, err = getCurrentState() + if err != nil { + return err + } + mustCheckData = false + // Retry + continue + } + return err } @@ -311,11 +328,7 @@ func (s *store) GuaranteedUpdate( // etcd in order to be sure the data in the store is equivalent to // our desired serialization if mustCheckData { - getResp, err := s.client.KV.Get(ctx, key, s.getOps...) - if err != nil { - return err - } - origState, err = s.getState(getResp, key, v, ignoreNotFound) + origState, err = getCurrentState() if err != nil { return err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index e46f73a9665..6ee72431d9c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -20,6 +20,7 @@ import ( "bytes" "encoding/base64" "encoding/json" + "errors" "fmt" "reflect" "strconv" @@ -29,6 +30,7 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/integration" "golang.org/x/net/context" + apierrors "k8s.io/apimachinery/pkg/api/errors" apitesting "k8s.io/apimachinery/pkg/api/testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -584,6 +586,53 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) { } } +func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + + // First, update without a suggestion so originalPod is outdated + updatedPod := &example.Pod{} + err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil, + storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + pod := obj.(*example.Pod) + pod.Name = "foo-2" + return pod, nil + }), + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Second, update using the outdated originalPod as the suggestion. Return a conflict error when + // passed originalPod, and make sure that SimpleUpdate is called a second time after a live lookup + // with the value of updatedPod. + sawConflict := false + updatedPod2 := &example.Pod{} + err = store.GuaranteedUpdate(ctx, key, updatedPod2, false, nil, + storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + pod := obj.(*example.Pod) + if pod.Name != "foo-2" { + if sawConflict { + t.Fatalf("unexpected second conflict") + } + sawConflict = true + // simulated stale object - return a conflict + return nil, apierrors.NewConflict(example.SchemeGroupVersion.WithResource("pods").GroupResource(), "name", errors.New("foo")) + } + pod.Name = "foo-3" + return pod, nil + }), + originalPod, + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if updatedPod2.Name != "foo-3" { + t.Errorf("unexpected pod name: %q", updatedPod2.Name) + } +} + func TestTransformationFailure(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})