diff --git a/pkg/registry/core/pod/storage/storage.go b/pkg/registry/core/pod/storage/storage.go index 2cf88faaf65..5017395f231 100644 --- a/pkg/registry/core/pod/storage/storage.go +++ b/pkg/registry/core/pod/storage/storage.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/apiserver/pkg/registry/rest" @@ -180,7 +181,7 @@ func (r *BindingREST) Create(ctx context.Context, name string, obj runtime.Objec } } - err = r.assignPod(ctx, binding.Name, binding.Target.Name, binding.Annotations, dryrun.IsDryRun(options.DryRun)) + err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, dryrun.IsDryRun(options.DryRun)) out = &metav1.Status{Status: metav1.StatusSuccess} return } @@ -188,12 +189,24 @@ func (r *BindingREST) Create(ctx context.Context, name string, obj runtime.Objec // setPodHostAndAnnotations sets the given pod's host to 'machine' if and only if it was // previously 'oldMachine' and merges the provided annotations with those of the pod. // Returns the current state of the pod, or an error. -func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podID, oldMachine, machine string, annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) { +func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types.UID, podResourceVersion, podID, oldMachine, machine string, annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) { podKey, err := r.store.KeyFunc(ctx, podID) if err != nil { return nil, err } - err = r.store.Storage.GuaranteedUpdate(ctx, podKey, &api.Pod{}, false, nil, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + + var preconditions *storage.Preconditions + if podUID != "" || podResourceVersion != "" { + preconditions = &storage.Preconditions{} + if podUID != "" { + preconditions.UID = &podUID + } + if podResourceVersion != "" { + preconditions.ResourceVersion = &podResourceVersion + } + } + + err = r.store.Storage.GuaranteedUpdate(ctx, podKey, &api.Pod{}, false, preconditions, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { pod, ok := obj.(*api.Pod) if !ok { return nil, fmt.Errorf("unexpected object: %#v", obj) @@ -222,8 +235,8 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podID, oldMa } // assignPod assigns the given pod to the given machine. -func (r *BindingREST) assignPod(ctx context.Context, podID string, machine string, annotations map[string]string, dryRun bool) (err error) { - if _, err = r.setPodHostAndAnnotations(ctx, podID, "", machine, annotations, dryRun); err != nil { +func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations map[string]string, dryRun bool) (err error) { + if _, err = r.setPodHostAndAnnotations(ctx, podUID, podResourceVersion, podID, "", machine, annotations, dryRun); err != nil { err = storeerr.InterpretGetError(err, api.Resource("pods"), podID) err = storeerr.InterpretUpdateError(err, api.Resource("pods"), podID) if _, ok := err.(*errors.StatusError); !ok { diff --git a/pkg/registry/core/pod/storage/storage_test.go b/pkg/registry/core/pod/storage/storage_test.go index e4254e37d3e..4f671e2c48b 100644 --- a/pkg/registry/core/pod/storage/storage_test.go +++ b/pkg/registry/core/pod/storage/storage_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/diff" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" @@ -744,6 +745,133 @@ func TestEtcdCreateWithConflict(t *testing.T) { } } +func validNewBinding() *api.Binding { + return &api.Binding{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Target: api.ObjectReference{Name: "machine", Kind: "Node"}, + } +} + +func TestEtcdCreateBindingWithUIDAndResourceVersion(t *testing.T) { + originUID := func(pod *api.Pod) types.UID { + return pod.UID + } + emptyUID := func(pod *api.Pod) types.UID { + return "" + } + changedUID := func(pod *api.Pod) types.UID { + return pod.UID + "-changed" + } + + originResourceVersion := func(pod *api.Pod) string { + return pod.ResourceVersion + } + emptyResourceVersion := func(pod *api.Pod) string { + return "" + } + changedResourceVersion := func(pod *api.Pod) string { + return pod.ResourceVersion + "-changed" + } + + noError := func(err error) bool { + return err == nil + } + conflictError := func(err error) bool { + return err != nil && errors.IsConflict(err) + } + + testCases := map[string]struct { + podUIDGetter func(pod *api.Pod) types.UID + podResourceVersionGetter func(pod *api.Pod) string + errOK func(error) bool + expectedNodeName string + }{ + "originUID-originResourceVersion": { + podUIDGetter: originUID, + podResourceVersionGetter: originResourceVersion, + errOK: noError, + expectedNodeName: "machine", + }, + "originUID-emptyResourceVersion": { + podUIDGetter: originUID, + podResourceVersionGetter: emptyResourceVersion, + errOK: noError, + expectedNodeName: "machine", + }, + "originUID-changedResourceVersion": { + podUIDGetter: originUID, + podResourceVersionGetter: changedResourceVersion, + errOK: conflictError, + expectedNodeName: "", + }, + "emptyUID-originResourceVersion": { + podUIDGetter: emptyUID, + podResourceVersionGetter: originResourceVersion, + errOK: noError, + expectedNodeName: "machine", + }, + "emptyUID-emptyResourceVersion": { + podUIDGetter: emptyUID, + podResourceVersionGetter: emptyResourceVersion, + errOK: noError, + expectedNodeName: "machine", + }, + "emptyUID-changedResourceVersion": { + podUIDGetter: emptyUID, + podResourceVersionGetter: changedResourceVersion, + errOK: conflictError, + expectedNodeName: "", + }, + "changedUID-originResourceVersion": { + podUIDGetter: changedUID, + podResourceVersionGetter: originResourceVersion, + errOK: conflictError, + expectedNodeName: "", + }, + "changedUID-emptyResourceVersion": { + podUIDGetter: changedUID, + podResourceVersionGetter: emptyResourceVersion, + errOK: conflictError, + expectedNodeName: "", + }, + "changedUID-changedResourceVersion": { + podUIDGetter: changedUID, + podResourceVersionGetter: changedResourceVersion, + errOK: conflictError, + expectedNodeName: "", + }, + } + + storage, bindingStorage, _, server := newStorage(t) + defer server.Terminate(t) + defer storage.Store.DestroyFunc() + + for k, testCase := range testCases { + pod := validNewPod() + pod.Namespace = fmt.Sprintf("namespace-%s", strings.ToLower(k)) + ctx := genericapirequest.WithNamespace(genericapirequest.NewDefaultContext(), pod.Namespace) + + podCreated, err := storage.Create(ctx, pod, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) + if err != nil { + t.Fatalf("%s: unexpected error: %v", k, err) + } + + binding := validNewBinding() + binding.UID = testCase.podUIDGetter(podCreated.(*api.Pod)) + binding.ResourceVersion = testCase.podResourceVersionGetter(podCreated.(*api.Pod)) + + if _, err := bindingStorage.Create(ctx, binding.Name, binding, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); !testCase.errOK(err) { + t.Errorf("%s: unexpected error: %v", k, err) + } + + if pod, err := storage.Get(ctx, pod.Name, &metav1.GetOptions{}); err != nil { + t.Errorf("%s: unexpected error: %v", k, err) + } else if pod.(*api.Pod).Spec.NodeName != testCase.expectedNodeName { + t.Errorf("%s: expected: %v, got: %v", k, pod.(*api.Pod).Spec.NodeName, testCase.expectedNodeName) + } + } +} + func TestEtcdCreateWithExistingContainers(t *testing.T) { storage, bindingStorage, _, server := newStorage(t) defer server.Terminate(t)