diff --git a/cmd/kube-scheduler/app/BUILD b/cmd/kube-scheduler/app/BUILD
index af0bbbb4910..eb071684b88 100644
--- a/cmd/kube-scheduler/app/BUILD
+++ b/cmd/kube-scheduler/app/BUILD
@@ -17,6 +17,7 @@ go_library(
         "//pkg/scheduler/apis/config:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/metrics:go_default_library",
+        "//pkg/scheduler/profile:go_default_library",
         "//pkg/util/configz:go_default_library",
         "//pkg/util/flag:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
diff --git a/cmd/kube-scheduler/app/config/config.go b/cmd/kube-scheduler/app/config/config.go
index 04cc94fa10b..a3d5932dca9 100644
--- a/cmd/kube-scheduler/app/config/config.go
+++ b/cmd/kube-scheduler/app/config/config.go
@@ -53,7 +53,6 @@ type Config struct {
 	CoreBroadcaster record.EventBroadcaster
 
 	EventClient v1beta1.EventsGetter
-	Recorder    events.EventRecorder
 	Broadcaster events.EventBroadcaster
 
 	// LeaderElection is optional.
diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go
index 207fec0aadd..ca9c9446e58 100644
--- a/cmd/kube-scheduler/app/server.go
+++ b/cmd/kube-scheduler/app/server.go
@@ -19,7 +19,6 @@ package app
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -59,6 +58,7 @@ import (
 	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
+	"k8s.io/kubernetes/pkg/scheduler/profile"
 	"k8s.io/kubernetes/pkg/util/configz"
 	utilflag "k8s.io/kubernetes/pkg/util/flag"
 )
@@ -172,34 +172,19 @@ func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTre
 		}
 	}
 
-	if len(cc.ComponentConfig.Profiles) != 1 {
-		// TODO(#85737): Support more than one profile.
-		return errors.New("multiple scheduling profiles are unsupported")
-	}
-	profile := cc.ComponentConfig.Profiles[0]
-
-	// Prepare event clients.
-	if _, err := cc.Client.Discovery().ServerResourcesForGroupVersion(eventsv1beta1.SchemeGroupVersion.String()); err == nil {
-		cc.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: cc.EventClient.Events("")})
-		cc.Recorder = cc.Broadcaster.NewRecorder(scheme.Scheme, profile.SchedulerName)
-	} else {
-		recorder := cc.CoreBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: profile.SchedulerName})
-		cc.Recorder = record.NewEventRecorderAdapter(recorder)
-	}
-
+	recorderFactory := getRecorderFactory(&cc)
 	// Create the scheduler.
 	sched, err := scheduler.New(cc.Client,
 		cc.InformerFactory,
 		cc.PodInformer,
-		cc.Recorder,
+		recorderFactory,
 		ctx.Done(),
-		scheduler.WithName(profile.SchedulerName),
+		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
 		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
 		scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
 		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
 		scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
 		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
-		scheduler.WithFrameworkPlugins(profile.Plugins),
-		scheduler.WithFrameworkPluginConfig(profile.PluginConfig),
 		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
 		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
 	)
@@ -336,6 +321,17 @@ func newHealthzHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, s
 	return pathRecorderMux
 }
 
+func getRecorderFactory(cc *schedulerserverconfig.CompletedConfig) profile.RecorderFactory {
+	if _, err := cc.Client.Discovery().ServerResourcesForGroupVersion(eventsv1beta1.SchemeGroupVersion.String()); err == nil {
+		cc.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: cc.EventClient.Events("")})
+		return profile.NewRecorderFactory(cc.Broadcaster)
+	}
+	return func(name string) events.EventRecorder {
+		r := cc.CoreBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: name})
+		return record.NewEventRecorderAdapter(r)
+	}
+}
+
 // WithPlugin creates an Option based on plugin name and factory. This function is used to register out-of-tree plugins.
 func WithPlugin(name string, factory framework.PluginFactory) Option {
 	return func(registry framework.Registry) error {
diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD
index 30553e6fa61..d7b4e6101fa 100644
--- a/pkg/scheduler/BUILD
+++ b/pkg/scheduler/BUILD
@@ -27,6 +27,7 @@ go_library(
         "//pkg/scheduler/internal/cache/debugger:go_default_library",
         "//pkg/scheduler/internal/queue:go_default_library",
         "//pkg/scheduler/metrics:go_default_library",
+        "//pkg/scheduler/profile:go_default_library",
         "//pkg/scheduler/volumebinder:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/api/storage/v1:go_default_library",
@@ -45,7 +46,6 @@ go_library(
         "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
-        "//staging/src/k8s.io/client-go/tools/events:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
     ],
 )
@@ -81,6 +81,7 @@ go_test(
         "//pkg/scheduler/internal/queue:go_default_library",
         "//pkg/scheduler/listers:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
+        "//pkg/scheduler/profile:go_default_library",
         "//pkg/scheduler/testing:go_default_library",
         "//pkg/scheduler/volumebinder:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -128,6 +129,7 @@ filegroup(
        "//pkg/scheduler/listers:all-srcs",
        "//pkg/scheduler/metrics:all-srcs",
        "//pkg/scheduler/nodeinfo:all-srcs",
+        "//pkg/scheduler/profile:all-srcs",
        "//pkg/scheduler/testing:all-srcs",
        "//pkg/scheduler/util:all-srcs",
        "//pkg/scheduler/volumebinder:all-srcs",
diff --git a/pkg/scheduler/apis/config/testing/BUILD b/pkg/scheduler/apis/config/testing/BUILD
index a77d0358480..a7fef1b039c 100644
--- a/pkg/scheduler/apis/config/testing/BUILD
+++ b/pkg/scheduler/apis/config/testing/BUILD
@@ -14,12 +14,14 @@ go_test(
         "//pkg/scheduler/apis/config:go_default_library",
         "//pkg/scheduler/apis/config/scheme:go_default_library",
         "//pkg/scheduler/core:go_default_library",
+        "//pkg/scheduler/profile:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
+        "//staging/src/k8s.io/client-go/tools/events:go_default_library",
         "//staging/src/k8s.io/component-base/featuregate:go_default_library",
         "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
         "//vendor/github.com/google/go-cmp/cmp:go_default_library",
diff --git a/pkg/scheduler/apis/config/testing/compatibility_test.go b/pkg/scheduler/apis/config/testing/compatibility_test.go
index b3662be76c8..e24810b14d7 100644
--- a/pkg/scheduler/apis/config/testing/compatibility_test.go
+++ b/pkg/scheduler/apis/config/testing/compatibility_test.go
@@ -20,6 +20,8 @@ import (
 	"testing"
 
 	"github.com/google/go-cmp/cmp"
+	"k8s.io/client-go/tools/events"
+	"k8s.io/kubernetes/pkg/scheduler/profile"
 
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -1376,12 +1378,13 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 				},
 			}
 			informerFactory := informers.NewSharedInformerFactory(client, 0)
+			recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}))
 
 			sched, err := scheduler.New(
 				client,
 				informerFactory,
 				informerFactory.Core().V1().Pods(),
-				nil,
+				recorderFactory,
 				make(chan struct{}),
 				scheduler.WithAlgorithmSource(algorithmSrc),
 			)
@@ -1390,7 +1393,8 @@
 			if err != nil {
 				t.Fatalf("Error constructing: %v", err)
 			}
-			gotPlugins := sched.Framework.ListPlugins()
+			defProf := sched.Profiles["default-scheduler"]
+			gotPlugins := defProf.Framework.ListPlugins()
 			if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" {
 				t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
 			}
@@ -1538,12 +1542,13 @@ func TestAlgorithmProviderCompatibility(t *testing.T) {
 			client := fake.NewSimpleClientset()
 			informerFactory := informers.NewSharedInformerFactory(client, 0)
+			recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}))
 
 			sched, err := scheduler.New(
 				client,
 				informerFactory,
 				informerFactory.Core().V1().Pods(),
-				nil,
+				recorderFactory,
 				make(chan struct{}),
 				opts...,
 			)
@@ -1552,7 +1557,8 @@
 			if err != nil {
 				t.Fatalf("Error constructing: %v", err)
 			}
-			gotPlugins := sched.Framework.ListPlugins()
+			defProf := sched.Profiles["default-scheduler"]
+			gotPlugins := defProf.ListPlugins()
 			if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" {
 				t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
 			}
diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD
index 95c8c3b374e..6121b3f0e30 100644
--- a/pkg/scheduler/core/BUILD
+++ b/pkg/scheduler/core/BUILD
@@ -18,8 +18,8 @@ go_library(
         "//pkg/scheduler/listers:go_default_library",
         "//pkg/scheduler/metrics:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
+        "//pkg/scheduler/profile:go_default_library",
"//pkg/scheduler/util:go_default_library", - "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -66,6 +66,7 @@ go_test( "//pkg/scheduler/listers:go_default_library", "//pkg/scheduler/listers/fake:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", + "//pkg/scheduler/profile:go_default_library", "//pkg/scheduler/testing:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 0de01e32c57..2d3b64295d5 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -42,6 +42,7 @@ import ( internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/listers" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + "k8s.io/kubernetes/pkg/scheduler/profile" st "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -589,21 +590,22 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { if err != nil { t.Fatal(err) } + prof := &profile.Profile{ + Framework: fwk, + } scheduler := NewGenericScheduler( cache, queue, emptySnapshot, - fwk, extenders, - nil, informerFactory.Core().V1().PersistentVolumeClaims().Lister(), informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(), false, schedulerapi.DefaultPercentageOfNodesToScore, false) podIgnored := &v1.Pod{} - result, err := scheduler.Schedule(context.Background(), framework.NewCycleState(), podIgnored) + result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), podIgnored) if test.expectsErr { if err == nil { t.Errorf("Unexpected non-error, result %+v", result) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 67c50088478..8a6acce0eb6 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -44,8 +44,8 @@ import ( "k8s.io/kubernetes/pkg/scheduler/listers" "k8s.io/kubernetes/pkg/scheduler/metrics" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + "k8s.io/kubernetes/pkg/scheduler/profile" "k8s.io/kubernetes/pkg/scheduler/util" - "k8s.io/kubernetes/pkg/scheduler/volumebinder" utiltrace "k8s.io/utils/trace" ) @@ -102,22 +102,15 @@ func (f *FitError) Error() string { // onto machines. // TODO: Rename this type. type ScheduleAlgorithm interface { - Schedule(context.Context, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error) + Schedule(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error) // Preempt receives scheduling errors for a pod and tries to create room for // the pod by preempting lower priority pods if possible. // It returns the node where preemption happened, a list of preempted pods, a // list of pods whose nominated node name should be removed, and error if any. - Preempt(context.Context, *framework.CycleState, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error) + Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error) // Prioritizers returns a slice of priority config. 
This is exposed for // testing. Extenders() []SchedulerExtender - // Snapshot snapshots scheduler cache and node infos. This is needed - // for cluster autoscaler integration. - // TODO(#85691): remove this once CA migrates to creating a Framework instead of a full scheduler. - Snapshot() error - // Framework returns the scheduler framework instance. This is needed for cluster autoscaler integration. - // TODO(#85691): remove this once CA migrates to creating a Framework instead of a full scheduler. - Framework() framework.Framework } // ScheduleResult represents the result of one pod scheduled. It will contain @@ -134,10 +127,8 @@ type ScheduleResult struct { type genericScheduler struct { cache internalcache.Cache schedulingQueue internalqueue.SchedulingQueue - framework framework.Framework extenders []SchedulerExtender nodeInfoSnapshot *internalcache.Snapshot - volumeBinder *volumebinder.VolumeBinder pvcLister corelisters.PersistentVolumeClaimLister pdbLister policylisters.PodDisruptionBudgetLister disablePreemption bool @@ -146,23 +137,17 @@ type genericScheduler struct { nextStartNodeIndex int } -// Snapshot snapshots scheduler cache and node infos for all fit and priority +// snapshot snapshots scheduler cache and node infos for all fit and priority // functions. -func (g *genericScheduler) Snapshot() error { +func (g *genericScheduler) snapshot() error { // Used for all fit and priority funcs. return g.cache.UpdateSnapshot(g.nodeInfoSnapshot) } -// Framework returns the framework instance. -func (g *genericScheduler) Framework() framework.Framework { - // Used for all fit and priority funcs. - return g.framework -} - // Schedule tries to schedule the given pod to one of the nodes in the node list. // If it succeeds, it will return the name of the node. // If it fails, it will return a FitError error with reasons. -func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { +func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name}) defer trace.LogIfLong(100 * time.Millisecond) @@ -171,7 +156,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS } trace.Step("Basic checks done") - if err := g.Snapshot(); err != nil { + if err := g.snapshot(); err != nil { return result, err } trace.Step("Snapshotting scheduler cache and node infos done") @@ -181,14 +166,14 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS } // Run "prefilter" plugins. - preFilterStatus := g.framework.RunPreFilterPlugins(ctx, state, pod) + preFilterStatus := prof.RunPreFilterPlugins(ctx, state, pod) if !preFilterStatus.IsSuccess() { return result, preFilterStatus.AsError() } trace.Step("Running prefilter plugins done") startPredicateEvalTime := time.Now() - filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, state, pod) + filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod) if err != nil { return result, err } @@ -203,7 +188,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS } // Run "prescore" plugins. 
-	prescoreStatus := g.framework.RunPreScorePlugins(ctx, state, pod, filteredNodes)
+	prescoreStatus := prof.RunPreScorePlugins(ctx, state, pod, filteredNodes)
 	if !prescoreStatus.IsSuccess() {
 		return result, prescoreStatus.AsError()
 	}
@@ -223,7 +208,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
 		}, nil
 	}
 
-	priorityList, err := g.prioritizeNodes(ctx, state, pod, filteredNodes)
+	priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, filteredNodes)
 	if err != nil {
 		return result, err
 	}
@@ -282,7 +267,7 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
 // other pods with the same priority. The nominated pod prevents other pods from
 // using the nominated resources and the nominated pod could take a long time
 // before it is retried after many other pending pods.
-func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
+func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
 	// Scheduler may return various types of errors. Consider preemption only if
 	// the error is of type FitError.
 	fitError, ok := scheduleErr.(*FitError)
@@ -313,7 +298,7 @@
 			return nil, nil, nil, err
 		}
 	}
-	nodeToVictims, err := g.selectNodesForPreemption(ctx, state, pod, potentialNodes, pdbs)
+	nodeToVictims, err := g.selectNodesForPreemption(ctx, prof, state, pod, potentialNodes, pdbs)
 	if err != nil {
 		return nil, nil, nil, err
 	}
@@ -426,9 +411,9 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
 
 // Filters the nodes to find the ones that fit the pod based on the framework
 // filter plugins and filter extenders.
-func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
+func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
 	filteredNodesStatuses := make(framework.NodeToStatusMap)
-	filtered, err := g.findNodesThatPassFilters(ctx, state, pod, filteredNodesStatuses)
+	filtered, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -441,7 +426,7 @@
 }
 
 // findNodesThatPassFilters finds the nodes that fit the filter plugins.
-func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
+func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
 	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
 	if err != nil {
 		return nil, err
@@ -453,7 +438,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, state *
 	// and allow assigning.
 	filtered := make([]*v1.Node, numNodesToFind)
 
-	if !g.framework.HasFilterPlugins() {
+	if !prof.HasFilterPlugins() {
 		for i := range filtered {
 			filtered[i] = allNodes[i].Node()
 		}
@@ -469,7 +454,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, state *
 		// We check the nodes starting from where we left off in the previous scheduling cycle,
 		// this is to make sure all nodes have the same chance of being examined across pods.
 		nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
-		fits, status, err := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo)
+		fits, status, err := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo)
 		if err != nil {
 			errCh.SendErrorWithCancel(err, cancel)
 			return
@@ -547,8 +532,7 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v
 // addNominatedPods adds pods with equal or greater priority which are nominated
 // to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
 // 3) augmented nodeInfo.
-func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, state *framework.CycleState,
-	nodeInfo *schedulernodeinfo.NodeInfo) (bool, *framework.CycleState, *schedulernodeinfo.NodeInfo, error) {
+func (g *genericScheduler) addNominatedPods(ctx context.Context, prof *profile.Profile, pod *v1.Pod, state *framework.CycleState, nodeInfo *schedulernodeinfo.NodeInfo) (bool, *framework.CycleState, *schedulernodeinfo.NodeInfo, error) {
 	if g.schedulingQueue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
 		// This may happen only in tests.
 		return false, state, nodeInfo, nil
@@ -563,7 +547,7 @@ func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, st
 	for _, p := range nominatedPods {
 		if podutil.GetPodPriority(p) >= podutil.GetPodPriority(pod) && p.UID != pod.UID {
 			nodeInfoOut.AddPod(p)
-			status := g.framework.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
+			status := prof.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
 			if !status.IsSuccess() {
 				return false, state, nodeInfo, status.AsError()
 			}
@@ -585,6 +569,7 @@ func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, st
 // NodeInfo before calling this function.
 func (g *genericScheduler) podPassesFiltersOnNode(
 	ctx context.Context,
+	prof *profile.Profile,
 	state *framework.CycleState,
 	pod *v1.Pod,
 	info *schedulernodeinfo.NodeInfo,
@@ -615,7 +600,7 @@ func (g *genericScheduler) podPassesFiltersOnNode(
 		nodeInfoToUse := info
 		if i == 0 {
 			var err error
-			podsAdded, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, pod, state, info)
+			podsAdded, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, prof, pod, state, info)
 			if err != nil {
 				return false, nil, err
 			}
@@ -623,7 +608,7 @@ func (g *genericScheduler) podPassesFiltersOnNode(
 			break
 		}
 
-		statusMap := g.framework.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
+		statusMap := prof.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
 		status = statusMap.Merge()
 		if !status.IsSuccess() && !status.IsUnschedulable() {
 			return false, status, status.AsError()
@@ -640,13 +625,14 @@
 // All scores are finally combined (added) to get the total weighted scores of all nodes
 func (g *genericScheduler) prioritizeNodes(
 	ctx context.Context,
+	prof *profile.Profile,
 	state *framework.CycleState,
 	pod *v1.Pod,
 	nodes []*v1.Node,
 ) (framework.NodeScoreList, error) {
 	// If no priority configs are provided, then all nodes will have a score of one.
 	// This is required to generate the priority list in the required format
-	if len(g.extenders) == 0 && !g.framework.HasScorePlugins() {
+	if len(g.extenders) == 0 && !prof.HasScorePlugins() {
 		result := make(framework.NodeScoreList, 0, len(nodes))
 		for i := range nodes {
 			result = append(result, framework.NodeScore{
@@ -658,7 +644,7 @@ func (g *genericScheduler) prioritizeNodes(
 	}
 
 	// Run the Score plugins.
-	scoresMap, scoreStatus := g.framework.RunScorePlugins(ctx, state, pod, nodes)
+	scoresMap, scoreStatus := prof.RunScorePlugins(ctx, state, pod, nodes)
 	if !scoreStatus.IsSuccess() {
 		return framework.NodeScoreList{}, scoreStatus.AsError()
 	}
@@ -863,6 +849,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) *
 // preemption in parallel.
 func (g *genericScheduler) selectNodesForPreemption(
 	ctx context.Context,
+	prof *profile.Profile,
 	state *framework.CycleState,
 	pod *v1.Pod,
 	potentialNodes []*schedulernodeinfo.NodeInfo,
@@ -874,7 +861,7 @@
 	checkNode := func(i int) {
 		nodeInfoCopy := potentialNodes[i].Clone()
 		stateCopy := state.Clone()
-		pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
+		pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, prof, stateCopy, pod, nodeInfoCopy, pdbs)
 		if fits {
 			resultLock.Lock()
 			victims := extenderv1.Victims{
@@ -952,6 +939,7 @@ func filterPodsWithPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudg
 // these predicates can be satisfied by removing more pods from the node.
 func (g *genericScheduler) selectVictimsOnNode(
 	ctx context.Context,
+	prof *profile.Profile,
 	state *framework.CycleState,
 	pod *v1.Pod,
 	nodeInfo *schedulernodeinfo.NodeInfo,
@@ -963,7 +951,7 @@ func (g *genericScheduler) selectVictimsOnNode(
 		if err := nodeInfo.RemovePod(rp); err != nil {
 			return err
 		}
-		status := g.framework.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
+		status := prof.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
 		if !status.IsSuccess() {
 			return status.AsError()
 		}
@@ -971,7 +959,7 @@
 	}
 	addPod := func(ap *v1.Pod) error {
 		nodeInfo.AddPod(ap)
-		status := g.framework.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
+		status := prof.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
 		if !status.IsSuccess() {
 			return status.AsError()
 		}
@@ -994,7 +982,7 @@
 	// inter-pod affinity to one or more victims, but we have decided not to
 	// support this case for performance reasons. Having affinity to lower
 	// priority pods is not a recommended configuration anyway.
-	if fits, _, err := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo); !fits {
+	if fits, _, err := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo); !fits {
 		if err != nil {
 			klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
 		}
@@ -1012,7 +1000,7 @@
 		if err := addPod(p); err != nil {
 			return false, err
 		}
-		fits, _, _ := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo)
+		fits, _, _ := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo)
 		if !fits {
 			if err := removePod(p); err != nil {
 				return false, err
@@ -1114,9 +1102,7 @@ func NewGenericScheduler(
 	cache internalcache.Cache,
 	podQueue internalqueue.SchedulingQueue,
 	nodeInfoSnapshot *internalcache.Snapshot,
-	framework framework.Framework,
 	extenders []SchedulerExtender,
-	volumeBinder *volumebinder.VolumeBinder,
 	pvcLister corelisters.PersistentVolumeClaimLister,
 	pdbLister policylisters.PodDisruptionBudgetLister,
 	disablePreemption bool,
@@ -1125,10 +1111,8 @@
 	return &genericScheduler{
 		cache:                    cache,
 		schedulingQueue:          podQueue,
-		framework:                framework,
 		extenders:                extenders,
 		nodeInfoSnapshot:         nodeInfoSnapshot,
-		volumeBinder:             volumeBinder,
 		pvcLister:                pvcLister,
 		pdbLister:                pdbLister,
 		disablePreemption:        disablePreemption,
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index cdc636aee49..5aad5b1cac6 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -59,6 +59,7 @@ import (
 	fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
 	"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+	"k8s.io/kubernetes/pkg/scheduler/profile"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
 	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
 )
@@ -804,6 +805,7 @@ func TestGenericScheduler(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
+			prof := &profile.Profile{Framework: fwk}
 
 			var pvcs []v1.PersistentVolumeClaim
 			pvcs = append(pvcs, test.pvcs...)
@@ -813,15 +815,13 @@
 				cache,
 				internalqueue.NewSchedulingQueue(nil),
 				snapshot,
-				fwk,
 				[]SchedulerExtender{},
-				nil,
 				pvcLister,
 				informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
 				false,
 				schedulerapi.DefaultPercentageOfNodesToScore,
 				false)
-			result, err := scheduler.Schedule(context.Background(), framework.NewCycleState(), test.pod)
+			result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), test.pod)
 			if !reflect.DeepEqual(err, test.wErr) {
 				t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
 			}
@@ -836,35 +836,46 @@ func TestGenericScheduler(t *testing.T) {
 }
 
 // makeScheduler makes a simple genericScheduler for testing.
-func makeScheduler(nodes []*v1.Node, fns ...st.RegisterPluginFunc) *genericScheduler {
+func makeScheduler(nodes []*v1.Node) *genericScheduler {
 	cache := internalcache.New(time.Duration(0), wait.NeverStop)
 	for _, n := range nodes {
 		cache.AddNode(n)
 	}
-	fwk, _ := st.NewFramework(fns)
 
 	s := NewGenericScheduler(
 		cache,
 		internalqueue.NewSchedulingQueue(nil),
 		emptySnapshot,
-		fwk,
-		nil, nil, nil, nil, false,
+		nil, nil, nil, false,
 		schedulerapi.DefaultPercentageOfNodesToScore, false)
 	cache.UpdateSnapshot(s.(*genericScheduler).nodeInfoSnapshot)
 	return s.(*genericScheduler)
 }
 
+func makeProfile(fns ...st.RegisterPluginFunc) (*profile.Profile, error) {
+	fwk, err := st.NewFramework(fns)
+	if err != nil {
+		return nil, err
+	}
+	return &profile.Profile{
+		Framework: fwk,
+	}, nil
+}
+
 func TestFindFitAllError(t *testing.T) {
 	nodes := makeNodeList([]string{"3", "2", "1"})
-	scheduler := makeScheduler(
-		nodes,
+	scheduler := makeScheduler(nodes)
+	prof, err := makeProfile(
 		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
 		st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
 		st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin),
 		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 	)
+	if err != nil {
+		t.Fatal(err)
+	}
 
-	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), framework.NewCycleState(), &v1.Pod{})
+	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), &v1.Pod{})
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -890,16 +901,19 @@ func TestFindFitAllError(t *testing.T) {
 
 func TestFindFitSomeError(t *testing.T) {
 	nodes := makeNodeList([]string{"3", "2", "1"})
-	scheduler := makeScheduler(
-		nodes,
+	scheduler := makeScheduler(nodes)
+	prof, err := makeProfile(
 		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
 		st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
 		st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin),
 		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 	)
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
-	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), framework.NewCycleState(), pod)
+	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), pod)
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -947,11 +961,6 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
 	for _, test := range tests {
 		nodes := makeNodeList([]string{"1"})
 
-		cache := internalcache.New(time.Duration(0), wait.NeverStop)
-		for _, n := range nodes {
-			cache.AddNode(n)
-		}
-
 		plugin := fakeFilterPlugin{}
 		registerFakeFilterFunc := st.RegisterFilterPlugin(
 			"FakeFilter",
@@ -964,23 +973,18 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
 			registerFakeFilterFunc,
 			st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 		}
-		fwk, err := st.NewFramework(registerPlugins)
+		prof, err := makeProfile(registerPlugins...)
 		if err != nil {
 			t.Fatal(err)
 		}
 
-		queue := internalqueue.NewSchedulingQueue(nil)
-		scheduler := NewGenericScheduler(
-			cache,
-			queue,
-			emptySnapshot,
-			fwk,
-			nil, nil, nil, nil, false,
-			schedulerapi.DefaultPercentageOfNodesToScore, false).(*genericScheduler)
-		cache.UpdateSnapshot(scheduler.nodeInfoSnapshot)
-		queue.UpdateNominatedPodForNode(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("nominated")}, Spec: v1.PodSpec{Priority: &midPriority}}, "1")
+		scheduler := makeScheduler(nodes)
+		if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
+			t.Fatal(err)
+		}
+		scheduler.schedulingQueue.UpdateNominatedPodForNode(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}, "1")
 
-		_, _, err = scheduler.findNodesThatFitPod(context.Background(), framework.NewCycleState(), test.pod)
+		_, _, err = scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), test.pod)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
@@ -1127,16 +1131,15 @@ func TestZeroRequest(t *testing.T) {
 			if err != nil {
 				t.Fatalf("error creating framework: %+v", err)
 			}
+			prof := &profile.Profile{Framework: fwk}
 
 			scheduler := NewGenericScheduler(
 				nil,
 				nil,
 				emptySnapshot,
-				fwk,
 				[]SchedulerExtender{},
 				nil,
 				nil,
-				nil,
 				false,
 				schedulerapi.DefaultPercentageOfNodesToScore,
 				false).(*genericScheduler)
@@ -1144,17 +1147,12 @@ func TestZeroRequest(t *testing.T) {
 			ctx := context.Background()
 			state := framework.NewCycleState()
-			_, _, err = scheduler.findNodesThatFitPod(ctx, state, test.pod)
+			_, _, err = scheduler.findNodesThatFitPod(ctx, prof, state, test.pod)
 			if err != nil {
 				t.Fatalf("error filtering nodes: %+v", err)
 			}
-			scheduler.framework.RunPreScorePlugins(ctx, state, test.pod, test.nodes)
-			list, err := scheduler.prioritizeNodes(
-				ctx,
-				state,
-				test.pod,
-				test.nodes,
-			)
+			prof.RunPreScorePlugins(ctx, state, test.pod, test.nodes)
+			list, err := scheduler.prioritizeNodes(ctx, prof, state, test.pod, test.nodes)
 			if err != nil {
 				t.Errorf("unexpected error: %v", err)
 			}
@@ -1613,15 +1611,14 @@ func TestSelectNodesForPreemption(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
+			prof := &profile.Profile{Framework: fwk}
 
 			scheduler := NewGenericScheduler(
 				nil,
 				internalqueue.NewSchedulingQueue(nil),
 				snapshot,
-				fwk,
 				[]SchedulerExtender{},
 				nil,
-				nil,
 				informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
 				false,
 				schedulerapi.DefaultPercentageOfNodesToScore,
@@ -1632,7 +1629,7 @@
 
 			state := framework.NewCycleState()
 			// Some tests rely on PreFilter plugin to compute its CycleState.
-			preFilterStatus := fwk.RunPreFilterPlugins(context.Background(), state, test.pod)
+			preFilterStatus := prof.RunPreFilterPlugins(context.Background(), state, test.pod)
 			if !preFilterStatus.IsSuccess() {
 				t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
 			}
@@ -1640,7 +1637,7 @@
 			if err != nil {
 				t.Fatal(err)
 			}
-			nodeToPods, err := g.selectNodesForPreemption(context.Background(), state, test.pod, nodeInfos, test.pdbs)
+			nodeToPods, err := g.selectNodesForPreemption(context.Background(), prof, state, test.pod, nodeInfos, test.pdbs)
 			if err != nil {
 				t.Error(err)
 			}
@@ -1903,9 +1900,9 @@ func TestPickOneNodeForPreemption(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
+			prof := &profile.Profile{Framework: fwk}
 
 			g := &genericScheduler{
-				framework:        fwk,
 				nodeInfoSnapshot: snapshot,
 			}
 			assignDefaultStartTime(test.pods)
@@ -1916,11 +1913,11 @@ func TestPickOneNodeForPreemption(t *testing.T) {
 			}
 			state := framework.NewCycleState()
 			// Some tests rely on PreFilter plugin to compute its CycleState.
-			preFilterStatus := fwk.RunPreFilterPlugins(context.Background(), state, test.pod)
+			preFilterStatus := prof.RunPreFilterPlugins(context.Background(), state, test.pod)
 			if !preFilterStatus.IsSuccess() {
 				t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
 			}
-			candidateNodes, _ := g.selectNodesForPreemption(context.Background(), state, test.pod, nodeInfos, nil)
+			candidateNodes, _ := g.selectNodesForPreemption(context.Background(), prof, state, test.pod, nodeInfos, nil)
 			node := pickOneNodeForPreemption(candidateNodes)
 			found := false
 			for _, nodeName := range test.expected {
@@ -2411,14 +2408,13 @@ func TestPreempt(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
+			prof := &profile.Profile{Framework: fwk}
 
 			scheduler := NewGenericScheduler(
 				cache,
 				internalqueue.NewSchedulingQueue(nil),
 				snapshot,
-				fwk,
 				extenders,
-				nil,
 				informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
 				informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
 				false,
@@ -2435,7 +2431,7 @@
 			if test.failedNodeToStatusMap != nil {
 				failedNodeToStatusMap = test.failedNodeToStatusMap
 			}
-			node, victims, _, err := scheduler.Preempt(context.Background(), state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap}))
+			node, victims, _, err := scheduler.Preempt(context.Background(), prof, state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap}))
 			if err != nil {
 				t.Errorf("unexpected error in preemption: %v", err)
 			}
@@ -2465,7 +2461,7 @@
 				test.pod.Status.NominatedNodeName = node.Name
 			}
 			// Call preempt again and make sure it doesn't preempt any more pods.
-			node, victims, _, err = scheduler.Preempt(context.Background(), state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap}))
+			node, victims, _, err = scheduler.Preempt(context.Background(), prof, state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap}))
 			if err != nil {
 				t.Errorf("unexpected error in preemption: %v", err)
 			}
@@ -2547,19 +2543,22 @@ func TestFairEvaluationForNodes(t *testing.T) {
 		nodeNames = append(nodeNames, strconv.Itoa(i))
 	}
 	nodes := makeNodeList(nodeNames)
-	g := makeScheduler(
-		nodes,
+	g := makeScheduler(nodes)
+	prof, err := makeProfile(
 		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
 		st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
 		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 	)
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	// To make numAllNodes % nodesToFind != 0
 	g.percentageOfNodesToScore = 30
 	nodesToFind := int(g.numFeasibleNodesToFind(int32(numAllNodes)))
 
 	// Iterating over all nodes more than twice
 	for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ {
-		nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), framework.NewCycleState(), &v1.Pod{})
+		nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), &v1.Pod{})
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go
index ed66ec9e647..f6d8bb1c96a 100644
--- a/pkg/scheduler/eventhandlers.go
+++ b/pkg/scheduler/eventhandlers.go
@@ -21,6 +21,7 @@ import (
 	"reflect"
 
 	"k8s.io/klog"
+	"k8s.io/kubernetes/pkg/scheduler/profile"
 
 	v1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
@@ -209,7 +210,14 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
 		// Volume binder only wants to keep unassigned pods
 		sched.VolumeBinder.DeletePodBindings(pod)
 	}
-	sched.Framework.RejectWaitingPod(pod.UID)
+	prof, err := sched.profileForPod(pod)
+	if err != nil {
+		// This shouldn't happen, because we only accept for scheduling the pods
+		// which specify a scheduler name that matches one of the profiles.
+		klog.Error(err)
+		return
+	}
+	prof.Framework.RejectWaitingPod(pod.UID)
 }
 
 func (sched *Scheduler) addPodToCache(obj interface{}) {
@@ -286,8 +294,8 @@
 }
 
 // responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
-func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
-	return schedulerName == pod.Spec.SchedulerName
+func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool {
+	return profiles.HandlesSchedulerName(pod.Spec.SchedulerName)
 }
 
 // skipPodUpdate checks whether the specified pod update should be ignored.
@@ -337,11 +345,10 @@ func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
 	return true
 }
 
-// AddAllEventHandlers is a helper function used in tests and in Scheduler
+// addAllEventHandlers is a helper function used in tests and in Scheduler
 // to add event handlers for various informers.
-func AddAllEventHandlers(
+func addAllEventHandlers(
 	sched *Scheduler,
-	schedulerName string,
 	informerFactory informers.SharedInformerFactory,
 	podInformer coreinformers.PodInformer,
 ) {
@@ -376,10 +383,10 @@
 			FilterFunc: func(obj interface{}) bool {
 				switch t := obj.(type) {
 				case *v1.Pod:
-					return !assignedPod(t) && responsibleForPod(t, schedulerName)
+					return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
 				case cache.DeletedFinalStateUnknown:
 					if pod, ok := t.Obj.(*v1.Pod); ok {
-						return !assignedPod(pod) && responsibleForPod(pod, schedulerName)
+						return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles)
 					}
 					utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
 					return false
diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go
index c7fdb7c2fc1..0874f8d0dcd 100644
--- a/pkg/scheduler/factory.go
+++ b/pkg/scheduler/factory.go
@@ -18,12 +18,13 @@ package scheduler
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sort"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/errors"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/types"
@@ -51,6 +52,7 @@ import (
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
+	"k8s.io/kubernetes/pkg/scheduler/profile"
 	"k8s.io/kubernetes/pkg/scheduler/volumebinder"
 )
@@ -69,6 +71,8 @@ type Binder interface {
 type Configurator struct {
 	client clientset.Interface
 
+	recorderFactory profile.RecorderFactory
+
 	informerFactory informers.SharedInformerFactory
 
 	podInformer coreinformers.PodInformer
@@ -98,31 +102,37 @@ type Configurator struct {
 	enableNonPreempting bool
 
-	// framework configuration arguments.
+	profiles         []schedulerapi.KubeSchedulerProfile
 	registry         framework.Registry
-	plugins          *schedulerapi.Plugins
-	pluginConfig     []schedulerapi.PluginConfig
 	nodeInfoSnapshot *internalcache.Snapshot
 }
 
-// create a scheduler from a set of registered plugins.
-func (c *Configurator) create(extenders []core.SchedulerExtender) (*Scheduler, error) {
-	framework, err := framework.NewFramework(
+func (c *Configurator) buildFramework(p schedulerapi.KubeSchedulerProfile) (framework.Framework, error) {
+	return framework.NewFramework(
 		c.registry,
-		c.plugins,
-		c.pluginConfig,
+		p.Plugins,
+		p.PluginConfig,
 		framework.WithClientSet(c.client),
 		framework.WithInformerFactory(c.informerFactory),
 		framework.WithSnapshotSharedLister(c.nodeInfoSnapshot),
 		framework.WithRunAllFilters(c.alwaysCheckAllPredicates),
 		framework.WithVolumeBinder(c.volumeBinder),
 	)
-	if err != nil {
-		return nil, fmt.Errorf("initializing the scheduling framework: %v", err)
-	}
+}
 
+// create a scheduler from a set of registered plugins.
+func (c *Configurator) create(extenders []core.SchedulerExtender) (*Scheduler, error) {
+	profiles, err := profile.NewMap(c.profiles, c.buildFramework, c.recorderFactory)
+	if err != nil {
+		return nil, fmt.Errorf("initializing profiles: %v", err)
+	}
+	if len(profiles) == 0 {
+		return nil, errors.New("at least one profile is required")
+	}
+	// Profiles are required to have equivalent queue sort plugins.
+	lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
 	podQueue := internalqueue.NewSchedulingQueue(
-		framework,
+		lessFn,
 		internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
 		internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
 	)
@@ -140,9 +150,7 @@
 		c.schedulerCache,
 		podQueue,
 		c.nodeInfoSnapshot,
-		framework,
 		extenders,
-		c.volumeBinder,
 		c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
 		GetPodDisruptionBudgetLister(c.informerFactory),
 		c.disablePreemption,
@@ -153,7 +161,7 @@
 	return &Scheduler{
 		SchedulerCache: c.schedulerCache,
 		Algorithm:      algo,
-		Framework:      framework,
+		Profiles:       profiles,
 		NextPod:        internalqueue.MakeNextPodFunc(podQueue),
 		Error:          MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache),
 		StopEverything: c.StopEverything,
@@ -171,9 +179,13 @@ func (c *Configurator) createFromProvider(providerName string) (*Scheduler, erro
 		return nil, fmt.Errorf("algorithm provider %q is not registered", providerName)
 	}
 
-	// Combine the provided plugins with the ones from component config.
-	defaultPlugins.Apply(c.plugins)
-	c.plugins = defaultPlugins
+	for i := range c.profiles {
+		prof := &c.profiles[i]
+		plugins := &schedulerapi.Plugins{}
+		plugins.Append(defaultPlugins)
+		plugins.Apply(prof.Plugins)
+		prof.Plugins = plugins
+	}
 	return c.create([]core.SchedulerExtender{})
 }
@@ -271,10 +283,10 @@ func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler,
 	}
 	// Combine all framework configurations. If this results in any duplication, framework
 	// instantiation should fail.
-	var defaultPlugins schedulerapi.Plugins
+	var defPlugins schedulerapi.Plugins
 	// "PrioritySort" and "DefaultBinder" were neither predicates nor priorities
 	// before. We add them by default.
-	defaultPlugins.Append(&schedulerapi.Plugins{
+	defPlugins.Append(&schedulerapi.Plugins{
 		QueueSort: &schedulerapi.PluginSet{
 			Enabled: []schedulerapi.Plugin{{Name: queuesort.Name}},
 		},
@@ -282,16 +294,23 @@ func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler,
 			Enabled: []schedulerapi.Plugin{{Name: defaultbinder.Name}},
 		},
 	})
-	defaultPlugins.Append(pluginsForPredicates)
-	defaultPlugins.Append(pluginsForPriorities)
-	defaultPlugins.Apply(c.plugins)
-	c.plugins = &defaultPlugins
+	defPlugins.Append(pluginsForPredicates)
+	defPlugins.Append(pluginsForPriorities)
+	var defPluginConfig []schedulerapi.PluginConfig
+	defPluginConfig = append(defPluginConfig, pluginConfigForPredicates...)
+	defPluginConfig = append(defPluginConfig, pluginConfigForPriorities...)
+	for i := range c.profiles {
+		prof := &c.profiles[i]
+		plugins := &schedulerapi.Plugins{}
+		plugins.Append(&defPlugins)
+		plugins.Apply(prof.Plugins)
+		prof.Plugins = plugins
 
-	var pluginConfig []schedulerapi.PluginConfig
-	pluginConfig = append(pluginConfig, pluginConfigForPredicates...)
-	pluginConfig = append(pluginConfig, pluginConfigForPriorities...)
-	pluginConfig = append(pluginConfig, c.pluginConfig...)
-	c.pluginConfig = pluginConfig
+		var pluginConfig []schedulerapi.PluginConfig
+		pluginConfig = append(pluginConfig, defPluginConfig...)
+		pluginConfig = append(pluginConfig, prof.PluginConfig...)
+		prof.PluginConfig = pluginConfig
+	}
 
 	return c.create(extenders)
 }
@@ -397,14 +416,14 @@ func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.Sch
 		} else {
 			if _, ok := err.(*core.FitError); ok {
 				klog.V(2).Infof("Unable to schedule %v/%v: no fit: %v; waiting", pod.Namespace, pod.Name, err)
-			} else if errors.IsNotFound(err) {
+			} else if apierrors.IsNotFound(err) {
 				klog.V(2).Infof("Unable to schedule %v/%v: possibly due to node not found: %v; waiting", pod.Namespace, pod.Name, err)
-				if errStatus, ok := err.(errors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
+				if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
 					nodeName := errStatus.Status().Details.Name
 					// when node is not found, We do not remove the node right away. Trying again to get
 					// the node and if the node is still not found, then remove it from the scheduler cache.
 					_, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
-					if err != nil && errors.IsNotFound(err) {
+					if err != nil && apierrors.IsNotFound(err) {
 						node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
 						if err := schedulerCache.RemoveNode(&node); err != nil {
 							klog.V(4).Infof("Node %q is not found; failed to remove it from the cache.", node.Name)
@@ -442,7 +461,7 @@ func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.Sch
 				}
 				break
 			}
-			if errors.IsNotFound(err) {
+			if apierrors.IsNotFound(err) {
 				klog.Warningf("A pod %v no longer exists", podID)
 				return
 			}
diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go
index 568882d5b20..b702e798345 100644
--- a/pkg/scheduler/factory_test.go
+++ b/pkg/scheduler/factory_test.go
@@ -35,6 +35,7 @@ import (
 	"k8s.io/client-go/kubernetes/fake"
 	clienttesting "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/events"
 	apitesting "k8s.io/kubernetes/pkg/api/testing"
 	kubefeatures "k8s.io/kubernetes/pkg/features"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -51,6 +52,7 @@ import (
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
 	"k8s.io/kubernetes/pkg/scheduler/listers"
 	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+	"k8s.io/kubernetes/pkg/scheduler/profile"
 )
 
 const (
@@ -58,6 +60,7 @@ const (
 	bindTimeoutSeconds               = 600
 	podInitialBackoffDurationSeconds = 1
 	podMaxBackoffDurationSeconds     = 10
+	testSchedulerName                = "test-scheduler"
 )
 
 func TestCreate(t *testing.T) {
@@ -65,7 +68,9 @@ func TestCreate(t *testing.T) {
 	stopCh := make(chan struct{})
 	defer close(stopCh)
 	factory := newConfigFactory(client, stopCh)
-	factory.createFromProvider(schedulerapi.SchedulerDefaultProviderName)
+	if _, err := factory.createFromProvider(schedulerapi.SchedulerDefaultProviderName); err != nil {
+		t.Error(err)
+	}
 }
 
 // Test configures a scheduler from a policies defined in a file
@@ -106,12 +111,14 @@ func TestCreateFromConfig(t *testing.T) {
 	if err != nil {
 		t.Fatalf("createFromConfig failed: %v", err)
 	}
-	queueSortPls := sched.Framework.ListPlugins()["QueueSortPlugin"]
+	// createFromConfig is the old codepath where we only have one profile.
+	prof := sched.Profiles[testSchedulerName]
+	queueSortPls := prof.ListPlugins()["QueueSortPlugin"]
 	wantQueuePls := []schedulerapi.Plugin{{Name: queuesort.Name}}
 	if diff := cmp.Diff(wantQueuePls, queueSortPls); diff != "" {
 		t.Errorf("Unexpected QueueSort plugins (-want, +got): %s", diff)
 	}
-	bindPls := sched.Framework.ListPlugins()["BindPlugin"]
+	bindPls := prof.ListPlugins()["BindPlugin"]
 	wantBindPls := []schedulerapi.Plugin{{Name: defaultbinder.Name}}
 	if diff := cmp.Diff(wantBindPls, bindPls); diff != "" {
 		t.Errorf("Unexpected Bind plugins (-want, +got): %s", diff)
@@ -119,16 +126,16 @@ func TestCreateFromConfig(t *testing.T) {
 
 	// Verify that node label predicate/priority are converted to framework plugins.
 	wantArgs := `{"Name":"NodeLabel","Args":{"presentLabels":["zone"],"absentLabels":["foo"],"presentLabelsPreference":["l1"],"absentLabelsPreference":["l2"]}}`
-	verifyPluginConvertion(t, nodelabel.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, factory, 6, wantArgs)
+	verifyPluginConvertion(t, nodelabel.Name, []string{"FilterPlugin", "ScorePlugin"}, prof, &factory.profiles[0], 6, wantArgs)
 	// Verify that service affinity custom predicate/priority is converted to framework plugin.
 	wantArgs = `{"Name":"ServiceAffinity","Args":{"labels":["zone","foo"],"antiAffinityLabelsPreference":["rack","zone"]}}`
-	verifyPluginConvertion(t, serviceaffinity.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, factory, 6, wantArgs)
+	verifyPluginConvertion(t, serviceaffinity.Name, []string{"FilterPlugin", "ScorePlugin"}, prof, &factory.profiles[0], 6, wantArgs)
 	// TODO(#87703): Verify all plugin configs.
 }
 
-func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string, sched *Scheduler, configurator *Configurator, wantWeight int32, wantArgs string) {
+func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string, prof *profile.Profile, cfg *schedulerapi.KubeSchedulerProfile, wantWeight int32, wantArgs string) {
 	for _, extensionPoint := range extentionPoints {
-		plugin, ok := findPlugin(name, extensionPoint, sched)
+		plugin, ok := findPlugin(name, extensionPoint, prof)
 		if !ok {
 			t.Fatalf("%q plugin does not exist in framework.", name)
 		}
@@ -138,7 +145,7 @@ func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string,
 		}
 	}
 	// Verify that the policy config is converted to plugin config.
-	pluginConfig := findPluginConfig(name, configurator)
+	pluginConfig := findPluginConfig(name, cfg)
 	encoding, err := json.Marshal(pluginConfig)
 	if err != nil {
 		t.Errorf("Failed to marshal %+v: %v", pluginConfig, err)
@@ -149,8 +156,8 @@ func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string,
 	}
 }
 
-func findPlugin(name, extensionPoint string, sched *Scheduler) (schedulerapi.Plugin, bool) {
-	for _, pl := range sched.Framework.ListPlugins()[extensionPoint] {
+func findPlugin(name, extensionPoint string, prof *profile.Profile) (schedulerapi.Plugin, bool) {
+	for _, pl := range prof.ListPlugins()[extensionPoint] {
 		if pl.Name == name {
 			return pl, true
 		}
@@ -158,8 +165,8 @@ func findPlugin(name, extensionPoint string, sched *Scheduler) (schedulerapi.Plu
 	return schedulerapi.Plugin{}, false
 }
 
-func findPluginConfig(name string, configurator *Configurator) schedulerapi.PluginConfig {
-	for _, c := range configurator.pluginConfig {
+func findPluginConfig(name string, prof *schedulerapi.KubeSchedulerProfile) schedulerapi.PluginConfig {
+	for _, c := range prof.PluginConfig {
 		if c.Name == name {
 			return c
 		}
@@ -196,10 +203,12 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {
 	if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), configData, &policy); err != nil {
 		t.Fatalf("Invalid configuration: %v", err)
 	}
-	factory.createFromConfig(policy)
+	if _, err := factory.createFromConfig(policy); err != nil {
+		t.Fatal(err)
+	}
 	// TODO(#87703): Verify that the entire pluginConfig is correct.
 	foundAffinityCfg := false
-	for _, cfg := range factory.pluginConfig {
+	for _, cfg := range factory.profiles[0].PluginConfig {
 		if cfg.Name == interpodaffinity.Name {
 			foundAffinityCfg = true
 			wantArgs := runtime.Unknown{Raw: []byte(`{"hardPodAffinityWeight":10}`)}
@@ -228,9 +237,12 @@ func TestCreateFromEmptyConfig(t *testing.T) {
 		t.Errorf("Invalid configuration: %v", err)
 	}
 
-	factory.createFromConfig(policy)
-	if len(factory.pluginConfig) != 0 {
-		t.Errorf("got plugin config %s, want none", factory.pluginConfig)
+	if _, err := factory.createFromConfig(policy); err != nil {
+		t.Fatal(err)
+	}
+	prof := factory.profiles[0]
+	if len(prof.PluginConfig) != 0 {
+		t.Errorf("got plugin config %s, want none", prof.PluginConfig)
 	}
 }
 
@@ -256,7 +268,7 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Failed to create scheduler from configuration: %v", err)
 	}
-	if _, exist := findPlugin("NodeResourcesFit", "FilterPlugin", sched); !exist {
+	if _, exist := findPlugin("NodeResourcesFit", "FilterPlugin", sched.Profiles[testSchedulerName]); !exist {
 		t.Errorf("Expected plugin NodeResourcesFit")
 	}
 }
@@ -396,6 +408,7 @@ func newConfigFactoryWithFrameworkRegistry(
 	registry framework.Registry) *Configurator {
 	informerFactory := informers.NewSharedInformerFactory(client, 0)
 	snapshot := internalcache.NewEmptySnapshot()
+	recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}))
 	return &Configurator{
 		client:          client,
 		informerFactory: informerFactory,
@@ -408,9 +421,11 @@ func newConfigFactoryWithFrameworkRegistry(
 		StopEverything:      stopCh,
 		enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.NonPreemptingPriority),
 		registry:            registry,
-		plugins:             nil,
-		pluginConfig:        []schedulerapi.PluginConfig{},
-		nodeInfoSnapshot:    snapshot,
+		profiles: []schedulerapi.KubeSchedulerProfile{
+			{SchedulerName: testSchedulerName},
+		},
+		recorderFactory:  recorderFactory,
+		nodeInfoSnapshot: snapshot,
 	}
 }
diff --git a/pkg/scheduler/internal/queue/BUILD b/pkg/scheduler/internal/queue/BUILD
index 392390e1795..cec893da3de 100644
--- a/pkg/scheduler/internal/queue/BUILD
+++ b/pkg/scheduler/internal/queue/BUILD
@@ -28,19 +28,14 @@ go_test(
     embed = [":go_default_library"],
     deps = [
         "//pkg/api/v1/pod:go_default_library",
-        "//pkg/scheduler/algorithmprovider:go_default_library",
-        "//pkg/scheduler/apis/config:go_default_library",
-        "//pkg/scheduler/framework/plugins:go_default_library",
+        "//pkg/scheduler/framework/plugins/queuesort:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
-        "//pkg/scheduler/internal/cache:go_default_library",
         "//pkg/scheduler/metrics:go_default_library",
         "//pkg/scheduler/util:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
-        "//staging/src/k8s.io/client-go/informers:go_default_library",
-        "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
         "//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
     ],
 )
diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go
index a7a76f9150a..a6ca8127c63 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue.go
@@ -99,8 +99,8 @@ type SchedulingQueue interface {
 }
 
 // NewSchedulingQueue initializes a priority queue as a new scheduling queue.
-func NewSchedulingQueue(fwk framework.Framework, opts ...Option) SchedulingQueue {
-	return NewPriorityQueue(fwk, opts...)
+func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue {
+	return NewPriorityQueue(lessFn, opts...)
 }
 
 // NominatedNodeName returns nominated node name of a Pod.
@@ -200,7 +200,7 @@ func newPodInfoNoTimestamp(pod *v1.Pod) *framework.PodInfo {
 
 // NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue( - fwk framework.Framework, + lessFn framework.LessFunc, opts ...Option, ) *PriorityQueue { options := defaultPriorityQueueOptions @@ -211,7 +211,7 @@ func NewPriorityQueue( comp := func(podInfo1, podInfo2 interface{}) bool { pInfo1 := podInfo1.(*framework.PodInfo) pInfo2 := podInfo2.(*framework.PodInfo) - return fwk.QueueSortFunc()(pInfo1, pInfo2) + return lessFn(pInfo1, pInfo2) } pq := &PriorityQueue{ diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index aea8f2e8b04..ea6391974a3 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -28,15 +28,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes/fake" "k8s.io/component-base/metrics/testutil" podutil "k8s.io/kubernetes/pkg/api/v1/pod" - "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" - schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" - frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" - "k8s.io/kubernetes/pkg/scheduler/internal/cache" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -122,7 +117,7 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { } func TestPriorityQueue_Add(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultFramework()) + q := createAndRunPriorityQueue(newDefaultQueueSort()) if err := q.Add(&medPriorityPod); err != nil { t.Errorf("add failed: %v", err) } @@ -158,25 +153,13 @@ func TestPriorityQueue_Add(t *testing.T) { } } -func newDefaultFramework() framework.Framework { - plugins := algorithmprovider.NewRegistry()[schedulerapi.SchedulerDefaultProviderName] - fakeClient := fake.NewSimpleClientset() - fwk, err := framework.NewFramework( - frameworkplugins.NewInTreeRegistry(), - plugins, - nil, - framework.WithClientSet(fakeClient), - framework.WithInformerFactory(informers.NewSharedInformerFactory(fakeClient, 0)), - framework.WithSnapshotSharedLister(cache.NewEmptySnapshot()), - ) - if err != nil { - panic(err) - } - return fwk +func newDefaultQueueSort() framework.LessFunc { + sort := &queuesort.PrioritySort{} + return sort.Less } func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultFramework()) + q := createAndRunPriorityQueue(newDefaultQueueSort()) if err := q.Add(&medPriorityPod); err != nil { t.Errorf("add failed: %v", err) } @@ -192,7 +175,7 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { } func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultFramework()) + q := createAndRunPriorityQueue(newDefaultQueueSort()) q.Add(&highPriNominatedPod) q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&highPriNominatedPod), q.SchedulingCycle()) // Must not add anything. q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle()) @@ -224,7 +207,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { // Pods in and before current scheduling cycle will be put back to activeQueue // if we were trying to schedule them when we received move request. 
func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(time.Now()))) + q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(time.Now()))) totalNum := 10 expectedPods := make([]v1.Pod, 0, totalNum) for i := 0; i < totalNum; i++ { @@ -291,7 +274,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { } func TestPriorityQueue_Pop(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultFramework()) + q := createAndRunPriorityQueue(newDefaultQueueSort()) wg := sync.WaitGroup{} wg.Add(1) go func() { @@ -308,7 +291,7 @@ func TestPriorityQueue_Pop(t *testing.T) { } func TestPriorityQueue_Update(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultFramework()) + q := createAndRunPriorityQueue(newDefaultQueueSort()) q.Update(nil, &highPriorityPod) q.lock.RLock() if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriorityPod)); !exists { @@ -364,7 +347,7 @@ func TestPriorityQueue_Update(t *testing.T) { } func TestPriorityQueue_Delete(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultFramework()) + q := createAndRunPriorityQueue(newDefaultQueueSort()) q.Update(&highPriorityPod, &highPriNominatedPod) q.Add(&unschedulablePod) if err := q.Delete(&highPriNominatedPod); err != nil { @@ -390,7 +373,7 @@ func TestPriorityQueue_Delete(t *testing.T) { } func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultFramework()) + q := createAndRunPriorityQueue(newDefaultQueueSort()) q.Add(&medPriorityPod) q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(q.newPodInfo(&highPriorityPod), q.SchedulingCycle()) @@ -442,7 +425,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { } c := clock.NewFakeClock(time.Now()) - q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c)) + q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c)) q.Add(&medPriorityPod) // Add a couple of pods to the unschedulableQ. 
q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle()) @@ -468,7 +451,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { } func TestPriorityQueue_NominatedPodsForNode(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultFramework()) + q := createAndRunPriorityQueue(newDefaultQueueSort()) q.Add(&medPriorityPod) q.Add(&unschedulablePod) q.Add(&highPriorityPod) @@ -493,7 +476,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) { return pendingSet } - q := createAndRunPriorityQueue(newDefaultFramework()) + q := createAndRunPriorityQueue(newDefaultQueueSort()) q.Add(&medPriorityPod) q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(q.newPodInfo(&highPriorityPod), q.SchedulingCycle()) @@ -510,7 +493,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) { } func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultFramework()) + q := createAndRunPriorityQueue(newDefaultQueueSort()) if err := q.Add(&medPriorityPod); err != nil { t.Errorf("add failed: %v", err) } @@ -580,7 +563,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { func TestPriorityQueue_NewWithOptions(t *testing.T) { q := createAndRunPriorityQueue( - newDefaultFramework(), + newDefaultQueueSort(), WithPodInitialBackoffDuration(2*time.Second), WithPodMaxBackoffDuration(20*time.Second), ) @@ -752,7 +735,7 @@ func TestSchedulingQueue_Close(t *testing.T) { }{ { name: "PriorityQueue close", - q: createAndRunPriorityQueue(newDefaultFramework()), + q: createAndRunPriorityQueue(newDefaultQueueSort()), expectedErr: fmt.Errorf(queueClosed), }, } @@ -781,7 +764,7 @@ func TestSchedulingQueue_Close(t *testing.T) { // ensures that an unschedulable pod does not block head of the queue when there // are frequent events that move pods to the active queue. func TestRecentlyTriedPodsGoBack(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultFramework()) + q := createAndRunPriorityQueue(newDefaultQueueSort()) // Add a few pods to priority queue. for i := 0; i < 5; i++ { p := v1.Pod{ @@ -836,7 +819,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { // are frequent events that move pods to the active queue. func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { c := clock.NewFakeClock(time.Now()) - q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c)) + q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c)) // Add an unschedulable pod to a priority queue. 
// This makes a situation that the pod was tried to schedule @@ -927,7 +910,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // TestHighPriorityBackoff tests that a high priority pod does not block // other pods if it is unschedulable func TestHighPriorityBackoff(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultFramework()) + q := createAndRunPriorityQueue(newDefaultQueueSort()) midPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -991,7 +974,7 @@ func TestHighPriorityBackoff(t *testing.T) { // activeQ after one minutes if it is in unschedulableQ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { c := clock.NewFakeClock(time.Now()) - q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c)) + q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c)) midPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-midpod", @@ -1182,7 +1165,7 @@ func TestPodTimestamp(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - queue := createAndRunPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(timestamp))) + queue := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp))) var podInfoList []*framework.PodInfo for i, op := range test.operations { @@ -1341,7 +1324,7 @@ scheduler_pending_pods{queue="unschedulable"} 0 for _, test := range tests { t.Run(test.name, func(t *testing.T) { resetMetrics() - queue := createAndRunPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(timestamp))) + queue := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp))) for i, op := range test.operations { for _, pInfo := range test.operands[i] { op(queue, pInfo) @@ -1370,7 +1353,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 1: A pod is created and scheduled after 1 attempt. The queue operations are // Add -> Pop. c := clock.NewFakeClock(timestamp) - queue := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c)) + queue := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c)) queue.Add(pod) pInfo, err := queue.Pop() if err != nil { @@ -1381,7 +1364,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 2: A pod is created and scheduled after 2 attempts. The queue operations are // Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Pop. c = clock.NewFakeClock(timestamp) - queue = createAndRunPriorityQueue(newDefaultFramework(), WithClock(c)) + queue = createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c)) queue.Add(pod) pInfo, err = queue.Pop() if err != nil { @@ -1401,7 +1384,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 3: Similar to case 2, but before the second pop, call update, the queue operations are // Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Update -> Pop. 
c = clock.NewFakeClock(timestamp) - queue = createAndRunPriorityQueue(newDefaultFramework(), WithClock(c)) + queue = createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c)) queue.Add(pod) pInfo, err = queue.Pop() if err != nil { @@ -1499,7 +1482,7 @@ func TestIncomingPodsMetrics(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { metrics.SchedulerQueueIncomingPods.Reset() - queue := NewPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(timestamp))) + queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp))) queue.Close() queue.Run() for _, op := range test.operations { @@ -1525,15 +1508,15 @@ func checkPerPodSchedulingMetrics(name string, t *testing.T, pInfo *framework.Po } } -func createAndRunPriorityQueue(fwk framework.Framework, opts ...Option) *PriorityQueue { - q := NewPriorityQueue(fwk, opts...) +func createAndRunPriorityQueue(lessFn framework.LessFunc, opts ...Option) *PriorityQueue { + q := NewPriorityQueue(lessFn, opts...) q.Run() return q } func TestBackOffFlow(t *testing.T) { cl := clock.NewFakeClock(time.Now()) - q := NewPriorityQueue(newDefaultFramework(), WithClock(cl)) + q := NewPriorityQueue(newDefaultQueueSort(), WithClock(cl)) steps := []struct { wantBackoff time.Duration }{ diff --git a/pkg/scheduler/profile/BUILD b/pkg/scheduler/profile/BUILD new file mode 100644 index 00000000000..fce95fc5ae6 --- /dev/null +++ b/pkg/scheduler/profile/BUILD @@ -0,0 +1,45 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["profile.go"], + importpath = "k8s.io/kubernetes/pkg/scheduler/profile", + visibility = ["//visibility:public"], + deps = [ + "//pkg/scheduler/apis/config:go_default_library", + "//pkg/scheduler/framework/v1alpha1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", + "//staging/src/k8s.io/client-go/tools/events:go_default_library", + "//vendor/github.com/google/go-cmp/cmp:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) + +go_test( + name = "go_default_test", + srcs = ["profile_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/scheduler/apis/config:go_default_library", + "//pkg/scheduler/framework/v1alpha1:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/events/v1beta1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/tools/events:go_default_library", + ], +) diff --git a/pkg/scheduler/profile/profile.go b/pkg/scheduler/profile/profile.go new file mode 100644 index 00000000000..39f94b9ba19 --- /dev/null +++ b/pkg/scheduler/profile/profile.go @@ -0,0 +1,129 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package profile holds the definition of a scheduling Profile. +package profile + +import ( + "errors" + "fmt" + + "github.com/google/go-cmp/cmp" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/events" + "k8s.io/kubernetes/pkg/scheduler/apis/config" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" +) + +// RecorderFactory builds an EventRecorder for a given scheduler name. +type RecorderFactory func(string) events.EventRecorder + +// FrameworkFactory builds a Framework for a given profile configuration. +type FrameworkFactory func(config.KubeSchedulerProfile) (framework.Framework, error) + +// Profile is a scheduling profile. +type Profile struct { + framework.Framework + Recorder events.EventRecorder +} + +// NewProfile builds a Profile for the given configuration. +func NewProfile(cfg config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory) (*Profile, error) { + f, err := frameworkFact(cfg) + if err != nil { + return nil, err + } + r := recorderFact(cfg.SchedulerName) + return &Profile{ + Framework: f, + Recorder: r, + }, nil +} + +// Map holds profiles indexed by scheduler name. +type Map map[string]*Profile + +// NewMap builds the profiles given by the configuration, indexed by name. +func NewMap(cfgs []config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory) (Map, error) { + m := make(Map) + v := cfgValidator{m: m} + + for _, cfg := range cfgs { + if err := v.validate(cfg); err != nil { + return nil, err + } + p, err := NewProfile(cfg, frameworkFact, recorderFact) + if err != nil { + return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err) + } + m[cfg.SchedulerName] = p + } + return m, nil +} + +// HandlesSchedulerName returns whether a profile handles the given scheduler name. +func (m Map) HandlesSchedulerName(name string) bool { + _, ok := m[name] + return ok +} + +// NewRecorderFactory returns a RecorderFactory for the broadcaster. 
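Taken together, NewMap and a RecorderFactory give callers a single entry point for building every configured profile. A hedged sketch of typical use of the package above; buildProfiles is an assumed helper, and the frameworkFact argument stands in for whatever builds a framework.Framework from a KubeSchedulerProfile:

    package main

    import (
        "k8s.io/client-go/tools/events"
        "k8s.io/kubernetes/pkg/scheduler/apis/config"
        "k8s.io/kubernetes/pkg/scheduler/profile"
    )

    // buildProfiles builds one Profile per configured scheduler name.
    // NewMap rejects empty or duplicate scheduler names and profiles that
    // disagree on the queue sort plugin or its args.
    func buildProfiles(cfgs []config.KubeSchedulerProfile, b events.EventBroadcaster, frameworkFact profile.FrameworkFactory) (profile.Map, error) {
        m, err := profile.NewMap(cfgs, frameworkFact, profile.NewRecorderFactory(b))
        if err != nil {
            return nil, err
        }
        // Pods are dispatched by .spec.schedulerName; a name with no
        // matching profile is not handled by this scheduler.
        _ = m.HandlesSchedulerName("default-scheduler")
        return m, nil
    }
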
+func NewRecorderFactory(b events.EventBroadcaster) RecorderFactory { + return func(name string) events.EventRecorder { + return b.NewRecorder(scheme.Scheme, name) + } +} + +type cfgValidator struct { + m Map + queueSort string + queueSortArgs runtime.Unknown +} + +func (v *cfgValidator) validate(cfg config.KubeSchedulerProfile) error { + if len(cfg.SchedulerName) == 0 { + return errors.New("scheduler name is needed") + } + if cfg.Plugins == nil { + return fmt.Errorf("plugins required for profile with scheduler name %q", cfg.SchedulerName) + } + if v.m[cfg.SchedulerName] != nil { + return fmt.Errorf("duplicate profile with scheduler name %q", cfg.SchedulerName) + } + if cfg.Plugins.QueueSort == nil || len(cfg.Plugins.QueueSort.Enabled) != 1 { + return fmt.Errorf("one queue sort plugin required for profile with scheduler name %q", cfg.SchedulerName) + } + queueSort := cfg.Plugins.QueueSort.Enabled[0].Name + var queueSortArgs runtime.Unknown + for _, plCfg := range cfg.PluginConfig { + if plCfg.Name == queueSort { + queueSortArgs = plCfg.Args + } + } + if len(v.queueSort) == 0 { + v.queueSort = queueSort + v.queueSortArgs = queueSortArgs + return nil + } + if v.queueSort != queueSort { + return fmt.Errorf("different queue sort plugins for profile %q: %q, first: %q", cfg.SchedulerName, queueSort, v.queueSort) + } + if !cmp.Equal(v.queueSortArgs, queueSortArgs) { + return fmt.Errorf("different queue sort plugin args for profile %q: %s", cfg.SchedulerName, queueSortArgs.Raw) + } + return nil +} diff --git a/pkg/scheduler/profile/profile_test.go b/pkg/scheduler/profile/profile_test.go new file mode 100644 index 00000000000..2f96a652166 --- /dev/null +++ b/pkg/scheduler/profile/profile_test.go @@ -0,0 +1,327 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package profile + +import ( + "context" + "fmt" + "strings" + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/api/events/v1beta1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/events" + "k8s.io/kubernetes/pkg/scheduler/apis/config" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" +) + +var fakeRegistry = framework.Registry{ + "QueueSort": newFakePlugin, + "Bind1": newFakePlugin, + "Bind2": newFakePlugin, + "Another": newFakePlugin, +} + +func TestNewProfile(t *testing.T) { + cases := []struct { + name string + cfg config.KubeSchedulerProfile + wantErr string + }{ + { + name: "valid", + cfg: config.KubeSchedulerProfile{ + SchedulerName: "valid-profile", + Plugins: &config.Plugins{ + QueueSort: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "QueueSort"}, + }, + }, + Bind: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "Bind1"}, + }, + }, + }, + PluginConfig: []config.PluginConfig{ + { + Name: "QueueSort", + Args: runtime.Unknown{Raw: []byte("{}")}, + }, + }, + }, + }, + { + name: "invalid framework configuration", + cfg: config.KubeSchedulerProfile{ + SchedulerName: "invalid-profile", + Plugins: &config.Plugins{ + QueueSort: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "QueueSort"}, + }, + }, + }, + }, + wantErr: "at least one bind plugin is needed", + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + c := fake.NewSimpleClientset() + b := events.NewBroadcaster(&events.EventSinkImpl{Interface: c.EventsV1beta1().Events("")}) + p, err := NewProfile(tc.cfg, fakeFrameworkFactory, NewRecorderFactory(b)) + if err := checkErr(err, tc.wantErr); err != nil { + t.Fatal(err) + } + if len(tc.wantErr) != 0 { + return + } + + called := make(chan struct{}) + var ctrl string + stopFn := b.StartEventWatcher(func(obj runtime.Object) { + e, _ := obj.(*v1beta1.Event) + ctrl = e.ReportingController + close(called) + }) + p.Recorder.Eventf(&v1.Pod{}, nil, v1.EventTypeNormal, "", "", "") + <-called + stopFn() + if ctrl != tc.cfg.SchedulerName { + t.Errorf("got controller name %q in event, want %q", ctrl, tc.cfg.SchedulerName) + } + }) + } +} + +func TestNewMap(t *testing.T) { + cases := []struct { + name string + cfgs []config.KubeSchedulerProfile + wantErr string + }{ + { + name: "valid", + cfgs: []config.KubeSchedulerProfile{ + { + SchedulerName: "profile-1", + Plugins: &config.Plugins{ + QueueSort: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "QueueSort"}, + }, + }, + Bind: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "Bind1"}, + }, + }, + }, + }, + { + SchedulerName: "profile-2", + Plugins: &config.Plugins{ + QueueSort: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "QueueSort"}, + }, + }, + Bind: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "Bind2"}, + }, + }, + }, + PluginConfig: []config.PluginConfig{ + { + Name: "Bind2", + Args: runtime.Unknown{Raw: []byte("{}")}, + }, + }, + }, + }, + }, + { + name: "different queue sort", + cfgs: []config.KubeSchedulerProfile{ + { + SchedulerName: "profile-1", + Plugins: &config.Plugins{ + QueueSort: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "QueueSort"}, + }, + }, + Bind: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "Bind1"}, + }, + }, + }, + }, + { + SchedulerName: "profile-2", + Plugins: &config.Plugins{ + QueueSort: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "Another"}, + }, + }, + Bind: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "Bind2"}, + }, + 
}, + }, + }, + }, + wantErr: "different queue sort plugins", + }, + { + name: "different queue sort args", + cfgs: []config.KubeSchedulerProfile{ + { + SchedulerName: "profile-1", + Plugins: &config.Plugins{ + QueueSort: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "QueueSort"}, + }, + }, + Bind: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "Bind1"}, + }, + }, + }, + PluginConfig: []config.PluginConfig{ + { + Name: "QueueSort", + Args: runtime.Unknown{Raw: []byte("{}")}, + }, + }, + }, + { + SchedulerName: "profile-2", + Plugins: &config.Plugins{ + QueueSort: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "QueueSort"}, + }, + }, + Bind: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "Bind2"}, + }, + }, + }, + }, + }, + wantErr: "different queue sort plugin args", + }, + { + name: "duplicate scheduler name", + cfgs: []config.KubeSchedulerProfile{ + { + SchedulerName: "profile-1", + Plugins: &config.Plugins{ + QueueSort: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "QueueSort"}, + }, + }, + Bind: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "Bind1"}, + }, + }, + }, + }, + { + SchedulerName: "profile-1", + Plugins: &config.Plugins{ + QueueSort: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "QueueSort"}, + }, + }, + Bind: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: "Bind2"}, + }, + }, + }, + }, + }, + wantErr: "duplicate profile", + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + m, err := NewMap(tc.cfgs, fakeFrameworkFactory, nilRecorderFactory) + if err := checkErr(err, tc.wantErr); err != nil { + t.Fatal(err) + } + if len(tc.wantErr) != 0 { + return + } + if len(m) != len(tc.cfgs) { + t.Errorf("got %d profiles, want %d", len(m), len(tc.cfgs)) + } + }) + } +} + +type fakePlugin struct{} + +func (p *fakePlugin) Name() string { + return "" +} + +func (p *fakePlugin) Less(*framework.PodInfo, *framework.PodInfo) bool { + return false +} + +func (p *fakePlugin) Bind(context.Context, *framework.CycleState, *v1.Pod, string) *framework.Status { + return nil +} + +func newFakePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + return &fakePlugin{}, nil +} + +func fakeFrameworkFactory(cfg config.KubeSchedulerProfile) (framework.Framework, error) { + return framework.NewFramework(fakeRegistry, cfg.Plugins, cfg.PluginConfig) +} + +func nilRecorderFactory(_ string) events.EventRecorder { + return nil +} + +func checkErr(err error, wantErr string) error { + if len(wantErr) == 0 { + return err + } + if err == nil || !strings.Contains(err.Error(), wantErr) { + return fmt.Errorf("got error %q, want %q", err, wantErr) + } + return nil +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 89411a3f940..f09aea9738f 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -33,7 +33,6 @@ import ( coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/events" "k8s.io/klog" podutil "k8s.io/kubernetes/pkg/api/v1/pod" kubefeatures "k8s.io/kubernetes/pkg/features" @@ -45,6 +44,7 @@ import ( internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" + "k8s.io/kubernetes/pkg/scheduler/profile" "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) @@ -89,8 +89,6 @@ type Scheduler struct { // PodPreemptor is used to evict pods and update 
'NominatedNode' field of // the preemptor pod. podPreemptor podPreemptor - // Framework runs scheduler plugins at configured extension points. - Framework framework.Framework // NextPod should be a function that blocks until the next pod // is available. We don't use a channel for this, because scheduling @@ -102,9 +100,6 @@ type Scheduler struct { // question, and the error Error func(*framework.PodInfo, error) - // Recorder is the EventRecorder to use - Recorder events.EventRecorder - // Close this to shut down the scheduler. StopEverything <-chan struct{} @@ -117,6 +112,9 @@ type Scheduler struct { // SchedulingQueue holds pods to be scheduled SchedulingQueue internalqueue.SchedulingQueue + // Profiles are the scheduling profiles. + Profiles profile.Map + scheduledPodsHasSynced func() bool } @@ -126,7 +124,6 @@ func (sched *Scheduler) Cache() internalcache.Cache { } type schedulerOptions struct { - schedulerName string schedulerAlgorithmSource schedulerapi.SchedulerAlgorithmSource disablePreemption bool percentageOfNodesToScore int32 @@ -135,18 +132,17 @@ type schedulerOptions struct { podMaxBackoffSeconds int64 // Contains out-of-tree plugins to be merged with the in-tree registry. frameworkOutOfTreeRegistry framework.Registry - // Plugins and PluginConfig set from ComponentConfig. - frameworkPlugins *schedulerapi.Plugins - frameworkPluginConfig []schedulerapi.PluginConfig + profiles []schedulerapi.KubeSchedulerProfile } // Option configures a Scheduler type Option func(*schedulerOptions) -// WithName sets schedulerName for Scheduler, the default schedulerName is default-scheduler -func WithName(schedulerName string) Option { +// WithProfiles sets profiles for Scheduler. By default, there is one profile +// with the name "default-scheduler". +func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option { return func(o *schedulerOptions) { - o.schedulerName = schedulerName + o.profiles = p } } @@ -186,20 +182,6 @@ func WithFrameworkOutOfTreeRegistry(registry framework.Registry) Option { } } -// WithFrameworkPlugins sets the plugins that the framework should be configured with. -func WithFrameworkPlugins(plugins *schedulerapi.Plugins) Option { - return func(o *schedulerOptions) { - o.frameworkPlugins = plugins - } -} - -// WithFrameworkPluginConfig sets the PluginConfig slice that the framework should be configured with. -func WithFrameworkPluginConfig(pluginConfig []schedulerapi.PluginConfig) Option { - return func(o *schedulerOptions) { - o.frameworkPluginConfig = pluginConfig - } -} - // WithPodInitialBackoffSeconds sets podInitialBackoffSeconds for Scheduler, the default value is 1 func WithPodInitialBackoffSeconds(podInitialBackoffSeconds int64) Option { return func(o *schedulerOptions) { @@ -215,7 +197,10 @@ func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option { } var defaultSchedulerOptions = schedulerOptions{ - schedulerName: v1.DefaultSchedulerName, + profiles: []schedulerapi.KubeSchedulerProfile{ + // Profiles' default plugins are set from the algorithm provider. 
+ {SchedulerName: v1.DefaultSchedulerName}, + }, schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{ Provider: defaultAlgorithmSourceProviderName(), }, @@ -230,7 +215,7 @@ var defaultSchedulerOptions = schedulerOptions{ func New(client clientset.Interface, informerFactory informers.SharedInformerFactory, podInformer coreinformers.PodInformer, - recorder events.EventRecorder, + recorderFactory profile.RecorderFactory, stopCh <-chan struct{}, opts ...Option) (*Scheduler, error) { @@ -264,6 +249,7 @@ func New(client clientset.Interface, configurator := &Configurator{ client: client, + recorderFactory: recorderFactory, informerFactory: informerFactory, podInformer: podInformer, volumeBinder: volumeBinder, @@ -275,9 +261,8 @@ func New(client clientset.Interface, podInitialBackoffSeconds: options.podInitialBackoffSeconds, podMaxBackoffSeconds: options.podMaxBackoffSeconds, enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.NonPreemptingPriority), + profiles: append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...), registry: registry, - plugins: options.frameworkPlugins, - pluginConfig: options.frameworkPluginConfig, nodeInfoSnapshot: snapshot, } @@ -315,14 +300,13 @@ func New(client clientset.Interface, return nil, fmt.Errorf("unsupported algorithm source: %v", source) } // Additional tweaks to the config produced by the configurator. - sched.Recorder = recorder sched.DisablePreemption = options.disablePreemption sched.StopEverything = stopEverything sched.podConditionUpdater = &podConditionUpdaterImpl{client} sched.podPreemptor = &podPreemptorImpl{client} sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced - AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer) + addAllEventHandlers(sched, informerFactory, podInformer) return sched, nil } @@ -375,10 +359,10 @@ func (sched *Scheduler) Run(ctx context.Context) { // recordFailedSchedulingEvent records an event for the pod that indicates the // pod has failed to schedule. // NOTE: This function modifies "pod". "pod" should be copied before being passed. -func (sched *Scheduler) recordSchedulingFailure(podInfo *framework.PodInfo, err error, reason string, message string) { +func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.PodInfo, err error, reason string, message string) { sched.Error(podInfo, err) pod := podInfo.Pod - sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message) + prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message) if err := sched.podConditionUpdater.update(pod, &v1.PodCondition{ Type: v1.PodScheduled, Status: v1.ConditionFalse, @@ -392,14 +376,14 @@ func (sched *Scheduler) recordSchedulingFailure(podInfo *framework.PodInfo, err // preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible. // If it succeeds, it adds the name of the node where preemption has happened to the pod spec. // It returns the node name and an error if any. 
-func (sched *Scheduler) preempt(ctx context.Context, state *framework.CycleState, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) { +func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, preemptor *v1.Pod, scheduleErr error) (string, error) { preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor) if err != nil { klog.Errorf("Error getting the updated preemptor pod object: %v", err) return "", err } - node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, state, preemptor, scheduleErr) + node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, prof, state, preemptor, scheduleErr) if err != nil { klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err) return "", err @@ -426,10 +410,10 @@ func (sched *Scheduler) preempt(ctx context.Context, state *framework.CycleState return "", err } // If the victim is a WaitingPod, send a reject message to the PermitPlugin - if waitingPod := fwk.GetWaitingPod(victim.UID); waitingPod != nil { + if waitingPod := prof.GetWaitingPod(victim.UID); waitingPod != nil { waitingPod.Reject("preempted") } - sched.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName) + prof.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName) } metrics.PreemptionVictims.Observe(float64(len(victims))) @@ -496,17 +480,17 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { // bind binds a pod to a given node defined in a binding object. // The precedence for binding is: (1) extenders and (2) framework plugins. // We expect this to run asynchronously, so we handle binding metrics internally. 
-func (sched *Scheduler) bind(ctx context.Context, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) { +func (sched *Scheduler) bind(ctx context.Context, prof *profile.Profile, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) { start := time.Now() defer func() { - sched.finishBinding(assumed, targetNode, start, err) + sched.finishBinding(prof, assumed, targetNode, start, err) }() bound, err := sched.extendersBinding(assumed, targetNode) if bound { return err } - bindStatus := sched.Framework.RunBindPlugins(ctx, state, assumed, targetNode) + bindStatus := prof.RunBindPlugins(ctx, state, assumed, targetNode) if bindStatus.IsSuccess() { return nil } @@ -530,7 +514,7 @@ func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error) return false, nil } -func (sched *Scheduler) finishBinding(assumed *v1.Pod, targetNode string, start time.Time, err error) { +func (sched *Scheduler) finishBinding(prof *profile.Profile, assumed *v1.Pod, targetNode string, start time.Time, err error) { if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil { klog.Errorf("scheduler cache FinishBinding failed: %v", finErr) } @@ -544,20 +528,25 @@ func (sched *Scheduler) finishBinding(assumed *v1.Pod, targetNode string, start metrics.BindingLatency.Observe(metrics.SinceInSeconds(start)) metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(start)) - sched.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode) + prof.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode) } // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting. func (sched *Scheduler) scheduleOne(ctx context.Context) { - fwk := sched.Framework - podInfo := sched.NextPod() // pod could be nil when schedulerQueue is closed if podInfo == nil || podInfo.Pod == nil { return } pod := podInfo.Pod - if sched.skipPodSchedule(pod) { + prof, err := sched.profileForPod(pod) + if err != nil { + // This shouldn't happen, because we only accept for scheduling the pods + // which specify a scheduler name that matches one of the profiles. + klog.Error(err) + return + } + if sched.skipPodSchedule(prof, pod) { return } @@ -569,9 +558,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent) schedulingCycleCtx, cancel := context.WithCancel(ctx) defer cancel() - scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod) + scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod) if err != nil { - sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error()) + sched.recordSchedulingFailure(prof, podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error()) // Schedule() may have failed because the pod would not fit on any host, so we try to // preempt, with the expectation that the next time the pod is tried for scheduling it // will fit due to the preemption. 
It is also possible that a different pod will schedule @@ -582,7 +571,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { " No preemption is performed.") } else { preemptionStartTime := time.Now() - sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError) + sched.preempt(schedulingCycleCtx, prof, state, pod, fitError) metrics.PreemptionAttempts.Inc() metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime)) metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime)) @@ -612,15 +601,15 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // This function modifies 'assumedPod' if volume binding is required. allBound, err := sched.VolumeBinder.Binder.AssumePodVolumes(assumedPod, scheduleResult.SuggestedHost) if err != nil { - sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, + sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePodVolumes failed: %v", err)) metrics.PodScheduleErrors.Inc() return } // Run "reserve" plugins. - if sts := fwk.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { - sched.recordSchedulingFailure(assumedPodInfo, sts.AsError(), SchedulerError, sts.Message()) + if sts := prof.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { + sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, sts.Message()) metrics.PodScheduleErrors.Inc() return } @@ -633,15 +622,15 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // This relies on the fact that Error will check if the pod has been bound // to a node and if so will not add it back to the unscheduled pods queue // (otherwise this would cause an infinite loop). - sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err)) + sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err)) metrics.PodScheduleErrors.Inc() // trigger un-reserve plugins to clean up state associated with the reserved Pod - fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + prof.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) return } // Run "permit" plugins. - runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() { var reason string if runPermitStatus.IsUnschedulable() { @@ -655,8 +644,8 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } // One of the plugins returned status different than success or wait. 
- fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) - sched.recordSchedulingFailure(assumedPodInfo, runPermitStatus.AsError(), reason, runPermitStatus.Message()) + prof.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, runPermitStatus.Message()) return } @@ -667,7 +656,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { metrics.SchedulerGoroutines.WithLabelValues("binding").Inc() defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec() - waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod) + waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod) if !waitOnPermitStatus.IsSuccess() { var reason string if waitOnPermitStatus.IsUnschedulable() { @@ -681,8 +670,8 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } // trigger un-reserve plugins to clean up state associated with the reserved Pod - fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) - sched.recordSchedulingFailure(assumedPodInfo, waitOnPermitStatus.AsError(), reason, waitOnPermitStatus.Message()) + prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, waitOnPermitStatus.Message()) return } @@ -690,16 +679,16 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { if !allBound { err := sched.bindVolumes(assumedPod) if err != nil { - sched.recordSchedulingFailure(assumedPodInfo, err, "VolumeBindingFailed", err.Error()) + sched.recordSchedulingFailure(prof, assumedPodInfo, err, "VolumeBindingFailed", err.Error()) metrics.PodScheduleErrors.Inc() // trigger un-reserve plugins to clean up state associated with the reserved Pod - fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) return } } // Run "prebind" plugins. 
- preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) if !preBindStatus.IsSuccess() { var reason string metrics.PodScheduleErrors.Inc() @@ -708,18 +697,18 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } // trigger un-reserve plugins to clean up state associated with the reserved Pod - fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) - sched.recordSchedulingFailure(assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message()) + prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message()) return } - err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state) + err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state) metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start)) if err != nil { metrics.PodScheduleErrors.Inc() // trigger un-reserve plugins to clean up state associated with the reserved Pod - fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) - sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err)) + prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err)) } else { // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2. if klog.V(2) { @@ -731,16 +720,24 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp)) // Run "postbind" plugins. - fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) } }() } +func (sched *Scheduler) profileForPod(pod *v1.Pod) (*profile.Profile, error) { + prof, ok := sched.Profiles[pod.Spec.SchedulerName] + if !ok { + return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName) + } + return prof, nil +} + // skipPodSchedule returns true if we could skip scheduling the pod for specified cases. -func (sched *Scheduler) skipPodSchedule(pod *v1.Pod) bool { +func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool { // Case 1: pod is being deleted. 
if pod.DeletionTimestamp != nil { - sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) + prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) return true } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 749515240e7..70f5b842584 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -56,6 +56,7 @@ import ( internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" + "k8s.io/kubernetes/pkg/scheduler/profile" st "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) @@ -92,7 +93,8 @@ func podWithID(id, desiredHost string) *v1.Pod { SelfLink: fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id), }, Spec: v1.PodSpec{ - NodeName: desiredHost, + NodeName: desiredHost, + SchedulerName: testSchedulerName, }, } } @@ -107,7 +109,8 @@ func deletingPod(id string) *v1.Pod { SelfLink: fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id), }, Spec: v1.PodSpec{ - NodeName: "", + NodeName: "", + SchedulerName: testSchedulerName, }, } } @@ -133,24 +136,16 @@ type mockScheduler struct { err error } -func (es mockScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) { +func (es mockScheduler) Schedule(ctx context.Context, profile *profile.Profile, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) { return es.result, es.err } func (es mockScheduler) Extenders() []core.SchedulerExtender { return nil } -func (es mockScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) { +func (es mockScheduler) Preempt(ctx context.Context, i *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) { return nil, nil, nil, nil } -func (es mockScheduler) Snapshot() error { - return nil - -} -func (es mockScheduler) Framework() framework.Framework { - return nil - -} func TestSchedulerCreation(t *testing.T) { client := clientsetfake.NewSimpleClientset() @@ -163,7 +158,7 @@ func TestSchedulerCreation(t *testing.T) { _, err := New(client, informerFactory, NewPodInformer(client, 0), - eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"), + profile.NewRecorderFactory(eventBroadcaster), stopCh, WithPodInitialBackoffSeconds(1), WithPodMaxBackoffSeconds(10), @@ -187,7 +182,7 @@ func TestSchedulerCreation(t *testing.T) { _, err = New(client, informerFactory, NewPodInformer(client, 0), - eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"), + profile.NewRecorderFactory(eventBroadcaster), stopCh, WithPodInitialBackoffSeconds(1), WithPodMaxBackoffSeconds(10), @@ -308,8 +303,12 @@ func TestSchedulerScheduleOne(t *testing.T) { NextPod: func() *framework.PodInfo { return &framework.PodInfo{Pod: item.sendPod} }, - Framework: fwk, - Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"), + Profiles: profile.Map{ + testSchedulerName: &profile.Profile{ + Framework: fwk, + Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName), + }, + }, VolumeBinder: 
volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), } called := make(chan struct{}) @@ -598,7 +597,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { // queuedPodStore: pods queued before processing. // scache: scheduler cache that might contain assumed pods. -func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, recorder events.EventRecorder, fakeVolumeBinder *volumebinder.VolumeBinder, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { +func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fakeVolumeBinder *volumebinder.VolumeBinder, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { if fakeVolumeBinder == nil { // Create default volume binder if it didn't set. fakeVolumeBinder = volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}) @@ -615,13 +614,22 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C }) fwk, _ := st.NewFramework(fns, framework.WithClientSet(client), framework.WithVolumeBinder(fakeVolumeBinder)) + prof := &profile.Profile{ + Framework: fwk, + Recorder: &events.FakeRecorder{}, + } + if broadcaster != nil { + prof.Recorder = broadcaster.NewRecorder(scheme.Scheme, testSchedulerName) + } + profiles := profile.Map{ + testSchedulerName: prof, + } + algo := core.NewGenericScheduler( scache, internalqueue.NewSchedulingQueue(nil), internalcache.NewEmptySnapshot(), - fwk, []core.SchedulerExtender{}, - nil, informerFactory.Core().V1().PersistentVolumeClaims().Lister(), informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(), false, @@ -639,17 +647,12 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C Error: func(p *framework.PodInfo, err error) { errChan <- err }, - Recorder: &events.FakeRecorder{}, + Profiles: profiles, podConditionUpdater: fakePodConditionUpdater{}, podPreemptor: fakePodPreemptor{}, - Framework: fwk, VolumeBinder: fakeVolumeBinder, } - if recorder != nil { - sched.Recorder = recorder - } - return sched, bindingChan, errChan } @@ -667,13 +670,12 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi client := clientsetfake.NewSimpleClientset(&testNode, &testPVC) informerFactory := informers.NewSharedInformerFactory(client, 0) - recorder := broadcaster.NewRecorder(scheme.Scheme, "scheduler") fns := []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterFilterPlugin(volumebinding.Name, volumebinding.New), } - s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, recorder, fakeVolumeBinder, fns...) + s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, fakeVolumeBinder, fns...) 
informerFactory.Start(stop) informerFactory.WaitForCacheSync(stop) s.VolumeBinder = fakeVolumeBinder @@ -987,6 +989,10 @@ func TestSchedulerBinding(t *testing.T) { if err != nil { t.Fatal(err) } + prof := &profile.Profile{ + Framework: fwk, + Recorder: &events.FakeRecorder{}, + } stop := make(chan struct{}) defer close(stop) scache := internalcache.New(100*time.Millisecond, stop) @@ -994,22 +1000,18 @@ func TestSchedulerBinding(t *testing.T) { scache, nil, nil, - fwk, test.extenders, nil, nil, - nil, false, 0, false, ) sched := Scheduler{ Algorithm: algo, - Framework: fwk, - Recorder: &events.FakeRecorder{}, SchedulerCache: scache, } - err = sched.bind(context.Background(), pod, "node", nil) + err = sched.bind(context.Background(), prof, pod, "node", nil) if err != nil { t.Error(err) } diff --git a/test/integration/daemonset/BUILD b/test/integration/daemonset/BUILD index 9c917afb504..af10a1bdfb5 100644 --- a/test/integration/daemonset/BUILD +++ b/test/integration/daemonset/BUILD @@ -14,12 +14,12 @@ go_test( ], tags = ["integration"], deps = [ - "//pkg/api/legacyscheme:go_default_library", "//pkg/api/v1/pod:go_default_library", "//pkg/apis/core:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/daemon:go_default_library", "//pkg/scheduler:go_default_library", + "//pkg/scheduler/profile:go_default_library", "//pkg/util/labels:go_default_library", "//staging/src/k8s.io/api/apps/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go index 19899d7b1b8..fd87bcbe2a7 100644 --- a/test/integration/daemonset/daemonset_test.go +++ b/test/integration/daemonset/daemonset_test.go @@ -40,12 +40,12 @@ import ( "k8s.io/client-go/tools/events" "k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/retry" - "k8s.io/kubernetes/pkg/api/legacyscheme" podutil "k8s.io/kubernetes/pkg/api/v1/pod" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/daemon" "k8s.io/kubernetes/pkg/scheduler" + "k8s.io/kubernetes/pkg/scheduler/profile" labelsutil "k8s.io/kubernetes/pkg/util/labels" "k8s.io/kubernetes/test/integration/framework" ) @@ -92,10 +92,7 @@ func setupScheduler( cs, informerFactory, informerFactory.Core().V1().Pods(), - eventBroadcaster.NewRecorder( - legacyscheme.Scheme, - v1.DefaultSchedulerName, - ), + profile.NewRecorderFactory(eventBroadcaster), ctx.Done(), ) if err != nil { diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD index 8a13dd3a743..6fd359ee474 100644 --- a/test/integration/scheduler/BUILD +++ b/test/integration/scheduler/BUILD @@ -23,7 +23,6 @@ go_test( embed = [":go_default_library"], tags = ["integration"], deps = [ - "//pkg/api/legacyscheme:go_default_library", "//pkg/api/v1/pod:go_default_library", "//pkg/apis/scheduling:go_default_library", "//pkg/controller/nodelifecycle:go_default_library", @@ -34,6 +33,7 @@ go_test( "//pkg/scheduler/framework/plugins/defaultbinder:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", + "//pkg/scheduler/profile:go_default_library", "//pkg/scheduler/testing:go_default_library", "//plugin/pkg/admission/defaulttolerationseconds:go_default_library", "//plugin/pkg/admission/podtolerationrestriction:go_default_library", @@ -85,13 +85,13 @@ go_library( srcs = ["util.go"], importpath = "k8s.io/kubernetes/test/integration/scheduler", deps = [ - 
"//pkg/api/legacyscheme:go_default_library", "//pkg/api/v1/pod:go_default_library", "//pkg/controller/disruption:go_default_library", "//pkg/scheduler:go_default_library", "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config/scheme:go_default_library", "//pkg/scheduler/apis/config/v1:go_default_library", + "//pkg/scheduler/profile:go_default_library", "//pkg/util/taints:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index c18df3d3299..9cbfd97e0be 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -470,11 +470,12 @@ func TestPreFilterPlugin(t *testing.T) { registry := framework.Registry{prefilterPluginName: newPlugin(preFilterPlugin)} // Setup initial prefilter plugin for testing. - plugins := &schedulerconfig.Plugins{ - PreFilter: &schedulerconfig.PluginSet{ - Enabled: []schedulerconfig.Plugin{ - { - Name: prefilterPluginName, + prof := schedulerconfig.KubeSchedulerProfile{ + SchedulerName: v1.DefaultSchedulerName, + Plugins: &schedulerconfig.Plugins{ + PreFilter: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + {Name: prefilterPluginName}, }, }, }, @@ -482,7 +483,7 @@ func TestPreFilterPlugin(t *testing.T) { // Create the master and the scheduler with the test plugin set. testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "prefilter-plugin", nil), 2, - scheduler.WithFrameworkPlugins(plugins), + scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer cleanupTest(t, testCtx) @@ -541,19 +542,19 @@ func TestScorePlugin(t *testing.T) { scorePluginName: newPlugin(scorePlugin), } - // Setup initial score plugin for testing. - plugins := &schedulerconfig.Plugins{ - Score: &schedulerconfig.PluginSet{ - Enabled: []schedulerconfig.Plugin{ - { - Name: scorePluginName, + prof := schedulerconfig.KubeSchedulerProfile{ + SchedulerName: v1.DefaultSchedulerName, + Plugins: &schedulerconfig.Plugins{ + Score: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + {Name: scorePluginName}, }, }, }, } testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "score-plugin", nil), 10, - scheduler.WithFrameworkPlugins(plugins), + scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer cleanupTest(t, testCtx) @@ -601,17 +602,18 @@ func TestNormalizeScorePlugin(t *testing.T) { } // Setup initial score plugin for testing. - plugins := &schedulerconfig.Plugins{ - Score: &schedulerconfig.PluginSet{ - Enabled: []schedulerconfig.Plugin{ - { - Name: scoreWithNormalizePluginName, + prof := schedulerconfig.KubeSchedulerProfile{ + SchedulerName: v1.DefaultSchedulerName, + Plugins: &schedulerconfig.Plugins{ + Score: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + {Name: scoreWithNormalizePluginName}, }, }, }, } testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "score-plugin", nil), 10, - scheduler.WithFrameworkPlugins(plugins), + scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer cleanupTest(t, testCtx) @@ -644,11 +646,14 @@ func TestReservePlugin(t *testing.T) { registry := framework.Registry{reservePluginName: newPlugin(reservePlugin)} // Setup initial reserve plugin for testing. 
-	plugins := &schedulerconfig.Plugins{
-		Reserve: &schedulerconfig.PluginSet{
-			Enabled: []schedulerconfig.Plugin{
-				{
-					Name: reservePluginName,
+	prof := schedulerconfig.KubeSchedulerProfile{
+		SchedulerName: v1.DefaultSchedulerName,
+		Plugins: &schedulerconfig.Plugins{
+			Reserve: &schedulerconfig.PluginSet{
+				Enabled: []schedulerconfig.Plugin{
+					{
+						Name: reservePluginName,
+					},
 				},
 			},
 		},
@@ -656,7 +661,7 @@
 
 	// Create the master and the scheduler with the test plugin set.
 	testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "reserve-plugin", nil), 2,
-		scheduler.WithFrameworkPlugins(plugins),
+		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer cleanupTest(t, testCtx)
@@ -696,11 +701,14 @@ func TestPrebindPlugin(t *testing.T) {
 	registry := framework.Registry{preBindPluginName: newPlugin(preBindPlugin)}
 
 	// Setup initial prebind plugin for testing.
-	plugins := &schedulerconfig.Plugins{
-		PreBind: &schedulerconfig.PluginSet{
-			Enabled: []schedulerconfig.Plugin{
-				{
-					Name: preBindPluginName,
+	prof := schedulerconfig.KubeSchedulerProfile{
+		SchedulerName: v1.DefaultSchedulerName,
+		Plugins: &schedulerconfig.Plugins{
+			PreBind: &schedulerconfig.PluginSet{
+				Enabled: []schedulerconfig.Plugin{
+					{
+						Name: preBindPluginName,
+					},
 				},
 			},
 		},
@@ -708,7 +716,7 @@
 
 	// Create the master and the scheduler with the test plugin set.
 	testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "prebind-plugin", nil), 2,
-		scheduler.WithFrameworkPlugins(plugins),
+		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer cleanupTest(t, testCtx)
@@ -772,18 +780,21 @@ func TestUnreservePlugin(t *testing.T) {
 	}
 
 	// Setup initial unreserve and prebind plugin for testing.
-	plugins := &schedulerconfig.Plugins{
-		Unreserve: &schedulerconfig.PluginSet{
-			Enabled: []schedulerconfig.Plugin{
-				{
-					Name: unreservePluginName,
+	prof := schedulerconfig.KubeSchedulerProfile{
+		SchedulerName: v1.DefaultSchedulerName,
+		Plugins: &schedulerconfig.Plugins{
+			Unreserve: &schedulerconfig.PluginSet{
+				Enabled: []schedulerconfig.Plugin{
+					{
+						Name: unreservePluginName,
+					},
 				},
 			},
-		},
-		PreBind: &schedulerconfig.PluginSet{
-			Enabled: []schedulerconfig.Plugin{
-				{
-					Name: preBindPluginName,
+			PreBind: &schedulerconfig.PluginSet{
+				Enabled: []schedulerconfig.Plugin{
+					{
+						Name: preBindPluginName,
+					},
 				},
 			},
 		},
@@ -791,7 +802,7 @@
 
 	// Create the master and the scheduler with the test plugin set.
 	testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "unreserve-plugin", nil), 2,
-		scheduler.WithFrameworkPlugins(plugins),
+		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer cleanupTest(t, testCtx)
@@ -867,23 +878,26 @@ func TestBindPlugin(t *testing.T) {
 	}
 
 	// Setup initial unreserve and bind plugins for testing.
-	plugins := &schedulerconfig.Plugins{
-		Unreserve: &schedulerconfig.PluginSet{
-			Enabled: []schedulerconfig.Plugin{{Name: unreservePlugin.Name()}},
-		},
-		Bind: &schedulerconfig.PluginSet{
-			// Put DefaultBinder last.
-			Enabled:  []schedulerconfig.Plugin{{Name: bindPlugin1.Name()}, {Name: bindPlugin2.Name()}, {Name: defaultbinder.Name}},
-			Disabled: []schedulerconfig.Plugin{{Name: defaultbinder.Name}},
-		},
-		PostBind: &schedulerconfig.PluginSet{
-			Enabled: []schedulerconfig.Plugin{{Name: postBindPlugin.Name()}},
+	prof := schedulerconfig.KubeSchedulerProfile{
+		SchedulerName: v1.DefaultSchedulerName,
+		Plugins: &schedulerconfig.Plugins{
+			Unreserve: &schedulerconfig.PluginSet{
+				Enabled: []schedulerconfig.Plugin{{Name: unreservePlugin.Name()}},
+			},
+			Bind: &schedulerconfig.PluginSet{
+				// Put DefaultBinder last.
+				Enabled:  []schedulerconfig.Plugin{{Name: bindPlugin1.Name()}, {Name: bindPlugin2.Name()}, {Name: defaultbinder.Name}},
+				Disabled: []schedulerconfig.Plugin{{Name: defaultbinder.Name}},
+			},
+			PostBind: &schedulerconfig.PluginSet{
+				Enabled: []schedulerconfig.Plugin{{Name: postBindPlugin.Name()}},
+			},
 		},
 	}
 
 	// Create the master and the scheduler with the test plugin set.
 	testCtx := initTestSchedulerWithOptions(t, testContext, false, nil, time.Second,
-		scheduler.WithFrameworkPlugins(plugins),
+		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer cleanupTest(t, testCtx)
@@ -1024,18 +1038,21 @@ func TestPostBindPlugin(t *testing.T) {
 	}
 
 	// Setup initial prebind and postbind plugin for testing.
-	plugins := &schedulerconfig.Plugins{
-		PreBind: &schedulerconfig.PluginSet{
-			Enabled: []schedulerconfig.Plugin{
-				{
-					Name: preBindPluginName,
+	prof := schedulerconfig.KubeSchedulerProfile{
+		SchedulerName: v1.DefaultSchedulerName,
+		Plugins: &schedulerconfig.Plugins{
+			PreBind: &schedulerconfig.PluginSet{
+				Enabled: []schedulerconfig.Plugin{
+					{
+						Name: preBindPluginName,
+					},
 				},
 			},
-		},
-		PostBind: &schedulerconfig.PluginSet{
-			Enabled: []schedulerconfig.Plugin{
-				{
-					Name: postBindPluginName,
+			PostBind: &schedulerconfig.PluginSet{
+				Enabled: []schedulerconfig.Plugin{
+					{
+						Name: postBindPluginName,
+					},
 				},
 			},
 		},
@@ -1043,7 +1060,7 @@
 
 	// Create the master and the scheduler with the test plugin set.
 	testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "postbind-plugin", nil), 2,
-		scheduler.WithFrameworkPlugins(plugins),
+		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer cleanupTest(t, testCtx)
@@ -1095,11 +1112,11 @@ func TestPostBindPlugin(t *testing.T) {
 func TestPermitPlugin(t *testing.T) {
 	// Create a plugin registry for testing. Register only a permit plugin.
 	perPlugin := &PermitPlugin{name: permitPluginName}
-	registry, plugins := initRegistryAndConfig(perPlugin)
+	registry, prof := initRegistryAndConfig(perPlugin)
 
 	// Create the master and the scheduler with the test plugin set.
 	testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
-		scheduler.WithFrameworkPlugins(plugins),
+		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer cleanupTest(t, testCtx)
@@ -1183,11 +1200,11 @@ func TestMultiplePermitPlugins(t *testing.T) {
 	// Create a plugin registry for testing.
 	perPlugin1 := &PermitPlugin{name: "permit-plugin-1"}
 	perPlugin2 := &PermitPlugin{name: "permit-plugin-2"}
-	registry, plugins := initRegistryAndConfig(perPlugin1, perPlugin2)
+	registry, prof := initRegistryAndConfig(perPlugin1, perPlugin2)
 
 	// Create the master and the scheduler with the test plugin set.
 	testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "multi-permit-plugin", nil), 2,
-		scheduler.WithFrameworkPlugins(plugins),
+		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer cleanupTest(t, testCtx)
@@ -1238,11 +1255,11 @@ func TestPermitPluginsCancelled(t *testing.T) {
 	// Create a plugin registry for testing.
 	perPlugin1 := &PermitPlugin{name: "permit-plugin-1"}
 	perPlugin2 := &PermitPlugin{name: "permit-plugin-2"}
-	registry, plugins := initRegistryAndConfig(perPlugin1, perPlugin2)
+	registry, prof := initRegistryAndConfig(perPlugin1, perPlugin2)
 
 	// Create the master and the scheduler with the test plugin set.
 	testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugins", nil), 2,
-		scheduler.WithFrameworkPlugins(plugins),
+		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer cleanupTest(t, testCtx)
@@ -1279,11 +1296,11 @@ func TestPermitPluginsCancelled(t *testing.T) {
 func TestCoSchedulingWithPermitPlugin(t *testing.T) {
 	// Create a plugin registry for testing. Register only a permit plugin.
 	permitPlugin := &PermitPlugin{name: permitPluginName}
-	registry, plugins := initRegistryAndConfig(permitPlugin)
+	registry, prof := initRegistryAndConfig(permitPlugin)
 
 	// Create the master and the scheduler with the test plugin set.
 	testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
-		scheduler.WithFrameworkPlugins(plugins),
+		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer cleanupTest(t, testCtx)
@@ -1352,11 +1369,14 @@ func TestFilterPlugin(t *testing.T) {
 	registry := framework.Registry{filterPluginName: newPlugin(filterPlugin)}
 
 	// Setup initial filter plugin for testing.
-	plugins := &schedulerconfig.Plugins{
-		Filter: &schedulerconfig.PluginSet{
-			Enabled: []schedulerconfig.Plugin{
-				{
-					Name: filterPluginName,
+	prof := schedulerconfig.KubeSchedulerProfile{
+		SchedulerName: v1.DefaultSchedulerName,
+		Plugins: &schedulerconfig.Plugins{
+			Filter: &schedulerconfig.PluginSet{
+				Enabled: []schedulerconfig.Plugin{
+					{
+						Name: filterPluginName,
+					},
 				},
 			},
 		},
@@ -1364,7 +1384,7 @@
 
 	// Create the master and the scheduler with the test plugin set.
 	testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "filter-plugin", nil), 2,
-		scheduler.WithFrameworkPlugins(plugins),
+		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer cleanupTest(t, testCtx)
@@ -1403,11 +1423,14 @@ func TestPreScorePlugin(t *testing.T) {
 	registry := framework.Registry{preScorePluginName: newPlugin(preScorePlugin)}
 
 	// Setup initial pre-score plugin for testing.
-	plugins := &schedulerconfig.Plugins{
-		PreScore: &schedulerconfig.PluginSet{
-			Enabled: []schedulerconfig.Plugin{
-				{
-					Name: preScorePluginName,
+	prof := schedulerconfig.KubeSchedulerProfile{
+		SchedulerName: v1.DefaultSchedulerName,
+		Plugins: &schedulerconfig.Plugins{
+			PreScore: &schedulerconfig.PluginSet{
+				Enabled: []schedulerconfig.Plugin{
+					{
+						Name: preScorePluginName,
+					},
 				},
 			},
 		},
@@ -1415,7 +1438,7 @@
 
 	// Create the master and the scheduler with the test plugin set.
 	testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "pre-score-plugin", nil), 2,
-		scheduler.WithFrameworkPlugins(plugins),
+		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer cleanupTest(t, testCtx)
@@ -1451,11 +1474,11 @@ func TestPreScorePlugin(t *testing.T) {
 func TestPreemptWithPermitPlugin(t *testing.T) {
 	// Create a plugin registry for testing. Register only a permit plugin.
 	permitPlugin := &PermitPlugin{}
-	registry, plugins := initRegistryAndConfig(permitPlugin)
+	registry, prof := initRegistryAndConfig(permitPlugin)
 
 	// Create the master and the scheduler with the test plugin set.
 	testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "preempt-with-permit-plugin", nil), 0,
-		scheduler.WithFrameworkPlugins(plugins),
+		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer cleanupTest(t, testCtx)
@@ -1532,19 +1555,23 @@ func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testContext, nodeC
 
 // initRegistryAndConfig returns a registry and a plugins config based on the given plugins.
 // TODO: refactor it into a more generic function that accepts all kinds of Plugins as arguments
-func initRegistryAndConfig(pp ...*PermitPlugin) (registry framework.Registry, plugins *schedulerconfig.Plugins) {
+func initRegistryAndConfig(pp ...*PermitPlugin) (registry framework.Registry, prof schedulerconfig.KubeSchedulerProfile) {
 	if len(pp) == 0 {
 		return
 	}
 
 	registry = framework.Registry{}
-	plugins = &schedulerconfig.Plugins{
-		Permit: &schedulerconfig.PluginSet{},
-	}
-
+	var plugins []schedulerconfig.Plugin
 	for _, p := range pp {
 		registry.Register(p.Name(), newPermitPlugin(p))
-		plugins.Permit.Enabled = append(plugins.Permit.Enabled, schedulerconfig.Plugin{Name: p.Name()})
+		plugins = append(plugins, schedulerconfig.Plugin{Name: p.Name()})
+	}
+
+	prof.SchedulerName = v1.DefaultSchedulerName
+	prof.Plugins = &schedulerconfig.Plugins{
+		Permit: &schedulerconfig.PluginSet{
+			Enabled: plugins,
+		},
 	}
 	return
 }
diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go
index 9a35c9a8123..e517d8d5e3a 100644
--- a/test/integration/scheduler/preemption_test.go
+++ b/test/integration/scheduler/preemption_test.go
@@ -128,18 +128,17 @@ func TestPreemption(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Error registering a filter: %v", err)
 	}
-	plugins := &schedulerconfig.Plugins{
-		Filter: &schedulerconfig.PluginSet{
-			Enabled: []schedulerconfig.Plugin{
-				{
-					Name: filterPluginName,
+	prof := schedulerconfig.KubeSchedulerProfile{
+		SchedulerName: v1.DefaultSchedulerName,
+		Plugins: &schedulerconfig.Plugins{
+			Filter: &schedulerconfig.PluginSet{
+				Enabled: []schedulerconfig.Plugin{
+					{Name: filterPluginName},
 				},
 			},
-		},
-		PreFilter: &schedulerconfig.PluginSet{
-			Enabled: []schedulerconfig.Plugin{
-				{
-					Name: filterPluginName,
+			PreFilter: &schedulerconfig.PluginSet{
+				Enabled: []schedulerconfig.Plugin{
+					{Name: filterPluginName},
 				},
 			},
 		},
@@ -147,7 +146,7 @@ func TestPreemption(t *testing.T) {
 	testCtx := initTestSchedulerWithOptions(t,
 		initTestMaster(t, "preemption", nil),
 		false, nil, time.Second,
-		scheduler.WithFrameworkPlugins(plugins),
+		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer cleanupTest(t, testCtx)
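Every framework and preemption test above migrates along the same two lines: the bare `*schedulerconfig.Plugins` value moves under a `KubeSchedulerProfile` keyed by `v1.DefaultSchedulerName`, and the `scheduler.WithFrameworkPlugins`/`scheduler.WithName` options collapse into a single `scheduler.WithProfiles`. A hedged sketch of the resulting pattern; the plugin name is a placeholder, and the `newPlugin` helper is reconstructed from its call sites with a factory signature assumed from this era of the framework API:

```go
package main

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// newPlugin returns a PluginFactory that ignores its arguments and hands
// back a pre-built plugin instance, mirroring the test helper used above.
func newPlugin(p framework.Plugin) framework.PluginFactory {
	return func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
		return p, nil
	}
}

// profileWithFilter wraps one enabled filter plugin into a single-profile
// configuration; callers hand the result to scheduler.WithProfiles.
func profileWithFilter(pluginName string) schedulerconfig.KubeSchedulerProfile {
	return schedulerconfig.KubeSchedulerProfile{
		SchedulerName: v1.DefaultSchedulerName,
		Plugins: &schedulerconfig.Plugins{
			Filter: &schedulerconfig.PluginSet{
				Enabled: []schedulerconfig.Plugin{{Name: pluginName}},
			},
		},
	}
}
```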
diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go
index bcc1b1cc108..eace91e14c9 100644
--- a/test/integration/scheduler/scheduler_test.go
+++ b/test/integration/scheduler/scheduler_test.go
@@ -37,9 +37,9 @@ import (
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/events"
-	"k8s.io/kubernetes/pkg/api/legacyscheme"
 	"k8s.io/kubernetes/pkg/scheduler"
 	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
+	"k8s.io/kubernetes/pkg/scheduler/profile"
 	"k8s.io/kubernetes/test/integration/framework"
 )
@@ -270,9 +270,8 @@ priorities: []
 		sched, err := scheduler.New(clientSet,
 			informerFactory,
 			scheduler.NewPodInformer(clientSet, 0),
-			eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.DefaultSchedulerName),
+			profile.NewRecorderFactory(eventBroadcaster),
 			nil,
-			scheduler.WithName(v1.DefaultSchedulerName),
 			scheduler.WithAlgorithmSource(kubeschedulerconfig.SchedulerAlgorithmSource{
 				Policy: &kubeschedulerconfig.SchedulerPolicySource{
 					ConfigMap: &kubeschedulerconfig.SchedulerPolicyConfigMapSource{
@@ -287,7 +286,7 @@ priorities: []
 			t.Fatalf("couldn't make scheduler config for test %d: %v", i, err)
 		}
 
-		schedPlugins := sched.Framework.ListPlugins()
+		schedPlugins := sched.Profiles[v1.DefaultSchedulerName].ListPlugins()
 		if diff := cmp.Diff(test.expectedPlugins, schedPlugins); diff != "" {
 			t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
 		}
@@ -317,9 +316,8 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) {
 	_, err := scheduler.New(clientSet,
 		informerFactory,
 		scheduler.NewPodInformer(clientSet, 0),
-		eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.DefaultSchedulerName),
+		profile.NewRecorderFactory(eventBroadcaster),
 		nil,
-		scheduler.WithName(v1.DefaultSchedulerName),
 		scheduler.WithAlgorithmSource(kubeschedulerconfig.SchedulerAlgorithmSource{
 			Policy: &kubeschedulerconfig.SchedulerPolicySource{
 				ConfigMap: &kubeschedulerconfig.SchedulerPolicyConfigMapSource{
@@ -461,7 +459,7 @@ func TestUnschedulableNodes(t *testing.T) {
 	}
 }
 
-func TestMultiScheduler(t *testing.T) {
+func TestMultipleSchedulers(t *testing.T) {
 	// This integration tests the multi-scheduler feature in the following way:
 	// 1. create a default scheduler
 	// 2. create a node
@@ -538,7 +536,8 @@ func TestMultiScheduler(t *testing.T) {
 	}
 
 	// 5. create and start a scheduler with name "foo-scheduler"
-	testCtx = initTestSchedulerWithOptions(t, testCtx, true, nil, time.Second, scheduler.WithName(fooScheduler))
+	fooProf := kubeschedulerconfig.KubeSchedulerProfile{SchedulerName: fooScheduler}
+	testCtx = initTestSchedulerWithOptions(t, testCtx, true, nil, time.Second, scheduler.WithProfiles(fooProf))
 
 	// 6. **check point-2**:
 	//    - testPodWithAnnotationFitsFoo should be scheduled
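The rename to `TestMultipleSchedulers` tracks a real capability change: with `WithProfiles` variadic and `sched.Profiles` a map keyed by scheduler name, one scheduler instance can serve several scheduler names, and pods choose among them via `spec.schedulerName`. A hedged sketch of that wiring; the second profile name and the pod are illustrative only, and the test above still exercises two separate instances:

```go
package main

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/events"
	"k8s.io/kubernetes/pkg/scheduler"
	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/profile"
)

// newTwoProfileScheduler builds one scheduler that answers to two names.
func newTwoProfileScheduler(cs clientset.Interface, b events.EventBroadcaster, stopCh <-chan struct{}) (*scheduler.Scheduler, error) {
	defaultProf := kubeschedulerconfig.KubeSchedulerProfile{SchedulerName: v1.DefaultSchedulerName}
	fooProf := kubeschedulerconfig.KubeSchedulerProfile{SchedulerName: "foo-scheduler"}

	informerFactory := informers.NewSharedInformerFactory(cs, 0)
	sched, err := scheduler.New(
		cs,
		informerFactory,
		scheduler.NewPodInformer(cs, 0),
		profile.NewRecorderFactory(b),
		stopCh,
		scheduler.WithProfiles(defaultProf, fooProf),
	)
	if err != nil {
		return nil, err
	}
	// Each profile exposes its own framework, as the config test above asserts.
	_ = sched.Profiles["foo-scheduler"].ListPlugins()
	return sched, nil
}

// A pod opts into the second profile through spec.schedulerName.
var podFitsFoo = &v1.Pod{
	ObjectMeta: metav1.ObjectMeta{Name: "pod-fits-foo"},
	Spec:       v1.PodSpec{SchedulerName: "foo-scheduler"},
}
```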
diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go
index c9b8b9608c1..3d4b095f91e 100644
--- a/test/integration/scheduler/util.go
+++ b/test/integration/scheduler/util.go
@@ -46,13 +46,13 @@ import (
 	"k8s.io/client-go/scale"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/events"
-	"k8s.io/kubernetes/pkg/api/legacyscheme"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/controller/disruption"
 	"k8s.io/kubernetes/pkg/scheduler"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
 	schedulerapiv1 "k8s.io/kubernetes/pkg/scheduler/apis/config/v1"
+	"k8s.io/kubernetes/pkg/scheduler/profile"
 	taintutils "k8s.io/kubernetes/pkg/util/taints"
 	"k8s.io/kubernetes/test/integration/framework"
 	imageutils "k8s.io/kubernetes/test/utils/image"
@@ -174,10 +174,6 @@ func initTestSchedulerWithOptions(
 	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
 		Interface: testCtx.clientSet.EventsV1beta1().Events(""),
 	})
-	recorder := eventBroadcaster.NewRecorder(
-		legacyscheme.Scheme,
-		v1.DefaultSchedulerName,
-	)
 	if policy != nil {
 		opts = append(opts, scheduler.WithAlgorithmSource(createAlgorithmSourceFromPolicy(policy, testCtx.clientSet)))
 	}
@@ -186,7 +182,7 @@ func initTestSchedulerWithOptions(
 		testCtx.clientSet,
 		testCtx.informerFactory,
 		podInformer,
-		recorder,
+		profile.NewRecorderFactory(eventBroadcaster),
 		testCtx.ctx.Done(),
 		opts...,
 	)
diff --git a/test/integration/util/BUILD b/test/integration/util/BUILD
index 4b9e82af0ef..473a316e14d 100644
--- a/test/integration/util/BUILD
+++ b/test/integration/util/BUILD
@@ -13,9 +13,9 @@ go_library(
     ],
     importpath = "k8s.io/kubernetes/test/integration/util",
     deps = [
-        "//pkg/api/legacyscheme:go_default_library",
         "//pkg/controller/volume/persistentvolume/util:go_default_library",
         "//pkg/scheduler:go_default_library",
+        "//pkg/scheduler/profile:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
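Note the lifecycle split the helpers above and below converge on: the events broadcaster is created and started once, while recorder construction is deferred to the scheduler through `profile.NewRecorderFactory`. A condensed usage sketch of `StartScheduler`'s wiring under that split; the error handling and shutdown paths are simplified assumptions:

```go
package main

import (
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/events"
	"k8s.io/kubernetes/pkg/scheduler"
	"k8s.io/kubernetes/pkg/scheduler/profile"
)

func startSchedulerSketch(cs clientset.Interface, stopCh <-chan struct{}) (*scheduler.Scheduler, error) {
	informerFactory := informers.NewSharedInformerFactory(cs, 0)

	// Start recording once; individual recorders are minted per profile
	// later, inside scheduler.New, via the factory.
	evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
		Interface: cs.EventsV1beta1().Events(""),
	})
	evtBroadcaster.StartRecordingToSink(stopCh)

	return scheduler.New(
		cs,
		informerFactory,
		scheduler.NewPodInformer(cs, 0),
		profile.NewRecorderFactory(evtBroadcaster),
		stopCh,
	)
}
```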
diff --git a/test/integration/util/util.go b/test/integration/util/util.go
index d535d96afd5..a11f42aaab0 100644
--- a/test/integration/util/util.go
+++ b/test/integration/util/util.go
@@ -29,9 +29,9 @@ import (
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/events"
 	"k8s.io/klog"
-	"k8s.io/kubernetes/pkg/api/legacyscheme"
 	pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
 	"k8s.io/kubernetes/pkg/scheduler"
+	"k8s.io/kubernetes/pkg/scheduler/profile"
 	"k8s.io/kubernetes/test/integration/framework"
 )
@@ -69,12 +69,12 @@ func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, corein
 	evtBroadcaster.StartRecordingToSink(ctx.Done())
 
-	recorder := evtBroadcaster.NewRecorder(
-		legacyscheme.Scheme,
-		v1.DefaultSchedulerName,
-	)
-
-	sched, err := createScheduler(clientSet, informerFactory, podInformer, recorder, ctx.Done())
+	sched, err := scheduler.New(
+		clientSet,
+		informerFactory,
+		podInformer,
+		profile.NewRecorderFactory(evtBroadcaster),
+		ctx.Done())
 	if err != nil {
 		klog.Fatalf("Error creating scheduler: %v", err)
 	}
@@ -131,20 +131,3 @@ func StartFakePVController(clientSet clientset.Interface) ShutdownFunc {
 	informerFactory.Start(ctx.Done())
 	return ShutdownFunc(cancel)
 }
-
-// createScheduler create a scheduler with given informer factory and default name.
-func createScheduler(
-	clientSet clientset.Interface,
-	informerFactory informers.SharedInformerFactory,
-	podInformer coreinformers.PodInformer,
-	recorder events.EventRecorder,
-	stopCh <-chan struct{},
-) (*scheduler.Scheduler, error) {
-	return scheduler.New(
-		clientSet,
-		informerFactory,
-		podInformer,
-		recorder,
-		stopCh,
-	)
-}
diff --git a/test/integration/volumescheduling/BUILD b/test/integration/volumescheduling/BUILD
index e3cc3842987..ba42c42784a 100644
--- a/test/integration/volumescheduling/BUILD
+++ b/test/integration/volumescheduling/BUILD
@@ -54,9 +54,9 @@ go_library(
     srcs = ["util.go"],
     importpath = "k8s.io/kubernetes/test/integration/volumescheduling",
     deps = [
-        "//pkg/api/legacyscheme:go_default_library",
         "//pkg/api/v1/pod:go_default_library",
         "//pkg/scheduler:go_default_library",
+        "//pkg/scheduler/profile:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
@@ -64,7 +64,6 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
-        "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/rest:go_default_library",
         "//staging/src/k8s.io/client-go/tools/events:go_default_library",
diff --git a/test/integration/volumescheduling/util.go b/test/integration/volumescheduling/util.go
index 729efaa417a..49b2bfbc3a7 100644
--- a/test/integration/volumescheduling/util.go
+++ b/test/integration/volumescheduling/util.go
@@ -30,13 +30,12 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apiserver/pkg/admission"
 	"k8s.io/client-go/informers"
-	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/events"
-	"k8s.io/kubernetes/pkg/api/legacyscheme"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/scheduler"
+	"k8s.io/kubernetes/pkg/scheduler/profile"
 	"k8s.io/kubernetes/test/integration/framework"
 )
@@ -108,14 +107,14 @@ func initTestSchedulerWithOptions(
 	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
 		Interface: testCtx.clientSet.EventsV1beta1().Events(""),
 	})
-	recorder := eventBroadcaster.NewRecorder(
-		legacyscheme.Scheme,
-		v1.DefaultSchedulerName,
-	)
 
 	var err error
-	testCtx.scheduler, err = createSchedulerWithPodInformer(
-		testCtx.clientSet, podInformer, testCtx.informerFactory, recorder, testCtx.ctx.Done())
+	testCtx.scheduler, err = scheduler.New(
+		testCtx.clientSet,
+		testCtx.informerFactory,
+		podInformer,
+		profile.NewRecorderFactory(eventBroadcaster),
+		testCtx.ctx.Done())
 	if err != nil {
 		t.Fatalf("Couldn't create scheduler: %v", err)
@@ -130,23 +129,6 @@ func initTestSchedulerWithOptions(
 	return testCtx
 }
 
-// createSchedulerWithPodInformer creates a new scheduler.
-func createSchedulerWithPodInformer(
-	clientSet clientset.Interface,
-	podInformer coreinformers.PodInformer,
-	informerFactory informers.SharedInformerFactory,
-	recorder events.EventRecorder,
-	stopCh <-chan struct{},
-) (*scheduler.Scheduler, error) {
-	return scheduler.New(
-		clientSet,
-		informerFactory,
-		podInformer,
-		recorder,
-		stopCh,
-	)
-}
-
 // cleanupTest deletes the scheduler and the test namespace. It should be called
 // at the end of a test.
 func cleanupTest(t *testing.T, testCtx *testContext) {