Add storage capacity scoring to VolumeBinding plugin

This commit is contained in:
bells17 2022-05-11 01:48:27 +09:00 committed by Yuma Ogami
parent 8b08487283
commit f6ddee95f9
10 changed files with 512 additions and 104 deletions

View File

@ -677,6 +677,13 @@ const (
// Enables trafficDistribution field on Services.
ServiceTrafficDistribution featuregate.Feature = "ServiceTrafficDistribution"
// owner: @cupnes
// kep: https://kep.k8s.io/4049
//
// Enables scoring nodes by their available storage capacity
// (dynamic provisioning only).
StorageCapacityScoring featuregate.Feature = "StorageCapacityScoring"
// owner: @gjkim42 @SergeyKanzhelev @matthyx @tzneal
// kep: https://kep.k8s.io/753
//

View File

@ -753,6 +753,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.31"), Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // GA in 1.31, remove in 1.33
},
StorageCapacityScoring: {
{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Alpha},
},
StorageVersionMigrator: {
{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha},
},

View File

@ -193,15 +193,28 @@ func SetDefaults_VolumeBindingArgs(obj *configv1.VolumeBindingArgs) {
obj.BindTimeoutSeconds = ptr.To[int64](600)
}
if len(obj.Shape) == 0 && feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority) {
obj.Shape = []configv1.UtilizationShapePoint{
{
Utilization: 0,
Score: 0,
},
{
Utilization: 100,
Score: int32(config.MaxCustomPriorityScore),
},
if feature.DefaultFeatureGate.Enabled(features.StorageCapacityScoring) {
obj.Shape = []configv1.UtilizationShapePoint{
{
Utilization: 0,
Score: int32(config.MaxCustomPriorityScore),
},
{
Utilization: 100,
Score: 0,
},
}
} else {
obj.Shape = []configv1.UtilizationShapePoint{
{
Utilization: 0,
Score: 0,
},
{
Utilization: 100,
Score: int32(config.MaxCustomPriorityScore),
},
}
}
}
}

View File

@ -33,4 +33,5 @@ type Features struct {
EnableSchedulingQueueHint bool
EnableAsyncPreemption bool
EnablePodLevelResources bool
EnableStorageCapacityScoring bool
}

View File

@ -59,6 +59,7 @@ func NewInTreeRegistry() runtime.Registry {
EnableSchedulingQueueHint: feature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints),
EnableAsyncPreemption: feature.DefaultFeatureGate.Enabled(features.SchedulerAsyncPreemption),
EnablePodLevelResources: feature.DefaultFeatureGate.Enabled(features.PodLevelResources),
EnableStorageCapacityScoring: feature.DefaultFeatureGate.Enabled(features.StorageCapacityScoring),
}
registry := runtime.Registry{

View File

@ -102,13 +102,19 @@ func (b *BindingInfo) StorageResource() *StorageResource {
}
}
// DynamicProvision represents a dynamically provisioned volume.
type DynamicProvision struct {
	// PVC is the claim to be dynamically provisioned.
	PVC *v1.PersistentVolumeClaim
	// NodeCapacity is the CSIStorageCapacity object that satisfied the
	// claim's request on the considered node. It is nil when capacity
	// checking was skipped for the claim (e.g. the provisioner is not a
	// CSI driver, or the driver does not opt into storage capacity
	// tracking).
	NodeCapacity *storagev1.CSIStorageCapacity
}
// PodVolumes holds pod's volumes information used in volume scheduling.
type PodVolumes struct {
// StaticBindings are binding decisions for PVCs which can be bound to
// pre-provisioned static PVs.
StaticBindings []*BindingInfo
// DynamicProvisions are PVCs that require dynamic provisioning
DynamicProvisions []*v1.PersistentVolumeClaim
DynamicProvisions []*DynamicProvision
}
// InTreeToCSITranslator contains methods required to check migratable status
@ -310,7 +316,7 @@ func (b *volumeBinder) FindPodVolumes(logger klog.Logger, pod *v1.Pod, podVolume
var (
staticBindings []*BindingInfo
dynamicProvisions []*v1.PersistentVolumeClaim
dynamicProvisions []*DynamicProvision
)
defer func() {
// Although we do not distinguish nil from empty in this function, for
@ -377,6 +383,16 @@ func (b *volumeBinder) FindPodVolumes(logger klog.Logger, pod *v1.Pod, podVolume
return
}
// convertDynamicProvisionsToPVCs converts a slice of *DynamicProvision to a
// slice of the PersistentVolumeClaims they wrap.
func convertDynamicProvisionsToPVCs(dynamicProvisions []*DynamicProvision) []*v1.PersistentVolumeClaim {
	// Pre-size the result to avoid re-allocations while appending.
	pvcs := make([]*v1.PersistentVolumeClaim, 0, len(dynamicProvisions))
	for _, dynamicProvision := range dynamicProvisions {
		pvcs = append(pvcs, dynamicProvision.PVC)
	}
	return pvcs
}
// AssumePodVolumes will take the matching PVs and PVCs to provision in pod's
// volume information for the chosen node, and:
// 1. Update the pvCache with the new prebound PV.
@ -423,20 +439,21 @@ func (b *volumeBinder) AssumePodVolumes(logger klog.Logger, assumedPod *v1.Pod,
}
// Assume PVCs
newProvisionedPVCs := []*v1.PersistentVolumeClaim{}
for _, claim := range podVolumes.DynamicProvisions {
newProvisionedPVCs := []*DynamicProvision{}
for _, dynamicProvision := range podVolumes.DynamicProvisions {
// The claims from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
claimClone := claim.DeepCopy()
claimClone := dynamicProvision.PVC.DeepCopy()
metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, volume.AnnSelectedNode, nodeName)
err = b.pvcCache.Assume(claimClone)
if err != nil {
pvcs := convertDynamicProvisionsToPVCs(newProvisionedPVCs)
b.revertAssumedPVs(newBindings)
b.revertAssumedPVCs(newProvisionedPVCs)
b.revertAssumedPVCs(pvcs)
return
}
newProvisionedPVCs = append(newProvisionedPVCs, claimClone)
newProvisionedPVCs = append(newProvisionedPVCs, &DynamicProvision{PVC: claimClone})
}
podVolumes.StaticBindings = newBindings
@ -446,8 +463,9 @@ func (b *volumeBinder) AssumePodVolumes(logger klog.Logger, assumedPod *v1.Pod,
// RevertAssumedPodVolumes will revert assumed PV and PVC cache.
func (b *volumeBinder) RevertAssumedPodVolumes(podVolumes *PodVolumes) {
pvcs := convertDynamicProvisionsToPVCs(podVolumes.DynamicProvisions)
b.revertAssumedPVs(podVolumes.StaticBindings)
b.revertAssumedPVCs(podVolumes.DynamicProvisions)
b.revertAssumedPVCs(pvcs)
}
// BindPodVolumes gets the cached bindings and PVCs to provision in pod's volumes information,
@ -464,7 +482,7 @@ func (b *volumeBinder) BindPodVolumes(ctx context.Context, assumedPod *v1.Pod, p
}()
bindings := podVolumes.StaticBindings
claimsToProvision := podVolumes.DynamicProvisions
claimsToProvision := convertDynamicProvisionsToPVCs(podVolumes.DynamicProvisions)
// Start API operations
err = b.bindAPIUpdate(ctx, assumedPod, bindings, claimsToProvision)
@ -886,8 +904,8 @@ func (b *volumeBinder) findMatchingVolumes(logger klog.Logger, pod *v1.Pod, clai
// checkVolumeProvisions checks given unbound claims (the claims have gone through func
// findMatchingVolumes, and do not have matching volumes for binding), and return true
// if all of the claims are eligible for dynamic provision.
func (b *volumeBinder) checkVolumeProvisions(logger klog.Logger, pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied, sufficientStorage bool, dynamicProvisions []*v1.PersistentVolumeClaim, err error) {
dynamicProvisions = []*v1.PersistentVolumeClaim{}
func (b *volumeBinder) checkVolumeProvisions(logger klog.Logger, pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied, sufficientStorage bool, dynamicProvisions []*DynamicProvision, err error) {
dynamicProvisions = []*DynamicProvision{}
// We return early with provisionedClaims == nil if a check
// fails or we encounter an error.
@ -915,7 +933,7 @@ func (b *volumeBinder) checkVolumeProvisions(logger klog.Logger, pod *v1.Pod, cl
}
// Check storage capacity.
sufficient, err := b.hasEnoughCapacity(logger, provisioner, claim, class, node)
sufficient, capacity, err := b.hasEnoughCapacity(logger, provisioner, claim, class, node)
if err != nil {
return false, false, nil, err
}
@ -924,8 +942,10 @@ func (b *volumeBinder) checkVolumeProvisions(logger klog.Logger, pod *v1.Pod, cl
return true, false, nil, nil
}
dynamicProvisions = append(dynamicProvisions, claim)
dynamicProvisions = append(dynamicProvisions, &DynamicProvision{
PVC: claim,
NodeCapacity: capacity,
})
}
logger.V(4).Info("Provisioning for claims of pod that has no matching volumes...", "claimCount", len(claimsToProvision), "pod", klog.KObj(pod), "node", klog.KObj(node))
@ -945,12 +965,12 @@ func (b *volumeBinder) revertAssumedPVCs(claims []*v1.PersistentVolumeClaim) {
}
// hasEnoughCapacity checks whether the provisioner has enough capacity left for a new volume of the given size
// that is available from the node.
func (b *volumeBinder) hasEnoughCapacity(logger klog.Logger, provisioner string, claim *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass, node *v1.Node) (bool, error) {
// that is available from the node. This function returns the node capacity based on the PVC's storage class.
func (b *volumeBinder) hasEnoughCapacity(logger klog.Logger, provisioner string, claim *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass, node *v1.Node) (bool, *storagev1.CSIStorageCapacity, error) {
quantity, ok := claim.Spec.Resources.Requests[v1.ResourceStorage]
if !ok {
// No capacity to check for.
return true, nil
return true, nil, nil
}
// Only enabled for CSI drivers which opt into it.
@ -960,19 +980,19 @@ func (b *volumeBinder) hasEnoughCapacity(logger klog.Logger, provisioner string,
// Either the provisioner is not a CSI driver or the driver does not
// opt into storage capacity scheduling. Either way, skip
// capacity checking.
return true, nil
return true, nil, nil
}
return false, err
return false, nil, err
}
if driver.Spec.StorageCapacity == nil || !*driver.Spec.StorageCapacity {
return true, nil
return true, nil, nil
}
// Look for a matching CSIStorageCapacity object(s).
// TODO (for beta): benchmark this and potentially introduce some kind of lookup structure (https://github.com/kubernetes/enhancements/issues/1698#issuecomment-654356718).
capacities, err := b.csiStorageCapacityLister.List(labels.Everything())
if err != nil {
return false, err
return false, nil, err
}
sizeInBytes := quantity.Value()
@ -981,7 +1001,7 @@ func (b *volumeBinder) hasEnoughCapacity(logger klog.Logger, provisioner string,
capacitySufficient(capacity, sizeInBytes) &&
b.nodeHasAccess(logger, node, capacity) {
// Enough capacity found.
return true, nil
return true, capacity, nil
}
}
@ -989,7 +1009,7 @@ func (b *volumeBinder) hasEnoughCapacity(logger klog.Logger, provisioner string,
// they had to be rejected. Log that above? But that might be a lot of log output...
logger.V(4).Info("Node has no accessible CSIStorageCapacity with enough capacity for PVC",
"node", klog.KObj(node), "PVC", klog.KObj(claim), "size", sizeInBytes, "storageClass", klog.KObj(storageClass))
return false, nil
return false, nil, nil
}
func capacitySufficient(capacity *storagev1.CSIStorageCapacity, sizeInBytes int64) bool {

View File

@ -122,6 +122,11 @@ var (
// node topology for CSI migration
zone1Labels = map[string]string{v1.LabelFailureDomainBetaZone: "us-east-1", v1.LabelFailureDomainBetaRegion: "us-east-1a"}
// csiCapacity objects
networkAttachedCapacity = makeCapacity("net", waitClassWithProvisioner, nil, "1Gi", "")
node1Capacity = makeCapacity("net", waitClassWithProvisioner, node1, "1Gi", "")
node2Capacity = makeCapacity("net", waitClassWithProvisioner, node2, "1Gi", "")
)
type testEnv struct {
@ -396,14 +401,14 @@ func (env *testEnv) assumeVolumes(t *testing.T, node string, pod *v1.Pod, bindin
}
}
func (env *testEnv) validatePodCache(t *testing.T, node string, pod *v1.Pod, podVolumes *PodVolumes, expectedBindings []*BindingInfo, expectedProvisionings []*v1.PersistentVolumeClaim) {
func (env *testEnv) validatePodCache(t *testing.T, node string, pod *v1.Pod, podVolumes *PodVolumes, expectedBindings []*BindingInfo, expectedProvisionings []*DynamicProvision) {
var (
bindings []*BindingInfo
provisionedClaims []*v1.PersistentVolumeClaim
dynamicProvisions []*DynamicProvision
)
if podVolumes != nil {
bindings = podVolumes.StaticBindings
provisionedClaims = podVolumes.DynamicProvisions
dynamicProvisions = podVolumes.DynamicProvisions
}
if aLen, eLen := len(bindings), len(expectedBindings); aLen != eLen {
t.Errorf("expected %v bindings, got %v", eLen, aLen)
@ -427,17 +432,17 @@ func (env *testEnv) validatePodCache(t *testing.T, node string, pod *v1.Pod, pod
}
}
if aLen, eLen := len(provisionedClaims), len(expectedProvisionings); aLen != eLen {
if aLen, eLen := len(dynamicProvisions), len(expectedProvisionings); aLen != eLen {
t.Errorf("expected %v provisioned claims, got %v", eLen, aLen)
} else if expectedProvisionings == nil && provisionedClaims != nil {
} else if expectedProvisionings == nil && dynamicProvisions != nil {
// nil and empty are different
t.Error("expected nil provisionings, got empty")
} else if expectedProvisionings != nil && provisionedClaims == nil {
} else if expectedProvisionings != nil && dynamicProvisions == nil {
// nil and empty are different
t.Error("expected empty provisionings, got nil")
} else {
for i := 0; i < aLen; i++ {
if diff := cmp.Diff(expectedProvisionings[i], provisionedClaims[i]); diff != "" {
if diff := cmp.Diff(expectedProvisionings[i], dynamicProvisions[i]); diff != "" {
t.Errorf("provisioned claims doesn't match (-want, +got):\n%s", diff)
}
}
@ -1041,7 +1046,7 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) {
// Expected podBindingCache fields
expectedBindings []*BindingInfo
expectedProvisions []*v1.PersistentVolumeClaim
expectedProvisions []*DynamicProvision
// Expected return values
reasons ConflictReasons
@ -1051,26 +1056,26 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) {
scenarios := map[string]scenarioType{
"one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedProvisions: []*DynamicProvision{{PVC: provisionedPVC}},
needsCapacity: true,
},
"two-unbound-pvcs,one-matched,one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC},
pvs: []*v1.PersistentVolume{pvNode1a},
expectedBindings: []*BindingInfo{makeBinding(unboundPVC, pvNode1a)},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedProvisions: []*DynamicProvision{{PVC: provisionedPVC}},
needsCapacity: true,
},
"one-bound,one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{boundPVC, provisionedPVC},
pvs: []*v1.PersistentVolume{pvBound},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedProvisions: []*DynamicProvision{{PVC: provisionedPVC}},
needsCapacity: true,
},
"one-binding,one-selected-node": {
podPVCs: []*v1.PersistentVolumeClaim{boundPVC, selectedNodePVC},
pvs: []*v1.PersistentVolume{pvBound},
expectedProvisions: []*v1.PersistentVolumeClaim{selectedNodePVC},
expectedProvisions: []*DynamicProvision{{PVC: selectedNodePVC}},
needsCapacity: true,
},
"immediate-unbound-pvc": {
@ -1080,7 +1085,7 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) {
"one-immediate-bound,one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{immediateBoundPVC, provisionedPVC},
pvs: []*v1.PersistentVolume{pvBoundImmediate},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedProvisions: []*DynamicProvision{{PVC: provisionedPVC}},
needsCapacity: true,
},
"invalid-provisioner": {
@ -1266,17 +1271,17 @@ func TestFindPodVolumesWithCSIMigration(t *testing.T) {
func TestAssumePodVolumes(t *testing.T) {
type scenarioType struct {
// Inputs
podPVCs []*v1.PersistentVolumeClaim
pvs []*v1.PersistentVolume
bindings []*BindingInfo
provisionedPVCs []*v1.PersistentVolumeClaim
podPVCs []*v1.PersistentVolumeClaim
pvs []*v1.PersistentVolume
bindings []*BindingInfo
dynamicProvisions []*DynamicProvision
// Expected return values
shouldFail bool
expectedAllBound bool
expectedBindings []*BindingInfo
expectedProvisionings []*v1.PersistentVolumeClaim
expectedProvisionings []*DynamicProvision
}
scenarios := map[string]scenarioType{
"all-bound": {
@ -1289,21 +1294,21 @@ func TestAssumePodVolumes(t *testing.T) {
bindings: []*BindingInfo{makeBinding(unboundPVC, pvNode1a)},
pvs: []*v1.PersistentVolume{pvNode1a},
expectedBindings: []*BindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
expectedProvisionings: []*v1.PersistentVolumeClaim{},
expectedProvisionings: []*DynamicProvision{},
},
"two-bindings": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2},
bindings: []*BindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(unboundPVC2, pvNode1b)},
pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
expectedBindings: []*BindingInfo{makeBinding(unboundPVC, pvNode1aBound), makeBinding(unboundPVC2, pvNode1bBound)},
expectedProvisionings: []*v1.PersistentVolumeClaim{},
expectedProvisionings: []*DynamicProvision{},
},
"pv-already-bound": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
bindings: []*BindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
pvs: []*v1.PersistentVolume{pvNode1aBound},
expectedBindings: []*BindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
expectedProvisionings: []*v1.PersistentVolumeClaim{},
expectedProvisionings: []*DynamicProvision{},
},
"tmpupdate-failed": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
@ -1315,16 +1320,16 @@ func TestAssumePodVolumes(t *testing.T) {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC},
bindings: []*BindingInfo{makeBinding(unboundPVC, pvNode1a)},
pvs: []*v1.PersistentVolume{pvNode1a},
provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
dynamicProvisions: []*DynamicProvision{{PVC: provisionedPVC}},
expectedBindings: []*BindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
expectedProvisionings: []*v1.PersistentVolumeClaim{selectedNodePVC},
expectedProvisionings: []*DynamicProvision{{PVC: selectedNodePVC}},
},
"one-binding, one-provision-tmpupdate-failed": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVCHigherVersion},
bindings: []*BindingInfo{makeBinding(unboundPVC, pvNode1a)},
pvs: []*v1.PersistentVolume{pvNode1a},
provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC2},
shouldFail: true,
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVCHigherVersion},
bindings: []*BindingInfo{makeBinding(unboundPVC, pvNode1a)},
pvs: []*v1.PersistentVolume{pvNode1a},
dynamicProvisions: []*DynamicProvision{{PVC: provisionedPVC2}},
shouldFail: true,
},
}
@ -1342,7 +1347,7 @@ func TestAssumePodVolumes(t *testing.T) {
withPVCSVolume(scenario.podPVCs).Pod
podVolumes := &PodVolumes{
StaticBindings: scenario.bindings,
DynamicProvisions: scenario.provisionedPVCs,
DynamicProvisions: scenario.dynamicProvisions,
}
testEnv.initVolumes(scenario.pvs, scenario.pvs)
@ -1363,12 +1368,14 @@ func TestAssumePodVolumes(t *testing.T) {
scenario.expectedBindings = scenario.bindings
}
if scenario.expectedProvisionings == nil {
scenario.expectedProvisionings = scenario.provisionedPVCs
scenario.expectedProvisionings = scenario.dynamicProvisions
}
if scenario.shouldFail {
testEnv.validateCacheRestored(t, pod, scenario.bindings, scenario.provisionedPVCs)
pvcs := convertDynamicProvisionsToPVCs(scenario.dynamicProvisions)
testEnv.validateCacheRestored(t, pod, scenario.bindings, pvcs)
} else {
testEnv.validateAssume(t, pod, scenario.expectedBindings, scenario.expectedProvisionings)
pvcs := convertDynamicProvisionsToPVCs(scenario.expectedProvisionings)
testEnv.validateAssume(t, pod, scenario.expectedBindings, pvcs)
}
testEnv.validatePodCache(t, pod.Spec.NodeName, pod, podVolumes, scenario.expectedBindings, scenario.expectedProvisionings)
}
@ -1386,7 +1393,7 @@ func TestRevertAssumedPodVolumes(t *testing.T) {
podPVCs := []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC}
bindings := []*BindingInfo{makeBinding(unboundPVC, pvNode1a)}
pvs := []*v1.PersistentVolume{pvNode1a}
provisionedPVCs := []*v1.PersistentVolumeClaim{provisionedPVC}
dynamicProvisions := []*DynamicProvision{{PVC: provisionedPVC}}
expectedBindings := []*BindingInfo{makeBinding(unboundPVC, pvNode1aBound)}
expectedProvisionings := []*v1.PersistentVolumeClaim{selectedNodePVC}
@ -1399,7 +1406,7 @@ func TestRevertAssumedPodVolumes(t *testing.T) {
withPVCSVolume(podPVCs).Pod
podVolumes := &PodVolumes{
StaticBindings: bindings,
DynamicProvisions: provisionedPVCs,
DynamicProvisions: dynamicProvisions,
}
testEnv.initVolumes(pvs, pvs)
@ -1409,8 +1416,9 @@ func TestRevertAssumedPodVolumes(t *testing.T) {
}
testEnv.validateAssume(t, pod, expectedBindings, expectedProvisionings)
claims := convertDynamicProvisionsToPVCs(dynamicProvisions)
testEnv.binder.RevertAssumedPodVolumes(podVolumes)
testEnv.validateCacheRestored(t, pod, bindings, provisionedPVCs)
testEnv.validateCacheRestored(t, pod, bindings, claims)
}
func TestBindAPIUpdate(t *testing.T) {
@ -2075,9 +2083,13 @@ func TestBindPodVolumes(t *testing.T) {
}
// Execute
dynamicProvisions := []*DynamicProvision{}
for _, claim := range claimsToProvision {
dynamicProvisions = append(dynamicProvisions, &DynamicProvision{PVC: claim})
}
podVolumes := &PodVolumes{
StaticBindings: bindings,
DynamicProvisions: claimsToProvision,
DynamicProvisions: dynamicProvisions,
}
err := testEnv.binder.BindPodVolumes(ctx, pod, podVolumes)
@ -2173,29 +2185,42 @@ func TestCapacity(t *testing.T) {
capacities []*storagev1.CSIStorageCapacity
// Expected return values
reasons ConflictReasons
shouldFail bool
expectedProvisions []*DynamicProvision
reasons ConflictReasons
shouldFail bool
}
scenarios := map[string]scenarioType{
"network-attached": {
pvcs: []*v1.PersistentVolumeClaim{provisionedPVC},
capacities: []*storagev1.CSIStorageCapacity{
makeCapacity("net", waitClassWithProvisioner, nil, "1Gi", ""),
networkAttachedCapacity,
},
expectedProvisions: []*DynamicProvision{{
PVC: provisionedPVC,
NodeCapacity: networkAttachedCapacity,
}},
},
"local-storage": {
pvcs: []*v1.PersistentVolumeClaim{provisionedPVC},
capacities: []*storagev1.CSIStorageCapacity{
makeCapacity("net", waitClassWithProvisioner, node1, "1Gi", ""),
node1Capacity,
},
expectedProvisions: []*DynamicProvision{{
PVC: provisionedPVC,
NodeCapacity: node1Capacity,
}},
},
"multiple": {
pvcs: []*v1.PersistentVolumeClaim{provisionedPVC},
capacities: []*storagev1.CSIStorageCapacity{
makeCapacity("net", waitClassWithProvisioner, nil, "1Gi", ""),
makeCapacity("net", waitClassWithProvisioner, node2, "1Gi", ""),
makeCapacity("net", waitClassWithProvisioner, node1, "1Gi", ""),
networkAttachedCapacity,
node2Capacity,
node1Capacity,
},
expectedProvisions: []*DynamicProvision{{
PVC: provisionedPVC,
NodeCapacity: node1Capacity,
}},
},
"no-storage": {
pvcs: []*v1.PersistentVolumeClaim{provisionedPVC},
@ -2297,11 +2322,16 @@ func TestCapacity(t *testing.T) {
t.Error("returned success but expected error")
}
checkReasons(t, reasons, expectedReasons)
provisions := scenario.pvcs
if len(reasons) > 0 {
provisions = nil
expectedProvisions := scenario.expectedProvisions
if !optIn {
for i := 0; i < len(expectedProvisions); i++ {
expectedProvisions[i].NodeCapacity = nil
}
}
testEnv.validatePodCache(t, pod.Spec.NodeName, pod, podVolumes, nil, provisions)
if len(scenario.reasons) > 0 {
expectedProvisions = podVolumes.DynamicProvisions
}
testEnv.validatePodCache(t, pod.Spec.NodeName, pod, podVolumes, nil, expectedProvisions)
}
yesNo := []bool{true, false}

View File

@ -29,6 +29,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/component-helpers/storage/ephemeral"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
@ -70,10 +71,11 @@ func (d *stateData) Clone() framework.StateData {
// In the Filter phase, pod binding cache is created for the pod and used in
// Reserve and PreBind phases.
type VolumeBinding struct {
Binder SchedulerVolumeBinder
PVCLister corelisters.PersistentVolumeClaimLister
scorer volumeCapacityScorer
fts feature.Features
Binder SchedulerVolumeBinder
PVCLister corelisters.PersistentVolumeClaimLister
classLister storagelisters.StorageClassLister
scorer volumeCapacityScorer
fts feature.Features
}
var _ framework.PreFilterPlugin = &VolumeBinding{}
@ -451,7 +453,7 @@ func (pl *VolumeBinding) PreScore(ctx context.Context, cs *framework.CycleState,
if err != nil {
return framework.AsStatus(err)
}
if state.hasStaticBindings {
if state.hasStaticBindings || pl.fts.EnableStorageCapacityScoring {
return nil
}
return framework.NewStatus(framework.Skip)
@ -471,20 +473,44 @@ func (pl *VolumeBinding) Score(ctx context.Context, cs *framework.CycleState, po
if !ok {
return 0, nil
}
// group by storage class
classResources := make(classResourceMap)
for _, staticBinding := range podVolumes.StaticBindings {
class := staticBinding.StorageClassName()
storageResource := staticBinding.StorageResource()
if _, ok := classResources[class]; !ok {
classResources[class] = &StorageResource{
Requested: 0,
Capacity: 0,
if len(podVolumes.StaticBindings) != 0 || !pl.fts.EnableStorageCapacityScoring {
// group static binding volumes by storage class
for _, staticBinding := range podVolumes.StaticBindings {
class := staticBinding.StorageClassName()
storageResource := staticBinding.StorageResource()
if _, ok := classResources[class]; !ok {
classResources[class] = &StorageResource{
Requested: 0,
Capacity: 0,
}
}
classResources[class].Requested += storageResource.Requested
classResources[class].Capacity += storageResource.Capacity
}
} else {
// group dynamic binding volumes by storage class
for _, provision := range podVolumes.DynamicProvisions {
if provision.NodeCapacity == nil {
continue
}
class := *provision.PVC.Spec.StorageClassName
if _, ok := classResources[class]; !ok {
classResources[class] = &StorageResource{
Requested: 0,
Capacity: 0,
}
}
// The following line cannot be +=. For example, if a Pod requests two 50GB volumes from
// a StorageClass with 100GB of capacity on a node, this part of the code will be executed twice.
// In that case, using += would incorrectly set classResources[class].Capacity to 200GB.
classResources[class].Capacity = provision.NodeCapacity.Capacity.Value()
requestedQty := provision.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
classResources[class].Requested += requestedQty.Value()
}
classResources[class].Requested += storageResource.Requested
classResources[class].Capacity += storageResource.Capacity
}
return pl.scorer(classResources), nil
}
@ -595,9 +621,10 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
scorer = buildScorerFunction(shape)
}
return &VolumeBinding{
Binder: binder,
PVCLister: pvcInformer.Lister(),
scorer: scorer,
fts: fts,
Binder: binder,
PVCLister: pvcInformer.Lister(),
classLister: storageClassInformer.Lister(),
scorer: scorer,
fts: fts,
}, nil
}

View File

@ -59,6 +59,22 @@ var (
},
VolumeBindingMode: &waitForFirstConsumer,
}
waitSCWithStorageCapacity = &storagev1.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: "wait-sc-with-storage-capacity",
},
Provisioner: "driver-with-storage-capacity",
VolumeBindingMode: &waitForFirstConsumer,
}
driverWithStorageCapacity = &storagev1.CSIDriver{
ObjectMeta: metav1.ObjectMeta{
Name: "driver-with-storage-capacity",
},
Spec: storagev1.CSIDriverSpec{
StorageCapacity: ptr.To(true),
},
}
defaultShapePoint = []config.UtilizationShapePoint{
{
@ -79,6 +95,7 @@ func TestVolumeBinding(t *testing.T) {
nodes []*v1.Node
pvcs []*v1.PersistentVolumeClaim
pvs []*v1.PersistentVolume
capacities []*storagev1.CSIStorageCapacity
fts feature.Features
args *config.VolumeBindingArgs
wantPreFilterResult *framework.PreFilterResult
@ -716,6 +733,255 @@ func TestVolumeBinding(t *testing.T) {
0,
},
},
{
name: "storage capacity score",
pod: makePod("pod-a").withPVCVolume("pvc-dynamic", "").Pod,
nodes: []*v1.Node{
makeNode("node-a").withLabel(nodeLabelKey, "node-a").Node,
makeNode("node-b").withLabel(nodeLabelKey, "node-b").Node,
makeNode("node-c").withLabel(nodeLabelKey, "node-c").Node,
},
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-dynamic", waitSCWithStorageCapacity.Name).withRequestStorage(resource.MustParse("10Gi")).PersistentVolumeClaim,
},
capacities: []*storagev1.CSIStorageCapacity{
makeCapacity("node-a", waitSCWithStorageCapacity.Name, makeNode("node-a").withLabel(nodeLabelKey, "node-a").Node, "100Gi", ""),
makeCapacity("node-b", waitSCWithStorageCapacity.Name, makeNode("node-b").withLabel(nodeLabelKey, "node-b").Node, "50Gi", ""),
makeCapacity("node-c", waitSCWithStorageCapacity.Name, makeNode("node-c").withLabel(nodeLabelKey, "node-c").Node, "10Gi", ""),
},
fts: feature.Features{
EnableVolumeCapacityPriority: true,
EnableStorageCapacityScoring: true,
},
wantPreFilterStatus: nil,
wantStateAfterPreFilter: &stateData{
podVolumeClaims: &PodVolumeClaims{
boundClaims: []*v1.PersistentVolumeClaim{},
unboundClaimsDelayBinding: []*v1.PersistentVolumeClaim{
makePVC("pvc-dynamic", waitSCWithStorageCapacity.Name).withRequestStorage(resource.MustParse("10Gi")).PersistentVolumeClaim,
},
unboundVolumesDelayBinding: map[string][]*v1.PersistentVolume{waitSCWithStorageCapacity.Name: {}},
},
podVolumesByNode: map[string]*PodVolumes{},
},
wantFilterStatus: []*framework.Status{
nil,
nil,
nil,
},
wantScores: []int64{
10,
20,
100,
},
},
{
name: "storage capacity score with static binds",
pod: makePod("pod-a").withPVCVolume("pvc-dynamic", "").withPVCVolume("pvc-static", "").Pod,
nodes: []*v1.Node{
makeNode("node-a").withLabel(nodeLabelKey, "node-a").Node,
makeNode("node-b").withLabel(nodeLabelKey, "node-b").Node,
makeNode("node-c").withLabel(nodeLabelKey, "node-c").Node,
},
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-dynamic", waitSCWithStorageCapacity.Name).withRequestStorage(resource.MustParse("10Gi")).PersistentVolumeClaim,
makePVC("pvc-static", waitSC.Name).withRequestStorage(resource.MustParse("50Gi")).PersistentVolumeClaim,
},
pvs: []*v1.PersistentVolume{
makePV("pv-static-a", waitSC.Name).
withPhase(v1.VolumeAvailable).
withCapacity(resource.MustParse("100Gi")).
withNodeAffinity(map[string][]string{v1.LabelHostname: {"node-a"}}).PersistentVolume,
makePV("pv-static-b", waitSC.Name).
withPhase(v1.VolumeAvailable).
withCapacity(resource.MustParse("100Gi")).
withNodeAffinity(map[string][]string{v1.LabelHostname: {"node-b"}}).PersistentVolume,
makePV("pv-static-c", waitSC.Name).
withPhase(v1.VolumeAvailable).
withCapacity(resource.MustParse("100Gi")).
withNodeAffinity(map[string][]string{v1.LabelHostname: {"node-c"}}).PersistentVolume,
},
capacities: []*storagev1.CSIStorageCapacity{
makeCapacity("node-a", waitSCWithStorageCapacity.Name, makeNode("node-a").withLabel(nodeLabelKey, "node-a").Node, "100Gi", ""),
makeCapacity("node-b", waitSCWithStorageCapacity.Name, makeNode("node-b").withLabel(nodeLabelKey, "node-b").Node, "50Gi", ""),
makeCapacity("node-c", waitSCWithStorageCapacity.Name, makeNode("node-c").withLabel(nodeLabelKey, "node-c").Node, "10Gi", ""),
},
fts: feature.Features{
EnableVolumeCapacityPriority: true,
EnableStorageCapacityScoring: true,
},
wantPreFilterStatus: nil,
wantStateAfterPreFilter: &stateData{
podVolumeClaims: &PodVolumeClaims{
boundClaims: []*v1.PersistentVolumeClaim{},
unboundClaimsDelayBinding: []*v1.PersistentVolumeClaim{
makePVC("pvc-dynamic", waitSCWithStorageCapacity.Name).withRequestStorage(resource.MustParse("10Gi")).PersistentVolumeClaim,
makePVC("pvc-static", waitSC.Name).withRequestStorage(resource.MustParse("50Gi")).PersistentVolumeClaim,
},
unboundVolumesDelayBinding: map[string][]*v1.PersistentVolume{
waitSC.Name: {
makePV("pv-static-a", waitSC.Name).
withPhase(v1.VolumeAvailable).
withCapacity(resource.MustParse("100Gi")).
withNodeAffinity(map[string][]string{v1.LabelHostname: {"node-a"}}).PersistentVolume,
makePV("pv-static-b", waitSC.Name).
withPhase(v1.VolumeAvailable).
withCapacity(resource.MustParse("100Gi")).
withNodeAffinity(map[string][]string{v1.LabelHostname: {"node-b"}}).PersistentVolume,
makePV("pv-static-c", waitSC.Name).
withPhase(v1.VolumeAvailable).
withCapacity(resource.MustParse("100Gi")).
withNodeAffinity(map[string][]string{v1.LabelHostname: {"node-c"}}).PersistentVolume,
},
waitSCWithStorageCapacity.Name: {},
},
},
podVolumesByNode: map[string]*PodVolumes{},
},
wantFilterStatus: []*framework.Status{
nil,
nil,
nil,
},
wantScores: []int64{
50,
50,
50,
},
},
{
name: "dynamic provisioning with multiple PVCs of the same StorageClass",
pod: makePod("pod-a").withPVCVolume("pvc-dynamic-0", "").withPVCVolume("pvc-dynamic-1", "").Pod,
nodes: []*v1.Node{
makeNode("node-a").withLabel(nodeLabelKey, "node-a").Node,
},
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-dynamic-0", waitSCWithStorageCapacity.Name).withRequestStorage(resource.MustParse("50Gi")).PersistentVolumeClaim,
makePVC("pvc-dynamic-1", waitSCWithStorageCapacity.Name).withRequestStorage(resource.MustParse("50Gi")).PersistentVolumeClaim,
},
capacities: []*storagev1.CSIStorageCapacity{
makeCapacity("node-a", waitSCWithStorageCapacity.Name, makeNode("node-a").withLabel(nodeLabelKey, "node-a").Node, "100Gi", ""),
},
fts: feature.Features{
EnableVolumeCapacityPriority: true,
EnableStorageCapacityScoring: true,
},
wantPreFilterStatus: nil,
wantStateAfterPreFilter: &stateData{
podVolumeClaims: &PodVolumeClaims{
boundClaims: []*v1.PersistentVolumeClaim{},
unboundClaimsDelayBinding: []*v1.PersistentVolumeClaim{
makePVC("pvc-dynamic-0", waitSCWithStorageCapacity.Name).withRequestStorage(resource.MustParse("50Gi")).PersistentVolumeClaim,
makePVC("pvc-dynamic-1", waitSCWithStorageCapacity.Name).withRequestStorage(resource.MustParse("50Gi")).PersistentVolumeClaim,
},
unboundVolumesDelayBinding: map[string][]*v1.PersistentVolume{waitSCWithStorageCapacity.Name: {}},
},
podVolumesByNode: map[string]*PodVolumes{},
},
wantFilterStatus: []*framework.Status{
nil,
},
wantScores: []int64{
100,
},
},
{
name: "prefer node with least allocatable",
pod: makePod("pod-a").withPVCVolume("pvc-dynamic", "").Pod,
nodes: []*v1.Node{
makeNode("node-a").withLabel(nodeLabelKey, "node-a").Node,
makeNode("node-b").withLabel(nodeLabelKey, "node-b").Node,
makeNode("node-c").withLabel(nodeLabelKey, "node-c").Node,
},
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-dynamic", waitSCWithStorageCapacity.Name).withRequestStorage(resource.MustParse("10Gi")).PersistentVolumeClaim,
},
capacities: []*storagev1.CSIStorageCapacity{
makeCapacity("node-a", waitSCWithStorageCapacity.Name, makeNode("node-a").withLabel(nodeLabelKey, "node-a").Node, "100Gi", ""),
makeCapacity("node-b", waitSCWithStorageCapacity.Name, makeNode("node-b").withLabel(nodeLabelKey, "node-b").Node, "20Gi", ""),
makeCapacity("node-c", waitSCWithStorageCapacity.Name, makeNode("node-c").withLabel(nodeLabelKey, "node-c").Node, "10Gi", ""),
},
fts: feature.Features{
EnableVolumeCapacityPriority: true,
EnableStorageCapacityScoring: true,
},
wantPreFilterStatus: nil,
wantStateAfterPreFilter: &stateData{
podVolumeClaims: &PodVolumeClaims{
boundClaims: []*v1.PersistentVolumeClaim{},
unboundClaimsDelayBinding: []*v1.PersistentVolumeClaim{
makePVC("pvc-dynamic", waitSCWithStorageCapacity.Name).withRequestStorage(resource.MustParse("10Gi")).PersistentVolumeClaim,
},
unboundVolumesDelayBinding: map[string][]*v1.PersistentVolume{waitSCWithStorageCapacity.Name: {}},
},
podVolumesByNode: map[string]*PodVolumes{},
},
wantFilterStatus: []*framework.Status{
nil,
nil,
nil,
},
wantScores: []int64{
10,
50,
100,
},
},
{
name: "prefer node with maximum allocatable",
pod: makePod("pod-a").withPVCVolume("pvc-dynamic", "").Pod,
nodes: []*v1.Node{
makeNode("node-a").withLabel(nodeLabelKey, "node-a").Node,
makeNode("node-b").withLabel(nodeLabelKey, "node-b").Node,
makeNode("node-c").withLabel(nodeLabelKey, "node-c").Node,
},
pvcs: []*v1.PersistentVolumeClaim{
makePVC("pvc-dynamic", waitSCWithStorageCapacity.Name).withRequestStorage(resource.MustParse("10Gi")).PersistentVolumeClaim,
},
capacities: []*storagev1.CSIStorageCapacity{
makeCapacity("node-a", waitSCWithStorageCapacity.Name, makeNode("node-a").withLabel(nodeLabelKey, "node-a").Node, "100Gi", ""),
makeCapacity("node-b", waitSCWithStorageCapacity.Name, makeNode("node-b").withLabel(nodeLabelKey, "node-b").Node, "20Gi", ""),
makeCapacity("node-c", waitSCWithStorageCapacity.Name, makeNode("node-c").withLabel(nodeLabelKey, "node-c").Node, "10Gi", ""),
},
fts: feature.Features{
EnableVolumeCapacityPriority: true,
EnableStorageCapacityScoring: true,
},
args: &config.VolumeBindingArgs{
BindTimeoutSeconds: 300,
Shape: []config.UtilizationShapePoint{
{
Utilization: 0,
Score: int32(config.MaxCustomPriorityScore),
},
{
Utilization: 100,
Score: 0,
},
},
},
wantPreFilterStatus: nil,
wantStateAfterPreFilter: &stateData{
podVolumeClaims: &PodVolumeClaims{
boundClaims: []*v1.PersistentVolumeClaim{},
unboundClaimsDelayBinding: []*v1.PersistentVolumeClaim{
makePVC("pvc-dynamic", waitSCWithStorageCapacity.Name).withRequestStorage(resource.MustParse("10Gi")).PersistentVolumeClaim,
},
unboundVolumesDelayBinding: map[string][]*v1.PersistentVolume{waitSCWithStorageCapacity.Name: {}},
},
podVolumesByNode: map[string]*PodVolumes{},
},
wantFilterStatus: []*framework.Status{
nil,
nil,
nil,
},
wantScores: []int64{
90,
50,
0,
},
},
}
for _, item := range table {
@ -751,17 +1017,50 @@ func TestVolumeBinding(t *testing.T) {
}
t.Log("Feed testing data and wait for them to be synced")
client.StorageV1().StorageClasses().Create(ctx, immediateSC, metav1.CreateOptions{})
client.StorageV1().StorageClasses().Create(ctx, waitSC, metav1.CreateOptions{})
client.StorageV1().StorageClasses().Create(ctx, waitHDDSC, metav1.CreateOptions{})
_, err = client.StorageV1().StorageClasses().Create(ctx, immediateSC, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
_, err = client.StorageV1().StorageClasses().Create(ctx, waitSC, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
_, err = client.StorageV1().StorageClasses().Create(ctx, waitHDDSC, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
_, err = client.StorageV1().StorageClasses().Create(ctx, waitSCWithStorageCapacity, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
_, err = client.StorageV1().CSIDrivers().Create(ctx, driverWithStorageCapacity, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
for _, node := range item.nodes {
client.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
_, err = client.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
}
for _, pvc := range item.pvcs {
client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
_, err = client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
}
for _, pv := range item.pvs {
client.CoreV1().PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{})
_, err = client.CoreV1().PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
}
for _, capacity := range item.capacities {
_, err = client.StorageV1().CSIStorageCapacities(capacity.Namespace).Create(ctx, capacity, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
}
t.Log("Start informer factory after initialization")

View File

@ -1345,6 +1345,12 @@
lockToDefault: true
preRelease: GA
version: "1.31"
- name: StorageCapacityScoring
versionedSpecs:
- default: false
lockToDefault: false
preRelease: Alpha
version: "1.33"
- name: StorageNamespaceIndex
versionedSpecs:
- default: true