mirror of https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00

Remove genericScheduler and SchedulerAlgorithm.

Signed-off-by: Ruquan Zhao <ruquan.zhao@arm.com>

This commit is contained in:
parent 356ec6f12f
commit f1a5b6ca06
@@ -41,7 +41,7 @@ import (
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
 )
 
-func TestGenericSchedulerWithExtenders(t *testing.T) {
+func TestSchedulerWithExtenders(t *testing.T) {
 	tests := []struct {
 		name            string
 		registerPlugins []st.RegisterPluginFunc
@@ -294,12 +294,19 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		scheduler := NewGenericScheduler(
+		scheduler := newScheduler(
 			cache,
+			extenders,
+			nil,
+			nil,
+			nil,
+			nil,
+			nil,
+			nil,
 			emptySnapshot,
 			schedulerapi.DefaultPercentageOfNodesToScore)
 		podIgnored := &v1.Pod{}
-		result, err := scheduler.Schedule(context.Background(), extenders, fwk, framework.NewCycleState(), podIgnored)
+		result, err := scheduler.SchedulePod(context.Background(), fwk, framework.NewCycleState(), podIgnored)
 		if test.expectsErr {
 			if err == nil {
 				t.Errorf("Unexpected non-error, result %+v", result)
@@ -182,22 +182,18 @@ func (c *Configurator) create() (*Scheduler, error) {
 	)
 	debugger.ListenForSignal(c.StopEverything)
 
-	algo := NewGenericScheduler(
+	sched := newScheduler(
 		c.schedulerCache,
+		extenders,
+		internalqueue.MakeNextPodFunc(podQueue),
+		MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
+		c.StopEverything,
+		podQueue,
+		profiles,
+		c.client,
 		c.nodeInfoSnapshot,
-		c.percentageOfNodesToScore,
-	)
-
-	return &Scheduler{
-		Cache:           c.schedulerCache,
-		Algorithm:       algo,
-		Extenders:       extenders,
-		Profiles:        profiles,
-		NextPod:         internalqueue.MakeNextPodFunc(podQueue),
-		Error:           MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
-		StopEverything:  c.StopEverything,
-		SchedulingQueue: podQueue,
-	}, nil
+		c.percentageOfNodesToScore)
+	return sched, nil
 }
 
 // MakeDefaultErrorFunc construct a function to handle pod scheduler error
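Note: create() previously built a genericScheduler and then wrapped it in a Scheduler literal; after this change a single constructor receives every dependency, including the NextPod and Error callbacks. A self-contained miniature of that shape, with invented types rather than the real kube-scheduler API:

```go
package main

import "fmt"

type QueuedPod struct{ Name string }

// Scheduler holds its callbacks as plain function fields, as the real
// Scheduler does for NextPod and Error.
type Scheduler struct {
	NextPod func() *QueuedPod
	Error   func(*QueuedPod, error)
}

// newScheduler mirrors the positional-dependency style of the new constructor:
// everything arrives in one call, nothing is assembled afterwards.
func newScheduler(nextPod func() *QueuedPod, errorFn func(*QueuedPod, error)) *Scheduler {
	return &Scheduler{NextPod: nextPod, Error: errorFn}
}

func main() {
	queue := []*QueuedPod{{Name: "pod-1"}}
	s := newScheduler(
		func() *QueuedPod { return queue[0] },
		func(p *QueuedPod, err error) { fmt.Println("failed:", p.Name, err) },
	)
	fmt.Println(s.NextPod().Name) // pod-1
}
```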
@@ -32,7 +32,6 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
 	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
-	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
 	utiltrace "k8s.io/utils/trace"
 )
@@ -53,13 +52,6 @@ const (
 // ErrNoNodesAvailable is used to describe the error that no nodes available to schedule pods.
 var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
 
-// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
-// onto machines.
-// TODO: Rename this type.
-type ScheduleAlgorithm interface {
-	Schedule(context.Context, []framework.Extender, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
-}
-
 // ScheduleResult represents the result of one pod scheduled. It will contain
 // the final selected Node, along with the selected intermediate information.
 type ScheduleResult struct {
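The hunk above deletes the last single-method interface between the scheduling loop and the algorithm. Its role is taken over by a function-typed field on Scheduler (added later in this commit), which provides the same test seam without a separate implementing type. The pattern in isolation, with illustrative names and simplified signatures:

```go
package main

import (
	"context"
	"fmt"
)

type ScheduleResult struct{ SuggestedHost string }

// Before: callers held an interface value next to the Scheduler.
type ScheduleAlgorithm interface {
	Schedule(ctx context.Context, podName string) (ScheduleResult, error)
}

// After: the Scheduler owns a function field with the same signature.
type Scheduler struct {
	SchedulePod func(ctx context.Context, podName string) (ScheduleResult, error)
}

func main() {
	s := &Scheduler{
		// A test assigns any stub; production assigns the scheduler's own method.
		SchedulePod: func(ctx context.Context, podName string) (ScheduleResult, error) {
			return ScheduleResult{SuggestedHost: "stub-node"}, nil
		},
	}
	result, _ := s.SchedulePod(context.Background(), "demo-pod")
	fmt.Println(result.SuggestedHost) // stub-node
}
```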
@@ -71,37 +63,30 @@ type ScheduleResult struct {
 	FeasibleNodes int
 }
 
-type genericScheduler struct {
-	cache                    internalcache.Cache
-	nodeInfoSnapshot         *internalcache.Snapshot
-	percentageOfNodesToScore int32
-	nextStartNodeIndex       int
-}
-
 // snapshot snapshots scheduler cache and node infos for all fit and priority
 // functions.
-func (g *genericScheduler) snapshot() error {
+func (sched *Scheduler) snapshot() error {
 	// Used for all fit and priority funcs.
-	return g.cache.UpdateSnapshot(g.nodeInfoSnapshot)
+	return sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot)
 }
 
-// Schedule tries to schedule the given pod to one of the nodes in the node list.
+// schedulePod tries to schedule the given pod to one of the nodes in the node list.
 // If it succeeds, it will return the name of the node.
-// If it fails, it will return a FitError error with reasons.
-func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
+// If it fails, it will return a FitError with reasons.
+func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
 	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
 	defer trace.LogIfLong(100 * time.Millisecond)
 
-	if err := g.snapshot(); err != nil {
+	if err := sched.snapshot(); err != nil {
 		return result, err
 	}
 	trace.Step("Snapshotting scheduler cache and node infos done")
 
-	if g.nodeInfoSnapshot.NumNodes() == 0 {
+	if sched.nodeInfoSnapshot.NumNodes() == 0 {
 		return result, ErrNoNodesAvailable
 	}
 
-	feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, extenders, fwk, state, pod)
+	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
 	if err != nil {
 		return result, err
 	}
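schedulePod keeps the pipeline it had as genericScheduler.Schedule: snapshot the cache, fail fast on an empty cluster, filter to feasible nodes, shortcut when zero or one remain, otherwise prioritize and pick a host. A reduced skeleton of that flow with simplified types (not the real signatures):

```go
package main

import (
	"errors"
	"fmt"
)

var errNoNodes = errors.New("no nodes available to schedule pods")

type result struct {
	host           string
	evaluatedNodes int
	feasibleNodes  int
}

func schedulePod(allNodes []string, fits func(string) bool, score func(string) int) (result, error) {
	if len(allNodes) == 0 {
		return result{}, errNoNodes
	}
	// Filtering: keep only nodes the pod fits on.
	var feasible []string
	for _, n := range allNodes {
		if fits(n) {
			feasible = append(feasible, n)
		}
	}
	if len(feasible) == 0 {
		return result{}, fmt.Errorf("0/%d nodes are available", len(allNodes))
	}
	// As in the real code, a single feasible node needs no scoring pass.
	if len(feasible) == 1 {
		return result{host: feasible[0], evaluatedNodes: len(allNodes), feasibleNodes: 1}, nil
	}
	// Scoring: pick the highest-scoring feasible node (ties broken by first-wins
	// here; the real code uses reservoir sampling, shown further down).
	best := feasible[0]
	for _, n := range feasible[1:] {
		if score(n) > score(best) {
			best = n
		}
	}
	return result{host: best, evaluatedNodes: len(allNodes), feasibleNodes: len(feasible)}, nil
}

func main() {
	r, err := schedulePod(
		[]string{"n1", "n2", "n3"},
		func(n string) bool { return n != "n1" },
		func(n string) int { return len(n) },
	)
	fmt.Println(r, err) // {n2 3 2} <nil>
}
```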
@@ -110,7 +95,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.E
 	if len(feasibleNodes) == 0 {
 		return result, &framework.FitError{
 			Pod:         pod,
-			NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
+			NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
 			Diagnosis:   diagnosis,
 		}
 	}
@@ -124,12 +109,12 @@ func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.E
 		}, nil
 	}
 
-	priorityList, err := prioritizeNodes(ctx, extenders, fwk, state, pod, feasibleNodes)
+	priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
 	if err != nil {
 		return result, err
 	}
 
-	host, err := g.selectHost(priorityList)
+	host, err := selectHost(priorityList)
 	trace.Step("Prioritizing done")
 
 	return ScheduleResult{
@@ -141,7 +126,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.E
 
 // selectHost takes a prioritized list of nodes and then picks one
 // in a reservoir sampling manner from the nodes that had the highest score.
-func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
+func selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
 	if len(nodeScoreList) == 0 {
 		return "", fmt.Errorf("empty priorityList")
 	}
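selectHost resolves ties among the top-scoring nodes with reservoir sampling: the n-th node seen with the current best score replaces the pick with probability 1/n, so every tied node ends up equally likely. A standalone sketch of that logic over a simplified NodeScore type (field names mirror framework.NodeScore):

```go
package main

import (
	"fmt"
	"math/rand"
)

type NodeScore struct {
	Name  string
	Score int64
}

func selectHost(nodeScoreList []NodeScore) (string, error) {
	if len(nodeScoreList) == 0 {
		return "", fmt.Errorf("empty priorityList")
	}
	maxScore := nodeScoreList[0].Score
	selected := nodeScoreList[0].Name
	cntOfMaxScore := 1
	for _, ns := range nodeScoreList[1:] {
		if ns.Score > maxScore {
			maxScore = ns.Score
			selected = ns.Name
			cntOfMaxScore = 1
		} else if ns.Score == maxScore {
			cntOfMaxScore++
			// Keep each of the cntOfMaxScore tied nodes with probability 1/cntOfMaxScore.
			if rand.Intn(cntOfMaxScore) == 0 {
				selected = ns.Name
			}
		}
	}
	return selected, nil
}

func main() {
	host, _ := selectHost([]NodeScore{{"a", 10}, {"b", 10}, {"c", 7}})
	fmt.Println(host) // "a" or "b", each with probability 1/2
}
```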
@@ -166,12 +151,12 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
 
 // numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops
 // its search for more feasible nodes.
-func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
-	if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
+func (sched *Scheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
+	if numAllNodes < minFeasibleNodesToFind || sched.percentageOfNodesToScore >= 100 {
 		return numAllNodes
 	}
 
-	adaptivePercentage := g.percentageOfNodesToScore
+	adaptivePercentage := sched.percentageOfNodesToScore
 	if adaptivePercentage <= 0 {
 		basePercentageOfNodesToScore := int32(50)
 		adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125
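When percentageOfNodesToScore is unset (<= 0), the percentage adapts to cluster size: it starts at 50 and drops one point per 125 nodes, and just below this hunk upstream clamps it to a 5% floor and a 100-node minimum. A self-contained sketch assuming those upstream constants, with a worked case in main:

```go
package main

import "fmt"

const (
	minFeasibleNodesToFind           = 100
	minFeasibleNodesPercentageToFind = 5
)

func numFeasibleNodesToFind(numAllNodes, percentageOfNodesToScore int32) int32 {
	if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore >= 100 {
		return numAllNodes
	}
	adaptivePercentage := percentageOfNodesToScore
	if adaptivePercentage <= 0 {
		basePercentageOfNodesToScore := int32(50)
		adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125
		if adaptivePercentage < minFeasibleNodesPercentageToFind {
			adaptivePercentage = minFeasibleNodesPercentageToFind
		}
	}
	numNodes := numAllNodes * adaptivePercentage / 100
	if numNodes < minFeasibleNodesToFind {
		return minFeasibleNodesToFind
	}
	return numNodes
}

func main() {
	// 5000 nodes, default percentage: 50 - 5000/125 = 10%, so 500 nodes are inspected.
	fmt.Println(numFeasibleNodesToFind(5000, 0)) // 500
	// Small clusters are always searched exhaustively.
	fmt.Println(numFeasibleNodesToFind(80, 0)) // 80
}
```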
@@ -188,19 +173,19 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
 	return numNodes
 }
 
-func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, extenders []framework.Extender, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) {
+func (sched *Scheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) {
 	nnn := pod.Status.NominatedNodeName
-	nodeInfo, err := g.nodeInfoSnapshot.Get(nnn)
+	nodeInfo, err := sched.nodeInfoSnapshot.Get(nnn)
 	if err != nil {
 		return nil, err
 	}
 	node := []*framework.NodeInfo{nodeInfo}
-	feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, node)
+	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, node)
 	if err != nil {
 		return nil, err
 	}
 
-	feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
+	feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
 	if err != nil {
 		return nil, err
 	}
@@ -210,7 +195,7 @@ func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, extenders
 
 // Filters the nodes to find the ones that fit the pod based on the framework
 // filter plugins and filter extenders.
-func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
+func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
 	diagnosis := framework.Diagnosis{
 		NodeToStatusMap:      make(framework.NodeToStatusMap),
 		UnschedulablePlugins: sets.NewString(),
@@ -218,7 +203,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []
 
 	// Run "prefilter" plugins.
 	s := fwk.RunPreFilterPlugins(ctx, state, pod)
-	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
+	allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
 	if err != nil {
 		return nil, diagnosis, err
 	}
@@ -239,7 +224,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []
 	// "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
 	// This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
 	if len(pod.Status.NominatedNodeName) > 0 {
-		feasibleNodes, err := g.evaluateNominatedNode(ctx, extenders, pod, fwk, state, diagnosis)
+		feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
 		if err != nil {
 			klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
 		}
@@ -248,12 +233,12 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []
 			return feasibleNodes, diagnosis, nil
 		}
 	}
-	feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
+	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
 	if err != nil {
 		return nil, diagnosis, err
 	}
 
-	feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
+	feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
 	if err != nil {
 		return nil, diagnosis, err
 	}
@@ -261,14 +246,14 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []
 }
 
 // findNodesThatPassFilters finds the nodes that fit the filter plugins.
-func (g *genericScheduler) findNodesThatPassFilters(
+func (sched *Scheduler) findNodesThatPassFilters(
 	ctx context.Context,
 	fwk framework.Framework,
 	state *framework.CycleState,
 	pod *v1.Pod,
 	diagnosis framework.Diagnosis,
 	nodes []*framework.NodeInfo) ([]*v1.Node, error) {
-	numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))
+	numNodesToFind := sched.numFeasibleNodesToFind(int32(len(nodes)))
 
 	// Create feasible list with enough space to avoid growing it
 	// and allow assigning.
@@ -277,9 +262,9 @@ func (g *genericScheduler) findNodesThatPassFilters(
 	if !fwk.HasFilterPlugins() {
 		length := len(nodes)
 		for i := range feasibleNodes {
-			feasibleNodes[i] = nodes[(g.nextStartNodeIndex+i)%length].Node()
+			feasibleNodes[i] = nodes[(sched.nextStartNodeIndex+i)%length].Node()
 		}
-		g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length
+		sched.nextStartNodeIndex = (sched.nextStartNodeIndex + len(feasibleNodes)) % length
 		return feasibleNodes, nil
 	}
 
@@ -290,7 +275,7 @@ func (g *genericScheduler) findNodesThatPassFilters(
 	checkNode := func(i int) {
 		// We check the nodes starting from where we left off in the previous scheduling cycle,
 		// this is to make sure all nodes have the same chance of being examined across pods.
-		nodeInfo := nodes[(g.nextStartNodeIndex+i)%len(nodes)]
+		nodeInfo := nodes[(sched.nextStartNodeIndex+i)%len(nodes)]
 		status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
 		if status.Code() == framework.Error {
 			errCh.SendErrorWithCancel(status.AsError(), cancel)
@@ -325,7 +310,7 @@ func (g *genericScheduler) findNodesThatPassFilters(
 	// are found.
 	fwk.Parallelizer().Until(ctx, len(nodes), checkNode)
 	processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap)
-	g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)
+	sched.nextStartNodeIndex = (sched.nextStartNodeIndex + processedNodes) % len(nodes)
 
 	feasibleNodes = feasibleNodes[:feasibleNodesLen]
 	if err := errCh.ReceiveError(); err != nil {
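nextStartNodeIndex gives the filter pass a rotating starting offset. Since only numFeasibleNodesToFind nodes are examined per scheduling cycle, resuming where the previous cycle stopped keeps every node equally likely to be inspected across pods; TestFairEvaluationForNodes further down asserts exactly this. The mechanism in miniature:

```go
package main

import "fmt"

type roundRobin struct {
	nextStartNodeIndex int
}

// pick returns k node names starting at the saved offset, then advances the
// offset modulo the node count, as findNodesThatPassFilters does.
func (r *roundRobin) pick(nodes []string, k int) []string {
	out := make([]string, 0, k)
	for i := 0; i < k; i++ {
		out = append(out, nodes[(r.nextStartNodeIndex+i)%len(nodes)])
	}
	r.nextStartNodeIndex = (r.nextStartNodeIndex + k) % len(nodes)
	return out
}

func main() {
	r := &roundRobin{}
	nodes := []string{"n1", "n2", "n3", "n4", "n5"}
	fmt.Println(r.pick(nodes, 2)) // [n1 n2]
	fmt.Println(r.pick(nodes, 2)) // [n3 n4]
	fmt.Println(r.pick(nodes, 2)) // [n5 n1]
}
```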
@@ -494,15 +479,3 @@ func prioritizeNodes(
 	}
 	return result, nil
 }
-
-// NewGenericScheduler creates a genericScheduler object.
-func NewGenericScheduler(
-	cache internalcache.Cache,
-	nodeInfoSnapshot *internalcache.Snapshot,
-	percentageOfNodesToScore int32) ScheduleAlgorithm {
-	return &genericScheduler{
-		cache:                    cache,
-		nodeInfoSnapshot:         nodeInfoSnapshot,
-		percentageOfNodesToScore: percentageOfNodesToScore,
-	}
-}
@@ -202,7 +202,6 @@ func makeNodeList(nodeNames []string) []*v1.Node {
 }
 
 func TestSelectHost(t *testing.T) {
-	scheduler := genericScheduler{}
 	tests := []struct {
 		name          string
 		list          framework.NodeScoreList
@@ -253,7 +252,7 @@ func TestSelectHost(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			// increase the randomness
 			for i := 0; i < 10; i++ {
-				got, err := scheduler.selectHost(test.list)
+				got, err := selectHost(test.list)
 				if test.expectsErr {
 					if err == nil {
 						t.Error("Unexpected non-error")
@@ -450,7 +449,7 @@ func TestFindNodesThatPassExtenders(t *testing.T) {
 	}
 }
 
-func TestGenericScheduler(t *testing.T) {
+func TestSchedulerSchedulePod(t *testing.T) {
 	fts := feature.Features{}
 	tests := []struct {
 		name string
@@ -1006,15 +1005,21 @@ func TestGenericScheduler(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		scheduler := NewGenericScheduler(
+		scheduler := newScheduler(
 			cache,
+			nil,
+			nil,
+			nil,
+			nil,
+			nil,
+			nil,
+			nil,
 			snapshot,
-			schedulerapi.DefaultPercentageOfNodesToScore,
-		)
+			schedulerapi.DefaultPercentageOfNodesToScore)
 		informerFactory.Start(ctx.Done())
 		informerFactory.WaitForCacheSync(ctx.Done())
 
-		result, err := scheduler.Schedule(ctx, nil, fwk, framework.NewCycleState(), test.pod)
+		result, err := scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), test.pod)
 		if err != test.wErr {
 			gotFitErr, gotOK := err.(*framework.FitError)
 			wantFitErr, wantOK := test.wErr.(*framework.FitError)
@@ -1036,19 +1041,26 @@ func TestGenericScheduler(t *testing.T) {
 	}
 }
 
-// makeScheduler makes a simple genericScheduler for testing.
-func makeScheduler(nodes []*v1.Node) *genericScheduler {
+// makeScheduler makes a simple Scheduler for testing.
+func makeScheduler(nodes []*v1.Node) *Scheduler {
 	cache := internalcache.New(time.Duration(0), wait.NeverStop)
 	for _, n := range nodes {
 		cache.AddNode(n)
 	}
 
-	s := NewGenericScheduler(
+	s := newScheduler(
 		cache,
+		nil,
+		nil,
+		nil,
+		nil,
+		nil,
+		nil,
+		nil,
 		emptySnapshot,
 		schedulerapi.DefaultPercentageOfNodesToScore)
-	cache.UpdateSnapshot(s.(*genericScheduler).nodeInfoSnapshot)
-	return s.(*genericScheduler)
+	cache.UpdateSnapshot(s.nodeInfoSnapshot)
+	return s
 }
 
 func TestFindFitAllError(t *testing.T) {
@@ -1068,8 +1080,7 @@ func TestFindFitAllError(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), &v1.Pod{})
-
+	_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
@@ -1105,8 +1116,7 @@ func TestFindFitSomeError(t *testing.T) {
 	}
 
 	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
-	_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), pod)
-
+	_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), pod)
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
@@ -1179,14 +1189,13 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
 			}
 
 			scheduler := makeScheduler(nodes)
-			if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
+			if err := scheduler.Cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
 				t.Fatal(err)
 			}
 			fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}),
 				&framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "1"})
-
-			_, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod)
+			_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
 			if err != nil {
 				t.Errorf("unexpected error: %v", err)
 			}
@@ -1335,15 +1344,21 @@ func TestZeroRequest(t *testing.T) {
 				t.Fatalf("error creating framework: %+v", err)
 			}
 
-			scheduler := NewGenericScheduler(
+			scheduler := newScheduler(
 				nil,
-				emptySnapshot,
-				schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler)
-			scheduler.nodeInfoSnapshot = snapshot
+				nil,
+				nil,
+				nil,
+				nil,
+				nil,
+				nil,
+				nil,
+				snapshot,
+				schedulerapi.DefaultPercentageOfNodesToScore)
 
 			ctx := context.Background()
 			state := framework.NewCycleState()
-			_, _, err = scheduler.findNodesThatFitPod(ctx, nil, fwk, state, test.pod)
+			_, _, err = scheduler.findNodesThatFitPod(ctx, fwk, state, test.pod)
 			if err != nil {
 				t.Fatalf("error filtering nodes: %+v", err)
 			}
@@ -1406,11 +1421,11 @@ func TestNumFeasibleNodesToFind(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			g := &genericScheduler{
+			sched := &Scheduler{
 				percentageOfNodesToScore: tt.percentageOfNodesToScore,
 			}
-			if gotNumNodes := g.numFeasibleNodesToFind(tt.numAllNodes); gotNumNodes != tt.wantNumNodes {
-				t.Errorf("genericScheduler.numFeasibleNodesToFind() = %v, want %v", gotNumNodes, tt.wantNumNodes)
+			if gotNumNodes := sched.numFeasibleNodesToFind(tt.numAllNodes); gotNumNodes != tt.wantNumNodes {
+				t.Errorf("Scheduler.numFeasibleNodesToFind() = %v, want %v", gotNumNodes, tt.wantNumNodes)
 			}
 		})
 	}
@@ -1423,7 +1438,7 @@ func TestFairEvaluationForNodes(t *testing.T) {
 		nodeNames = append(nodeNames, strconv.Itoa(i))
 	}
 	nodes := makeNodeList(nodeNames)
-	g := makeScheduler(nodes)
+	sched := makeScheduler(nodes)
 	fwk, err := st.NewFramework(
 		[]st.RegisterPluginFunc{
 			st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
@@ -1438,20 +1453,20 @@
 	}
 
 	// To make numAllNodes % nodesToFind != 0
-	g.percentageOfNodesToScore = 30
-	nodesToFind := int(g.numFeasibleNodesToFind(int32(numAllNodes)))
+	sched.percentageOfNodesToScore = 30
+	nodesToFind := int(sched.numFeasibleNodesToFind(int32(numAllNodes)))
 
 	// Iterating over all nodes more than twice
 	for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ {
-		nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), &v1.Pod{})
+		nodesThatFit, _, err := sched.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
 		if len(nodesThatFit) != nodesToFind {
 			t.Errorf("got %d nodes filtered, want %d", len(nodesThatFit), nodesToFind)
 		}
-		if g.nextStartNodeIndex != (i+1)*nodesToFind%numAllNodes {
-			t.Errorf("got %d lastProcessedNodeIndex, want %d", g.nextStartNodeIndex, (i+1)*nodesToFind%numAllNodes)
+		if sched.nextStartNodeIndex != (i+1)*nodesToFind%numAllNodes {
+			t.Errorf("got %d lastProcessedNodeIndex, want %d", sched.nextStartNodeIndex, (i+1)*nodesToFind%numAllNodes)
 		}
 	}
 }
@@ -1513,13 +1528,19 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
 			t.Fatal(err)
 			}
 			snapshot := internalcache.NewSnapshot(nil, nodes)
-			scheduler := NewGenericScheduler(
+			scheduler := newScheduler(
 				cache,
+				nil,
+				nil,
+				nil,
+				nil,
+				nil,
+				nil,
+				nil,
 				snapshot,
-				schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler)
+				schedulerapi.DefaultPercentageOfNodesToScore)
 
-			_, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod)
+			_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
 			if err != nil {
 				t.Errorf("unexpected error: %v", err)
 			}
@@ -69,8 +69,6 @@ type Scheduler struct {
 	// by NodeLister and Algorithm.
 	Cache internalcache.Cache
 
-	Algorithm ScheduleAlgorithm
-
 	Extenders []framework.Extender
 
 	// NextPod should be a function that blocks until the next pod
@@ -83,6 +81,11 @@ type Scheduler struct {
 	// question, and the error
 	Error func(*framework.QueuedPodInfo, error)
 
+	// SchedulePod tries to schedule the given pod to one of the nodes in the node list.
+	// Return a struct of ScheduleResult with the name of suggested host on success,
+	// otherwise will return a FitError with reasons.
+	SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)
+
 	// Close this to shut down the scheduler.
 	StopEverything <-chan struct{}
 
@@ -93,6 +96,12 @@ type Scheduler struct {
 	Profiles profile.Map
 
 	client clientset.Interface
+
+	nodeInfoSnapshot *internalcache.Snapshot
+
+	percentageOfNodesToScore int32
+
+	nextStartNodeIndex int
 }
 
 type schedulerOptions struct {
@@ -213,6 +222,34 @@ var defaultSchedulerOptions = schedulerOptions{
 	applyDefaultProfile: true,
 }
 
+// newScheduler creates a Scheduler object.
+func newScheduler(
+	cache internalcache.Cache,
+	extenders []framework.Extender,
+	nextPod func() *framework.QueuedPodInfo,
+	Error func(*framework.QueuedPodInfo, error),
+	stopEverything <-chan struct{},
+	schedulingQueue internalqueue.SchedulingQueue,
+	profiles profile.Map,
+	client clientset.Interface,
+	nodeInfoSnapshot *internalcache.Snapshot,
+	percentageOfNodesToScore int32) *Scheduler {
+	sched := Scheduler{
+		Cache:                    cache,
+		Extenders:                extenders,
+		NextPod:                  nextPod,
+		Error:                    Error,
+		StopEverything:           stopEverything,
+		SchedulingQueue:          schedulingQueue,
+		Profiles:                 profiles,
+		client:                   client,
+		nodeInfoSnapshot:         nodeInfoSnapshot,
+		percentageOfNodesToScore: percentageOfNodesToScore,
+	}
+	sched.SchedulePod = sched.schedulePod
+	return &sched
+}
+
 // New returns a Scheduler
 func New(client clientset.Interface,
 	informerFactory informers.SharedInformerFactory,
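The wiring at the end of newScheduler is the crux of the refactor: sched.SchedulePod = sched.schedulePod stores a method value, so the exported function field defaults to the private implementation yet stays swappable. The pattern in isolation, with invented minimal types:

```go
package main

import "fmt"

type ScheduleResult struct{ SuggestedHost string }

type Scheduler struct {
	SchedulePod func(podName string) (ScheduleResult, error)
	nodes       []string
}

func newScheduler(nodes []string) *Scheduler {
	sched := Scheduler{nodes: nodes}
	// Method value: binds this instance's schedulePod as the default.
	sched.SchedulePod = sched.schedulePod
	return &sched
}

func (sched *Scheduler) schedulePod(podName string) (ScheduleResult, error) {
	if len(sched.nodes) == 0 {
		return ScheduleResult{}, fmt.Errorf("no nodes available to schedule pods")
	}
	return ScheduleResult{SuggestedHost: sched.nodes[0]}, nil
}

func main() {
	s := newScheduler([]string{"node-a", "node-b"})
	result, err := s.SchedulePod("my-pod")
	fmt.Println(result.SuggestedHost, err) // node-a <nil>
}
```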
@@ -279,10 +316,6 @@ func New(client clientset.Interface,
 		return nil, fmt.Errorf("couldn't create scheduler: %v", err)
 	}
 
-	// Additional tweaks to the config produced by the configurator.
-	sched.StopEverything = stopEverything
-	sched.client = client
-
 	addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))
 
 	return sched, nil
@@ -462,9 +495,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 
 	schedulingCycleCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
-	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod)
+	scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
 	if err != nil {
-		// Schedule() may have failed because the pod would not fit on any host, so we try to
+		// SchedulePod() may have failed because the pod would not fit on any host, so we try to
 		// preempt, with the expectation that the next time the pod is tried for scheduling it
 		// will fit due to the preemption. It is also possible that a different pod will schedule
 		// into the resources that were preempted, but this is harmless.
@@ -104,15 +104,11 @@ func podWithResources(id, desiredHost string, limits v1.ResourceList, requests v
 	return pod
 }
 
-type mockScheduler struct {
+type mockScheduleResult struct {
 	result ScheduleResult
 	err    error
 }
 
-func (es mockScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
-	return es.result, es.err
-}
-
 func TestSchedulerCreation(t *testing.T) {
 	invalidRegistry := map[string]frameworkruntime.PluginFactory{
 		defaultbinder.Name: defaultbinder.New,
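With ScheduleAlgorithm gone, the test keeps only the canned outcome in mockScheduleResult and injects it by overwriting the SchedulePod field after construction (see the TestSchedulerScheduleOne hunk below). The seam in miniature, with simplified types:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type ScheduleResult struct{ SuggestedHost string }

type mockScheduleResult struct {
	result ScheduleResult
	err    error
}

type Scheduler struct {
	SchedulePod func(ctx context.Context, podName string) (ScheduleResult, error)
}

func main() {
	mock := mockScheduleResult{ScheduleResult{SuggestedHost: "node-1"}, errors.New("injected")}
	s := &Scheduler{}
	// The test overrides the function field instead of supplying an algorithm object.
	s.SchedulePod = func(ctx context.Context, podName string) (ScheduleResult, error) {
		return mock.result, mock.err
	}
	fmt.Println(s.SchedulePod(context.Background(), "foo")) // {node-1} injected
}
```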
@@ -307,7 +303,6 @@ func TestSchedulerScheduleOne(t *testing.T) {
 		name                string
 		injectBindError     error
 		sendPod             *v1.Pod
-		algo                ScheduleAlgorithm
 		registerPluginFuncs []st.RegisterPluginFunc
 		expectErrorPod      *v1.Pod
 		expectForgetPod     *v1.Pod
@@ -315,11 +310,12 @@
 		expectError         error
 		expectBind          *v1.Binding
 		eventReason         string
+		mockResult          mockScheduleResult
 	}{
 		{
-			name:    "error reserve pod",
-			sendPod: podWithID("foo", ""),
-			algo:    mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
+			name:       "error reserve pod",
+			sendPod:    podWithID("foo", ""),
+			mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
 			registerPluginFuncs: []st.RegisterPluginFunc{
 				st.RegisterReservePlugin("FakeReserve", st.NewFakeReservePlugin(framework.NewStatus(framework.Error, "reserve error"))),
 			},
@@ -330,9 +326,9 @@
 			eventReason: "FailedScheduling",
 		},
 		{
-			name:    "error permit pod",
-			sendPod: podWithID("foo", ""),
-			algo:    mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
+			name:       "error permit pod",
+			sendPod:    podWithID("foo", ""),
+			mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
 			registerPluginFuncs: []st.RegisterPluginFunc{
 				st.RegisterPermitPlugin("FakePermit", st.NewFakePermitPlugin(framework.NewStatus(framework.Error, "permit error"), time.Minute)),
 			},
@@ -343,9 +339,9 @@
 			eventReason: "FailedScheduling",
 		},
 		{
-			name:    "error prebind pod",
-			sendPod: podWithID("foo", ""),
-			algo:    mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
+			name:       "error prebind pod",
+			sendPod:    podWithID("foo", ""),
+			mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
 			registerPluginFuncs: []st.RegisterPluginFunc{
 				st.RegisterPreBindPlugin("FakePreBind", st.NewFakePreBindPlugin(framework.AsStatus(preBindErr))),
 			},
@@ -358,7 +354,7 @@
 		{
 			name:             "bind assumed pod scheduled",
 			sendPod:          podWithID("foo", ""),
-			algo:             mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
+			mockResult:       mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
 			expectBind:       &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
 			expectAssumedPod: podWithID("foo", testNode.Name),
 			eventReason:      "Scheduled",
@@ -366,7 +362,7 @@
 		{
 			name:           "error pod failed scheduling",
 			sendPod:        podWithID("foo", ""),
-			algo:           mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, errS},
+			mockResult:     mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, errS},
 			expectError:    errS,
 			expectErrorPod: podWithID("foo", ""),
 			eventReason:    "FailedScheduling",
@@ -374,7 +370,7 @@
 		{
 			name:             "error bind forget pod failed scheduling",
 			sendPod:          podWithID("foo", ""),
-			algo:             mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
+			mockResult:       mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
 			expectBind:       &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
 			expectAssumedPod: podWithID("foo", testNode.Name),
 			injectBindError:  errB,
@@ -386,7 +382,7 @@
 		{
 			name:        "deleting pod",
 			sendPod:     deletingPod("foo"),
-			algo:        mockScheduler{ScheduleResult{}, nil},
+			mockResult:  mockScheduleResult{ScheduleResult{}, nil},
 			eventReason: "FailedScheduling",
 		},
 	}
@@ -435,21 +431,26 @@
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
 
-			s := &Scheduler{
-				Cache:     cache,
-				Algorithm: item.algo,
-				client:    client,
-				Error: func(p *framework.QueuedPodInfo, err error) {
+			s := newScheduler(
+				cache,
+				nil,
+				func() *framework.QueuedPodInfo {
+					return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(item.sendPod)}
+				},
+				func(p *framework.QueuedPodInfo, err error) {
 					gotPod = p.Pod
 					gotError = err
 				},
-				NextPod: func() *framework.QueuedPodInfo {
-					return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(item.sendPod)}
-				},
-				Profiles: profile.Map{
+				nil,
+				internalqueue.NewTestQueue(ctx, nil),
+				profile.Map{
 					testSchedulerName: fwk,
 				},
-				SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
+				client,
+				nil,
+				0)
+			s.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
+				return item.mockResult.result, item.mockResult.err
 			}
 			called := make(chan struct{})
 			stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
@@ -914,29 +915,24 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, c
 		frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
 	)
 
-	algo := NewGenericScheduler(
-		cache,
-		internalcache.NewEmptySnapshot(),
-		schedulerapi.DefaultPercentageOfNodesToScore,
-	)
-
 	errChan := make(chan error, 1)
-	sched := &Scheduler{
-		Cache:     cache,
-		Algorithm: algo,
-		NextPod: func() *framework.QueuedPodInfo {
+	sched := newScheduler(
+		cache,
+		nil,
+		func() *framework.QueuedPodInfo {
 			return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(clientcache.Pop(queuedPodStore).(*v1.Pod))}
 		},
-		Error: func(p *framework.QueuedPodInfo, err error) {
+		func(p *framework.QueuedPodInfo, err error) {
 			errChan <- err
 		},
-		Profiles: profile.Map{
+		nil,
+		schedulingQueue,
+		profile.Map{
 			testSchedulerName: fwk,
 		},
-		client:          client,
-		SchedulingQueue: schedulingQueue,
-	}
-
+		client,
+		internalcache.NewEmptySnapshot(),
+		schedulerapi.DefaultPercentageOfNodesToScore)
 	return sched, bindingChan, errChan
 }
@@ -1180,16 +1176,11 @@ func TestSchedulerBinding(t *testing.T) {
 			}
 			stop := make(chan struct{})
 			defer close(stop)
-			cache := internalcache.New(100*time.Millisecond, stop)
-			algo := NewGenericScheduler(
-				cache,
-				nil,
-				0,
-			)
-			sched := Scheduler{
-				Algorithm: algo,
-				Extenders: test.extenders,
-				Cache:     cache,
+			sched := &Scheduler{
+				Extenders:                test.extenders,
+				Cache:                    internalcache.New(100*time.Millisecond, stop),
+				nodeInfoSnapshot:         nil,
+				percentageOfNodesToScore: 0,
 			}
 			err = sched.bind(context.Background(), fwk, pod, "node", nil)
 			if err != nil {
@@ -100,7 +100,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
 		t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name)
 	}
 	// Schedule the Pod manually.
-	_, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, nil, fwk, framework.NewCycleState(), podInfo.Pod)
+	_, fitError := testCtx.Scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), podInfo.Pod)
 	if fitError == nil {
 		t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
 	}
@@ -277,7 +277,7 @@ func TestCustomResourceEnqueue(t *testing.T) {
 		t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name)
 	}
 	// Schedule the Pod manually.
-	_, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, nil, fwk, framework.NewCycleState(), podInfo.Pod)
+	_, fitError := testCtx.Scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), podInfo.Pod)
 	// The fitError is expected to be non-nil as it failed the fakeCRPlugin plugin.
 	if fitError == nil {
 		t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)