From 1e26115df5398236e71087f29ed6e43165e12ac7 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 23 Mar 2021 10:23:06 +0100 Subject: [PATCH 1/2] consider ephemeral volumes for host path and node limits check When adding the ephemeral volume feature, the special case for PersistentVolumeClaim volume sources in kubelet's host path and node limits checks was overlooked. An ephemeral volume source is another way of referencing a claim and has to be treated the same way. --- pkg/kubelet/kubelet_pods.go | 36 ++-- pkg/kubelet/kubelet_pods_test.go | 60 ++++-- .../framework/plugins/nodevolumelimits/csi.go | 59 ++++-- .../plugins/nodevolumelimits/csi_test.go | 199 +++++++++++++++++- .../plugins/nodevolumelimits/non_csi.go | 139 +++++++----- .../plugins/nodevolumelimits/non_csi_test.go | 112 +++++++++- pkg/scheduler/framework/plugins/registry.go | 10 +- test/e2e/storage/csi_mock_volume.go | 143 +++++++++++-- 8 files changed, 629 insertions(+), 129 deletions(-) diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index a4257c1f885..4b5e59f48f7 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -2018,21 +2018,31 @@ func hasHostNamespace(pod *v1.Pod) bool { // hasHostMountPVC returns true if a PVC is referencing a HostPath volume. 
func (kl *Kubelet) hasHostMountPVC(pod *v1.Pod) bool { for _, volume := range pod.Spec.Volumes { - if volume.PersistentVolumeClaim != nil { - pvc, err := kl.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Get(context.TODO(), volume.PersistentVolumeClaim.ClaimName, metav1.GetOptions{}) - if err != nil { - klog.InfoS("Unable to retrieve pvc", "pvc", klog.KRef(pod.Namespace, volume.PersistentVolumeClaim.ClaimName), "err", err) + pvcName := "" + switch { + case volume.PersistentVolumeClaim != nil: + pvcName = volume.PersistentVolumeClaim.ClaimName + case volume.Ephemeral != nil: + if !utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) { continue } - if pvc != nil { - referencedVolume, err := kl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{}) - if err != nil { - klog.InfoS("Unable to retrieve pv", "pvName", pvc.Spec.VolumeName, "err", err) - continue - } - if referencedVolume != nil && referencedVolume.Spec.HostPath != nil { - return true - } + pvcName = pod.Name + "-" + volume.Name + default: + continue + } + pvc, err := kl.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Get(context.TODO(), pvcName, metav1.GetOptions{}) + if err != nil { + klog.InfoS("Unable to retrieve pvc", "pvc", klog.KRef(pod.Namespace, pvcName), "err", err) + continue + } + if pvc != nil { + referencedVolume, err := kl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{}) + if err != nil { + klog.InfoS("Unable to retrieve pv", "pvName", pvc.Spec.VolumeName, "err", err) + continue + } + if referencedVolume != nil && referencedVolume.Spec.HostPath != nil { + return true } } } diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go index 0b24accdfa2..655cb04d115 100644 --- a/pkg/kubelet/kubelet_pods_test.go +++ b/pkg/kubelet/kubelet_pods_test.go @@ -38,8 +38,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" 
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/diff" + utilfeature "k8s.io/apiserver/pkg/util/feature" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" + featuregatetesting "k8s.io/component-base/featuregate/testing" netutils "k8s.io/utils/net" // TODO: remove this import if @@ -47,6 +49,7 @@ import ( // to "v1"? _ "k8s.io/kubernetes/pkg/apis/core/install" + "k8s.io/kubernetes/pkg/features" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/cri/streaming/portforward" @@ -2886,13 +2889,16 @@ func TestGetPortForward(t *testing.T) { } func TestHasHostMountPVC(t *testing.T) { - tests := map[string]struct { - pvError error - pvcError error - expected bool - podHasPVC bool - pvcIsHostPath bool - }{ + type testcase struct { + pvError error + pvcError error + expected bool + podHasPVC bool + pvcIsHostPath bool + podHasEphemeral bool + ephemeralEnabled bool + } + tests := map[string]testcase{ "no pvc": {podHasPVC: false, expected: false}, "error fetching pvc": { podHasPVC: true, @@ -2909,6 +2915,18 @@ func TestHasHostMountPVC(t *testing.T) { pvcIsHostPath: true, expected: true, }, + "enabled ephemeral host path": { + podHasEphemeral: true, + pvcIsHostPath: true, + ephemeralEnabled: true, + expected: true, + }, + "disabled ephemeral host path": { + podHasEphemeral: true, + pvcIsHostPath: true, + ephemeralEnabled: false, + expected: false, + }, "non host path pvc": { podHasPVC: true, pvcIsHostPath: false, @@ -2916,7 +2934,8 @@ func TestHasHostMountPVC(t *testing.T) { }, } - for k, v := range tests { + run := func(t *testing.T, v testcase) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.GenericEphemeralVolume, v.ephemeralEnabled)() testKubelet := newTestKubelet(t, false) defer testKubelet.Cleanup() pod := &v1.Pod{ @@ -2935,13 +2954,23 @@ func TestHasHostMountPVC(t *testing.T) { }, }, } + } - if 
v.pvcIsHostPath { - volumeToReturn.Spec.PersistentVolumeSource = v1.PersistentVolumeSource{ - HostPath: &v1.HostPathVolumeSource{}, - } + if v.podHasEphemeral { + pod.Spec.Volumes = []v1.Volume{ + { + Name: "xyz", + VolumeSource: v1.VolumeSource{ + Ephemeral: &v1.EphemeralVolumeSource{}, + }, + }, } + } + if (v.podHasPVC || v.podHasEphemeral) && v.pvcIsHostPath { + volumeToReturn.Spec.PersistentVolumeSource = v1.PersistentVolumeSource{ + HostPath: &v1.HostPathVolumeSource{}, + } } testKubelet.fakeKubeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { @@ -2957,9 +2986,14 @@ func TestHasHostMountPVC(t *testing.T) { actual := testKubelet.kubelet.hasHostMountPVC(pod) if actual != v.expected { - t.Errorf("%s expected %t but got %t", k, v.expected, actual) + t.Errorf("expected %t but got %t", v.expected, actual) } + } + for k, v := range tests { + t.Run(k, func(t *testing.T) { + run(t, v) + }) } } diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go index 2b4a752d16c..e2bb3fc5cac 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go @@ -22,6 +22,7 @@ import ( v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/rand" corelisters "k8s.io/client-go/listers/core/v1" @@ -30,6 +31,7 @@ import ( csitrans "k8s.io/csi-translation-lib" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" volumeutil "k8s.io/kubernetes/pkg/volume/util" ) @@ -54,6 +56,8 @@ type CSILimits struct { randomVolumeIDPrefix string translator InTreeToCSITranslator + + enableGenericEphemeralVolume bool } var _ framework.FilterPlugin = &CSILimits{} @@ -96,7 +100,7 
@@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v } newVolumes := make(map[string]string) - if err := pl.filterAttachableVolumes(csiNode, pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil { + if err := pl.filterAttachableVolumes(pod, csiNode, true /* new pod */, newVolumes); err != nil { return framework.AsStatus(err) } @@ -113,7 +117,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v attachedVolumes := make(map[string]string) for _, existingPod := range nodeInfo.Pods { - if err := pl.filterAttachableVolumes(csiNode, existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, attachedVolumes); err != nil { + if err := pl.filterAttachableVolumes(existingPod.Pod, csiNode, false /* existing pod */, attachedVolumes); err != nil { return framework.AsStatus(err) } } @@ -144,25 +148,47 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v } func (pl *CSILimits) filterAttachableVolumes( - csiNode *storagev1.CSINode, volumes []v1.Volume, namespace string, result map[string]string) error { - for _, vol := range volumes { - // CSI volumes can only be used as persistent volumes - if vol.PersistentVolumeClaim == nil { + pod *v1.Pod, csiNode *storagev1.CSINode, newPod bool, result map[string]string) error { + for _, vol := range pod.Spec.Volumes { + // CSI volumes can only be used through a PVC. + pvcName := "" + ephemeral := false + switch { + case vol.PersistentVolumeClaim != nil: + pvcName = vol.PersistentVolumeClaim.ClaimName + case vol.Ephemeral != nil: + if newPod && !pl.enableGenericEphemeralVolume { + return fmt.Errorf( + "volume %s is a generic ephemeral volume, but that feature is disabled in kube-scheduler", + vol.Name, + ) + } + // Generic ephemeral inline volumes also use a PVC, + // just with a computed name and certain ownership. + // That is checked below once the pvc object is + // retrieved. 
+ pvcName = pod.Name + "-" + vol.Name + ephemeral = true + default: continue } - pvcName := vol.PersistentVolumeClaim.ClaimName if pvcName == "" { return fmt.Errorf("PersistentVolumeClaim had no name") } - pvc, err := pl.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName) + pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName) if err != nil { - klog.V(5).InfoS("Unable to look up PVC info", "PVC", fmt.Sprintf("%s/%s", namespace, pvcName)) + klog.V(5).InfoS("Unable to look up PVC info", "PVC", fmt.Sprintf("%s/%s", pod.Namespace, pvcName)) continue } + // The PVC for an ephemeral volume must be owned by the pod. + if ephemeral && !metav1.IsControlledBy(pvc, pod) { + return fmt.Errorf("PVC %s/%s is not owned by pod", pod.Namespace, pvcName) + } + driverName, volumeHandle := pl.getCSIDriverInfo(csiNode, pvc) if driverName == "" || volumeHandle == "" { klog.V(5).Info("Could not find a CSI driver name or volume handle, not counting volume") @@ -276,7 +302,7 @@ func (pl *CSILimits) getCSIDriverInfoFromSC(csiNode *storagev1.CSINode, pvc *v1. } // NewCSI initializes a new plugin and returns it. 
-func NewCSI(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func NewCSI(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() pvLister := informerFactory.Core().V1().PersistentVolumes().Lister() pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister() @@ -284,12 +310,13 @@ func NewCSI(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) scLister := informerFactory.Storage().V1().StorageClasses().Lister() return &CSILimits{ - csiNodeLister: csiNodesLister, - pvLister: pvLister, - pvcLister: pvcLister, - scLister: scLister, - randomVolumeIDPrefix: rand.String(32), - translator: csitrans.New(), + csiNodeLister: csiNodesLister, + pvLister: pvLister, + pvcLister: pvcLister, + scLister: scLister, + randomVolumeIDPrefix: rand.String(32), + translator: csitrans.New(), + enableGenericEphemeralVolume: fts.EnableGenericEphemeralVolume, }, nil } diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go index b92620acacc..ef3f7782c9a 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go @@ -47,6 +47,10 @@ const ( hostpathInTreePluginName = "kubernetes.io/hostpath" ) +var ( + scName = "csi-sc" +) + // getVolumeLimitKey returns a ResourceName by filter type func getVolumeLimitKey(filterType string) v1.ResourceName { switch filterType { @@ -236,14 +240,112 @@ func TestCSILimits(t *testing.T) { }, } + ephemeralVolumePod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "abc", + UID: "12345", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "xyz", + VolumeSource: v1.VolumeSource{ + Ephemeral: &v1.EphemeralVolumeSource{}, + }, + }, + }, + }, + } + controller := true + ephemeralClaim := &v1.PersistentVolumeClaim{ + ObjectMeta: 
metav1.ObjectMeta{ + Namespace: ephemeralVolumePod.Namespace, + Name: ephemeralVolumePod.Name + "-" + ephemeralVolumePod.Spec.Volumes[0].Name, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "Pod", + Name: ephemeralVolumePod.Name, + UID: ephemeralVolumePod.UID, + Controller: &controller, + }, + }, + }, + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: &scName, + }, + } + conflictingClaim := ephemeralClaim.DeepCopy() + conflictingClaim.OwnerReferences = nil + + ephemeralTwoVolumePod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "abc", + UID: "12345II", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "x", + VolumeSource: v1.VolumeSource{ + Ephemeral: &v1.EphemeralVolumeSource{}, + }, + }, + { + Name: "y", + VolumeSource: v1.VolumeSource{ + Ephemeral: &v1.EphemeralVolumeSource{}, + }, + }, + }, + }, + } + ephemeralClaimX := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ephemeralTwoVolumePod.Namespace, + Name: ephemeralTwoVolumePod.Name + "-" + ephemeralTwoVolumePod.Spec.Volumes[0].Name, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "Pod", + Name: ephemeralTwoVolumePod.Name, + UID: ephemeralTwoVolumePod.UID, + Controller: &controller, + }, + }, + }, + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: &scName, + }, + } + ephemeralClaimY := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ephemeralTwoVolumePod.Namespace, + Name: ephemeralTwoVolumePod.Name + "-" + ephemeralTwoVolumePod.Spec.Volumes[1].Name, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "Pod", + Name: ephemeralTwoVolumePod.Name, + UID: ephemeralTwoVolumePod.UID, + Controller: &controller, + }, + }, + }, + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: &scName, + }, + } + tests := []struct { newPod *v1.Pod existingPods []*v1.Pod + extraClaims []v1.PersistentVolumeClaim filterName string maxVols int driverNames []string test string migrationEnabled bool + 
ephemeralEnabled bool limitSource string wantStatus *framework.Status }{ @@ -443,6 +545,96 @@ func TestCSILimits(t *testing.T) { limitSource: "csinode", test: "should not count in-tree and count csi volumes if migration is disabled (when scheduling in-tree volumes)", }, + // ephemeral volumes + { + newPod: ephemeralVolumePod, + filterName: "csi", + driverNames: []string{ebsCSIDriverName}, + test: "ephemeral volume feature disabled", + wantStatus: framework.NewStatus(framework.Error, "volume xyz is a generic ephemeral volume, but that feature is disabled in kube-scheduler"), + }, + { + newPod: ephemeralVolumePod, + filterName: "csi", + ephemeralEnabled: true, + driverNames: []string{ebsCSIDriverName}, + test: "ephemeral volume missing", + }, + { + newPod: ephemeralVolumePod, + filterName: "csi", + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*conflictingClaim}, + driverNames: []string{ebsCSIDriverName}, + test: "ephemeral volume not owned", + wantStatus: framework.NewStatus(framework.Error, "PVC test/abc-xyz is not owned by pod"), + }, + { + newPod: ephemeralVolumePod, + filterName: "csi", + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim}, + driverNames: []string{ebsCSIDriverName}, + test: "ephemeral volume unbound", + }, + { + newPod: ephemeralVolumePod, + filterName: "csi", + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim}, + driverNames: []string{ebsCSIDriverName}, + existingPods: []*v1.Pod{runningPod, csiEBSTwoVolPod}, + maxVols: 2, + limitSource: "node", + test: "ephemeral doesn't when node volume limit <= pods CSI volume", + wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded), + }, + { + newPod: csiEBSOneVolPod, + filterName: "csi", + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaimX, *ephemeralClaimY}, + driverNames: []string{ebsCSIDriverName}, + existingPods: []*v1.Pod{runningPod, 
ephemeralTwoVolumePod}, + maxVols: 2, + limitSource: "node", + test: "ephemeral doesn't when node volume limit <= pods ephemeral CSI volume", + wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded), + }, + { + newPod: csiEBSOneVolPod, + filterName: "csi", + ephemeralEnabled: false, + extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim}, + driverNames: []string{ebsCSIDriverName}, + existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod}, + maxVols: 3, + limitSource: "node", + test: "persistent doesn't when node volume limit <= pods ephemeral CSI volume + persistent volume, ephemeral disabled", + wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded), + }, + { + newPod: csiEBSOneVolPod, + filterName: "csi", + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim}, + driverNames: []string{ebsCSIDriverName}, + existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod}, + maxVols: 3, + limitSource: "node", + test: "persistent doesn't when node volume limit <= pods ephemeral CSI volume + persistent volume", + wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded), + }, + { + newPod: csiEBSOneVolPod, + filterName: "csi", + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim}, + driverNames: []string{ebsCSIDriverName}, + existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod}, + maxVols: 4, + test: "persistent okay when node volume limit > pods ephemeral CSI volume + persistent volume", + }, } // running attachable predicate tests with feature gate and limit present on nodes @@ -457,14 +649,15 @@ func TestCSILimits(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, false)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, 
false)() } - p := &CSILimits{ csiNodeLister: getFakeCSINodeLister(csiNode), pvLister: getFakeCSIPVLister(test.filterName, test.driverNames...), - pvcLister: getFakeCSIPVCLister(test.filterName, "csi-sc", test.driverNames...), - scLister: getFakeCSIStorageClassLister("csi-sc", test.driverNames[0]), + pvcLister: append(getFakeCSIPVCLister(test.filterName, scName, test.driverNames...), test.extraClaims...), + scLister: getFakeCSIStorageClassLister(scName, test.driverNames[0]), randomVolumeIDPrefix: rand.String(32), translator: csitrans.New(), + + enableGenericEphemeralVolume: test.ephemeralEnabled, } gotStatus := p.Filter(context.Background(), nil, test.newPod, node) if !reflect.DeepEqual(gotStatus, test.wantStatus) { diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go index 7910c1f7bec..63c95049d82 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" @@ -35,6 +36,7 @@ import ( csilibplugins "k8s.io/csi-translation-lib/plugins" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" volumeutil "k8s.io/kubernetes/pkg/volume/util" ) @@ -68,36 +70,36 @@ const ( const AzureDiskName = names.AzureDiskLimits // NewAzureDisk returns function that initializes a new plugin and returns it. 
-func NewAzureDisk(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func NewAzureDisk(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() - return newNonCSILimitsWithInformerFactory(azureDiskVolumeFilterType, informerFactory), nil + return newNonCSILimitsWithInformerFactory(azureDiskVolumeFilterType, informerFactory, fts), nil } // CinderName is the name of the plugin used in the plugin registry and configurations. const CinderName = names.CinderLimits // NewCinder returns function that initializes a new plugin and returns it. -func NewCinder(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func NewCinder(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() - return newNonCSILimitsWithInformerFactory(cinderVolumeFilterType, informerFactory), nil + return newNonCSILimitsWithInformerFactory(cinderVolumeFilterType, informerFactory, fts), nil } // EBSName is the name of the plugin used in the plugin registry and configurations. const EBSName = names.EBSLimits // NewEBS returns function that initializes a new plugin and returns it. -func NewEBS(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func NewEBS(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() - return newNonCSILimitsWithInformerFactory(ebsVolumeFilterType, informerFactory), nil + return newNonCSILimitsWithInformerFactory(ebsVolumeFilterType, informerFactory, fts), nil } // GCEPDName is the name of the plugin used in the plugin registry and configurations. const GCEPDName = names.GCEPDLimits // NewGCEPD returns function that initializes a new plugin and returns it. 
-func NewGCEPD(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func NewGCEPD(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() - return newNonCSILimitsWithInformerFactory(gcePDVolumeFilterType, informerFactory), nil + return newNonCSILimitsWithInformerFactory(gcePDVolumeFilterType, informerFactory, fts), nil } // nonCSILimits contains information to check the max number of volumes for a plugin. @@ -115,6 +117,8 @@ type nonCSILimits struct { // It is used to prefix volumeID generated inside the predicate() method to // avoid conflicts with any real volume. randomVolumeIDPrefix string + + enableGenericEphemeralVolume bool } var _ framework.FilterPlugin = &nonCSILimits{} @@ -124,13 +128,14 @@ var _ framework.EnqueueExtensions = &nonCSILimits{} func newNonCSILimitsWithInformerFactory( filterName string, informerFactory informers.SharedInformerFactory, + fts feature.Features, ) framework.Plugin { pvLister := informerFactory.Core().V1().PersistentVolumes().Lister() pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister() csiNodesLister := informerFactory.Storage().V1().CSINodes().Lister() scLister := informerFactory.Storage().V1().StorageClasses().Lister() - return newNonCSILimits(filterName, csiNodesLister, scLister, pvLister, pvcLister) + return newNonCSILimits(filterName, csiNodesLister, scLister, pvLister, pvcLister, fts) } // newNonCSILimits creates a plugin which evaluates whether a pod can fit based on the @@ -149,6 +154,7 @@ func newNonCSILimits( scLister storagelisters.StorageClassLister, pvLister corelisters.PersistentVolumeLister, pvcLister corelisters.PersistentVolumeClaimLister, + fts feature.Features, ) framework.Plugin { var filter VolumeFilter var volumeLimitKey v1.ResourceName @@ -185,6 +191,8 @@ func newNonCSILimits( pvcLister: pvcLister, scLister: scLister, randomVolumeIDPrefix: rand.String(32), + + 
enableGenericEphemeralVolume: fts.EnableGenericEphemeralVolume, } return pl @@ -213,7 +221,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod } newVolumes := make(sets.String) - if err := pl.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil { + if err := pl.filterVolumes(pod, newVolumes); err != nil { return framework.AsStatus(err) } @@ -246,7 +254,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod // count unique volumes existingVolumes := make(sets.String) for _, existingPod := range nodeInfo.Pods { - if err := pl.filterVolumes(existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, existingVolumes); err != nil { + if err := pl.filterVolumes(existingPod.Pod, existingVolumes); err != nil { return framework.AsStatus(err) } } @@ -270,57 +278,84 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod return nil } -func (pl *nonCSILimits) filterVolumes(volumes []v1.Volume, namespace string, filteredVolumes sets.String) error { +func (pl *nonCSILimits) filterVolumes(pod *v1.Pod, filteredVolumes sets.String) error { + volumes := pod.Spec.Volumes for i := range volumes { vol := &volumes[i] if id, ok := pl.filter.FilterVolume(vol); ok { filteredVolumes.Insert(id) - } else if vol.PersistentVolumeClaim != nil { - pvcName := vol.PersistentVolumeClaim.ClaimName - if pvcName == "" { - return fmt.Errorf("PersistentVolumeClaim had no name") - } + continue + } - // Until we know real ID of the volume use namespace/pvcName as substitute - // with a random prefix (calculated and stored inside 'c' during initialization) - // to avoid conflicts with existing volume IDs. 
- pvID := fmt.Sprintf("%s-%s/%s", pl.randomVolumeIDPrefix, namespace, pvcName) - - pvc, err := pl.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName) - if err != nil { - // If the PVC is invalid, we don't count the volume because - // there's no guarantee that it belongs to the running predicate. - klog.V(4).InfoS("Unable to look up PVC info, assuming PVC doesn't match predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", namespace, pvcName), "err", err) - continue + pvcName := "" + ephemeral := false + switch { + case vol.PersistentVolumeClaim != nil: + pvcName = vol.PersistentVolumeClaim.ClaimName + case vol.Ephemeral != nil: + if !pl.enableGenericEphemeralVolume { + return fmt.Errorf( + "volume %s is a generic ephemeral volume, but that feature is disabled in kube-scheduler", + vol.Name, + ) } + // Generic ephemeral inline volumes also use a PVC, + // just with a computed name and certain ownership. + // That is checked below once the pvc object is + // retrieved. + pvcName = pod.Name + "-" + vol.Name + ephemeral = true + default: + continue + } + if pvcName == "" { + return fmt.Errorf("PersistentVolumeClaim had no name") + } - pvName := pvc.Spec.VolumeName - if pvName == "" { - // PVC is not bound. It was either deleted and created again or - // it was forcefully unbound by admin. The pod can still use the - // original PV where it was bound to, so we count the volume if - // it belongs to the running predicate. - if pl.matchProvisioner(pvc) { - klog.V(4).InfoS("PVC is not bound, assuming PVC matches predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", namespace, pvcName)) - filteredVolumes.Insert(pvID) - } - continue - } + // Until we know real ID of the volume use namespace/pvcName as substitute + // with a random prefix (calculated and stored inside 'c' during initialization) + // to avoid conflicts with existing volume IDs. 
+ pvID := fmt.Sprintf("%s-%s/%s", pl.randomVolumeIDPrefix, pod.Namespace, pvcName) - pv, err := pl.pvLister.Get(pvName) - if err != nil { - // If the PV is invalid and PVC belongs to the running predicate, - // log the error and count the PV towards the PV limit. - if pl.matchProvisioner(pvc) { - klog.V(4).InfoS("Unable to look up PV, assuming PV matches predicate when counting limits", "PV", fmt.Sprintf("%s/%s/%s", namespace, pvcName, pvName), "err", err) - filteredVolumes.Insert(pvID) - } - continue - } + pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName) + if err != nil { + // If the PVC is invalid, we don't count the volume because + // there's no guarantee that it belongs to the running predicate. + klog.V(4).InfoS("Unable to look up PVC info, assuming PVC doesn't match predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", pod.Namespace, pvcName), "err", err) + continue + } - if id, ok := pl.filter.FilterPersistentVolume(pv); ok { - filteredVolumes.Insert(id) + // The PVC for an ephemeral volume must be owned by the pod. + if ephemeral && !metav1.IsControlledBy(pvc, pod) { + return fmt.Errorf("PVC %s/%s is not owned by pod", pod.Namespace, pvcName) + } + + pvName := pvc.Spec.VolumeName + if pvName == "" { + // PVC is not bound. It was either deleted and created again or + // it was forcefully unbound by admin. The pod can still use the + // original PV where it was bound to, so we count the volume if + // it belongs to the running predicate. + if pl.matchProvisioner(pvc) { + klog.V(4).InfoS("PVC is not bound, assuming PVC matches predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", pod.Namespace, pvcName)) + filteredVolumes.Insert(pvID) } + continue + } + + pv, err := pl.pvLister.Get(pvName) + if err != nil { + // If the PV is invalid and PVC belongs to the running predicate, + // log the error and count the PV towards the PV limit. 
+ if pl.matchProvisioner(pvc) { + klog.V(4).InfoS("Unable to look up PV, assuming PV matches predicate when counting limits", "PV", fmt.Sprintf("%s/%s/%s", pod.Namespace, pvcName, pvName), "err", err) + filteredVolumes.Insert(pvID) + } + continue + } + + if id, ok := pl.filter.FilterPersistentVolume(pv); ok { + filteredVolumes.Insert(id) } } diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go index 35b9310371c..55adccf89f6 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go @@ -28,9 +28,113 @@ import ( csilibplugins "k8s.io/csi-translation-lib/plugins" "k8s.io/kubernetes/pkg/scheduler/framework" fakeframework "k8s.io/kubernetes/pkg/scheduler/framework/fake" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" utilpointer "k8s.io/utils/pointer" ) +func TestEphemeralLimits(t *testing.T) { + // We have to specify a valid filter and arbitrarily pick Cinder here. + // It doesn't matter for the test cases. 
+ filterName := cinderVolumeFilterType + driverName := csilibplugins.CinderInTreePluginName + + ephemeralVolumePod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "abc", + UID: "12345", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "xyz", + VolumeSource: v1.VolumeSource{ + Ephemeral: &v1.EphemeralVolumeSource{}, + }, + }, + }, + }, + } + controller := true + ephemeralClaim := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ephemeralVolumePod.Namespace, + Name: ephemeralVolumePod.Name + "-" + ephemeralVolumePod.Spec.Volumes[0].Name, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "Pod", + Name: ephemeralVolumePod.Name, + UID: ephemeralVolumePod.UID, + Controller: &controller, + }, + }, + }, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: "missing", + StorageClassName: &filterName, + }, + } + conflictingClaim := ephemeralClaim.DeepCopy() + conflictingClaim.OwnerReferences = nil + + tests := []struct { + newPod *v1.Pod + existingPods []*v1.Pod + extraClaims []v1.PersistentVolumeClaim + ephemeralEnabled bool + maxVols int + test string + wantStatus *framework.Status + }{ + { + newPod: ephemeralVolumePod, + test: "volume feature disabled", + wantStatus: framework.NewStatus(framework.Error, "volume xyz is a generic ephemeral volume, but that feature is disabled in kube-scheduler"), + }, + { + newPod: ephemeralVolumePod, + ephemeralEnabled: true, + test: "volume missing", + }, + { + newPod: ephemeralVolumePod, + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*conflictingClaim}, + test: "volume not owned", + wantStatus: framework.NewStatus(framework.Error, "PVC test/abc-xyz is not owned by pod"), + }, + { + newPod: ephemeralVolumePod, + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim}, + maxVols: 1, + test: "volume unbound, allowed", + }, + { + newPod: ephemeralVolumePod, + ephemeralEnabled: true, + extraClaims: 
[]v1.PersistentVolumeClaim{*ephemeralClaim}, + maxVols: 0, + test: "volume unbound, exceeds limit", + wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded), + }, + } + + for _, test := range tests { + t.Run(test.test, func(t *testing.T) { + fts := feature.Features{ + EnableGenericEphemeralVolume: test.ephemeralEnabled, + } + node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), filterName) + p := newNonCSILimits(filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(filterName, driverName), getFakePVLister(filterName), append(getFakePVCLister(filterName), test.extraClaims...), fts).(framework.FilterPlugin) + gotStatus := p.Filter(context.Background(), nil, test.newPod, node) + if !reflect.DeepEqual(gotStatus, test.wantStatus) { + t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) + } + }) + } +} + func TestAzureDiskLimits(t *testing.T) { oneVolPod := &v1.Pod{ Spec: v1.PodSpec{ @@ -360,7 +464,7 @@ func TestAzureDiskLimits(t *testing.T) { for _, test := range tests { t.Run(test.test, func(t *testing.T) { node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName) - p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin) + p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin) gotStatus := p.Filter(context.Background(), nil, test.newPod, node) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) @@ -427,7 +531,7 @@ func TestCinderLimits(t *testing.T) { for _, test := 
range tests { t.Run(test.test, func(t *testing.T) { node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName) - p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin) + p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin) gotStatus := p.Filter(context.Background(), nil, test.newPod, node) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) @@ -834,7 +938,7 @@ func TestEBSLimits(t *testing.T) { for _, test := range tests { t.Run(test.test, func(t *testing.T) { node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName) - p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin) + p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin) gotStatus := p.Filter(context.Background(), nil, test.newPod, node) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) @@ -1172,7 +1276,7 @@ func TestGCEPDLimits(t *testing.T) { for _, test := range tests { t.Run(test.test, func(t *testing.T) { node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName) - p := 
newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin) + p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin) gotStatus := p.Filter(context.Background(), nil, test.newPod, node) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go index 6476c7a9bd9..fc33eedde43 100644 --- a/pkg/scheduler/framework/plugins/registry.go +++ b/pkg/scheduler/framework/plugins/registry.go @@ -75,11 +75,11 @@ func NewInTreeRegistry() runtime.Registry { volumebinding.Name: runtime.FactoryAdapter(fts, volumebinding.New), volumerestrictions.Name: runtime.FactoryAdapter(fts, volumerestrictions.New), volumezone.Name: volumezone.New, - nodevolumelimits.CSIName: nodevolumelimits.NewCSI, - nodevolumelimits.EBSName: nodevolumelimits.NewEBS, - nodevolumelimits.GCEPDName: nodevolumelimits.NewGCEPD, - nodevolumelimits.AzureDiskName: nodevolumelimits.NewAzureDisk, - nodevolumelimits.CinderName: nodevolumelimits.NewCinder, + nodevolumelimits.CSIName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI), + nodevolumelimits.EBSName: runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS), + nodevolumelimits.GCEPDName: runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD), + nodevolumelimits.AzureDiskName: runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk), + nodevolumelimits.CinderName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder), interpodaffinity.Name: runtime.FactoryAdapter(fts, interpodaffinity.New), nodelabel.Name: nodelabel.New, serviceaffinity.Name: 
serviceaffinity.New, diff --git a/test/e2e/storage/csi_mock_volume.go b/test/e2e/storage/csi_mock_volume.go index 8b9864a59ca..aba74f9d7dd 100644 --- a/test/e2e/storage/csi_mock_volume.go +++ b/test/e2e/storage/csi_mock_volume.go @@ -197,7 +197,11 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { framework.ExpectNoError(err, "Failed to register CSIDriver %v", m.config.GetUniqueDriverName()) } - createPod := func(ephemeral bool) (class *storagev1.StorageClass, claim *v1.PersistentVolumeClaim, pod *v1.Pod) { + type volumeType string + csiEphemeral := volumeType("CSI") + genericEphemeral := volumeType("Ephemeral") + pvcReference := volumeType("PVC") + createPod := func(withVolume volumeType) (class *storagev1.StorageClass, claim *v1.PersistentVolumeClaim, pod *v1.Pod) { ginkgo.By("Creating pod") sc := m.driver.GetDynamicProvisionStorageClass(m.config, "") scTest := testsuites.StorageClassTest{ @@ -213,12 +217,21 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { // The mock driver only works when everything runs on a single node. 
nodeSelection := m.config.ClientNodeSelection - if ephemeral { + switch withVolume { + case csiEphemeral: pod = startPausePodInline(f.ClientSet, scTest, nodeSelection, f.Namespace.Name) - if pod != nil { - m.pods = append(m.pods, pod) + case genericEphemeral: + class, pod = startPausePodGenericEphemeral(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name) + if class != nil { + m.sc[class.Name] = class } - } else { + claim = &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name + "-" + pod.Spec.Volumes[0].Name, + Namespace: f.Namespace.Name, + }, + } + case pvcReference: class, claim, pod = startPausePod(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name) if class != nil { m.sc[class.Name] = class @@ -226,9 +239,9 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { if claim != nil { m.pvcs = append(m.pvcs, claim) } - if pod != nil { - m.pods = append(m.pods, pod) - } + } + if pod != nil { + m.pods = append(m.pods, pod) } return // result variables set above } @@ -318,6 +331,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { name string disableAttach bool deployClusterRegistrar bool + volumeType volumeType }{ { name: "should not require VolumeAttach for drivers without attachment", @@ -328,6 +342,11 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { name: "should require VolumeAttach for drivers with attachment", deployClusterRegistrar: true, }, + { + name: "should require VolumeAttach for ephemeral volume and drivers with attachment", + deployClusterRegistrar: true, + volumeType: genericEphemeral, + }, { name: "should preserve attachment policy when no CSIDriver present", deployClusterRegistrar: false, @@ -340,7 +359,11 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { init(testParameters{registerDriver: test.deployClusterRegistrar, disableAttach: test.disableAttach}) defer cleanup() - _, claim, pod := createPod(false) + volumeType := test.volumeType + if volumeType == "" { + volumeType = 
pvcReference + } + _, claim, pod := createPod(volumeType) if pod == nil { return } @@ -375,7 +398,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { init(testParameters{registerDriver: false, disableAttach: true}) defer cleanup() - _, claim, pod := createPod(false /* persistent volume, late binding as specified above */) + _, claim, pod := createPod(pvcReference) // late binding as specified above if pod == nil { return } @@ -497,7 +520,11 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { defer cleanup() - _, _, pod := createPod(test.expectEphemeral) + withVolume := pvcReference + if test.expectEphemeral { + withVolume = csiEphemeral + } + _, _, pod := createPod(withVolume) if pod == nil { return } @@ -539,23 +566,73 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { gomega.Expect(csiNodeAttachLimit).To(gomega.BeNumerically("==", 2)) - _, _, pod1 := createPod(false) + _, _, pod1 := createPod(pvcReference) gomega.Expect(pod1).NotTo(gomega.BeNil(), "while creating first pod") err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod1.Name, pod1.Namespace) framework.ExpectNoError(err, "Failed to start pod1: %v", err) - _, _, pod2 := createPod(false) + _, _, pod2 := createPod(pvcReference) gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating second pod") err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod2.Name, pod2.Namespace) framework.ExpectNoError(err, "Failed to start pod2: %v", err) - _, _, pod3 := createPod(false) + _, _, pod3 := createPod(pvcReference) gomega.Expect(pod3).NotTo(gomega.BeNil(), "while creating third pod") err = waitForMaxVolumeCondition(pod3, m.cs) framework.ExpectNoError(err, "while waiting for max volume condition on pod : %+v", pod3) }) + + ginkgo.It("should report attach limit for generic ephemeral volume when persistent volume is attached [Slow]", func() { + // define volume limit to be 1 for this test + var err error + init(testParameters{attachLimit: 1}) + defer cleanup() + nodeName := 
m.config.ClientNodeSelection.Name + driverName := m.config.GetUniqueDriverName() + + csiNodeAttachLimit, err := checkCSINodeForLimits(nodeName, driverName, m.cs) + framework.ExpectNoError(err, "while checking limits in CSINode: %v", err) + + gomega.Expect(csiNodeAttachLimit).To(gomega.BeNumerically("==", 1)) + + _, _, pod1 := createPod(pvcReference) + gomega.Expect(pod1).NotTo(gomega.BeNil(), "while creating pod with persistent volume") + + err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod1.Name, pod1.Namespace) + framework.ExpectNoError(err, "Failed to start pod1: %v", err) + + _, _, pod2 := createPod(genericEphemeral) + gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating pod with ephemeral volume") + err = waitForMaxVolumeCondition(pod2, m.cs) + framework.ExpectNoError(err, "while waiting for max volume condition on pod : %+v", pod2) + }) + + ginkgo.It("should report attach limit for persistent volume when generic ephemeral volume is attached [Slow]", func() { + // define volume limit to be 1 for this test + var err error + init(testParameters{attachLimit: 1}) + defer cleanup() + nodeName := m.config.ClientNodeSelection.Name + driverName := m.config.GetUniqueDriverName() + + csiNodeAttachLimit, err := checkCSINodeForLimits(nodeName, driverName, m.cs) + framework.ExpectNoError(err, "while checking limits in CSINode: %v", err) + + gomega.Expect(csiNodeAttachLimit).To(gomega.BeNumerically("==", 1)) + + _, _, pod1 := createPod(genericEphemeral) + gomega.Expect(pod1).NotTo(gomega.BeNil(), "while creating pod with ephemeral volume") + + err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod1.Name, pod1.Namespace) + framework.ExpectNoError(err, "Failed to start pod1: %v", err) + + _, _, pod2 := createPod(pvcReference) + gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating pod with persistent volume") + err = waitForMaxVolumeCondition(pod2, m.cs) + framework.ExpectNoError(err, "while waiting for max volume condition on pod : %+v", pod2) + }) })
ginkgo.Context("CSI Volume expansion", func() { @@ -603,7 +680,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { init(tp) defer cleanup() - sc, pvc, pod := createPod(false) + sc, pvc, pod := createPod(pvcReference) gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod for resizing") framework.ExpectEqual(*sc.AllowVolumeExpansion, true, "failed creating sc with allowed expansion") @@ -696,7 +773,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { defer cleanup() - sc, pvc, pod := createPod(false) + sc, pvc, pod := createPod(pvcReference) gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod for resizing") framework.ExpectEqual(*sc.AllowVolumeExpansion, true, "failed creating sc with allowed expansion") @@ -837,7 +914,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { }) defer cleanup() - _, claim, pod := createPod(false) + _, claim, pod := createPod(pvcReference) if pod == nil { return } @@ -975,7 +1052,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { }) defer cleanup() - _, claim, pod := createPod(false) + _, claim, pod := createPod(pvcReference) if pod == nil { return } @@ -1121,7 +1198,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { framework.ExpectNoError(err, "create PVC watch") defer pvcWatch.Stop() - sc, claim, pod := createPod(false) + sc, claim, pod := createPod(pvcReference) gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod") bindingMode := storagev1.VolumeBindingImmediate if test.lateBinding { @@ -1331,7 +1408,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { syncDelay := 5 * time.Second time.Sleep(syncDelay) - sc, _, pod := createPod(false /* persistent volume, late binding as specified above */) + sc, _, pod := createPod(pvcReference) // late binding as specified above framework.ExpectEqual(sc.Name, scName, "pre-selected storage class name not used") waitCtx, cancel := context.WithTimeout(context.Background(), f.Timeouts.PodStart) @@ -1530,7 +1607,7 @@ var _ = 
utils.SIGDescribe("CSI mock volume", func() { defer cleanup() - _, _, pod := createPod(false) + _, _, pod := createPod(pvcReference) if pod == nil { return } @@ -1993,7 +2070,7 @@ func checkCSINodeForLimits(nodeName string, driverName string, cs clientset.Inte return attachLimit, nil } -func createClaim(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim) { +func createSC(cs clientset.Interface, t testsuites.StorageClassTest, scName, ns string) *storagev1.StorageClass { class := newStorageClass(t, ns, "") if scName != "" { class.Name = scName @@ -2005,12 +2082,17 @@ func createClaim(cs clientset.Interface, t testsuites.StorageClassTest, node e2e framework.ExpectNoError(err, "Failed to create class: %v", err) } + return class +} + +func createClaim(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim) { + class := createSC(cs, t, scName, ns) claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{ ClaimSize: t.ClaimSize, StorageClassName: &(class.Name), VolumeMode: &t.VolumeMode, }, ns) - claim, err = cs.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), claim, metav1.CreateOptions{}) + claim, err := cs.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), claim, metav1.CreateOptions{}) framework.ExpectNoError(err, "Failed to create claim: %v", err) if !t.DelayBinding { @@ -2046,6 +2128,21 @@ func startPausePodInline(cs clientset.Interface, t testsuites.StorageClassTest, return pod } +func startPausePodGenericEphemeral(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.Pod) { + class := createSC(cs, t, scName, ns) + claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{ + ClaimSize: t.ClaimSize, + StorageClassName: &(class.Name), + 
VolumeMode: &t.VolumeMode, + }, ns) + pod, err := startPausePodWithVolumeSource(cs, v1.VolumeSource{ + Ephemeral: &v1.EphemeralVolumeSource{ + VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{Spec: claim.Spec}}, + }, node, ns) + framework.ExpectNoError(err, "Failed to create pod: %v", err) + return class, pod +} + func startPausePodWithClaim(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string) (*v1.Pod, error) { return startPausePodWithVolumeSource(cs, v1.VolumeSource{ From 1d181ad84faea855f1da1d98a2ca4680f91ebfd2 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 6 Sep 2021 12:05:24 +0200 Subject: [PATCH 2/2] scheduler: fail the volume attach limit when PVC is missing Previously, the situation was ignored, which might have had the effect that Pod scheduling continued (?) even though the Pod+PVC weren't known to be in an acceptable state. --- .../framework/plugins/nodevolumelimits/csi.go | 8 ++++++++ .../framework/plugins/nodevolumelimits/csi_test.go | 1 + .../framework/plugins/nodevolumelimits/non_csi.go | 12 +++++++++--- .../plugins/nodevolumelimits/non_csi_test.go | 1 + 4 files changed, 19 insertions(+), 3 deletions(-) diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go index e2bb3fc5cac..def28682f0b 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go @@ -180,6 +180,14 @@ func (pl *CSILimits) filterAttachableVolumes( pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName) if err != nil { + if newPod { + // The PVC is required to proceed with + // scheduling of a new pod because it cannot + // run without it. Bail out immediately. + return fmt.Errorf("looking up PVC %s/%s: %v", pod.Namespace, pvcName, err) + } + // If the PVC is invalid, we don't count the volume because + // there's no guarantee that it belongs to the running predicate. 
klog.V(5).InfoS("Unable to look up PVC info", "PVC", fmt.Sprintf("%s/%s", pod.Namespace, pvcName)) continue } diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go index ef3f7782c9a..f94808d2a86 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go @@ -559,6 +559,7 @@ func TestCSILimits(t *testing.T) { ephemeralEnabled: true, driverNames: []string{ebsCSIDriverName}, test: "ephemeral volume missing", + wantStatus: framework.NewStatus(framework.Error, `looking up PVC test/abc-xyz: persistentvolumeclaim "abc-xyz" not found`), }, { newPod: ephemeralVolumePod, diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go index 63c95049d82..78e8458b794 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go @@ -221,7 +221,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod } newVolumes := make(sets.String) - if err := pl.filterVolumes(pod, newVolumes); err != nil { + if err := pl.filterVolumes(pod, true /* new pod */, newVolumes); err != nil { return framework.AsStatus(err) } @@ -254,7 +254,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod // count unique volumes existingVolumes := make(sets.String) for _, existingPod := range nodeInfo.Pods { - if err := pl.filterVolumes(existingPod.Pod, existingVolumes); err != nil { + if err := pl.filterVolumes(existingPod.Pod, false /* existing pod */, existingVolumes); err != nil { return framework.AsStatus(err) } } @@ -278,7 +278,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod return nil } -func (pl *nonCSILimits) filterVolumes(pod *v1.Pod, filteredVolumes sets.String) error { +func (pl *nonCSILimits) 
filterVolumes(pod *v1.Pod, newPod bool, filteredVolumes sets.String) error { volumes := pod.Spec.Volumes for i := range volumes { vol := &volumes[i] @@ -319,6 +319,12 @@ func (pl *nonCSILimits) filterVolumes(pod *v1.Pod, filteredVolumes sets.String) pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName) if err != nil { + if newPod { + // The PVC is required to proceed with + // scheduling of a new pod because it cannot + // run without it. Bail out immediately. + return fmt.Errorf("looking up PVC %s/%s: %v", pod.Namespace, pvcName, err) + } // If the PVC is invalid, we don't count the volume because // there's no guarantee that it belongs to the running predicate. klog.V(4).InfoS("Unable to look up PVC info, assuming PVC doesn't match predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", pod.Namespace, pvcName), "err", err) diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go index 55adccf89f6..a4ebd9113bc 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go @@ -95,6 +95,7 @@ func TestEphemeralLimits(t *testing.T) { newPod: ephemeralVolumePod, ephemeralEnabled: true, test: "volume missing", + wantStatus: framework.NewStatus(framework.Error, `looking up PVC test/abc-xyz: persistentvolumeclaim "abc-xyz" not found`), }, { newPod: ephemeralVolumePod,