From 110c39ef60ce055aa7239d1cf86098b8c03773e1 Mon Sep 17 00:00:00 2001 From: Jerry-Ge Date: Thu, 3 Jun 2021 10:38:57 +0800 Subject: [PATCH] unroll extenders Signed-off-by: Jerry Ge Co-authored-by: Huang-Wei --- .../apis/config/testing/compatibility_test.go | 2 +- pkg/scheduler/core/extender_test.go | 3 +- pkg/scheduler/core/generic_scheduler.go | 47 ++++++++----------- pkg/scheduler/core/generic_scheduler_test.go | 26 ++++------ pkg/scheduler/factory.go | 2 +- pkg/scheduler/scheduler.go | 6 ++- pkg/scheduler/scheduler_test.go | 9 +--- test/integration/scheduler/queue_test.go | 4 +- 8 files changed, 40 insertions(+), 59 deletions(-) diff --git a/pkg/scheduler/apis/config/testing/compatibility_test.go b/pkg/scheduler/apis/config/testing/compatibility_test.go index e1cbd51402b..d9e8a9b2193 100644 --- a/pkg/scheduler/apis/config/testing/compatibility_test.go +++ b/pkg/scheduler/apis/config/testing/compatibility_test.go @@ -1394,7 +1394,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { t.Errorf("unexpected plugins diff (-want, +got): %s", diff) } - gotExtenders := sched.Algorithm.Extenders() + gotExtenders := sched.Extenders var wantExtenders []*core.HTTPExtender for _, e := range tc.wantExtenders { extender, err := core.NewHTTPExtender(&e) diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 3bb78d3d69e..913c04588a3 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -280,10 +280,9 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { scheduler := NewGenericScheduler( cache, emptySnapshot, - extenders, schedulerapi.DefaultPercentageOfNodesToScore) podIgnored := &v1.Pod{} - result, err := scheduler.Schedule(context.Background(), fwk, framework.NewCycleState(), podIgnored) + result, err := scheduler.Schedule(context.Background(), extenders, fwk, framework.NewCycleState(), podIgnored) if test.expectsErr { if err == nil { t.Errorf("Unexpected non-error, result %+v", result) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index c9cdd774d50..304174dbefb 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -59,10 +59,7 @@ var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods") // onto machines. // TODO: Rename this type. type ScheduleAlgorithm interface { - Schedule(context.Context, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error) - // Extenders returns a slice of extender config. This is exposed for - // testing. - Extenders() []framework.Extender + Schedule(context.Context, []framework.Extender, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error) } // ScheduleResult represents the result of one pod scheduled. It will contain @@ -78,7 +75,6 @@ type ScheduleResult struct { type genericScheduler struct { cache internalcache.Cache - extenders []framework.Extender nodeInfoSnapshot *internalcache.Snapshot percentageOfNodesToScore int32 nextStartNodeIndex int @@ -94,7 +90,7 @@ func (g *genericScheduler) snapshot() error { // Schedule tries to schedule the given pod to one of the nodes in the node list. // If it succeeds, it will return the name of the node. // If it fails, it will return a FitError error with reasons. -func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { +func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name}) defer trace.LogIfLong(100 * time.Millisecond) @@ -107,7 +103,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework return result, ErrNoNodesAvailable } - feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, fwk, state, pod) + feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, extenders, fwk, state, pod) if err != nil { return result, err } @@ -130,7 +126,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework }, nil } - priorityList, err := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes) + priorityList, err := prioritizeNodes(ctx, extenders, fwk, state, pod, feasibleNodes) if err != nil { return result, err } @@ -145,10 +141,6 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework }, err } -func (g *genericScheduler) Extenders() []framework.Extender { - return g.extenders -} - // selectHost takes a prioritized list of nodes and then picks one // in a reservoir sampling manner from the nodes that had the highest score. func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) { @@ -198,7 +190,7 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i return numNodes } -func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) { +func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, extenders []framework.Extender, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) { nnn := pod.Status.NominatedNodeName nodeInfo, err := g.nodeInfoSnapshot.Get(nnn) if err != nil { @@ -210,7 +202,7 @@ func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Po return nil, err } - feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap) + feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap) if err != nil { return nil, err } @@ -220,7 +212,7 @@ func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Po // Filters the nodes to find the ones that fit the pod based on the framework // filter plugins and filter extenders. -func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) { +func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) { diagnosis := framework.Diagnosis{ NodeToStatusMap: make(framework.NodeToStatusMap), UnschedulablePlugins: sets.NewString(), @@ -249,7 +241,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framewor // "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption. // This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes. if len(pod.Status.NominatedNodeName) > 0 && feature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) { - feasibleNodes, err := g.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis) + feasibleNodes, err := g.evaluateNominatedNode(ctx, extenders, pod, fwk, state, diagnosis) if err != nil { klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName) } @@ -263,7 +255,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framewor return nil, diagnosis, err } - feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap) + feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap) if err != nil { return nil, diagnosis, err } @@ -345,11 +337,11 @@ func (g *genericScheduler) findNodesThatPassFilters( return feasibleNodes, nil } -func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) { +func findNodesThatPassExtenders(extenders []framework.Extender, pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) { // Extenders are called sequentially. // Nodes in original feasibleNodes can be excluded in one extender, and pass on to the next // extender in a decreasing manner. - for _, extender := range g.extenders { + for _, extender := range extenders { if len(feasibleNodes) == 0 { break } @@ -403,8 +395,9 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes // The scores from each plugin are added together to make the score for that node, then // any extenders are run as well. // All scores are finally combined (added) to get the total weighted scores of all nodes -func (g *genericScheduler) prioritizeNodes( +func prioritizeNodes( ctx context.Context, + extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod, @@ -412,7 +405,7 @@ func (g *genericScheduler) prioritizeNodes( ) (framework.NodeScoreList, error) { // If no priority configs are provided, then all nodes will have a score of one. // This is required to generate the priority list in the required format - if len(g.extenders) == 0 && !fwk.HasScorePlugins() { + if len(extenders) == 0 && !fwk.HasScorePlugins() { result := make(framework.NodeScoreList, 0, len(nodes)) for i := range nodes { result = append(result, framework.NodeScore{ @@ -453,12 +446,12 @@ func (g *genericScheduler) prioritizeNodes( } } - if len(g.extenders) != 0 && nodes != nil { + if len(extenders) != 0 && nodes != nil { var mu sync.Mutex var wg sync.WaitGroup combinedScores := make(map[string]int64, len(nodes)) - for i := range g.extenders { - if !g.extenders[i].IsInterested(pod) { + for i := range extenders { + if !extenders[i].IsInterested(pod) { continue } wg.Add(1) @@ -468,7 +461,7 @@ func (g *genericScheduler) prioritizeNodes( metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec() wg.Done() }() - prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes) + prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes) if err != nil { // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities return @@ -477,7 +470,7 @@ func (g *genericScheduler) prioritizeNodes( for i := range *prioritizedList { host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score if klog.V(10).Enabled() { - klog.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", g.extenders[extIndex].Name(), "node", host, "score", score) + klog.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", host, "score", score) } combinedScores[host] += score * weight } @@ -505,11 +498,9 @@ func (g *genericScheduler) prioritizeNodes( func NewGenericScheduler( cache internalcache.Cache, nodeInfoSnapshot *internalcache.Snapshot, - extenders []framework.Extender, percentageOfNodesToScore int32) ScheduleAlgorithm { return &genericScheduler{ cache: cache, - extenders: extenders, nodeInfoSnapshot: nodeInfoSnapshot, percentageOfNodesToScore: percentageOfNodesToScore, } diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index e27261a9b3d..54ea8ee247d 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -425,11 +425,9 @@ func TestFindNodesThatPassExtenders(t *testing.T) { for ii := range tt.extenders { extenders = append(extenders, &tt.extenders[ii]) } - scheduler := &genericScheduler{ - extenders: extenders, - } + pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}} - got, err := scheduler.findNodesThatPassExtenders(pod, tt.nodes, tt.filteredNodesStatuses) + got, err := findNodesThatPassExtenders(extenders, pod, tt.nodes, tt.filteredNodesStatuses) if tt.expectsErr { if err == nil { t.Error("Unexpected non-error") @@ -1006,13 +1004,12 @@ func TestGenericScheduler(t *testing.T) { scheduler := NewGenericScheduler( cache, snapshot, - []framework.Extender{}, schedulerapi.DefaultPercentageOfNodesToScore, ) informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) - result, err := scheduler.Schedule(ctx, fwk, framework.NewCycleState(), test.pod) + result, err := scheduler.Schedule(ctx, nil, fwk, framework.NewCycleState(), test.pod) // TODO(#94696): replace reflect.DeepEqual with cmp.Diff(). if err != test.wErr { gotFitErr, gotOK := err.(*framework.FitError) @@ -1043,7 +1040,6 @@ func makeScheduler(nodes []*v1.Node) *genericScheduler { s := NewGenericScheduler( cache, emptySnapshot, - nil, schedulerapi.DefaultPercentageOfNodesToScore) cache.UpdateSnapshot(s.(*genericScheduler).nodeInfoSnapshot) return s.(*genericScheduler) @@ -1066,7 +1062,7 @@ func TestFindFitAllError(t *testing.T) { t.Fatal(err) } - _, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{}) + _, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), &v1.Pod{}) if err != nil { t.Errorf("unexpected error: %v", err) @@ -1100,7 +1096,7 @@ func TestFindFitSomeError(t *testing.T) { } pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}} - _, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), pod) + _, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), pod) if err != nil { t.Errorf("unexpected error: %v", err) @@ -1179,7 +1175,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) { } fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}), "1") - _, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod) + _, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod) if err != nil { t.Errorf("unexpected error: %v", err) @@ -1338,18 +1334,17 @@ func TestZeroRequest(t *testing.T) { scheduler := NewGenericScheduler( nil, emptySnapshot, - []framework.Extender{}, schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler) scheduler.nodeInfoSnapshot = snapshot ctx := context.Background() state := framework.NewCycleState() - _, _, err = scheduler.findNodesThatFitPod(ctx, fwk, state, test.pod) + _, _, err = scheduler.findNodesThatFitPod(ctx, nil, fwk, state, test.pod) if err != nil { t.Fatalf("error filtering nodes: %+v", err) } fwk.RunPreScorePlugins(ctx, state, test.pod, test.nodes) - list, err := scheduler.prioritizeNodes(ctx, fwk, state, test.pod, test.nodes) + list, err := prioritizeNodes(ctx, nil, fwk, state, test.pod, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1444,7 +1439,7 @@ func TestFairEvaluationForNodes(t *testing.T) { // Iterating over all nodes more than twice for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ { - nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{}) + nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), &v1.Pod{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1527,10 +1522,9 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) { scheduler := NewGenericScheduler( cache, snapshot, - []framework.Extender{}, schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler) - _, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod) + _, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod) if err != nil { t.Errorf("unexpected error: %v", err) diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index 089a3469be1..91ab541d420 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -177,13 +177,13 @@ func (c *Configurator) create() (*Scheduler, error) { algo := core.NewGenericScheduler( c.schedulerCache, c.nodeInfoSnapshot, - extenders, c.percentageOfNodesToScore, ) return &Scheduler{ SchedulerCache: c.schedulerCache, Algorithm: algo, + Extenders: extenders, Profiles: profiles, NextPod: internalqueue.MakeNextPodFunc(podQueue), Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache), diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 8d4d734c731..b8bc65f23e2 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -71,6 +71,8 @@ type Scheduler struct { Algorithm core.ScheduleAlgorithm + Extenders []framework.Extender + // NextPod should be a function that blocks until the next pod // is available. We don't use a channel for this, because scheduling // a pod may take some amount of time and we don't want pods to get @@ -449,7 +451,7 @@ func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assum // TODO(#87159): Move this to a Plugin. func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error) { - for _, extender := range sched.Algorithm.Extenders() { + for _, extender := range sched.Extenders { if !extender.IsBinder() || !extender.IsInterested(pod) { continue } @@ -500,7 +502,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent) schedulingCycleCtx, cancel := context.WithCancel(ctx) defer cancel() - scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, fwk, state, pod) + scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod) if err != nil { // Schedule() may have failed because the pod would not fit on any host, so we try to // preempt, with the expectation that the next time the pod is tried for scheduling it diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 83de1c3e64e..f961f812ab1 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -114,14 +114,10 @@ type mockScheduler struct { err error } -func (es mockScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) { +func (es mockScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) { return es.result, es.err } -func (es mockScheduler) Extenders() []framework.Extender { - return nil -} - func TestSchedulerCreation(t *testing.T) { invalidRegistry := map[string]frameworkruntime.PluginFactory{ defaultbinder.Name: defaultbinder.New, @@ -837,7 +833,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C algo := core.NewGenericScheduler( scache, internalcache.NewEmptySnapshot(), - []framework.Extender{}, schedulerapi.DefaultPercentageOfNodesToScore, ) @@ -1184,11 +1179,11 @@ func TestSchedulerBinding(t *testing.T) { algo := core.NewGenericScheduler( scache, nil, - test.extenders, 0, ) sched := Scheduler{ Algorithm: algo, + Extenders: test.extenders, SchedulerCache: scache, } err = sched.bind(context.Background(), fwk, pod, "node", nil) diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index 895f1441179..e2a99e3274f 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -137,7 +137,7 @@ func TestServiceAffinityEnqueue(t *testing.T) { t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name) } // Schedule the Pod manually. - _, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, fwk, framework.NewCycleState(), podInfo.Pod) + _, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, nil, fwk, framework.NewCycleState(), podInfo.Pod) // The fitError is expected to be: // 0/2 nodes are available: 1 Too many pods, 1 node(s) didn't match service affinity. if fitError == nil { @@ -301,7 +301,7 @@ func TestCustomResourceEnqueue(t *testing.T) { t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name) } // Schedule the Pod manually. - _, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, fwk, framework.NewCycleState(), podInfo.Pod) + _, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, nil, fwk, framework.NewCycleState(), podInfo.Pod) // The fitError is expected to be non-nil as it failed the fakeCRPlugin plugin. if fitError == nil { t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)