Merge pull request #122435 from aleksandra-malinowska/scheduler-first-fit-3

Scheduler first fit
This commit is contained in:
Kubernetes Prow Robot 2023-12-21 18:24:04 +01:00 committed by GitHub
commit c8c845ae08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 136 additions and 14 deletions

View File

@ -383,6 +383,11 @@ func (h *HTTPExtender) IsBinder() bool {
return h.bindVerb != ""
}
// IsPrioritizer returns whether this extender is configured for the Prioritize method.
func (h *HTTPExtender) IsPrioritizer() bool {
return h.prioritizeVerb != ""
}
// Helper function to send messages to the extender
func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
out, err := json.Marshal(args)

View File

@ -93,6 +93,7 @@ func TestSchedulerWithExtenders(t *testing.T) {
registerPlugins: []tf.RegisterPluginFunc{
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 20),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
extenders: []tf.FakeExtender{
@ -245,6 +246,7 @@ func TestSchedulerWithExtenders(t *testing.T) {
// because of the errors from errorPredicateExtender.
registerPlugins: []tf.RegisterPluginFunc{
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 20),
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
@ -268,6 +270,30 @@ func TestSchedulerWithExtenders(t *testing.T) {
},
name: "test 9",
},
{
registerPlugins: []tf.RegisterPluginFunc{
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
extenders: []tf.FakeExtender{
{
ExtenderName: "FakeExtender1",
Predicates: []tf.FitPredicate{tf.TruePredicateExtender},
},
{
ExtenderName: "FakeExtender2",
Predicates: []tf.FitPredicate{tf.Node1PredicateExtender},
},
},
nodes: []string{"node1", "node2"},
expectedResult: ScheduleResult{
SuggestedHost: "node1",
EvaluatedNodes: 1,
FeasibleNodes: 1,
},
name: "test 10",
},
}
for _, test := range tests {

View File

@ -50,6 +50,9 @@ type Extender interface {
// this pod is managed by this extender.
IsInterested(pod *v1.Pod) bool
// IsPrioritizer returns whether this extender is configured for the Prioritize method.
IsPrioritizer() bool
// ProcessPreemption returns nodes with their victim pods processed by extender based on
// given:
// 1. Pod to schedule

View File

@ -422,7 +422,7 @@ func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework
}, nil
}
priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
priorityList, err := sched.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)
if err != nil {
return result, err
}
@ -544,6 +544,19 @@ func (sched *Scheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod,
return feasibleNodes, nil
}
// hasScoring checks if scoring nodes is configured.
func (sched *Scheduler) hasScoring(fwk framework.Framework) bool {
if fwk.HasScorePlugins() {
return true
}
for _, extender := range sched.Extenders {
if extender.IsPrioritizer() {
return true
}
}
return false
}
// findNodesThatPassFilters finds the nodes that fit the filter plugins.
func (sched *Scheduler) findNodesThatPassFilters(
ctx context.Context,
@ -554,6 +567,9 @@ func (sched *Scheduler) findNodesThatPassFilters(
nodes []*framework.NodeInfo) ([]*framework.NodeInfo, error) {
numAllNodes := len(nodes)
numNodesToFind := sched.numFeasibleNodesToFind(fwk.PercentageOfNodesToScore(), int32(numAllNodes))
if !sched.hasScoring(fwk) {
numNodesToFind = 1
}
// Create feasible list with enough space to avoid growing it
// and allow assigning.
@ -705,9 +721,8 @@ func findNodesThatPassExtenders(ctx context.Context, extenders []framework.Exten
// The scores from each plugin are added together to make the score for that node, then
// any extenders are run as well.
// All scores are finally combined (added) to get the total weighted scores of all nodes
func prioritizeNodes(
func (sched *Scheduler) prioritizeNodes(
ctx context.Context,
extenders []framework.Extender,
fwk framework.Framework,
state *framework.CycleState,
pod *v1.Pod,
@ -716,7 +731,7 @@ func prioritizeNodes(
logger := klog.FromContext(ctx)
// If no priority configs are provided, then all nodes will have a score of one.
// This is required to generate the priority list in the required format
if len(extenders) == 0 && !fwk.HasScorePlugins() {
if !sched.hasScoring(fwk) {
result := make([]framework.NodePluginScores, 0, len(nodes))
for i := range nodes {
result = append(result, framework.NodePluginScores{
@ -749,14 +764,17 @@ func prioritizeNodes(
}
}
if len(extenders) != 0 && nodes != nil {
if len(sched.Extenders) != 0 && nodes != nil {
// allNodeExtendersScores has all extenders scores for all nodes.
// It is keyed with node name.
allNodeExtendersScores := make(map[string]*framework.NodePluginScores, len(nodes))
var mu sync.Mutex
var wg sync.WaitGroup
for i := range extenders {
if !extenders[i].IsInterested(pod) {
for i := range sched.Extenders {
if !sched.Extenders[i].IsInterested(pod) {
continue
}
if !sched.Extenders[i].IsPrioritizer() {
continue
}
wg.Add(1)
@ -766,10 +784,10 @@ func prioritizeNodes(
metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
wg.Done()
}()
prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
prioritizedList, weight, err := sched.Extenders[extIndex].Prioritize(pod, nodes)
if err != nil {
// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
logger.V(5).Info("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", extenders[extIndex].Name())
logger.V(5).Info("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", sched.Extenders[extIndex].Name())
return
}
mu.Lock()
@ -778,7 +796,7 @@ func prioritizeNodes(
nodename := (*prioritizedList)[i].Host
score := (*prioritizedList)[i].Score
if loggerVTen.Enabled() {
loggerVTen.Info("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", nodename, "score", score)
loggerVTen.Info("Extender scored node for pod", "pod", klog.KObj(pod), "extender", sched.Extenders[extIndex].Name(), "node", nodename, "score", score)
}
// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
@ -788,11 +806,11 @@ func prioritizeNodes(
if allNodeExtendersScores[nodename] == nil {
allNodeExtendersScores[nodename] = &framework.NodePluginScores{
Name: nodename,
Scores: make([]framework.PluginScore, 0, len(extenders)),
Scores: make([]framework.PluginScore, 0, len(sched.Extenders)),
}
}
allNodeExtendersScores[nodename].Scores = append(allNodeExtendersScores[nodename].Scores, framework.PluginScore{
Name: extenders[extIndex].Name(),
Name: sched.Extenders[extIndex].Name(),
Score: finalscore,
})
allNodeExtendersScores[nodename].TotalScore += finalscore

View File

@ -91,6 +91,7 @@ type fakeExtender struct {
interestedPodName string
ignorable bool
gotBind bool
isPrioritizer bool
}
func (f *fakeExtender) Name() string {
@ -140,6 +141,10 @@ func (f *fakeExtender) IsInterested(pod *v1.Pod) bool {
return pod != nil && pod.Name == f.interestedPodName
}
func (f *fakeExtender) IsPrioritizer() bool {
return f.isPrioritizer
}
type falseMapPlugin struct{}
func newFalseMapPlugin() frameworkruntime.PluginFactory {
@ -1809,6 +1814,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
registerPlugins: []tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 20),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"node1", "node2"},
@ -1926,6 +1932,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterPreFilterPlugin(volumebinding.Name, frameworkruntime.FactoryAdapter(fts, volumebinding.New)),
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 20),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"node1", "node2"},
@ -2039,6 +2046,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
"PreFilter",
"Filter",
),
tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 20),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"node1", "node2", "node3"},
@ -2312,6 +2320,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
},
}, nil
}, "PreFilter", "Filter"),
tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 20),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"node1", "node2", "node3"},
@ -2334,6 +2343,33 @@ func TestSchedulerSchedulePod(t *testing.T) {
pod: st.MakePod().Name("ignore").UID("ignore").Obj(),
wantNodes: sets.New("node1", "node2"),
},
{
name: "test without score plugin no extra nodes are evaluated",
registerPlugins: []tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"node1", "node2", "node3"},
pod: st.MakePod().Name("pod1").UID("pod1").Obj(),
wantNodes: sets.New("node1", "node2", "node3"),
wantEvaluatedNodes: ptr.To[int32](1),
},
{
name: "test no score plugin, prefilter plugin returning 2 nodes, only 1 node is evaluated",
registerPlugins: []tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterPreFilterPlugin(
"FakePreFilter",
tf.NewFakePreFilterPlugin("FakePreFilter", &framework.PreFilterResult{NodeNames: sets.New("node1", "node2")}, nil),
),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"node1", "node2", "node3"},
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
wantNodes: sets.New("node1", "node2"),
wantEvaluatedNodes: ptr.To[int32](1),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
@ -2707,7 +2743,7 @@ func TestZeroRequest(t *testing.T) {
t.Fatalf("error filtering nodes: %+v", err)
}
fwk.RunPreScorePlugins(ctx, state, test.pod, tf.BuildNodeInfos(test.nodes))
list, err := prioritizeNodes(ctx, nil, fwk, state, test.pod, tf.BuildNodeInfos(test.nodes))
list, err := sched.prioritizeNodes(ctx, fwk, state, test.pod, tf.BuildNodeInfos(test.nodes))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -3103,7 +3139,10 @@ func Test_prioritizeNodes(t *testing.T) {
for ii := range test.extenders {
extenders = append(extenders, &test.extenders[ii])
}
nodesscores, err := prioritizeNodes(ctx, extenders, fwk, state, test.pod, tf.BuildNodeInfos(test.nodes))
sched := &Scheduler{
Extenders: extenders,
}
nodesscores, err := sched.prioritizeNodes(ctx, fwk, state, test.pod, tf.BuildNodeInfos(test.nodes))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -3206,6 +3245,7 @@ func TestFairEvaluationForNodes(t *testing.T) {
[]tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 20),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
"",
@ -3284,6 +3324,7 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
registerPlugins := []tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
registerFakeFilterFunc,
tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 20),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}
fwk, err := tf.NewFramework(

View File

@ -137,6 +137,30 @@ func (pl *node2PrioritizerPlugin) ScoreExtensions() framework.ScoreExtensions {
return nil
}
type equalPrioritizerPlugin struct{}
// NewEqualPrioritizerPlugin returns a factory function to build equalPrioritizerPlugin.
func NewEqualPrioritizerPlugin() frameworkruntime.PluginFactory {
return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return &equalPrioritizerPlugin{}, nil
}
}
// Name returns the name of the plugin.
func (pl *equalPrioritizerPlugin) Name() string {
return "EqualPrioritizerPlugin"
}
// Score returns score 1 for each node.
func (pl *equalPrioritizerPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) (int64, *framework.Status) {
return int64(1), nil
}
// ScoreExtensions returns nil.
func (pl *equalPrioritizerPlugin) ScoreExtensions() framework.ScoreExtensions {
return nil
}
// FakeExtender is a data struct which implements the Extender interface.
type FakeExtender struct {
// ExtenderName indicates this fake extender's name.
@ -380,6 +404,11 @@ func (f *FakeExtender) IsBinder() bool {
return true
}
// IsPrioritizer returns true if there are any prioritizers.
func (f *FakeExtender) IsPrioritizer() bool {
return len(f.Prioritizers) > 0
}
// IsInterested returns a bool indicating whether this extender is interested in this Pod.
func (f *FakeExtender) IsInterested(pod *v1.Pod) bool {
return !f.UnInterested