unroll extenders

Signed-off-by: Jerry Ge <jerry.ge@arm.com>

Co-authored-by: Huang-Wei <wei.huang1@ibm.com>
Jerry-Ge 2021-06-03 10:38:57 +08:00
parent 2cefcc6be7
commit 110c39ef60
8 changed files with 40 additions and 59 deletions
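This change unrolls the extenders out of the scheduling algorithm: the extenders slice is removed from genericScheduler and from the ScheduleAlgorithm interface, stored on the Scheduler struct instead, and threaded through every call that consults it. A before/after sketch of the interface, condensed from the diff below:

    // Before: the algorithm owned the extenders and exposed them for testing.
    type ScheduleAlgorithm interface {
        Schedule(context.Context, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
        Extenders() []framework.Extender
    }

    // After: callers pass extenders explicitly; the accessor is gone.
    type ScheduleAlgorithm interface {
        Schedule(context.Context, []framework.Extender, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
    }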

View File

@@ -1394,7 +1394,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
         t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
     }
-    gotExtenders := sched.Algorithm.Extenders()
+    gotExtenders := sched.Extenders
     var wantExtenders []*core.HTTPExtender
     for _, e := range tc.wantExtenders {
         extender, err := core.NewHTTPExtender(&e)

View File

@@ -280,10 +280,9 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 scheduler := NewGenericScheduler(
     cache,
     emptySnapshot,
-    extenders,
     schedulerapi.DefaultPercentageOfNodesToScore)
 podIgnored := &v1.Pod{}
-result, err := scheduler.Schedule(context.Background(), fwk, framework.NewCycleState(), podIgnored)
+result, err := scheduler.Schedule(context.Background(), extenders, fwk, framework.NewCycleState(), podIgnored)
 if test.expectsErr {
     if err == nil {
         t.Errorf("Unexpected non-error, result %+v", result)

View File

@@ -59,10 +59,7 @@ var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
 // onto machines.
 // TODO: Rename this type.
 type ScheduleAlgorithm interface {
-    Schedule(context.Context, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
-    // Extenders returns a slice of extender config. This is exposed for
-    // testing.
-    Extenders() []framework.Extender
+    Schedule(context.Context, []framework.Extender, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
 }
 
 // ScheduleResult represents the result of one pod scheduled. It will contain
@@ -78,7 +75,6 @@ type ScheduleResult struct {
 
 type genericScheduler struct {
     cache internalcache.Cache
-    extenders []framework.Extender
     nodeInfoSnapshot *internalcache.Snapshot
     percentageOfNodesToScore int32
     nextStartNodeIndex int
@@ -94,7 +90,7 @@ func (g *genericScheduler) snapshot() error {
 
 // Schedule tries to schedule the given pod to one of the nodes in the node list.
 // If it succeeds, it will return the name of the node.
 // If it fails, it will return a FitError error with reasons.
-func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
+func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
     trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
     defer trace.LogIfLong(100 * time.Millisecond)
@@ -107,7 +103,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework
         return result, ErrNoNodesAvailable
     }
 
-    feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, fwk, state, pod)
+    feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, extenders, fwk, state, pod)
     if err != nil {
         return result, err
     }
@@ -130,7 +126,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework
         }, nil
     }
 
-    priorityList, err := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)
+    priorityList, err := prioritizeNodes(ctx, extenders, fwk, state, pod, feasibleNodes)
     if err != nil {
         return result, err
     }
@@ -145,10 +141,6 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework
     }, err
 }
 
-func (g *genericScheduler) Extenders() []framework.Extender {
-    return g.extenders
-}
-
 // selectHost takes a prioritized list of nodes and then picks one
 // in a reservoir sampling manner from the nodes that had the highest score.
 func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
@@ -198,7 +190,7 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
     return numNodes
 }
 
-func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) {
+func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, extenders []framework.Extender, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) {
     nnn := pod.Status.NominatedNodeName
     nodeInfo, err := g.nodeInfoSnapshot.Get(nnn)
     if err != nil {
@@ -210,7 +202,7 @@ func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Po
         return nil, err
     }
 
-    feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap)
+    feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
     if err != nil {
         return nil, err
     }
@@ -220,7 +212,7 @@ func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Po
 
 // Filters the nodes to find the ones that fit the pod based on the framework
 // filter plugins and filter extenders.
-func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
+func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
     diagnosis := framework.Diagnosis{
         NodeToStatusMap: make(framework.NodeToStatusMap),
         UnschedulablePlugins: sets.NewString(),
@@ -249,7 +241,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framewor
     // "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
     // This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
     if len(pod.Status.NominatedNodeName) > 0 && feature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) {
-        feasibleNodes, err := g.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
+        feasibleNodes, err := g.evaluateNominatedNode(ctx, extenders, pod, fwk, state, diagnosis)
         if err != nil {
             klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
         }
@@ -263,7 +255,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framewor
         return nil, diagnosis, err
     }
 
-    feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap)
+    feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
     if err != nil {
         return nil, diagnosis, err
     }
@@ -345,11 +337,11 @@ func (g *genericScheduler) findNodesThatPassFilters(
     return feasibleNodes, nil
 }
 
-func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
+func findNodesThatPassExtenders(extenders []framework.Extender, pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
     // Extenders are called sequentially.
     // Nodes in original feasibleNodes can be excluded in one extender, and pass on to the next
     // extender in a decreasing manner.
-    for _, extender := range g.extenders {
+    for _, extender := range extenders {
         if len(feasibleNodes) == 0 {
             break
         }
@@ -403,8 +395,9 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes
 // The scores from each plugin are added together to make the score for that node, then
 // any extenders are run as well.
 // All scores are finally combined (added) to get the total weighted scores of all nodes
-func (g *genericScheduler) prioritizeNodes(
+func prioritizeNodes(
     ctx context.Context,
+    extenders []framework.Extender,
     fwk framework.Framework,
     state *framework.CycleState,
     pod *v1.Pod,
@@ -412,7 +405,7 @@ func (g *genericScheduler) prioritizeNodes(
 ) (framework.NodeScoreList, error) {
     // If no priority configs are provided, then all nodes will have a score of one.
     // This is required to generate the priority list in the required format
-    if len(g.extenders) == 0 && !fwk.HasScorePlugins() {
+    if len(extenders) == 0 && !fwk.HasScorePlugins() {
         result := make(framework.NodeScoreList, 0, len(nodes))
         for i := range nodes {
             result = append(result, framework.NodeScore{
@@ -453,12 +446,12 @@ func (g *genericScheduler) prioritizeNodes(
     }
 }
 
-if len(g.extenders) != 0 && nodes != nil {
+if len(extenders) != 0 && nodes != nil {
     var mu sync.Mutex
     var wg sync.WaitGroup
     combinedScores := make(map[string]int64, len(nodes))
-    for i := range g.extenders {
-        if !g.extenders[i].IsInterested(pod) {
+    for i := range extenders {
+        if !extenders[i].IsInterested(pod) {
             continue
         }
         wg.Add(1)
@@ -468,7 +461,7 @@ func (g *genericScheduler) prioritizeNodes(
         metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
         wg.Done()
     }()
-    prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
+    prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
     if err != nil {
         // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
         return
@@ -477,7 +470,7 @@ func (g *genericScheduler) prioritizeNodes(
     for i := range *prioritizedList {
         host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
         if klog.V(10).Enabled() {
-            klog.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", g.extenders[extIndex].Name(), "node", host, "score", score)
+            klog.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", host, "score", score)
         }
         combinedScores[host] += score * weight
     }
@@ -505,11 +498,9 @@ func (g *genericScheduler) prioritizeNodes(
 func NewGenericScheduler(
     cache internalcache.Cache,
     nodeInfoSnapshot *internalcache.Snapshot,
-    extenders []framework.Extender,
     percentageOfNodesToScore int32) ScheduleAlgorithm {
     return &genericScheduler{
         cache: cache,
-        extenders: extenders,
         nodeInfoSnapshot: nodeInfoSnapshot,
         percentageOfNodesToScore: percentageOfNodesToScore,
     }
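Net effect for callers of this file: NewGenericScheduler no longer takes extenders, and every entry point that consults them accepts an explicit []framework.Extender (nil when extenders are irrelevant). A minimal caller-side sketch assembled from the signatures above; the variable names are illustrative:

    sched := NewGenericScheduler(cache, snapshot, schedulerapi.DefaultPercentageOfNodesToScore)
    // Extenders are now an argument, so tests can pass nil to skip them entirely.
    result, err := sched.Schedule(ctx, extenders, fwk, framework.NewCycleState(), pod)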

View File

@@ -425,11 +425,9 @@ func TestFindNodesThatPassExtenders(t *testing.T) {
 for ii := range tt.extenders {
     extenders = append(extenders, &tt.extenders[ii])
 }
-scheduler := &genericScheduler{
-    extenders: extenders,
-}
 pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
-got, err := scheduler.findNodesThatPassExtenders(pod, tt.nodes, tt.filteredNodesStatuses)
+got, err := findNodesThatPassExtenders(extenders, pod, tt.nodes, tt.filteredNodesStatuses)
 if tt.expectsErr {
     if err == nil {
         t.Error("Unexpected non-error")
@@ -1006,13 +1004,12 @@ func TestGenericScheduler(t *testing.T) {
 scheduler := NewGenericScheduler(
     cache,
     snapshot,
-    []framework.Extender{},
     schedulerapi.DefaultPercentageOfNodesToScore,
 )
 informerFactory.Start(ctx.Done())
 informerFactory.WaitForCacheSync(ctx.Done())
 
-result, err := scheduler.Schedule(ctx, fwk, framework.NewCycleState(), test.pod)
+result, err := scheduler.Schedule(ctx, nil, fwk, framework.NewCycleState(), test.pod)
 // TODO(#94696): replace reflect.DeepEqual with cmp.Diff().
 if err != test.wErr {
     gotFitErr, gotOK := err.(*framework.FitError)
@@ -1043,7 +1040,6 @@ func makeScheduler(nodes []*v1.Node) *genericScheduler {
 s := NewGenericScheduler(
     cache,
     emptySnapshot,
-    nil,
     schedulerapi.DefaultPercentageOfNodesToScore)
 cache.UpdateSnapshot(s.(*genericScheduler).nodeInfoSnapshot)
 return s.(*genericScheduler)
@@ -1066,7 +1062,7 @@ func TestFindFitAllError(t *testing.T) {
     t.Fatal(err)
 }
 
-_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
+_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), &v1.Pod{})
 if err != nil {
     t.Errorf("unexpected error: %v", err)
@@ -1100,7 +1096,7 @@ func TestFindFitSomeError(t *testing.T) {
 }
 pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
 
-_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), pod)
+_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), pod)
 if err != nil {
     t.Errorf("unexpected error: %v", err)
@@ -1179,7 +1175,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
 }
 fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}), "1")
 
-_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
+_, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod)
 if err != nil {
     t.Errorf("unexpected error: %v", err)
@@ -1338,18 +1334,17 @@ func TestZeroRequest(t *testing.T) {
 scheduler := NewGenericScheduler(
     nil,
     emptySnapshot,
-    []framework.Extender{},
     schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler)
 scheduler.nodeInfoSnapshot = snapshot
 
 ctx := context.Background()
 state := framework.NewCycleState()
-_, _, err = scheduler.findNodesThatFitPod(ctx, fwk, state, test.pod)
+_, _, err = scheduler.findNodesThatFitPod(ctx, nil, fwk, state, test.pod)
 if err != nil {
     t.Fatalf("error filtering nodes: %+v", err)
 }
 fwk.RunPreScorePlugins(ctx, state, test.pod, test.nodes)
-list, err := scheduler.prioritizeNodes(ctx, fwk, state, test.pod, test.nodes)
+list, err := prioritizeNodes(ctx, nil, fwk, state, test.pod, test.nodes)
 if err != nil {
     t.Errorf("unexpected error: %v", err)
 }
@@ -1444,7 +1439,7 @@ func TestFairEvaluationForNodes(t *testing.T) {
 // Iterating over all nodes more than twice
 for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ {
-    nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
+    nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), &v1.Pod{})
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
@@ -1527,10 +1522,9 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
 scheduler := NewGenericScheduler(
     cache,
     snapshot,
-    []framework.Extender{},
     schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler)
-_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
+_, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod)
 if err != nil {
     t.Errorf("unexpected error: %v", err)
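Since findNodesThatPassExtenders and prioritizeNodes are now package-level functions, the tests above no longer build a genericScheduler just to reach them. The call pattern, with pod, nodes, and statuses standing in for the test fixtures:

    got, err := findNodesThatPassExtenders(extenders, pod, nodes, statuses)
    list, err := prioritizeNodes(ctx, extenders, fwk, state, pod, nodes)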

View File

@@ -177,13 +177,13 @@ func (c *Configurator) create() (*Scheduler, error) {
 algo := core.NewGenericScheduler(
     c.schedulerCache,
     c.nodeInfoSnapshot,
-    extenders,
     c.percentageOfNodesToScore,
 )
 
 return &Scheduler{
     SchedulerCache: c.schedulerCache,
     Algorithm: algo,
+    Extenders: extenders,
     Profiles: profiles,
     NextPod: internalqueue.MakeNextPodFunc(podQueue),
     Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),

View File

@@ -71,6 +71,8 @@ type Scheduler struct {
 Algorithm core.ScheduleAlgorithm
 
+Extenders []framework.Extender
+
 // NextPod should be a function that blocks until the next pod
 // is available. We don't use a channel for this, because scheduling
 // a pod may take some amount of time and we don't want pods to get
@@ -449,7 +451,7 @@ func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assum
 // TODO(#87159): Move this to a Plugin.
 func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error) {
-    for _, extender := range sched.Algorithm.Extenders() {
+    for _, extender := range sched.Extenders {
         if !extender.IsBinder() || !extender.IsInterested(pod) {
             continue
         }
@@ -500,7 +502,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
 schedulingCycleCtx, cancel := context.WithCancel(ctx)
 defer cancel()
-scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, fwk, state, pod)
+scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod)
 if err != nil {
     // Schedule() may have failed because the pod would not fit on any host, so we try to
     // preempt, with the expectation that the next time the pod is tried for scheduling it
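With Extenders held on Scheduler, the scheduling and binding paths read one slice; a sketch of the wiring implied by this diff and the factory change above (fields trimmed to those the diff touches):

    sched := &Scheduler{
        Algorithm: algo,      // no longer owns or exposes extenders
        Extenders: extenders, // consulted by both Schedule and extendersBinding
    }
    scheduleResult, err := sched.Algorithm.Schedule(ctx, sched.Extenders, fwk, state, pod)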

View File

@@ -114,14 +114,10 @@ type mockScheduler struct {
 err error
 }
 
-func (es mockScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) {
+func (es mockScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) {
     return es.result, es.err
 }
 
-func (es mockScheduler) Extenders() []framework.Extender {
-    return nil
-}
-
 func TestSchedulerCreation(t *testing.T) {
     invalidRegistry := map[string]frameworkruntime.PluginFactory{
         defaultbinder.Name: defaultbinder.New,
@@ -837,7 +833,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
 algo := core.NewGenericScheduler(
     scache,
     internalcache.NewEmptySnapshot(),
-    []framework.Extender{},
     schedulerapi.DefaultPercentageOfNodesToScore,
 )
@@ -1184,11 +1179,11 @@ func TestSchedulerBinding(t *testing.T) {
 algo := core.NewGenericScheduler(
     scache,
     nil,
-    test.extenders,
     0,
 )
 sched := Scheduler{
     Algorithm: algo,
+    Extenders: test.extenders,
     SchedulerCache: scache,
 }
 err = sched.bind(context.Background(), fwk, pod, "node", nil)

View File

@@ -137,7 +137,7 @@ func TestServiceAffinityEnqueue(t *testing.T) {
     t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name)
 }
 // Schedule the Pod manually.
-_, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, fwk, framework.NewCycleState(), podInfo.Pod)
+_, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, nil, fwk, framework.NewCycleState(), podInfo.Pod)
 // The fitError is expected to be:
 // 0/2 nodes are available: 1 Too many pods, 1 node(s) didn't match service affinity.
 if fitError == nil {
@@ -301,7 +301,7 @@ func TestCustomResourceEnqueue(t *testing.T) {
     t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name)
 }
 // Schedule the Pod manually.
-_, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, fwk, framework.NewCycleState(), podInfo.Pod)
+_, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, nil, fwk, framework.NewCycleState(), podInfo.Pod)
 // The fitError is expected to be non-nil as it failed the fakeCRPlugin plugin.
 if fitError == nil {
     t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)