diff --git a/pkg/registry/core/pod/storage/eviction.go b/pkg/registry/core/pod/storage/eviction.go
index 71e9d97bbbf..7834396afa5 100644
--- a/pkg/registry/core/pod/storage/eviction.go
+++ b/pkg/registry/core/pod/storage/eviction.go
@@ -25,6 +25,7 @@ import (
 	policyv1 "k8s.io/api/policy/v1"
 	policyv1beta1 "k8s.io/api/policy/v1beta1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -308,11 +309,30 @@ func (r *EvictionREST) Create(ctx context.Context, name string, obj runtime.Obje
 }
 
 func addConditionAndDeletePod(r *EvictionREST, ctx context.Context, name string, validation rest.ValidateObjectFunc, options *metav1.DeleteOptions) error {
-	if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
-		pod, err := getPod(r, ctx, name)
-		if err != nil {
-			return err
+	if !dryrun.IsDryRun(options.DryRun) && feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
+		getLatestPod := func(_ context.Context, _, oldObj runtime.Object) (runtime.Object, error) {
+			// Throw away the newObj. We care only about the latest pod obtained from etcd (oldObj),
+			// so we can add the DisruptionTarget condition in conditionAppender without conflicts.
+			latestPod := oldObj.(*api.Pod).DeepCopy()
+			if options.Preconditions != nil {
+				if uid := options.Preconditions.UID; uid != nil && len(*uid) > 0 && *uid != latestPod.UID {
+					return nil, errors.NewConflict(
+						schema.GroupResource{Group: "", Resource: "Pod"},
+						latestPod.Name,
+						fmt.Errorf("the UID in the precondition (%s) does not match the UID in record (%s). The object might have been deleted and then recreated", *uid, latestPod.UID),
+					)
+				}
+				if rv := options.Preconditions.ResourceVersion; rv != nil && len(*rv) > 0 && *rv != latestPod.ResourceVersion {
+					return nil, errors.NewConflict(
+						schema.GroupResource{Group: "", Resource: "Pod"},
+						latestPod.Name,
+						fmt.Errorf("the ResourceVersion in the precondition (%s) does not match the ResourceVersion in record (%s). The object might have been modified", *rv, latestPod.ResourceVersion),
+					)
+				}
+			}
+			return latestPod, nil
 		}
+
 		conditionAppender := func(_ context.Context, newObj, _ runtime.Object) (runtime.Object, error) {
 			podObj := newObj.(*api.Pod)
 			podutil.UpdatePodCondition(&podObj.Status, &api.PodCondition{
@@ -324,11 +344,22 @@ func addConditionAndDeletePod(r *EvictionREST, ctx context.Context, name string,
 			return podObj, nil
 		}
 
-		podCopyUpdated := rest.DefaultUpdatedObjectInfo(pod, conditionAppender)
+		podUpdatedObjectInfo := rest.DefaultUpdatedObjectInfo(nil, getLatestPod, conditionAppender) // order important
 
-		if _, _, err = r.store.Update(ctx, name, podCopyUpdated, rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
+		updatedPodObject, _, err := r.store.Update(ctx, name, podUpdatedObjectInfo, rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
+		if err != nil {
 			return err
 		}
+
+		if !resourceVersionIsUnset(options) {
+			newResourceVersion, err := meta.NewAccessor().ResourceVersion(updatedPodObject)
+			if err != nil {
+				return err
+			}
+			// bump the resource version, since we are the one who modified it via the update
+			options = options.DeepCopy()
+			options.Preconditions.ResourceVersion = &newResourceVersion
+		}
 	}
 	_, _, err := r.store.Delete(ctx, name, rest.ValidateAllObjectFunc, options)
 	return err
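The `resourceVersionIsUnset` helper referenced above is defined elsewhere in eviction.go, outside the hunks shown. For orientation, here is a minimal client-side sketch of what this change enables: evicting a pod only if it is still the exact incarnation the caller last observed. The function name and wiring are illustrative and not part of the patch; the client-go calls themselves (`PolicyV1().Evictions(ns).Evict`) are the standard eviction API.

package example

import (
	"context"

	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// evictWithPreconditions requests eviction of ns/name, but only if the pod
// still has the given UID and ResourceVersion. With the patch above, a stale
// value surfaces as a 409 Conflict instead of evicting a different
// incarnation of the pod.
func evictWithPreconditions(ctx context.Context, cs kubernetes.Interface, ns, name string, uid types.UID, rv string) error {
	return cs.PolicyV1().Evictions(ns).Evict(ctx, &policyv1.Eviction{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: ns},
		DeleteOptions: &metav1.DeleteOptions{
			Preconditions: &metav1.Preconditions{UID: &uid, ResourceVersion: &rv},
		},
	})
}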
diff --git a/pkg/registry/core/pod/storage/eviction_test.go b/pkg/registry/core/pod/storage/eviction_test.go
index 260b44d9983..56311b6eece 100644
--- a/pkg/registry/core/pod/storage/eviction_test.go
+++ b/pkg/registry/core/pod/storage/eviction_test.go
@@ -709,6 +709,109 @@ func TestEvictionPDBStatus(t *testing.T) {
 	}
 }
 
+func TestAddConditionAndDelete(t *testing.T) {
+	cases := []struct {
+		name              string
+		initialPod        bool
+		makeDeleteOptions func(*api.Pod) *metav1.DeleteOptions
+		expectErr         string
+	}{
+		{
+			name:              "simple",
+			initialPod:        true,
+			makeDeleteOptions: func(pod *api.Pod) *metav1.DeleteOptions { return &metav1.DeleteOptions{} },
+		},
+		{
+			name:              "missing",
+			initialPod:        false,
+			makeDeleteOptions: func(pod *api.Pod) *metav1.DeleteOptions { return &metav1.DeleteOptions{} },
+			expectErr:         "not found",
+		},
+		{
+			name:       "valid uid",
+			initialPod: true,
+			makeDeleteOptions: func(pod *api.Pod) *metav1.DeleteOptions {
+				return &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &pod.UID}}
+			},
+		},
+		{
+			name:       "invalid uid",
+			initialPod: true,
+			makeDeleteOptions: func(pod *api.Pod) *metav1.DeleteOptions {
+				badUID := pod.UID + "1"
+				return &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &badUID}}
+			},
+			expectErr: "The object might have been deleted and then recreated",
+		},
+		{
+			name:       "valid resourceVersion",
+			initialPod: true,
+			makeDeleteOptions: func(pod *api.Pod) *metav1.DeleteOptions {
+				return &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &pod.ResourceVersion}}
+			},
+		},
+		{
+			name:       "invalid resourceVersion",
+			initialPod: true,
+			makeDeleteOptions: func(pod *api.Pod) *metav1.DeleteOptions {
+				badRV := pod.ResourceVersion + "1"
+				return &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &badRV}}
+			},
+			expectErr: "The object might have been modified",
+		},
+	}
+
+	testContext := genericapirequest.WithNamespace(genericapirequest.NewContext(), metav1.NamespaceDefault)
+
+	storage, _, _, server := newStorage(t)
+	defer server.Terminate(t)
+	defer storage.Store.DestroyFunc()
+
+	client := fake.NewSimpleClientset()
+	evictionRest := newEvictionStorage(storage.Store, client.PolicyV1())
+
+	for _, tc := range cases {
+		for _, conditionsEnabled := range []bool{true, false} {
+			name := fmt.Sprintf("%s_conditions=%v", tc.name, conditionsEnabled)
+			t.Run(name, func(t *testing.T) {
+				defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, conditionsEnabled)()
+				var deleteOptions *metav1.DeleteOptions
+				if tc.initialPod {
+					newPod := validNewPod()
+					createdObj, err := storage.Create(testContext, newPod, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
+					if err != nil {
+						t.Fatal(err)
+					}
+					t.Cleanup(func() {
+						zero := int64(0)
+						storage.Delete(testContext, newPod.Name, rest.ValidateAllObjectFunc, &metav1.DeleteOptions{GracePeriodSeconds: &zero})
+					})
+					deleteOptions = tc.makeDeleteOptions(createdObj.(*api.Pod))
+				} else {
+					deleteOptions = tc.makeDeleteOptions(nil)
+				}
+				if deleteOptions == nil {
+					deleteOptions = &metav1.DeleteOptions{}
+				}
+
+				err := addConditionAndDeletePod(evictionRest, testContext, "foo", rest.ValidateAllObjectFunc, deleteOptions)
+				if err == nil {
+					if tc.expectErr != "" {
+						t.Fatalf("expected err containing %q, got none", tc.expectErr)
+					}
+					return
+				}
+				if tc.expectErr == "" {
+					t.Fatalf("unexpected err: %v", err)
+				}
+				if !strings.Contains(err.Error(), tc.expectErr) {
+					t.Fatalf("expected err containing %q, got %v", tc.expectErr, err)
+				}
+			})
+		}
+	}
+}
+
 func resource(resource string) schema.GroupResource {
 	return schema.GroupResource{Group: "", Resource: resource}
 }
@@ -765,7 +868,7 @@ func (ms *mockStore) Watch(ctx context.Context, options *metainternalversion.Lis
 }
 
 func (ms *mockStore) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
-	return nil, false, nil
+	return ms.pod, false, nil
 }
 
 func (ms *mockStore) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
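The `// order important` remark on `rest.DefaultUpdatedObjectInfo(nil, getLatestPod, conditionAppender)` is load-bearing: transformers run left to right, each receiving the output of the previous one, so `getLatestPod` must first materialize the pod from storage (the initial object is nil) before `conditionAppender` mutates it. A self-contained sketch of that chaining pattern, with plain strings standing in for `runtime.Object` and a simplified `rest.TransformFunc` shape:

package main

import (
	"context"
	"fmt"
)

// transformFunc mirrors the shape of rest.TransformFunc: each transformer gets
// the object produced so far (newObj) plus the object read from storage (oldObj).
type transformFunc func(ctx context.Context, newObj, oldObj string) (string, error)

// chain applies transformers in order; the empty starting value mimics passing
// nil as the initial object to DefaultUpdatedObjectInfo.
func chain(ctx context.Context, oldObj string, fns ...transformFunc) (string, error) {
	newObj := ""
	for _, fn := range fns {
		var err error
		if newObj, err = fn(ctx, newObj, oldObj); err != nil {
			return "", err
		}
	}
	return newObj, nil
}

func main() {
	getLatest := func(_ context.Context, _, oldObj string) (string, error) { return oldObj, nil }
	appendCond := func(_ context.Context, newObj, _ string) (string, error) { return newObj + "+DisruptionTarget", nil }

	// getLatest must run first so appendCond mutates the freshly read object;
	// swapping the order would append the condition to an empty object.
	out, _ := chain(context.Background(), "pod@rv42", getLatest, appendCond)
	fmt.Println(out) // pod@rv42+DisruptionTarget
}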
diff --git a/test/integration/evictions/evictions_test.go b/test/integration/evictions/evictions_test.go
index 99606f87b5f..6c9e70e02e3 100644
--- a/test/integration/evictions/evictions_test.go
+++ b/test/integration/evictions/evictions_test.go
@@ -38,12 +38,14 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apiserver/pkg/util/feature"
 	cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
+	policyv1client "k8s.io/client-go/kubernetes/typed/policy/v1"
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/restmapper"
 	"k8s.io/client-go/scale"
@@ -345,22 +347,34 @@ func TestEvictionWithFinalizers(t *testing.T) {
 	cases := map[string]struct {
 		enablePodDisruptionConditions bool
 		phase                         v1.PodPhase
+		dryRun                        bool
+		wantDisruptionTargetCond      bool
 	}{
 		"terminal pod with PodDisruptionConditions enabled": {
 			enablePodDisruptionConditions: true,
 			phase:                         v1.PodSucceeded,
+			wantDisruptionTargetCond:      true,
 		},
 		"terminal pod with PodDisruptionConditions disabled": {
 			enablePodDisruptionConditions: false,
 			phase:                         v1.PodSucceeded,
+			wantDisruptionTargetCond:      false,
 		},
 		"running pod with PodDisruptionConditions enabled": {
 			enablePodDisruptionConditions: true,
 			phase:                         v1.PodRunning,
+			wantDisruptionTargetCond:      true,
 		},
 		"running pod with PodDisruptionConditions disabled": {
 			enablePodDisruptionConditions: false,
 			phase:                         v1.PodRunning,
+			wantDisruptionTargetCond:      false,
+		},
+		"running pod with PodDisruptionConditions enabled should not update conditions in dry-run mode": {
+			enablePodDisruptionConditions: true,
+			phase:                         v1.PodRunning,
+			dryRun:                        true,
+			wantDisruptionTargetCond:      false,
 		},
 	}
 	for name, tc := range cases {
@@ -391,6 +405,9 @@ func TestEvictionWithFinalizers(t *testing.T) {
 			waitToObservePods(t, informers.Core().V1().Pods().Informer(), 1, tc.phase)
 
 			deleteOption := metav1.DeleteOptions{}
+			if tc.dryRun {
+				deleteOption.DryRun = []string{metav1.DryRunAll}
+			}
 
 			eviction := newV1Eviction(ns.Name, pod.Name, deleteOption)
 
@@ -404,9 +421,9 @@ func TestEvictionWithFinalizers(t *testing.T) {
 				t.Fatalf("Failed to get the pod %q with error: %q", klog.KObj(pod), e)
 			}
 			_, cond := podutil.GetPodCondition(&updatedPod.Status, v1.PodConditionType(v1.DisruptionTarget))
-			if tc.enablePodDisruptionConditions == true && cond == nil {
+			if tc.wantDisruptionTargetCond == true && cond == nil {
 				t.Errorf("Pod %q does not have the expected condition: %q", klog.KObj(updatedPod), v1.DisruptionTarget)
-			} else if tc.enablePodDisruptionConditions == false && cond != nil {
+			} else if tc.wantDisruptionTargetCond == false && cond != nil {
 				t.Errorf("Pod %q has an unexpected condition: %q", klog.KObj(updatedPod), v1.DisruptionTarget)
 			}
 		})
@@ -489,12 +506,20 @@ func TestEvictionWithUnhealthyPodEvictionPolicy(t *testing.T) {
 					return pdb.Status.ExpectedPods == 1
 				})
 			}
+			// Eviction API can potentially return http.StatusTooManyRequests (429) or http.StatusGatewayTimeout (504) with retryAfterSeconds == 10s
+			// Do not retry - we want to test that the first request succeeds and make sure it doesn't unnecessarily block the test for 10s
+			policyV1NoRetriesRESTClient := &noRetriesRESTClient{Interface: clientSet.PolicyV1().RESTClient()}
+			policyV1NoRetriesClient := policyv1client.New(policyV1NoRetriesRESTClient)
+
 			deleteOption := metav1.DeleteOptions{}
 			eviction := newV1Eviction(ns.Name, pod.Name, deleteOption)
-			err := clientSet.PolicyV1().Evictions(ns.Name).Evict(ctx, eviction)
+			err := policyV1NoRetriesClient.Evictions(ns.Name).Evict(ctx, eviction)
 			if err != nil {
 				t.Fatalf("Eviction of pod failed %v", err)
 			}
+			if policyV1NoRetriesRESTClient.postCalls != 1 {
+				t.Fatalf("expected a single POST call, got %d", policyV1NoRetriesRESTClient.postCalls)
+			}
 
 			waitToObservePods(t, informers.Core().V1().Pods().Informer(), 0, v1.PodRunning)
 			waitPDBStable(t, clientSet, ns.Name, pdb.Name, 0)
@@ -502,6 +527,109 @@ func TestEvictionWithUnhealthyPodEvictionPolicy(t *testing.T) {
 	}
 }
 
+// TestEvictionWithPrecondition tests eviction with delete preconditions
+func TestEvictionWithPrecondition(t *testing.T) {
+	cases := map[string]struct {
+		enforceResourceVersion     bool
+		injectWrongResourceVersion bool
+		enforceUID                 bool
+		injectWrongUID             bool
+		shouldErr                  bool
+	}{
+		"eviction enforcing resource version": {
+			enforceResourceVersion: true,
+		},
+		"eviction enforcing UID": {
+			enforceUID: true,
+		},
+		"eviction enforcing resource version and UID": {
+			enforceUID:             true,
+			enforceResourceVersion: true,
+		},
+		"eviction enforcing wrong resource version should fail": {
+			enforceResourceVersion:     true,
+			injectWrongResourceVersion: true,
+			shouldErr:                  true,
+		},
+		"eviction enforcing wrong UID should fail": {
+			enforceUID:     true,
+			injectWrongUID: true,
+			shouldErr:      true,
+		},
+	}
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+			closeFn, rm, informers, _, clientSet := rmSetup(t)
+			defer closeFn()
+
+			ns := framework.CreateNamespaceOrDie(clientSet, "eviction-with-preconditions", t)
+			defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			informers.Start(ctx.Done())
+			go rm.Run(ctx)
+
+			pod := newPod("pod")
+			pod, err := clientSet.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{})
+			if err != nil {
+				t.Errorf("Failed to create pod: %q", err)
+			}
+
+			pod.Status.Phase = v1.PodRunning
+			addPodConditionReady(pod)
+
+			// generate a new resource version
+			updatedPod, err := clientSet.CoreV1().Pods(ns.Name).UpdateStatus(ctx, pod, metav1.UpdateOptions{})
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			waitToObservePods(t, informers.Core().V1().Pods().Informer(), 1, v1.PodRunning)
+
+			deleteOption := metav1.DeleteOptions{}
+
+			if tc.enforceResourceVersion || tc.enforceUID {
+				deleteOption.Preconditions = &metav1.Preconditions{}
+			}
+			if tc.enforceResourceVersion {
+				if tc.injectWrongResourceVersion {
+					deleteOption.Preconditions.ResourceVersion = &pod.ResourceVersion
+				} else {
+					deleteOption.Preconditions.ResourceVersion = &updatedPod.ResourceVersion
+				}
+			}
+			if tc.enforceUID {
+				if tc.injectWrongUID {
+					newUID := uuid.NewUUID()
+					deleteOption.Preconditions.UID = &newUID
+				} else {
+					deleteOption.Preconditions.UID = &updatedPod.UID
+				}
+			}
+
+			// Eviction API can potentially return http.StatusTooManyRequests (429) or http.StatusGatewayTimeout (504) with retryAfterSeconds == 10s
+			// Do not retry - we want to test that the first request succeeds and make sure it doesn't unnecessarily block the test for 10s
+			policyV1NoRetriesRESTClient := &noRetriesRESTClient{Interface: clientSet.PolicyV1().RESTClient()}
+			policyV1NoRetriesClient := policyv1client.New(policyV1NoRetriesRESTClient)
+
+			eviction := newV1Eviction(ns.Name, updatedPod.Name, deleteOption)
+			err = policyV1NoRetriesClient.Evictions(ns.Name).Evict(ctx, eviction)
+			if err != nil && !tc.shouldErr {
+				t.Fatalf("Eviction of pod failed %q", err)
+			}
+			if err == nil && tc.shouldErr {
+				t.Fatal("Eviction of pod should fail")
+			}
+			if policyV1NoRetriesRESTClient.postCalls != 1 {
+				t.Fatalf("expected a single POST call, got %d", policyV1NoRetriesRESTClient.postCalls)
+			}
+		})
+	}
+}
+
 func newPod(podName string) *v1.Pod {
 	return &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
@@ -637,3 +765,16 @@ func waitPDB(t *testing.T, clientSet clientset.Interface, ns, pdbName string, co
 
 func unhealthyPolicyPtr(unhealthyPodEvictionPolicy policyv1.UnhealthyPodEvictionPolicyType) *policyv1.UnhealthyPodEvictionPolicyType {
 	return &unhealthyPodEvictionPolicy
 }
+
+type noRetriesRESTClient struct {
+	mu        sync.Mutex
+	postCalls int
+	restclient.Interface
+}
+
+func (n *noRetriesRESTClient) Post() *restclient.Request {
+	n.mu.Lock()
+	defer n.mu.Unlock()
+	n.postCalls++
+	return n.Interface.Post().MaxRetries(0)
+}
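A closing note on `noRetriesRESTClient`: because it embeds `restclient.Interface` and overrides only `Post()`, all other verbs keep client-go's default retry behavior; eviction is a POST to the pods/eviction subresource, which is exactly the call the tests assert happens once. A hypothetical helper (not part of the patch) showing how the wrapper composes with a typed client:

// newNoRetriesPolicyV1 wraps the policy/v1 REST client so POSTs are sent with
// MaxRetries(0), and returns the wrapper so callers can assert on postCalls.
// Assumes the surrounding test file's imports (clientset, policyv1client).
func newNoRetriesPolicyV1(cs clientset.Interface) (*policyv1client.PolicyV1Client, *noRetriesRESTClient) {
	rc := &noRetriesRESTClient{Interface: cs.PolicyV1().RESTClient()}
	return policyv1client.New(rc), rc
}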