Merge pull request #112563 from kerthcet/cleanup/optimize-new-scheduler

Remove newScheduler for reducing complexity
This commit is contained in:
Kubernetes Prow Robot 2022-10-11 12:32:41 -07:00 committed by GitHub
commit 5113b705d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 86 additions and 132 deletions

View File

@ -295,18 +295,16 @@ func TestSchedulerWithExtenders(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
scheduler := newScheduler( sched := &Scheduler{
cache, Cache: cache,
extenders, nodeInfoSnapshot: emptySnapshot,
nil, percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
nil, Extenders: extenders,
nil, }
nil, sched.applyDefaultHandlers()
nil,
emptySnapshot,
schedulerapi.DefaultPercentageOfNodesToScore)
podIgnored := &v1.Pod{} podIgnored := &v1.Pod{}
result, err := scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), podIgnored) result, err := sched.SchedulePod(ctx, fwk, framework.NewCycleState(), podIgnored)
if test.expectsErr { if test.expectsErr {
if err == nil { if err == nil {
t.Errorf("Unexpected non-error, result %+v", result) t.Errorf("Unexpected non-error, result %+v", result)

View File

@ -584,24 +584,20 @@ func TestSchedulerScheduleOne(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
s := newScheduler( sched := &Scheduler{
cache, Cache: cache,
nil, client: client,
func() *framework.QueuedPodInfo { NextPod: func() *framework.QueuedPodInfo {
return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(item.sendPod)} return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(item.sendPod)}
}, },
nil, SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
internalqueue.NewTestQueue(ctx, nil), Profiles: profile.Map{testSchedulerName: fwk},
profile.Map{ }
testSchedulerName: fwk,
}, sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
client,
nil,
0)
s.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
return item.mockResult.result, item.mockResult.err return item.mockResult.result, item.mockResult.err
} }
s.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo) { sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo) {
gotPod = p.Pod gotPod = p.Pod
gotError = err gotError = err
@ -616,7 +612,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
} }
close(called) close(called)
}) })
s.scheduleOne(ctx) sched.scheduleOne(ctx)
<-called <-called
if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) { if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) {
t.Errorf("assumed pod: wanted %v, got %v", e, a) t.Errorf("assumed pod: wanted %v, got %v", e, a)
@ -2011,20 +2007,17 @@ func TestSchedulerSchedulePod(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
scheduler := newScheduler( sched := &Scheduler{
cache, Cache: cache,
nil, nodeInfoSnapshot: snapshot,
nil, percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
nil, }
nil, sched.applyDefaultHandlers()
nil,
nil,
snapshot,
schedulerapi.DefaultPercentageOfNodesToScore)
informerFactory.Start(ctx.Done()) informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done())
result, err := scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), test.pod) result, err := sched.SchedulePod(ctx, fwk, framework.NewCycleState(), test.pod)
if err != test.wErr { if err != test.wErr {
gotFitErr, gotOK := err.(*framework.FitError) gotFitErr, gotOK := err.(*framework.FitError)
wantFitErr, wantOK := test.wErr.(*framework.FitError) wantFitErr, wantOK := test.wErr.(*framework.FitError)
@ -2324,19 +2317,14 @@ func TestZeroRequest(t *testing.T) {
t.Fatalf("error creating framework: %+v", err) t.Fatalf("error creating framework: %+v", err)
} }
scheduler := newScheduler( sched := &Scheduler{
nil, nodeInfoSnapshot: snapshot,
nil, percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
nil, }
nil, sched.applyDefaultHandlers()
nil,
nil,
nil,
snapshot,
schedulerapi.DefaultPercentageOfNodesToScore)
state := framework.NewCycleState() state := framework.NewCycleState()
_, _, err = scheduler.findNodesThatFitPod(ctx, fwk, state, test.pod) _, _, err = sched.findNodesThatFitPod(ctx, fwk, state, test.pod)
if err != nil { if err != nil {
t.Fatalf("error filtering nodes: %+v", err) t.Fatalf("error filtering nodes: %+v", err)
} }
@ -2511,18 +2499,15 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
snapshot := internalcache.NewSnapshot(nil, nodes) snapshot := internalcache.NewSnapshot(nil, nodes)
scheduler := newScheduler(
cache,
nil,
nil,
nil,
nil,
nil,
nil,
snapshot,
schedulerapi.DefaultPercentageOfNodesToScore)
_, _, err = scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), test.pod) sched := &Scheduler{
Cache: cache,
nodeInfoSnapshot: snapshot,
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
}
sched.applyDefaultHandlers()
_, _, err = sched.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), test.pod)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -2572,18 +2557,14 @@ func makeScheduler(nodes []*v1.Node) *Scheduler {
cache.AddNode(n) cache.AddNode(n)
} }
s := newScheduler( sched := &Scheduler{
cache, Cache: cache,
nil, nodeInfoSnapshot: emptySnapshot,
nil, percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
nil, }
nil, sched.applyDefaultHandlers()
nil, cache.UpdateSnapshot(sched.nodeInfoSnapshot)
nil, return sched
emptySnapshot,
schedulerapi.DefaultPercentageOfNodesToScore)
cache.UpdateSnapshot(s.nodeInfoSnapshot)
return s
} }
func makeNode(node string, milliCPU, memory int64) *v1.Node { func makeNode(node string, milliCPU, memory int64) *v1.Node {
@ -2671,20 +2652,19 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, c
) )
errChan := make(chan error, 1) errChan := make(chan error, 1)
sched := newScheduler( sched := &Scheduler{
cache, Cache: cache,
nil, client: client,
func() *framework.QueuedPodInfo { nodeInfoSnapshot: internalcache.NewEmptySnapshot(),
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
NextPod: func() *framework.QueuedPodInfo {
return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(clientcache.Pop(queuedPodStore).(*v1.Pod))} return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(clientcache.Pop(queuedPodStore).(*v1.Pod))}
}, },
nil, SchedulingQueue: schedulingQueue,
schedulingQueue, Profiles: profile.Map{testSchedulerName: fwk},
profile.Map{ }
testSchedulerName: fwk,
}, sched.SchedulePod = sched.schedulePod
client,
internalcache.NewEmptySnapshot(),
schedulerapi.DefaultPercentageOfNodesToScore)
sched.FailureHandler = func(_ context.Context, _ framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo) { sched.FailureHandler = func(_ context.Context, _ framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo) {
errChan <- err errChan <- err

View File

@ -98,6 +98,11 @@ type Scheduler struct {
nextStartNodeIndex int nextStartNodeIndex int
} }
func (s *Scheduler) applyDefaultHandlers() {
s.SchedulePod = s.schedulePod
s.FailureHandler = s.handleSchedulingFailure
}
type schedulerOptions struct { type schedulerOptions struct {
componentConfigVersion string componentConfigVersion string
kubeConfig *restclient.Config kubeConfig *restclient.Config
@ -311,17 +316,18 @@ func New(client clientset.Interface,
debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue) debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
debugger.ListenForSignal(stopEverything) debugger.ListenForSignal(stopEverything)
sched := newScheduler( sched := &Scheduler{
schedulerCache, Cache: schedulerCache,
extenders, client: client,
internalqueue.MakeNextPodFunc(podQueue), nodeInfoSnapshot: snapshot,
stopEverything, percentageOfNodesToScore: options.percentageOfNodesToScore,
podQueue, Extenders: extenders,
profiles, NextPod: internalqueue.MakeNextPodFunc(podQueue),
client, StopEverything: stopEverything,
snapshot, SchedulingQueue: podQueue,
options.percentageOfNodesToScore, Profiles: profiles,
) }
sched.applyDefaultHandlers()
addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap)) addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))
@ -412,33 +418,6 @@ func buildExtenders(extenders []schedulerapi.Extender, profiles []schedulerapi.K
type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo)
// newScheduler creates a Scheduler object.
func newScheduler(
cache internalcache.Cache,
extenders []framework.Extender,
nextPod func() *framework.QueuedPodInfo,
stopEverything <-chan struct{},
schedulingQueue internalqueue.SchedulingQueue,
profiles profile.Map,
client clientset.Interface,
nodeInfoSnapshot *internalcache.Snapshot,
percentageOfNodesToScore int32) *Scheduler {
sched := Scheduler{
Cache: cache,
Extenders: extenders,
NextPod: nextPod,
StopEverything: stopEverything,
SchedulingQueue: schedulingQueue,
Profiles: profiles,
client: client,
nodeInfoSnapshot: nodeInfoSnapshot,
percentageOfNodesToScore: percentageOfNodesToScore,
}
sched.SchedulePod = sched.schedulePod
sched.FailureHandler = sched.handleSchedulingFailure
return &sched
}
func unionedGVKs(m map[framework.ClusterEvent]sets.String) map[framework.GVK]framework.ActionType { func unionedGVKs(m map[framework.ClusterEvent]sets.String) map[framework.GVK]framework.ActionType {
gvkMap := make(map[framework.GVK]framework.ActionType) gvkMap := make(map[framework.GVK]framework.ActionType)
for evt := range m { for evt := range m {

View File

@ -460,17 +460,14 @@ func initScheduler(stop <-chan struct{}, cache internalcache.Cache, queue intern
return nil, nil, err return nil, nil, err
} }
s := newScheduler( s := &Scheduler{
cache, Cache: cache,
nil, client: client,
nil, StopEverything: stop,
stop, SchedulingQueue: queue,
queue, Profiles: profile.Map{testSchedulerName: fwk},
profile.Map{testSchedulerName: fwk}, }
client, s.applyDefaultHandlers()
nil,
0,
)
return s, fwk, nil return s, fwk, nil
} }