Merge pull request #124003 from carlory/scheduler-rm-non-csi-limit

kube-scheduler: remove the non-CSI volume limit plugins (AzureDiskLimits, CinderLimits, EBSLimits, GCEPDLimits)

Commit 0bcbc3b77a
@@ -30,10 +30,6 @@ const (
 	NodeResourcesFit = "NodeResourcesFit"
 	NodeUnschedulable = "NodeUnschedulable"
 	NodeVolumeLimits = "NodeVolumeLimits"
-	AzureDiskLimits = "AzureDiskLimits"
-	CinderLimits = "CinderLimits"
-	EBSLimits = "EBSLimits"
-	GCEPDLimits = "GCEPDLimits"
 	PodTopologySpread = "PodTopologySpread"
 	SchedulingGates = "SchedulingGates"
 	TaintToleration = "TaintToleration"
@@ -35,7 +35,11 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
 	"k8s.io/kubernetes/pkg/scheduler/util"
-	volumeutil "k8s.io/kubernetes/pkg/volume/util"
 )
 
+const (
+	// ErrReasonMaxVolumeCountExceeded is used for MaxVolumeCount predicate error.
+	ErrReasonMaxVolumeCountExceeded = "node(s) exceed max volume count"
+)
+
 // InTreeToCSITranslator contains methods required to check migratable status
@@ -173,7 +177,6 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
 
 	logger := klog.FromContext(ctx)
 
-	// If CSINode doesn't exist, the predicate may read the limits from Node object
 	csiNode, err := pl.csiNodeLister.Get(node.Name)
 	if err != nil {
 		// TODO: return the error once CSINode is created by default (2 releases)
@@ -195,7 +198,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
 	}
 
 	// If the node doesn't have volume limits, the predicate will always be true
-	nodeVolumeLimits := getVolumeLimits(nodeInfo, csiNode)
+	nodeVolumeLimits := getVolumeLimits(csiNode)
 	if len(nodeVolumeLimits) == 0 {
 		return nil
 	}
@@ -208,22 +211,23 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
 	}
 
 	attachedVolumeCount := map[string]int{}
-	for volumeUniqueName, volumeLimitKey := range attachedVolumes {
+	for volumeUniqueName, driverName := range attachedVolumes {
 		// Don't count single volume used in multiple pods more than once
 		delete(newVolumes, volumeUniqueName)
-		attachedVolumeCount[volumeLimitKey]++
+		attachedVolumeCount[driverName]++
 	}
 
+	// Count the new volumes count per driver
 	newVolumeCount := map[string]int{}
-	for _, volumeLimitKey := range newVolumes {
-		newVolumeCount[volumeLimitKey]++
+	for _, driverName := range newVolumes {
+		newVolumeCount[driverName]++
 	}
 
-	for volumeLimitKey, count := range newVolumeCount {
-		maxVolumeLimit, ok := nodeVolumeLimits[v1.ResourceName(volumeLimitKey)]
+	for driverName, count := range newVolumeCount {
+		maxVolumeLimit, ok := nodeVolumeLimits[driverName]
 		if ok {
-			currentVolumeCount := attachedVolumeCount[volumeLimitKey]
-			logger.V(5).Info("Found plugin volume limits", "node", node.Name, "volumeLimitKey", volumeLimitKey,
+			currentVolumeCount := attachedVolumeCount[driverName]
+			logger.V(5).Info("Found plugin volume limits", "node", node.Name, "driverName", driverName,
 				"maxLimits", maxVolumeLimit, "currentVolumeCount", currentVolumeCount, "newVolumeCount", count,
 				"pod", klog.KObj(pod))
 			if currentVolumeCount+count > int(maxVolumeLimit) {
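The counting in the hunk above can be illustrated outside the plugin. The following is a minimal, self-contained sketch (not the plugin code itself); the driver name, volume handles and the limit are made up, but the shape matches the change: attached and new volumes are grouped by CSI driver name and compared against the per-driver limit read from CSINode.

```go
package main

import "fmt"

func main() {
	// Per-driver limits as read from CSINode (driver name -> max attachable volumes).
	nodeVolumeLimits := map[string]int64{"ebs.csi.aws.com": 2}

	// Unique volume name ("driverName/volumeHandle") -> driver name.
	attachedVolumes := map[string]string{
		"ebs.csi.aws.com/vol-1": "ebs.csi.aws.com",
		"ebs.csi.aws.com/vol-2": "ebs.csi.aws.com",
	}
	newVolumes := map[string]string{
		"ebs.csi.aws.com/vol-2": "ebs.csi.aws.com", // already attached, must not be double-counted
		"ebs.csi.aws.com/vol-3": "ebs.csi.aws.com",
	}

	attachedVolumeCount := map[string]int{}
	for volumeUniqueName, driverName := range attachedVolumes {
		delete(newVolumes, volumeUniqueName) // don't count a shared volume twice
		attachedVolumeCount[driverName]++
	}

	newVolumeCount := map[string]int{}
	for _, driverName := range newVolumes {
		newVolumeCount[driverName]++
	}

	for driverName, count := range newVolumeCount {
		if maxVolumeLimit, ok := nodeVolumeLimits[driverName]; ok {
			if attachedVolumeCount[driverName]+count > int(maxVolumeLimit) {
				fmt.Printf("driver %s would exceed its limit of %d\n", driverName, maxVolumeLimit)
			}
		}
	}
}
```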
@@ -235,6 +239,9 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
 	return nil
 }
 
+// filterAttachableVolumes filters the attachable volumes from the pod and adds them to the result map.
+// The result map is a map of volumeUniqueName to driver name. The volumeUniqueName is a unique name for
+// the volume in the format of "driverName/volumeHandle". And driver name is the CSI driver name.
 func (pl *CSILimits) filterAttachableVolumes(
 	logger klog.Logger, pod *v1.Pod, csiNode *storagev1.CSINode, newPod bool, result map[string]string) error {
 	for _, vol := range pod.Spec.Volumes {
@@ -297,8 +304,7 @@ func (pl *CSILimits) filterAttachableVolumes(
 		}
 
 		volumeUniqueName := fmt.Sprintf("%s/%s", driverName, volumeHandle)
-		volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
-		result[volumeUniqueName] = volumeLimitKey
+		result[volumeUniqueName] = driverName
 	}
 	return nil
 }
@@ -339,8 +345,7 @@ func (pl *CSILimits) checkAttachableInlineVolume(logger klog.Logger, vol *v1.Vol
 		return nil
 	}
 	volumeUniqueName := fmt.Sprintf("%s/%s", driverName, translatedPV.Spec.PersistentVolumeSource.CSI.VolumeHandle)
-	volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
-	result[volumeUniqueName] = volumeLimitKey
+	result[volumeUniqueName] = driverName
 	return nil
 }
 
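For clarity, here is a small standalone illustration of the key/value convention the new doc comment describes: the result map is keyed by "driverName/volumeHandle" and now stores the bare CSI driver name instead of a volumeutil attach-limit resource key. The driver names and handles below are made up.

```go
package main

import "fmt"

func main() {
	// Hypothetical CSI volumes: driver name and volume handle.
	vols := []struct{ driverName, volumeHandle string }{
		{"ebs.csi.aws.com", "vol-0abc"},
		{"pd.csi.storage.gke.io", "pd-123"},
	}

	// volumeUniqueName ("driverName/volumeHandle") -> driver name,
	// mirroring what filterAttachableVolumes now stores.
	result := map[string]string{}
	for _, v := range vols {
		volumeUniqueName := fmt.Sprintf("%s/%s", v.driverName, v.volumeHandle)
		result[volumeUniqueName] = v.driverName
	}
	fmt.Println(result)
}
```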
@@ -460,17 +465,17 @@ func NewCSI(_ context.Context, _ runtime.Object, handle framework.Handle, fts fe
 	}, nil
 }
 
-func getVolumeLimits(nodeInfo *framework.NodeInfo, csiNode *storagev1.CSINode) map[v1.ResourceName]int64 {
-	// TODO: stop getting values from Node object in v1.18
-	nodeVolumeLimits := volumeLimits(nodeInfo)
-	if csiNode != nil {
-		for i := range csiNode.Spec.Drivers {
-			d := csiNode.Spec.Drivers[i]
-			if d.Allocatable != nil && d.Allocatable.Count != nil {
-				// TODO: drop GetCSIAttachLimitKey once we don't get values from Node object (v1.18)
-				k := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(d.Name))
-				nodeVolumeLimits[k] = int64(*d.Allocatable.Count)
-			}
-		}
-	}
+// getVolumeLimits reads the volume limits from CSINode object and returns a map of volume limits.
+// The key is the driver name and the value is the maximum number of volumes that can be attached to the node.
+// If a key is not found in the map, it means there is no limit for the driver on the node.
+func getVolumeLimits(csiNode *storagev1.CSINode) map[string]int64 {
+	nodeVolumeLimits := make(map[string]int64)
+	if csiNode == nil {
+		return nodeVolumeLimits
+	}
+	for _, d := range csiNode.Spec.Drivers {
+		if d.Allocatable != nil && d.Allocatable.Count != nil {
+			nodeVolumeLimits[d.Name] = int64(*d.Allocatable.Count)
+		}
+	}
 	return nodeVolumeLimits
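The rewritten getVolumeLimits only consults the CSINode object. Below is a hedged sketch of the object it reads, with a local copy of the same extraction logic; it assumes the k8s.io/api, k8s.io/apimachinery and k8s.io/utils modules are on the module path, and the node name, driver name and count are placeholders.

```go
package main

import (
	"fmt"

	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

// volumeLimitsFromCSINode is a local copy of the new per-driver limit extraction.
func volumeLimitsFromCSINode(csiNode *storagev1.CSINode) map[string]int64 {
	limits := make(map[string]int64)
	if csiNode == nil {
		return limits // no CSINode: no limits are enforced
	}
	for _, d := range csiNode.Spec.Drivers {
		if d.Allocatable != nil && d.Allocatable.Count != nil {
			limits[d.Name] = int64(*d.Allocatable.Count)
		}
	}
	return limits
}

func main() {
	csiNode := &storagev1.CSINode{
		ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
		Spec: storagev1.CSINodeSpec{
			Drivers: []storagev1.CSINodeDriver{
				{
					Name:        "ebs.csi.aws.com",
					NodeID:      "node-1",
					Allocatable: &storagev1.VolumeNodeResources{Count: ptr.To[int32](25)},
				},
			},
		},
	}
	fmt.Println(volumeLimitsFromCSINode(csiNode)) // map[ebs.csi.aws.com:25]
}
```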
@@ -26,7 +26,6 @@ import (
 	"github.com/google/go-cmp/cmp"
 	v1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
-	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/rand"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -35,7 +34,6 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
 	tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
-	volumeutil "k8s.io/kubernetes/pkg/volume/util"
 	"k8s.io/kubernetes/test/utils/ktesting"
 	"k8s.io/utils/ptr"
 )
@@ -51,22 +49,6 @@ var (
 	scName = "csi-sc"
 )
 
-// getVolumeLimitKey returns a ResourceName by filter type
-func getVolumeLimitKey(filterType string) v1.ResourceName {
-	switch filterType {
-	case ebsVolumeFilterType:
-		return v1.ResourceName(volumeutil.EBSVolumeLimitKey)
-	case gcePDVolumeFilterType:
-		return v1.ResourceName(volumeutil.GCEVolumeLimitKey)
-	case azureDiskVolumeFilterType:
-		return v1.ResourceName(volumeutil.AzureVolumeLimitKey)
-	case cinderVolumeFilterType:
-		return v1.ResourceName(volumeutil.CinderVolumeLimitKey)
-	default:
-		return v1.ResourceName(volumeutil.GetCSIAttachLimitKey(filterType))
-	}
-}
-
 func TestCSILimits(t *testing.T) {
 	runningPod := st.MakePod().PVC("csi-ebs.csi.aws.com-3").Obj()
 	pendingVolumePod := st.MakePod().PVC("csi-4").Obj()
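The deleted getVolumeLimitKey helper existed only to translate a filter type into the in-tree attach-limit resource name; the CSI plugin now keys its limit map directly by driver name, so the test no longer needs it. A rough sketch of the two keying schemes follows; the concrete resource-name string is illustrative, not copied from volumeutil.

```go
package main

import "fmt"

func main() {
	driver := "ebs.csi.aws.com"

	// Old scheme (removed): limits were looked up under a synthetic resource name,
	// something in the spirit of "attachable-volumes-csi-<driver>" (illustrative only).
	oldKey := "attachable-volumes-csi-" + driver

	// New scheme: the limit map is keyed directly by the CSI driver name.
	newKey := driver

	fmt.Println(oldKey, "->", newKey)
}
```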
@@ -297,7 +279,7 @@ func TestCSILimits(t *testing.T) {
 			maxVols: 4,
 			driverNames: []string{ebsCSIDriverName},
 			test: "fits when node volume limit >= new pods CSI volume",
-			limitSource: "node",
+			limitSource: "csinode",
 		},
 		{
 			newPod: csiEBSOneVolPod,
@@ -306,7 +288,7 @@ func TestCSILimits(t *testing.T) {
 			maxVols: 2,
 			driverNames: []string{ebsCSIDriverName},
 			test: "doesn't when node volume limit <= pods CSI volume",
-			limitSource: "node",
+			limitSource: "csinode",
 			wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
 		},
 		{
@@ -326,7 +308,7 @@ func TestCSILimits(t *testing.T) {
 			maxVols: 2,
 			driverNames: []string{ebsCSIDriverName},
 			test: "count pending PVCs towards volume limit <= pods CSI volume",
-			limitSource: "node",
+			limitSource: "csinode",
 			wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
 		},
 		// two same pending PVCs should be counted as 1
@@ -337,7 +319,7 @@ func TestCSILimits(t *testing.T) {
 			maxVols: 4,
 			driverNames: []string{ebsCSIDriverName},
 			test: "count multiple pending pvcs towards volume limit >= pods CSI volume",
-			limitSource: "node",
+			limitSource: "csinode",
 		},
 		// should count PVCs with invalid PV name but valid SC
 		{
@@ -347,7 +329,7 @@ func TestCSILimits(t *testing.T) {
 			maxVols: 2,
 			driverNames: []string{ebsCSIDriverName},
 			test: "should count PVCs with invalid PV name but valid SC",
-			limitSource: "node",
+			limitSource: "csinode",
 			wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
 		},
 		// don't count a volume which has storageclass missing
@@ -358,7 +340,7 @@ func TestCSILimits(t *testing.T) {
 			maxVols: 2,
 			driverNames: []string{ebsCSIDriverName},
 			test: "don't count pvcs with missing SC towards volume limit",
-			limitSource: "node",
+			limitSource: "csinode",
 		},
 		// don't count multiple volume types
 		{
@@ -368,7 +350,7 @@ func TestCSILimits(t *testing.T) {
 			maxVols: 2,
 			driverNames: []string{ebsCSIDriverName, gceCSIDriverName},
 			test: "count pvcs with the same type towards volume limit",
-			limitSource: "node",
+			limitSource: "csinode",
 			wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
 		},
 		{
@@ -378,7 +360,7 @@ func TestCSILimits(t *testing.T) {
 			maxVols: 2,
 			driverNames: []string{ebsCSIDriverName, gceCSIDriverName},
 			test: "don't count pvcs with different type towards volume limit",
-			limitSource: "node",
+			limitSource: "csinode",
 		},
 		// Tests for in-tree volume migration
 		{
@@ -396,10 +378,8 @@ func TestCSILimits(t *testing.T) {
 			newPod: inTreeInlineVolPod,
 			existingPods: []*v1.Pod{inTreeTwoVolPod},
 			filterName: "csi",
-			maxVols: 2,
 			driverNames: []string{csilibplugins.AWSEBSInTreePluginName, ebsCSIDriverName},
 			migrationEnabled: true,
-			limitSource: "node",
 			test: "nil csi node",
 		},
 		{
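The "nil csi node" case above no longer needs maxVols or a limitSource: with the Node-allocatable fallback gone, a missing CSINode simply yields an empty limit map and the Filter admits the pod. A compact sketch of that decision path (the function and messages are illustrative, not the plugin's code):

```go
package main

import "fmt"

// decide mimics the short-circuit in CSILimits.Filter: with no per-driver
// limits (for example when the CSINode object does not exist), nothing is rejected.
func decide(nodeVolumeLimits map[string]int64) string {
	if len(nodeVolumeLimits) == 0 {
		return "schedulable: no volume limits reported for this node"
	}
	return "limits present: per-driver counting applies"
}

func main() {
	fmt.Println(decide(nil))                                      // CSINode missing
	fmt.Println(decide(map[string]int64{"ebs.csi.aws.com": 25}))  // CSINode with a limit
}
```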
@@ -494,6 +474,7 @@ func TestCSILimits(t *testing.T) {
 			filterName: "csi",
 			ephemeralEnabled: true,
 			driverNames: []string{ebsCSIDriverName},
+			limitSource: "csinode-with-no-limit",
 			test: "ephemeral volume missing",
 			wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, `looking up PVC test/abc-xyz: persistentvolumeclaims "abc-xyz" not found`),
 		},
@@ -503,6 +484,7 @@ func TestCSILimits(t *testing.T) {
 			ephemeralEnabled: true,
 			extraClaims: []v1.PersistentVolumeClaim{*conflictingClaim},
 			driverNames: []string{ebsCSIDriverName},
+			limitSource: "csinode-with-no-limit",
 			test: "ephemeral volume not owned",
 			wantStatus: framework.AsStatus(errors.New("PVC test/abc-xyz was not created for pod test/abc (pod is not owner)")),
 		},
@@ -512,6 +494,7 @@ func TestCSILimits(t *testing.T) {
 			ephemeralEnabled: true,
 			extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
 			driverNames: []string{ebsCSIDriverName},
+			limitSource: "csinode-with-no-limit",
 			test: "ephemeral volume unbound",
 		},
 		{
@@ -522,7 +505,7 @@ func TestCSILimits(t *testing.T) {
 			driverNames: []string{ebsCSIDriverName},
 			existingPods: []*v1.Pod{runningPod, csiEBSTwoVolPod},
 			maxVols: 2,
-			limitSource: "node",
+			limitSource: "csinode",
 			test: "ephemeral doesn't when node volume limit <= pods CSI volume",
 			wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
 		},
@@ -534,7 +517,7 @@ func TestCSILimits(t *testing.T) {
 			driverNames: []string{ebsCSIDriverName},
 			existingPods: []*v1.Pod{runningPod, ephemeralTwoVolumePod},
 			maxVols: 2,
-			limitSource: "node",
+			limitSource: "csinode",
 			test: "ephemeral doesn't when node volume limit <= pods ephemeral CSI volume",
 			wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
 		},
@@ -546,7 +529,7 @@ func TestCSILimits(t *testing.T) {
 			driverNames: []string{ebsCSIDriverName},
 			existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod},
 			maxVols: 3,
-			limitSource: "node",
+			limitSource: "csinode",
 			test: "persistent doesn't when node volume limit <= pods ephemeral CSI volume + persistent volume, ephemeral disabled",
 			wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
 		},
@@ -558,7 +541,7 @@ func TestCSILimits(t *testing.T) {
 			driverNames: []string{ebsCSIDriverName},
 			existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod},
 			maxVols: 3,
-			limitSource: "node",
+			limitSource: "csinode",
 			test: "persistent doesn't when node volume limit <= pods ephemeral CSI volume + persistent volume",
 			wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
 		},
@@ -569,7 +552,8 @@ func TestCSILimits(t *testing.T) {
 			extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
 			driverNames: []string{ebsCSIDriverName},
 			existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod},
-			maxVols: 4,
+			maxVols: 5,
+			limitSource: "csinode",
 			test: "persistent okay when node volume limit > pods ephemeral CSI volume + persistent volume",
 		},
 		{
@@ -578,7 +562,7 @@ func TestCSILimits(t *testing.T) {
 			maxVols: 2,
 			driverNames: []string{ebsCSIDriverName},
 			test: "skip Filter when the pod only uses secrets and configmaps",
-			limitSource: "node",
+			limitSource: "csinode",
 			wantPreFilterStatus: framework.NewStatus(framework.Skip),
 		},
 		{
@@ -587,13 +571,14 @@ func TestCSILimits(t *testing.T) {
 			maxVols: 2,
 			driverNames: []string{ebsCSIDriverName},
 			test: "don't skip Filter when the pod has pvcs",
-			limitSource: "node",
+			limitSource: "csinode",
 		},
 		{
 			newPod: ephemeralPodWithConfigmapAndSecret,
 			filterName: "csi",
 			ephemeralEnabled: true,
 			driverNames: []string{ebsCSIDriverName},
+			limitSource: "csinode-with-no-limit",
 			test: "don't skip Filter when the pod has ephemeral volumes",
 			wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, `looking up PVC test/abc-xyz: persistentvolumeclaims "abc-xyz" not found`),
 		},
@@ -898,12 +883,6 @@ func getNodeWithPodAndVolumeLimits(limitSource string, pods []*v1.Pod, limit int
 	}
 	var csiNode *storagev1.CSINode
 
-	addLimitToNode := func() {
-		for _, driver := range driverNames {
-			node.Status.Allocatable[getVolumeLimitKey(driver)] = *resource.NewQuantity(int64(limit), resource.DecimalSI)
-		}
-	}
-
 	initCSINode := func() {
 		csiNode = &storagev1.CSINode{
 			ObjectMeta: metav1.ObjectMeta{Name: "node-for-max-pd-test-1"},
@@ -930,13 +909,8 @@ func getNodeWithPodAndVolumeLimits(limitSource string, pods []*v1.Pod, limit int
 	}
 
 	switch limitSource {
-	case "node":
-		addLimitToNode()
 	case "csinode":
 		addDriversCSINode(true)
-	case "both":
-		addLimitToNode()
-		addDriversCSINode(true)
 	case "csinode-with-no-limit":
 		addDriversCSINode(false)
 	case "no-csi-driver":
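With the "node" and "both" sources gone, the test helper publishes limits only through a CSINode fixture. The addDriversCSINode body is not part of this hunk, so the following is only a hedged sketch of what such a fixture can look like, covering the "csinode" and "csinode-with-no-limit" flavours used by the table; the node name, driver names and limit value are placeholders.

```go
package fixture

import (
	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

// NewTestCSINode builds a CSINode fixture. withLimit toggles between a driver
// that reports an attach limit and one that reports none.
func NewTestCSINode(driverNames []string, limit int32, withLimit bool) *storagev1.CSINode {
	csiNode := &storagev1.CSINode{
		ObjectMeta: metav1.ObjectMeta{Name: "node-for-max-pd-test-1"},
	}
	for _, name := range driverNames {
		driver := storagev1.CSINodeDriver{Name: name, NodeID: "node-for-max-pd-test-1"}
		if withLimit {
			driver.Allocatable = &storagev1.VolumeNodeResources{Count: ptr.To(limit)}
		}
		csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, driver)
	}
	return csiNode
}
```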
deleted file: the in-tree (non-CSI) volume limits plugin
@@ -1,567 +0,0 @@
-/*
-Copyright 2019 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package nodevolumelimits
-
-import (
-	"context"
-	"errors"
-	"fmt"
-	"os"
-	"regexp"
-	"strconv"
-
-	v1 "k8s.io/api/core/v1"
-	storage "k8s.io/api/storage/v1"
-	apierrors "k8s.io/apimachinery/pkg/api/errors"
-	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/util/rand"
-	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/client-go/informers"
-	corelisters "k8s.io/client-go/listers/core/v1"
-	storagelisters "k8s.io/client-go/listers/storage/v1"
-	"k8s.io/component-helpers/storage/ephemeral"
-	csilibplugins "k8s.io/csi-translation-lib/plugins"
-	"k8s.io/klog/v2"
-	"k8s.io/kubernetes/pkg/scheduler/framework"
-	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
-	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
-	volumeutil "k8s.io/kubernetes/pkg/volume/util"
-)
-
-const (
-	// defaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE.
-	// GCE instances can have up to 16 PD volumes attached.
-	defaultMaxGCEPDVolumes = 16
-	// defaultMaxAzureDiskVolumes defines the maximum number of PD Volumes for Azure.
-	// Larger Azure VMs can actually have much more disks attached.
-	// TODO We should determine the max based on VM size
-	defaultMaxAzureDiskVolumes = 16
-
-	// ebsVolumeFilterType defines the filter name for ebsVolumeFilter.
-	ebsVolumeFilterType = "EBS"
-	// gcePDVolumeFilterType defines the filter name for gcePDVolumeFilter.
-	gcePDVolumeFilterType = "GCE"
-	// azureDiskVolumeFilterType defines the filter name for azureDiskVolumeFilter.
-	azureDiskVolumeFilterType = "AzureDisk"
-	// cinderVolumeFilterType defines the filter name for cinderVolumeFilter.
-	cinderVolumeFilterType = "Cinder"
-
-	// ErrReasonMaxVolumeCountExceeded is used for MaxVolumeCount predicate error.
-	ErrReasonMaxVolumeCountExceeded = "node(s) exceed max volume count"
-
-	// KubeMaxPDVols defines the maximum number of PD Volumes per kubelet.
-	KubeMaxPDVols = "KUBE_MAX_PD_VOLS"
-)
-
-// AzureDiskName is the name of the plugin used in the plugin registry and configurations.
-const AzureDiskName = names.AzureDiskLimits
-
-// NewAzureDisk returns function that initializes a new plugin and returns it.
-func NewAzureDisk(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
-	informerFactory := handle.SharedInformerFactory()
-	return newNonCSILimitsWithInformerFactory(ctx, azureDiskVolumeFilterType, informerFactory, fts), nil
-}
-
-// CinderName is the name of the plugin used in the plugin registry and configurations.
-const CinderName = names.CinderLimits
-
-// NewCinder returns function that initializes a new plugin and returns it.
-func NewCinder(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
-	informerFactory := handle.SharedInformerFactory()
-	return newNonCSILimitsWithInformerFactory(ctx, cinderVolumeFilterType, informerFactory, fts), nil
-}
-
-// EBSName is the name of the plugin used in the plugin registry and configurations.
-const EBSName = names.EBSLimits
-
-// NewEBS returns function that initializes a new plugin and returns it.
-func NewEBS(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
-	informerFactory := handle.SharedInformerFactory()
-	return newNonCSILimitsWithInformerFactory(ctx, ebsVolumeFilterType, informerFactory, fts), nil
-}
-
-// GCEPDName is the name of the plugin used in the plugin registry and configurations.
-const GCEPDName = names.GCEPDLimits
-
-// NewGCEPD returns function that initializes a new plugin and returns it.
-func NewGCEPD(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
-	informerFactory := handle.SharedInformerFactory()
-	return newNonCSILimitsWithInformerFactory(ctx, gcePDVolumeFilterType, informerFactory, fts), nil
-}
-
-// nonCSILimits contains information to check the max number of volumes for a plugin.
-type nonCSILimits struct {
-	name           string
-	filter         VolumeFilter
-	volumeLimitKey v1.ResourceName
-	maxVolumeFunc  func(node *v1.Node) int
-	csiNodeLister  storagelisters.CSINodeLister
-	pvLister       corelisters.PersistentVolumeLister
-	pvcLister      corelisters.PersistentVolumeClaimLister
-	scLister       storagelisters.StorageClassLister
-
-	// The string below is generated randomly during the struct's initialization.
-	// It is used to prefix volumeID generated inside the predicate() method to
-	// avoid conflicts with any real volume.
-	randomVolumeIDPrefix string
-}
-
-var _ framework.PreFilterPlugin = &nonCSILimits{}
-var _ framework.FilterPlugin = &nonCSILimits{}
-var _ framework.EnqueueExtensions = &nonCSILimits{}
-
-// newNonCSILimitsWithInformerFactory returns a plugin with filter name and informer factory.
-func newNonCSILimitsWithInformerFactory(
-	ctx context.Context,
-	filterName string,
-	informerFactory informers.SharedInformerFactory,
-	fts feature.Features,
-) framework.Plugin {
-	pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
-	pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
-	csiNodesLister := informerFactory.Storage().V1().CSINodes().Lister()
-	scLister := informerFactory.Storage().V1().StorageClasses().Lister()
-
-	return newNonCSILimits(ctx, filterName, csiNodesLister, scLister, pvLister, pvcLister, fts)
-}
-
-// newNonCSILimits creates a plugin which evaluates whether a pod can fit based on the
-// number of volumes which match a filter that it requests, and those that are already present.
-//
-// DEPRECATED
-// All cloudprovider specific predicates defined here are deprecated in favour of CSI volume limit
-// predicate - MaxCSIVolumeCountPred.
-//
-// The predicate looks for both volumes used directly, as well as PVC volumes that are backed by relevant volume
-// types, counts the number of unique volumes, and rejects the new pod if it would place the total count over
-// the maximum.
-func newNonCSILimits(
-	ctx context.Context,
-	filterName string,
-	csiNodeLister storagelisters.CSINodeLister,
-	scLister storagelisters.StorageClassLister,
-	pvLister corelisters.PersistentVolumeLister,
-	pvcLister corelisters.PersistentVolumeClaimLister,
-	fts feature.Features,
-) framework.Plugin {
-	logger := klog.FromContext(ctx)
-	var filter VolumeFilter
-	var volumeLimitKey v1.ResourceName
-	var name string
-
-	switch filterName {
-	case ebsVolumeFilterType:
-		name = EBSName
-		filter = ebsVolumeFilter
-		volumeLimitKey = v1.ResourceName(volumeutil.EBSVolumeLimitKey)
-	case gcePDVolumeFilterType:
-		name = GCEPDName
-		filter = gcePDVolumeFilter
-		volumeLimitKey = v1.ResourceName(volumeutil.GCEVolumeLimitKey)
-	case azureDiskVolumeFilterType:
-		name = AzureDiskName
-		filter = azureDiskVolumeFilter
-		volumeLimitKey = v1.ResourceName(volumeutil.AzureVolumeLimitKey)
-	case cinderVolumeFilterType:
-		name = CinderName
-		filter = cinderVolumeFilter
-		volumeLimitKey = v1.ResourceName(volumeutil.CinderVolumeLimitKey)
-	default:
-		logger.Error(errors.New("wrong filterName"), "Cannot create nonCSILimits plugin")
-		return nil
-	}
-	pl := &nonCSILimits{
-		name:                 name,
-		filter:               filter,
-		volumeLimitKey:       volumeLimitKey,
-		maxVolumeFunc:        getMaxVolumeFunc(logger, filterName),
-		csiNodeLister:        csiNodeLister,
-		pvLister:             pvLister,
-		pvcLister:            pvcLister,
-		scLister:             scLister,
-		randomVolumeIDPrefix: rand.String(32),
-	}
-
-	return pl
-}
-
-// Name returns name of the plugin. It is used in logs, etc.
-func (pl *nonCSILimits) Name() string {
-	return pl.name
-}
-
-// EventsToRegister returns the possible events that may make a Pod
-// failed by this plugin schedulable.
-func (pl *nonCSILimits) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
-	return []framework.ClusterEventWithHint{
-		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
-		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
-		{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
-	}, nil
-}
-
-// PreFilter invoked at the prefilter extension point
-//
-// If the pod haven't those types of volumes, we'll skip the Filter phase
-func (pl *nonCSILimits) PreFilter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
-	volumes := pod.Spec.Volumes
-	for i := range volumes {
-		vol := &volumes[i]
-		_, ok := pl.filter.FilterVolume(vol)
-		if ok || vol.PersistentVolumeClaim != nil || vol.Ephemeral != nil {
-			return nil, nil
-		}
-	}
-
-	return nil, framework.NewStatus(framework.Skip)
-}
-
-// PreFilterExtensions returns prefilter extensions, pod add and remove.
-func (pl *nonCSILimits) PreFilterExtensions() framework.PreFilterExtensions {
-	return nil
-}
-
-// Filter invoked at the filter extension point.
-func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
-	// If a pod doesn't have any volume attached to it, the predicate will always be true.
-	// Thus we make a fast path for it, to avoid unnecessary computations in this case.
-	if len(pod.Spec.Volumes) == 0 {
-		return nil
-	}
-
-	logger := klog.FromContext(ctx)
-	newVolumes := sets.New[string]()
-	if err := pl.filterVolumes(logger, pod, true /* new pod */, newVolumes); err != nil {
-		if apierrors.IsNotFound(err) {
-			// PVC is not found. This Pod will never be schedulable until PVC is created.
-			return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
-		}
-		return framework.AsStatus(err)
-	}
-
-	// quick return
-	if len(newVolumes) == 0 {
-		return nil
-	}
-
-	node := nodeInfo.Node()
-
-	var csiNode *storage.CSINode
-	var err error
-	if pl.csiNodeLister != nil {
-		csiNode, err = pl.csiNodeLister.Get(node.Name)
-		if err != nil {
-			// we don't fail here because the CSINode object is only necessary
-			// for determining whether the migration is enabled or not
-			logger.V(5).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
-		}
-	}
-
-	// if a plugin has been migrated to a CSI driver, defer to the CSI predicate
-	if pl.filter.IsMigrated(csiNode) {
-		return nil
-	}
-
-	// count unique volumes
-	existingVolumes := sets.New[string]()
-	for _, existingPod := range nodeInfo.Pods {
-		if err := pl.filterVolumes(logger, existingPod.Pod, false /* existing pod */, existingVolumes); err != nil {
-			return framework.AsStatus(err)
-		}
-	}
-	numExistingVolumes := len(existingVolumes)
-
-	// filter out already-mounted volumes
-	for k := range existingVolumes {
-		delete(newVolumes, k)
-	}
-
-	numNewVolumes := len(newVolumes)
-	maxAttachLimit := pl.maxVolumeFunc(node)
-	volumeLimits := volumeLimits(nodeInfo)
-	if maxAttachLimitFromAllocatable, ok := volumeLimits[pl.volumeLimitKey]; ok {
-		maxAttachLimit = int(maxAttachLimitFromAllocatable)
-	}
-
-	if numExistingVolumes+numNewVolumes > maxAttachLimit {
-		return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)
-	}
-	return nil
-}
-
-func (pl *nonCSILimits) filterVolumes(logger klog.Logger, pod *v1.Pod, newPod bool, filteredVolumes sets.Set[string]) error {
-	volumes := pod.Spec.Volumes
-	for i := range volumes {
-		vol := &volumes[i]
-		if id, ok := pl.filter.FilterVolume(vol); ok {
-			filteredVolumes.Insert(id)
-			continue
-		}
-
-		pvcName := ""
-		isEphemeral := false
-		switch {
-		case vol.PersistentVolumeClaim != nil:
-			pvcName = vol.PersistentVolumeClaim.ClaimName
-		case vol.Ephemeral != nil:
-			// Generic ephemeral inline volumes also use a PVC,
-			// just with a computed name and certain ownership.
-			// That is checked below once the pvc object is
-			// retrieved.
-			pvcName = ephemeral.VolumeClaimName(pod, vol)
-			isEphemeral = true
-		default:
-			continue
-		}
-		if pvcName == "" {
-			return fmt.Errorf("PersistentVolumeClaim had no name")
-		}
-
-		// Until we know real ID of the volume use namespace/pvcName as substitute
-		// with a random prefix (calculated and stored inside 'c' during initialization)
-		// to avoid conflicts with existing volume IDs.
-		pvID := fmt.Sprintf("%s-%s/%s", pl.randomVolumeIDPrefix, pod.Namespace, pvcName)
-
-		pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
-		if err != nil {
-			if newPod {
-				// The PVC is required to proceed with
-				// scheduling of a new pod because it cannot
-				// run without it. Bail out immediately.
-				return fmt.Errorf("looking up PVC %s/%s: %w", pod.Namespace, pvcName, err)
-			}
-			// If the PVC is invalid, we don't count the volume because
-			// there's no guarantee that it belongs to the running predicate.
-			logger.V(4).Info("Unable to look up PVC info, assuming PVC doesn't match predicate when counting limits", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName), "err", err)
-			continue
-		}
-
-		// The PVC for an ephemeral volume must be owned by the pod.
-		if isEphemeral {
-			if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil {
-				return err
-			}
-		}
-
-		pvName := pvc.Spec.VolumeName
-		if pvName == "" {
-			// PVC is not bound. It was either deleted and created again or
-			// it was forcefully unbound by admin. The pod can still use the
-			// original PV where it was bound to, so we count the volume if
-			// it belongs to the running predicate.
-			if pl.matchProvisioner(pvc) {
-				logger.V(4).Info("PVC is not bound, assuming PVC matches predicate when counting limits", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName))
-				filteredVolumes.Insert(pvID)
-			}
-			continue
-		}
-
-		pv, err := pl.pvLister.Get(pvName)
-		if err != nil {
-			// If the PV is invalid and PVC belongs to the running predicate,
-			// log the error and count the PV towards the PV limit.
-			if pl.matchProvisioner(pvc) {
-				logger.V(4).Info("Unable to look up PV, assuming PV matches predicate when counting limits", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName), "PV", klog.KRef("", pvName), "err", err)
-				filteredVolumes.Insert(pvID)
-			}
-			continue
-		}
-
-		if id, ok := pl.filter.FilterPersistentVolume(pv); ok {
-			filteredVolumes.Insert(id)
-		}
-	}
-
-	return nil
-}
-
-// matchProvisioner helps identify if the given PVC belongs to the running predicate.
-func (pl *nonCSILimits) matchProvisioner(pvc *v1.PersistentVolumeClaim) bool {
-	if pvc.Spec.StorageClassName == nil {
-		return false
-	}
-
-	storageClass, err := pl.scLister.Get(*pvc.Spec.StorageClassName)
-	if err != nil || storageClass == nil {
-		return false
-	}
-
-	return pl.filter.MatchProvisioner(storageClass)
-}
-
-// getMaxVolLimitFromEnv checks the max PD volumes environment variable, otherwise returning a default value.
-func getMaxVolLimitFromEnv(logger klog.Logger) int {
-	if rawMaxVols := os.Getenv(KubeMaxPDVols); rawMaxVols != "" {
-		if parsedMaxVols, err := strconv.Atoi(rawMaxVols); err != nil {
-			logger.Error(err, "Unable to parse maximum PD volumes value, using default")
-		} else if parsedMaxVols <= 0 {
-			logger.Error(errors.New("maximum PD volumes is negative"), "Unable to parse maximum PD volumes value, using default")
-		} else {
-			return parsedMaxVols
-		}
-	}
-
-	return -1
-}
-
-// VolumeFilter contains information on how to filter PD Volumes when checking PD Volume caps.
-type VolumeFilter struct {
-	// Filter normal volumes
-	FilterVolume           func(vol *v1.Volume) (id string, relevant bool)
-	FilterPersistentVolume func(pv *v1.PersistentVolume) (id string, relevant bool)
-	// MatchProvisioner evaluates if the StorageClass provisioner matches the running predicate
-	MatchProvisioner func(sc *storage.StorageClass) (relevant bool)
-	// IsMigrated returns a boolean specifying whether the plugin is migrated to a CSI driver
-	IsMigrated func(csiNode *storage.CSINode) bool
-}
-
-// ebsVolumeFilter is a VolumeFilter for filtering AWS ElasticBlockStore Volumes.
-var ebsVolumeFilter = VolumeFilter{
-	FilterVolume: func(vol *v1.Volume) (string, bool) {
-		if vol.AWSElasticBlockStore != nil {
-			return vol.AWSElasticBlockStore.VolumeID, true
-		}
-		return "", false
-	},
-
-	FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
-		if pv.Spec.AWSElasticBlockStore != nil {
-			return pv.Spec.AWSElasticBlockStore.VolumeID, true
-		}
-		return "", false
-	},
-
-	MatchProvisioner: func(sc *storage.StorageClass) bool {
-		return sc.Provisioner == csilibplugins.AWSEBSInTreePluginName
-	},
-
-	IsMigrated: func(csiNode *storage.CSINode) bool {
-		return isCSIMigrationOn(csiNode, csilibplugins.AWSEBSInTreePluginName)
-	},
-}
-
-// gcePDVolumeFilter is a VolumeFilter for filtering gce PersistentDisk Volumes.
-var gcePDVolumeFilter = VolumeFilter{
-	FilterVolume: func(vol *v1.Volume) (string, bool) {
-		if vol.GCEPersistentDisk != nil {
-			return vol.GCEPersistentDisk.PDName, true
-		}
-		return "", false
-	},
-
-	FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
-		if pv.Spec.GCEPersistentDisk != nil {
-			return pv.Spec.GCEPersistentDisk.PDName, true
-		}
-		return "", false
-	},
-
-	MatchProvisioner: func(sc *storage.StorageClass) bool {
-		return sc.Provisioner == csilibplugins.GCEPDInTreePluginName
-	},
-
-	IsMigrated: func(csiNode *storage.CSINode) bool {
-		return isCSIMigrationOn(csiNode, csilibplugins.GCEPDInTreePluginName)
-	},
-}
-
-// azureDiskVolumeFilter is a VolumeFilter for filtering azure Disk Volumes.
-var azureDiskVolumeFilter = VolumeFilter{
-	FilterVolume: func(vol *v1.Volume) (string, bool) {
-		if vol.AzureDisk != nil {
-			return vol.AzureDisk.DiskName, true
-		}
-		return "", false
-	},
-
-	FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
-		if pv.Spec.AzureDisk != nil {
-			return pv.Spec.AzureDisk.DiskName, true
-		}
-		return "", false
-	},
-
-	MatchProvisioner: func(sc *storage.StorageClass) bool {
-		return sc.Provisioner == csilibplugins.AzureDiskInTreePluginName
-	},
-
-	IsMigrated: func(csiNode *storage.CSINode) bool {
-		return isCSIMigrationOn(csiNode, csilibplugins.AzureDiskInTreePluginName)
-	},
-}
-
-// cinderVolumeFilter is a VolumeFilter for filtering cinder Volumes.
-// It will be deprecated once Openstack cloudprovider has been removed from in-tree.
-var cinderVolumeFilter = VolumeFilter{
-	FilterVolume: func(vol *v1.Volume) (string, bool) {
-		if vol.Cinder != nil {
-			return vol.Cinder.VolumeID, true
-		}
-		return "", false
-	},
-
-	FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
-		if pv.Spec.Cinder != nil {
-			return pv.Spec.Cinder.VolumeID, true
-		}
-		return "", false
-	},
-
-	MatchProvisioner: func(sc *storage.StorageClass) bool {
-		return sc.Provisioner == csilibplugins.CinderInTreePluginName
-	},
-
-	IsMigrated: func(csiNode *storage.CSINode) bool {
-		return isCSIMigrationOn(csiNode, csilibplugins.CinderInTreePluginName)
-	},
-}
-
-func getMaxVolumeFunc(logger klog.Logger, filterName string) func(node *v1.Node) int {
-	return func(node *v1.Node) int {
-		maxVolumesFromEnv := getMaxVolLimitFromEnv(logger)
-		if maxVolumesFromEnv > 0 {
-			return maxVolumesFromEnv
-		}
-
-		var nodeInstanceType string
-		for k, v := range node.ObjectMeta.Labels {
-			if k == v1.LabelInstanceType || k == v1.LabelInstanceTypeStable {
-				nodeInstanceType = v
-				break
-			}
-		}
-		switch filterName {
-		case ebsVolumeFilterType:
-			return getMaxEBSVolume(nodeInstanceType)
-		case gcePDVolumeFilterType:
-			return defaultMaxGCEPDVolumes
-		case azureDiskVolumeFilterType:
-			return defaultMaxAzureDiskVolumes
-		case cinderVolumeFilterType:
-			return volumeutil.DefaultMaxCinderVolumes
-		default:
-			return -1
-		}
-	}
-}
-
-func getMaxEBSVolume(nodeInstanceType string) int {
-	if ok, _ := regexp.MatchString(volumeutil.EBSNitroLimitRegex, nodeInstanceType); ok {
-		return volumeutil.DefaultMaxEBSNitroVolumeLimit
-	}
-	return volumeutil.DefaultMaxEBSVolumes
-}
(one file's diff is suppressed by the viewer because it is too large)
@@ -24,9 +24,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	csilibplugins "k8s.io/csi-translation-lib/plugins"
-	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	"k8s.io/kubernetes/pkg/features"
-	"k8s.io/kubernetes/pkg/scheduler/framework"
 )
 
 // isCSIMigrationOn returns a boolean value indicating whether
@@ -73,14 +71,3 @@ func isCSIMigrationOn(csiNode *storagev1.CSINode, pluginName string) bool {
 
 	return mpaSet.Has(pluginName)
 }
-
-// volumeLimits returns volume limits associated with the node.
-func volumeLimits(n *framework.NodeInfo) map[v1.ResourceName]int64 {
-	volumeLimits := map[v1.ResourceName]int64{}
-	for k, v := range n.Allocatable.ScalarResources {
-		if v1helper.IsAttachableVolumeResourceName(k) {
-			volumeLimits[k] = v
-		}
-	}
-	return volumeLimits
-}
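The deleted volumeLimits helper was the last reader of the attachable-volume scalar resources on the node's allocatable. A hedged sketch of what that lookup amounted to is below; the "attachable-volumes-" prefix and the resource-name string are illustrative rather than copied from the helper's actual constants. After this PR the scheduler consults only CSINode.Spec.Drivers[*].Allocatable.Count.

```go
package main

import (
	"fmt"
	"strings"
)

// attachableVolumeLimits filters node-allocatable entries that look like
// attach-limit resources, roughly what the removed helper did via
// IsAttachableVolumeResourceName.
func attachableVolumeLimits(allocatable map[string]int64) map[string]int64 {
	limits := map[string]int64{}
	for name, value := range allocatable {
		if strings.HasPrefix(name, "attachable-volumes-") {
			limits[name] = value
		}
	}
	return limits
}

func main() {
	nodeAllocatable := map[string]int64{
		"cpu":                        4000,
		"attachable-volumes-aws-ebs": 25, // hypothetical in-tree style key, no longer read
	}
	fmt.Println(attachableVolumeLimits(nodeAllocatable))
}
```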
@@ -71,10 +71,6 @@ func NewInTreeRegistry() runtime.Registry {
 		volumerestrictions.Name: runtime.FactoryAdapter(fts, volumerestrictions.New),
 		volumezone.Name: volumezone.New,
 		nodevolumelimits.CSIName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI),
-		nodevolumelimits.EBSName: runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS),
-		nodevolumelimits.GCEPDName: runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD),
-		nodevolumelimits.AzureDiskName: runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk),
-		nodevolumelimits.CinderName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder),
 		interpodaffinity.Name: interpodaffinity.New,
 		queuesort.Name: queuesort.New,
 		defaultbinder.Name: defaultbinder.New,
@@ -139,10 +139,6 @@ var (
 		names.NodeResourcesFit,
 		names.NodeUnschedulable,
 		names.NodeVolumeLimits,
-		names.AzureDiskLimits,
-		names.CinderLimits,
-		names.EBSLimits,
-		names.GCEPDLimits,
 		names.PodTopologySpread,
 		names.SchedulingGates,
 		names.TaintToleration,
@@ -39,7 +39,6 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
-	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
 	"k8s.io/kubernetes/pkg/volume"
 	volumetest "k8s.io/kubernetes/pkg/volume/testing"
 	testutil "k8s.io/kubernetes/test/integration/util"
@@ -425,7 +424,7 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration,
 
 	// Set max volume limit to the number of PVCs the test will create
 	// TODO: remove when max volume limit allows setting through storageclass
-	t.Setenv(nodevolumelimits.KubeMaxPDVols, fmt.Sprintf("%v", podLimit*volsPerPod))
+	t.Setenv("KUBE_MAX_PD_VOLS", fmt.Sprintf("%v", podLimit*volsPerPod))
 
 	scName := &classWait
 	if dynamic {
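The integration test now spells out the KUBE_MAX_PD_VOLS environment variable because the nodevolumelimits.KubeMaxPDVols constant disappears together with the non-CSI plugin. For reference, a simplified sketch of the kind of parsing the removed getMaxVolLimitFromEnv helper performed (the default value and function name here are illustrative; the real helper signalled "no override" with -1):

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// maxPDVolsFromEnv reads KUBE_MAX_PD_VOLS: a positive integer overrides the
// default; an empty, malformed, or non-positive value is ignored.
func maxPDVolsFromEnv(defaultLimit int) int {
	raw := os.Getenv("KUBE_MAX_PD_VOLS")
	if raw == "" {
		return defaultLimit
	}
	parsed, err := strconv.Atoi(raw)
	if err != nil || parsed <= 0 {
		return defaultLimit
	}
	return parsed
}

func main() {
	os.Setenv("KUBE_MAX_PD_VOLS", "45")
	fmt.Println(maxPDVolsFromEnv(39)) // 45
}
```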