diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 285973e249f..f41e35a3fce 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -770,6 +770,8 @@ type Framework interface { // SetPodNominator sets the PodNominator SetPodNominator(nominator PodNominator) + // SetPodActivator sets the PodActivator + SetPodActivator(activator PodActivator) // Close calls Close method of each plugin. Close() error @@ -783,6 +785,8 @@ type Handle interface { PodNominator // PluginsRunner abstracts operations to run some plugins. PluginsRunner + // PodActivator abstracts operations in the scheduling queue. + PodActivator // SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot // is taken at the beginning of a scheduling cycle and remains unchanged until // a pod finishes "Permit" point. @@ -896,6 +900,12 @@ func (ni *NominatingInfo) Mode() NominatingMode { return ni.NominatingMode } +// PodActivator abstracts operations in the scheduling queue. +type PodActivator interface { + // Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ. + Activate(logger klog.Logger, pods map[string]*v1.Pod) +} + // PodNominator abstracts operations to maintain nominated Pods. type PodNominator interface { // AddNominatedPod adds the given pod to the nominator or diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index a58781c9690..5aabf6f951d 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -492,6 +492,13 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName result := metrics.GoroutineResultSuccess defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime)) defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc() + defer func() { + if result == metrics.GoroutineResultError { + // When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case. + // So, we should move the Pod to the activeQ anyways. + ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod}) + } + }() defer cancel() logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods)) @@ -506,13 +513,6 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName // We do not return as this error is not critical. } - // We can evict all victims in parallel, but the last one. - // We have to remove the pod from the preempting map before the last one is evicted - // because, otherwise, the pod removal might be notified to the scheduling queue before - // we remove this pod from the preempting map, - // and the pod could end up stucking at the unschedulable pod pool - // by all the pod removal events being ignored. - if len(c.Victims().Pods) == 0 { ev.mu.Lock() delete(ev.preempting, pod.UID) @@ -521,9 +521,15 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName return } + // We can evict all victims in parallel, but the last one. + // We have to remove the pod from the preempting map before the last one is evicted + // because, otherwise, the pod removal might be notified to the scheduling queue before + // we remove this pod from the preempting map, + // and the pod could end up stucking at the unschedulable pod pool + // by all the pod removal events being ignored. ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName) if err := errCh.ReceiveError(); err != nil { - logger.Error(err, "Error occurred during preemption") + logger.Error(err, "Error occurred during async preemption") result = metrics.GoroutineResultError } @@ -532,7 +538,7 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName ev.mu.Unlock() if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil { - logger.Error(err, "Error occurred during preemption") + logger.Error(err, "Error occurred during async preemption") result = metrics.GoroutineResultError } diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index e00644b3390..b6aa1908b18 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "reflect" "sort" "testing" "time" @@ -37,6 +38,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" clienttesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/events" + "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" extenderv1 "k8s.io/kube-scheduler/extender/v1" internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" @@ -86,6 +88,16 @@ func (pl *FakePostFilterPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVi return nil } +type fakePodActivator struct { + activatedPods map[string]*v1.Pod +} + +func (f *fakePodActivator) Activate(logger klog.Logger, pods map[string]*v1.Pod) { + for name, pod := range pods { + f.activatedPods[name] = pod + } +} + type FakePreemptionScorePostFilterPlugin struct{} func (pl *FakePreemptionScorePostFilterPlugin) SelectVictimsOnNode( @@ -437,6 +449,7 @@ func TestPrepareCandidate(t *testing.T) { expectedStatus *framework.Status // Only compared when async preemption is enabled. expectedPreemptingMap sets.Set[types.UID] + expectedActivatedPods map[string]*v1.Pod }{ { name: "no victims", @@ -527,6 +540,7 @@ func TestPrepareCandidate(t *testing.T) { nodeNames: []string{node1Name}, expectedStatus: framework.AsStatus(errors.New("delete pod failed")), expectedPreemptingMap: sets.New(types.UID("preemptor")), + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, }, { name: "one victim, not-found victim error is ignored when deleting", @@ -563,6 +577,7 @@ func TestPrepareCandidate(t *testing.T) { nodeNames: []string{node1Name}, expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), expectedPreemptingMap: sets.New(types.UID("preemptor")), + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, }, { name: "two victims without condition, one passes successfully and the second fails", @@ -585,6 +600,7 @@ func TestPrepareCandidate(t *testing.T) { expectedDeletedPods: []string{"victim2"}, expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), expectedPreemptingMap: sets.New(types.UID("preemptor")), + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, }, } @@ -638,6 +654,7 @@ func TestPrepareCandidate(t *testing.T) { informerFactory := informers.NewSharedInformerFactory(cs, 0) eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()}) + fakeActivator := &fakePodActivator{activatedPods: make(map[string]*v1.Pod)} fwk, err := tf.NewFramework( ctx, registeredPlugins, "", @@ -648,6 +665,7 @@ func TestPrepareCandidate(t *testing.T) { frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, nodes)), frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, "test-scheduler")), + frameworkruntime.WithPodActivator(fakeActivator), ) if err != nil { t.Fatal(err) @@ -671,8 +689,6 @@ func TestPrepareCandidate(t *testing.T) { pe.mu.Unlock() // make the requests complete close(requestStopper) - - return } else { close(requestStopper) // no need to stop requests status := pe.prepareCandidate(ctx, tt.candidate, tt.preemptor, "test-plugin") @@ -705,6 +721,18 @@ func TestPrepareCandidate(t *testing.T) { lastErrMsg = fmt.Sprintf("expected patch error %v, got %v", tt.expectedPatchError, patchFailure) return false, nil } + + if asyncPreemptionEnabled { + if tt.expectedActivatedPods != nil && !reflect.DeepEqual(tt.expectedActivatedPods, fakeActivator.activatedPods) { + lastErrMsg = fmt.Sprintf("expected activated pods %v, got %v", tt.expectedActivatedPods, fakeActivator.activatedPods) + return false, nil + } + if tt.expectedActivatedPods == nil && len(fakeActivator.activatedPods) != 0 { + lastErrMsg = fmt.Sprintf("expected no activated pods, got %v", fakeActivator.activatedPods) + return false, nil + } + } + return true, nil }); err != nil { t.Fatal(lastErrMsg) diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index cbe3bcee3d6..0fe2a02577d 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -84,6 +84,7 @@ type frameworkImpl struct { extenders []framework.Extender framework.PodNominator + framework.PodActivator parallelizer parallelize.Parallelizer } @@ -131,6 +132,7 @@ type frameworkOptions struct { snapshotSharedLister framework.SharedLister metricsRecorder *metrics.MetricAsyncRecorder podNominator framework.PodNominator + podActivator framework.PodActivator extenders []framework.Extender captureProfile CaptureProfile parallelizer parallelize.Parallelizer @@ -200,6 +202,12 @@ func WithPodNominator(nominator framework.PodNominator) Option { } } +func WithPodActivator(activator framework.PodActivator) Option { + return func(o *frameworkOptions) { + o.podActivator = activator + } +} + // WithExtenders sets extenders for the scheduling frameworkImpl. func WithExtenders(extenders []framework.Extender) Option { return func(o *frameworkOptions) { @@ -279,6 +287,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler metricsRecorder: options.metricsRecorder, extenders: options.extenders, PodNominator: options.podNominator, + PodActivator: options.podActivator, parallelizer: options.parallelizer, logger: logger, } @@ -427,6 +436,10 @@ func (f *frameworkImpl) SetPodNominator(n framework.PodNominator) { f.PodNominator = n } +func (f *frameworkImpl) SetPodActivator(a framework.PodActivator) { + f.PodActivator = a +} + // Close closes each plugin, when they implement io.Closer interface. func (f *frameworkImpl) Close() error { var errs []error diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 37591bd29b0..e0b6ba644da 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -355,6 +355,7 @@ func New(ctx context.Context, for _, fwk := range profiles { fwk.SetPodNominator(podQueue) + fwk.SetPodActivator(podQueue) } schedulerCache := internalcache.New(ctx, durationToExpireAssumedPod)