Prioritizing nodes based on volume capacity

This commit is contained in:
Yecheng Fu 2020-11-08 23:00:25 +08:00
parent 61a44d0dbe
commit 21a43586e7
7 changed files with 210 additions and 44 deletions

View File

@ -82,6 +82,28 @@ type BindingInfo struct {
pv *v1.PersistentVolume pv *v1.PersistentVolume
} }
// StorageClassName returns the name of the storage class.
func (b *BindingInfo) StorageClassName() string {
return b.pv.Spec.StorageClassName
}
// VolumeResource represents volume resource.
type VolumeResource struct {
Requested int64
Capacity int64
}
// VolumeResource returns volume resource.
func (b *BindingInfo) VolumeResource() *VolumeResource {
// both fields are mandatory
requestedQty := b.pvc.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
capacitQty := b.pv.Spec.Capacity[v1.ResourceName(v1.ResourceStorage)]
return &VolumeResource{
Requested: requestedQty.Value(),
Capacity: capacitQty.Value(),
}
}
// PodVolumes holds pod's volumes information used in volume scheduling. // PodVolumes holds pod's volumes information used in volume scheduling.
type PodVolumes struct { type PodVolumes struct {
// StaticBindings are binding decisions for PVCs which can be bound to // StaticBindings are binding decisions for PVCs which can be bound to

View File

@ -69,7 +69,7 @@ func ListAlgorithmProviders() string {
} }
func getDefaultConfig() *schedulerapi.Plugins { func getDefaultConfig() *schedulerapi.Plugins {
return &schedulerapi.Plugins{ plugins := &schedulerapi.Plugins{
QueueSort: schedulerapi.PluginSet{ QueueSort: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{ Enabled: []schedulerapi.Plugin{
{Name: queuesort.Name}, {Name: queuesort.Name},
@ -148,6 +148,10 @@ func getDefaultConfig() *schedulerapi.Plugins {
}, },
}, },
} }
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority) {
plugins.Score.Enabled = append(plugins.Score.Enabled, schedulerapi.Plugin{Name: volumebinding.Name, Weight: 1})
}
return plugins
} }
func getClusterAutoscalerConfig() *schedulerapi.Plugins { func getClusterAutoscalerConfig() *schedulerapi.Plugins {

View File

@ -0,0 +1,51 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package helper
// FunctionShape represents a collection of FunctionShapePoint.
type FunctionShape []FunctionShapePoint
// FunctionShapePoint represents a shape point.
type FunctionShapePoint struct {
// Utilization is function argument.
Utilization int64
// Score is function value.
Score int64
}
// BuildBrokenLinearFunction creates a function which is built using linear segments. Segments are defined via shape array.
// Shape[i].Utilization slice represents points on "Utilization" axis where different segments meet.
// Shape[i].Score represents function values at meeting points.
//
// function f(p) is defined as:
// shape[0].Score for p < f[0].Utilization
// shape[i].Score for p == shape[i].Utilization
// shape[n-1].Score for p > shape[n-1].Utilization
// and linear between points (p < shape[i].Utilization)
func BuildBrokenLinearFunction(shape FunctionShape) func(int64) int64 {
return func(p int64) int64 {
for i := 0; i < len(shape); i++ {
if p <= int64(shape[i].Utilization) {
if i == 0 {
return shape[0].Score
}
return shape[i-1].Score + (shape[i].Score-shape[i-1].Score)*(p-shape[i-1].Utilization)/(shape[i].Utilization-shape[i-1].Utilization)
}
}
return shape[len(shape)-1].Score
}
}

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
) )
const ( const (
@ -34,15 +35,6 @@ const (
maxUtilization = 100 maxUtilization = 100
) )
type functionShape []functionShapePoint
type functionShapePoint struct {
// utilization is function argument.
utilization int64
// score is function value.
score int64
}
// NewRequestedToCapacityRatio initializes a new plugin and returns it. // NewRequestedToCapacityRatio initializes a new plugin and returns it.
func NewRequestedToCapacityRatio(plArgs runtime.Object, handle framework.Handle) (framework.Plugin, error) { func NewRequestedToCapacityRatio(plArgs runtime.Object, handle framework.Handle) (framework.Plugin, error) {
args, err := getRequestedToCapacityRatioArgs(plArgs) args, err := getRequestedToCapacityRatioArgs(plArgs)
@ -54,14 +46,14 @@ func NewRequestedToCapacityRatio(plArgs runtime.Object, handle framework.Handle)
return nil, err return nil, err
} }
shape := make([]functionShapePoint, 0, len(args.Shape)) shape := make([]helper.FunctionShapePoint, 0, len(args.Shape))
for _, point := range args.Shape { for _, point := range args.Shape {
shape = append(shape, functionShapePoint{ shape = append(shape, helper.FunctionShapePoint{
utilization: int64(point.Utilization), Utilization: int64(point.Utilization),
// MaxCustomPriorityScore may diverge from the max score used in the scheduler and defined by MaxNodeScore, // MaxCustomPriorityScore may diverge from the max score used in the scheduler and defined by MaxNodeScore,
// therefore we need to scale the score returned by requested to capacity ratio to the score range // therefore we need to scale the score returned by requested to capacity ratio to the score range
// used by the scheduler. // used by the scheduler.
score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore), Score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore),
}) })
} }
@ -120,8 +112,8 @@ func (pl *RequestedToCapacityRatio) ScoreExtensions() framework.ScoreExtensions
return nil return nil
} }
func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape functionShape, resourceToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap, bool, int, int) int64 { func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape helper.FunctionShape, resourceToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap, bool, int, int) int64 {
rawScoringFunction := buildBrokenLinearFunction(scoringFunctionShape) rawScoringFunction := helper.BuildBrokenLinearFunction(scoringFunctionShape)
resourceScoringFunction := func(requested, capacity int64) int64 { resourceScoringFunction := func(requested, capacity int64) int64 {
if capacity == 0 || requested > capacity { if capacity == 0 || requested > capacity {
return rawScoringFunction(maxUtilization) return rawScoringFunction(maxUtilization)
@ -144,26 +136,3 @@ func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape functionSh
return int64(math.Round(float64(nodeScore) / float64(weightSum))) return int64(math.Round(float64(nodeScore) / float64(weightSum)))
} }
} }
// Creates a function which is built using linear segments. Segments are defined via shape array.
// Shape[i].utilization slice represents points on "utilization" axis where different segments meet.
// Shape[i].score represents function values at meeting points.
//
// function f(p) is defined as:
// shape[0].score for p < f[0].utilization
// shape[i].score for p == shape[i].utilization
// shape[n-1].score for p > shape[n-1].utilization
// and linear between points (p < shape[i].utilization)
func buildBrokenLinearFunction(shape functionShape) func(int64) int64 {
return func(p int64) int64 {
for i := 0; i < len(shape); i++ {
if p <= int64(shape[i].utilization) {
if i == 0 {
return shape[0].score
}
return shape[i-1].score + (shape[i].score-shape[i-1].score)*(p-shape[i-1].utilization)/(shape[i].utilization-shape[i-1].utilization)
}
}
return shape[len(shape)-1].score
}
}

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/internal/cache" "k8s.io/kubernetes/pkg/scheduler/internal/cache"
) )
@ -124,13 +125,13 @@ func TestBrokenLinearFunction(t *testing.T) {
expected int64 expected int64
} }
type Test struct { type Test struct {
points []functionShapePoint points []helper.FunctionShapePoint
assertions []Assertion assertions []Assertion
} }
tests := []Test{ tests := []Test{
{ {
points: []functionShapePoint{{10, 1}, {90, 9}}, points: []helper.FunctionShapePoint{{Utilization: 10, Score: 1}, {Utilization: 90, Score: 9}},
assertions: []Assertion{ assertions: []Assertion{
{p: -10, expected: 1}, {p: -10, expected: 1},
{p: 0, expected: 1}, {p: 0, expected: 1},
@ -147,7 +148,7 @@ func TestBrokenLinearFunction(t *testing.T) {
}, },
}, },
{ {
points: []functionShapePoint{{0, 2}, {40, 10}, {100, 0}}, points: []helper.FunctionShapePoint{{Utilization: 0, Score: 2}, {Utilization: 40, Score: 10}, {Utilization: 100, Score: 0}},
assertions: []Assertion{ assertions: []Assertion{
{p: -10, expected: 2}, {p: -10, expected: 2},
{p: 0, expected: 2}, {p: 0, expected: 2},
@ -160,7 +161,7 @@ func TestBrokenLinearFunction(t *testing.T) {
}, },
}, },
{ {
points: []functionShapePoint{{0, 2}, {40, 2}, {100, 2}}, points: []helper.FunctionShapePoint{{Utilization: 0, Score: 2}, {Utilization: 40, Score: 2}, {Utilization: 100, Score: 2}},
assertions: []Assertion{ assertions: []Assertion{
{p: -10, expected: 2}, {p: -10, expected: 2},
{p: 0, expected: 2}, {p: 0, expected: 2},
@ -176,7 +177,7 @@ func TestBrokenLinearFunction(t *testing.T) {
for i, test := range tests { for i, test := range tests {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
function := buildBrokenLinearFunction(test.points) function := helper.BuildBrokenLinearFunction(test.points)
for _, assertion := range test.assertions { for _, assertion := range test.assertions {
assert.InDelta(t, assertion.expected, function(assertion.p), 0.1, "points=%v, p=%f", test.points, assertion.p) assert.InDelta(t, assertion.expected, function(assertion.p), 0.1, "points=%v, p=%f", test.points, assertion.p)
} }

View File

@ -0,0 +1,52 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumebinding
import (
"math"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
)
type classResourceMap map[string]*scheduling.VolumeResource
type volumeCapacityScorer func(classResourceMap) int64
func buildScorerFunction(scoringFunctionShape helper.FunctionShape) volumeCapacityScorer {
rawScoringFunction := helper.BuildBrokenLinearFunction(scoringFunctionShape)
f := func(requested, capacity int64) int64 {
if capacity == 0 || requested > capacity {
return rawScoringFunction(maxUtilization)
}
return rawScoringFunction(requested * maxUtilization / capacity)
}
return func(classResources classResourceMap) int64 {
var nodeScore, weightSum int64
for _, resource := range classResources {
classScore := f(resource.Requested, resource.Capacity)
nodeScore += classScore
// in alpha stage, all classes have the same weight
weightSum += 1
}
if weightSum == 0 {
return 0
}
return int64(math.Round(float64(nodeScore) / float64(weightSum)))
}
}

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
) )
const ( const (
@ -40,6 +41,8 @@ const (
DefaultBindTimeoutSeconds = 600 DefaultBindTimeoutSeconds = 600
stateKey framework.StateKey = Name stateKey framework.StateKey = Name
maxUtilization = 100
) )
// the state is initialized in PreFilter phase. because we save the pointer in // the state is initialized in PreFilter phase. because we save the pointer in
@ -68,12 +71,14 @@ type VolumeBinding struct {
Binder scheduling.SchedulerVolumeBinder Binder scheduling.SchedulerVolumeBinder
PVCLister corelisters.PersistentVolumeClaimLister PVCLister corelisters.PersistentVolumeClaimLister
GenericEphemeralVolumeFeatureEnabled bool GenericEphemeralVolumeFeatureEnabled bool
scorer volumeCapacityScorer
} }
var _ framework.PreFilterPlugin = &VolumeBinding{} var _ framework.PreFilterPlugin = &VolumeBinding{}
var _ framework.FilterPlugin = &VolumeBinding{} var _ framework.FilterPlugin = &VolumeBinding{}
var _ framework.ReservePlugin = &VolumeBinding{} var _ framework.ReservePlugin = &VolumeBinding{}
var _ framework.PreBindPlugin = &VolumeBinding{} var _ framework.PreBindPlugin = &VolumeBinding{}
var _ framework.ScorePlugin = &VolumeBinding{}
// Name is the name of the plugin used in Registry and configurations. // Name is the name of the plugin used in Registry and configurations.
const Name = "VolumeBinding" const Name = "VolumeBinding"
@ -214,6 +219,54 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p
return nil return nil
} }
var (
// TODO (for alpha) make it configurable in config.VolumeBindingArgs
defaultShapePoint = []config.UtilizationShapePoint{
{
Utilization: 0,
Score: 0,
},
{
Utilization: 100,
Score: int32(config.MaxCustomPriorityScore),
},
}
)
// Score invoked at the score extension point.
func (pl *VolumeBinding) Score(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
if pl.scorer == nil {
return 0, nil
}
state, err := getStateData(cs)
if err != nil {
return 0, framework.AsStatus(err)
}
podVolumes, ok := state.podVolumesByNode[nodeName]
if !ok {
return 0, nil
}
// group by storage class
classToWeight := make(classToWeightMap)
requestedClassToValueMap := make(map[string]int64)
capacityClassToValueMap := make(map[string]int64)
for _, staticBinding := range podVolumes.StaticBindings {
class := staticBinding.StorageClassName()
volumeResource := staticBinding.VolumeResource()
if _, ok := requestedClassToValueMap[class]; !ok {
classToWeight[class] = 1 // in alpha stage, all classes have the same weight
}
requestedClassToValueMap[class] += volumeResource.Requested
capacityClassToValueMap[class] += volumeResource.Capacity
}
return pl.scorer(requestedClassToValueMap, capacityClassToValueMap, classToWeight), nil
}
// ScoreExtensions of the Score plugin.
func (pl *VolumeBinding) ScoreExtensions() framework.ScoreExtensions {
return nil
}
// Reserve reserves volumes of pod and saves binding status in cycle state. // Reserve reserves volumes of pod and saves binding status in cycle state.
func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
state, err := getStateData(cs) state, err := getStateData(cs)
@ -303,10 +356,24 @@ func New(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
} }
} }
binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second) binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second)
// build score function
var scorer volumeCapacityScorer
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority) {
shape := make(helper.FunctionShape, 0, len(defaultShapePoint))
for _, point := range defaultShapePoint {
shape = append(shape, helper.FunctionShapePoint{
Utilization: int64(point.Utilization),
Score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore),
})
}
scorer = buildScorerFunction(shape)
}
return &VolumeBinding{ return &VolumeBinding{
Binder: binder, Binder: binder,
PVCLister: pvcInformer.Lister(), PVCLister: pvcInformer.Lister(),
GenericEphemeralVolumeFeatureEnabled: utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume), GenericEphemeralVolumeFeatureEnabled: utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume),
scorer: scorer,
}, nil }, nil
} }