From e601eb7c5a6b72e0cbd03128305638c9a039f422 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Thu, 7 Nov 2024 00:54:55 +0900 Subject: [PATCH] fix: run activate() only when fail --- .../framework/preemption/preemption.go | 30 +++++++++++++++---- .../framework/preemption/preemption_test.go | 13 ++++++-- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index 924898b3fe4..3b5c8daff41 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -492,10 +492,11 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime)) defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc() defer func() { - ev.mu.Lock() - ev.preempting.Delete(pod.UID) - ev.mu.Unlock() - ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod}) + if result == metrics.GoroutineResultError { + // When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case. + // So, we should move the Pod to the activeQ. + ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod}) + } }() defer cancel() logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods)) @@ -512,15 +513,34 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName } if len(c.Victims().Pods) == 0 { + ev.mu.Lock() + delete(ev.preempting, pod.UID) + ev.mu.Unlock() + return } - ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods), preemptPod, ev.PluginName) + // We can evict all victims in parallel, but the last one. 
+ // We have to remove the pod from the preempting map before the last one is evicted + // because, otherwise, the pod removal might be notified to the scheduling queue before + // we remove this pod from the preempting map, + // and the pod could end up stuck in the unschedulable pod pool + // because all the pod removal events would be ignored. + ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName) if err := errCh.ReceiveError(); err != nil { logger.Error(err, "Error occurred during async preemption") result = metrics.GoroutineResultError } + ev.mu.Lock() + delete(ev.preempting, pod.UID) + ev.mu.Unlock() + + if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil { + logger.Error(err, "Error occurred during async preemption") + result = metrics.GoroutineResultError + } + logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result) }() } diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index f97a0985570..22473be4f8f 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -453,6 +453,7 @@ func TestPrepareCandidate(t *testing.T) { expectedStatus *framework.Status // Only compared when async preemption is enabled.
expectedPreemptingMap sets.Set[types.UID] + expectedActivatedPods map[string]*v1.Pod }{ { name: "no victims", @@ -543,6 +544,7 @@ func TestPrepareCandidate(t *testing.T) { nodeNames: []string{node1Name}, expectedStatus: framework.AsStatus(errors.New("delete pod failed")), expectedPreemptingMap: sets.New(types.UID("preemptor")), + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, }, { name: "one victim, not-found victim error is ignored when deleting", @@ -579,6 +581,7 @@ func TestPrepareCandidate(t *testing.T) { nodeNames: []string{node1Name}, expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), expectedPreemptingMap: sets.New(types.UID("preemptor")), + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, }, { name: "two victims without condition, one passes successfully and the second fails", @@ -601,6 +604,7 @@ func TestPrepareCandidate(t *testing.T) { expectedDeletedPods: []string{"victim2"}, expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), expectedPreemptingMap: sets.New(types.UID("preemptor")), + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, }, } @@ -730,9 +734,12 @@ func TestPrepareCandidate(t *testing.T) { } if asyncPreemptionEnabled { - // Make sure the preemptor is activated regardless of the preemption result. 
- if !reflect.DeepEqual(map[string]*v1.Pod{tt.preemptor.Name: tt.preemptor}, fakeActivator.activatedPods) { - lastErrMsg = fmt.Sprintf("expected activated pods %v, got %v", map[string]*v1.Pod{tt.preemptor.Name: tt.preemptor}, fakeActivator.activatedPods) + if tt.expectedActivatedPods != nil && !reflect.DeepEqual(tt.expectedActivatedPods, fakeActivator.activatedPods) { + lastErrMsg = fmt.Sprintf("expected activated pods %v, got %v", tt.expectedActivatedPods, fakeActivator.activatedPods) + return false, nil + } + if tt.expectedActivatedPods == nil && len(fakeActivator.activatedPods) != 0 { + lastErrMsg = fmt.Sprintf("expected no activated pods, got %v", fakeActivator.activatedPods) return false, nil } }