Merge pull request #107135 from ruquanzhao/removegenericScheduler

Scheduler: Remove genericScheduler and SchedulerAlgorithm.
This commit is contained in:
Kubernetes Prow Robot 2022-03-04 08:20:52 -08:00 committed by GitHub
commit 0a3470a68d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 197 additions and 176 deletions

View File

@ -41,7 +41,7 @@ import (
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
func TestGenericSchedulerWithExtenders(t *testing.T) {
func TestSchedulerWithExtenders(t *testing.T) {
tests := []struct {
name string
registerPlugins []st.RegisterPluginFunc
@ -294,12 +294,19 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
t.Fatal(err)
}
scheduler := NewGenericScheduler(
scheduler := newScheduler(
cache,
extenders,
nil,
nil,
nil,
nil,
nil,
nil,
emptySnapshot,
schedulerapi.DefaultPercentageOfNodesToScore)
podIgnored := &v1.Pod{}
result, err := scheduler.Schedule(context.Background(), extenders, fwk, framework.NewCycleState(), podIgnored)
result, err := scheduler.SchedulePod(context.Background(), fwk, framework.NewCycleState(), podIgnored)
if test.expectsErr {
if err == nil {
t.Errorf("Unexpected non-error, result %+v", result)

View File

@ -182,22 +182,18 @@ func (c *Configurator) create() (*Scheduler, error) {
)
debugger.ListenForSignal(c.StopEverything)
algo := NewGenericScheduler(
sched := newScheduler(
c.schedulerCache,
extenders,
internalqueue.MakeNextPodFunc(podQueue),
MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
c.StopEverything,
podQueue,
profiles,
c.client,
c.nodeInfoSnapshot,
c.percentageOfNodesToScore,
)
return &Scheduler{
Cache: c.schedulerCache,
Algorithm: algo,
Extenders: extenders,
Profiles: profiles,
NextPod: internalqueue.MakeNextPodFunc(podQueue),
Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
StopEverything: c.StopEverything,
SchedulingQueue: podQueue,
}, nil
c.percentageOfNodesToScore)
return sched, nil
}
// MakeDefaultErrorFunc construct a function to handle pod scheduler error

View File

@ -32,7 +32,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/metrics"
utiltrace "k8s.io/utils/trace"
)
@ -53,13 +52,6 @@ const (
// ErrNoNodesAvailable is used to describe the error that no nodes available to schedule pods.
var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
// TODO: Rename this type.
type ScheduleAlgorithm interface {
Schedule(context.Context, []framework.Extender, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
}
// ScheduleResult represents the result of one pod scheduled. It will contain
// the final selected Node, along with the selected intermediate information.
type ScheduleResult struct {
@ -71,37 +63,30 @@ type ScheduleResult struct {
FeasibleNodes int
}
type genericScheduler struct {
cache internalcache.Cache
nodeInfoSnapshot *internalcache.Snapshot
percentageOfNodesToScore int32
nextStartNodeIndex int
}
// snapshot snapshots scheduler cache and node infos for all fit and priority
// functions.
func (g *genericScheduler) snapshot() error {
func (sched *Scheduler) snapshot() error {
// Used for all fit and priority funcs.
return g.cache.UpdateSnapshot(g.nodeInfoSnapshot)
return sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot)
}
// Schedule tries to schedule the given pod to one of the nodes in the node list.
// schedulePod tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
// If it fails, it will return a FitError with reasons.
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
defer trace.LogIfLong(100 * time.Millisecond)
if err := g.snapshot(); err != nil {
if err := sched.snapshot(); err != nil {
return result, err
}
trace.Step("Snapshotting scheduler cache and node infos done")
if g.nodeInfoSnapshot.NumNodes() == 0 {
if sched.nodeInfoSnapshot.NumNodes() == 0 {
return result, ErrNoNodesAvailable
}
feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, extenders, fwk, state, pod)
feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
if err != nil {
return result, err
}
@ -110,7 +95,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.E
if len(feasibleNodes) == 0 {
return result, &framework.FitError{
Pod: pod,
NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
Diagnosis: diagnosis,
}
}
@ -124,12 +109,12 @@ func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.E
}, nil
}
priorityList, err := prioritizeNodes(ctx, extenders, fwk, state, pod, feasibleNodes)
priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
if err != nil {
return result, err
}
host, err := g.selectHost(priorityList)
host, err := selectHost(priorityList)
trace.Step("Prioritizing done")
return ScheduleResult{
@ -141,7 +126,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.E
// selectHost takes a prioritized list of nodes and then picks one
// in a reservoir sampling manner from the nodes that had the highest score.
func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
func selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
if len(nodeScoreList) == 0 {
return "", fmt.Errorf("empty priorityList")
}
@ -166,12 +151,12 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
// numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops
// its search for more feasible nodes.
func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
func (sched *Scheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
if numAllNodes < minFeasibleNodesToFind || sched.percentageOfNodesToScore >= 100 {
return numAllNodes
}
adaptivePercentage := g.percentageOfNodesToScore
adaptivePercentage := sched.percentageOfNodesToScore
if adaptivePercentage <= 0 {
basePercentageOfNodesToScore := int32(50)
adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125
@ -188,19 +173,19 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
return numNodes
}
func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, extenders []framework.Extender, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) {
func (sched *Scheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) {
nnn := pod.Status.NominatedNodeName
nodeInfo, err := g.nodeInfoSnapshot.Get(nnn)
nodeInfo, err := sched.nodeInfoSnapshot.Get(nnn)
if err != nil {
return nil, err
}
node := []*framework.NodeInfo{nodeInfo}
feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, node)
feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, node)
if err != nil {
return nil, err
}
feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
if err != nil {
return nil, err
}
@ -210,7 +195,7 @@ func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, extenders
// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
diagnosis := framework.Diagnosis{
NodeToStatusMap: make(framework.NodeToStatusMap),
UnschedulablePlugins: sets.NewString(),
@ -218,7 +203,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []
// Run "prefilter" plugins.
s := fwk.RunPreFilterPlugins(ctx, state, pod)
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
return nil, diagnosis, err
}
@ -239,7 +224,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []
// "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
// This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
if len(pod.Status.NominatedNodeName) > 0 {
feasibleNodes, err := g.evaluateNominatedNode(ctx, extenders, pod, fwk, state, diagnosis)
feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
if err != nil {
klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
}
@ -248,12 +233,12 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []
return feasibleNodes, diagnosis, nil
}
}
feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
if err != nil {
return nil, diagnosis, err
}
feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
if err != nil {
return nil, diagnosis, err
}
@ -261,14 +246,14 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []
}
// findNodesThatPassFilters finds the nodes that fit the filter plugins.
func (g *genericScheduler) findNodesThatPassFilters(
func (sched *Scheduler) findNodesThatPassFilters(
ctx context.Context,
fwk framework.Framework,
state *framework.CycleState,
pod *v1.Pod,
diagnosis framework.Diagnosis,
nodes []*framework.NodeInfo) ([]*v1.Node, error) {
numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))
numNodesToFind := sched.numFeasibleNodesToFind(int32(len(nodes)))
// Create feasible list with enough space to avoid growing it
// and allow assigning.
@ -277,9 +262,9 @@ func (g *genericScheduler) findNodesThatPassFilters(
if !fwk.HasFilterPlugins() {
length := len(nodes)
for i := range feasibleNodes {
feasibleNodes[i] = nodes[(g.nextStartNodeIndex+i)%length].Node()
feasibleNodes[i] = nodes[(sched.nextStartNodeIndex+i)%length].Node()
}
g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length
sched.nextStartNodeIndex = (sched.nextStartNodeIndex + len(feasibleNodes)) % length
return feasibleNodes, nil
}
@ -290,7 +275,7 @@ func (g *genericScheduler) findNodesThatPassFilters(
checkNode := func(i int) {
// We check the nodes starting from where we left off in the previous scheduling cycle,
// this is to make sure all nodes have the same chance of being examined across pods.
nodeInfo := nodes[(g.nextStartNodeIndex+i)%len(nodes)]
nodeInfo := nodes[(sched.nextStartNodeIndex+i)%len(nodes)]
status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
if status.Code() == framework.Error {
errCh.SendErrorWithCancel(status.AsError(), cancel)
@ -325,7 +310,7 @@ func (g *genericScheduler) findNodesThatPassFilters(
// are found.
fwk.Parallelizer().Until(ctx, len(nodes), checkNode)
processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap)
g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)
sched.nextStartNodeIndex = (sched.nextStartNodeIndex + processedNodes) % len(nodes)
feasibleNodes = feasibleNodes[:feasibleNodesLen]
if err := errCh.ReceiveError(); err != nil {
@ -494,15 +479,3 @@ func prioritizeNodes(
}
return result, nil
}
// NewGenericScheduler creates a genericScheduler object.
func NewGenericScheduler(
cache internalcache.Cache,
nodeInfoSnapshot *internalcache.Snapshot,
percentageOfNodesToScore int32) ScheduleAlgorithm {
return &genericScheduler{
cache: cache,
nodeInfoSnapshot: nodeInfoSnapshot,
percentageOfNodesToScore: percentageOfNodesToScore,
}
}

View File

@ -202,7 +202,6 @@ func makeNodeList(nodeNames []string) []*v1.Node {
}
func TestSelectHost(t *testing.T) {
scheduler := genericScheduler{}
tests := []struct {
name string
list framework.NodeScoreList
@ -253,7 +252,7 @@ func TestSelectHost(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
// increase the randomness
for i := 0; i < 10; i++ {
got, err := scheduler.selectHost(test.list)
got, err := selectHost(test.list)
if test.expectsErr {
if err == nil {
t.Error("Unexpected non-error")
@ -450,7 +449,7 @@ func TestFindNodesThatPassExtenders(t *testing.T) {
}
}
func TestGenericScheduler(t *testing.T) {
func TestSchedulerSchedulePod(t *testing.T) {
fts := feature.Features{}
tests := []struct {
name string
@ -1006,15 +1005,21 @@ func TestGenericScheduler(t *testing.T) {
t.Fatal(err)
}
scheduler := NewGenericScheduler(
scheduler := newScheduler(
cache,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
snapshot,
schedulerapi.DefaultPercentageOfNodesToScore,
)
schedulerapi.DefaultPercentageOfNodesToScore)
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
result, err := scheduler.Schedule(ctx, nil, fwk, framework.NewCycleState(), test.pod)
result, err := scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), test.pod)
if err != test.wErr {
gotFitErr, gotOK := err.(*framework.FitError)
wantFitErr, wantOK := test.wErr.(*framework.FitError)
@ -1036,19 +1041,26 @@ func TestGenericScheduler(t *testing.T) {
}
}
// makeScheduler makes a simple genericScheduler for testing.
func makeScheduler(nodes []*v1.Node) *genericScheduler {
// makeScheduler makes a simple Scheduler for testing.
func makeScheduler(nodes []*v1.Node) *Scheduler {
cache := internalcache.New(time.Duration(0), wait.NeverStop)
for _, n := range nodes {
cache.AddNode(n)
}
s := NewGenericScheduler(
s := newScheduler(
cache,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
emptySnapshot,
schedulerapi.DefaultPercentageOfNodesToScore)
cache.UpdateSnapshot(s.(*genericScheduler).nodeInfoSnapshot)
return s.(*genericScheduler)
cache.UpdateSnapshot(s.nodeInfoSnapshot)
return s
}
func TestFindFitAllError(t *testing.T) {
@ -1068,8 +1080,7 @@ func TestFindFitAllError(t *testing.T) {
t.Fatal(err)
}
_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), &v1.Pod{})
_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -1105,8 +1116,7 @@ func TestFindFitSomeError(t *testing.T) {
}
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), pod)
_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -1179,14 +1189,13 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
}
scheduler := makeScheduler(nodes)
if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
if err := scheduler.Cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
t.Fatal(err)
}
fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}),
&framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "1"})
_, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod)
_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -1335,15 +1344,21 @@ func TestZeroRequest(t *testing.T) {
t.Fatalf("error creating framework: %+v", err)
}
scheduler := NewGenericScheduler(
scheduler := newScheduler(
nil,
emptySnapshot,
schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler)
scheduler.nodeInfoSnapshot = snapshot
nil,
nil,
nil,
nil,
nil,
nil,
nil,
snapshot,
schedulerapi.DefaultPercentageOfNodesToScore)
ctx := context.Background()
state := framework.NewCycleState()
_, _, err = scheduler.findNodesThatFitPod(ctx, nil, fwk, state, test.pod)
_, _, err = scheduler.findNodesThatFitPod(ctx, fwk, state, test.pod)
if err != nil {
t.Fatalf("error filtering nodes: %+v", err)
}
@ -1406,11 +1421,11 @@ func TestNumFeasibleNodesToFind(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := &genericScheduler{
sched := &Scheduler{
percentageOfNodesToScore: tt.percentageOfNodesToScore,
}
if gotNumNodes := g.numFeasibleNodesToFind(tt.numAllNodes); gotNumNodes != tt.wantNumNodes {
t.Errorf("genericScheduler.numFeasibleNodesToFind() = %v, want %v", gotNumNodes, tt.wantNumNodes)
if gotNumNodes := sched.numFeasibleNodesToFind(tt.numAllNodes); gotNumNodes != tt.wantNumNodes {
t.Errorf("Scheduler.numFeasibleNodesToFind() = %v, want %v", gotNumNodes, tt.wantNumNodes)
}
})
}
@ -1423,7 +1438,7 @@ func TestFairEvaluationForNodes(t *testing.T) {
nodeNames = append(nodeNames, strconv.Itoa(i))
}
nodes := makeNodeList(nodeNames)
g := makeScheduler(nodes)
sched := makeScheduler(nodes)
fwk, err := st.NewFramework(
[]st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
@ -1438,20 +1453,20 @@ func TestFairEvaluationForNodes(t *testing.T) {
}
// To make numAllNodes % nodesToFind != 0
g.percentageOfNodesToScore = 30
nodesToFind := int(g.numFeasibleNodesToFind(int32(numAllNodes)))
sched.percentageOfNodesToScore = 30
nodesToFind := int(sched.numFeasibleNodesToFind(int32(numAllNodes)))
// Iterating over all nodes more than twice
for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ {
nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), &v1.Pod{})
nodesThatFit, _, err := sched.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(nodesThatFit) != nodesToFind {
t.Errorf("got %d nodes filtered, want %d", len(nodesThatFit), nodesToFind)
}
if g.nextStartNodeIndex != (i+1)*nodesToFind%numAllNodes {
t.Errorf("got %d lastProcessedNodeIndex, want %d", g.nextStartNodeIndex, (i+1)*nodesToFind%numAllNodes)
if sched.nextStartNodeIndex != (i+1)*nodesToFind%numAllNodes {
t.Errorf("got %d lastProcessedNodeIndex, want %d", sched.nextStartNodeIndex, (i+1)*nodesToFind%numAllNodes)
}
}
}
@ -1513,13 +1528,19 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
t.Fatal(err)
}
snapshot := internalcache.NewSnapshot(nil, nodes)
scheduler := NewGenericScheduler(
scheduler := newScheduler(
cache,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
snapshot,
schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler)
_, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod)
schedulerapi.DefaultPercentageOfNodesToScore)
_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -69,8 +69,6 @@ type Scheduler struct {
// by NodeLister and Algorithm.
Cache internalcache.Cache
Algorithm ScheduleAlgorithm
Extenders []framework.Extender
// NextPod should be a function that blocks until the next pod
@ -83,6 +81,11 @@ type Scheduler struct {
// question, and the error
Error func(*framework.QueuedPodInfo, error)
// SchedulePod tries to schedule the given pod to one of the nodes in the node list.
// Return a struct of ScheduleResult with the name of suggested host on success,
// otherwise will return a FitError with reasons.
SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
@ -93,6 +96,12 @@ type Scheduler struct {
Profiles profile.Map
client clientset.Interface
nodeInfoSnapshot *internalcache.Snapshot
percentageOfNodesToScore int32
nextStartNodeIndex int
}
type schedulerOptions struct {
@ -213,6 +222,34 @@ var defaultSchedulerOptions = schedulerOptions{
applyDefaultProfile: true,
}
// newScheduler creates a Scheduler object.
func newScheduler(
cache internalcache.Cache,
extenders []framework.Extender,
nextPod func() *framework.QueuedPodInfo,
Error func(*framework.QueuedPodInfo, error),
stopEverything <-chan struct{},
schedulingQueue internalqueue.SchedulingQueue,
profiles profile.Map,
client clientset.Interface,
nodeInfoSnapshot *internalcache.Snapshot,
percentageOfNodesToScore int32) *Scheduler {
sched := Scheduler{
Cache: cache,
Extenders: extenders,
NextPod: nextPod,
Error: Error,
StopEverything: stopEverything,
SchedulingQueue: schedulingQueue,
Profiles: profiles,
client: client,
nodeInfoSnapshot: nodeInfoSnapshot,
percentageOfNodesToScore: percentageOfNodesToScore,
}
sched.SchedulePod = sched.schedulePod
return &sched
}
// New returns a Scheduler
func New(client clientset.Interface,
informerFactory informers.SharedInformerFactory,
@ -279,10 +316,6 @@ func New(client clientset.Interface,
return nil, fmt.Errorf("couldn't create scheduler: %v", err)
}
// Additional tweaks to the config produced by the configurator.
sched.StopEverything = stopEverything
sched.client = client
addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))
return sched, nil
@ -462,9 +495,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod)
scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
if err != nil {
// Schedule() may have failed because the pod would not fit on any host, so we try to
// SchedulePod() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.

View File

@ -104,15 +104,11 @@ func podWithResources(id, desiredHost string, limits v1.ResourceList, requests v
return pod
}
type mockScheduler struct {
type mockScheduleResult struct {
result ScheduleResult
err error
}
func (es mockScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
return es.result, es.err
}
func TestSchedulerCreation(t *testing.T) {
invalidRegistry := map[string]frameworkruntime.PluginFactory{
defaultbinder.Name: defaultbinder.New,
@ -307,7 +303,6 @@ func TestSchedulerScheduleOne(t *testing.T) {
name string
injectBindError error
sendPod *v1.Pod
algo ScheduleAlgorithm
registerPluginFuncs []st.RegisterPluginFunc
expectErrorPod *v1.Pod
expectForgetPod *v1.Pod
@ -315,11 +310,12 @@ func TestSchedulerScheduleOne(t *testing.T) {
expectError error
expectBind *v1.Binding
eventReason string
mockResult mockScheduleResult
}{
{
name: "error reserve pod",
sendPod: podWithID("foo", ""),
algo: mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
name: "error reserve pod",
sendPod: podWithID("foo", ""),
mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
registerPluginFuncs: []st.RegisterPluginFunc{
st.RegisterReservePlugin("FakeReserve", st.NewFakeReservePlugin(framework.NewStatus(framework.Error, "reserve error"))),
},
@ -330,9 +326,9 @@ func TestSchedulerScheduleOne(t *testing.T) {
eventReason: "FailedScheduling",
},
{
name: "error permit pod",
sendPod: podWithID("foo", ""),
algo: mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
name: "error permit pod",
sendPod: podWithID("foo", ""),
mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
registerPluginFuncs: []st.RegisterPluginFunc{
st.RegisterPermitPlugin("FakePermit", st.NewFakePermitPlugin(framework.NewStatus(framework.Error, "permit error"), time.Minute)),
},
@ -343,9 +339,9 @@ func TestSchedulerScheduleOne(t *testing.T) {
eventReason: "FailedScheduling",
},
{
name: "error prebind pod",
sendPod: podWithID("foo", ""),
algo: mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
name: "error prebind pod",
sendPod: podWithID("foo", ""),
mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
registerPluginFuncs: []st.RegisterPluginFunc{
st.RegisterPreBindPlugin("FakePreBind", st.NewFakePreBindPlugin(framework.AsStatus(preBindErr))),
},
@ -358,7 +354,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
{
name: "bind assumed pod scheduled",
sendPod: podWithID("foo", ""),
algo: mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
expectBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
expectAssumedPod: podWithID("foo", testNode.Name),
eventReason: "Scheduled",
@ -366,7 +362,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
{
name: "error pod failed scheduling",
sendPod: podWithID("foo", ""),
algo: mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, errS},
mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, errS},
expectError: errS,
expectErrorPod: podWithID("foo", ""),
eventReason: "FailedScheduling",
@ -374,7 +370,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
{
name: "error bind forget pod failed scheduling",
sendPod: podWithID("foo", ""),
algo: mockScheduler{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
expectBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
expectAssumedPod: podWithID("foo", testNode.Name),
injectBindError: errB,
@ -386,7 +382,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
{
name: "deleting pod",
sendPod: deletingPod("foo"),
algo: mockScheduler{ScheduleResult{}, nil},
mockResult: mockScheduleResult{ScheduleResult{}, nil},
eventReason: "FailedScheduling",
},
}
@ -435,21 +431,26 @@ func TestSchedulerScheduleOne(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := &Scheduler{
Cache: cache,
Algorithm: item.algo,
client: client,
Error: func(p *framework.QueuedPodInfo, err error) {
s := newScheduler(
cache,
nil,
func() *framework.QueuedPodInfo {
return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(item.sendPod)}
},
func(p *framework.QueuedPodInfo, err error) {
gotPod = p.Pod
gotError = err
},
NextPod: func() *framework.QueuedPodInfo {
return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(item.sendPod)}
},
Profiles: profile.Map{
nil,
internalqueue.NewTestQueue(ctx, nil),
profile.Map{
testSchedulerName: fwk,
},
SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
client,
nil,
0)
s.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
return item.mockResult.result, item.mockResult.err
}
called := make(chan struct{})
stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
@ -914,29 +915,24 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, c
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
)
algo := NewGenericScheduler(
cache,
internalcache.NewEmptySnapshot(),
schedulerapi.DefaultPercentageOfNodesToScore,
)
errChan := make(chan error, 1)
sched := &Scheduler{
Cache: cache,
Algorithm: algo,
NextPod: func() *framework.QueuedPodInfo {
sched := newScheduler(
cache,
nil,
func() *framework.QueuedPodInfo {
return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(clientcache.Pop(queuedPodStore).(*v1.Pod))}
},
Error: func(p *framework.QueuedPodInfo, err error) {
func(p *framework.QueuedPodInfo, err error) {
errChan <- err
},
Profiles: profile.Map{
nil,
schedulingQueue,
profile.Map{
testSchedulerName: fwk,
},
client: client,
SchedulingQueue: schedulingQueue,
}
client,
internalcache.NewEmptySnapshot(),
schedulerapi.DefaultPercentageOfNodesToScore)
return sched, bindingChan, errChan
}
@ -1180,16 +1176,11 @@ func TestSchedulerBinding(t *testing.T) {
}
stop := make(chan struct{})
defer close(stop)
cache := internalcache.New(100*time.Millisecond, stop)
algo := NewGenericScheduler(
cache,
nil,
0,
)
sched := Scheduler{
Algorithm: algo,
Extenders: test.extenders,
Cache: cache,
sched := &Scheduler{
Extenders: test.extenders,
Cache: internalcache.New(100*time.Millisecond, stop),
nodeInfoSnapshot: nil,
percentageOfNodesToScore: 0,
}
err = sched.bind(context.Background(), fwk, pod, "node", nil)
if err != nil {

View File

@ -100,7 +100,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name)
}
// Schedule the Pod manually.
_, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, nil, fwk, framework.NewCycleState(), podInfo.Pod)
_, fitError := testCtx.Scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), podInfo.Pod)
if fitError == nil {
t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
}
@ -277,7 +277,7 @@ func TestCustomResourceEnqueue(t *testing.T) {
t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name)
}
// Schedule the Pod manually.
_, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, nil, fwk, framework.NewCycleState(), podInfo.Pod)
_, fitError := testCtx.Scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), podInfo.Pod)
// The fitError is expected to be non-nil as it failed the fakeCRPlugin plugin.
if fitError == nil {
t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)