From 850759ec87e520db19df217cc510f267c081dcf3 Mon Sep 17 00:00:00 2001
From: caiweidong
Date: Wed, 17 Mar 2021 14:15:09 +0800
Subject: [PATCH] Preempting: do not delete the victim if it just exists in
 WaitingPods

Previously, PrepareCandidate deleted every victim from the API server
before checking whether it was still waiting in a Permit plugin. For a
waiting victim, rejecting it through its Permit plugin is sufficient;
only victims that are not waiting need to be deleted. The integration
test now covers both kinds of victims: a running pod that must be
deleted and a waiting pod that must only be rejected.

---
 .../defaultpreemption/default_preemption.go   | 10 +++++-----
 test/integration/scheduler/framework_test.go  | 49 +++++++++++++++++----
 2 files changed, 45 insertions(+), 14 deletions(-)

diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
index b3d7186bee6..ffdd2c4d3e2 100644
--- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
+++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
@@ -694,13 +694,13 @@ func selectVictimsOnNode(
 // - Clear the low-priority pods' nominatedNodeName status if needed
 func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface, pod *v1.Pod, pluginName string) *framework.Status {
 	for _, victim := range c.Victims().Pods {
-		if err := util.DeletePod(cs, victim); err != nil {
-			klog.ErrorS(err, "preempting pod", "pod", klog.KObj(victim))
-			return framework.AsStatus(err)
-		}
-		// If the victim is a WaitingPod, send a reject message to the PermitPlugin
+		// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
+		// Otherwise we should delete the victim.
 		if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
 			waitingPod.Reject(pluginName, "preempted")
+		} else if err := util.DeletePod(cs, victim); err != nil {
+			klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
+			return framework.AsStatus(err)
 		}
 		fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, c.Name())
 
diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go
index bc86a329c36..eafb65e6ce6 100644
--- a/test/integration/scheduler/framework_test.go
+++ b/test/integration/scheduler/framework_test.go
@@ -24,7 +24,7 @@ import (
 	"time"
 
 	v1 "k8s.io/api/core/v1"
-	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -1850,14 +1850,33 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
 	permitPlugin.timeoutPermit = false
 	permitPlugin.waitAndRejectPermit = false
 	permitPlugin.waitAndAllowPermit = true
+	permitPlugin.waitingPod = "waiting-pod"
 
 	lowPriority, highPriority := int32(100), int32(300)
 	resourceRequest := v1.ResourceRequirements{Requests: v1.ResourceList{
+		v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
+		v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
+	}
+	preemptorResourceRequest := v1.ResourceRequirements{Requests: v1.ResourceList{
 		v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
 		v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)},
 	}
 
-	// First pod will go waiting.
+	// First pod will go running.
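+	// Unlike "waiting-pod" below, this pod is not held by the permit plugin,
+	// so it should be admitted and bound before the preemptor is created.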
+	runningPod := initPausePod(&pausePodConfig{Name: "running-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, Resources: &resourceRequest})
+	runningPod.Spec.TerminationGracePeriodSeconds = new(int64)
+	runningPod, err = createPausePod(testCtx.ClientSet, runningPod)
+	if err != nil {
+		t.Errorf("Error while creating the running pod: %v", err)
+	}
+	// Wait until the running pod is scheduled before creating the rest of the pods.
+	if err := wait.Poll(100*time.Millisecond, 30*time.Second, podScheduled(testCtx.ClientSet, runningPod.Name, runningPod.Namespace)); err != nil {
+		t.Errorf("Expected the running pod to be scheduled: %v", err)
+	}
+
+	// Second pod will go waiting.
 	waitingPod := initPausePod(&pausePodConfig{Name: "waiting-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, Resources: &resourceRequest})
 	waitingPod.Spec.TerminationGracePeriodSeconds = new(int64)
 	waitingPod, err = createPausePod(testCtx.ClientSet, waitingPod)
@@ -1871,9 +1890,9 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
 		return w, nil
 	})
 
-	// Create second pod which should preempt first pod.
+	// Create third pod which should preempt other pods.
 	preemptorPod, err := createPausePod(testCtx.ClientSet,
-		initPausePod(&pausePodConfig{Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, Resources: &resourceRequest}))
+		initPausePod(&pausePodConfig{Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, Resources: &preemptorResourceRequest}))
 	if err != nil {
 		t.Errorf("Error while creating the preemptor pod: %v", err)
 	}
@@ -1885,18 +1904,30 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
 	// }
 
 	if err := wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
-		_, err := getPod(testCtx.ClientSet, waitingPod.Name, waitingPod.Namespace)
-		return apierrors.IsNotFound(err), nil
+		w := false
+		permitPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true })
+		return !w, nil
 	}); err != nil {
-		t.Error("Expected the waiting pod to get preempted and deleted")
+		t.Error("Expected the waiting pod to get preempted")
+	}
+	// Expect the waitingPod to still be present: it was rejected, not deleted.
+	if _, err := getPod(testCtx.ClientSet, waitingPod.Name, waitingPod.Namespace); err != nil {
+		t.Errorf("Expected the waiting pod to still exist, but got: %v", err)
+	}
+	// Expect the runningPod to be deleted from the API server.
+	_, err = getPod(testCtx.ClientSet, runningPod.Name, runningPod.Namespace)
+	if err != nil && !errors.IsNotFound(err) {
+		t.Errorf("Error while getting the running pod: %v", err)
+	}
+	if err == nil {
+		t.Error("Expected the running pod to be deleted, but it still exists.")
 	}
-
 	if permitPlugin.numPermitCalled == 0 {
 		t.Errorf("Expected the permit plugin to be called.")
 	}
 
 	permitPlugin.reset()
-	testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{waitingPod, preemptorPod})
+	testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{waitingPod, runningPod, preemptorPod})
 }
 
 func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) *testutils.TestContext {