From dc079acc2be603fbc2065e2748c79bb5e2c3453d Mon Sep 17 00:00:00 2001
From: Wei Huang
Date: Mon, 19 Jul 2021 15:46:55 -0700
Subject: [PATCH] sched: retry unschedulable pods immediately after a waiting pod's deletion

---
 pkg/scheduler/eventhandlers.go | 7 +-
 pkg/scheduler/framework/interface.go | 3 +-
 pkg/scheduler/framework/runtime/framework.go | 8 +-
 pkg/scheduler/scheduler.go | 20 +
 pkg/scheduler/scheduler_test.go | 4 +-
 pkg/scheduler/testing/wrappers.go | 3 +
 test/integration/scheduler/framework_test.go | 363 ++++++++++++++-----
 test/integration/scheduler/util.go | 5 +-
 8 files changed, 312 insertions(+), 101 deletions(-)

diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go
index ead032b6081..aea3bba428d 100644
--- a/pkg/scheduler/eventhandlers.go
+++ b/pkg/scheduler/eventhandlers.go
@@ -174,7 +174,12 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
 klog.ErrorS(err, "Unable to get profile", "pod", klog.KObj(pod))
 return
 }
- fwk.RejectWaitingPod(pod.UID)
+ // If a waiting pod is rejected, it indicates it was previously assumed and we're
+ // removing it from the scheduler cache. In this case, signal an AssignedPodDelete
+ // event to immediately retry some unschedulable Pods.
+ if fwk.RejectWaitingPod(pod.UID) {
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete, nil)
+ }
 }

 func (sched *Scheduler) addPodToCache(obj interface{}) {
diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go
index f9284e79843..5e7e6ee13e0 100644
--- a/pkg/scheduler/framework/interface.go
+++ b/pkg/scheduler/framework/interface.go
@@ -584,7 +584,8 @@ type Handle interface {
 GetWaitingPod(uid types.UID) WaitingPod

 // RejectWaitingPod rejects a waiting pod given its UID.
- RejectWaitingPod(uid types.UID)
+ // The return value indicates if the pod is waiting or not.
+ RejectWaitingPod(uid types.UID) bool

 // ClientSet returns a kubernetes clientSet.
 ClientSet() clientset.Interface
diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go
index 23f77928223..7f078caa6d4 100644
--- a/pkg/scheduler/framework/runtime/framework.go
+++ b/pkg/scheduler/framework/runtime/framework.go
@@ -1092,11 +1092,13 @@ func (f *frameworkImpl) GetWaitingPod(uid types.UID) framework.WaitingPod {
 }

 // RejectWaitingPod rejects a WaitingPod given its UID.
-func (f *frameworkImpl) RejectWaitingPod(uid types.UID) {
- waitingPod := f.waitingPods.get(uid)
- if waitingPod != nil {
+// The returned value indicates if the given pod is waiting or not.
+func (f *frameworkImpl) RejectWaitingPod(uid types.UID) bool {
+ if waitingPod := f.waitingPods.get(uid); waitingPod != nil {
 waitingPod.Reject("", "removed")
+ return true
 }
+ return false
 }

 // HasFilterPlugins returns true if at least one filter plugin is defined.
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 95e1a762446..187e01f78a9 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -639,6 +639,16 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
 klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
+ } else {
+ // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
+ // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
+ // TODO(#103853): de-duplicate the logic.
+ // Avoid moving the assumed Pod itself as it's always Unschedulable.
+ // It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
+ // update `q.moveRequest` and thus move the assumed pod to backoffQ anyway.
+ defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
+ return assumedPod.UID != pod.UID
+ })
 }
 sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
 return
@@ -652,6 +662,11 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
 klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
+ } else {
+ // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
+ // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
+ // TODO(#103853): de-duplicate the logic.
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
 }
 sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
 return
@@ -664,6 +679,11 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
 klog.ErrorS(err, "scheduler cache ForgetPod failed")
+ } else {
+ // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
+ // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
+ // TODO(#103853): de-duplicate the logic.
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
 }
 sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, "")
 } else {
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 6c8aab3cfdb..acb639bcd22 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -456,6 +456,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
 Profiles: profile.Map{
 testSchedulerName: fwk,
 },
+ SchedulingQueue: internalqueue.NewTestQueue(context.Background(), nil),
 }
 called := make(chan struct{})
 stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
@@ -946,7 +947,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
 Profiles: profile.Map{
 testSchedulerName: fwk,
 },
- client: client,
+ client: client,
+ SchedulingQueue: internalqueue.NewTestQueue(context.Background(), nil),
 }

 return sched, bindingChan, errChan
diff --git a/pkg/scheduler/testing/wrappers.go b/pkg/scheduler/testing/wrappers.go
index 599700a8ec4..8e0c392b43b 100644
--- a/pkg/scheduler/testing/wrappers.go
+++ b/pkg/scheduler/testing/wrappers.go
@@ -24,6 +24,7 @@ import (
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/runtime/schema"
 "k8s.io/apimachinery/pkg/types"
+ imageutils "k8s.io/kubernetes/test/utils/image"
 "k8s.io/utils/pointer"
 )
@@ -415,6 +416,8 @@ func (p *PodWrapper) Req(resMap map[v1.ResourceName]string) *PodWrapper {
 res[k] = resource.MustParse(v)
 }
 p.Spec.Containers = append(p.Spec.Containers, v1.Container{
+ Name: fmt.Sprintf("con%d", len(p.Spec.Containers)),
+ Image: imageutils.GetPauseImageName(),
 Resources: v1.ResourceRequirements{
 Requests: res,
 },
diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go
index d1a7c24091b..612e4b494c0 100644
--- a/test/integration/scheduler/framework_test.go
+++ b/test/integration/scheduler/framework_test.go
@@ -19,16 +19,17 @@ package scheduler
 import (
 "context"
 "fmt"
+ "sync"
 "sync/atomic"
 "testing"
 "time"

 v1 "k8s.io/api/core/v1"
 "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/api/resource"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/labels"
 "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
 "k8s.io/apimachinery/pkg/util/wait"
 clientset "k8s.io/client-go/kubernetes"
 listersv1 "k8s.io/client-go/listers/core/v1"
@@ -38,6 +39,7 @@ import (
 configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
 "k8s.io/kubernetes/pkg/scheduler/framework"
 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
+ "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
 frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 st "k8s.io/kubernetes/pkg/scheduler/testing"
 testutils "k8s.io/kubernetes/test/integration/util"
@@ -66,6 +68,9 @@ type FilterPlugin struct {
 numFilterCalled int32
 failFilter bool
 rejectFilter bool
+
+ numCalledPerPod map[string]int
+ sync.RWMutex
 }

 type PostFilterPlugin struct {
@@ -92,6 +97,10 @@ type PreBindPlugin struct {
 numPreBindCalled int
 failPreBind bool
 rejectPreBind bool
+ // If set to true, always succeed on non-first scheduling attempt.
+ succeedOnRetry bool
+ // Record the pod UIDs that have been tried scheduling.
+ podUIDs map[types.UID]struct{}
 }

 type BindPlugin struct {
@@ -232,6 +241,9 @@ func (fp *FilterPlugin) Name() string {
 func (fp *FilterPlugin) reset() {
 fp.numFilterCalled = 0
 fp.failFilter = false
+ if fp.numCalledPerPod != nil {
+ fp.numCalledPerPod = make(map[string]int)
+ }
 }

 // Filter is a test function that returns an error or nil, depending on the
@@ -239,6 +251,12 @@ func (fp *FilterPlugin) reset() {
 func (fp *FilterPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
 atomic.AddInt32(&fp.numFilterCalled, 1)

+ if fp.numCalledPerPod != nil {
+ fp.Lock()
+ fp.numCalledPerPod[fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)]++
+ fp.Unlock()
+ }
+
 if fp.failFilter {
 return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
 }
@@ -310,6 +328,10 @@ func (pp *PreBindPlugin) Name() string {
 // PreBind is a test function that returns (true, nil) or errors for testing.
 func (pp *PreBindPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
 pp.numPreBindCalled++
+ if _, tried := pp.podUIDs[pod.UID]; tried && pp.succeedOnRetry {
+ return nil
+ }
+ pp.podUIDs[pod.UID] = struct{}{}
 if pp.failPreBind {
 return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
 }
@@ -324,6 +346,8 @@ func (pp *PreBindPlugin) reset() {
 pp.numPreBindCalled = 0
 pp.failPreBind = false
 pp.rejectPreBind = false
+ pp.succeedOnRetry = false
+ pp.podUIDs = make(map[types.UID]struct{})
 }

 const bindPluginAnnotation = "bindPluginName"
@@ -940,34 +964,55 @@ func TestReservePluginReserve(t *testing.T) {

 // TestPrebindPlugin tests invocation of prebind plugins.
 func TestPrebindPlugin(t *testing.T) {
- // Create a plugin registry for testing. Register only a prebind plugin.
- preBindPlugin := &PreBindPlugin{}
- registry := frameworkruntime.Registry{preBindPluginName: newPlugin(preBindPlugin)}
+ // Create a plugin registry for testing. Register a prebind and a filter plugin.
+ preBindPlugin := &PreBindPlugin{podUIDs: make(map[types.UID]struct{})}
+ filterPlugin := &FilterPlugin{}
+ registry := frameworkruntime.Registry{
+ preBindPluginName: newPlugin(preBindPlugin),
+ filterPluginName: newPlugin(filterPlugin),
+ }

- // Setup initial prebind plugin for testing.
+ // Setup initial prebind and filter plugins in different profiles.
+ // The second profile ensures the embedded filter plugin is exclusively called, and hence
+ // we can use its internal `numFilterCalled` to perform some precise checking logic.
 cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
- Profiles: []v1beta2.KubeSchedulerProfile{{
- SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
- Plugins: &v1beta2.Plugins{
- PreBind: v1beta2.PluginSet{
- Enabled: []v1beta2.Plugin{
- {Name: preBindPluginName},
+ Profiles: []v1beta2.KubeSchedulerProfile{
+ {
+ SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
+ Plugins: &v1beta2.Plugins{
+ PreBind: v1beta2.PluginSet{
+ Enabled: []v1beta2.Plugin{
+ {Name: preBindPluginName},
+ },
 },
 },
 },
- }},
+ {
+ SchedulerName: pointer.StringPtr("2nd-scheduler"),
+ Plugins: &v1beta2.Plugins{
+ Filter: v1beta2.PluginSet{
+ Enabled: []v1beta2.Plugin{
+ {Name: filterPluginName},
+ },
+ },
+ },
+ },
+ },
 })

 // Create the API server and the scheduler with the test plugin set.
- testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "prebind-plugin", nil), 2,
+ nodesNum := 2
+ testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "prebind-plugin", nil), nodesNum,
 scheduler.WithProfiles(cfg.Profiles...),
 scheduler.WithFrameworkOutOfTreeRegistry(registry))
 defer testutils.CleanupTest(t, testCtx)

 tests := []struct {
- name string
- fail bool
- reject bool
+ name string
+ fail bool
+ reject bool
+ succeedOnRetry bool
+ unschedulablePod *v1.Pod
 }{
 {
 name: "disable fail and reject flags",
@@ -989,12 +1034,39 @@ func TestPrebindPlugin(t *testing.T) {
 fail: true,
 reject: true,
 },
+ {
+ name: "fail on 1st try but succeed on retry",
+ fail: true,
+ reject: false,
+ succeedOnRetry: true,
+ },
+ {
+ name: "reject on 1st try but succeed on retry",
+ fail: false,
+ reject: true,
+ succeedOnRetry: true,
+ },
+ {
+ name: "failure on preBind moves unschedulable pods",
+ fail: true,
+ unschedulablePod: st.MakePod().Name("unschedulable-pod").Namespace(testCtx.NS.Name).Container(imageutils.GetPauseImageName()).Obj(),
+ },
 }

 for _, test := range tests {
 t.Run(test.name, func(t *testing.T) {
+ if p := test.unschedulablePod; p != nil {
+ p.Spec.SchedulerName = "2nd-scheduler"
+ filterPlugin.rejectFilter = true
+ if _, err := createPausePod(testCtx.ClientSet, p); err != nil {
+ t.Fatalf("Error while creating an unschedulable pod: %v", err)
+ }
+ defer filterPlugin.reset()
+ }
+
 preBindPlugin.failPreBind = test.fail
 preBindPlugin.rejectPreBind = test.reject
+ preBindPlugin.succeedOnRetry = test.succeedOnRetry
 // Create a best effort pod.
 pod, err := createPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
@@ -1003,7 +1075,11 @@ func TestPrebindPlugin(t *testing.T) {
 }

 if test.fail || test.reject {
- if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
+ if test.succeedOnRetry {
+ if err = testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pod, 10*time.Second); err != nil {
+ t.Errorf("Expected the pod to be schedulable on retry, but got an error: %v", err)
+ }
+ } else if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
 t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err)
 }
 } else if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
@@ -1014,6 +1090,16 @@ func TestPrebindPlugin(t *testing.T) {
 t.Errorf("Expected the prebind plugin to be called.")
 }

+ if test.unschedulablePod != nil {
+ if err := wait.Poll(10*time.Millisecond, 15*time.Second, func() (bool, error) {
+ // 2*nodesNum means the unschedulable pod is expected to be tried at least twice
+ // (one initial attempt plus the retry triggered by the preBind pod's failure).
+ return int(filterPlugin.numFilterCalled) >= 2*nodesNum, nil
+ }); err != nil {
+ t.Errorf("Timed out waiting for the unschedulable Pod to be tried at least twice.")
+ }
+ }
+
 preBindPlugin.reset()
 testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
 })
@@ -1346,6 +1432,7 @@ func TestPostBindPlugin(t *testing.T) {
 // Create a plugin registry for testing. Register a prebind and a postbind plugin.
 preBindPlugin := &PreBindPlugin{
 failPreBind: test.preBindFail,
+ podUIDs: make(map[types.UID]struct{}),
 }
 postBindPlugin := &PostBindPlugin{
 name: postBindPluginName,
@@ -1841,14 +1928,52 @@ func TestPreScorePlugin(t *testing.T) {
 }

 // TestPreemptWithPermitPlugin tests preempt with permit plugins.
+// It verifies how waitingPods behave in different scenarios:
+// - when waitingPods get preempted
+//   - they should be removed from the internal waitingPods map, but not physically deleted
+//   - it'd trigger moving unschedulable Pods, but not the waitingPods themselves
+// - when waitingPods get deleted externally, it'd trigger moving unschedulable Pods
 func TestPreemptWithPermitPlugin(t *testing.T) {
- // Create a plugin registry for testing. Register only a permit plugin.
+ // Create a plugin registry for testing. Register a permit and a filter plugin.
 permitPlugin := &PermitPlugin{}
- registry, prof := initRegistryAndConfig(t, permitPlugin)
+ // Inject a fake filter plugin to use its internal `numCalledPerPod` to verify
+ // how many times a Pod gets tried for scheduling.
+ filterPlugin := &FilterPlugin{numCalledPerPod: make(map[string]int)}
+ registry := frameworkruntime.Registry{
+ permitPluginName: newPermitPlugin(permitPlugin),
+ filterPluginName: newPlugin(filterPlugin),
+ }
+
+ // Setup initial permit and filter plugins in the profile.
+ cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
+ Profiles: []v1beta2.KubeSchedulerProfile{
+ {
+ SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
+ Plugins: &v1beta2.Plugins{
+ Permit: v1beta2.PluginSet{
+ Enabled: []v1beta2.Plugin{
+ {Name: permitPluginName},
+ },
+ },
+ Filter: v1beta2.PluginSet{
+ // Ensure the fake filter plugin is always called; otherwise noderesources
+ // would fail first and exit the Filter phase.
+ Enabled: []v1beta2.Plugin{
+ {Name: filterPluginName},
+ {Name: noderesources.FitName},
+ },
+ Disabled: []v1beta2.Plugin{
+ {Name: noderesources.FitName},
+ },
+ },
+ },
+ },
+ },
+ })

 // Create the API server and the scheduler with the test plugin set.
 testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "preempt-with-permit-plugin", nil), 0,
- scheduler.WithProfiles(prof),
+ scheduler.WithProfiles(cfg.Profiles...),
 scheduler.WithFrameworkOutOfTreeRegistry(registry))
 defer testutils.CleanupTest(t, testCtx)

@@ -1863,85 +1988,141 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
 t.Fatal(err)
 }

- permitPlugin.failPermit = false
- permitPlugin.rejectPermit = false
- permitPlugin.timeoutPermit = false
- permitPlugin.waitAndRejectPermit = false
- permitPlugin.waitAndAllowPermit = true
- permitPlugin.waitingPod = "waiting-pod"
-
+ ns := testCtx.NS.Name
 lowPriority, highPriority := int32(100), int32(300)
- resourceRequest := v1.ResourceRequirements{Requests: v1.ResourceList{
- v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
- v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
+ resReq := map[v1.ResourceName]string{
+ v1.ResourceCPU: "200m",
+ v1.ResourceMemory: "200",
 }
- preemptorResourceRequest := v1.ResourceRequirements{Requests: v1.ResourceList{
- v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI),
- v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)},
+ preemptorReq := map[v1.ResourceName]string{
+ v1.ResourceCPU: "400m",
+ v1.ResourceMemory: "400",
 }
- // First pod will go running.
- runningPod := initPausePod(&pausePodConfig{Name: "running-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, Resources: &resourceRequest})
- runningPod.Spec.TerminationGracePeriodSeconds = new(int64)
- runningPod, err = createPausePod(testCtx.ClientSet, runningPod)
- if err != nil {
- t.Errorf("Error while creating the waiting pod: %v", err)
- }
- // Wait until the pod scheduled, then create a preemptor pod to preempt it.
- wait.Poll(100*time.Millisecond, 30*time.Second, podScheduled(testCtx.ClientSet, runningPod.Name, runningPod.Namespace))
-
- // Second pod will go waiting.
- waitingPod := initPausePod(&pausePodConfig{Name: "waiting-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, Resources: &resourceRequest})
- waitingPod.Spec.TerminationGracePeriodSeconds = new(int64)
- waitingPod, err = createPausePod(testCtx.ClientSet, waitingPod)
- if err != nil {
- t.Errorf("Error while creating the waiting pod: %v", err)
- }
- // Wait until the waiting-pod is actually waiting, then create a preemptor pod to preempt it.
- wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
- w := false
- permitPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true })
- return w, nil
- })
-
- // Create third pod which should preempt other pods.
- preemptorPod, err := createPausePod(testCtx.ClientSet,
- initPausePod(&pausePodConfig{Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, Resources: &preemptorResourceRequest}))
- if err != nil {
- t.Errorf("Error while creating the preemptor pod: %v", err)
+ tests := []struct {
+ name string
+ deleteWaitingPod bool
+ maxNumWaitingPodCalled int
+ runningPod *v1.Pod
+ waitingPod *v1.Pod
+ preemptor *v1.Pod
+ }{
+ {
+ name: "waiting pod is not physically deleted upon preemption",
+ maxNumWaitingPodCalled: 2,
+ runningPod: st.MakePod().Name("running-pod").Namespace(ns).Priority(lowPriority).Req(resReq).ZeroTerminationGracePeriod().Obj(),
+ waitingPod: st.MakePod().Name("waiting-pod").Namespace(ns).Priority(lowPriority).Req(resReq).ZeroTerminationGracePeriod().Obj(),
+ preemptor: st.MakePod().Name("preemptor-pod").Namespace(ns).Priority(highPriority).Req(preemptorReq).ZeroTerminationGracePeriod().Obj(),
+ },
+ {
+ name: "rejecting a waiting pod to trigger retrying unschedulable pods immediately, but the waiting pod itself won't be retried",
+ maxNumWaitingPodCalled: 1,
+ waitingPod: st.MakePod().Name("waiting-pod").Namespace(ns).Priority(lowPriority).Req(resReq).ZeroTerminationGracePeriod().Obj(),
+ preemptor: st.MakePod().Name("preemptor-pod").Namespace(ns).Priority(highPriority).Req(preemptorReq).ZeroTerminationGracePeriod().Obj(),
+ },
+ {
+ name: "deleting a waiting pod to trigger retrying unschedulable pods immediately",
+ deleteWaitingPod: true,
+ maxNumWaitingPodCalled: 1,
+ waitingPod: st.MakePod().Name("waiting-pod").Namespace(ns).Priority(lowPriority).Req(resReq).ZeroTerminationGracePeriod().Obj(),
+ preemptor: st.MakePod().Name("preemptor-pod").Namespace(ns).Priority(lowPriority).Req(preemptorReq).ZeroTerminationGracePeriod().Obj(),
+ },
 }

- // TODO(#96478): uncomment below once we find a way to trigger MoveAllToActiveOrBackoffQueue()
- // upon deletion event of unassigned waiting pods.
- // if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, preemptorPod); err != nil {
- // t.Errorf("Expected the preemptor pod to be scheduled. error: %v", err)
- // }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ defer func() {
+ permitPlugin.reset()
+ filterPlugin.reset()
+ var pods []*v1.Pod
+ for _, p := range []*v1.Pod{tt.runningPod, tt.waitingPod, tt.preemptor} {
+ if p != nil {
+ pods = append(pods, p)
+ }
+ }
+ testutils.CleanupPods(testCtx.ClientSet, t, pods)
+ }()

- if err := wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
- w := false
- permitPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true })
- return !w, nil
- }); err != nil {
- t.Error("Expected the waiting pod to get preempted")
- }
- // Expect the waitingPod to be still present.
- if _, err := getPod(testCtx.ClientSet, waitingPod.Name, waitingPod.Namespace); err != nil {
- t.Error("Get waiting pod in waiting pod failed.")
- }
- // Expect the runningPod to be deleted physically.
- _, err = getPod(testCtx.ClientSet, runningPod.Name, runningPod.Namespace)
- if err != nil && !errors.IsNotFound(err) {
- t.Error("Get running pod failed.")
- }
- if err == nil {
- t.Error("Running pod still exist.")
- }
- if permitPlugin.numPermitCalled == 0 {
- t.Errorf("Expected the permit plugin to be called.")
- }
+ permitPlugin.waitAndAllowPermit = true
+ permitPlugin.waitingPod = "waiting-pod"

- permitPlugin.reset()
- testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{waitingPod, runningPod, preemptorPod})
+ if r := tt.runningPod; r != nil {
+ if _, err := createPausePod(testCtx.ClientSet, r); err != nil {
+ t.Fatalf("Error while creating the running pod: %v", err)
+ }
+ // Wait until the pod is scheduled.
+ if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, r); err != nil {
+ t.Fatalf("The running pod is expected to be scheduled: %v", err)
+ }
+ }
+
+ if w := tt.waitingPod; w != nil {
+ if _, err := createPausePod(testCtx.ClientSet, w); err != nil {
+ t.Fatalf("Error while creating the waiting pod: %v", err)
+ }
+ // Wait until the waiting-pod is actually waiting.
+ if err := wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
+ w := false
+ permitPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true })
+ return w, nil
+ }); err != nil {
+ t.Fatalf("The waiting pod is expected to be waiting: %v", err)
+ }
+ }
+
+ if p := tt.preemptor; p != nil {
+ if _, err := createPausePod(testCtx.ClientSet, p); err != nil {
+ t.Fatalf("Error while creating the preemptor pod: %v", err)
+ }
+ // Delete the waiting pod if specified.
+ if w := tt.waitingPod; w != nil && tt.deleteWaitingPod {
+ if err := deletePod(testCtx.ClientSet, w.Name, w.Namespace); err != nil {
+ t.Fatalf("Error while deleting the waiting pod: %v", err)
+ }
+ }
+ if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, p); err != nil {
+ t.Fatalf("Expected the preemptor pod to be scheduled. error: %v", err)
+ }
+ }
+
+ if w := tt.waitingPod; w != nil {
+ if err := wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
+ w := false
+ permitPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true })
+ return !w, nil
+ }); err != nil {
+ t.Fatalf("Expected the waiting pod to get preempted.")
+ }
+
+ filterPlugin.RLock()
+ waitingPodCalled := filterPlugin.numCalledPerPod[fmt.Sprintf("%v/%v", w.Namespace, w.Name)]
+ filterPlugin.RUnlock()
+ if waitingPodCalled > tt.maxNumWaitingPodCalled {
+ t.Fatalf("Expected the waiting pod to be called %v times at most, but got %v", tt.maxNumWaitingPodCalled, waitingPodCalled)
+ }
+
+ if !tt.deleteWaitingPod {
+ // Expect the waitingPod to be still present.
+ if _, err := getPod(testCtx.ClientSet, w.Name, w.Namespace); err != nil {
+ t.Error("Getting the waiting pod failed.")
+ }
+ }
+
+ if permitPlugin.numPermitCalled == 0 {
+ t.Errorf("Expected the permit plugin to be called.")
+ }
+ }
+
+ if r := tt.runningPod; r != nil {
+ // Expect the runningPod to be deleted physically.
+ if _, err = getPod(testCtx.ClientSet, r.Name, r.Namespace); err == nil {
+ t.Error("The running pod still exists.")
+ } else if !errors.IsNotFound(err) {
+ t.Errorf("Get running pod failed: %v", err)
+ }
+ }
+ })
+ }
 }

 const (
diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go
index da6d036ffa8..e28be7d063f 100644
--- a/test/integration/scheduler/util.go
+++ b/test/integration/scheduler/util.go
@@ -498,10 +498,7 @@ func podScheduled(c clientset.Interface, podNamespace, podName string) wait.Cond
 // This could be a connection error so we want to retry.
 return false, nil
 }
- if pod.Spec.NodeName == "" {
- return false, nil
- }
- return true, nil
+ return pod.Spec.NodeName != "", nil
 }
 }
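
Illustrative recap (not part of the patch): the two call sites above reduce to the same pattern. This is a minimal sketch only, reusing names from the hunks (fwk, sched, pod, assumedPod, queue/internalqueue.AssignedPodDelete).

    // Event-handler path: a deleted waiting pod was previously assumed, so its
    // resources were still reserved in the scheduler cache. Treat its rejection
    // like an assigned-pod deletion and retry unschedulable Pods immediately.
    if fwk.RejectWaitingPod(pod.UID) {
        sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete, nil)
    }

    // Binding-cycle path: forgetting an assumed Pod after a Permit/PreBind/Bind
    // failure frees its reservation as well, so the same move request is issued,
    // except the assumed Pod itself is skipped since it is known to be unschedulable.
    defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
        return assumedPod.UID != pod.UID
    })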