diff --git a/pkg/scheduler/extender_test.go b/pkg/scheduler/extender_test.go index 113f8c4906f..6286bbf85b7 100644 --- a/pkg/scheduler/extender_test.go +++ b/pkg/scheduler/extender_test.go @@ -301,7 +301,6 @@ func TestSchedulerWithExtenders(t *testing.T) { nil, nil, nil, - nil, emptySnapshot, schedulerapi.DefaultPercentageOfNodesToScore) podIgnored := &v1.Pod{} diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index af5e46d1ae2..673a007f9af 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -138,7 +138,7 @@ type Evaluator struct { func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { // 0) Fetch the latest version of <pod>. // It's safe to directly fetch pod here. Because the informer cache has already been - // initialized when creating the Scheduler obj, i.e., factory.go#MakeDefaultErrorFunc(). + // initialized when creating the Scheduler obj. // However, tests may need to manually initialize the shared pod informer. podNamespace, podName := pod.Namespace, pod.Name pod, err := ev.PodLister.Pods(pod.Namespace).Get(pod.Name) diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index b0d32e0c92a..fd77597f1bf 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -26,6 +26,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -129,7 +130,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod)) metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) } - sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo) + sched.FailureHandler(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo) return } metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) @@ -146,7 +147,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // This relies on the fact that Error will check if the pod has been bound // to a node and if so will not add it back to the unscheduled pods queue // (otherwise this would cause an infinite loop).
- sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode) + sched.FailureHandler(ctx, fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode) return } @@ -158,7 +159,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil { klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed") } - sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode) + sched.FailureHandler(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode) return } @@ -178,7 +179,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil { klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed") } - sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode) + sched.FailureHandler(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode) return } @@ -221,7 +222,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { return assumedPod.UID != pod.UID }) } - sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode) + sched.FailureHandler(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode) return } @@ -239,7 +240,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // TODO(#103853): de-duplicate the logic. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil) } - sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode) + sched.FailureHandler(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode) return } @@ -256,7 +257,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // TODO(#103853): de-duplicate the logic. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil) } - sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode) + sched.FailureHandler(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode) return } // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2. @@ -810,7 +811,49 @@ func getAttemptsLabel(p *framework.QueuedPodInfo) string { // handleSchedulingFailure records an event for the pod that indicates the // pod has failed to schedule. Also, update the pod condition and nominated node name if set. func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) { - sched.Error(podInfo, err) + pod := podInfo.Pod + if err == ErrNoNodesAvailable { + klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod)) + } else if fitError, ok := err.(*framework.FitError); ok { + // Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently. 
+ podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins + klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err) + } else if apierrors.IsNotFound(err) { + klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err) + if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" { + nodeName := errStatus.Status().Details.Name + // when node is not found, We do not remove the node right away. Trying again to get + // the node and if the node is still not found, then remove it from the scheduler cache. + _, err := fwk.ClientSet().CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil && apierrors.IsNotFound(err) { + node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} + if err := sched.Cache.RemoveNode(&node); err != nil { + klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name) + } + } + } + } else { + klog.ErrorS(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod)) + } + + // Check if the Pod exists in informer cache. + podLister := fwk.SharedInformerFactory().Core().V1().Pods().Lister() + cachedPod, e := podLister.Pods(pod.Namespace).Get(pod.Name) + if e != nil { + klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", e) + } else { + // In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler. + // It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version. + if len(cachedPod.Spec.NodeName) != 0 { + klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName) + } else { + // As is from SharedInformer, we need to do a DeepCopy() here. + podInfo.PodInfo = framework.NewPodInfo(cachedPod.DeepCopy()) + if err := sched.SchedulingQueue.AddUnschedulableIfNotPresent(podInfo, sched.SchedulingQueue.SchedulingCycle()); err != nil { + klog.ErrorS(err, "Error occurred") + } + } + } // Update the scheduling queue with the nominated pod information. Without // this, there would be a race condition between the next scheduling cycle @@ -820,7 +863,11 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo sched.SchedulingQueue.AddNominatedPod(podInfo.PodInfo, nominatingInfo) } - pod := podInfo.Pod + if err == nil { + // Only tests can reach here. 
+ return + } + msg := truncateMessage(err.Error()) fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) if err := updatePod(ctx, sched.client, pod, &v1.PodCondition{ diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index c563e6c8952..1315557c4f3 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -590,10 +590,6 @@ func TestSchedulerScheduleOne(t *testing.T) { func() *framework.QueuedPodInfo { return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(item.sendPod)} }, - func(p *framework.QueuedPodInfo, err error) { - gotPod = p.Pod - gotError = err - }, nil, internalqueue.NewTestQueue(ctx, nil), profile.Map{ @@ -605,6 +601,13 @@ func TestSchedulerScheduleOne(t *testing.T) { s.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) { return item.mockResult.result, item.mockResult.err } + s.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo) { + gotPod = p.Pod + gotError = err + + msg := truncateMessage(err.Error()) + fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) + } called := make(chan struct{}) stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { e, _ := obj.(*eventsv1.Event) @@ -2026,7 +2029,6 @@ func TestSchedulerSchedulePod(t *testing.T) { nil, nil, nil, - nil, snapshot, schedulerapi.DefaultPercentageOfNodesToScore) informerFactory.Start(ctx.Done()) @@ -2328,7 +2330,6 @@ func TestZeroRequest(t *testing.T) { nil, nil, nil, - nil, snapshot, schedulerapi.DefaultPercentageOfNodesToScore) @@ -2512,7 +2513,6 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) { nil, nil, nil, - nil, snapshot, schedulerapi.DefaultPercentageOfNodesToScore) @@ -2574,7 +2574,6 @@ func makeScheduler(nodes []*v1.Node) *Scheduler { nil, nil, nil, - nil, emptySnapshot, schedulerapi.DefaultPercentageOfNodesToScore) cache.UpdateSnapshot(s.nodeInfoSnapshot) @@ -2671,9 +2670,6 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, c func() *framework.QueuedPodInfo { return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(clientcache.Pop(queuedPodStore).(*v1.Pod))} }, - func(p *framework.QueuedPodInfo, err error) { - errChan <- err - }, nil, schedulingQueue, profile.Map{ @@ -2682,6 +2678,12 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, c client, internalcache.NewEmptySnapshot(), schedulerapi.DefaultPercentageOfNodesToScore) + sched.FailureHandler = func(_ context.Context, _ framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo) { + errChan <- err + + msg := truncateMessage(err.Error()) + fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) + } return sched, bindingChan, errChan } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index c6f89232d9b..b1558beef27 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -23,7 +23,6 @@ import ( "time" v1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -31,7 +30,6 @@ import ( "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" clientset 
"k8s.io/client-go/kubernetes" - corelisters "k8s.io/client-go/listers/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" @@ -74,9 +72,8 @@ type Scheduler struct { // stale while they sit in a channel. NextPod func() *framework.QueuedPodInfo - // Error is called if there is an error. It is passed the pod in - // question, and the error - Error func(*framework.QueuedPodInfo, error) + // FailureHandler is called upon a scheduling failure. + FailureHandler FailureHandlerFn // SchedulePod tries to schedule the given pod to one of the nodes in the node list. // Return a struct of ScheduleResult with the name of suggested host on success, @@ -318,7 +315,6 @@ func New(client clientset.Interface, schedulerCache, extenders, internalqueue.MakeNextPodFunc(podQueue), - MakeDefaultErrorFunc(client, podLister, podQueue, schedulerCache), stopEverything, podQueue, profiles, @@ -348,56 +344,6 @@ func (sched *Scheduler) Run(ctx context.Context) { sched.SchedulingQueue.Close() } -// MakeDefaultErrorFunc construct a function to handle pod scheduler error -func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) { - return func(podInfo *framework.QueuedPodInfo, err error) { - pod := podInfo.Pod - if err == ErrNoNodesAvailable { - klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod)) - } else if fitError, ok := err.(*framework.FitError); ok { - // Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently. - podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins - klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err) - } else if apierrors.IsNotFound(err) { - klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err) - if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" { - nodeName := errStatus.Status().Details.Name - // when node is not found, We do not remove the node right away. Trying again to get - // the node and if the node is still not found, then remove it from the scheduler cache. - _, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) - if err != nil && apierrors.IsNotFound(err) { - node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} - if err := schedulerCache.RemoveNode(&node); err != nil { - klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name) - } - } - } - } else { - klog.ErrorS(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod)) - } - - // Check if the Pod exists in informer cache. - cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name) - if err != nil { - klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", err) - return - } - - // In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler. - // It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version. - if len(cachedPod.Spec.NodeName) != 0 { - klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName) - return - } - - // As is from SharedInformer, we need to do a DeepCopy() here. 
- podInfo.PodInfo = framework.NewPodInfo(cachedPod.DeepCopy()) - if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil { - klog.ErrorS(err, "Error occurred") - } - } -} - // NewInformerFactory creates a SharedInformerFactory and initializes a scheduler specific // in-place podInformer. func NewInformerFactory(cs clientset.Interface, resyncPeriod time.Duration) informers.SharedInformerFactory { @@ -464,12 +410,13 @@ func buildExtenders(extenders []schedulerapi.Extender, profiles []schedulerapi.K return fExtenders, nil } +type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) + // newScheduler creates a Scheduler object. func newScheduler( cache internalcache.Cache, extenders []framework.Extender, nextPod func() *framework.QueuedPodInfo, - Error func(*framework.QueuedPodInfo, error), stopEverything <-chan struct{}, schedulingQueue internalqueue.SchedulingQueue, profiles profile.Map, @@ -480,7 +427,6 @@ func newScheduler( Cache: cache, Extenders: extenders, NextPod: nextPod, - Error: Error, StopEverything: stopEverything, SchedulingQueue: schedulingQueue, Profiles: profiles, @@ -489,6 +435,7 @@ func newScheduler( percentageOfNodesToScore: percentageOfNodesToScore, } sched.SchedulePod = sched.schedulePod + sched.FailureHandler = sched.handleSchedulingFailure return &sched } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 47298d5869a..28ab7d4a492 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -30,12 +30,15 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" @@ -226,7 +229,7 @@ func TestSchedulerCreation(t *testing.T) { } } -func TestDefaultErrorFunc(t *testing.T) { +func TestFailureHandler(t *testing.T) { testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Obj() testPodUpdated := testPod.DeepCopy() testPodUpdated.Labels = map[string]string{"foo": ""} @@ -259,8 +262,8 @@ func TestDefaultErrorFunc(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - stopCh := make(chan struct{}) - defer close(stopCh) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}) informerFactory := informers.NewSharedInformerFactory(client, 0) @@ -269,7 +272,7 @@ func TestDefaultErrorFunc(t *testing.T) { podInformer.Informer().GetStore().Add(testPod) queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now()))) - schedulerCache := internalcache.New(30*time.Second, stopCh) + schedulerCache := internalcache.New(30*time.Second, ctx.Done()) queue.Add(testPod) queue.Pop() @@ -283,9 +286,13 @@ func TestDefaultErrorFunc(t *testing.T) { 
queue.Delete(testPod) } + s, fwk, err := initScheduler(ctx.Done(), schedulerCache, queue, client, informerFactory) + if err != nil { + t.Fatal(err) + } + testPodInfo := &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(testPod)} - errFunc := MakeDefaultErrorFunc(client, podInformer.Lister(), queue, schedulerCache) - errFunc(testPodInfo, tt.injectErr) + s.FailureHandler(ctx, fwk, testPodInfo, tt.injectErr, v1.PodReasonUnschedulable, nil) var got *v1.Pod if tt.podUpdatedDuringScheduling { @@ -305,7 +312,7 @@ func TestDefaultErrorFunc(t *testing.T) { } } -func TestDefaultErrorFunc_NodeNotFound(t *testing.T) { +func TestFailureHandler_NodeNotFound(t *testing.T) { nodeFoo := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} nodeBar := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Obj() @@ -354,9 +361,13 @@ func TestDefaultErrorFunc_NodeNotFound(t *testing.T) { } } + s, fwk, err := initScheduler(ctx.Done(), schedulerCache, queue, client, informerFactory) + if err != nil { + t.Fatal(err) + } + testPodInfo := &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(testPod)} - errFunc := MakeDefaultErrorFunc(client, podInformer.Lister(), queue, schedulerCache) - errFunc(testPodInfo, tt.injectErr) + s.FailureHandler(ctx, fwk, testPodInfo, tt.injectErr, v1.PodReasonUnschedulable, nil) gotNodes := schedulerCache.Dump().Nodes gotNodeNames := sets.NewString() @@ -370,9 +381,9 @@ func TestDefaultErrorFunc_NodeNotFound(t *testing.T) { } } -func TestDefaultErrorFunc_PodAlreadyBound(t *testing.T) { - stopCh := make(chan struct{}) - defer close(stopCh) +func TestFailureHandler_PodAlreadyBound(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() nodeFoo := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Node("foo").Obj() @@ -384,14 +395,18 @@ func TestDefaultErrorFunc_PodAlreadyBound(t *testing.T) { podInformer.Informer().GetStore().Add(testPod) queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now()))) - schedulerCache := internalcache.New(30*time.Second, stopCh) + schedulerCache := internalcache.New(30*time.Second, ctx.Done()) // Add node to schedulerCache no matter it's deleted in API server or not. 
schedulerCache.AddNode(&nodeFoo) + s, fwk, err := initScheduler(ctx.Done(), schedulerCache, queue, client, informerFactory) + if err != nil { + t.Fatal(err) + } + testPodInfo := &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(testPod)} - errFunc := MakeDefaultErrorFunc(client, podInformer.Lister(), queue, schedulerCache) - errFunc(testPodInfo, fmt.Errorf("binding rejected: timeout")) + s.FailureHandler(ctx, fwk, testPodInfo, fmt.Errorf("binding rejected: timeout"), v1.PodReasonUnschedulable, nil) pod := getPodFromPriorityQueue(queue, testPod) if pod != nil { @@ -425,3 +440,35 @@ func getPodFromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v return nil } + +func initScheduler(stop <-chan struct{}, cache internalcache.Cache, queue internalqueue.SchedulingQueue, + client kubernetes.Interface, informerFactory informers.SharedInformerFactory) (*Scheduler, framework.Framework, error) { + registerPluginFuncs := []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + } + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) + fwk, err := st.NewFramework(registerPluginFuncs, + testSchedulerName, + frameworkruntime.WithClientSet(client), + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), + ) + if err != nil { + return nil, nil, err + } + + s := newScheduler( + cache, + nil, + nil, + stop, + queue, + profile.Map{testSchedulerName: fwk}, + client, + nil, + 0, + ) + + return s, fwk, nil +} diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index 8c9364deb4f..23039d10191 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -105,7 +105,7 @@ func TestCoreResourceEnqueue(t *testing.T) { if fitError == nil { t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name) } - testCtx.Scheduler.Error(podInfo, fitError) + testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, fitError, v1.PodReasonUnschedulable, nil) } // Trigger a NodeTaintChange event. @@ -280,7 +280,7 @@ func TestCustomResourceEnqueue(t *testing.T) { if fitError == nil { t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name) } - testCtx.Scheduler.Error(podInfo, fitError) + testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, fitError, v1.PodReasonUnschedulable, nil) // Scheduling cycle is incremented from 0 to 1 after NextPod() is called, so // pass a number larger than 1 to move Pod to unschedulablePods.
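
Note for reviewers: a minimal sketch of how the new hook can be consumed. It is illustrative and not part of the patch; `decorateFailureHandler` is a hypothetical helper, and the only facts taken from the diff above are the `FailureHandlerFn` signature and that `newScheduler` installs `handleSchedulingFailure` as the default `FailureHandler`.

```go
package scheduler

import (
	"context"

	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// decorateFailureHandler (hypothetical, illustration only) wraps the default
// FailureHandler installed by newScheduler: it logs the failure and then
// delegates to the original handleSchedulingFailure behaviour unchanged.
func decorateFailureHandler(sched *Scheduler) {
	// Capture the handler set by newScheduler (sched.handleSchedulingFailure)
	// so the closure below can delegate to it without recursing into itself.
	defaultHandler := sched.FailureHandler
	sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo,
		err error, reason string, nominatingInfo *framework.NominatingInfo) {
		// Extra bookkeeping goes here; the signature matches FailureHandlerFn.
		klog.V(2).InfoS("Scheduling failure observed", "pod", klog.KObj(podInfo.Pod), "reason", reason, "err", err)
		defaultHandler(ctx, fwk, podInfo, err, reason, nominatingInfo)
	}
}
```

The updated tests follow the same pattern, assigning a closure to `s.FailureHandler` (see `TestSchedulerScheduleOne` and `setupTestScheduler` above) instead of passing an `Error` func into `newScheduler`.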