diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index cf9b8ec71c7..9a9be063aa3 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -295,7 +295,7 @@ func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginCon // preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible. // If it succeeds, it adds the name of the node where preemption has happened to the pod spec. // It returns the node name and an error if any. -func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) { +func (sched *Scheduler) preempt(fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) { preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor) if err != nil { klog.Errorf("Error getting the updated preemptor pod object: %v", err) @@ -328,6 +328,10 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err) return "", err } + // If the victim is a WaitingPod, send a reject message to the PermitPlugin + if waitingPod := fwk.GetWaitingPod(victim.UID); waitingPod != nil { + waitingPod.Reject("preempted") + } sched.config.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName) } @@ -486,7 +490,7 @@ func (sched *Scheduler) scheduleOne() { " No preemption is performed.") } else { preemptionStartTime := time.Now() - sched.preempt(pod, fitError) + sched.preempt(fwk, pod, fitError) metrics.PreemptionAttempts.Inc() metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime)) metrics.DeprecatedSchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime)) diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index 0dd08b6e9c2..7f849bcc02e 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -22,6 +22,7 @@ import ( "time" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -85,6 +86,7 @@ type PermitPlugin struct { timeoutPermit bool waitAndRejectPermit bool waitAndAllowPermit bool + allowPermit bool fh framework.FrameworkHandle } @@ -363,6 +365,9 @@ func (pp *PermitPlugin) Permit(pc *framework.PluginContext, pod *v1.Pod, nodeNam if pp.timeoutPermit { return framework.NewStatus(framework.Wait, ""), 3 * time.Second } + if pp.allowPermit && pod.Name != "waiting-pod" { + return nil, 0 + } if pp.waitAndRejectPermit || pp.waitAndAllowPermit { if pod.Name == "waiting-pod" { return framework.NewStatus(framework.Wait, ""), 30 * time.Second @@ -395,6 +400,7 @@ func (pp *PermitPlugin) reset() { pp.timeoutPermit = false pp.waitAndRejectPermit = false pp.waitAndAllowPermit = false + pp.allowPermit = false } // NewPermitPlugin is the factory for permit plugin. @@ -1382,3 +1388,92 @@ func TestFilterPlugin(t *testing.T) { cleanupPods(cs, t, []*v1.Pod{pod}) } } + +// TestPreemptWithPermitPlugin tests preempt with permit plugins. +func TestPreemptWithPermitPlugin(t *testing.T) { + // Create a plugin registry for testing. Register only a permit plugin. + registry := framework.Registry{permitPluginName: NewPermitPlugin} + + // Setup initial permit plugin for testing. + plugins := &schedulerconfig.Plugins{ + Permit: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: permitPluginName, + }, + }, + }, + } + // Set permit plugin config for testing + pluginConfig := []schedulerconfig.PluginConfig{ + { + Name: permitPluginName, + Args: runtime.Unknown{}, + }, + } + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "preempt-with-permit-plugin", nil), + false, nil, registry, plugins, pluginConfig, false, time.Second) + defer cleanupTest(t, context) + + cs := context.clientSet + // Add one node. + nodeRes := &v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI), + } + _, err := createNodes(cs, "test-node", nodeRes, 1) + if err != nil { + t.Fatalf("Cannot create nodes: %v", err) + } + + perPlugin.failPermit = false + perPlugin.rejectPermit = false + perPlugin.timeoutPermit = false + perPlugin.waitAndRejectPermit = false + perPlugin.waitAndAllowPermit = true + perPlugin.allowPermit = true + + lowPriority, highPriority := int32(100), int32(300) + resourceRequest := v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)}, + } + + // Create two pods. + waitingPod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "waiting-pod", Namespace: context.ns.Name, Priority: &lowPriority, Resources: &resourceRequest})) + if err != nil { + t.Errorf("Error while creating the waiting pod: %v", err) + } + // Wait until the waiting-pod is actually waiting, then create a preemptor pod to preempt it. + wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) { + w := false + perPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true }) + return w, nil + }) + + preemptorPod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "preemptor-pod", Namespace: context.ns.Name, Priority: &highPriority, Resources: &resourceRequest})) + if err != nil { + t.Errorf("Error while creating the preemptor pod: %v", err) + } + + if err = waitForPodToSchedule(cs, preemptorPod); err != nil { + t.Errorf("Expected the preemptor pod to be scheduled. error: %v", err) + } + + if _, err := getPod(cs, waitingPod.Name, waitingPod.Namespace); err == nil { + t.Error("Expected the waiting pod to get preempted and deleted") + } + + if perPlugin.numPermitCalled == 0 { + t.Errorf("Expected the permit plugin to be called.") + } + + perPlugin.reset() + cleanupPods(cs, t, []*v1.Pod{waitingPod, preemptorPod}) +}