From f1a5b6ca065085c5d690748b5f5f8cd81313812d Mon Sep 17 00:00:00 2001 From: Ruquan Zhao Date: Mon, 14 Feb 2022 14:40:14 +0800 Subject: [PATCH] Remove genericScheduler and SchedulerAlgorithm. Signed-off-by: Ruquan Zhao --- pkg/scheduler/extender_test.go | 13 ++- pkg/scheduler/factory.go | 24 +++--- pkg/scheduler/generic_scheduler.go | 87 +++++++------------ pkg/scheduler/generic_scheduler_test.go | 95 ++++++++++++--------- pkg/scheduler/scheduler.go | 49 +++++++++-- pkg/scheduler/scheduler_test.go | 101 +++++++++++------------ test/integration/scheduler/queue_test.go | 4 +- 7 files changed, 197 insertions(+), 176 deletions(-) diff --git a/pkg/scheduler/extender_test.go b/pkg/scheduler/extender_test.go index d137edf01a8..d27ea1eb7e2 100644 --- a/pkg/scheduler/extender_test.go +++ b/pkg/scheduler/extender_test.go @@ -41,7 +41,7 @@ import ( st "k8s.io/kubernetes/pkg/scheduler/testing" ) -func TestGenericSchedulerWithExtenders(t *testing.T) { +func TestSchedulerWithExtenders(t *testing.T) { tests := []struct { name string registerPlugins []st.RegisterPluginFunc @@ -294,12 +294,19 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { t.Fatal(err) } - scheduler := NewGenericScheduler( + scheduler := newScheduler( cache, + extenders, + nil, + nil, + nil, + nil, + nil, + nil, emptySnapshot, schedulerapi.DefaultPercentageOfNodesToScore) podIgnored := &v1.Pod{} - result, err := scheduler.Schedule(context.Background(), extenders, fwk, framework.NewCycleState(), podIgnored) + result, err := scheduler.SchedulePod(context.Background(), fwk, framework.NewCycleState(), podIgnored) if test.expectsErr { if err == nil { t.Errorf("Unexpected non-error, result %+v", result) diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index 3643361b869..012f85cdc6d 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -182,22 +182,18 @@ func (c *Configurator) create() (*Scheduler, error) { ) debugger.ListenForSignal(c.StopEverything) - algo := NewGenericScheduler( + sched := newScheduler( c.schedulerCache, + extenders, + internalqueue.MakeNextPodFunc(podQueue), + MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache), + c.StopEverything, + podQueue, + profiles, + c.client, c.nodeInfoSnapshot, - c.percentageOfNodesToScore, - ) - - return &Scheduler{ - Cache: c.schedulerCache, - Algorithm: algo, - Extenders: extenders, - Profiles: profiles, - NextPod: internalqueue.MakeNextPodFunc(podQueue), - Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache), - StopEverything: c.StopEverything, - SchedulingQueue: podQueue, - }, nil + c.percentageOfNodesToScore) + return sched, nil } // MakeDefaultErrorFunc construct a function to handle pod scheduler error diff --git a/pkg/scheduler/generic_scheduler.go b/pkg/scheduler/generic_scheduler.go index ac216a2e2e8..e04be6a81e6 100644 --- a/pkg/scheduler/generic_scheduler.go +++ b/pkg/scheduler/generic_scheduler.go @@ -32,7 +32,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" - internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" "k8s.io/kubernetes/pkg/scheduler/metrics" utiltrace "k8s.io/utils/trace" ) @@ -53,13 +52,6 @@ const ( // ErrNoNodesAvailable is used to describe the error that no nodes available to schedule pods. var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods") -// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods -// onto machines. -// TODO: Rename this type. -type ScheduleAlgorithm interface { - Schedule(context.Context, []framework.Extender, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error) -} - // ScheduleResult represents the result of one pod scheduled. It will contain // the final selected Node, along with the selected intermediate information. type ScheduleResult struct { @@ -71,37 +63,30 @@ type ScheduleResult struct { FeasibleNodes int } -type genericScheduler struct { - cache internalcache.Cache - nodeInfoSnapshot *internalcache.Snapshot - percentageOfNodesToScore int32 - nextStartNodeIndex int -} - // snapshot snapshots scheduler cache and node infos for all fit and priority // functions. -func (g *genericScheduler) snapshot() error { +func (sched *Scheduler) snapshot() error { // Used for all fit and priority funcs. - return g.cache.UpdateSnapshot(g.nodeInfoSnapshot) + return sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot) } -// Schedule tries to schedule the given pod to one of the nodes in the node list. +// schedulePod tries to schedule the given pod to one of the nodes in the node list. // If it succeeds, it will return the name of the node. -// If it fails, it will return a FitError error with reasons. -func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { +// If it fails, it will return a FitError with reasons. +func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name}) defer trace.LogIfLong(100 * time.Millisecond) - if err := g.snapshot(); err != nil { + if err := sched.snapshot(); err != nil { return result, err } trace.Step("Snapshotting scheduler cache and node infos done") - if g.nodeInfoSnapshot.NumNodes() == 0 { + if sched.nodeInfoSnapshot.NumNodes() == 0 { return result, ErrNoNodesAvailable } - feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, extenders, fwk, state, pod) + feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod) if err != nil { return result, err } @@ -110,7 +95,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.E if len(feasibleNodes) == 0 { return result, &framework.FitError{ Pod: pod, - NumAllNodes: g.nodeInfoSnapshot.NumNodes(), + NumAllNodes: sched.nodeInfoSnapshot.NumNodes(), Diagnosis: diagnosis, } } @@ -124,12 +109,12 @@ func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.E }, nil } - priorityList, err := prioritizeNodes(ctx, extenders, fwk, state, pod, feasibleNodes) + priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes) if err != nil { return result, err } - host, err := g.selectHost(priorityList) + host, err := selectHost(priorityList) trace.Step("Prioritizing done") return ScheduleResult{ @@ -141,7 +126,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.E // selectHost takes a prioritized list of nodes and then picks one // in a reservoir sampling manner from the nodes that had the highest score. -func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) { +func selectHost(nodeScoreList framework.NodeScoreList) (string, error) { if len(nodeScoreList) == 0 { return "", fmt.Errorf("empty priorityList") } @@ -166,12 +151,12 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st // numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops // its search for more feasible nodes. -func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) { - if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 { +func (sched *Scheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) { + if numAllNodes < minFeasibleNodesToFind || sched.percentageOfNodesToScore >= 100 { return numAllNodes } - adaptivePercentage := g.percentageOfNodesToScore + adaptivePercentage := sched.percentageOfNodesToScore if adaptivePercentage <= 0 { basePercentageOfNodesToScore := int32(50) adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125 @@ -188,19 +173,19 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i return numNodes } -func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, extenders []framework.Extender, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) { +func (sched *Scheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) { nnn := pod.Status.NominatedNodeName - nodeInfo, err := g.nodeInfoSnapshot.Get(nnn) + nodeInfo, err := sched.nodeInfoSnapshot.Get(nnn) if err != nil { return nil, err } node := []*framework.NodeInfo{nodeInfo} - feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, node) + feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, node) if err != nil { return nil, err } - feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap) + feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap) if err != nil { return nil, err } @@ -210,7 +195,7 @@ func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, extenders // Filters the nodes to find the ones that fit the pod based on the framework // filter plugins and filter extenders. -func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) { +func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) { diagnosis := framework.Diagnosis{ NodeToStatusMap: make(framework.NodeToStatusMap), UnschedulablePlugins: sets.NewString(), @@ -218,7 +203,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders [] // Run "prefilter" plugins. s := fwk.RunPreFilterPlugins(ctx, state, pod) - allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() + allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List() if err != nil { return nil, diagnosis, err } @@ -239,7 +224,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders [] // "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption. // This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes. if len(pod.Status.NominatedNodeName) > 0 { - feasibleNodes, err := g.evaluateNominatedNode(ctx, extenders, pod, fwk, state, diagnosis) + feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis) if err != nil { klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName) } @@ -248,12 +233,12 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders [] return feasibleNodes, diagnosis, nil } } - feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes) + feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes) if err != nil { return nil, diagnosis, err } - feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap) + feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap) if err != nil { return nil, diagnosis, err } @@ -261,14 +246,14 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders [] } // findNodesThatPassFilters finds the nodes that fit the filter plugins. -func (g *genericScheduler) findNodesThatPassFilters( +func (sched *Scheduler) findNodesThatPassFilters( ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod, diagnosis framework.Diagnosis, nodes []*framework.NodeInfo) ([]*v1.Node, error) { - numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes))) + numNodesToFind := sched.numFeasibleNodesToFind(int32(len(nodes))) // Create feasible list with enough space to avoid growing it // and allow assigning. @@ -277,9 +262,9 @@ func (g *genericScheduler) findNodesThatPassFilters( if !fwk.HasFilterPlugins() { length := len(nodes) for i := range feasibleNodes { - feasibleNodes[i] = nodes[(g.nextStartNodeIndex+i)%length].Node() + feasibleNodes[i] = nodes[(sched.nextStartNodeIndex+i)%length].Node() } - g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length + sched.nextStartNodeIndex = (sched.nextStartNodeIndex + len(feasibleNodes)) % length return feasibleNodes, nil } @@ -290,7 +275,7 @@ func (g *genericScheduler) findNodesThatPassFilters( checkNode := func(i int) { // We check the nodes starting from where we left off in the previous scheduling cycle, // this is to make sure all nodes have the same chance of being examined across pods. - nodeInfo := nodes[(g.nextStartNodeIndex+i)%len(nodes)] + nodeInfo := nodes[(sched.nextStartNodeIndex+i)%len(nodes)] status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo) if status.Code() == framework.Error { errCh.SendErrorWithCancel(status.AsError(), cancel) @@ -325,7 +310,7 @@ func (g *genericScheduler) findNodesThatPassFilters( // are found. fwk.Parallelizer().Until(ctx, len(nodes), checkNode) processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap) - g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes) + sched.nextStartNodeIndex = (sched.nextStartNodeIndex + processedNodes) % len(nodes) feasibleNodes = feasibleNodes[:feasibleNodesLen] if err := errCh.ReceiveError(); err != nil { @@ -494,15 +479,3 @@ func prioritizeNodes( } return result, nil } - -// NewGenericScheduler creates a genericScheduler object. -func NewGenericScheduler( - cache internalcache.Cache, - nodeInfoSnapshot *internalcache.Snapshot, - percentageOfNodesToScore int32) ScheduleAlgorithm { - return &genericScheduler{ - cache: cache, - nodeInfoSnapshot: nodeInfoSnapshot, - percentageOfNodesToScore: percentageOfNodesToScore, - } -} diff --git a/pkg/scheduler/generic_scheduler_test.go b/pkg/scheduler/generic_scheduler_test.go index b40dad8d9fa..5465722a65a 100644 --- a/pkg/scheduler/generic_scheduler_test.go +++ b/pkg/scheduler/generic_scheduler_test.go @@ -202,7 +202,6 @@ func makeNodeList(nodeNames []string) []*v1.Node { } func TestSelectHost(t *testing.T) { - scheduler := genericScheduler{} tests := []struct { name string list framework.NodeScoreList @@ -253,7 +252,7 @@ func TestSelectHost(t *testing.T) { t.Run(test.name, func(t *testing.T) { // increase the randomness for i := 0; i < 10; i++ { - got, err := scheduler.selectHost(test.list) + got, err := selectHost(test.list) if test.expectsErr { if err == nil { t.Error("Unexpected non-error") @@ -450,7 +449,7 @@ func TestFindNodesThatPassExtenders(t *testing.T) { } } -func TestGenericScheduler(t *testing.T) { +func TestSchedulerSchedulePod(t *testing.T) { fts := feature.Features{} tests := []struct { name string @@ -1006,15 +1005,21 @@ func TestGenericScheduler(t *testing.T) { t.Fatal(err) } - scheduler := NewGenericScheduler( + scheduler := newScheduler( cache, + nil, + nil, + nil, + nil, + nil, + nil, + nil, snapshot, - schedulerapi.DefaultPercentageOfNodesToScore, - ) + schedulerapi.DefaultPercentageOfNodesToScore) informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) - result, err := scheduler.Schedule(ctx, nil, fwk, framework.NewCycleState(), test.pod) + result, err := scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), test.pod) if err != test.wErr { gotFitErr, gotOK := err.(*framework.FitError) wantFitErr, wantOK := test.wErr.(*framework.FitError) @@ -1036,19 +1041,26 @@ func TestGenericScheduler(t *testing.T) { } } -// makeScheduler makes a simple genericScheduler for testing. -func makeScheduler(nodes []*v1.Node) *genericScheduler { +// makeScheduler makes a simple Scheduler for testing. +func makeScheduler(nodes []*v1.Node) *Scheduler { cache := internalcache.New(time.Duration(0), wait.NeverStop) for _, n := range nodes { cache.AddNode(n) } - s := NewGenericScheduler( + s := newScheduler( cache, + nil, + nil, + nil, + nil, + nil, + nil, + nil, emptySnapshot, schedulerapi.DefaultPercentageOfNodesToScore) - cache.UpdateSnapshot(s.(*genericScheduler).nodeInfoSnapshot) - return s.(*genericScheduler) + cache.UpdateSnapshot(s.nodeInfoSnapshot) + return s } func TestFindFitAllError(t *testing.T) { @@ -1068,8 +1080,7 @@ func TestFindFitAllError(t *testing.T) { t.Fatal(err) } - _, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), &v1.Pod{}) - + _, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1105,8 +1116,7 @@ func TestFindFitSomeError(t *testing.T) { } pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}} - _, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), pod) - + _, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), pod) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1179,14 +1189,13 @@ func TestFindFitPredicateCallCounts(t *testing.T) { } scheduler := makeScheduler(nodes) - if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil { + if err := scheduler.Cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil { t.Fatal(err) } fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}), &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "1"}) - _, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod) - + _, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1335,15 +1344,21 @@ func TestZeroRequest(t *testing.T) { t.Fatalf("error creating framework: %+v", err) } - scheduler := NewGenericScheduler( + scheduler := newScheduler( nil, - emptySnapshot, - schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler) - scheduler.nodeInfoSnapshot = snapshot + nil, + nil, + nil, + nil, + nil, + nil, + nil, + snapshot, + schedulerapi.DefaultPercentageOfNodesToScore) ctx := context.Background() state := framework.NewCycleState() - _, _, err = scheduler.findNodesThatFitPod(ctx, nil, fwk, state, test.pod) + _, _, err = scheduler.findNodesThatFitPod(ctx, fwk, state, test.pod) if err != nil { t.Fatalf("error filtering nodes: %+v", err) } @@ -1406,11 +1421,11 @@ func TestNumFeasibleNodesToFind(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - g := &genericScheduler{ + sched := &Scheduler{ percentageOfNodesToScore: tt.percentageOfNodesToScore, } - if gotNumNodes := g.numFeasibleNodesToFind(tt.numAllNodes); gotNumNodes != tt.wantNumNodes { - t.Errorf("genericScheduler.numFeasibleNodesToFind() = %v, want %v", gotNumNodes, tt.wantNumNodes) + if gotNumNodes := sched.numFeasibleNodesToFind(tt.numAllNodes); gotNumNodes != tt.wantNumNodes { + t.Errorf("Scheduler.numFeasibleNodesToFind() = %v, want %v", gotNumNodes, tt.wantNumNodes) } }) } @@ -1423,7 +1438,7 @@ func TestFairEvaluationForNodes(t *testing.T) { nodeNames = append(nodeNames, strconv.Itoa(i)) } nodes := makeNodeList(nodeNames) - g := makeScheduler(nodes) + sched := makeScheduler(nodes) fwk, err := st.NewFramework( []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), @@ -1438,20 +1453,20 @@ func TestFairEvaluationForNodes(t *testing.T) { } // To make numAllNodes % nodesToFind != 0 - g.percentageOfNodesToScore = 30 - nodesToFind := int(g.numFeasibleNodesToFind(int32(numAllNodes))) + sched.percentageOfNodesToScore = 30 + nodesToFind := int(sched.numFeasibleNodesToFind(int32(numAllNodes))) // Iterating over all nodes more than twice for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ { - nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), &v1.Pod{}) + nodesThatFit, _, err := sched.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{}) if err != nil { t.Errorf("unexpected error: %v", err) } if len(nodesThatFit) != nodesToFind { t.Errorf("got %d nodes filtered, want %d", len(nodesThatFit), nodesToFind) } - if g.nextStartNodeIndex != (i+1)*nodesToFind%numAllNodes { - t.Errorf("got %d lastProcessedNodeIndex, want %d", g.nextStartNodeIndex, (i+1)*nodesToFind%numAllNodes) + if sched.nextStartNodeIndex != (i+1)*nodesToFind%numAllNodes { + t.Errorf("got %d lastProcessedNodeIndex, want %d", sched.nextStartNodeIndex, (i+1)*nodesToFind%numAllNodes) } } } @@ -1513,13 +1528,19 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) { t.Fatal(err) } snapshot := internalcache.NewSnapshot(nil, nodes) - scheduler := NewGenericScheduler( + scheduler := newScheduler( cache, + nil, + nil, + nil, + nil, + nil, + nil, + nil, snapshot, - schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler) - - _, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod) + schedulerapi.DefaultPercentageOfNodesToScore) + _, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 0d77793a896..c51e688e21b 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -69,8 +69,6 @@ type Scheduler struct { // by NodeLister and Algorithm. Cache internalcache.Cache - Algorithm ScheduleAlgorithm - Extenders []framework.Extender // NextPod should be a function that blocks until the next pod @@ -83,6 +81,11 @@ type Scheduler struct { // question, and the error Error func(*framework.QueuedPodInfo, error) + // SchedulePod tries to schedule the given pod to one of the nodes in the node list. + // Return a struct of ScheduleResult with the name of suggested host on success, + // otherwise will return a FitError with reasons. + SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) + // Close this to shut down the scheduler. StopEverything <-chan struct{} @@ -93,6 +96,12 @@ type Scheduler struct { Profiles profile.Map client clientset.Interface + + nodeInfoSnapshot *internalcache.Snapshot + + percentageOfNodesToScore int32 + + nextStartNodeIndex int } type schedulerOptions struct { @@ -213,6 +222,34 @@ var defaultSchedulerOptions = schedulerOptions{ applyDefaultProfile: true, } +// newScheduler creates a Scheduler object. +func newScheduler( + cache internalcache.Cache, + extenders []framework.Extender, + nextPod func() *framework.QueuedPodInfo, + Error func(*framework.QueuedPodInfo, error), + stopEverything <-chan struct{}, + schedulingQueue internalqueue.SchedulingQueue, + profiles profile.Map, + client clientset.Interface, + nodeInfoSnapshot *internalcache.Snapshot, + percentageOfNodesToScore int32) *Scheduler { + sched := Scheduler{ + Cache: cache, + Extenders: extenders, + NextPod: nextPod, + Error: Error, + StopEverything: stopEverything, + SchedulingQueue: schedulingQueue, + Profiles: profiles, + client: client, + nodeInfoSnapshot: nodeInfoSnapshot, + percentageOfNodesToScore: percentageOfNodesToScore, + } + sched.SchedulePod = sched.schedulePod + return &sched +} + // New returns a Scheduler func New(client clientset.Interface, informerFactory informers.SharedInformerFactory, @@ -279,10 +316,6 @@ func New(client clientset.Interface, return nil, fmt.Errorf("couldn't create scheduler: %v", err) } - // Additional tweaks to the config produced by the configurator. - sched.StopEverything = stopEverything - sched.client = client - addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap)) return sched, nil @@ -462,9 +495,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { schedulingCycleCtx, cancel := context.WithCancel(ctx) defer cancel() - scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod) + scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod) if err != nil { - // Schedule() may have failed because the pod would not fit on any host, so we try to + // SchedulePod() may have failed because the pod would not fit on any host, so we try to // preempt, with the expectation that the next time the pod is tried for scheduling it // will fit due to the preemption. It is also possible that a different pod will schedule // into the resources that were preempted, but this is harmless. diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index b146dd1cf25..83f05ac49fc 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -104,15 +104,11 @@ func podWithResources(id, desiredHost string, limits v1.ResourceList, requests v return pod } -type mockScheduler struct { +type mockScheduleResult struct { result ScheduleResult err error } -func (es mockScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) { - return es.result, es.err -} - func TestSchedulerCreation(t *testing.T) { invalidRegistry := map[string]frameworkruntime.PluginFactory{ defaultbinder.Name: defaultbinder.New, @@ -307,7 +303,6 @@ func TestSchedulerScheduleOne(t *testing.T) { name string injectBindError error sendPod *v1.Pod - algo ScheduleAlgorithm registerPluginFuncs []st.RegisterPluginFunc expectErrorPod *v1.Pod expectForgetPod *v1.Pod @@ -315,11 +310,12 @@ func TestSchedulerScheduleOne(t *testing.T) { expectError error expectBind *v1.Binding eventReason string + mockResult mockScheduleResult }{ { - name: "error reserve pod", - sendPod: podWithID("foo", ""), - algo: mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, + name: "error reserve pod", + sendPod: podWithID("foo", ""), + mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, registerPluginFuncs: []st.RegisterPluginFunc{ st.RegisterReservePlugin("FakeReserve", st.NewFakeReservePlugin(framework.NewStatus(framework.Error, "reserve error"))), }, @@ -330,9 +326,9 @@ func TestSchedulerScheduleOne(t *testing.T) { eventReason: "FailedScheduling", }, { - name: "error permit pod", - sendPod: podWithID("foo", ""), - algo: mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, + name: "error permit pod", + sendPod: podWithID("foo", ""), + mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, registerPluginFuncs: []st.RegisterPluginFunc{ st.RegisterPermitPlugin("FakePermit", st.NewFakePermitPlugin(framework.NewStatus(framework.Error, "permit error"), time.Minute)), }, @@ -343,9 +339,9 @@ func TestSchedulerScheduleOne(t *testing.T) { eventReason: "FailedScheduling", }, { - name: "error prebind pod", - sendPod: podWithID("foo", ""), - algo: mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, + name: "error prebind pod", + sendPod: podWithID("foo", ""), + mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, registerPluginFuncs: []st.RegisterPluginFunc{ st.RegisterPreBindPlugin("FakePreBind", st.NewFakePreBindPlugin(framework.AsStatus(preBindErr))), }, @@ -358,7 +354,7 @@ func TestSchedulerScheduleOne(t *testing.T) { { name: "bind assumed pod scheduled", sendPod: podWithID("foo", ""), - algo: mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, + mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, expectBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}}, expectAssumedPod: podWithID("foo", testNode.Name), eventReason: "Scheduled", @@ -366,7 +362,7 @@ func TestSchedulerScheduleOne(t *testing.T) { { name: "error pod failed scheduling", sendPod: podWithID("foo", ""), - algo: mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, errS}, + mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, errS}, expectError: errS, expectErrorPod: podWithID("foo", ""), eventReason: "FailedScheduling", @@ -374,7 +370,7 @@ func TestSchedulerScheduleOne(t *testing.T) { { name: "error bind forget pod failed scheduling", sendPod: podWithID("foo", ""), - algo: mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, + mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, expectBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}}, expectAssumedPod: podWithID("foo", testNode.Name), injectBindError: errB, @@ -386,7 +382,7 @@ func TestSchedulerScheduleOne(t *testing.T) { { name: "deleting pod", sendPod: deletingPod("foo"), - algo: mockScheduler{ScheduleResult{}, nil}, + mockResult: mockScheduleResult{ScheduleResult{}, nil}, eventReason: "FailedScheduling", }, } @@ -435,21 +431,26 @@ func TestSchedulerScheduleOne(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - s := &Scheduler{ - Cache: cache, - Algorithm: item.algo, - client: client, - Error: func(p *framework.QueuedPodInfo, err error) { + s := newScheduler( + cache, + nil, + func() *framework.QueuedPodInfo { + return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(item.sendPod)} + }, + func(p *framework.QueuedPodInfo, err error) { gotPod = p.Pod gotError = err }, - NextPod: func() *framework.QueuedPodInfo { - return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(item.sendPod)} - }, - Profiles: profile.Map{ + nil, + internalqueue.NewTestQueue(ctx, nil), + profile.Map{ testSchedulerName: fwk, }, - SchedulingQueue: internalqueue.NewTestQueue(ctx, nil), + client, + nil, + 0) + s.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) { + return item.mockResult.result, item.mockResult.err } called := make(chan struct{}) stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { @@ -914,29 +915,24 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, c frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), ) - algo := NewGenericScheduler( - cache, - internalcache.NewEmptySnapshot(), - schedulerapi.DefaultPercentageOfNodesToScore, - ) - errChan := make(chan error, 1) - sched := &Scheduler{ - Cache: cache, - Algorithm: algo, - NextPod: func() *framework.QueuedPodInfo { + sched := newScheduler( + cache, + nil, + func() *framework.QueuedPodInfo { return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(clientcache.Pop(queuedPodStore).(*v1.Pod))} }, - Error: func(p *framework.QueuedPodInfo, err error) { + func(p *framework.QueuedPodInfo, err error) { errChan <- err }, - Profiles: profile.Map{ + nil, + schedulingQueue, + profile.Map{ testSchedulerName: fwk, }, - client: client, - SchedulingQueue: schedulingQueue, - } - + client, + internalcache.NewEmptySnapshot(), + schedulerapi.DefaultPercentageOfNodesToScore) return sched, bindingChan, errChan } @@ -1180,16 +1176,11 @@ func TestSchedulerBinding(t *testing.T) { } stop := make(chan struct{}) defer close(stop) - cache := internalcache.New(100*time.Millisecond, stop) - algo := NewGenericScheduler( - cache, - nil, - 0, - ) - sched := Scheduler{ - Algorithm: algo, - Extenders: test.extenders, - Cache: cache, + sched := &Scheduler{ + Extenders: test.extenders, + Cache: internalcache.New(100*time.Millisecond, stop), + nodeInfoSnapshot: nil, + percentageOfNodesToScore: 0, } err = sched.bind(context.Background(), fwk, pod, "node", nil) if err != nil { diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index f172540d224..442024493d5 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -100,7 +100,7 @@ func TestCoreResourceEnqueue(t *testing.T) { t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name) } // Schedule the Pod manually. - _, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, nil, fwk, framework.NewCycleState(), podInfo.Pod) + _, fitError := testCtx.Scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), podInfo.Pod) if fitError == nil { t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name) } @@ -277,7 +277,7 @@ func TestCustomResourceEnqueue(t *testing.T) { t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name) } // Schedule the Pod manually. - _, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, nil, fwk, framework.NewCycleState(), podInfo.Pod) + _, fitError := testCtx.Scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), podInfo.Pod) // The fitError is expected to be non-nil as it failed the fakeCRPlugin plugin. if fitError == nil { t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)