diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD
index b58321c92e9..5f4c5b5cc85 100644
--- a/pkg/scheduler/BUILD
+++ b/pkg/scheduler/BUILD
@@ -42,7 +42,6 @@ go_library(
         "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
-        "//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//vendor/github.com/google/go-cmp/cmp:go_default_library",
         "//vendor/k8s.io/klog/v2:go_default_library",
diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD
index 6a29762ac58..b72c559a105 100644
--- a/pkg/scheduler/core/BUILD
+++ b/pkg/scheduler/core/BUILD
@@ -10,6 +10,7 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/api/v1/pod:go_default_library",
+        "//pkg/features:go_default_library",
         "//pkg/scheduler/apis/config:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
@@ -23,8 +24,8 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
-        "//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
         "//staging/src/k8s.io/client-go/rest:go_default_library",
         "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
         "//vendor/k8s.io/klog/v2:go_default_library",
diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go
index b726b1f1583..f02aa25cb65 100644
--- a/pkg/scheduler/core/extender_test.go
+++ b/pkg/scheduler/core/extender_test.go
@@ -283,7 +283,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 				emptySnapshot,
 				extenders,
 				informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
-				informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
 				false,
 				schedulerapi.DefaultPercentageOfNodesToScore)
 			podIgnored := &v1.Pod{}
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index 37fe60cd7e7..1ea72edeba3 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -33,10 +33,11 @@ import (
 	policy "k8s.io/api/policy/v1beta1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	corelisters "k8s.io/client-go/listers/core/v1"
-	policylisters "k8s.io/client-go/listers/policy/v1beta1"
 	extenderv1 "k8s.io/kube-scheduler/extender/v1"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	kubefeatures "k8s.io/kubernetes/pkg/features"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
@@ -100,10 +101,6 @@ func (f *FitError) Error() string {
 // TODO: Rename this type.
 type ScheduleAlgorithm interface {
 	Schedule(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
-	// Preempt receives scheduling filter result (NodeToStatusMap) for a pod and tries to create room for
-	// the pod by preempting lower priority pods if possible.
-	// It returns the node where preemption happened, and error if any.
-	Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, framework.NodeToStatusMap) (selectedNode string, err error)
 	// Extenders returns a slice of extender config. This is exposed for
 	// testing.
 	Extenders() []framework.Extender
@@ -126,7 +123,6 @@ type genericScheduler struct {
 	extenders                []framework.Extender
 	nodeInfoSnapshot         *internalcache.Snapshot
 	pvcLister                corelisters.PersistentVolumeClaimLister
-	pdbLister                policylisters.PodDisruptionBudgetLister
 	disablePreemption        bool
 	percentageOfNodesToScore int32
 	nextStartNodeIndex       int
@@ -236,7 +232,7 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
 	return selected, nil
 }
 
-// preempt finds nodes with pods that can be preempted to make room for "pod" to
+// Preempt finds nodes with pods that can be preempted to make room for "pod" to
 // schedule. It chooses one of the nodes and preempts the pods on the node and
 // returns 1) the node, 2) the list of preempted pods if such a node is found,
 // 3) A list of pods whose nominated node name should be cleared, and 4) any
@@ -248,8 +244,8 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
 // other pods with the same priority. The nominated pod prevents other pods from
 // using the nominated resources and the nominated pod could take a long time
 // before it is retried after many other pending pods.
-func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
-	cs := prof.ClientSet()
+func Preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
+	cs := fh.ClientSet()
 	// TODO(Huang-Wei): get pod from informer cache instead of API server.
 	pod, err := util.GetUpdatedPod(cs, pod)
 	if err != nil {
@@ -257,11 +253,11 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
 		return "", err
 	}
 
-	if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfos()) {
+	if !podEligibleToPreemptOthers(pod, fh.SnapshotSharedLister().NodeInfos()) {
 		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
 		return "", nil
 	}
-	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
+	allNodes, err := fh.SnapshotSharedLister().NodeInfos().List()
 	if err != nil {
 		return "", err
 	}
@@ -285,14 +281,11 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
 		}
 		klog.Infof("%v potential nodes for preemption, first %v are: %v", len(potentialNodes), len(sample), sample)
 	}
-	var pdbs []*policy.PodDisruptionBudget
-	if g.pdbLister != nil {
-		pdbs, err = g.pdbLister.List(labels.Everything())
-		if err != nil {
-			return "", err
-		}
+	pdbs, err := getPodDisruptionBudgets(fh)
+	if err != nil {
+		return "", err
 	}
-	nodeNameToVictims, err := selectNodesForPreemption(ctx, prof, g.podNominator, state, pod, potentialNodes, pdbs)
+	nodeNameToVictims, err := selectNodesForPreemption(ctx, fh.PreemptHandle(), fh.PreemptHandle(), state, pod, potentialNodes, pdbs)
 	if err != nil {
 		return "", err
 	}
@@ -300,7 +293,7 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
 	// We will only check nodeNameToVictims with extenders that support preemption.
 	// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
 	// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
-	nodeNameToVictims, err = g.processPreemptionWithExtenders(pod, nodeNameToVictims)
+	nodeNameToVictims, err = processPreemptionWithExtenders(fh, pod, nodeNameToVictims)
 	if err != nil {
 		return "", err
 	}
@@ -317,10 +310,10 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
 			return "", err
 		}
 		// If the victim is a WaitingPod, send a reject message to the PermitPlugin
-		if waitingPod := prof.GetWaitingPod(victim.UID); waitingPod != nil {
+		if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
 			waitingPod.Reject("preempted")
 		}
-		prof.Recorder.Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode)
+		fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode)
 	}
 	metrics.PreemptionVictims.Observe(float64(len(victims)))
 
@@ -328,7 +321,7 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
 	// this node. So, we should remove their nomination. Removing their
 	// nomination updates these pods and moves them to the active queue. It
 	// lets scheduler find another place for them.
-	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode)
+	nominatedPods := getLowerPriorityNominatedPods(fh.PreemptHandle(), pod, candidateNode)
 	if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
 		klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err)
 		// We do not return as this error is not critical.
@@ -337,18 +330,22 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
 	return candidateNode, nil
 }
 
+func getPodDisruptionBudgets(fh framework.FrameworkHandle) ([]*policy.PodDisruptionBudget, error) {
+	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
+		return fh.SharedInformerFactory().Policy().V1beta1().PodDisruptionBudgets().Lister().List(labels.Everything())
+	}
+	return nil, nil
+}
+
 // processPreemptionWithExtenders processes preemption with extenders
-func (g *genericScheduler) processPreemptionWithExtenders(
-	pod *v1.Pod,
-	nodeNameToVictims map[string]*extenderv1.Victims,
-) (map[string]*extenderv1.Victims, error) {
+func processPreemptionWithExtenders(fh framework.FrameworkHandle, pod *v1.Pod, nodeNameToVictims map[string]*extenderv1.Victims) (map[string]*extenderv1.Victims, error) {
 	if len(nodeNameToVictims) > 0 {
-		for _, extender := range g.extenders {
+		for _, extender := range fh.PreemptHandle().Extenders() {
 			if extender.SupportsPreemption() && extender.IsInterested(pod) {
 				newNodeNameToVictims, err := extender.ProcessPreemption(
 					pod,
 					nodeNameToVictims,
-					g.nodeInfoSnapshot.NodeInfos(),
+					fh.SnapshotSharedLister().NodeInfos(),
 				)
 				if err != nil {
 					if extender.IsIgnorable() {
@@ -381,8 +378,8 @@ func (g *genericScheduler) processPreemptionWithExtenders(
 // manipulation of NodeInfo and PreFilter state per nominated pod. It may not be
 // worth the complexity, especially because we generally expect to have a very
 // small number of nominated pods per node.
-func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
-	pods := g.podNominator.NominatedPodsForNode(nodeName)
+func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod {
+	pods := pn.NominatedPodsForNode(nodeName)
 
 	if len(pods) == 0 {
 		return nil
@@ -1141,7 +1138,6 @@ func NewGenericScheduler(
 	nodeInfoSnapshot *internalcache.Snapshot,
 	extenders []framework.Extender,
 	pvcLister corelisters.PersistentVolumeClaimLister,
-	pdbLister policylisters.PodDisruptionBudgetLister,
 	disablePreemption bool,
 	percentageOfNodesToScore int32) ScheduleAlgorithm {
 	return &genericScheduler{
@@ -1150,7 +1146,6 @@ func NewGenericScheduler(
 		extenders:                extenders,
 		nodeInfoSnapshot:         nodeInfoSnapshot,
 		pvcLister:                pvcLister,
-		pdbLister:                pdbLister,
 		disablePreemption:        disablePreemption,
 		percentageOfNodesToScore: percentageOfNodesToScore,
 	}
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index b05791f4425..a1e13a2ed1e 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -693,9 +693,6 @@ func TestGenericScheduler(t *testing.T) {
 	}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			client := clientsetfake.NewSimpleClientset()
-			informerFactory := informers.NewSharedInformerFactory(client, 0)
-
 			cache := internalcache.New(time.Duration(0), wait.NeverStop)
 			for _, pod := range test.pods {
 				cache.AddPod(pod)
@@ -724,7 +721,6 @@ func TestGenericScheduler(t *testing.T) {
 				snapshot,
 				[]framework.Extender{},
 				pvcLister,
-				informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
 				false,
 				schedulerapi.DefaultPercentageOfNodesToScore)
 			result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), test.pod)
@@ -752,7 +748,7 @@ func makeScheduler(nodes []*v1.Node) *genericScheduler {
 		cache,
 		internalqueue.NewSchedulingQueue(nil),
 		emptySnapshot,
-		nil, nil, nil, false,
+		nil, nil, false,
 		schedulerapi.DefaultPercentageOfNodesToScore)
 	cache.UpdateSnapshot(s.(*genericScheduler).nodeInfoSnapshot)
 	return s.(*genericScheduler)
@@ -1045,7 +1041,6 @@ func TestZeroRequest(t *testing.T) {
 				emptySnapshot,
 				[]framework.Extender{},
 				nil,
-				nil,
 				false,
 				schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler)
 			scheduler.nodeInfoSnapshot = snapshot
@@ -1508,9 +1503,6 @@ func TestSelectNodesForPreemption(t *testing.T) {
 	labelKeys := []string{"hostname", "zone", "region"}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			client := clientsetfake.NewSimpleClientset()
-			informerFactory := informers.NewSharedInformerFactory(client, 0)
-
 			filterFailedNodeReturnCodeMap := map[string]framework.Code{}
 			cache := internalcache.New(time.Duration(0), wait.NeverStop)
 			for _, pod := range test.pods {
@@ -1558,7 +1550,6 @@ func TestSelectNodesForPreemption(t *testing.T) {
 			snapshot,
 			[]framework.Extender{},
 			nil,
-			informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
 			false,
 			schedulerapi.DefaultPercentageOfNodesToScore)
 		g := scheduler.(*genericScheduler)
@@ -2306,7 +2297,6 @@ func TestPreempt(t *testing.T) {
 				deletedPodNames.Insert(action.(clienttesting.DeleteAction).GetName())
 				return true, nil, nil
 			})
-			informerFactory := informers.NewSharedInformerFactory(client, 0)
 
 			stop := make(chan struct{})
 			cache := internalcache.New(time.Duration(0), stop)
@@ -2344,29 +2334,21 @@ func TestPreempt(t *testing.T) {
 				extenders = append(extenders, extender)
 			}
+			podNominator := internalqueue.NewPodNominator()
 			snapshot := internalcache.NewSnapshot(test.pods, nodes)
 			fwk, err := st.NewFramework(
 				test.registerPlugins,
 				framework.WithClientSet(client),
+				framework.WithEventRecorder(&events.FakeRecorder{}),
+				framework.WithExtenders(extenders),
+				framework.WithPodNominator(podNominator),
 				framework.WithSnapshotSharedLister(snapshot),
+				framework.WithInformerFactory(informers.NewSharedInformerFactory(client, 0)),
 			)
 			if err != nil {
 				t.Fatal(err)
 			}
-			prof := &profile.Profile{
-				Framework: fwk,
-				Recorder:  &events.FakeRecorder{},
-			}
-			scheduler := NewGenericScheduler(
-				cache,
-				internalqueue.NewSchedulingQueue(nil),
-				snapshot,
-				extenders,
-				informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
-				informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
-				false,
-				schedulerapi.DefaultPercentageOfNodesToScore)
 			state := framework.NewCycleState()
 			// Some tests rely on PreFilter plugin to compute its CycleState.
 			preFilterStatus := fwk.RunPreFilterPlugins(context.Background(), state, test.pod)
@@ -2378,7 +2360,7 @@ func TestPreempt(t *testing.T) {
 			if test.failedNodeToStatusMap != nil {
 				failedNodeToStatusMap = test.failedNodeToStatusMap
 			}
-			node, err := scheduler.Preempt(context.Background(), prof, state, test.pod, failedNodeToStatusMap)
+			node, err := Preempt(context.Background(), fwk, state, test.pod, failedNodeToStatusMap)
 			if err != nil {
 				t.Errorf("unexpected error in preemption: %v", err)
 			}
@@ -2416,7 +2398,7 @@ func TestPreempt(t *testing.T) {
 			}
 
 			// Call preempt again and make sure it doesn't preempt any more pods.
-			node, err = scheduler.Preempt(context.Background(), prof, state, test.pod, failedNodeToStatusMap)
+			node, err = Preempt(context.Background(), fwk, state, test.pod, failedNodeToStatusMap)
 			if err != nil {
 				t.Errorf("unexpected error in preemption: %v", err)
 			}
diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go
index 1809fc84645..2085ffc3b5c 100644
--- a/pkg/scheduler/eventhandlers.go
+++ b/pkg/scheduler/eventhandlers.go
@@ -30,6 +30,7 @@ import (
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/kubernetes/pkg/features"
+	kubefeatures "k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/scheduler/internal/queue"
 	"k8s.io/kubernetes/pkg/scheduler/profile"
 )
@@ -466,6 +467,11 @@ func addAllEventHandlers(
 			AddFunc: sched.onStorageClassAdd,
 		},
 	)
+
+	// TODO(Huang-Wei): remove this hack when defaultpreemption plugin is enabled.
+	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
+		informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister()
+	}
 }
 
 func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) string {
diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go
index 906532accba..4b02dbbb30b 100644
--- a/pkg/scheduler/factory.go
+++ b/pkg/scheduler/factory.go
@@ -30,15 +30,12 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
-	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/informers"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	corelisters "k8s.io/client-go/listers/core/v1"
-	policylisters "k8s.io/client-go/listers/policy/v1beta1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog/v2"
-	kubefeatures "k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
@@ -192,7 +189,6 @@ func (c *Configurator) create() (*Scheduler, error) {
 		c.nodeInfoSnapshot,
 		extenders,
 		c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
-		GetPodDisruptionBudgetLister(c.informerFactory),
 		c.disablePreemption,
 		c.percentageOfNodesToScore,
 	)
@@ -478,11 +474,3 @@ func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodL
 		}
 	}
 }
-
-// GetPodDisruptionBudgetLister returns pdb lister from the given informer factory. Returns nil if PodDisruptionBudget feature is disabled.
-func GetPodDisruptionBudgetLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister {
-	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
-		return informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister()
-	}
-	return nil
-}
diff --git a/pkg/scheduler/framework/plugins/BUILD b/pkg/scheduler/framework/plugins/BUILD
index 117075bd0ed..a65144171cb 100644
--- a/pkg/scheduler/framework/plugins/BUILD
+++ b/pkg/scheduler/framework/plugins/BUILD
@@ -48,6 +48,7 @@ filegroup(
         ":package-srcs",
         "//pkg/scheduler/framework/plugins/defaultbinder:all-srcs",
        "//pkg/scheduler/framework/plugins/defaultpodtopologyspread:all-srcs",
+        "//pkg/scheduler/framework/plugins/defaultpreemption:all-srcs",
        "//pkg/scheduler/framework/plugins/examples:all-srcs",
        "//pkg/scheduler/framework/plugins/helper:all-srcs",
        "//pkg/scheduler/framework/plugins/imagelocality:all-srcs",
diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/BUILD b/pkg/scheduler/framework/plugins/defaultpreemption/BUILD
new file mode 100644
index 00000000000..80e8552dcf7
--- /dev/null
+++ b/pkg/scheduler/framework/plugins/defaultpreemption/BUILD
@@ -0,0 +1,28 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "go_default_library",
+    srcs = ["default_preemption.go"],
+    importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/scheduler/core:go_default_library",
+        "//pkg/scheduler/framework/v1alpha1:go_default_library",
+        "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+    visibility = ["//visibility:public"],
+)
diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
new file mode 100644
index 00000000000..670978cff17
--- /dev/null
+++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
@@ -0,0 +1,61 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package defaultpreemption
+
+import (
+	"context"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/kubernetes/pkg/scheduler/core"
+	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
+)
+
+const (
+	// Name of the plugin used in the plugin registry and configurations.
+	Name = "DefaultPreemption"
+)
+
+// DefaultPreemption is a PostFilter plugin implements the preemption logic.
+type DefaultPreemption struct {
+	fh framework.FrameworkHandle
+}
+
+var _ framework.PostFilterPlugin = &DefaultPreemption{}
+
+// Name returns name of the plugin. It is used in logs, etc.
+func (pl *DefaultPreemption) Name() string {
+	return Name
+}
+
+// New initializes a new plugin and returns it.
+func New(_ runtime.Object, fh framework.FrameworkHandle) (framework.Plugin, error) {
+	pl := DefaultPreemption{fh}
+	return &pl, nil
+}
+
+// PostFilter invoked at the postFilter extension point.
+func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
+	nnn, err := core.Preempt(ctx, pl.fh, state, pod, m)
+	if err != nil {
+		return nil, framework.NewStatus(framework.Error, err.Error())
+	}
+	if nnn == "" {
+		return nil, framework.NewStatus(framework.Unschedulable)
+	}
+	return &framework.PostFilterResult{NominatedNodeName: nnn}, framework.NewStatus(framework.Success)
+}
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index c88555528a2..bf8845a2407 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -486,7 +486,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			} else {
 				preemptionStartTime := time.Now()
 				// TODO(Huang-Wei): implement the preemption logic as a PostFilter plugin.
-				nominatedNode, _ = sched.Algorithm.Preempt(schedulingCycleCtx, prof, state, pod, fitError.FilteredNodesStatuses)
+				nominatedNode, _ = core.Preempt(schedulingCycleCtx, prof, state, pod, fitError.FilteredNodesStatuses)
 				metrics.PreemptionAttempts.Inc()
 				metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
 				metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 48de4840167..048f14de26c 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -120,10 +120,6 @@ func (es mockScheduler) Extenders() []framework.Extender {
 	return nil
 }
 
-func (es mockScheduler) Preempt(ctx context.Context, profile *profile.Profile, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
-	return "", nil
-}
-
 func TestSchedulerCreation(t *testing.T) {
 	invalidRegistry := map[string]framework.PluginFactory{
 		defaultbinder.Name: defaultbinder.New,
@@ -787,7 +783,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
 		internalcache.NewEmptySnapshot(),
 		[]framework.Extender{},
 		informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
-		informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
 		false,
 		schedulerapi.DefaultPercentageOfNodesToScore,
 	)
@@ -1140,7 +1135,6 @@ func TestSchedulerBinding(t *testing.T) {
 			nil,
 			test.extenders,
 			nil,
-			nil,
 			false,
 			0,
 		)
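The new plugin exposes the standard factory signature (New in default_preemption.go), so enabling it later should only need a registry entry plus a PostFilter entry in the profile config. A minimal sketch of that wiring, assuming the same map[string]framework.PluginFactory pattern the tests above already use for defaultbinder (the package location and variable name below are illustrative only, not part of this change):

	package plugins // hypothetical location; the in-tree registration is left to a follow-up

	import (
		"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption"
		framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	)

	// Registry maps the plugin name ("DefaultPreemption") to its factory so a
	// scheduler profile can enable it at the PostFilter extension point.
	var Registry = map[string]framework.PluginFactory{
		defaultpreemption.Name: defaultpreemption.New,
	}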