diff --git a/pkg/scheduler/apis/config/testing/compatibility_test.go b/pkg/scheduler/apis/config/testing/compatibility_test.go
index 6f3185728b7..ef456785e03 100644
--- a/pkg/scheduler/apis/config/testing/compatibility_test.go
+++ b/pkg/scheduler/apis/config/testing/compatibility_test.go
@@ -1348,7 +1348,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 			}
 
 			defProf := sched.Profiles["default-scheduler"]
-			gotPlugins := defProf.Framework.ListPlugins()
+			gotPlugins := defProf.ListPlugins()
 			if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" {
 				t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
 			}
diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD
index 332e2461688..ec49c76dfbd 100644
--- a/pkg/scheduler/core/BUILD
+++ b/pkg/scheduler/core/BUILD
@@ -15,7 +15,6 @@ go_library(
         "//pkg/scheduler/internal/cache:go_default_library",
         "//pkg/scheduler/internal/parallelize:go_default_library",
         "//pkg/scheduler/metrics:go_default_library",
-        "//pkg/scheduler/profile:go_default_library",
        "//pkg/scheduler/util:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
@@ -48,7 +47,6 @@ go_test(
         "//pkg/scheduler/framework/runtime:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//pkg/scheduler/internal/queue:go_default_library",
-        "//pkg/scheduler/profile:go_default_library",
         "//pkg/scheduler/testing:go_default_library",
         "//pkg/scheduler/util:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go
index 7844ed235c0..afd35fd78da 100644
--- a/pkg/scheduler/core/extender_test.go
+++ b/pkg/scheduler/core/extender_test.go
@@ -36,7 +36,6 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
-	"k8s.io/kubernetes/pkg/scheduler/profile"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
 )
 
@@ -278,9 +277,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
-			prof := &profile.Profile{
-				Framework: fwk,
-			}
 
 			scheduler := NewGenericScheduler(
 				cache,
@@ -288,7 +284,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 				extenders,
 				schedulerapi.DefaultPercentageOfNodesToScore)
 			podIgnored := &v1.Pod{}
-			result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), podIgnored)
+			result, err := scheduler.Schedule(context.Background(), fwk, framework.NewCycleState(), podIgnored)
 			if test.expectsErr {
 				if err == nil {
 					t.Errorf("Unexpected non-error, result %+v", result)
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index e2159831695..961038dbe1b 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -36,7 +36,6 @@ import (
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
-	"k8s.io/kubernetes/pkg/scheduler/profile"
 	"k8s.io/kubernetes/pkg/scheduler/util"
 	utiltrace "k8s.io/utils/trace"
 )
@@ -94,7 +93,7 @@ func (f *FitError) Error() string {
 // onto machines.
 // TODO: Rename this type.
 type ScheduleAlgorithm interface {
-	Schedule(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
+	Schedule(context.Context, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
 	// Extenders returns a slice of extender config. This is exposed for
 	// testing.
 	Extenders() []framework.Extender
@@ -129,7 +128,7 @@ func (g *genericScheduler) snapshot() error {
 // Schedule tries to schedule the given pod to one of the nodes in the node list.
 // If it succeeds, it will return the name of the node.
 // If it fails, it will return a FitError error with reasons.
-func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
+func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
 	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
 	defer trace.LogIfLong(100 * time.Millisecond)
 
@@ -142,7 +141,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
 		return result, ErrNoNodesAvailable
 	}
 
-	feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
+	feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, fwk, state, pod)
 	if err != nil {
 		return result, err
 	}
@@ -165,7 +164,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
 		}, nil
 	}
 
-	priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
+	priorityList, err := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)
 	if err != nil {
 		return result, err
 	}
@@ -235,11 +234,11 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
 
 // Filters the nodes to find the ones that fit the pod based on the framework
 // filter plugins and filter extenders.
-func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
+func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
 	filteredNodesStatuses := make(framework.NodeToStatusMap)
 
 	// Run "prefilter" plugins.
-	s := prof.RunPreFilterPlugins(ctx, state, pod)
+	s := fwk.RunPreFilterPlugins(ctx, state, pod)
 	if !s.IsSuccess() {
 		if !s.IsUnschedulable() {
 			return nil, nil, s.AsError()
@@ -256,7 +255,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profil
 		return nil, filteredNodesStatuses, nil
 	}
 
-	feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
+	feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, filteredNodesStatuses)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -269,7 +268,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profil
 }
 
 // findNodesThatPassFilters finds the nodes that fit the filter plugins.
-func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
+func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
 	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
 	if err != nil {
 		return nil, err
@@ -281,7 +280,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
 	// and allow assigning.
 	feasibleNodes := make([]*v1.Node, numNodesToFind)
 
-	if !prof.HasFilterPlugins() {
+	if !fwk.HasFilterPlugins() {
 		length := len(allNodes)
 		for i := range feasibleNodes {
 			feasibleNodes[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node()
@@ -298,7 +297,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
 		// We check the nodes starting from where we left off in the previous scheduling cycle,
 		// this is to make sure all nodes have the same chance of being examined across pods.
 		nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
-		fits, status, err := PodPassesFiltersOnNode(ctx, prof.PreemptHandle(), state, pod, nodeInfo)
+		fits, status, err := PodPassesFiltersOnNode(ctx, fwk.PreemptHandle(), state, pod, nodeInfo)
 		if err != nil {
 			errCh.SendErrorWithCancel(err, cancel)
 			return
@@ -326,7 +325,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
 		// We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins
 		// function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.
 		// Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.
-		metrics.FrameworkExtensionPointDuration.WithLabelValues(runtime.Filter, statusCode.String(), prof.Name).Observe(metrics.SinceInSeconds(beginCheckNode))
+		metrics.FrameworkExtensionPointDuration.WithLabelValues(runtime.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))
 	}()
 
 	// Stops searching for more nodes once the configured number of feasible nodes
@@ -470,14 +469,14 @@ func PodPassesFiltersOnNode(
 // All scores are finally combined (added) to get the total weighted scores of all nodes
 func (g *genericScheduler) prioritizeNodes(
 	ctx context.Context,
-	prof *profile.Profile,
+	fwk framework.Framework,
 	state *framework.CycleState,
 	pod *v1.Pod,
 	nodes []*v1.Node,
 ) (framework.NodeScoreList, error) {
 	// If no priority configs are provided, then all nodes will have a score of one.
 	// This is required to generate the priority list in the required format
-	if len(g.extenders) == 0 && !prof.HasScorePlugins() {
+	if len(g.extenders) == 0 && !fwk.HasScorePlugins() {
 		result := make(framework.NodeScoreList, 0, len(nodes))
 		for i := range nodes {
 			result = append(result, framework.NodeScore{
@@ -489,13 +488,13 @@ func (g *genericScheduler) prioritizeNodes(
 	}
 
 	// Run PreScore plugins.
-	preScoreStatus := prof.RunPreScorePlugins(ctx, state, pod, nodes)
+	preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
 	if !preScoreStatus.IsSuccess() {
 		return nil, preScoreStatus.AsError()
 	}
 
 	// Run the Score plugins.
-	scoresMap, scoreStatus := prof.RunScorePlugins(ctx, state, pod, nodes)
+	scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
 	if !scoreStatus.IsSuccess() {
 		return nil, scoreStatus.AsError()
 	}
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index 6622059b73c..dfcd0a2a4ec 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -47,7 +47,6 @@ import (
 	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
-	"k8s.io/kubernetes/pkg/scheduler/profile"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
 	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
 )
@@ -759,9 +758,6 @@ func TestGenericScheduler(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
-			prof := &profile.Profile{
-				Framework: fwk,
-			}
 
 			scheduler := NewGenericScheduler(
 				cache,
@@ -772,7 +768,7 @@ func TestGenericScheduler(t *testing.T) {
 			informerFactory.Start(ctx.Done())
 			informerFactory.WaitForCacheSync(ctx.Done())
 
-			result, err := scheduler.Schedule(ctx, prof, framework.NewCycleState(), test.pod)
+			result, err := scheduler.Schedule(ctx, fwk, framework.NewCycleState(), test.pod)
 			if err != test.wErr && !strings.Contains(err.Error(), test.wErr.Error()) {
 				t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
 			}
@@ -817,9 +813,8 @@ func TestFindFitAllError(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	prof := &profile.Profile{Framework: fwk}
 
-	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), &v1.Pod{})
+	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
 
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -858,10 +853,9 @@ func TestFindFitSomeError(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	prof := &profile.Profile{Framework: fwk}
 
 	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
-	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), pod)
+	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), pod)
 
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -928,7 +922,6 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
-			prof := &profile.Profile{Framework: fwk}
 
 			scheduler := makeScheduler(nodes)
 			if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
@@ -936,7 +929,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
 			}
 			fwk.PreemptHandle().AddNominatedPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}, "1")
 
-			_, _, err = scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), test.pod)
+			_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
 
 			if err != nil {
 				t.Errorf("unexpected error: %v", err)
@@ -1084,7 +1077,6 @@ func TestZeroRequest(t *testing.T) {
 			if err != nil {
 				t.Fatalf("error creating framework: %+v", err)
 			}
-			prof := &profile.Profile{Framework: fwk}
 
 			scheduler := NewGenericScheduler(
 				nil,
@@ -1095,12 +1087,12 @@ func TestZeroRequest(t *testing.T) {
 			ctx := context.Background()
 			state := framework.NewCycleState()
-			_, _, err = scheduler.findNodesThatFitPod(ctx, prof, state, test.pod)
+			_, _, err = scheduler.findNodesThatFitPod(ctx, fwk, state, test.pod)
 			if err != nil {
 				t.Fatalf("error filtering nodes: %+v", err)
 			}
 
-			prof.RunPreScorePlugins(ctx, state, test.pod, test.nodes)
-			list, err := scheduler.prioritizeNodes(ctx, prof, state, test.pod, test.nodes)
+			fwk.RunPreScorePlugins(ctx, state, test.pod, test.nodes)
+			list, err := scheduler.prioritizeNodes(ctx, fwk, state, test.pod, test.nodes)
 			if err != nil {
 				t.Errorf("unexpected error: %v", err)
 			}
@@ -1187,14 +1179,14 @@ func TestFairEvaluationForNodes(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	prof := &profile.Profile{Framework: fwk}
+
 	// To make numAllNodes % nodesToFind != 0
 	g.percentageOfNodesToScore = 30
 	nodesToFind := int(g.numFeasibleNodesToFind(int32(numAllNodes)))
 
 	// Iterating over all nodes more than twice
 	for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ {
-		nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), &v1.Pod{})
+		nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go
index 9abfab6334a..446eac65732 100644
--- a/pkg/scheduler/eventhandlers.go
+++ b/pkg/scheduler/eventhandlers.go
@@ -205,14 +205,14 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
 	if err := sched.SchedulingQueue.Delete(pod); err != nil {
 		utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
 	}
-	prof, err := sched.profileForPod(pod)
+	fwk, err := sched.frameworkForPod(pod)
 	if err != nil {
 		// This shouldn't happen, because we only accept for scheduling the pods
 		// which specify a scheduler name that matches one of the profiles.
 		klog.Error(err)
 		return
 	}
-	prof.Framework.RejectWaitingPod(pod.UID)
+	fwk.RejectWaitingPod(pod.UID)
 }
 
 func (sched *Scheduler) addPodToCache(obj interface{}) {
diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go
index e2189fdb385..89d6fc548c7 100644
--- a/pkg/scheduler/factory.go
+++ b/pkg/scheduler/factory.go
@@ -158,7 +158,7 @@ func (c *Configurator) create() (*Scheduler, error) {
 		return nil, errors.New("at least one profile is required")
 	}
 	// Profiles are required to have equivalent queue sort plugins.
-	lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
+	lessFn := profiles[c.profiles[0].SchedulerName].QueueSortFunc()
 	podQueue := internalqueue.NewSchedulingQueue(
 		lessFn,
 		internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go
index 3f0660b3db6..a0ab27c719e 100644
--- a/pkg/scheduler/framework/interface.go
+++ b/pkg/scheduler/framework/interface.go
@@ -504,6 +504,9 @@ type Framework interface {
 
 	// ListPlugins returns a map of extension point name to list of configured Plugins.
 	ListPlugins() map[string][]config.Plugin
+
+	// ProfileName returns the profile name associated to this framework.
+	ProfileName() string
 }
 
 // Handle provides data and some tools that plugins can use. It is
diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go
index f6c715af93c..18d3117634f 100644
--- a/pkg/scheduler/framework/runtime/framework.go
+++ b/pkg/scheduler/framework/runtime/framework.go
@@ -1027,3 +1027,8 @@ func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) map[string]config
 func (f *frameworkImpl) PreemptHandle() framework.PreemptHandle {
 	return f.preemptHandle
 }
+
+// ProfileName returns the profile name associated to this framework.
+func (f *frameworkImpl) ProfileName() string {
+	return f.profileName
+}
diff --git a/pkg/scheduler/profile/BUILD b/pkg/scheduler/profile/BUILD
index 6b7e566d0e4..f082523198c 100644
--- a/pkg/scheduler/profile/BUILD
+++ b/pkg/scheduler/profile/BUILD
@@ -39,9 +39,7 @@ go_test(
         "//pkg/scheduler/framework:go_default_library",
         "//pkg/scheduler/framework/runtime:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
-        "//staging/src/k8s.io/api/events/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
-        "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
         "//staging/src/k8s.io/client-go/tools/events:go_default_library",
     ],
 )
diff --git a/pkg/scheduler/profile/profile.go b/pkg/scheduler/profile/profile.go
index 4b025955234..621a7319418 100644
--- a/pkg/scheduler/profile/profile.go
+++ b/pkg/scheduler/profile/profile.go
@@ -36,33 +36,22 @@ type RecorderFactory func(string) events.EventRecorder
 // FrameworkFactory builds a Framework for a given profile configuration.
 type FrameworkFactory func(config.KubeSchedulerProfile, ...frameworkruntime.Option) (framework.Framework, error)
 
-// Profile is a scheduling profile.
-type Profile struct {
-	framework.Framework
-	Recorder events.EventRecorder
-	Name     string
-}
-
-// NewProfile builds a Profile for the given configuration.
-func NewProfile(cfg config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory,
-	opts ...frameworkruntime.Option) (*Profile, error) {
+// newProfile builds a Profile for the given configuration.
+func newProfile(cfg config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory,
+	opts ...frameworkruntime.Option) (framework.Framework, error) {
 	recorder := recorderFact(cfg.SchedulerName)
 	opts = append(opts, frameworkruntime.WithEventRecorder(recorder), frameworkruntime.WithProfileName(cfg.SchedulerName))
 	fwk, err := frameworkFact(cfg, opts...)
 	if err != nil {
 		return nil, err
 	}
-	return &Profile{
-		Name:      cfg.SchedulerName,
-		Framework: fwk,
-		Recorder:  recorder,
-	}, nil
+	return fwk, nil
 }
 
-// Map holds profiles indexed by scheduler name.
-type Map map[string]*Profile
+// Map holds frameworks indexed by scheduler name.
+type Map map[string]framework.Framework
 
-// NewMap builds the profiles given by the configuration, indexed by name.
+// NewMap builds the frameworks given by the configuration, indexed by name.
 func NewMap(cfgs []config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory,
 	opts ...frameworkruntime.Option) (Map, error) {
 	m := make(Map)
@@ -72,7 +61,7 @@ func NewMap(cfgs []config.KubeSchedulerProfile, frameworkFact FrameworkFactory,
 		if err := v.validate(cfg); err != nil {
 			return nil, err
 		}
-		p, err := NewProfile(cfg, frameworkFact, recorderFact, opts...)
+		p, err := newProfile(cfg, frameworkFact, recorderFact, opts...)
 		if err != nil {
 			return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err)
 		}
diff --git a/pkg/scheduler/profile/profile_test.go b/pkg/scheduler/profile/profile_test.go
index 2db63063b83..bccfe01654d 100644
--- a/pkg/scheduler/profile/profile_test.go
+++ b/pkg/scheduler/profile/profile_test.go
@@ -23,9 +23,7 @@ import (
 	"testing"
 
 	v1 "k8s.io/api/core/v1"
-	eventsv1 "k8s.io/api/events/v1"
 	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/tools/events"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
@@ -39,94 +37,6 @@ var fakeRegistry = frameworkruntime.Registry{
 	"Another": newFakePlugin,
 }
 
-func TestNewProfile(t *testing.T) {
-	cases := []struct {
-		name    string
-		cfg     config.KubeSchedulerProfile
-		wantErr string
-	}{
-		{
-			name: "valid",
-			cfg: config.KubeSchedulerProfile{
-				SchedulerName: "valid-profile",
-				Plugins: &config.Plugins{
-					QueueSort: &config.PluginSet{
-						Enabled: []config.Plugin{
-							{Name: "QueueSort"},
-						},
-					},
-					Bind: &config.PluginSet{
-						Enabled: []config.Plugin{
-							{Name: "Bind1"},
-						},
-					},
-				},
-				PluginConfig: []config.PluginConfig{
-					{
-						Name: "QueueSort",
-						Args: &runtime.Unknown{Raw: []byte("{}")},
-					},
-				},
-			},
-		},
-		{
-			name: "invalid framework configuration",
-			cfg: config.KubeSchedulerProfile{
-				SchedulerName: "invalid-profile",
-				Plugins: &config.Plugins{
-					QueueSort: &config.PluginSet{
-						Enabled: []config.Plugin{
-							{Name: "QueueSort"},
-						},
-					},
-				},
-			},
-			wantErr: "at least one bind plugin is needed",
-		},
-		{
-			name: "one queue sort plugin required for profile",
-			cfg: config.KubeSchedulerProfile{
-				SchedulerName: "profile-1",
-				Plugins: &config.Plugins{
-					Bind: &config.PluginSet{
-						Enabled: []config.Plugin{
-							{Name: "Bind1"},
-						},
-					},
-				},
-			},
-			wantErr: "no queue sort plugin is enabled",
-		},
-	}
-	for _, tc := range cases {
-		t.Run(tc.name, func(t *testing.T) {
-			c := fake.NewSimpleClientset()
-			b := events.NewBroadcaster(&events.EventSinkImpl{Interface: c.EventsV1()})
-			p, err := NewProfile(tc.cfg, fakeFrameworkFactory, NewRecorderFactory(b))
-			if err := checkErr(err, tc.wantErr); err != nil {
-				t.Fatal(err)
-			}
-			if len(tc.wantErr) != 0 {
-				return
-			}
-
-			called := make(chan struct{})
-			var ctrl string
-			stopFn := b.StartEventWatcher(func(obj runtime.Object) {
-				e, _ := obj.(*eventsv1.Event)
-				ctrl = e.ReportingController
-				close(called)
-			})
-			p.Recorder.Eventf(&v1.Pod{}, nil, v1.EventTypeNormal, "", "", "")
-			<-called
-			stopFn()
-			if ctrl != tc.cfg.SchedulerName {
-				t.Errorf("got controller name %q in event, want %q", ctrl, tc.cfg.SchedulerName)
-			}
-		})
-	}
-}
-
 func TestNewMap(t *testing.T) {
 	cases := []struct {
 		name    string
@@ -317,6 +227,22 @@ func TestNewMap(t *testing.T) {
 			},
 			wantErr: "plugins required for profile",
 		},
+		{
+			name: "invalid framework configuration",
+			cfgs: []config.KubeSchedulerProfile{
+				{
+					SchedulerName: "invalid-profile",
+					Plugins: &config.Plugins{
+						QueueSort: &config.PluginSet{
+							Enabled: []config.Plugin{
+								{Name: "QueueSort"},
+							},
+						},
+					},
+				},
+			},
+			wantErr: "at least one bind plugin is needed",
+		},
 	}
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 3ab718dde35..45ab04cfbe7 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -307,7 +307,7 @@ func (sched *Scheduler) Run(ctx context.Context) {
 
 // recordSchedulingFailure records an event for the pod that indicates the
 // pod has failed to schedule. Also, update the pod condition and nominated node name if set.
-func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
+func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
 	sched.Error(podInfo, err)
 
 	// Update the scheduling queue with the nominated pod information. Without
@@ -319,7 +319,7 @@ func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *
 	}
 
 	pod := podInfo.Pod
-	prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", err.Error())
+	fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", err.Error())
 	if err := updatePod(sched.client, pod, &v1.PodCondition{
 		Type:    v1.PodScheduled,
 		Status:  v1.ConditionFalse,
@@ -369,17 +369,17 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
 // bind binds a pod to a given node defined in a binding object.
 // The precedence for binding is: (1) extenders and (2) framework plugins.
 // We expect this to run asynchronously, so we handle binding metrics internally.
-func (sched *Scheduler) bind(ctx context.Context, prof *profile.Profile, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
+func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
 	start := time.Now()
 	defer func() {
-		sched.finishBinding(prof, assumed, targetNode, start, err)
+		sched.finishBinding(fwk, assumed, targetNode, start, err)
 	}()
 
 	bound, err := sched.extendersBinding(assumed, targetNode)
 	if bound {
 		return err
 	}
-	bindStatus := prof.RunBindPlugins(ctx, state, assumed, targetNode)
+	bindStatus := fwk.RunBindPlugins(ctx, state, assumed, targetNode)
 	if bindStatus.IsSuccess() {
 		return nil
 	}
@@ -403,7 +403,7 @@ func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error)
 	return false, nil
 }
 
-func (sched *Scheduler) finishBinding(prof *profile.Profile, assumed *v1.Pod, targetNode string, start time.Time, err error) {
+func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod, targetNode string, start time.Time, err error) {
 	if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil {
 		klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)
 	}
@@ -413,7 +413,7 @@ func (sched *Scheduler) finishBinding(prof *profile.Profile, assumed *v1.Pod, ta
 	}
 	metrics.DeprecatedBindingLatency.Observe(metrics.SinceInSeconds(start))
 
-	prof.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
+	fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
 }
 
 // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
@@ -424,14 +424,14 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		return
 	}
 	pod := podInfo.Pod
-	prof, err := sched.profileForPod(pod)
+	fwk, err := sched.frameworkForPod(pod)
 	if err != nil {
 		// This shouldn't happen, because we only accept for scheduling the pods
 		// which specify a scheduler name that matches one of the profiles.
 		klog.Error(err)
 		return
 	}
-	if sched.skipPodSchedule(prof, pod) {
+	if sched.skipPodSchedule(fwk, pod) {
 		return
 	}
 
@@ -443,7 +443,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
 	schedulingCycleCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
-	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
+	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, fwk, state, pod)
 	if err != nil {
 		// Schedule() may have failed because the pod would not fit on any host, so we try to
 		// preempt, with the expectation that the next time the pod is tried for scheduling it
@@ -451,11 +451,11 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		// into the resources that were preempted, but this is harmless.
 		nominatedNode := ""
 		if fitError, ok := err.(*core.FitError); ok {
-			if !prof.HasPostFilterPlugins() {
+			if !fwk.HasPostFilterPlugins() {
 				klog.V(3).Infof("No PostFilter plugins are registered, so no preemption will be performed.")
 			} else {
 				// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
-				result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
+				result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
 				if status.Code() == framework.Error {
 					klog.Errorf("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status)
 				} else {
@@ -468,15 +468,15 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			// Pod did not fit anywhere, so it is counted as a failure. If preemption
 			// succeeds, the pod should get counted as a success the next time we try to
 			// schedule it. (hopefully)
-			metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
+			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
 		} else if err == core.ErrNoNodesAvailable {
 			// No nodes available is counted as unschedulable rather than an error.
-			metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
+			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
 		} else {
 			klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
-			metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
+			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
 		}
-		sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
+		sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
 		return
 	}
 	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
@@ -487,45 +487,45 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
 	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
 	if err != nil {
-		metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
+		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
 		// This is most probably result of a BUG in retrying logic.
 		// We report an error here so that pod scheduling can be retried.
 		// This relies on the fact that Error will check if the pod has been bound
 		// to a node and if so will not add it back to the unscheduled pods queue
 		// (otherwise this would cause an infinite loop).
-		sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "")
+		sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, "")
 		return
 	}
 
 	// Run the Reserve method of reserve plugins.
-	if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
-		metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
+	if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
+		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
 		// trigger un-reserve to clean up state associated with the reserved Pod
-		prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 		if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
 			klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
 		}
-		sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
+		sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, "")
		return
 	}
 
 	// Run "permit" plugins.
-	runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+	runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 	if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
 		var reason string
 		if runPermitStatus.IsUnschedulable() {
-			metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
+			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
 			reason = v1.PodReasonUnschedulable
 		} else {
-			metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
+			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
 			reason = SchedulerError
 		}
 		// One of the plugins returned status different than success or wait.
-		prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 		if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
 			klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
 		}
-		sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, "")
+		sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, "")
 		return
 	}
 
@@ -536,58 +536,58 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
 		defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
 
-		waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
+		waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
 		if !waitOnPermitStatus.IsSuccess() {
 			var reason string
 			if waitOnPermitStatus.IsUnschedulable() {
-				metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
+				metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
 				reason = v1.PodReasonUnschedulable
 			} else {
-				metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
+				metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
 				reason = SchedulerError
 			}
 			// trigger un-reserve plugins to clean up state associated with the reserved Pod
-			prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 			if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
 				klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
 			}
-			sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
+			sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
 			return
 		}
 
 		// Run "prebind" plugins.
-		preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+		preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 		if !preBindStatus.IsSuccess() {
-			metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
+			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
 			// trigger un-reserve plugins to clean up state associated with the reserved Pod
-			prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 			if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
 				klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
 			}
-			sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
+			sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
 			return
 		}
 
-		err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
+		err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
 		if err != nil {
-			metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
+			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
 			// trigger un-reserve plugins to clean up state associated with the reserved Pod
-			prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 			if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
 				klog.Errorf("scheduler cache ForgetPod failed: %v", err)
 			}
-			sched.recordSchedulingFailure(prof, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "")
+			sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "")
 		} else {
 			// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
 			if klog.V(2).Enabled() {
 				klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
 			}
-			metrics.PodScheduled(prof.Name, metrics.SinceInSeconds(start))
+			metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
 			metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
 			metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
 
 			// Run "postbind" plugins.
-			prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+			fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 		}
 	}()
 }
@@ -601,19 +601,19 @@ func getAttemptsLabel(p *framework.QueuedPodInfo) string {
 	return strconv.Itoa(p.Attempts)
 }
 
-func (sched *Scheduler) profileForPod(pod *v1.Pod) (*profile.Profile, error) {
-	prof, ok := sched.Profiles[pod.Spec.SchedulerName]
+func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
+	fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
 	if !ok {
 		return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
 	}
-	return prof, nil
+	return fwk, nil
 }
 
 // skipPodSchedule returns true if we could skip scheduling the pod for specified cases.
-func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool {
+func (sched *Scheduler) skipPodSchedule(fwk framework.Framework, pod *v1.Pod) bool {
 	// Case 1: pod is being deleted.
 	if pod.DeletionTimestamp != nil {
-		prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
+		fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
 		klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
 		return true
 	}
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 6a7681ced7f..b328ffa7301 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -112,7 +112,7 @@ type mockScheduler struct {
 	err    error
 }
 
-func (es mockScheduler) Schedule(ctx context.Context, profile *profile.Profile, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) {
+func (es mockScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) {
 	return es.result, es.err
 }
 
@@ -340,7 +340,10 @@ func TestSchedulerScheduleOne(t *testing.T) {
 				st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
 				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 			)
-			fwk, err := st.NewFramework(registerPluginFuncs, frameworkruntime.WithClientSet(client))
+			fwk, err := st.NewFramework(registerPluginFuncs,
+				frameworkruntime.WithClientSet(client),
+				frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
+				frameworkruntime.WithProfileName(testSchedulerName))
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -357,11 +360,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
 					return &framework.QueuedPodInfo{Pod: item.sendPod}
 				},
 				Profiles: profile.Map{
-					testSchedulerName: &profile.Profile{
-						Framework: fwk,
-						Recorder:  eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName),
-						Name:      testSchedulerName,
-					},
+					testSchedulerName: fwk,
 				},
 			}
 			called := make(chan struct{})
@@ -808,23 +807,21 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
 		return true, b, nil
 	})
 
+	var recorder events.EventRecorder
+	if broadcaster != nil {
+		recorder = broadcaster.NewRecorder(scheme.Scheme, testSchedulerName)
+	} else {
+		recorder = &events.FakeRecorder{}
+	}
+
 	fwk, _ := st.NewFramework(
 		fns,
 		frameworkruntime.WithClientSet(client),
+		frameworkruntime.WithEventRecorder(recorder),
 		frameworkruntime.WithInformerFactory(informerFactory),
 		frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
+		frameworkruntime.WithProfileName(testSchedulerName),
 	)
-	prof := &profile.Profile{
-		Framework: fwk,
-		Recorder:  &events.FakeRecorder{},
-		Name:      testSchedulerName,
-	}
-	if broadcaster != nil {
-		prof.Recorder = broadcaster.NewRecorder(scheme.Scheme, testSchedulerName)
-	}
-	profiles := profile.Map{
-		testSchedulerName: prof,
-	}
 
 	algo := core.NewGenericScheduler(
 		scache,
@@ -843,8 +840,10 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
 		Error: func(p *framework.QueuedPodInfo, err error) {
 			errChan <- err
 		},
-		Profiles: profiles,
-		client:   client,
+		Profiles: profile.Map{
+			testSchedulerName: fwk,
+		},
+		client: client,
 	}
 
 	return sched, bindingChan, errChan
@@ -1164,14 +1163,10 @@ func TestSchedulerBinding(t *testing.T) {
 			fwk, err := st.NewFramework([]st.RegisterPluginFunc{
 				st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
 				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
-			}, frameworkruntime.WithClientSet(client))
+			}, frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}))
 			if err != nil {
 				t.Fatal(err)
 			}
-			prof := &profile.Profile{
-				Framework: fwk,
-				Recorder:  &events.FakeRecorder{},
-			}
 			stop := make(chan struct{})
 			defer close(stop)
 			scache := internalcache.New(100*time.Millisecond, stop)
@@ -1185,7 +1180,7 @@ func TestSchedulerBinding(t *testing.T) {
 				Algorithm:      algo,
 				SchedulerCache: scache,
 			}
-			err = sched.bind(context.Background(), prof, pod, "node", nil)
+			err = sched.bind(context.Background(), fwk, pod, "node", nil)
 			if err != nil {
 				t.Error(err)
 			}
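
The sketch below is not Kubernetes code; it is a minimal, self-contained Go model of the shape this patch ends up with: the profile wrapper struct goes away, the per-profile name and event recorder live behind the framework itself (as the real ProfileName() and EventRecorder() accessors do), and the profile map binds scheduler names directly to frameworks. The Recorder interface, fakeFramework type, and frameworkForPod helper here are illustrative stand-ins, not the real scheduler APIs.

package main

import "fmt"

// Recorder stands in for events.EventRecorder in this sketch.
type Recorder interface {
	Eventf(format string, args ...interface{})
}

type stdoutRecorder struct{}

func (stdoutRecorder) Eventf(format string, args ...interface{}) {
	fmt.Printf(format+"\n", args...)
}

// Framework mirrors the relevant shape of framework.Framework after the
// change: it exposes its own profile name and event recorder.
type Framework interface {
	ProfileName() string
	EventRecorder() Recorder
}

// fakeFramework is a toy implementation used only for this illustration.
type fakeFramework struct {
	name     string
	recorder Recorder
}

func (f *fakeFramework) ProfileName() string     { return f.name }
func (f *fakeFramework) EventRecorder() Recorder { return f.recorder }

// Map plays the role of profile.Map: scheduler name -> Framework.
type Map map[string]Framework

// frameworkForPod mimics sched.frameworkForPod: look up the framework for a
// pod's scheduler name instead of a *profile.Profile wrapper.
func frameworkForPod(profiles Map, schedulerName string) (Framework, error) {
	fwk, ok := profiles[schedulerName]
	if !ok {
		return nil, fmt.Errorf("profile not found for scheduler name %q", schedulerName)
	}
	return fwk, nil
}

func main() {
	profiles := Map{
		"default-scheduler": &fakeFramework{name: "default-scheduler", recorder: stdoutRecorder{}},
	}
	fwk, err := frameworkForPod(profiles, "default-scheduler")
	if err != nil {
		panic(err)
	}
	// Callers that previously reached through prof.Name and prof.Recorder
	// now ask the framework directly.
	fwk.EventRecorder().Eventf("scheduling with profile %s", fwk.ProfileName())
}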