diff --git a/pkg/scheduler/framework/events.go b/pkg/scheduler/framework/events.go index e6b628028c5..910c254c520 100644 --- a/pkg/scheduler/framework/events.go +++ b/pkg/scheduler/framework/events.go @@ -34,6 +34,9 @@ const ( // ForceActivate is the event when a pod is moved from unschedulablePods/backoffQ // to activeQ. Usually it's triggered by plugin implementations. ForceActivate = "ForceActivate" + // UnschedulableTimeout is the event when a pod is moved from unschedulablePods + // due to the timeout specified by pod-max-in-unschedulable-pods-duration. + UnschedulableTimeout = "UnschedulableTimeout" ) var ( @@ -50,7 +53,9 @@ var ( // EventUnscheduledPodDelete is the event when an unscheduled pod is deleted. EventUnscheduledPodDelete = ClusterEvent{Resource: unschedulablePod, ActionType: Delete} // EventUnschedulableTimeout is the event when a pod stays in unschedulable for longer than timeout. - EventUnschedulableTimeout = ClusterEvent{Resource: WildCard, ActionType: All, label: "UnschedulableTimeout"} + EventUnschedulableTimeout = ClusterEvent{Resource: WildCard, ActionType: All, label: UnschedulableTimeout} + // EventForceActivate is the event when a pod is moved from unschedulablePods/backoffQ to activeQ. + EventForceActivate = ClusterEvent{Resource: WildCard, ActionType: All, label: ForceActivate} ) // PodSchedulingPropertiesChange interprets the update of a pod and returns corresponding UpdatePodXYZ event(s). diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index f41e35a3fce..433de93f2ad 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -902,7 +902,11 @@ func (ni *NominatingInfo) Mode() NominatingMode { // PodActivator abstracts operations in the scheduling queue. type PodActivator interface { - // Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ. + // Activate moves the given pods to activeQ. 
+ // If a pod isn't found in unschedulablePods or backoffQ but is in-flight, + // the wildcard event is registered so that the pod will be requeued when it comes back. + // However, if a pod isn't found in unschedulablePods or backoffQ and isn't in-flight either (i.e., a completely unknown pod), + // Activate ignores the pod. Activate(logger klog.Logger, pods map[string]*v1.Pod) } diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index 80b9e9959b4..a1d87dcd064 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -43,6 +43,7 @@ import ( clientsetfake "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/events" + "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1" extenderv1 "k8s.io/kube-scheduler/extender/v1" @@ -1549,6 +1550,7 @@ func TestPodEligibleToPreemptOthers(t *testing.T) { }) } } + func TestPreempt(t *testing.T) { metrics.Register() tests := []struct { @@ -1829,6 +1831,7 @@ func TestPreempt(t *testing.T) { frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithWaitingPods(waitingPods), frameworkruntime.WithLogger(logger), + frameworkruntime.WithPodActivator(&fakePodActivator{}), ) if err != nil { t.Fatal(err) @@ -1941,3 +1944,8 @@ func TestPreempt(t *testing.T) { } } } + +type fakePodActivator struct { +} + +func (f *fakePodActivator) Activate(logger klog.Logger, pods map[string]*v1.Pod) {} diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index 4abb6b88719..3f478c75d2a 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -493,11 +493,10 @@ func (ev *Evaluator) 
prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime)) defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc() defer func() { - if result == metrics.GoroutineResultError { - // When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case. - // So, we should move the Pod to the activeQ anyways. - ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod}) - } + ev.mu.Lock() + delete(ev.preempting, pod.UID) + ev.mu.Unlock() + ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod}) }() defer cancel() logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods)) @@ -508,40 +507,21 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName // lets scheduler find another place for them. nominatedPods := getLowerPriorityNominatedPods(logger, ev.Handler, pod, c.Name()) if err := util.ClearNominatedNodeName(ctx, ev.Handler.ClientSet(), nominatedPods...); err != nil { - logger.Error(err, "Cannot clear 'NominatedNodeName' field") + logger.Error(err, "Cannot clear 'NominatedNodeName' field from lower priority pods on the same target node", "node", c.Name()) result = metrics.GoroutineResultError // We do not return as this error is not critical. } if len(c.Victims().Pods) == 0 { - ev.mu.Lock() - delete(ev.preempting, pod.UID) - ev.mu.Unlock() - return } - // We can evict all victims in parallel, but the last one. - // We have to remove the pod from the preempting map before the last one is evicted - // because, otherwise, the pod removal might be notified to the scheduling queue before - // we remove this pod from the preempting map, - // and the pod could end up stucking at the unschedulable pod pool - // by all the pod removal events being ignored. 
- ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName) + ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods), preemptPod, ev.PluginName) if err := errCh.ReceiveError(); err != nil { logger.Error(err, "Error occurred during async preemption") result = metrics.GoroutineResultError } - ev.mu.Lock() - delete(ev.preempting, pod.UID) - ev.mu.Unlock() - - if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil { - logger.Error(err, "Error occurred during async preemption") - result = metrics.GoroutineResultError - } - logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result) }() } diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index 22473be4f8f..b21a1afa289 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -468,6 +468,7 @@ func TestPrepareCandidate(t *testing.T) { nodeNames: []string{node1Name}, expectedStatus: nil, expectedPreemptingMap: sets.New(types.UID("preemptor")), + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, }, { name: "one victim without condition", @@ -488,6 +489,7 @@ func TestPrepareCandidate(t *testing.T) { expectedDeletedPods: []string{"victim1"}, expectedStatus: nil, expectedPreemptingMap: sets.New(types.UID("preemptor")), + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, }, { name: "one victim with same condition", @@ -508,6 +510,7 @@ func TestPrepareCandidate(t *testing.T) { expectedDeletedPods: []string{"victim1"}, expectedStatus: nil, expectedPreemptingMap: sets.New(types.UID("preemptor")), + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, }, { name: "one victim, not-found victim error is ignored when patching", @@ -526,6 +529,7 @@ func TestPrepareCandidate(t *testing.T) { 
expectedDeletedPods: []string{"victim1"}, expectedStatus: nil, expectedPreemptingMap: sets.New(types.UID("preemptor")), + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, }, { name: "one victim, but pod deletion failed", @@ -563,6 +567,7 @@ func TestPrepareCandidate(t *testing.T) { expectedDeletedPods: []string{"victim1"}, expectedStatus: nil, expectedPreemptingMap: sets.New(types.UID("preemptor")), + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, }, { name: "one victim, but patch pod failed", diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index c0dedaf5340..4e63acf95c4 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -296,7 +296,7 @@ func (ce ClusterEvent) Label() string { // AllClusterEventLabels returns all possible cluster event labels given to the metrics. func AllClusterEventLabels() []string { - labels := []string{EventUnschedulableTimeout.Label()} + labels := []string{UnschedulableTimeout, ForceActivate} for _, r := range allResources { for _, a := range basicActionTypes { labels = append(labels, ClusterEvent{Resource: r, ActionType: a}.Label())