diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 4bd26daf157..fe44f911dd2 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -240,7 +240,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister } metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap) - priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders) + priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework, pluginContext) if err != nil { return result, err } @@ -677,7 +677,8 @@ func PrioritizeNodes( priorityConfigs []priorities.PriorityConfig, nodes []*v1.Node, extenders []algorithm.SchedulerExtender, -) (schedulerapi.HostPriorityList, error) { + framework framework.Framework, + pluginContext *framework.PluginContext) (schedulerapi.HostPriorityList, error) { // If no priority configs are provided, then the EqualPriority function is applied // This is required to generate the priority list in the required format if len(priorityConfigs) == 0 && len(extenders) == 0 { @@ -762,6 +763,12 @@ func PrioritizeNodes( return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs) } + // Run the Score plugins. + scoresMap, scoreStatus := framework.RunScorePlugins(pluginContext, pod, nodes) + if !scoreStatus.IsSuccess() { + return schedulerapi.HostPriorityList{}, scoreStatus.AsError() + } + // Summarize all scores. result := make(schedulerapi.HostPriorityList, 0, len(nodes)) @@ -772,6 +779,12 @@ func PrioritizeNodes( } } + for _, scoreList := range scoresMap { + for i := range nodes { + result[i].Score += scoreList[i] + } + } + if len(extenders) != 0 && nodes != nil { combinedScores := make(map[string]int, len(nodeNameToInfo)) for i := range extenders { diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index c007492abbf..ff2b208f794 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -720,7 +720,7 @@ func TestZeroRequest(t *testing.T) { list, err := PrioritizeNodes( test.pod, nodeNameToInfo, metaData, priorityConfigs, - schedulertesting.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{}) + schedulertesting.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{}, emptyFramework, nil) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/pkg/scheduler/framework/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD index 3d2064635be..63a51c5ae93 100644 --- a/pkg/scheduler/framework/v1alpha1/BUILD +++ b/pkg/scheduler/framework/v1alpha1/BUILD @@ -14,9 +14,11 @@ go_library( deps = [ "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", + "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index c79850ca59a..7dce3761a13 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -17,32 +17,36 @@ limitations under the License. package v1alpha1 import ( + "context" "fmt" "time" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" "k8s.io/klog" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/internal/cache" + schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) // framework is the component responsible for initializing and running scheduler // plugins. type framework struct { - registry Registry - nodeInfoSnapshot *cache.NodeInfoSnapshot - waitingPods *waitingPodsMap - plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance. - queueSortPlugins []QueueSortPlugin - prefilterPlugins []PrefilterPlugin - reservePlugins []ReservePlugin - prebindPlugins []PrebindPlugin - bindPlugins []BindPlugin - postbindPlugins []PostbindPlugin - unreservePlugins []UnreservePlugin - permitPlugins []PermitPlugin + registry Registry + nodeInfoSnapshot *cache.NodeInfoSnapshot + waitingPods *waitingPodsMap + pluginNameToWeightMap map[string]int + queueSortPlugins []QueueSortPlugin + prefilterPlugins []PrefilterPlugin + scorePlugins []ScorePlugin + reservePlugins []ReservePlugin + prebindPlugins []PrebindPlugin + bindPlugins []BindPlugin + postbindPlugins []PostbindPlugin + unreservePlugins []UnreservePlugin + permitPlugins []PermitPlugin } const ( @@ -55,10 +59,10 @@ var _ = Framework(&framework{}) // NewFramework initializes plugins given the configuration and the registry. func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfig) (Framework, error) { f := &framework{ - registry: r, - nodeInfoSnapshot: cache.NewNodeInfoSnapshot(), - plugins: make(map[string]Plugin), - waitingPods: newWaitingPodsMap(), + registry: r, + nodeInfoSnapshot: cache.NewNodeInfoSnapshot(), + pluginNameToWeightMap: make(map[string]int), + waitingPods: newWaitingPodsMap(), } if plugins == nil { return f, nil @@ -71,6 +75,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi } pluginConfig := pluginNameToConfig(args) + pluginsMap := make(map[string]Plugin) for name, factory := range r { // initialize only needed plugins if _, ok := pg[name]; !ok { @@ -84,12 +89,19 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi if err != nil { return nil, fmt.Errorf("error initializing plugin %v: %v", name, err) } - f.plugins[name] = p + pluginsMap[name] = p + + // A weight of zero is not permitted, plugins can be disabled explicitly + // when configured. + f.pluginNameToWeightMap[name] = int(pg[name].Weight) + if f.pluginNameToWeightMap[name] == 0 { + f.pluginNameToWeightMap[name] = 1 + } } if plugins.PreFilter != nil { for _, pf := range plugins.PreFilter.Enabled { - if pg, ok := f.plugins[pf.Name]; ok { + if pg, ok := pluginsMap[pf.Name]; ok { p, ok := pg.(PrefilterPlugin) if !ok { return nil, fmt.Errorf("plugin %v does not extend prefilter plugin", pf.Name) @@ -101,9 +113,23 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi } } + if plugins.Score != nil { + for _, sc := range plugins.Score.Enabled { + if pg, ok := pluginsMap[sc.Name]; ok { + p, ok := pg.(ScorePlugin) + if !ok { + return nil, fmt.Errorf("plugin %v does not extend score plugin", sc.Name) + } + f.scorePlugins = append(f.scorePlugins, p) + } else { + return nil, fmt.Errorf("score plugin %v does not exist", sc.Name) + } + } + } + if plugins.Reserve != nil { for _, r := range plugins.Reserve.Enabled { - if pg, ok := f.plugins[r.Name]; ok { + if pg, ok := pluginsMap[r.Name]; ok { p, ok := pg.(ReservePlugin) if !ok { return nil, fmt.Errorf("plugin %v does not extend reserve plugin", r.Name) @@ -117,7 +143,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi if plugins.PreBind != nil { for _, pb := range plugins.PreBind.Enabled { - if pg, ok := f.plugins[pb.Name]; ok { + if pg, ok := pluginsMap[pb.Name]; ok { p, ok := pg.(PrebindPlugin) if !ok { return nil, fmt.Errorf("plugin %v does not extend prebind plugin", pb.Name) @@ -131,7 +157,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi if plugins.Bind != nil { for _, pb := range plugins.Bind.Enabled { - if pg, ok := f.plugins[pb.Name]; ok { + if pg, ok := pluginsMap[pb.Name]; ok { p, ok := pg.(BindPlugin) if !ok { return nil, fmt.Errorf("plugin %v does not extend bind plugin", pb.Name) @@ -145,7 +171,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi if plugins.PostBind != nil { for _, pb := range plugins.PostBind.Enabled { - if pg, ok := f.plugins[pb.Name]; ok { + if pg, ok := pluginsMap[pb.Name]; ok { p, ok := pg.(PostbindPlugin) if !ok { return nil, fmt.Errorf("plugin %v does not extend postbind plugin", pb.Name) @@ -159,7 +185,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi if plugins.Unreserve != nil { for _, ur := range plugins.Unreserve.Enabled { - if pg, ok := f.plugins[ur.Name]; ok { + if pg, ok := pluginsMap[ur.Name]; ok { p, ok := pg.(UnreservePlugin) if !ok { return nil, fmt.Errorf("plugin %v does not extend unreserve plugin", ur.Name) @@ -173,7 +199,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi if plugins.Permit != nil { for _, pr := range plugins.Permit.Enabled { - if pg, ok := f.plugins[pr.Name]; ok { + if pg, ok := pluginsMap[pr.Name]; ok { p, ok := pg.(PermitPlugin) if !ok { return nil, fmt.Errorf("plugin %v does not extend permit plugin", pr.Name) @@ -187,7 +213,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi if plugins.QueueSort != nil { for _, qs := range plugins.QueueSort.Enabled { - if pg, ok := f.plugins[qs.Name]; ok { + if pg, ok := pluginsMap[qs.Name]; ok { p, ok := pg.(QueueSortPlugin) if !ok { return nil, fmt.Errorf("plugin %v does not extend queue sort plugin", qs.Name) @@ -237,6 +263,43 @@ func (f *framework) RunPrefilterPlugins( return nil } +// RunScorePlugins runs the set of configured scoring plugins. It returns a map that +// stores for each scoring plugin name the corresponding NodeScoreList(s). +// It also returns *Status, which is set to non-success if any of the plugins returns +// a non-success status. +func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScoreMap, *Status) { + pluginToNodeScoreMap := make(PluginToNodeScoreMap, len(f.scorePlugins)) + for _, pl := range f.scorePlugins { + pluginToNodeScoreMap[pl.Name()] = make(NodeScoreList, len(nodes)) + } + ctx, cancel := context.WithCancel(context.Background()) + errCh := schedutil.NewErrorChannel() + workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) { + for _, pl := range f.scorePlugins { + weight, weightExists := f.pluginNameToWeightMap[pl.Name()] + if !weightExists { + err := fmt.Errorf("weight does not exist for plugin %v", pl.Name()) + errCh.SendErrorWithCancel(err, cancel) + return + } + score, status := pl.Score(pc, pod, nodes[index].Name) + if !status.IsSuccess() { + errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel) + return + } + pluginToNodeScoreMap[pl.Name()][index] = score * weight + } + }) + + if err := errCh.ReceiveError(); err != nil { + msg := fmt.Sprintf("error while running score plugin for pod %v: %v", pod.Name, err) + klog.Error(msg) + return nil, NewStatus(Error, msg) + } + + return pluginToNodeScoreMap, nil +} + // RunPrebindPlugins runs the set of configured prebind plugins. It returns a // failure (bool) if any of the plugins returns an error. It also returns an // error containing the rejection message or the error occurred in the plugin. @@ -400,8 +463,8 @@ func pluginNameToConfig(args []config.PluginConfig) map[string]*runtime.Unknown return pc } -func pluginsNeeded(plugins *config.Plugins) map[string]struct{} { - pgMap := make(map[string]struct{}, 0) +func pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin { + pgMap := make(map[string]config.Plugin, 0) if plugins == nil { return pgMap @@ -412,7 +475,7 @@ func pluginsNeeded(plugins *config.Plugins) map[string]struct{} { return } for _, pg := range pgs.Enabled { - pgMap[pg.Name] = struct{}{} + pgMap[pg.Name] = pg } } find(plugins.QueueSort) diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index 8503fdd270c..1a8e647335a 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -30,6 +30,12 @@ import ( // Code is the Status code/type which is returned from plugins. type Code int +// NodeScoreList declares a list of nodes and their scores. +type NodeScoreList []int + +// PluginToNodeScoreMap declares a map from plugin name to its NodeScoreList. +type PluginToNodeScoreMap map[string]NodeScoreList + // These are predefined codes used in a Status. const ( // Success means that plugin ran correctly and found pod schedulable. @@ -137,6 +143,16 @@ type PrefilterPlugin interface { Prefilter(pc *PluginContext, p *v1.Pod) *Status } +// ScorePlugin is an interface that must be implemented by "score" plugins to rank +// nodes that passed the filtering phase. +type ScorePlugin interface { + Plugin + // Score is called on each filtered node. It must return success and an integer + // indicating the rank of the node. All scoring plugins must return success or + // the pod will be rejected. + Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status) +} + // ReservePlugin is an interface for Reserve plugins. These plugins are called // at the reservation point. These are meant to update the state of the plugin. // This concept used to be called 'assume' in the original scheduler. @@ -220,6 +236,12 @@ type Framework interface { // cycle is aborted. RunPrefilterPlugins(pc *PluginContext, pod *v1.Pod) *Status + // RunScorePlugins runs the set of configured scoring plugins. It returns a map that + // stores for each scoring plugin name the corresponding NodeScoreList(s). + // It also returns *Status, which is set to non-success if any of the plugins returns + // a non-success status. + RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScoreMap, *Status) + // RunPrebindPlugins runs the set of configured prebind plugins. It returns // *Status and its code is set to non-success if any of the plugins returns // anything but Success. If the Status code is "Unschedulable", it is diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 2197a54f207..e46d4a26205 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -171,6 +171,10 @@ func (*fakeFramework) RunPrefilterPlugins(pc *framework.PluginContext, pod *v1.P return nil } +func (*fakeFramework) RunScorePlugins(pc *framework.PluginContext, pod *v1.Pod, nodes []*v1.Node) (framework.PluginToNodeScoreMap, *framework.Status) { + return nil, nil +} + func (*fakeFramework) RunPrebindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { return nil } diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index 7c2db6d68b1..61d0d2b48ef 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -36,6 +36,12 @@ type PrefilterPlugin struct { rejectPrefilter bool } +type ScorePlugin struct { + failScore bool + numCalled int + highScoreNode string +} + type ReservePlugin struct { numReserveCalled int failReserve bool @@ -79,6 +85,7 @@ type PermitPlugin struct { const ( prefilterPluginName = "prefilter-plugin" + scorePluginName = "score-plugin" reservePluginName = "reserve-plugin" prebindPluginName = "prebind-plugin" unreservePluginName = "unreserve-plugin" @@ -87,6 +94,7 @@ const ( ) var _ = framework.PrefilterPlugin(&PrefilterPlugin{}) +var _ = framework.ScorePlugin(&ScorePlugin{}) var _ = framework.ReservePlugin(&ReservePlugin{}) var _ = framework.PrebindPlugin(&PrebindPlugin{}) var _ = framework.BindPlugin(&BindPlugin{}) @@ -94,6 +102,38 @@ var _ = framework.PostbindPlugin(&PostbindPlugin{}) var _ = framework.UnreservePlugin(&UnreservePlugin{}) var _ = framework.PermitPlugin(&PermitPlugin{}) +// Name returns name of the score plugin. +func (sp *ScorePlugin) Name() string { + return scorePluginName +} + +// reset returns name of the score plugin. +func (sp *ScorePlugin) reset() { + sp.failScore = false + sp.numCalled = 0 + sp.highScoreNode = "" +} + +var scPlugin = &ScorePlugin{} + +// Score returns the score of scheduling a pod on a specific node. +func (sp *ScorePlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName string) (int, *framework.Status) { + sp.numCalled++ + if sp.failScore { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", p.Name)) + } + score := 10 + if nodeName == sp.highScoreNode { + score = 100 + } + return score, nil +} + +// NewScorePlugin is the factory for score plugin. +func NewScorePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + return scPlugin, nil +} + // Name returns name of the plugin. func (rp *ReservePlugin) Name() string { return reservePluginName @@ -402,6 +442,73 @@ func TestPrefilterPlugin(t *testing.T) { } } +// TestScorePlugin tests invocation of score plugins. +func TestScorePlugin(t *testing.T) { + // Create a plugin registry for testing. Register only a score plugin. + registry := framework.Registry{scorePluginName: NewScorePlugin} + + // Setup initial score plugin for testing. + plugins := &schedulerconfig.Plugins{ + Score: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: scorePluginName, + }, + }, + }, + } + // Set empty plugin config for testing + emptyPluginConfig := []schedulerconfig.PluginConfig{} + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "score-plugin", nil), + false, nil, registry, plugins, emptyPluginConfig, false, time.Second) + defer cleanupTest(t, context) + + cs := context.clientSet + // Add multiple nodes, one of them will be scored much higher than the others. + nodes, err := createNodes(cs, "test-node", nil, 10) + if err != nil { + t.Fatalf("Cannot create nodes: %v", err) + } + scPlugin.highScoreNode = nodes[3].Name + + for i, fail := range []bool{false, true} { + scPlugin.failScore = fail + // Create a best effort pod. + pod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) + if err != nil { + t.Fatalf("Error while creating a test pod: %v", err) + } + + if fail { + if err = waitForPodUnschedulable(cs, pod); err != nil { + t.Errorf("test #%v: Didn't expect the pod to be scheduled. error: %v", i, err) + } + } else { + if err = waitForPodToSchedule(cs, pod); err != nil { + t.Errorf("Expected the pod to be scheduled. error: %v", err) + } else { + p, err := getPod(cs, pod.Name, pod.Namespace) + if err != nil { + t.Errorf("Failed to retrieve the pod. error: %v", err) + } else if p.Spec.NodeName != scPlugin.highScoreNode { + t.Errorf("Expected the pod to be scheduled on node %q, got %q", scPlugin.highScoreNode, p.Spec.NodeName) + } + } + } + + if scPlugin.numCalled == 0 { + t.Errorf("Expected the reserve plugin to be called.") + } + + scPlugin.reset() + cleanupPods(cs, t, []*v1.Pod{pod}) + } +} + // TestReservePlugin tests invocation of reserve plugins. func TestReservePlugin(t *testing.T) { // Create a plugin registry for testing. Register only a reserve plugin. @@ -444,7 +551,7 @@ func TestReservePlugin(t *testing.T) { if fail { if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil { - t.Errorf("Didn't expected the pod to be scheduled. error: %v", err) + t.Errorf("Didn't expect the pod to be scheduled. error: %v", err) } } else { if err = waitForPodToSchedule(cs, pod); err != nil { diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 63aec5349be..3b3a9dbe6bc 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -734,6 +734,10 @@ func deletePod(cs clientset.Interface, podName string, nsName string) error { return cs.CoreV1().Pods(nsName).Delete(podName, metav1.NewDeleteOptions(0)) } +func getPod(cs clientset.Interface, podName string, podNamespace string) (*v1.Pod, error) { + return cs.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{}) +} + // cleanupPods deletes the given pods and waits for them to be actually deleted. func cleanupPods(cs clientset.Interface, t *testing.T, pods []*v1.Pod) { for _, p := range pods {