API-initiated eviction: handle deleteOptions correctly

when adding a DisruptionTarget condition into a pod that will be deleted

- handle ResourceVersion and Preconditions correctly
- handle DryRun option correctly

Co-authored-by: Jordan Liggitt <jordan@liggitt.net>
This commit is contained in:
Filip Křepinský 2023-03-13 22:40:26 +01:00
parent de9ce03f19
commit 51c0e2374f
3 changed files with 285 additions and 10 deletions

View File

@ -25,6 +25,7 @@ import (
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@ -308,11 +309,30 @@ func (r *EvictionREST) Create(ctx context.Context, name string, obj runtime.Obje
}
func addConditionAndDeletePod(r *EvictionREST, ctx context.Context, name string, validation rest.ValidateObjectFunc, options *metav1.DeleteOptions) error {
if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
pod, err := getPod(r, ctx, name)
if err != nil {
return err
if !dryrun.IsDryRun(options.DryRun) && feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
getLatestPod := func(_ context.Context, _, oldObj runtime.Object) (runtime.Object, error) {
// Throwaway the newObj. We care only about the latest pod obtained from etcd (oldObj).
// So we can add DisruptionTarget condition in conditionAppender without conflicts.
latestPod := oldObj.(*api.Pod).DeepCopy()
if options.Preconditions != nil {
if uid := options.Preconditions.UID; uid != nil && len(*uid) > 0 && *uid != latestPod.UID {
return nil, errors.NewConflict(
schema.GroupResource{Group: "", Resource: "Pod"},
latestPod.Name,
fmt.Errorf("the UID in the precondition (%s) does not match the UID in record (%s). The object might have been deleted and then recreated", *uid, latestPod.UID),
)
}
if rv := options.Preconditions.ResourceVersion; rv != nil && len(*rv) > 0 && *rv != latestPod.ResourceVersion {
return nil, errors.NewConflict(
schema.GroupResource{Group: "", Resource: "Pod"},
latestPod.Name,
fmt.Errorf("the ResourceVersion in the precondition (%s) does not match the ResourceVersion in record (%s). The object might have been modified", *rv, latestPod.ResourceVersion),
)
}
}
return latestPod, nil
}
conditionAppender := func(_ context.Context, newObj, _ runtime.Object) (runtime.Object, error) {
podObj := newObj.(*api.Pod)
podutil.UpdatePodCondition(&podObj.Status, &api.PodCondition{
@ -324,11 +344,22 @@ func addConditionAndDeletePod(r *EvictionREST, ctx context.Context, name string,
return podObj, nil
}
podCopyUpdated := rest.DefaultUpdatedObjectInfo(pod, conditionAppender)
podUpdatedObjectInfo := rest.DefaultUpdatedObjectInfo(nil, getLatestPod, conditionAppender) // order important
if _, _, err = r.store.Update(ctx, name, podCopyUpdated, rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
updatedPodObject, _, err := r.store.Update(ctx, name, podUpdatedObjectInfo, rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if err != nil {
return err
}
if !resourceVersionIsUnset(options) {
newResourceVersion, err := meta.NewAccessor().ResourceVersion(updatedPodObject)
if err != nil {
return err
}
// bump the resource version, since we are the one who modified it via the update
options = options.DeepCopy()
options.Preconditions.ResourceVersion = &newResourceVersion
}
}
_, _, err := r.store.Delete(ctx, name, rest.ValidateAllObjectFunc, options)
return err

View File

@ -709,6 +709,109 @@ func TestEvictionPDBStatus(t *testing.T) {
}
}
// TestAddConditionAndDelete exercises addConditionAndDeletePod directly:
// it verifies that UID and ResourceVersion delete preconditions supplied
// via DeleteOptions are honored, that a missing pod yields "not found",
// and that behavior holds with the PodDisruptionConditions feature gate
// both enabled and disabled.
func TestAddConditionAndDelete(t *testing.T) {
	cases := []struct {
		name string
		// initialPod: when true, a pod is created in storage before the call.
		initialPod bool
		// makeDeleteOptions builds the DeleteOptions from the created pod
		// (nil when initialPod is false) so cases can reference its UID/RV.
		makeDeleteOptions func(*api.Pod) *metav1.DeleteOptions
		// expectErr is a substring the returned error must contain; empty
		// means the call must succeed.
		expectErr string
	}{
		{
			name:              "simple",
			initialPod:        true,
			makeDeleteOptions: func(pod *api.Pod) *metav1.DeleteOptions { return &metav1.DeleteOptions{} },
		},
		{
			name:              "missing",
			initialPod:        false,
			makeDeleteOptions: func(pod *api.Pod) *metav1.DeleteOptions { return &metav1.DeleteOptions{} },
			expectErr:         "not found",
		},
		{
			name:       "valid uid",
			initialPod: true,
			makeDeleteOptions: func(pod *api.Pod) *metav1.DeleteOptions {
				return &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &pod.UID}}
			},
		},
		{
			name:       "invalid uid",
			initialPod: true,
			makeDeleteOptions: func(pod *api.Pod) *metav1.DeleteOptions {
				// Perturb the real UID so the precondition cannot match.
				badUID := pod.UID + "1"
				return &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &badUID}}
			},
			expectErr: "The object might have been deleted and then recreated",
		},
		{
			name:       "valid resourceVersion",
			initialPod: true,
			makeDeleteOptions: func(pod *api.Pod) *metav1.DeleteOptions {
				return &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &pod.ResourceVersion}}
			},
		},
		{
			name:       "invalid resourceVersion",
			initialPod: true,
			makeDeleteOptions: func(pod *api.Pod) *metav1.DeleteOptions {
				// Perturb the real RV so the precondition cannot match.
				badRV := pod.ResourceVersion + "1"
				return &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &badRV}}
			},
			expectErr: "The object might have been modified",
		},
	}
	testContext := genericapirequest.WithNamespace(genericapirequest.NewContext(), metav1.NamespaceDefault)
	storage, _, _, server := newStorage(t)
	defer server.Terminate(t)
	defer storage.Store.DestroyFunc()
	client := fake.NewSimpleClientset()
	evictionRest := newEvictionStorage(storage.Store, client.PolicyV1())
	for _, tc := range cases {
		// Run every case with the feature gate both on and off; precondition
		// handling is expected to behave the same either way.
		for _, conditionsEnabled := range []bool{true, false} {
			name := fmt.Sprintf("%s_conditions=%v", tc.name, conditionsEnabled)
			t.Run(name, func(t *testing.T) {
				defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, conditionsEnabled)()
				var deleteOptions *metav1.DeleteOptions
				if tc.initialPod {
					newPod := validNewPod()
					createdObj, err := storage.Create(testContext, newPod, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
					if err != nil {
						t.Fatal(err)
					}
					// Force-delete the pod after the subtest so cases stay
					// independent even when the eviction path fails.
					t.Cleanup(func() {
						zero := int64(0)
						storage.Delete(testContext, newPod.Name, rest.ValidateAllObjectFunc, &metav1.DeleteOptions{GracePeriodSeconds: &zero})
					})
					deleteOptions = tc.makeDeleteOptions(createdObj.(*api.Pod))
				} else {
					deleteOptions = tc.makeDeleteOptions(nil)
				}
				if deleteOptions == nil {
					deleteOptions = &metav1.DeleteOptions{}
				}
				// "foo" presumably matches the name produced by validNewPod —
				// TODO confirm against that helper.
				err := addConditionAndDeletePod(evictionRest, testContext, "foo", rest.ValidateAllObjectFunc, deleteOptions)
				if err == nil {
					if tc.expectErr != "" {
						t.Fatalf("expected err containing %q, got none", tc.expectErr)
					}
					return
				}
				if tc.expectErr == "" {
					t.Fatalf("unexpected err: %v", err)
				}
				if !strings.Contains(err.Error(), tc.expectErr) {
					t.Fatalf("expected err containing %q, got %v", tc.expectErr, err)
				}
			})
		}
	}
}
// resource wraps the given resource name in a schema.GroupResource that
// belongs to the core (empty) API group.
func resource(name string) schema.GroupResource {
	return schema.GroupResource{Group: "", Resource: name}
}
@ -765,7 +868,7 @@ func (ms *mockStore) Watch(ctx context.Context, options *metainternalversion.Lis
}
func (ms *mockStore) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
return nil, false, nil
return ms.pod, false, nil
}
func (ms *mockStore) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {

View File

@ -38,12 +38,14 @@ import (
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/feature"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
policyv1client "k8s.io/client-go/kubernetes/typed/policy/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/scale"
@ -345,22 +347,34 @@ func TestEvictionWithFinalizers(t *testing.T) {
cases := map[string]struct {
enablePodDisruptionConditions bool
phase v1.PodPhase
dryRun bool
wantDisruptionTargetCond bool
}{
"terminal pod with PodDisruptionConditions enabled": {
enablePodDisruptionConditions: true,
phase: v1.PodSucceeded,
wantDisruptionTargetCond: true,
},
"terminal pod with PodDisruptionConditions disabled": {
enablePodDisruptionConditions: false,
phase: v1.PodSucceeded,
wantDisruptionTargetCond: false,
},
"running pod with PodDisruptionConditions enabled": {
enablePodDisruptionConditions: true,
phase: v1.PodRunning,
wantDisruptionTargetCond: true,
},
"running pod with PodDisruptionConditions disabled": {
enablePodDisruptionConditions: false,
phase: v1.PodRunning,
wantDisruptionTargetCond: false,
},
"running pod with PodDisruptionConditions enabled should not update conditions in dry-run mode": {
enablePodDisruptionConditions: true,
phase: v1.PodRunning,
dryRun: true,
wantDisruptionTargetCond: false,
},
}
for name, tc := range cases {
@ -391,6 +405,9 @@ func TestEvictionWithFinalizers(t *testing.T) {
waitToObservePods(t, informers.Core().V1().Pods().Informer(), 1, tc.phase)
deleteOption := metav1.DeleteOptions{}
if tc.dryRun {
deleteOption.DryRun = []string{metav1.DryRunAll}
}
eviction := newV1Eviction(ns.Name, pod.Name, deleteOption)
@ -404,9 +421,9 @@ func TestEvictionWithFinalizers(t *testing.T) {
t.Fatalf("Failed to get the pod %q with error: %q", klog.KObj(pod), e)
}
_, cond := podutil.GetPodCondition(&updatedPod.Status, v1.PodConditionType(v1.DisruptionTarget))
if tc.enablePodDisruptionConditions == true && cond == nil {
if tc.wantDisruptionTargetCond == true && cond == nil {
t.Errorf("Pod %q does not have the expected condition: %q", klog.KObj(updatedPod), v1.DisruptionTarget)
} else if tc.enablePodDisruptionConditions == false && cond != nil {
} else if tc.wantDisruptionTargetCond == false && cond != nil {
t.Errorf("Pod %q has an unexpected condition: %q", klog.KObj(updatedPod), v1.DisruptionTarget)
}
})
@ -489,12 +506,20 @@ func TestEvictionWithUnhealthyPodEvictionPolicy(t *testing.T) {
return pdb.Status.ExpectedPods == 1
})
}
// Eviction API can potentially return http.StatusTooManyRequests (429) or http.StatusGatewayTimeout (504) with retryAfterSeconds == 10s
// Do not retry - we want to test that the first request succeeds and make sure it doesn't unnecessarily block the test for 10s
policyV1NoRetriesRESTClient := &noRetriesRESTClient{Interface: clientSet.PolicyV1().RESTClient()}
policyV1NoRetriesClient := policyv1client.New(policyV1NoRetriesRESTClient)
deleteOption := metav1.DeleteOptions{}
eviction := newV1Eviction(ns.Name, pod.Name, deleteOption)
err := clientSet.PolicyV1().Evictions(ns.Name).Evict(ctx, eviction)
err := policyV1NoRetriesClient.Evictions(ns.Name).Evict(ctx, eviction)
if err != nil {
t.Fatalf("Eviction of pod failed %v", err)
}
if policyV1NoRetriesRESTClient.postCalls != 1 {
t.Fatalf("expected a single POST call, got %d", policyV1NoRetriesRESTClient.postCalls)
}
waitToObservePods(t, informers.Core().V1().Pods().Informer(), 0, v1.PodRunning)
waitPDBStable(t, clientSet, ns.Name, pdb.Name, 0)
@ -502,6 +527,109 @@ func TestEvictionWithUnhealthyPodEvictionPolicy(t *testing.T) {
}
}
// TestEvictionWithPrecondition tests eviction with delete preconditions
// (metav1.Preconditions UID and/or ResourceVersion): matching preconditions
// must let the eviction succeed in a single POST, while mismatched ones
// must make it fail.
func TestEvictionWithPrecondition(t *testing.T) {
	cases := map[string]struct {
		// enforceResourceVersion: set a ResourceVersion precondition.
		enforceResourceVersion bool
		// injectWrongResourceVersion: use a stale RV so the precondition
		// cannot match. Only meaningful with enforceResourceVersion.
		injectWrongResourceVersion bool
		// enforceUID: set a UID precondition.
		enforceUID bool
		// injectWrongUID: use a freshly generated UID so the precondition
		// cannot match. Only meaningful with enforceUID.
		injectWrongUID bool
		// shouldErr: the eviction request is expected to fail.
		shouldErr bool
	}{
		"eviction enforcing resource version": {
			enforceResourceVersion: true,
		},
		"eviction enforcing UID": {
			enforceUID: true,
		},
		"eviction enforcing resource version and UID": {
			enforceUID:             true,
			enforceResourceVersion: true,
		},
		"eviction enforcing wrong resource version should fail": {
			enforceResourceVersion:     true,
			injectWrongResourceVersion: true,
			shouldErr:                  true,
		},
		"eviction enforcing wrong UID should fail": {
			enforceUID:     true,
			injectWrongUID: true,
			shouldErr:      true,
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			closeFn, rm, informers, _, clientSet := rmSetup(t)
			defer closeFn()
			ns := framework.CreateNamespaceOrDie(clientSet, "eviction-with-preconditions", t)
			defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()
			informers.Start(ctx.Done())
			go rm.Run(ctx)
			pod := newPod("pod")
			pod, err := clientSet.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{})
			if err != nil {
				t.Errorf("Failed to create pod: %q", err)
			}
			pod.Status.Phase = v1.PodRunning
			addPodConditionReady(pod)
			// generate a new resource version
			updatedPod, err := clientSet.CoreV1().Pods(ns.Name).UpdateStatus(ctx, pod, metav1.UpdateOptions{})
			if err != nil {
				t.Fatal(err)
			}
			waitToObservePods(t, informers.Core().V1().Pods().Informer(), 1, v1.PodRunning)
			deleteOption := metav1.DeleteOptions{}
			if tc.enforceResourceVersion || tc.enforceUID {
				deleteOption.Preconditions = &metav1.Preconditions{}
			}
			if tc.enforceResourceVersion {
				if tc.injectWrongResourceVersion {
					// pod still carries the RV from before UpdateStatus,
					// which is stale by now and therefore "wrong".
					deleteOption.Preconditions.ResourceVersion = &pod.ResourceVersion
				} else {
					deleteOption.Preconditions.ResourceVersion = &updatedPod.ResourceVersion
				}
			}
			if tc.enforceUID {
				if tc.injectWrongUID {
					newUID := uuid.NewUUID()
					deleteOption.Preconditions.UID = &newUID
				} else {
					deleteOption.Preconditions.UID = &updatedPod.UID
				}
			}
			// Eviction API can potentially return http.StatusTooManyRequests (429) or http.StatusGatewayTimeout (504) with retryAfterSeconds == 10s
			// Do not retry - we want to test that the first request succeeds and make sure it doesn't unnecessarily block the test for 10s
			policyV1NoRetriesRESTClient := &noRetriesRESTClient{Interface: clientSet.PolicyV1().RESTClient()}
			policyV1NoRetriesClient := policyv1client.New(policyV1NoRetriesRESTClient)
			eviction := newV1Eviction(ns.Name, updatedPod.Name, deleteOption)
			err = policyV1NoRetriesClient.Evictions(ns.Name).Evict(ctx, eviction)
			if err != nil && !tc.shouldErr {
				t.Fatalf("Eviction of pod failed %q", err)
			}
			if err == nil && tc.shouldErr {
				t.Fatal("Eviction of pod should fail")
			}
			// Exactly one POST proves no client-side retries happened.
			if policyV1NoRetriesRESTClient.postCalls != 1 {
				t.Fatalf("expected a single POST call, got %d", policyV1NoRetriesRESTClient.postCalls)
			}
		})
	}
}
func newPod(podName string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -637,3 +765,16 @@ func waitPDB(t *testing.T, clientSet clientset.Interface, ns, pdbName string, co
func unhealthyPolicyPtr(unhealthyPodEvictionPolicy policyv1.UnhealthyPodEvictionPolicyType) *policyv1.UnhealthyPodEvictionPolicyType {
return &unhealthyPodEvictionPolicy
}
// noRetriesRESTClient decorates a restclient.Interface so that each POST
// request it builds has client-side retries disabled, while counting the
// number of POSTs constructed.
type noRetriesRESTClient struct {
	restclient.Interface

	mu        sync.Mutex // guards postCalls
	postCalls int        // number of Post() invocations observed
}
// Post records one POST invocation and returns the delegate's request
// builder with client-side retries turned off via MaxRetries(0).
func (n *noRetriesRESTClient) Post() *restclient.Request {
	n.mu.Lock()
	n.postCalls++
	n.mu.Unlock()
	return n.Interface.Post().MaxRetries(0)
}