Mirror of https://github.com/k3s-io/kubernetes.git
Merge pull request #102558 from Jerry-Ge/unroll-extenders
Unroll ScheduleAlgorithm#Extenders() to Scheduler.go
Commit: 521be6344e
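
At a glance, the refactor moves extender ownership out of the scheduling algorithm and onto the Scheduler itself. The sketch below is condensed from the hunks that follow (package clause, imports, and unrelated fields omitted), so it is illustrative rather than a compilable file:

// Before: the algorithm owned the extenders and exposed them through Extenders().
type ScheduleAlgorithm interface {
    Schedule(context.Context, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
    // Extenders returns a slice of extender config. This is exposed for testing.
    Extenders() []framework.Extender
}

// After: the Scheduler carries the extenders and passes them into every Schedule call.
type ScheduleAlgorithm interface {
    Schedule(context.Context, []framework.Extender, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
}

type Scheduler struct {
    Algorithm core.ScheduleAlgorithm
    Extenders []framework.Extender
    // ... remaining fields unchanged
}
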
@@ -1394,7 +1394,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
                 t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
             }

-            gotExtenders := sched.Algorithm.Extenders()
+            gotExtenders := sched.Extenders
             var wantExtenders []*core.HTTPExtender
             for _, e := range tc.wantExtenders {
                 extender, err := core.NewHTTPExtender(&e)
@@ -280,10 +280,9 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
             scheduler := NewGenericScheduler(
                 cache,
                 emptySnapshot,
-                extenders,
                 schedulerapi.DefaultPercentageOfNodesToScore)
             podIgnored := &v1.Pod{}
-            result, err := scheduler.Schedule(context.Background(), fwk, framework.NewCycleState(), podIgnored)
+            result, err := scheduler.Schedule(context.Background(), extenders, fwk, framework.NewCycleState(), podIgnored)
             if test.expectsErr {
                 if err == nil {
                     t.Errorf("Unexpected non-error, result %+v", result)
@@ -59,10 +59,7 @@ var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
 // onto machines.
 // TODO: Rename this type.
 type ScheduleAlgorithm interface {
-    Schedule(context.Context, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
+    Schedule(context.Context, []framework.Extender, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
-    // Extenders returns a slice of extender config. This is exposed for
-    // testing.
-    Extenders() []framework.Extender
 }

 // ScheduleResult represents the result of one pod scheduled. It will contain
@@ -78,7 +75,6 @@ type ScheduleResult struct {

 type genericScheduler struct {
     cache                    internalcache.Cache
-    extenders                []framework.Extender
     nodeInfoSnapshot         *internalcache.Snapshot
     percentageOfNodesToScore int32
     nextStartNodeIndex       int
@@ -94,7 +90,7 @@ func (g *genericScheduler) snapshot() error {
 // Schedule tries to schedule the given pod to one of the nodes in the node list.
 // If it succeeds, it will return the name of the node.
 // If it fails, it will return a FitError error with reasons.
-func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
+func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
     trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
     defer trace.LogIfLong(100 * time.Millisecond)

@@ -107,7 +103,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework
         return result, ErrNoNodesAvailable
     }

-    feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, fwk, state, pod)
+    feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, extenders, fwk, state, pod)
     if err != nil {
         return result, err
     }
@@ -130,7 +126,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework
         }, nil
     }

-    priorityList, err := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)
+    priorityList, err := prioritizeNodes(ctx, extenders, fwk, state, pod, feasibleNodes)
     if err != nil {
         return result, err
     }
@@ -145,10 +141,6 @@ func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework
     }, err
 }

-func (g *genericScheduler) Extenders() []framework.Extender {
-    return g.extenders
-}
-
 // selectHost takes a prioritized list of nodes and then picks one
 // in a reservoir sampling manner from the nodes that had the highest score.
 func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
@@ -198,7 +190,7 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
     return numNodes
 }

-func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) {
+func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, extenders []framework.Extender, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) {
     nnn := pod.Status.NominatedNodeName
     nodeInfo, err := g.nodeInfoSnapshot.Get(nnn)
     if err != nil {
@@ -210,7 +202,7 @@ func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Po
         return nil, err
     }

-    feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap)
+    feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
     if err != nil {
         return nil, err
     }
@@ -220,7 +212,7 @@ func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Po

 // Filters the nodes to find the ones that fit the pod based on the framework
 // filter plugins and filter extenders.
-func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
+func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
     diagnosis := framework.Diagnosis{
         NodeToStatusMap:      make(framework.NodeToStatusMap),
         UnschedulablePlugins: sets.NewString(),
@@ -249,7 +241,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framewor
     // "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
     // This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
     if len(pod.Status.NominatedNodeName) > 0 && feature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) {
-        feasibleNodes, err := g.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
+        feasibleNodes, err := g.evaluateNominatedNode(ctx, extenders, pod, fwk, state, diagnosis)
         if err != nil {
             klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
         }
@@ -263,7 +255,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framewor
         return nil, diagnosis, err
     }

-    feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap)
+    feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
     if err != nil {
         return nil, diagnosis, err
     }
@@ -345,11 +337,11 @@ func (g *genericScheduler) findNodesThatPassFilters(
     return feasibleNodes, nil
 }

-func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
+func findNodesThatPassExtenders(extenders []framework.Extender, pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
     // Extenders are called sequentially.
     // Nodes in original feasibleNodes can be excluded in one extender, and pass on to the next
     // extender in a decreasing manner.
-    for _, extender := range g.extenders {
+    for _, extender := range extenders {
         if len(feasibleNodes) == 0 {
             break
         }
@@ -403,8 +395,9 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes
 // The scores from each plugin are added together to make the score for that node, then
 // any extenders are run as well.
 // All scores are finally combined (added) to get the total weighted scores of all nodes
-func (g *genericScheduler) prioritizeNodes(
+func prioritizeNodes(
     ctx context.Context,
+    extenders []framework.Extender,
     fwk framework.Framework,
     state *framework.CycleState,
     pod *v1.Pod,
@@ -412,7 +405,7 @@ func (g *genericScheduler) prioritizeNodes(
 ) (framework.NodeScoreList, error) {
     // If no priority configs are provided, then all nodes will have a score of one.
     // This is required to generate the priority list in the required format
-    if len(g.extenders) == 0 && !fwk.HasScorePlugins() {
+    if len(extenders) == 0 && !fwk.HasScorePlugins() {
         result := make(framework.NodeScoreList, 0, len(nodes))
         for i := range nodes {
             result = append(result, framework.NodeScore{
@@ -453,12 +446,12 @@ func (g *genericScheduler) prioritizeNodes(
         }
     }

-    if len(g.extenders) != 0 && nodes != nil {
+    if len(extenders) != 0 && nodes != nil {
         var mu sync.Mutex
         var wg sync.WaitGroup
         combinedScores := make(map[string]int64, len(nodes))
-        for i := range g.extenders {
+        for i := range extenders {
-            if !g.extenders[i].IsInterested(pod) {
+            if !extenders[i].IsInterested(pod) {
                 continue
             }
             wg.Add(1)
@@ -468,7 +461,7 @@ func (g *genericScheduler) prioritizeNodes(
                 metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
                 wg.Done()
             }()
-            prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
+            prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
             if err != nil {
                 // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
                 return
@@ -477,7 +470,7 @@ func (g *genericScheduler) prioritizeNodes(
             for i := range *prioritizedList {
                 host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                 if klog.V(10).Enabled() {
-                    klog.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", g.extenders[extIndex].Name(), "node", host, "score", score)
+                    klog.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", host, "score", score)
                 }
                 combinedScores[host] += score * weight
             }
@@ -505,11 +498,9 @@ func (g *genericScheduler) prioritizeNodes(
 func NewGenericScheduler(
     cache internalcache.Cache,
     nodeInfoSnapshot *internalcache.Snapshot,
-    extenders []framework.Extender,
     percentageOfNodesToScore int32) ScheduleAlgorithm {
     return &genericScheduler{
         cache:                    cache,
-        extenders:                extenders,
         nodeInfoSnapshot:         nodeInfoSnapshot,
         percentageOfNodesToScore: percentageOfNodesToScore,
     }
@@ -425,11 +425,9 @@ func TestFindNodesThatPassExtenders(t *testing.T) {
             for ii := range tt.extenders {
                 extenders = append(extenders, &tt.extenders[ii])
             }
-            scheduler := &genericScheduler{
-                extenders: extenders,
-            }
             pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
-            got, err := scheduler.findNodesThatPassExtenders(pod, tt.nodes, tt.filteredNodesStatuses)
+            got, err := findNodesThatPassExtenders(extenders, pod, tt.nodes, tt.filteredNodesStatuses)
             if tt.expectsErr {
                 if err == nil {
                     t.Error("Unexpected non-error")
@@ -1006,13 +1004,12 @@ func TestGenericScheduler(t *testing.T) {
             scheduler := NewGenericScheduler(
                 cache,
                 snapshot,
-                []framework.Extender{},
                 schedulerapi.DefaultPercentageOfNodesToScore,
             )
             informerFactory.Start(ctx.Done())
             informerFactory.WaitForCacheSync(ctx.Done())

-            result, err := scheduler.Schedule(ctx, fwk, framework.NewCycleState(), test.pod)
+            result, err := scheduler.Schedule(ctx, nil, fwk, framework.NewCycleState(), test.pod)
             // TODO(#94696): replace reflect.DeepEqual with cmp.Diff().
             if err != test.wErr {
                 gotFitErr, gotOK := err.(*framework.FitError)
@@ -1043,7 +1040,6 @@ func makeScheduler(nodes []*v1.Node) *genericScheduler {
     s := NewGenericScheduler(
         cache,
         emptySnapshot,
-        nil,
         schedulerapi.DefaultPercentageOfNodesToScore)
     cache.UpdateSnapshot(s.(*genericScheduler).nodeInfoSnapshot)
     return s.(*genericScheduler)
@@ -1066,7 +1062,7 @@ func TestFindFitAllError(t *testing.T) {
         t.Fatal(err)
     }

-    _, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
+    _, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), &v1.Pod{})

     if err != nil {
         t.Errorf("unexpected error: %v", err)
@@ -1100,7 +1096,7 @@ func TestFindFitSomeError(t *testing.T) {
     }

     pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
-    _, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), pod)
+    _, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), pod)

     if err != nil {
         t.Errorf("unexpected error: %v", err)
@@ -1179,7 +1175,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
         }
         fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}), "1")

-        _, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
+        _, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod)

         if err != nil {
             t.Errorf("unexpected error: %v", err)
@@ -1338,18 +1334,17 @@ func TestZeroRequest(t *testing.T) {
             scheduler := NewGenericScheduler(
                 nil,
                 emptySnapshot,
-                []framework.Extender{},
                 schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler)
             scheduler.nodeInfoSnapshot = snapshot

             ctx := context.Background()
             state := framework.NewCycleState()
-            _, _, err = scheduler.findNodesThatFitPod(ctx, fwk, state, test.pod)
+            _, _, err = scheduler.findNodesThatFitPod(ctx, nil, fwk, state, test.pod)
             if err != nil {
                 t.Fatalf("error filtering nodes: %+v", err)
             }
             fwk.RunPreScorePlugins(ctx, state, test.pod, test.nodes)
-            list, err := scheduler.prioritizeNodes(ctx, fwk, state, test.pod, test.nodes)
+            list, err := prioritizeNodes(ctx, nil, fwk, state, test.pod, test.nodes)
             if err != nil {
                 t.Errorf("unexpected error: %v", err)
             }
@@ -1444,7 +1439,7 @@ func TestFairEvaluationForNodes(t *testing.T) {

     // Iterating over all nodes more than twice
     for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ {
-        nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
+        nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), &v1.Pod{})
         if err != nil {
             t.Errorf("unexpected error: %v", err)
         }
@@ -1527,10 +1522,9 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
             scheduler := NewGenericScheduler(
                 cache,
                 snapshot,
-                []framework.Extender{},
                 schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler)

-            _, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
+            _, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod)

             if err != nil {
                 t.Errorf("unexpected error: %v", err)
@@ -180,13 +180,13 @@ func (c *Configurator) create() (*Scheduler, error) {
     algo := core.NewGenericScheduler(
         c.schedulerCache,
         c.nodeInfoSnapshot,
-        extenders,
         c.percentageOfNodesToScore,
     )

     return &Scheduler{
         SchedulerCache: c.schedulerCache,
         Algorithm:      algo,
+        Extenders:      extenders,
         Profiles:       profiles,
         NextPod:        internalqueue.MakeNextPodFunc(podQueue),
         Error:          MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
@@ -71,6 +71,8 @@ type Scheduler struct {

     Algorithm core.ScheduleAlgorithm

+    Extenders []framework.Extender
+
     // NextPod should be a function that blocks until the next pod
     // is available. We don't use a channel for this, because scheduling
     // a pod may take some amount of time and we don't want pods to get
@@ -461,7 +463,7 @@ func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assum

 // TODO(#87159): Move this to a Plugin.
 func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error) {
-    for _, extender := range sched.Algorithm.Extenders() {
+    for _, extender := range sched.Extenders {
         if !extender.IsBinder() || !extender.IsInterested(pod) {
             continue
         }
@@ -512,7 +514,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
     state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
     schedulingCycleCtx, cancel := context.WithCancel(ctx)
     defer cancel()
-    scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, fwk, state, pod)
+    scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod)
     if err != nil {
         // Schedule() may have failed because the pod would not fit on any host, so we try to
         // preempt, with the expectation that the next time the pod is tried for scheduling it
@@ -114,14 +114,10 @@ type mockScheduler struct {
     err    error
 }

-func (es mockScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) {
+func (es mockScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) {
     return es.result, es.err
 }

-func (es mockScheduler) Extenders() []framework.Extender {
-    return nil
-}
-
 func TestSchedulerCreation(t *testing.T) {
     invalidRegistry := map[string]frameworkruntime.PluginFactory{
         defaultbinder.Name: defaultbinder.New,
|
|||||||
algo := core.NewGenericScheduler(
|
algo := core.NewGenericScheduler(
|
||||||
scache,
|
scache,
|
||||||
internalcache.NewEmptySnapshot(),
|
internalcache.NewEmptySnapshot(),
|
||||||
[]framework.Extender{},
|
|
||||||
schedulerapi.DefaultPercentageOfNodesToScore,
|
schedulerapi.DefaultPercentageOfNodesToScore,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -1184,11 +1179,11 @@ func TestSchedulerBinding(t *testing.T) {
             algo := core.NewGenericScheduler(
                 scache,
                 nil,
-                test.extenders,
                 0,
             )
             sched := Scheduler{
                 Algorithm:      algo,
+                Extenders:      test.extenders,
                 SchedulerCache: scache,
             }
             err = sched.bind(context.Background(), fwk, pod, "node", nil)
@@ -137,7 +137,7 @@ func TestServiceAffinityEnqueue(t *testing.T) {
         t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name)
     }
     // Schedule the Pod manually.
-    _, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, fwk, framework.NewCycleState(), podInfo.Pod)
+    _, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, nil, fwk, framework.NewCycleState(), podInfo.Pod)
     // The fitError is expected to be:
     // 0/2 nodes are available: 1 Too many pods, 1 node(s) didn't match service affinity.
     if fitError == nil {
@@ -301,7 +301,7 @@ func TestCustomResourceEnqueue(t *testing.T) {
         t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name)
     }
     // Schedule the Pod manually.
-    _, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, fwk, framework.NewCycleState(), podInfo.Pod)
+    _, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, nil, fwk, framework.NewCycleState(), podInfo.Pod)
     // The fitError is expected to be non-nil as it failed the fakeCRPlugin plugin.
     if fitError == nil {
         t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
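
Taken together, callers wire extenders as shown in the factory and scheduleOne hunks above. A minimal sketch of the new call pattern, assuming placeholder values (cache, snapshot, extenders, fwk, state, and pod are stand-ins supplied by the caller, not names introduced by this PR):

// Build the algorithm without extenders; they now live on the Scheduler.
algo := core.NewGenericScheduler(cache, snapshot, schedulerapi.DefaultPercentageOfNodesToScore)
sched := &Scheduler{
    Algorithm: algo,
    Extenders: extenders, // previously passed to NewGenericScheduler
}
// Each scheduling cycle passes the extender list explicitly.
result, err := sched.Algorithm.Schedule(ctx, sched.Extenders, fwk, state, pod)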