Merge pull request #96347 from cofyc/kep1845

Prioritizing nodes based on volume capacity
Authored by Kubernetes Prow Robot on 2021-03-05 14:45:43 -08:00, committed by GitHub
commit ed6b9addbf
12 changed files with 1222 additions and 116 deletions

View File

@ -82,6 +82,28 @@ type BindingInfo struct {
pv *v1.PersistentVolume
}
// StorageClassName returns the name of the storage class.
func (b *BindingInfo) StorageClassName() string {
return b.pv.Spec.StorageClassName
}
// StorageResource represents a storage resource (requested size and total capacity).
type StorageResource struct {
Requested int64
Capacity int64
}
// StorageResource returns the storage resource of this BindingInfo.
func (b *BindingInfo) StorageResource() *StorageResource {
// both fields are mandatory
requestedQty := b.pvc.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
capacityQty := b.pv.Spec.Capacity[v1.ResourceName(v1.ResourceStorage)]
return &StorageResource{
Requested: requestedQty.Value(),
Capacity: capacityQty.Value(),
}
}
// PodVolumes holds pod's volumes information used in volume scheduling.
type PodVolumes struct {
// StaticBindings are binding decisions for PVCs which can be bound to
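For illustration, a minimal standalone sketch (not part of the change) of how a consumer can turn the new Requested/Capacity pair into a 0..100 utilization figure, with the same capping for zero capacity and over-commitment that the scorer added later in this commit applies. StorageResource is re-declared locally so the snippet compiles on its own; utilizationPercent is a hypothetical helper.

package main

import "fmt"

// StorageResource mirrors the exported struct added above: the bytes requested
// by the PVC and the total bytes provided by the bound PV.
type StorageResource struct {
	Requested int64
	Capacity  int64
}

// utilizationPercent converts the pair into a 0..100 utilization value.
func utilizationPercent(r StorageResource) int64 {
	if r.Capacity == 0 || r.Requested > r.Capacity {
		return 100 // treat zero capacity and over-commitment as fully utilized
	}
	return r.Requested * 100 / r.Capacity
}

func main() {
	// A 50Gi request bound to a 200Gi PV, expressed in bytes.
	fmt.Println(utilizationPercent(StorageResource{Requested: 50 << 30, Capacity: 200 << 30})) // 25
}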

View File

@ -672,6 +672,10 @@ const (
// Enables the usage of different protocols in the same Service with type=LoadBalancer
MixedProtocolLBService featuregate.Feature = "MixedProtocolLBService"
// owner: @cofyc
// alpha: v1.21
VolumeCapacityPriority featuregate.Feature = "VolumeCapacityPriority"
// owner: @ahg-g
// alpha: v1.21
//
@ -786,6 +790,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
GracefulNodeShutdown: {Default: false, PreRelease: featuregate.Alpha},
ServiceLBNodePortControl: {Default: false, PreRelease: featuregate.Alpha},
MixedProtocolLBService: {Default: false, PreRelease: featuregate.Alpha},
VolumeCapacityPriority: {Default: false, PreRelease: featuregate.Alpha},
PreferNominatedNode: {Default: false, PreRelease: featuregate.Alpha},
RunAsGroup: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.22
PodDeletionCost: {Default: false, PreRelease: featuregate.Alpha},
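In a running cluster the new gate is toggled through the kube-scheduler's --feature-gates flag (VolumeCapacityPriority=true); in Go tests this commit uses the featuregatetesting helper instead. A minimal sketch of that pattern, assuming it sits inside the Kubernetes test tree (the test name is illustrative):

package example

import (
	"testing"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/kubernetes/pkg/features"
)

func TestWithVolumeCapacityPriority(t *testing.T) {
	// Enable the alpha gate for this test only and restore it afterwards.
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeCapacityPriority, true)()

	// ... exercise code that checks utilfeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority) ...
}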

View File

@ -69,7 +69,7 @@ func ListAlgorithmProviders() string {
}
func getDefaultConfig() *schedulerapi.Plugins {
return &schedulerapi.Plugins{
plugins := &schedulerapi.Plugins{
QueueSort: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: queuesort.Name},
@ -148,6 +148,10 @@ func getDefaultConfig() *schedulerapi.Plugins {
},
},
}
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority) {
plugins.Score.Enabled = append(plugins.Score.Enabled, schedulerapi.Plugin{Name: volumebinding.Name, Weight: 1})
}
return plugins
}
func getClusterAutoscalerConfig() *schedulerapi.Plugins {

View File

@ -0,0 +1,51 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package helper
// FunctionShape represents a collection of FunctionShapePoint.
type FunctionShape []FunctionShapePoint
// FunctionShapePoint represents a shape point.
type FunctionShapePoint struct {
// Utilization is function argument.
Utilization int64
// Score is function value.
Score int64
}
// BuildBrokenLinearFunction creates a function built from linear segments. Segments are defined via the shape slice.
// Shape[i].Utilization values are the points on the "Utilization" axis where segments meet.
// Shape[i].Score values are the function values at those points.
//
// The function f(p) is defined as:
//   shape[0].Score for p < shape[0].Utilization
//   shape[i].Score for p == shape[i].Utilization
//   shape[n-1].Score for p > shape[n-1].Utilization
// and linear interpolation between adjacent points otherwise.
func BuildBrokenLinearFunction(shape FunctionShape) func(int64) int64 {
return func(p int64) int64 {
for i := 0; i < len(shape); i++ {
if p <= int64(shape[i].Utilization) {
if i == 0 {
return shape[0].Score
}
return shape[i-1].Score + (shape[i].Score-shape[i-1].Score)*(p-shape[i-1].Utilization)/(shape[i].Utilization-shape[i-1].Utilization)
}
}
return shape[len(shape)-1].Score
}
}
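As a quick illustration of the exported helper (a sketch only; it assumes being built inside the Kubernetes module so the import path below, the same one used elsewhere in this commit, resolves):

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
)

func main() {
	// Two-point shape: score 1 at 10% utilization, score 9 at 90%, the same
	// shape used in the first case of the updated TestBrokenLinearFunction.
	f := helper.BuildBrokenLinearFunction(helper.FunctionShape{
		{Utilization: 10, Score: 1},
		{Utilization: 90, Score: 9},
	})
	for _, p := range []int64{0, 10, 50, 90, 100} {
		fmt.Printf("f(%d) = %d\n", p, f(p))
	}
	// f(0)=1 and f(10)=1 (clamped at the first point), f(50)=5 (linear
	// interpolation), f(90)=9 and f(100)=9 (clamped at the last point).
}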

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
)
const (
@ -34,15 +35,6 @@ const (
maxUtilization = 100
)
type functionShape []functionShapePoint
type functionShapePoint struct {
// utilization is function argument.
utilization int64
// score is function value.
score int64
}
// NewRequestedToCapacityRatio initializes a new plugin and returns it.
func NewRequestedToCapacityRatio(plArgs runtime.Object, handle framework.Handle) (framework.Plugin, error) {
args, err := getRequestedToCapacityRatioArgs(plArgs)
@ -54,14 +46,14 @@ func NewRequestedToCapacityRatio(plArgs runtime.Object, handle framework.Handle)
return nil, err
}
shape := make([]functionShapePoint, 0, len(args.Shape))
shape := make([]helper.FunctionShapePoint, 0, len(args.Shape))
for _, point := range args.Shape {
shape = append(shape, functionShapePoint{
utilization: int64(point.Utilization),
shape = append(shape, helper.FunctionShapePoint{
Utilization: int64(point.Utilization),
// MaxCustomPriorityScore may diverge from the max score used in the scheduler and defined by MaxNodeScore,
// therefore we need to scale the score returned by requested to capacity ratio to the score range
// used by the scheduler.
score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore),
Score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore),
})
}
@ -120,8 +112,8 @@ func (pl *RequestedToCapacityRatio) ScoreExtensions() framework.ScoreExtensions
return nil
}
func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape functionShape, resourceToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap, bool, int, int) int64 {
rawScoringFunction := buildBrokenLinearFunction(scoringFunctionShape)
func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape helper.FunctionShape, resourceToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap, bool, int, int) int64 {
rawScoringFunction := helper.BuildBrokenLinearFunction(scoringFunctionShape)
resourceScoringFunction := func(requested, capacity int64) int64 {
if capacity == 0 || requested > capacity {
return rawScoringFunction(maxUtilization)
@ -144,26 +136,3 @@ func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape functionSh
return int64(math.Round(float64(nodeScore) / float64(weightSum)))
}
}
// Creates a function which is built using linear segments. Segments are defined via shape array.
// Shape[i].utilization slice represents points on "utilization" axis where different segments meet.
// Shape[i].score represents function values at meeting points.
//
// function f(p) is defined as:
//   shape[0].score for p < f[0].utilization
//   shape[i].score for p == shape[i].utilization
//   shape[n-1].score for p > shape[n-1].utilization
// and linear between points (p < shape[i].utilization)
func buildBrokenLinearFunction(shape functionShape) func(int64) int64 {
return func(p int64) int64 {
for i := 0; i < len(shape); i++ {
if p <= int64(shape[i].utilization) {
if i == 0 {
return shape[0].score
}
return shape[i-1].score + (shape[i].score-shape[i-1].score)*(p-shape[i-1].utilization)/(shape[i].utilization-shape[i-1].utilization)
}
}
return shape[len(shape)-1].score
}
}
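The Score scaling used above, int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore), maps the user-facing custom-priority range onto the framework's node-score range. A tiny sketch of that arithmetic, assuming the usual values MaxNodeScore = 100 and MaxCustomPriorityScore = 10 (the constants themselves are not shown in this diff):

package main

import "fmt"

const (
	maxNodeScore           = 100 // assumed value of framework.MaxNodeScore
	maxCustomPriorityScore = 10  // assumed value of config.MaxCustomPriorityScore
)

func main() {
	// A shape point with a user-facing score of 5 becomes 50 on the framework scale.
	userScore := int64(5)
	fmt.Println(userScore * (maxNodeScore / maxCustomPriorityScore)) // 50
}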

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
@ -124,13 +125,13 @@ func TestBrokenLinearFunction(t *testing.T) {
expected int64
}
type Test struct {
points []functionShapePoint
points []helper.FunctionShapePoint
assertions []Assertion
}
tests := []Test{
{
points: []functionShapePoint{{10, 1}, {90, 9}},
points: []helper.FunctionShapePoint{{Utilization: 10, Score: 1}, {Utilization: 90, Score: 9}},
assertions: []Assertion{
{p: -10, expected: 1},
{p: 0, expected: 1},
@ -147,7 +148,7 @@ func TestBrokenLinearFunction(t *testing.T) {
},
},
{
points: []functionShapePoint{{0, 2}, {40, 10}, {100, 0}},
points: []helper.FunctionShapePoint{{Utilization: 0, Score: 2}, {Utilization: 40, Score: 10}, {Utilization: 100, Score: 0}},
assertions: []Assertion{
{p: -10, expected: 2},
{p: 0, expected: 2},
@ -160,7 +161,7 @@ func TestBrokenLinearFunction(t *testing.T) {
},
},
{
points: []functionShapePoint{{0, 2}, {40, 2}, {100, 2}},
points: []helper.FunctionShapePoint{{Utilization: 0, Score: 2}, {Utilization: 40, Score: 2}, {Utilization: 100, Score: 2}},
assertions: []Assertion{
{p: -10, expected: 2},
{p: 0, expected: 2},
@ -176,7 +177,7 @@ func TestBrokenLinearFunction(t *testing.T) {
for i, test := range tests {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
function := buildBrokenLinearFunction(test.points)
function := helper.BuildBrokenLinearFunction(test.points)
for _, assertion := range test.assertions {
assert.InDelta(t, assertion.expected, function(assertion.p), 0.1, "points=%v, p=%f", test.points, assertion.p)
}

View File

@ -0,0 +1,55 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumebinding
import (
"math"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
)
// classResourceMap holds a map of storage class to resource.
type classResourceMap map[string]*scheduling.StorageResource
// volumeCapacityScorer calculates the score based on class storage resource information.
type volumeCapacityScorer func(classResourceMap) int64
// buildScorerFunction builds volumeCapacityScorer from the scoring function shape.
func buildScorerFunction(scoringFunctionShape helper.FunctionShape) volumeCapacityScorer {
rawScoringFunction := helper.BuildBrokenLinearFunction(scoringFunctionShape)
f := func(requested, capacity int64) int64 {
if capacity == 0 || requested > capacity {
return rawScoringFunction(maxUtilization)
}
return rawScoringFunction(requested * maxUtilization / capacity)
}
return func(classResources classResourceMap) int64 {
var nodeScore int64
// in alpha stage, all classes have the same weight
weightSum := len(classResources)
if weightSum == 0 {
return 0
}
for _, resource := range classResources {
classScore := f(resource.Requested, resource.Capacity)
nodeScore += classScore
}
return int64(math.Round(float64(nodeScore) / float64(weightSum)))
}
}
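A sketch of how the scorer behaves for a node whose static bindings span two classes, assuming the example lives next to this file in the volumebinding package (buildScorerFunction and classResourceMap are unexported): "hdd" at 30/100 and "ssd" at 60/100 utilization score 30 and 60 with a linear 0..100 shape, and the equal-weight average is 45.

package volumebinding

import (
	"fmt"

	"k8s.io/kubernetes/pkg/controller/volume/scheduling"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
)

func Example_volumeCapacityScorer() {
	scorer := buildScorerFunction(helper.FunctionShape{
		{Utilization: 0, Score: 0},
		{Utilization: 100, Score: 100},
	})
	fmt.Println(scorer(classResourceMap{
		"hdd": &scheduling.StorageResource{Requested: 30, Capacity: 100},
		"ssd": &scheduling.StorageResource{Requested: 60, Capacity: 100},
	}))
	// Output: 45
}

The same 45 shows up as an expected value in the score test that follows.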

View File

@ -0,0 +1,312 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumebinding
import (
"testing"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
)
const (
classHDD = "hdd"
classSSD = "ssd"
)
func TestScore(t *testing.T) {
defaultShape := make(helper.FunctionShape, 0, len(defaultShapePoint))
for _, point := range defaultShapePoint {
defaultShape = append(defaultShape, helper.FunctionShapePoint{
Utilization: int64(point.Utilization),
Score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore),
})
}
type scoreCase struct {
classResources classResourceMap
score int64
}
tests := []struct {
name string
shape helper.FunctionShape
cases []scoreCase
}{
{
name: "default shape, single class",
shape: defaultShape,
cases: []scoreCase{
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 0,
Capacity: 100,
},
},
0,
},
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 30,
Capacity: 100,
},
},
30,
},
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 50,
Capacity: 100,
},
},
50,
},
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 100,
Capacity: 100,
},
},
100,
},
},
},
{
name: "default shape, multiple classes",
shape: defaultShape,
cases: []scoreCase{
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 0,
Capacity: 100,
},
classSSD: &scheduling.StorageResource{
Requested: 0,
Capacity: 100,
},
},
0,
},
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 0,
Capacity: 100,
},
classSSD: &scheduling.StorageResource{
Requested: 30,
Capacity: 100,
},
},
15,
},
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 30,
Capacity: 100,
},
classSSD: &scheduling.StorageResource{
Requested: 30,
Capacity: 100,
},
},
30,
},
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 30,
Capacity: 100,
},
classSSD: &scheduling.StorageResource{
Requested: 60,
Capacity: 100,
},
},
45,
},
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 50,
Capacity: 100,
},
classSSD: &scheduling.StorageResource{
Requested: 50,
Capacity: 100,
},
},
50,
},
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 50,
Capacity: 100,
},
classSSD: &scheduling.StorageResource{
Requested: 100,
Capacity: 100,
},
},
75,
},
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 100,
Capacity: 100,
},
classSSD: &scheduling.StorageResource{
Requested: 100,
Capacity: 100,
},
},
100,
},
},
},
{
name: "custom shape, multiple classes",
shape: helper.FunctionShape{
{
Utilization: 50,
Score: 0,
},
{
Utilization: 80,
Score: 30,
},
{
Utilization: 100,
Score: 50,
},
},
cases: []scoreCase{
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 0,
Capacity: 100,
},
classSSD: &scheduling.StorageResource{
Requested: 0,
Capacity: 100,
},
},
0,
},
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 0,
Capacity: 100,
},
classSSD: &scheduling.StorageResource{
Requested: 30,
Capacity: 100,
},
},
0,
},
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 30,
Capacity: 100,
},
classSSD: &scheduling.StorageResource{
Requested: 30,
Capacity: 100,
},
},
0,
},
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 30,
Capacity: 100,
},
classSSD: &scheduling.StorageResource{
Requested: 60,
Capacity: 100,
},
},
5,
},
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 50,
Capacity: 100,
},
classSSD: &scheduling.StorageResource{
Requested: 100,
Capacity: 100,
},
},
25,
},
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 90,
Capacity: 100,
},
classSSD: &scheduling.StorageResource{
Requested: 90,
Capacity: 100,
},
},
40,
},
{
classResourceMap{
classHDD: &scheduling.StorageResource{
Requested: 100,
Capacity: 100,
},
classSSD: &scheduling.StorageResource{
Requested: 100,
Capacity: 100,
},
},
50,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
f := buildScorerFunction(tt.shape)
for _, c := range tt.cases {
gotScore := f(c.classResources)
if gotScore != c.score {
t.Errorf("Expect %d, but got %d", c.score, gotScore)
}
}
})
}
}

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
)
const (
@ -40,6 +41,8 @@ const (
DefaultBindTimeoutSeconds = 600
stateKey framework.StateKey = Name
maxUtilization = 100
)
// the state is initialized in PreFilter phase. because we save the pointer in
@ -68,12 +71,14 @@ type VolumeBinding struct {
Binder scheduling.SchedulerVolumeBinder
PVCLister corelisters.PersistentVolumeClaimLister
GenericEphemeralVolumeFeatureEnabled bool
scorer volumeCapacityScorer
}
var _ framework.PreFilterPlugin = &VolumeBinding{}
var _ framework.FilterPlugin = &VolumeBinding{}
var _ framework.ReservePlugin = &VolumeBinding{}
var _ framework.PreBindPlugin = &VolumeBinding{}
var _ framework.ScorePlugin = &VolumeBinding{}
// Name is the name of the plugin used in Registry and configurations.
const Name = "VolumeBinding"
@ -214,6 +219,55 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p
return nil
}
var (
// TODO (for alpha) make it configurable in config.VolumeBindingArgs
defaultShapePoint = []config.UtilizationShapePoint{
{
Utilization: 0,
Score: 0,
},
{
Utilization: 100,
Score: int32(config.MaxCustomPriorityScore),
},
}
)
// Score invoked at the score extension point.
func (pl *VolumeBinding) Score(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
if pl.scorer == nil {
return 0, nil
}
state, err := getStateData(cs)
if err != nil {
return 0, framework.AsStatus(err)
}
podVolumes, ok := state.podVolumesByNode[nodeName]
if !ok {
return 0, nil
}
// group by storage class
classResources := make(classResourceMap)
for _, staticBinding := range podVolumes.StaticBindings {
class := staticBinding.StorageClassName()
storageResource := staticBinding.StorageResource()
if _, ok := classResources[class]; !ok {
classResources[class] = &scheduling.StorageResource{
Requested: 0,
Capacity: 0,
}
}
classResources[class].Requested += storageResource.Requested
classResources[class].Capacity += storageResource.Capacity
}
return pl.scorer(classResources), nil
}
// ScoreExtensions of the Score plugin.
func (pl *VolumeBinding) ScoreExtensions() framework.ScoreExtensions {
return nil
}
// Reserve reserves volumes of pod and saves binding status in cycle state.
func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
state, err := getStateData(cs)
@ -303,10 +357,24 @@ func New(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
}
}
binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second)
// build score function
var scorer volumeCapacityScorer
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority) {
shape := make(helper.FunctionShape, 0, len(defaultShapePoint))
for _, point := range defaultShapePoint {
shape = append(shape, helper.FunctionShapePoint{
Utilization: int64(point.Utilization),
Score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore),
})
}
scorer = buildScorerFunction(shape)
}
return &VolumeBinding{
Binder: binder,
PVCLister: pvcInformer.Lister(),
GenericEphemeralVolumeFeatureEnabled: utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume),
scorer: scorer,
}, nil
}
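Two things are worth spelling out about the Score path above. First, Requested and Capacity are summed per storage class before scoring, so several bindings of one class on a node are treated as a single pool. Second, assuming the usual constants (MaxNodeScore = 100, MaxCustomPriorityScore = 10), the scaled default shape becomes {0 -> 0, 100 -> 100}, so a node's score is simply the average utilization percentage across its classes. A small standalone sketch of the per-class accumulation:

package main

import "fmt"

func main() {
	type binding struct {
		class     string
		requested int64 // Gi
		capacity  int64 // Gi
	}
	// Two static bindings of the same class on one node.
	bindings := []binding{
		{"ssd", 50, 200},
		{"ssd", 100, 200},
	}
	type resource struct{ requested, capacity int64 }
	perClass := map[string]*resource{}
	for _, b := range bindings {
		r, ok := perClass[b.class]
		if !ok {
			r = &resource{}
			perClass[b.class] = r
		}
		r.requested += b.requested
		r.capacity += b.capacity
	}
	for class, r := range perClass {
		// 150/400 -> 37% utilization; with the linear default shape the
		// class score equals this percentage.
		fmt.Printf("%s: %d\n", class, r.requested*100/r.capacity)
	}
}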

View File

@ -21,13 +21,20 @@ import (
"reflect"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@ -49,18 +56,71 @@ var (
},
VolumeBindingMode: &waitForFirstConsumer,
}
waitHDDSC = &storagev1.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: "wait-hdd-sc",
},
VolumeBindingMode: &waitForFirstConsumer,
}
)
func makePV(name string) *v1.PersistentVolume {
return &v1.PersistentVolume{
func makeNode(name string) *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{
v1.LabelHostname: name,
},
},
}
}
func addPVNodeAffinity(pv *v1.PersistentVolume, volumeNodeAffinity *v1.VolumeNodeAffinity) *v1.PersistentVolume {
pv.Spec.NodeAffinity = volumeNodeAffinity
func mergeNodeLabels(node *v1.Node, labels map[string]string) *v1.Node {
for k, v := range labels {
node.Labels[k] = v
}
return node
}
func makePV(name string, className string) *v1.PersistentVolume {
return &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1.PersistentVolumeSpec{
StorageClassName: className,
},
Status: v1.PersistentVolumeStatus{
Phase: v1.VolumeAvailable,
},
}
}
func setPVNodeAffinity(pv *v1.PersistentVolume, keyValues map[string][]string) *v1.PersistentVolume {
matchExpressions := make([]v1.NodeSelectorRequirement, 0)
for key, values := range keyValues {
matchExpressions = append(matchExpressions, v1.NodeSelectorRequirement{
Key: key,
Operator: v1.NodeSelectorOpIn,
Values: values,
})
}
pv.Spec.NodeAffinity = &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: matchExpressions,
},
},
},
}
return pv
}
func setPVCapacity(pv *v1.PersistentVolume, capacity resource.Quantity) *v1.PersistentVolume {
pv.Spec.Capacity = v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): capacity,
}
return pv
}
@ -81,6 +141,15 @@ func makePVC(name string, boundPVName string, storageClassName string) *v1.Persi
return pvc
}
func setPVCRequestStorage(pvc *v1.PersistentVolumeClaim, request resource.Quantity) *v1.PersistentVolumeClaim {
pvc.Spec.Resources = v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): request,
},
}
return pvc
}
func makePod(name string, pvcNames []string) *v1.Pod {
p := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -105,30 +174,42 @@ func TestVolumeBinding(t *testing.T) {
table := []struct {
name string
pod *v1.Pod
node *v1.Node
nodes []*v1.Node
pvcs []*v1.PersistentVolumeClaim
pvs []*v1.PersistentVolume
feature featuregate.Feature
wantPreFilterStatus *framework.Status
wantStateAfterPreFilter *stateData
wantFilterStatus *framework.Status
wantFilterStatus []*framework.Status
wantScores []int64
}{
{
name: "pod has not pvcs",
pod: makePod("pod-a", nil),
node: &v1.Node{},
nodes: []*v1.Node{
makeNode("node-a"),
},
wantStateAfterPreFilter: &stateData{
skip: true,
},
wantFilterStatus: []*framework.Status{
nil,
},
wantScores: []int64{
0,
},
},
{
name: "all bound",
pod: makePod("pod-a", []string{"pvc-a"}),
node: &v1.Node{},
nodes: []*v1.Node{
makeNode("node-a"),
},
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", "pv-a", waitSC.Name),
},
pvs: []*v1.PersistentVolume{
makePV("pv-a"),
makePV("pv-a", waitSC.Name),
},
wantStateAfterPreFilter: &stateData{
boundClaims: []*v1.PersistentVolumeClaim{
@ -137,36 +218,68 @@ func TestVolumeBinding(t *testing.T) {
claimsToBind: []*v1.PersistentVolumeClaim{},
podVolumesByNode: map[string]*scheduling.PodVolumes{},
},
wantFilterStatus: []*framework.Status{
nil,
},
wantScores: []int64{
0,
},
},
{
name: "PVC does not exist",
pod: makePod("pod-a", []string{"pvc-a"}),
node: &v1.Node{},
name: "PVC does not exist",
pod: makePod("pod-a", []string{"pvc-a"}),
nodes: []*v1.Node{
makeNode("node-a"),
},
pvcs: []*v1.PersistentVolumeClaim{},
wantPreFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "pvc-a" not found`),
wantFilterStatus: []*framework.Status{
nil,
},
wantScores: []int64{
0,
},
},
{
name: "Part of PVCs do not exist",
pod: makePod("pod-a", []string{"pvc-a", "pvc-b"}),
node: &v1.Node{},
nodes: []*v1.Node{
makeNode("node-a"),
},
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", "pv-a", waitSC.Name),
},
wantPreFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "pvc-b" not found`),
wantFilterStatus: []*framework.Status{
nil,
},
wantScores: []int64{
0,
},
},
{
name: "immediate claims not bound",
pod: makePod("pod-a", []string{"pvc-a"}),
node: &v1.Node{},
nodes: []*v1.Node{
makeNode("node-a"),
},
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", "", immediateSC.Name),
},
wantPreFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod has unbound immediate PersistentVolumeClaims"),
wantFilterStatus: []*framework.Status{
nil,
},
wantScores: []int64{
0,
},
},
{
name: "unbound claims no matches",
pod: makePod("pod-a", []string{"pvc-a"}),
node: &v1.Node{},
nodes: []*v1.Node{
makeNode("node-a"),
},
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", "", waitSC.Name),
},
@ -177,37 +290,28 @@ func TestVolumeBinding(t *testing.T) {
},
podVolumesByNode: map[string]*scheduling.PodVolumes{},
},
wantFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonBindConflict)),
wantFilterStatus: []*framework.Status{
framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonBindConflict)),
},
wantScores: []int64{
0,
},
},
{
name: "bound and unbound unsatisfied",
pod: makePod("pod-a", []string{"pvc-a", "pvc-b"}),
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "barbar",
},
},
nodes: []*v1.Node{
mergeNodeLabels(makeNode("node-a"), map[string]string{
"foo": "barbar",
}),
},
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", "pv-a", waitSC.Name),
makePVC("pvc-b", "", waitSC.Name),
},
pvs: []*v1.PersistentVolume{
addPVNodeAffinity(makePV("pv-a"), &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "foo",
Operator: v1.NodeSelectorOpIn,
Values: []string{"bar"},
},
},
},
},
},
setPVNodeAffinity(makePV("pv-a", waitSC.Name), map[string][]string{
"foo": {"bar"},
}),
},
wantStateAfterPreFilter: &stateData{
@ -219,19 +323,33 @@ func TestVolumeBinding(t *testing.T) {
},
podVolumesByNode: map[string]*scheduling.PodVolumes{},
},
wantFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonNodeConflict), string(scheduling.ErrReasonBindConflict)),
wantFilterStatus: []*framework.Status{
framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonNodeConflict), string(scheduling.ErrReasonBindConflict)),
},
wantScores: []int64{
0,
},
},
{
name: "pvc not found",
pod: makePod("pod-a", []string{"pvc-a"}),
node: &v1.Node{},
name: "pvc not found",
pod: makePod("pod-a", []string{"pvc-a"}),
nodes: []*v1.Node{
makeNode("node-a"),
},
wantPreFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "pvc-a" not found`),
wantFilterStatus: nil,
wantFilterStatus: []*framework.Status{
nil,
},
wantScores: []int64{
0,
},
},
{
name: "pv not found",
pod: makePod("pod-a", []string{"pvc-a"}),
node: &v1.Node{},
nodes: []*v1.Node{
makeNode("node-a"),
},
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-a", "pv-a", waitSC.Name),
},
@ -243,12 +361,176 @@ func TestVolumeBinding(t *testing.T) {
claimsToBind: []*v1.PersistentVolumeClaim{},
podVolumesByNode: map[string]*scheduling.PodVolumes{},
},
wantFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, `pvc(s) bound to non-existent pv(s)`),
wantFilterStatus: []*framework.Status{
framework.NewStatus(framework.UnschedulableAndUnresolvable, `pvc(s) bound to non-existent pv(s)`),
},
wantScores: []int64{
0,
},
},
{
name: "local volumes with close capacity are preferred",
pod: makePod("pod-a", []string{"pvc-a"}),
nodes: []*v1.Node{
makeNode("node-a"),
makeNode("node-b"),
makeNode("node-c"),
},
pvcs: []*v1.PersistentVolumeClaim{
setPVCRequestStorage(makePVC("pvc-a", "", waitSC.Name), resource.MustParse("50Gi")),
},
pvs: []*v1.PersistentVolume{
setPVNodeAffinity(setPVCapacity(makePV("pv-a-0", waitSC.Name), resource.MustParse("200Gi")), map[string][]string{v1.LabelHostname: {"node-a"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-a-1", waitSC.Name), resource.MustParse("200Gi")), map[string][]string{v1.LabelHostname: {"node-a"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-b-0", waitSC.Name), resource.MustParse("100Gi")), map[string][]string{v1.LabelHostname: {"node-b"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-b-1", waitSC.Name), resource.MustParse("100Gi")), map[string][]string{v1.LabelHostname: {"node-b"}}),
},
feature: features.VolumeCapacityPriority,
wantPreFilterStatus: nil,
wantStateAfterPreFilter: &stateData{
boundClaims: []*v1.PersistentVolumeClaim{},
claimsToBind: []*v1.PersistentVolumeClaim{
setPVCRequestStorage(makePVC("pvc-a", "", waitSC.Name), resource.MustParse("50Gi")),
},
podVolumesByNode: map[string]*scheduling.PodVolumes{},
},
wantFilterStatus: []*framework.Status{
nil,
nil,
framework.NewStatus(framework.UnschedulableAndUnresolvable, `node(s) didn't find available persistent volumes to bind`),
},
wantScores: []int64{
25,
50,
0,
},
},
{
name: "local volumes with close capacity are preferred (multiple pvcs)",
pod: makePod("pod-a", []string{"pvc-0", "pvc-1"}),
nodes: []*v1.Node{
makeNode("node-a"),
makeNode("node-b"),
makeNode("node-c"),
},
pvcs: []*v1.PersistentVolumeClaim{
setPVCRequestStorage(makePVC("pvc-0", "", waitSC.Name), resource.MustParse("50Gi")),
setPVCRequestStorage(makePVC("pvc-1", "", waitHDDSC.Name), resource.MustParse("100Gi")),
},
pvs: []*v1.PersistentVolume{
setPVNodeAffinity(setPVCapacity(makePV("pv-a-0", waitSC.Name), resource.MustParse("200Gi")), map[string][]string{v1.LabelHostname: {"node-a"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-a-1", waitSC.Name), resource.MustParse("200Gi")), map[string][]string{v1.LabelHostname: {"node-a"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-a-2", waitHDDSC.Name), resource.MustParse("200Gi")), map[string][]string{v1.LabelHostname: {"node-a"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-a-3", waitHDDSC.Name), resource.MustParse("200Gi")), map[string][]string{v1.LabelHostname: {"node-a"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-b-0", waitSC.Name), resource.MustParse("100Gi")), map[string][]string{v1.LabelHostname: {"node-b"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-b-1", waitSC.Name), resource.MustParse("100Gi")), map[string][]string{v1.LabelHostname: {"node-b"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-b-2", waitHDDSC.Name), resource.MustParse("100Gi")), map[string][]string{v1.LabelHostname: {"node-b"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-b-3", waitHDDSC.Name), resource.MustParse("100Gi")), map[string][]string{v1.LabelHostname: {"node-b"}}),
},
feature: features.VolumeCapacityPriority,
wantPreFilterStatus: nil,
wantStateAfterPreFilter: &stateData{
boundClaims: []*v1.PersistentVolumeClaim{},
claimsToBind: []*v1.PersistentVolumeClaim{
setPVCRequestStorage(makePVC("pvc-0", "", waitSC.Name), resource.MustParse("50Gi")),
setPVCRequestStorage(makePVC("pvc-1", "", waitHDDSC.Name), resource.MustParse("100Gi")),
},
podVolumesByNode: map[string]*scheduling.PodVolumes{},
},
wantFilterStatus: []*framework.Status{
nil,
nil,
framework.NewStatus(framework.UnschedulableAndUnresolvable, `node(s) didn't find available persistent volumes to bind`),
},
wantScores: []int64{
38,
75,
0,
},
},
{
name: "zonal volumes with close capacity are preferred",
pod: makePod("pod-a", []string{"pvc-a"}),
nodes: []*v1.Node{
mergeNodeLabels(makeNode("zone-a-node-a"), map[string]string{
"topology.kubernetes.io/region": "region-a",
"topology.kubernetes.io/zone": "zone-a",
}),
mergeNodeLabels(makeNode("zone-a-node-b"), map[string]string{
"topology.kubernetes.io/region": "region-a",
"topology.kubernetes.io/zone": "zone-a",
}),
mergeNodeLabels(makeNode("zone-b-node-a"), map[string]string{
"topology.kubernetes.io/region": "region-b",
"topology.kubernetes.io/zone": "zone-b",
}),
mergeNodeLabels(makeNode("zone-b-node-b"), map[string]string{
"topology.kubernetes.io/region": "region-b",
"topology.kubernetes.io/zone": "zone-b",
}),
mergeNodeLabels(makeNode("zone-c-node-a"), map[string]string{
"topology.kubernetes.io/region": "region-c",
"topology.kubernetes.io/zone": "zone-c",
}),
mergeNodeLabels(makeNode("zone-c-node-b"), map[string]string{
"topology.kubernetes.io/region": "region-c",
"topology.kubernetes.io/zone": "zone-c",
}),
},
pvcs: []*v1.PersistentVolumeClaim{
setPVCRequestStorage(makePVC("pvc-a", "", waitSC.Name), resource.MustParse("50Gi")),
},
pvs: []*v1.PersistentVolume{
setPVNodeAffinity(setPVCapacity(makePV("pv-a-0", waitSC.Name), resource.MustParse("200Gi")), map[string][]string{
"topology.kubernetes.io/region": {"region-a"},
"topology.kubernetes.io/zone": {"zone-a"},
}),
setPVNodeAffinity(setPVCapacity(makePV("pv-a-1", waitSC.Name), resource.MustParse("200Gi")), map[string][]string{
"topology.kubernetes.io/region": {"region-a"},
"topology.kubernetes.io/zone": {"zone-a"},
}),
setPVNodeAffinity(setPVCapacity(makePV("pv-b-0", waitSC.Name), resource.MustParse("100Gi")), map[string][]string{
"topology.kubernetes.io/region": {"region-b"},
"topology.kubernetes.io/zone": {"zone-b"},
}),
setPVNodeAffinity(setPVCapacity(makePV("pv-b-1", waitSC.Name), resource.MustParse("100Gi")), map[string][]string{
"topology.kubernetes.io/region": {"region-b"},
"topology.kubernetes.io/zone": {"zone-b"},
}),
},
feature: features.VolumeCapacityPriority,
wantPreFilterStatus: nil,
wantStateAfterPreFilter: &stateData{
boundClaims: []*v1.PersistentVolumeClaim{},
claimsToBind: []*v1.PersistentVolumeClaim{
setPVCRequestStorage(makePVC("pvc-a", "", waitSC.Name), resource.MustParse("50Gi")),
},
podVolumesByNode: map[string]*scheduling.PodVolumes{},
},
wantFilterStatus: []*framework.Status{
nil,
nil,
nil,
nil,
framework.NewStatus(framework.UnschedulableAndUnresolvable, `node(s) didn't find available persistent volumes to bind`),
framework.NewStatus(framework.UnschedulableAndUnresolvable, `node(s) didn't find available persistent volumes to bind`),
},
wantScores: []int64{
25,
25,
50,
50,
0,
0,
},
},
}
for _, item := range table {
t.Run(item.name, func(t *testing.T) {
if item.feature != "" {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, item.feature, true)()
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client := fake.NewSimpleClientset()
@ -271,8 +553,9 @@ func TestVolumeBinding(t *testing.T) {
t.Log("Feed testing data and wait for them to be synced")
client.StorageV1().StorageClasses().Create(ctx, immediateSC, metav1.CreateOptions{})
client.StorageV1().StorageClasses().Create(ctx, waitSC, metav1.CreateOptions{})
if item.node != nil {
client.CoreV1().Nodes().Create(ctx, item.node, metav1.CreateOptions{})
client.StorageV1().StorageClasses().Create(ctx, waitHDDSC, metav1.CreateOptions{})
for _, node := range item.nodes {
client.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
}
for _, pvc := range item.pvcs {
client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
@ -290,8 +573,12 @@ func TestVolumeBinding(t *testing.T) {
t.Log("Verify")
p := pl.(*VolumeBinding)
nodeInfo := framework.NewNodeInfo()
nodeInfo.SetNode(item.node)
nodeInfos := make([]*framework.NodeInfo, 0)
for _, node := range item.nodes {
nodeInfo := framework.NewNodeInfo()
nodeInfo.SetNode(node)
nodeInfos = append(nodeInfos, nodeInfo)
}
state := framework.NewCycleState()
t.Logf("Verify: call PreFilter and check status")
@ -305,18 +592,35 @@ func TestVolumeBinding(t *testing.T) {
}
t.Logf("Verify: check state after prefilter phase")
stateData, err := getStateData(state)
got, err := getStateData(state)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(stateData, item.wantStateAfterPreFilter) {
t.Errorf("state got after prefilter does not match: %v, want: %v", stateData, item.wantStateAfterPreFilter)
stateCmpOpts := []cmp.Option{
cmp.AllowUnexported(stateData{}),
cmpopts.IgnoreFields(stateData{}, "Mutex"),
}
if diff := cmp.Diff(item.wantStateAfterPreFilter, got, stateCmpOpts...); diff != "" {
t.Errorf("state got after prefilter does not match (-want,+got):\n%s", diff)
}
t.Logf("Verify: call Filter and check status")
gotStatus := p.Filter(ctx, state, item.pod, nodeInfo)
if !reflect.DeepEqual(gotStatus, item.wantFilterStatus) {
t.Errorf("filter status does not match: %v, want: %v", gotStatus, item.wantFilterStatus)
for i, nodeInfo := range nodeInfos {
gotStatus := p.Filter(ctx, state, item.pod, nodeInfo)
if !reflect.DeepEqual(gotStatus, item.wantFilterStatus[i]) {
t.Errorf("filter status does not match for node %q, got: %v, want: %v", nodeInfo.Node().Name, gotStatus, item.wantFilterStatus)
}
}
t.Logf("Verify: Score")
for i, node := range item.nodes {
score, status := p.Score(ctx, state, item.pod, node.Name)
if !status.IsSuccess() {
t.Errorf("Score expects success status, got: %v", status)
}
if score != item.wantScores[i] {
t.Errorf("Score expects score %d for node %q, got: %d", item.wantScores[i], node.Name, score)
}
}
})
}
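The expected scores in the "multiple pvcs" case follow directly from the scorer: on node-a each claim binds within a 200Gi PV, giving per-class utilizations of 50/200 = 25% and 100/200 = 50%, which average to 37.5 and round to 38; on node-b the same claims hit 100Gi PVs, giving 50% and 100%, hence 75. A tiny sketch of that last averaging step:

package main

import (
	"fmt"
	"math"
)

// roundedAverage mirrors the final equal-weight averaging done by the scorer.
func roundedAverage(scores ...float64) int64 {
	var sum float64
	for _, s := range scores {
		sum += s
	}
	return int64(math.Round(sum / float64(len(scores))))
}

func main() {
	fmt.Println(roundedAverage(25, 50))  // node-a: 38
	fmt.Println(roundedAverage(50, 100)) // node-b: 75
}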

View File

@ -1024,7 +1024,7 @@ func TestRescheduleProvisioning(t *testing.T) {
}
// Prepare node and storage class.
testNode := makeNode(0)
testNode := makeNode(1)
if _, err := clientset.CoreV1().Nodes().Create(context.TODO(), testNode, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Node %q: %v", testNode.Name, err)
}
@ -1078,7 +1078,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod t
// Create shared objects
// Create nodes
for i := 0; i < numberOfNodes; i++ {
testNode := makeNode(i)
testNode := makeNode(i + 1)
if _, err := clientset.CoreV1().Nodes().Create(context.TODO(), testNode, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Node %q: %v", testNode.Name, err)
}
@ -1199,21 +1199,6 @@ func makePV(name, scName, pvcName, ns, node string) *v1.PersistentVolume {
Path: "/test-path",
},
},
NodeAffinity: &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: nodeAffinityLabelKey,
Operator: v1.NodeSelectorOpIn,
Values: []string{node},
},
},
},
},
},
},
},
}
@ -1221,6 +1206,24 @@ func makePV(name, scName, pvcName, ns, node string) *v1.PersistentVolume {
pv.Spec.ClaimRef = &v1.ObjectReference{Name: pvcName, Namespace: ns}
}
if node != "" {
pv.Spec.NodeAffinity = &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: nodeAffinityLabelKey,
Operator: v1.NodeSelectorOpIn,
Values: []string{node},
},
},
},
},
},
}
}
return pv
}
@ -1280,11 +1283,13 @@ func makePod(name, ns string, pvcs []string) *v1.Pod {
}
}
// makeNode creates a node with the name "node-<index>"
func makeNode(index int) *v1.Node {
name := fmt.Sprintf("node-%d", index)
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("node-%d", index+1),
Labels: map[string]string{nodeAffinityLabelKey: fmt.Sprintf("node-%d", index+1)},
Name: name,
Labels: map[string]string{nodeAffinityLabelKey: name},
},
Spec: v1.NodeSpec{Unschedulable: false},
Status: v1.NodeStatus{

View File

@ -0,0 +1,310 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumescheduling
// This file tests the VolumeCapacityPriority feature.
import (
"context"
"testing"
"time"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
)
var (
waitSSDSC = makeStorageClass("ssd", &modeWait)
waitHDDSC = makeStorageClass("hdd", &modeWait)
)
func mergeNodeLabels(node *v1.Node, labels map[string]string) *v1.Node {
for k, v := range labels {
node.Labels[k] = v
}
return node
}
func setupClusterForVolumeCapacityPriority(t *testing.T, nsName string, resyncPeriod time.Duration, provisionDelaySeconds int) *testConfig {
testCtx := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), resyncPeriod)
clientset := testCtx.clientSet
ns := testCtx.ns.Name
ctrl, informerFactory, err := initPVController(t, testCtx, provisionDelaySeconds)
if err != nil {
t.Fatalf("Failed to create PV controller: %v", err)
}
go ctrl.Run(testCtx.ctx.Done())
// Start informer factory after all controllers are configured and running.
informerFactory.Start(testCtx.ctx.Done())
informerFactory.WaitForCacheSync(testCtx.ctx.Done())
return &testConfig{
client: clientset,
ns: ns,
stop: testCtx.ctx.Done(),
teardown: func() {
klog.Infof("test cluster %q is starting to tear down", ns)
deleteTestObjects(clientset, ns, metav1.DeleteOptions{})
cleanupTest(t, testCtx)
},
}
}
func TestVolumeCapacityPriority(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeCapacityPriority, true)()
config := setupClusterForVolumeCapacityPriority(t, "volume-capacity-priority", 0, 0)
defer config.teardown()
tests := []struct {
name string
pod *v1.Pod
nodes []*v1.Node
pvs []*v1.PersistentVolume
pvcs []*v1.PersistentVolumeClaim
wantNodeName string
}{
{
name: "local volumes with close capacity are preferred",
pod: makePod("pod", config.ns, []string{"data"}),
nodes: []*v1.Node{
makeNode(0),
makeNode(1),
makeNode(2),
},
pvs: []*v1.PersistentVolume{
setPVNodeAffinity(setPVCapacity(makePV("pv-0", waitSSDSC.Name, "", config.ns, "node-0"), resource.MustParse("200Gi")), map[string][]string{v1.LabelHostname: {"node-0"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-1", waitSSDSC.Name, "", config.ns, "node-0"), resource.MustParse("200Gi")), map[string][]string{v1.LabelHostname: {"node-0"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-2", waitSSDSC.Name, "", config.ns, "node-1"), resource.MustParse("100Gi")), map[string][]string{v1.LabelHostname: {"node-1"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-3", waitSSDSC.Name, "", config.ns, "node-1"), resource.MustParse("100Gi")), map[string][]string{v1.LabelHostname: {"node-1"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-4", waitSSDSC.Name, "", config.ns, "node-2"), resource.MustParse("100Gi")), map[string][]string{v1.LabelHostname: {"node-2"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-5", waitSSDSC.Name, "", config.ns, "node-2"), resource.MustParse("50Gi")), map[string][]string{v1.LabelHostname: {"node-2"}}),
},
pvcs: []*v1.PersistentVolumeClaim{
setPVCRequestStorage(makePVC("data", config.ns, &waitSSDSC.Name, ""), resource.MustParse("20Gi")),
},
wantNodeName: "node-2",
},
{
name: "local volumes with close capacity are preferred (multiple pvcs)",
pod: makePod("pod", config.ns, []string{"data-0", "data-1"}),
nodes: []*v1.Node{
makeNode(0),
makeNode(1),
makeNode(2),
},
pvs: []*v1.PersistentVolume{
setPVNodeAffinity(setPVCapacity(makePV("pv-0", waitSSDSC.Name, "", config.ns, "node-0"), resource.MustParse("200Gi")), map[string][]string{v1.LabelHostname: {"node-0"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-1", waitSSDSC.Name, "", config.ns, "node-0"), resource.MustParse("200Gi")), map[string][]string{v1.LabelHostname: {"node-0"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-2", waitSSDSC.Name, "", config.ns, "node-1"), resource.MustParse("100Gi")), map[string][]string{v1.LabelHostname: {"node-1"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-3", waitSSDSC.Name, "", config.ns, "node-1"), resource.MustParse("100Gi")), map[string][]string{v1.LabelHostname: {"node-1"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-4", waitSSDSC.Name, "", config.ns, "node-2"), resource.MustParse("100Gi")), map[string][]string{v1.LabelHostname: {"node-2"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-5", waitSSDSC.Name, "", config.ns, "node-2"), resource.MustParse("50Gi")), map[string][]string{v1.LabelHostname: {"node-2"}}),
},
pvcs: []*v1.PersistentVolumeClaim{
setPVCRequestStorage(makePVC("data-0", config.ns, &waitSSDSC.Name, ""), resource.MustParse("80Gi")),
setPVCRequestStorage(makePVC("data-1", config.ns, &waitSSDSC.Name, ""), resource.MustParse("80Gi")),
},
wantNodeName: "node-1",
},
{
name: "local volumes with close capacity are preferred (multiple pvcs, multiple classes)",
pod: makePod("pod", config.ns, []string{"data-0", "data-1"}),
nodes: []*v1.Node{
makeNode(0),
makeNode(1),
makeNode(2),
},
pvs: []*v1.PersistentVolume{
setPVNodeAffinity(setPVCapacity(makePV("pv-0", waitSSDSC.Name, "", config.ns, "node-0"), resource.MustParse("200Gi")), map[string][]string{v1.LabelHostname: {"node-0"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-1", waitHDDSC.Name, "", config.ns, "node-0"), resource.MustParse("200Gi")), map[string][]string{v1.LabelHostname: {"node-0"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-2", waitSSDSC.Name, "", config.ns, "node-1"), resource.MustParse("100Gi")), map[string][]string{v1.LabelHostname: {"node-1"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-3", waitHDDSC.Name, "", config.ns, "node-1"), resource.MustParse("100Gi")), map[string][]string{v1.LabelHostname: {"node-1"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-4", waitSSDSC.Name, "", config.ns, "node-2"), resource.MustParse("100Gi")), map[string][]string{v1.LabelHostname: {"node-2"}}),
setPVNodeAffinity(setPVCapacity(makePV("pv-5", waitHDDSC.Name, "", config.ns, "node-2"), resource.MustParse("50Gi")), map[string][]string{v1.LabelHostname: {"node-2"}}),
},
pvcs: []*v1.PersistentVolumeClaim{
setPVCRequestStorage(makePVC("data-0", config.ns, &waitSSDSC.Name, ""), resource.MustParse("80Gi")),
setPVCRequestStorage(makePVC("data-1", config.ns, &waitHDDSC.Name, ""), resource.MustParse("80Gi")),
},
wantNodeName: "node-1",
},
{
name: "zonal volumes with close capacity are preferred (multiple pvcs, multiple classes)",
pod: makePod("pod", config.ns, []string{"data-0", "data-1"}),
nodes: []*v1.Node{
mergeNodeLabels(makeNode(0), map[string]string{
"topology.kubernetes.io/region": "region-a",
"topology.kubernetes.io/zone": "zone-a",
}),
mergeNodeLabels(makeNode(1), map[string]string{
"topology.kubernetes.io/region": "region-b",
"topology.kubernetes.io/zone": "zone-b",
}),
mergeNodeLabels(makeNode(2), map[string]string{
"topology.kubernetes.io/region": "region-c",
"topology.kubernetes.io/zone": "zone-c",
}),
},
pvs: []*v1.PersistentVolume{
setPVNodeAffinity(setPVCapacity(makePV("pv-0", waitSSDSC.Name, "", config.ns, ""), resource.MustParse("200Gi")), map[string][]string{
"topology.kubernetes.io/region": {"region-a"},
"topology.kubernetes.io/zone": {"zone-a"},
}),
setPVNodeAffinity(setPVCapacity(makePV("pv-1", waitHDDSC.Name, "", config.ns, ""), resource.MustParse("200Gi")), map[string][]string{
"topology.kubernetes.io/region": {"region-a"},
"topology.kubernetes.io/zone": {"zone-a"},
}),
setPVNodeAffinity(setPVCapacity(makePV("pv-2", waitSSDSC.Name, "", config.ns, ""), resource.MustParse("100Gi")), map[string][]string{
"topology.kubernetes.io/region": {"region-b"},
"topology.kubernetes.io/zone": {"zone-b"},
}),
setPVNodeAffinity(setPVCapacity(makePV("pv-3", waitHDDSC.Name, "", config.ns, ""), resource.MustParse("100Gi")), map[string][]string{
"topology.kubernetes.io/region": {"region-b"},
"topology.kubernetes.io/zone": {"zone-b"},
}),
setPVNodeAffinity(setPVCapacity(makePV("pv-4", waitSSDSC.Name, "", config.ns, ""), resource.MustParse("100Gi")), map[string][]string{
"topology.kubernetes.io/region": {"region-c"},
"topology.kubernetes.io/zone": {"zone-c"},
}),
setPVNodeAffinity(setPVCapacity(makePV("pv-5", waitHDDSC.Name, "", config.ns, ""), resource.MustParse("50Gi")), map[string][]string{
"topology.kubernetes.io/region": {"region-c"},
"topology.kubernetes.io/zone": {"zone-c"},
}),
},
pvcs: []*v1.PersistentVolumeClaim{
setPVCRequestStorage(makePVC("data-0", config.ns, &waitSSDSC.Name, ""), resource.MustParse("80Gi")),
setPVCRequestStorage(makePVC("data-1", config.ns, &waitHDDSC.Name, ""), resource.MustParse("80Gi")),
},
wantNodeName: "node-1",
},
}
c := config.client
t.Log("Creating StorageClasses")
classes := map[string]*storagev1.StorageClass{}
classes[waitSSDSC.Name] = waitSSDSC
classes[waitHDDSC.Name] = waitHDDSC
for _, sc := range classes {
if _, err := c.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
t.Fatalf("failed to create StorageClass %q: %v", sc.Name, err)
}
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Log("Creating Nodes")
for _, node := range tt.nodes {
if _, err := c.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil {
t.Fatalf("failed to create Node %q: %v", node.Name, err)
}
}
t.Log("Creating PVs")
for _, pv := range tt.pvs {
if _, err := c.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
t.Fatalf("failed to create PersistentVolume %q: %v", pv.Name, err)
}
}
// https://github.com/kubernetes/kubernetes/issues/85320
t.Log("Waiting for PVs to become available to avoid race condition in PV controller")
for _, pv := range tt.pvs {
if err := waitForPVPhase(c, pv.Name, v1.VolumeAvailable); err != nil {
t.Fatalf("failed to wait for PersistentVolume %q to become available: %v", pv.Name, err)
}
}
t.Log("Creating PVCs")
for _, pvc := range tt.pvcs {
if _, err := c.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
t.Fatalf("failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
}
}
t.Log("Create Pod")
if _, err := c.CoreV1().Pods(config.ns).Create(context.TODO(), tt.pod, metav1.CreateOptions{}); err != nil {
t.Fatalf("failed to create Pod %q: %v", tt.pod.Name, err)
}
if err := waitForPodToSchedule(c, tt.pod); err != nil {
t.Errorf("failed to schedule Pod %q: %v", tt.pod.Name, err)
}
t.Log("Verify the assigned node")
pod, err := c.CoreV1().Pods(config.ns).Get(context.TODO(), tt.pod.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("failed to get Pod %q: %v", tt.pod.Name, err)
}
if pod.Spec.NodeName != tt.wantNodeName {
t.Errorf("pod %s assigned node expects %q, got %q", pod.Name, tt.wantNodeName, pod.Spec.NodeName)
}
t.Log("Cleanup test objects")
c.CoreV1().Nodes().DeleteCollection(context.TODO(), deleteOption, metav1.ListOptions{})
c.CoreV1().Pods(config.ns).DeleteCollection(context.TODO(), deleteOption, metav1.ListOptions{})
c.CoreV1().PersistentVolumeClaims(config.ns).DeleteCollection(context.TODO(), deleteOption, metav1.ListOptions{})
c.CoreV1().PersistentVolumes().DeleteCollection(context.TODO(), deleteOption, metav1.ListOptions{})
})
}
}
func setPVNodeAffinity(pv *v1.PersistentVolume, keyValues map[string][]string) *v1.PersistentVolume {
matchExpressions := make([]v1.NodeSelectorRequirement, 0)
for key, values := range keyValues {
matchExpressions = append(matchExpressions, v1.NodeSelectorRequirement{
Key: key,
Operator: v1.NodeSelectorOpIn,
Values: values,
})
}
pv.Spec.NodeAffinity = &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: matchExpressions,
},
},
},
}
return pv
}
func setPVCapacity(pv *v1.PersistentVolume, capacity resource.Quantity) *v1.PersistentVolume {
if pv.Spec.Capacity == nil {
pv.Spec.Capacity = make(v1.ResourceList)
}
pv.Spec.Capacity[v1.ResourceName(v1.ResourceStorage)] = capacity
return pv
}
func setPVCRequestStorage(pvc *v1.PersistentVolumeClaim, request resource.Quantity) *v1.PersistentVolumeClaim {
pvc.Spec.Resources = v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): request,
},
}
return pvc
}
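Why node-2 wins the first integration case: every node can satisfy the 20Gi claim, but (assuming the static-binding pass prefers the smallest PV that fits the claim, behaviour not shown in this diff) the candidate utilizations differ, and the highest one scores best under the default shape. A sketch of that comparison:

package main

import "fmt"

func main() {
	// Smallest PV (in Gi) that can hold the 20Gi "data" claim on each node,
	// assuming the binder picks the smallest matching PV.
	nodes := []struct {
		name     string
		smallest int64
	}{
		{"node-0", 200},
		{"node-1", 100},
		{"node-2", 50},
	}
	for _, n := range nodes {
		fmt.Printf("%s: %d%% utilization\n", n.name, 20*100/n.smallest)
	}
	// node-0: 10%, node-1: 20%, node-2: 40% -> node-2 scores highest.
}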