Merge pull request #43152 from ncdc/watch-cache-retry-live-object-on-conflict

Automatic merge from submit-queue (batch tested with PRs 52176, 43152). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>..

etcd3 store: retry with live object on conflict if there was a suggestion

Retry with a live object instead of the cached version if the watch
cache receives a conflict trying to do the update.

Fixes #41892
This commit is contained in:
Kubernetes Submit Queue 2017-09-16 09:45:31 -07:00 committed by GitHub
commit d48611a1da
3 changed files with 73 additions and 10 deletions

View File

@ -19,6 +19,7 @@ go_test(
"//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes:go_default_library",
"//vendor/github.com/coreos/etcd/integration:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/testing:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",

View File

@ -32,6 +32,7 @@ import (
"github.com/golang/glog"
"golang.org/x/net/context"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
@ -271,6 +272,14 @@ func (s *store) GuaranteedUpdate(
}
key = path.Join(s.pathPrefix, key)
getCurrentState := func() (*objState, error) {
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil {
return nil, err
}
return s.getState(getResp, key, v, ignoreNotFound)
}
var origState *objState
var mustCheckData bool
if len(suggestion) == 1 && suggestion[0] != nil {
@ -280,11 +289,7 @@ func (s *store) GuaranteedUpdate(
}
mustCheckData = true
} else {
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil {
return err
}
origState, err = s.getState(getResp, key, v, ignoreNotFound)
origState, err = getCurrentState()
if err != nil {
return err
}
@ -299,6 +304,18 @@ func (s *store) GuaranteedUpdate(
ret, ttl, err := s.updateState(origState, tryUpdate)
if err != nil {
// It's possible we were working with stale data
if mustCheckData && apierrors.IsConflict(err) {
// Actually fetch
origState, err = getCurrentState()
if err != nil {
return err
}
mustCheckData = false
// Retry
continue
}
return err
}
@ -311,11 +328,7 @@ func (s *store) GuaranteedUpdate(
// etcd in order to be sure the data in the store is equivalent to
// our desired serialization
if mustCheckData {
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil {
return err
}
origState, err = s.getState(getResp, key, v, ignoreNotFound)
origState, err = getCurrentState()
if err != nil {
return err
}

View File

@ -20,6 +20,7 @@ import (
"bytes"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"reflect"
"strconv"
@ -29,6 +30,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"golang.org/x/net/context"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apitesting "k8s.io/apimachinery/pkg/api/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@ -584,6 +586,53 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) {
}
}
func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
// First, update without a suggestion so originalPod is outdated
updatedPod := &example.Pod{}
err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
pod := obj.(*example.Pod)
pod.Name = "foo-2"
return pod, nil
}),
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Second, update using the outdated originalPod as the suggestion. Return a conflict error when
// passed originalPod, and make sure that SimpleUpdate is called a second time after a live lookup
// with the value of updatedPod.
sawConflict := false
updatedPod2 := &example.Pod{}
err = store.GuaranteedUpdate(ctx, key, updatedPod2, false, nil,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
pod := obj.(*example.Pod)
if pod.Name != "foo-2" {
if sawConflict {
t.Fatalf("unexpected second conflict")
}
sawConflict = true
// simulated stale object - return a conflict
return nil, apierrors.NewConflict(example.SchemeGroupVersion.WithResource("pods").GroupResource(), "name", errors.New("foo"))
}
pod.Name = "foo-3"
return pod, nil
}),
originalPod,
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if updatedPod2.Name != "foo-3" {
t.Errorf("unexpected pod name: %q", updatedPod2.Name)
}
}
func TestTransformationFailure(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})