diff --git a/pkg/controller/volume/scheduling/scheduler_binder.go b/pkg/controller/volume/scheduling/scheduler_binder.go index ae277ef5b31..6c6fd5443c4 100644 --- a/pkg/controller/volume/scheduling/scheduler_binder.go +++ b/pkg/controller/volume/scheduling/scheduler_binder.go @@ -82,6 +82,28 @@ type BindingInfo struct { pv *v1.PersistentVolume } +// StorageClassName returns the name of the storage class. +func (b *BindingInfo) StorageClassName() string { + return b.pv.Spec.StorageClassName +} + +// VolumeResource represents volume resource. +type VolumeResource struct { + Requested int64 + Capacity int64 +} + +// VolumeResource returns volume resource. +func (b *BindingInfo) VolumeResource() *VolumeResource { + // both fields are mandatory + requestedQty := b.pvc.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)] + capacitQty := b.pv.Spec.Capacity[v1.ResourceName(v1.ResourceStorage)] + return &VolumeResource{ + Requested: requestedQty.Value(), + Capacity: capacitQty.Value(), + } +} + // PodVolumes holds pod's volumes information used in volume scheduling. type PodVolumes struct { // StaticBindings are binding decisions for PVCs which can be bound to diff --git a/pkg/scheduler/algorithmprovider/registry.go b/pkg/scheduler/algorithmprovider/registry.go index 809f014353b..c1adea5a44e 100644 --- a/pkg/scheduler/algorithmprovider/registry.go +++ b/pkg/scheduler/algorithmprovider/registry.go @@ -69,7 +69,7 @@ func ListAlgorithmProviders() string { } func getDefaultConfig() *schedulerapi.Plugins { - return &schedulerapi.Plugins{ + plugins := &schedulerapi.Plugins{ QueueSort: schedulerapi.PluginSet{ Enabled: []schedulerapi.Plugin{ {Name: queuesort.Name}, @@ -148,6 +148,10 @@ func getDefaultConfig() *schedulerapi.Plugins { }, }, } + if utilfeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority) { + plugins.Score.Enabled = append(plugins.Score.Enabled, schedulerapi.Plugin{Name: volumebinding.Name, Weight: 1}) + } + return plugins } func getClusterAutoscalerConfig() *schedulerapi.Plugins { diff --git a/pkg/scheduler/framework/plugins/helper/shape_score.go b/pkg/scheduler/framework/plugins/helper/shape_score.go new file mode 100644 index 00000000000..4da1e76f17a --- /dev/null +++ b/pkg/scheduler/framework/plugins/helper/shape_score.go @@ -0,0 +1,51 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package helper + +// FunctionShape represents a collection of FunctionShapePoint. +type FunctionShape []FunctionShapePoint + +// FunctionShapePoint represents a shape point. +type FunctionShapePoint struct { + // Utilization is function argument. + Utilization int64 + // Score is function value. + Score int64 +} + +// BuildBrokenLinearFunction creates a function which is built using linear segments. Segments are defined via shape array. +// Shape[i].Utilization slice represents points on "Utilization" axis where different segments meet. +// Shape[i].Score represents function values at meeting points. +// +// function f(p) is defined as: +// shape[0].Score for p < f[0].Utilization +// shape[i].Score for p == shape[i].Utilization +// shape[n-1].Score for p > shape[n-1].Utilization +// and linear between points (p < shape[i].Utilization) +func BuildBrokenLinearFunction(shape FunctionShape) func(int64) int64 { + return func(p int64) int64 { + for i := 0; i < len(shape); i++ { + if p <= int64(shape[i].Utilization) { + if i == 0 { + return shape[0].Score + } + return shape[i-1].Score + (shape[i].Score-shape[i-1].Score)*(p-shape[i-1].Utilization)/(shape[i].Utilization-shape[i-1].Utilization) + } + } + return shape[len(shape)-1].Score + } +} diff --git a/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio.go b/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio.go index 27ff11beed1..b0bf7ab4b48 100755 --- a/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio.go +++ b/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" ) const ( @@ -34,15 +35,6 @@ const ( maxUtilization = 100 ) -type functionShape []functionShapePoint - -type functionShapePoint struct { - // utilization is function argument. - utilization int64 - // score is function value. - score int64 -} - // NewRequestedToCapacityRatio initializes a new plugin and returns it. func NewRequestedToCapacityRatio(plArgs runtime.Object, handle framework.Handle) (framework.Plugin, error) { args, err := getRequestedToCapacityRatioArgs(plArgs) @@ -54,14 +46,14 @@ func NewRequestedToCapacityRatio(plArgs runtime.Object, handle framework.Handle) return nil, err } - shape := make([]functionShapePoint, 0, len(args.Shape)) + shape := make([]helper.FunctionShapePoint, 0, len(args.Shape)) for _, point := range args.Shape { - shape = append(shape, functionShapePoint{ - utilization: int64(point.Utilization), + shape = append(shape, helper.FunctionShapePoint{ + Utilization: int64(point.Utilization), // MaxCustomPriorityScore may diverge from the max score used in the scheduler and defined by MaxNodeScore, // therefore we need to scale the score returned by requested to capacity ratio to the score range // used by the scheduler. - score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore), + Score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore), }) } @@ -120,8 +112,8 @@ func (pl *RequestedToCapacityRatio) ScoreExtensions() framework.ScoreExtensions return nil } -func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape functionShape, resourceToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap, bool, int, int) int64 { - rawScoringFunction := buildBrokenLinearFunction(scoringFunctionShape) +func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape helper.FunctionShape, resourceToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap, bool, int, int) int64 { + rawScoringFunction := helper.BuildBrokenLinearFunction(scoringFunctionShape) resourceScoringFunction := func(requested, capacity int64) int64 { if capacity == 0 || requested > capacity { return rawScoringFunction(maxUtilization) @@ -144,26 +136,3 @@ func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape functionSh return int64(math.Round(float64(nodeScore) / float64(weightSum))) } } - -// Creates a function which is built using linear segments. Segments are defined via shape array. -// Shape[i].utilization slice represents points on "utilization" axis where different segments meet. -// Shape[i].score represents function values at meeting points. -// -// function f(p) is defined as: -// shape[0].score for p < f[0].utilization -// shape[i].score for p == shape[i].utilization -// shape[n-1].score for p > shape[n-1].utilization -// and linear between points (p < shape[i].utilization) -func buildBrokenLinearFunction(shape functionShape) func(int64) int64 { - return func(p int64) int64 { - for i := 0; i < len(shape); i++ { - if p <= int64(shape[i].utilization) { - if i == 0 { - return shape[0].score - } - return shape[i-1].score + (shape[i].score-shape[i-1].score)*(p-shape[i-1].utilization)/(shape[i].utilization-shape[i-1].utilization) - } - } - return shape[len(shape)-1].score - } -} diff --git a/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go b/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go index 17254b3fe84..b73a32420f6 100644 --- a/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/internal/cache" ) @@ -124,13 +125,13 @@ func TestBrokenLinearFunction(t *testing.T) { expected int64 } type Test struct { - points []functionShapePoint + points []helper.FunctionShapePoint assertions []Assertion } tests := []Test{ { - points: []functionShapePoint{{10, 1}, {90, 9}}, + points: []helper.FunctionShapePoint{{Utilization: 10, Score: 1}, {Utilization: 90, Score: 9}}, assertions: []Assertion{ {p: -10, expected: 1}, {p: 0, expected: 1}, @@ -147,7 +148,7 @@ func TestBrokenLinearFunction(t *testing.T) { }, }, { - points: []functionShapePoint{{0, 2}, {40, 10}, {100, 0}}, + points: []helper.FunctionShapePoint{{Utilization: 0, Score: 2}, {Utilization: 40, Score: 10}, {Utilization: 100, Score: 0}}, assertions: []Assertion{ {p: -10, expected: 2}, {p: 0, expected: 2}, @@ -160,7 +161,7 @@ func TestBrokenLinearFunction(t *testing.T) { }, }, { - points: []functionShapePoint{{0, 2}, {40, 2}, {100, 2}}, + points: []helper.FunctionShapePoint{{Utilization: 0, Score: 2}, {Utilization: 40, Score: 2}, {Utilization: 100, Score: 2}}, assertions: []Assertion{ {p: -10, expected: 2}, {p: 0, expected: 2}, @@ -176,7 +177,7 @@ func TestBrokenLinearFunction(t *testing.T) { for i, test := range tests { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { - function := buildBrokenLinearFunction(test.points) + function := helper.BuildBrokenLinearFunction(test.points) for _, assertion := range test.assertions { assert.InDelta(t, assertion.expected, function(assertion.p), 0.1, "points=%v, p=%f", test.points, assertion.p) } diff --git a/pkg/scheduler/framework/plugins/volumebinding/scorer.go b/pkg/scheduler/framework/plugins/volumebinding/scorer.go new file mode 100644 index 00000000000..fc08216bf75 --- /dev/null +++ b/pkg/scheduler/framework/plugins/volumebinding/scorer.go @@ -0,0 +1,52 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package volumebinding + +import ( + "math" + + "k8s.io/kubernetes/pkg/controller/volume/scheduling" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" +) + +type classResourceMap map[string]*scheduling.VolumeResource + +type volumeCapacityScorer func(classResourceMap) int64 + +func buildScorerFunction(scoringFunctionShape helper.FunctionShape) volumeCapacityScorer { + rawScoringFunction := helper.BuildBrokenLinearFunction(scoringFunctionShape) + f := func(requested, capacity int64) int64 { + if capacity == 0 || requested > capacity { + return rawScoringFunction(maxUtilization) + } + + return rawScoringFunction(requested * maxUtilization / capacity) + } + return func(classResources classResourceMap) int64 { + var nodeScore, weightSum int64 + for _, resource := range classResources { + classScore := f(resource.Requested, resource.Capacity) + nodeScore += classScore + // in alpha stage, all classes have the same weight + weightSum += 1 + } + if weightSum == 0 { + return 0 + } + return int64(math.Round(float64(nodeScore) / float64(weightSum))) + } +} diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go index 19d20676962..b09ad7f2079 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" ) const ( @@ -40,6 +41,8 @@ const ( DefaultBindTimeoutSeconds = 600 stateKey framework.StateKey = Name + + maxUtilization = 100 ) // the state is initialized in PreFilter phase. because we save the pointer in @@ -68,12 +71,14 @@ type VolumeBinding struct { Binder scheduling.SchedulerVolumeBinder PVCLister corelisters.PersistentVolumeClaimLister GenericEphemeralVolumeFeatureEnabled bool + scorer volumeCapacityScorer } var _ framework.PreFilterPlugin = &VolumeBinding{} var _ framework.FilterPlugin = &VolumeBinding{} var _ framework.ReservePlugin = &VolumeBinding{} var _ framework.PreBindPlugin = &VolumeBinding{} +var _ framework.ScorePlugin = &VolumeBinding{} // Name is the name of the plugin used in Registry and configurations. const Name = "VolumeBinding" @@ -214,6 +219,54 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p return nil } +var ( + // TODO (for alpha) make it configurable in config.VolumeBindingArgs + defaultShapePoint = []config.UtilizationShapePoint{ + { + Utilization: 0, + Score: 0, + }, + { + Utilization: 100, + Score: int32(config.MaxCustomPriorityScore), + }, + } +) + +// Score invoked at the score extension point. +func (pl *VolumeBinding) Score(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { + if pl.scorer == nil { + return 0, nil + } + state, err := getStateData(cs) + if err != nil { + return 0, framework.AsStatus(err) + } + podVolumes, ok := state.podVolumesByNode[nodeName] + if !ok { + return 0, nil + } + // group by storage class + classToWeight := make(classToWeightMap) + requestedClassToValueMap := make(map[string]int64) + capacityClassToValueMap := make(map[string]int64) + for _, staticBinding := range podVolumes.StaticBindings { + class := staticBinding.StorageClassName() + volumeResource := staticBinding.VolumeResource() + if _, ok := requestedClassToValueMap[class]; !ok { + classToWeight[class] = 1 // in alpha stage, all classes have the same weight + } + requestedClassToValueMap[class] += volumeResource.Requested + capacityClassToValueMap[class] += volumeResource.Capacity + } + return pl.scorer(requestedClassToValueMap, capacityClassToValueMap, classToWeight), nil +} + +// ScoreExtensions of the Score plugin. +func (pl *VolumeBinding) ScoreExtensions() framework.ScoreExtensions { + return nil +} + // Reserve reserves volumes of pod and saves binding status in cycle state. func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { state, err := getStateData(cs) @@ -303,10 +356,24 @@ func New(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { } } binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second) + + // build score function + var scorer volumeCapacityScorer + if utilfeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority) { + shape := make(helper.FunctionShape, 0, len(defaultShapePoint)) + for _, point := range defaultShapePoint { + shape = append(shape, helper.FunctionShapePoint{ + Utilization: int64(point.Utilization), + Score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore), + }) + } + scorer = buildScorerFunction(shape) + } return &VolumeBinding{ Binder: binder, PVCLister: pvcInformer.Lister(), GenericEphemeralVolumeFeatureEnabled: utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume), + scorer: scorer, }, nil }