RequestedToCapacityRatio as score plugin

This commit is contained in:
Shintaro Murakami 2019-10-29 22:48:34 +09:00
parent f8b45a12f4
commit 4b72af9f00
9 changed files with 292 additions and 21 deletions

View File

@ -23,6 +23,7 @@ go_library(
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
"//pkg/scheduler/framework/plugins/requestedtocapacityratio:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/debugger:go_default_library",

View File

@ -34,6 +34,7 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/requestedtocapacityratio"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
@ -372,8 +373,9 @@ func RegisterPriorityConfigFactory(name string, pcf PriorityConfigFactory) strin
// RegisterCustomPriorityFunction registers a custom priority function with the algorithm registry.
// Returns the name, with which the priority function was registered.
func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy, args *plugins.ConfigProducerArgs) string {
var pcf *PriorityConfigFactory
name := policy.Name
validatePriorityOrDie(policy)
@ -401,17 +403,23 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
Weight: policy.Weight,
}
} else if policy.Argument.RequestedToCapacityRatioArguments != nil {
scoringFunctionShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(policy.Argument.RequestedToCapacityRatioArguments)
args.RequestedToCapacityRatioArgs = &requestedtocapacityratio.Args{
FunctionShape: scoringFunctionShape,
ResourceToWeightMap: resources,
}
pcf = &PriorityConfigFactory{
MapReduceFunction: func(args PluginFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) {
scoringFunctionShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(policy.Argument.RequestedToCapacityRatioArguments)
p := priorities.RequestedToCapacityRatioResourceAllocationPriority(scoringFunctionShape, resources)
return p.PriorityMap, nil
},
Weight: policy.Weight,
}
// We do not allow specifying the name for custom plugins, see #83472
name = requestedtocapacityratio.Name
}
} else if existingPcf, ok := priorityFunctionMap[policy.Name]; ok {
klog.V(2).Infof("Priority type %s already registered, reusing.", policy.Name)
} else if existingPcf, ok := priorityFunctionMap[name]; ok {
klog.V(2).Infof("Priority type %s already registered, reusing.", name)
// set/update the weight based on the policy
pcf = &PriorityConfigFactory{
Function: existingPcf.Function,
@ -421,10 +429,10 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
}
if pcf == nil {
klog.Fatalf("Invalid configuration: Priority type not found for %s", policy.Name)
klog.Fatalf("Invalid configuration: Priority type not found for %s", name)
}
return RegisterPriorityConfigFactory(policy.Name, *pcf)
return RegisterPriorityConfigFactory(name, *pcf)
}
func buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(arguments *schedulerapi.RequestedToCapacityRatioArguments) (priorities.FunctionShape, priorities.ResourceToWeightMap) {

View File

@ -788,7 +788,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"EqualPriority",
"SelectorSpreadPriority",
"InterPodAffinityPriority",
"RequestedToCapacityRatioPriority",
),
wantPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": {
@ -814,6 +813,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesMostAllocated", Weight: 2},
{Name: "NodeAffinity", Weight: 2},
{Name: "NodePreferAvoidPods", Weight: 2},
{Name: "RequestedToCapacityRatio", Weight: 2},
{Name: "TaintToleration", Weight: 2},
},
},
@ -896,7 +896,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"EqualPriority",
"SelectorSpreadPriority",
"InterPodAffinityPriority",
"RequestedToCapacityRatioPriority",
),
wantPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": {
@ -923,6 +922,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesMostAllocated", Weight: 2},
{Name: "NodeAffinity", Weight: 2},
{Name: "NodePreferAvoidPods", Weight: 2},
{Name: "RequestedToCapacityRatio", Weight: 2},
{Name: "TaintToleration", Weight: 2},
},
},
@ -1004,7 +1004,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"EqualPriority",
"SelectorSpreadPriority",
"InterPodAffinityPriority",
"RequestedToCapacityRatioPriority",
),
wantPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": {
@ -1032,6 +1031,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesMostAllocated", Weight: 2},
{Name: "NodeAffinity", Weight: 2},
{Name: "NodePreferAvoidPods", Weight: 2},
{Name: "RequestedToCapacityRatio", Weight: 2},
{Name: "TaintToleration", Weight: 2},
},
},
@ -1117,7 +1117,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"EqualPriority",
"SelectorSpreadPriority",
"InterPodAffinityPriority",
"RequestedToCapacityRatioPriority",
),
wantPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": {
@ -1145,6 +1144,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesMostAllocated", Weight: 2},
{Name: "NodeAffinity", Weight: 2},
{Name: "NodePreferAvoidPods", Weight: 2},
{Name: "RequestedToCapacityRatio", Weight: 2},
{Name: "TaintToleration", Weight: 2},
},
},
@ -1194,6 +1194,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"NodeResourcesLeastAllocated": "LeastRequestedPriority",
"NodeResourcesBalancedAllocation": "BalancedResourceAllocation",
"NodeResourcesMostAllocated": "MostRequestedPriority",
"RequestedToCapacityRatio": "RequestedToCapacityRatioPriority",
}
for v, tc := range schedulerFiles {

View File

@ -341,7 +341,7 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, er
} else {
for _, priority := range policy.Priorities {
klog.V(2).Infof("Registering priority: %s", priority.Name)
priorityKeys.Insert(RegisterCustomPriorityFunction(priority))
priorityKeys.Insert(RegisterCustomPriorityFunction(priority, c.configProducerArgs))
}
}

View File

@ -20,6 +20,7 @@ go_library(
"//pkg/scheduler/framework/plugins/nodeunschedulable:go_default_library",
"//pkg/scheduler/framework/plugins/nodevolumelimits:go_default_library",
"//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
"//pkg/scheduler/framework/plugins/requestedtocapacityratio:go_default_library",
"//pkg/scheduler/framework/plugins/tainttoleration:go_default_library",
"//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
"//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library",
@ -55,6 +56,7 @@ filegroup(
"//pkg/scheduler/framework/plugins/nodeunschedulable:all-srcs",
"//pkg/scheduler/framework/plugins/nodevolumelimits:all-srcs",
"//pkg/scheduler/framework/plugins/podtopologyspread:all-srcs",
"//pkg/scheduler/framework/plugins/requestedtocapacityratio:all-srcs",
"//pkg/scheduler/framework/plugins/tainttoleration:all-srcs",
"//pkg/scheduler/framework/plugins/volumebinding:all-srcs",
"//pkg/scheduler/framework/plugins/volumerestrictions:all-srcs",

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/requestedtocapacityratio"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
@ -78,6 +79,7 @@ func NewDefaultRegistry(args *RegistryArgs) framework.Registry {
nodevolumelimits.CinderName: nodevolumelimits.NewCinder,
interpodaffinity.Name: interpodaffinity.New,
nodelabel.Name: nodelabel.New,
requestedtocapacityratio.Name: requestedtocapacityratio.New,
}
}
@ -89,6 +91,8 @@ type ConfigProducerArgs struct {
Weight int32
// NodeLabelArgs is the args for the NodeLabel plugin.
NodeLabelArgs *nodelabel.Args
// RequestedToCapacityRatioArgs is the args for the RequestedToCapacityRatio plugin.
RequestedToCapacityRatioArgs *requestedtocapacityratio.Args
}
// ConfigProducer produces a framework's configuration.
@ -200,16 +204,7 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry {
registry.RegisterPredicate(nodelabel.Name,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodelabel.Name, nil)
encoding, err := json.Marshal(args.NodeLabelArgs)
if err != nil {
klog.Fatalf("Failed to marshal %+v", args.NodeLabelArgs)
return
}
config := config.PluginConfig{
Name: nodelabel.Name,
Args: runtime.Unknown{Raw: encoding},
}
pluginConfig = append(pluginConfig, config)
pluginConfig = append(pluginConfig, makePluginConfig(nodelabel.Name, args.NodeLabelArgs))
return
})
@ -253,6 +248,13 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry {
return
})
registry.RegisterPriority(requestedtocapacityratio.Name,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, requestedtocapacityratio.Name, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(requestedtocapacityratio.Name, args.RequestedToCapacityRatioArgs))
return
})
return registry
}
@ -285,3 +287,16 @@ func appendToPluginSet(set *config.PluginSet, name string, weight *int32) *confi
set.Enabled = append(set.Enabled, cfg)
return set
}
func makePluginConfig(pluginName string, args interface{}) config.PluginConfig {
encoding, err := json.Marshal(args)
if err != nil {
klog.Fatal(fmt.Errorf("Failed to marshal %+v: %v", args, err))
return config.PluginConfig{}
}
config := config.PluginConfig{
Name: pluginName,
Args: runtime.Unknown{Raw: encoding},
}
return config
}

View File

@ -0,0 +1,43 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["requested_to_capacity_ratio.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/requestedtocapacityratio",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["requested_to_capacity_ratio_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,80 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package requestedtocapacityratio
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
// Name of this plugin.
const Name = "RequestedToCapacityRatio"
// Args holds the args that are used to configure the plugin.
type Args struct {
FunctionShape priorities.FunctionShape
ResourceToWeightMap priorities.ResourceToWeightMap
}
// New initializes a new plugin and returns it.
func New(plArgs *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
args := &Args{}
if err := framework.DecodeInto(plArgs, args); err != nil {
return nil, err
}
p := priorities.RequestedToCapacityRatioResourceAllocationPriority(args.FunctionShape, args.ResourceToWeightMap)
return &RequestedToCapacityRatio{
handle: handle,
prioritize: p.PriorityMap,
}, nil
}
// RequestedToCapacityRatio is a score plugin that allow users to apply bin packing
// on core resources like CPU, Memory as well as extended resources like accelerators.
type RequestedToCapacityRatio struct {
handle framework.FrameworkHandle
prioritize priorities.PriorityMapFunction
}
var _ framework.ScorePlugin = &RequestedToCapacityRatio{}
// Name returns name of the plugin. It is used in logs, etc.
func (pl *RequestedToCapacityRatio) Name() string {
return Name
}
// Score invoked at the score extension point.
func (pl *RequestedToCapacityRatio) Score(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
// Note that RequestedToCapacityRatioPriority doesn't use priority metadata, hence passing nil here.
s, err := pl.prioritize(pod, nil, nodeInfo)
return s.Score, migration.ErrorToFrameworkStatus(err)
}
// ScoreExtensions of the Score plugin.
func (pl *RequestedToCapacityRatio) ScoreExtensions() framework.ScoreExtensions {
return nil
}

View File

@ -0,0 +1,121 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package requestedtocapacityratio
import (
"context"
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
func TestRequestedToCapacityRatio(t *testing.T) {
type test struct {
name string
requestedPod *v1.Pod
nodes []*v1.Node
scheduledPods []*v1.Pod
expectedPriorities framework.NodeScoreList
}
tests := []test{
{
name: "nothing scheduled, nothing requested (default - least requested nodes have priority)",
requestedPod: makePod("", 0, 0),
nodes: []*v1.Node{makeNode("node1", 4000, 10000), makeNode("node2", 4000, 10000)},
scheduledPods: []*v1.Pod{makePod("node1", 0, 0), makePod("node2", 0, 0)},
expectedPriorities: []framework.NodeScore{{Name: "node1", Score: 100}, {Name: "node2", Score: 100}},
},
{
name: "nothing scheduled, resources requested, differently sized machines (default - least requested nodes have priority)",
requestedPod: makePod("", 3000, 5000),
nodes: []*v1.Node{makeNode("node1", 4000, 10000), makeNode("node2", 6000, 10000)},
scheduledPods: []*v1.Pod{makePod("node1", 0, 0), makePod("node2", 0, 0)},
expectedPriorities: []framework.NodeScore{{Name: "node1", Score: 38}, {Name: "node2", Score: 50}},
},
{
name: "no resources requested, pods scheduled with resources (default - least requested nodes have priority)",
requestedPod: makePod("", 0, 0),
nodes: []*v1.Node{makeNode("node1", 4000, 10000), makeNode("node2", 6000, 10000)},
scheduledPods: []*v1.Pod{makePod("node1", 3000, 5000), makePod("node2", 3000, 5000)},
expectedPriorities: []framework.NodeScore{{Name: "node1", Score: 38}, {Name: "node2", Score: 50}},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
state := framework.NewCycleState()
snapshot := nodeinfosnapshot.NewSnapshot(test.scheduledPods, test.nodes)
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot))
args := &runtime.Unknown{Raw: []byte(`{"FunctionShape" : [{"Utilization" : 0, "Score" : 100}, {"Utilization" : 100, "Score" : 0}], "ResourceToWeightMap" : {"memory" : 1, "cpu" : 1}}`)}
p, _ := New(args, fh)
var gotPriorities framework.NodeScoreList
for _, n := range test.nodes {
score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.requestedPod, n.Name)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
gotPriorities = append(gotPriorities, framework.NodeScore{Name: n.Name, Score: score})
}
if !reflect.DeepEqual(test.expectedPriorities, gotPriorities) {
t.Errorf("expected:\n\t%+v,\ngot:\n\t%+v", test.expectedPriorities, gotPriorities)
}
})
}
}
func makeNode(name string, milliCPU, memory int64) *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: name},
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
},
},
}
}
func makePod(node string, milliCPU, memory int64) *v1.Pod {
return &v1.Pod{
Spec: v1.PodSpec{
NodeName: node,
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(memory, resource.DecimalSI),
},
},
},
},
},
}
}