Merge pull request #79109 from ahg-g/scoring

Score plugin for the scheduling framework.
This commit is contained in:
Kubernetes Prow Robot 2019-07-16 21:22:34 -07:00 committed by GitHub
commit a3898dc41d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 247 additions and 32 deletions

View File

@ -240,7 +240,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
}
metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework, pluginContext)
if err != nil {
return result, err
}
@ -677,7 +677,8 @@ func PrioritizeNodes(
priorityConfigs []priorities.PriorityConfig,
nodes []*v1.Node,
extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
framework framework.Framework,
pluginContext *framework.PluginContext) (schedulerapi.HostPriorityList, error) {
// If no priority configs are provided, then the EqualPriority function is applied
// This is required to generate the priority list in the required format
if len(priorityConfigs) == 0 && len(extenders) == 0 {
@ -762,6 +763,12 @@ func PrioritizeNodes(
return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
}
// Run the Score plugins.
scoresMap, scoreStatus := framework.RunScorePlugins(pluginContext, pod, nodes)
if !scoreStatus.IsSuccess() {
return schedulerapi.HostPriorityList{}, scoreStatus.AsError()
}
// Summarize all scores.
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
@ -772,6 +779,12 @@ func PrioritizeNodes(
}
}
for _, scoreList := range scoresMap {
for i := range nodes {
result[i].Score += scoreList[i]
}
}
if len(extenders) != 0 && nodes != nil {
combinedScores := make(map[string]int, len(nodeNameToInfo))
for i := range extenders {

View File

@ -720,7 +720,7 @@ func TestZeroRequest(t *testing.T) {
list, err := PrioritizeNodes(
test.pod, nodeNameToInfo, metaData, priorityConfigs,
schedulertesting.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{})
schedulertesting.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{}, emptyFramework, nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -14,9 +14,11 @@ go_library(
deps = [
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -17,32 +17,36 @@ limitations under the License.
package v1alpha1
import (
"context"
"fmt"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
// framework is the component responsible for initializing and running scheduler
// plugins.
type framework struct {
registry Registry
nodeInfoSnapshot *cache.NodeInfoSnapshot
waitingPods *waitingPodsMap
plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
queueSortPlugins []QueueSortPlugin
prefilterPlugins []PrefilterPlugin
reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin
bindPlugins []BindPlugin
postbindPlugins []PostbindPlugin
unreservePlugins []UnreservePlugin
permitPlugins []PermitPlugin
registry Registry
nodeInfoSnapshot *cache.NodeInfoSnapshot
waitingPods *waitingPodsMap
pluginNameToWeightMap map[string]int
queueSortPlugins []QueueSortPlugin
prefilterPlugins []PrefilterPlugin
scorePlugins []ScorePlugin
reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin
bindPlugins []BindPlugin
postbindPlugins []PostbindPlugin
unreservePlugins []UnreservePlugin
permitPlugins []PermitPlugin
}
const (
@ -55,10 +59,10 @@ var _ = Framework(&framework{})
// NewFramework initializes plugins given the configuration and the registry.
func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfig) (Framework, error) {
f := &framework{
registry: r,
nodeInfoSnapshot: cache.NewNodeInfoSnapshot(),
plugins: make(map[string]Plugin),
waitingPods: newWaitingPodsMap(),
registry: r,
nodeInfoSnapshot: cache.NewNodeInfoSnapshot(),
pluginNameToWeightMap: make(map[string]int),
waitingPods: newWaitingPodsMap(),
}
if plugins == nil {
return f, nil
@ -71,6 +75,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
}
pluginConfig := pluginNameToConfig(args)
pluginsMap := make(map[string]Plugin)
for name, factory := range r {
// initialize only needed plugins
if _, ok := pg[name]; !ok {
@ -84,12 +89,19 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if err != nil {
return nil, fmt.Errorf("error initializing plugin %v: %v", name, err)
}
f.plugins[name] = p
pluginsMap[name] = p
// A weight of zero is not permitted, plugins can be disabled explicitly
// when configured.
f.pluginNameToWeightMap[name] = int(pg[name].Weight)
if f.pluginNameToWeightMap[name] == 0 {
f.pluginNameToWeightMap[name] = 1
}
}
if plugins.PreFilter != nil {
for _, pf := range plugins.PreFilter.Enabled {
if pg, ok := f.plugins[pf.Name]; ok {
if pg, ok := pluginsMap[pf.Name]; ok {
p, ok := pg.(PrefilterPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend prefilter plugin", pf.Name)
@ -101,9 +113,23 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
}
}
if plugins.Score != nil {
for _, sc := range plugins.Score.Enabled {
if pg, ok := pluginsMap[sc.Name]; ok {
p, ok := pg.(ScorePlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend score plugin", sc.Name)
}
f.scorePlugins = append(f.scorePlugins, p)
} else {
return nil, fmt.Errorf("score plugin %v does not exist", sc.Name)
}
}
}
if plugins.Reserve != nil {
for _, r := range plugins.Reserve.Enabled {
if pg, ok := f.plugins[r.Name]; ok {
if pg, ok := pluginsMap[r.Name]; ok {
p, ok := pg.(ReservePlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend reserve plugin", r.Name)
@ -117,7 +143,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if plugins.PreBind != nil {
for _, pb := range plugins.PreBind.Enabled {
if pg, ok := f.plugins[pb.Name]; ok {
if pg, ok := pluginsMap[pb.Name]; ok {
p, ok := pg.(PrebindPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend prebind plugin", pb.Name)
@ -131,7 +157,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if plugins.Bind != nil {
for _, pb := range plugins.Bind.Enabled {
if pg, ok := f.plugins[pb.Name]; ok {
if pg, ok := pluginsMap[pb.Name]; ok {
p, ok := pg.(BindPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend bind plugin", pb.Name)
@ -145,7 +171,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if plugins.PostBind != nil {
for _, pb := range plugins.PostBind.Enabled {
if pg, ok := f.plugins[pb.Name]; ok {
if pg, ok := pluginsMap[pb.Name]; ok {
p, ok := pg.(PostbindPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend postbind plugin", pb.Name)
@ -159,7 +185,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if plugins.Unreserve != nil {
for _, ur := range plugins.Unreserve.Enabled {
if pg, ok := f.plugins[ur.Name]; ok {
if pg, ok := pluginsMap[ur.Name]; ok {
p, ok := pg.(UnreservePlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend unreserve plugin", ur.Name)
@ -173,7 +199,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if plugins.Permit != nil {
for _, pr := range plugins.Permit.Enabled {
if pg, ok := f.plugins[pr.Name]; ok {
if pg, ok := pluginsMap[pr.Name]; ok {
p, ok := pg.(PermitPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend permit plugin", pr.Name)
@ -187,7 +213,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if plugins.QueueSort != nil {
for _, qs := range plugins.QueueSort.Enabled {
if pg, ok := f.plugins[qs.Name]; ok {
if pg, ok := pluginsMap[qs.Name]; ok {
p, ok := pg.(QueueSortPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend queue sort plugin", qs.Name)
@ -237,6 +263,43 @@ func (f *framework) RunPrefilterPlugins(
return nil
}
// RunScorePlugins runs the set of configured scoring plugins. It returns a map that
// stores for each scoring plugin name the corresponding NodeScoreList(s).
// It also returns *Status, which is set to non-success if any of the plugins returns
// a non-success status.
func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScoreMap, *Status) {
pluginToNodeScoreMap := make(PluginToNodeScoreMap, len(f.scorePlugins))
for _, pl := range f.scorePlugins {
pluginToNodeScoreMap[pl.Name()] = make(NodeScoreList, len(nodes))
}
ctx, cancel := context.WithCancel(context.Background())
errCh := schedutil.NewErrorChannel()
workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
for _, pl := range f.scorePlugins {
weight, weightExists := f.pluginNameToWeightMap[pl.Name()]
if !weightExists {
err := fmt.Errorf("weight does not exist for plugin %v", pl.Name())
errCh.SendErrorWithCancel(err, cancel)
return
}
score, status := pl.Score(pc, pod, nodes[index].Name)
if !status.IsSuccess() {
errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel)
return
}
pluginToNodeScoreMap[pl.Name()][index] = score * weight
}
})
if err := errCh.ReceiveError(); err != nil {
msg := fmt.Sprintf("error while running score plugin for pod %v: %v", pod.Name, err)
klog.Error(msg)
return nil, NewStatus(Error, msg)
}
return pluginToNodeScoreMap, nil
}
// RunPrebindPlugins runs the set of configured prebind plugins. It returns a
// failure (bool) if any of the plugins returns an error. It also returns an
// error containing the rejection message or the error occurred in the plugin.
@ -400,8 +463,8 @@ func pluginNameToConfig(args []config.PluginConfig) map[string]*runtime.Unknown
return pc
}
func pluginsNeeded(plugins *config.Plugins) map[string]struct{} {
pgMap := make(map[string]struct{}, 0)
func pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
pgMap := make(map[string]config.Plugin, 0)
if plugins == nil {
return pgMap
@ -412,7 +475,7 @@ func pluginsNeeded(plugins *config.Plugins) map[string]struct{} {
return
}
for _, pg := range pgs.Enabled {
pgMap[pg.Name] = struct{}{}
pgMap[pg.Name] = pg
}
}
find(plugins.QueueSort)

View File

@ -30,6 +30,12 @@ import (
// Code is the Status code/type which is returned from plugins.
type Code int
// NodeScoreList declares a list of nodes and their scores.
type NodeScoreList []int
// PluginToNodeScoreMap declares a map from plugin name to its NodeScoreList.
type PluginToNodeScoreMap map[string]NodeScoreList
// These are predefined codes used in a Status.
const (
// Success means that plugin ran correctly and found pod schedulable.
@ -137,6 +143,16 @@ type PrefilterPlugin interface {
Prefilter(pc *PluginContext, p *v1.Pod) *Status
}
// ScorePlugin is an interface that must be implemented by "score" plugins to rank
// nodes that passed the filtering phase.
type ScorePlugin interface {
Plugin
// Score is called on each filtered node. It must return success and an integer
// indicating the rank of the node. All scoring plugins must return success or
// the pod will be rejected.
Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status)
}
// ReservePlugin is an interface for Reserve plugins. These plugins are called
// at the reservation point. These are meant to update the state of the plugin.
// This concept used to be called 'assume' in the original scheduler.
@ -220,6 +236,12 @@ type Framework interface {
// cycle is aborted.
RunPrefilterPlugins(pc *PluginContext, pod *v1.Pod) *Status
// RunScorePlugins runs the set of configured scoring plugins. It returns a map that
// stores for each scoring plugin name the corresponding NodeScoreList(s).
// It also returns *Status, which is set to non-success if any of the plugins returns
// a non-success status.
RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScoreMap, *Status)
// RunPrebindPlugins runs the set of configured prebind plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If the Status code is "Unschedulable", it is

View File

@ -171,6 +171,10 @@ func (*fakeFramework) RunPrefilterPlugins(pc *framework.PluginContext, pod *v1.P
return nil
}
func (*fakeFramework) RunScorePlugins(pc *framework.PluginContext, pod *v1.Pod, nodes []*v1.Node) (framework.PluginToNodeScoreMap, *framework.Status) {
return nil, nil
}
func (*fakeFramework) RunPrebindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}

View File

@ -36,6 +36,12 @@ type PrefilterPlugin struct {
rejectPrefilter bool
}
type ScorePlugin struct {
failScore bool
numCalled int
highScoreNode string
}
type ReservePlugin struct {
numReserveCalled int
failReserve bool
@ -79,6 +85,7 @@ type PermitPlugin struct {
const (
prefilterPluginName = "prefilter-plugin"
scorePluginName = "score-plugin"
reservePluginName = "reserve-plugin"
prebindPluginName = "prebind-plugin"
unreservePluginName = "unreserve-plugin"
@ -87,6 +94,7 @@ const (
)
var _ = framework.PrefilterPlugin(&PrefilterPlugin{})
var _ = framework.ScorePlugin(&ScorePlugin{})
var _ = framework.ReservePlugin(&ReservePlugin{})
var _ = framework.PrebindPlugin(&PrebindPlugin{})
var _ = framework.BindPlugin(&BindPlugin{})
@ -94,6 +102,38 @@ var _ = framework.PostbindPlugin(&PostbindPlugin{})
var _ = framework.UnreservePlugin(&UnreservePlugin{})
var _ = framework.PermitPlugin(&PermitPlugin{})
// Name returns name of the score plugin.
func (sp *ScorePlugin) Name() string {
return scorePluginName
}
// reset returns name of the score plugin.
func (sp *ScorePlugin) reset() {
sp.failScore = false
sp.numCalled = 0
sp.highScoreNode = ""
}
var scPlugin = &ScorePlugin{}
// Score returns the score of scheduling a pod on a specific node.
func (sp *ScorePlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName string) (int, *framework.Status) {
sp.numCalled++
if sp.failScore {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", p.Name))
}
score := 10
if nodeName == sp.highScoreNode {
score = 100
}
return score, nil
}
// NewScorePlugin is the factory for score plugin.
func NewScorePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return scPlugin, nil
}
// Name returns name of the plugin.
func (rp *ReservePlugin) Name() string {
return reservePluginName
@ -402,6 +442,73 @@ func TestPrefilterPlugin(t *testing.T) {
}
}
// TestScorePlugin tests invocation of score plugins.
func TestScorePlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a score plugin.
registry := framework.Registry{scorePluginName: NewScorePlugin}
// Setup initial score plugin for testing.
plugins := &schedulerconfig.Plugins{
Score: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: scorePluginName,
},
},
},
}
// Set empty plugin config for testing
emptyPluginConfig := []schedulerconfig.PluginConfig{}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "score-plugin", nil),
false, nil, registry, plugins, emptyPluginConfig, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add multiple nodes, one of them will be scored much higher than the others.
nodes, err := createNodes(cs, "test-node", nil, 10)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
scPlugin.highScoreNode = nodes[3].Name
for i, fail := range []bool{false, true} {
scPlugin.failScore = fail
// Create a best effort pod.
pod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Fatalf("Error while creating a test pod: %v", err)
}
if fail {
if err = waitForPodUnschedulable(cs, pod); err != nil {
t.Errorf("test #%v: Didn't expect the pod to be scheduled. error: %v", i, err)
}
} else {
if err = waitForPodToSchedule(cs, pod); err != nil {
t.Errorf("Expected the pod to be scheduled. error: %v", err)
} else {
p, err := getPod(cs, pod.Name, pod.Namespace)
if err != nil {
t.Errorf("Failed to retrieve the pod. error: %v", err)
} else if p.Spec.NodeName != scPlugin.highScoreNode {
t.Errorf("Expected the pod to be scheduled on node %q, got %q", scPlugin.highScoreNode, p.Spec.NodeName)
}
}
}
if scPlugin.numCalled == 0 {
t.Errorf("Expected the reserve plugin to be called.")
}
scPlugin.reset()
cleanupPods(cs, t, []*v1.Pod{pod})
}
}
// TestReservePlugin tests invocation of reserve plugins.
func TestReservePlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a reserve plugin.
@ -444,7 +551,7 @@ func TestReservePlugin(t *testing.T) {
if fail {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil {
t.Errorf("Didn't expected the pod to be scheduled. error: %v", err)
t.Errorf("Didn't expect the pod to be scheduled. error: %v", err)
}
} else {
if err = waitForPodToSchedule(cs, pod); err != nil {

View File

@ -734,6 +734,10 @@ func deletePod(cs clientset.Interface, podName string, nsName string) error {
return cs.CoreV1().Pods(nsName).Delete(podName, metav1.NewDeleteOptions(0))
}
func getPod(cs clientset.Interface, podName string, podNamespace string) (*v1.Pod, error) {
return cs.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
}
// cleanupPods deletes the given pods and waits for them to be actually deleted.
func cleanupPods(cs clientset.Interface, t *testing.T, pods []*v1.Pod) {
for _, p := range pods {