diff --git a/pkg/scheduler/backend/queue/scheduling_queue.go b/pkg/scheduler/backend/queue/scheduling_queue.go index d4c5082d139..00ec76f4c8a 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue.go +++ b/pkg/scheduler/backend/queue/scheduling_queue.go @@ -120,6 +120,7 @@ type SchedulingQueue interface { AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod, event framework.ClusterEvent) PendingPods() ([]*v1.Pod, string) PodsInActiveQ() []*v1.Pod + InFlightPods() []*v1.Pod // Close closes the SchedulingQueue so that the goroutine which is // waiting to pop items can exit gracefully. Close() @@ -837,6 +838,15 @@ func (p *PriorityQueue) Done(pod types.UID) { p.activeQ.done(pod) } +func (p *PriorityQueue) InFlightPods() []*v1.Pod { + if !p.isSchedulingQueueHintEnabled { + // do nothing if schedulingQueueHint is disabled. + // In that case, we don't have inFlightPods and inFlightEvents. + return nil + } + return p.activeQ.listInFlightPods() +} + // isPodUpdated checks if the pod is updated in a way that it may have become // schedulable. It drops status of the pod and compares it with old version, // except for pod.status.resourceClaimStatuses: changing that may have an diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index 513fe5e029a..581fd3c7ed3 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -87,9 +87,12 @@ func (sched *Scheduler) ScheduleOne(ctx context.Context) { // This shouldn't happen, because we only accept for scheduling the pods // which specify a scheduler name that matches one of the profiles. logger.Error(err, "Error occurred") + sched.SchedulingQueue.Done(pod.UID) return } if sched.skipPodSchedule(ctx, fwk, pod) { + // We don't put this Pod back to the queue, but we have to cleanup the in-flight pods/events. + sched.SchedulingQueue.Done(pod.UID) return } diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index c548f6f710b..da67dc9b03a 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -41,16 +41,19 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" clienttesting "k8s.io/client-go/testing" clientcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/component-helpers/storage/volume" "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" extenderv1 "k8s.io/kube-scheduler/extender/v1" + "k8s.io/kubernetes/pkg/features" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" fakecache "k8s.io/kubernetes/pkg/scheduler/backend/cache/fake" @@ -747,103 +750,109 @@ func TestSchedulerScheduleOne(t *testing.T) { }, } - for _, item := range table { - t.Run(item.name, func(t *testing.T) { - var gotError error - var gotPod *v1.Pod - var gotForgetPod *v1.Pod - var gotAssumedPod *v1.Pod - var gotBinding *v1.Binding - cache := &fakecache.Cache{ - ForgetFunc: func(pod *v1.Pod) { - gotForgetPod = pod - }, - AssumeFunc: func(pod *v1.Pod) { - gotAssumedPod = pod - }, - IsAssumedPodFunc: func(pod *v1.Pod) bool { - if pod == nil || gotAssumedPod == nil { - return false + for _, qHintEnabled := range []bool{true, false} { + for _, item := range table { + t.Run(fmt.Sprintf("[QueueingHint: %v] %s", qHintEnabled, item.name), func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, qHintEnabled) + var gotError error + var gotPod *v1.Pod + var gotForgetPod *v1.Pod + var gotAssumedPod *v1.Pod + var gotBinding *v1.Binding + cache := &fakecache.Cache{ + ForgetFunc: func(pod *v1.Pod) { + gotForgetPod = pod + }, + AssumeFunc: func(pod *v1.Pod) { + gotAssumedPod = pod + }, + IsAssumedPodFunc: func(pod *v1.Pod) bool { + if pod == nil || gotAssumedPod == nil { + return false + } + return pod.UID == gotAssumedPod.UID + }, + } + client := clientsetfake.NewClientset(item.sendPod) + client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() != "binding" { + return false, nil, nil } - return pod.UID == gotAssumedPod.UID - }, - } - client := clientsetfake.NewClientset(item.sendPod) - client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { - if action.GetSubresource() != "binding" { - return false, nil, nil + gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding) + return true, gotBinding, item.injectBindError + }) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fwk, err := tf.NewFramework(ctx, + append(item.registerPluginFuncs, + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + ), + testSchedulerName, + frameworkruntime.WithClientSet(client), + frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), + frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), + ) + if err != nil { + t.Fatal(err) } - gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding) - return true, gotBinding, item.injectBindError - }) - registerPluginFuncs := append(item.registerPluginFuncs, - tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), - tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), - ) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - fwk, err := tf.NewFramework(ctx, - registerPluginFuncs, - testSchedulerName, - frameworkruntime.WithClientSet(client), - frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), - frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), - ) - if err != nil { - t.Fatal(err) - } - informerFactory := informers.NewSharedInformerFactory(client, 0) - sched := &Scheduler{ - Cache: cache, - client: client, - NextPod: func(logger klog.Logger) (*framework.QueuedPodInfo, error) { - return &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, item.sendPod)}, nil - }, - SchedulingQueue: internalqueue.NewSchedulingQueue(nil, informerFactory), - Profiles: profile.Map{testSchedulerName: fwk}, - } - - sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) { - return item.mockResult.result, item.mockResult.err - } - sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *framework.Status, _ *framework.NominatingInfo, _ time.Time) { - gotPod = p.Pod - gotError = status.AsError() - - msg := truncateMessage(gotError.Error()) - fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) - } - called := make(chan struct{}) - stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { - e, _ := obj.(*eventsv1.Event) - if e.Reason != item.eventReason { - t.Errorf("got event %v, want %v", e.Reason, item.eventReason) + informerFactory := informers.NewSharedInformerFactory(client, 0) + queue := internalqueue.NewSchedulingQueue(nil, informerFactory) + sched := &Scheduler{ + Cache: cache, + client: client, + NextPod: queue.Pop, + SchedulingQueue: queue, + Profiles: profile.Map{testSchedulerName: fwk}, } - close(called) + queue.Add(klog.Logger{}, item.sendPod) + + sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) { + return item.mockResult.result, item.mockResult.err + } + sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *framework.Status, _ *framework.NominatingInfo, _ time.Time) { + gotPod = p.Pod + gotError = status.AsError() + + msg := truncateMessage(gotError.Error()) + fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) + queue.Done(p.Pod.UID) + } + called := make(chan struct{}) + stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { + e, _ := obj.(*eventsv1.Event) + if e.Reason != item.eventReason { + t.Errorf("got event %v, want %v", e.Reason, item.eventReason) + } + close(called) + }) + if err != nil { + t.Fatal(err) + } + sched.ScheduleOne(ctx) + <-called + if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) { + t.Errorf("assumed pod: wanted %v, got %v", e, a) + } + if e, a := item.expectErrorPod, gotPod; !reflect.DeepEqual(e, a) { + t.Errorf("error pod: wanted %v, got %v", e, a) + } + if e, a := item.expectForgetPod, gotForgetPod; !reflect.DeepEqual(e, a) { + t.Errorf("forget pod: wanted %v, got %v", e, a) + } + if e, a := item.expectError, gotError; !reflect.DeepEqual(e, a) { + t.Errorf("error: wanted %v, got %v", e, a) + } + if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" { + t.Errorf("got binding diff (-want, +got): %s", diff) + } + if len(queue.InFlightPods()) != 0 { + t.Errorf("in-flight pods should be always empty after SchedulingOne. It has %v Pods", len(queue.InFlightPods())) + } + stopFunc() }) - if err != nil { - t.Fatal(err) - } - sched.ScheduleOne(ctx) - <-called - if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) { - t.Errorf("assumed pod: wanted %v, got %v", e, a) - } - if e, a := item.expectErrorPod, gotPod; !reflect.DeepEqual(e, a) { - t.Errorf("error pod: wanted %v, got %v", e, a) - } - if e, a := item.expectForgetPod, gotForgetPod; !reflect.DeepEqual(e, a) { - t.Errorf("forget pod: wanted %v, got %v", e, a) - } - if e, a := item.expectError, gotError; !reflect.DeepEqual(e, a) { - t.Errorf("error: wanted %v, got %v", e, a) - } - if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" { - t.Errorf("got binding diff (-want, +got): %s", diff) - } - stopFunc() - }) + } } }