diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index a4257c1f885..4b5e59f48f7 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -2018,21 +2018,31 @@ func hasHostNamespace(pod *v1.Pod) bool { // hasHostMountPVC returns true if a PVC is referencing a HostPath volume. func (kl *Kubelet) hasHostMountPVC(pod *v1.Pod) bool { for _, volume := range pod.Spec.Volumes { - if volume.PersistentVolumeClaim != nil { - pvc, err := kl.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Get(context.TODO(), volume.PersistentVolumeClaim.ClaimName, metav1.GetOptions{}) - if err != nil { - klog.InfoS("Unable to retrieve pvc", "pvc", klog.KRef(pod.Namespace, volume.PersistentVolumeClaim.ClaimName), "err", err) + pvcName := "" + switch { + case volume.PersistentVolumeClaim != nil: + pvcName = volume.PersistentVolumeClaim.ClaimName + case volume.Ephemeral != nil: + if !utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) { continue } - if pvc != nil { - referencedVolume, err := kl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{}) - if err != nil { - klog.InfoS("Unable to retrieve pv", "pvName", pvc.Spec.VolumeName, "err", err) - continue - } - if referencedVolume != nil && referencedVolume.Spec.HostPath != nil { - return true - } + pvcName = pod.Name + "-" + volume.Name + default: + continue + } + pvc, err := kl.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Get(context.TODO(), pvcName, metav1.GetOptions{}) + if err != nil { + klog.InfoS("Unable to retrieve pvc", "pvc", klog.KRef(pod.Namespace, pvcName), "err", err) + continue + } + if pvc != nil { + referencedVolume, err := kl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{}) + if err != nil { + klog.InfoS("Unable to retrieve pv", "pvName", pvc.Spec.VolumeName, "err", err) + continue + } + if referencedVolume != nil && referencedVolume.Spec.HostPath != nil { + return true } } } diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go index 0b24accdfa2..655cb04d115 100644 --- a/pkg/kubelet/kubelet_pods_test.go +++ b/pkg/kubelet/kubelet_pods_test.go @@ -38,8 +38,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/diff" + utilfeature "k8s.io/apiserver/pkg/util/feature" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" + featuregatetesting "k8s.io/component-base/featuregate/testing" netutils "k8s.io/utils/net" // TODO: remove this import if @@ -47,6 +49,7 @@ import ( // to "v1"? 
_ "k8s.io/kubernetes/pkg/apis/core/install" + "k8s.io/kubernetes/pkg/features" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/cri/streaming/portforward" @@ -2886,13 +2889,16 @@ func TestGetPortForward(t *testing.T) { } func TestHasHostMountPVC(t *testing.T) { - tests := map[string]struct { - pvError error - pvcError error - expected bool - podHasPVC bool - pvcIsHostPath bool - }{ + type testcase struct { + pvError error + pvcError error + expected bool + podHasPVC bool + pvcIsHostPath bool + podHasEphemeral bool + ephemeralEnabled bool + } + tests := map[string]testcase{ "no pvc": {podHasPVC: false, expected: false}, "error fetching pvc": { podHasPVC: true, @@ -2909,6 +2915,18 @@ func TestHasHostMountPVC(t *testing.T) { pvcIsHostPath: true, expected: true, }, + "enabled ephemeral host path": { + podHasEphemeral: true, + pvcIsHostPath: true, + ephemeralEnabled: true, + expected: true, + }, + "disabled ephemeral host path": { + podHasEphemeral: true, + pvcIsHostPath: true, + ephemeralEnabled: false, + expected: false, + }, "non host path pvc": { podHasPVC: true, pvcIsHostPath: false, @@ -2916,7 +2934,8 @@ func TestHasHostMountPVC(t *testing.T) { }, } - for k, v := range tests { + run := func(t *testing.T, v testcase) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.GenericEphemeralVolume, v.ephemeralEnabled)() testKubelet := newTestKubelet(t, false) defer testKubelet.Cleanup() pod := &v1.Pod{ @@ -2935,13 +2954,23 @@ func TestHasHostMountPVC(t *testing.T) { }, }, } + } - if v.pvcIsHostPath { - volumeToReturn.Spec.PersistentVolumeSource = v1.PersistentVolumeSource{ - HostPath: &v1.HostPathVolumeSource{}, - } + if v.podHasEphemeral { + pod.Spec.Volumes = []v1.Volume{ + { + Name: "xyz", + VolumeSource: v1.VolumeSource{ + Ephemeral: &v1.EphemeralVolumeSource{}, + }, + }, } + } + if (v.podHasPVC || v.podHasEphemeral) && v.pvcIsHostPath { + volumeToReturn.Spec.PersistentVolumeSource = v1.PersistentVolumeSource{ + HostPath: &v1.HostPathVolumeSource{}, + } } testKubelet.fakeKubeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { @@ -2957,9 +2986,14 @@ func TestHasHostMountPVC(t *testing.T) { actual := testKubelet.kubelet.hasHostMountPVC(pod) if actual != v.expected { - t.Errorf("%s expected %t but got %t", k, v.expected, actual) + t.Errorf("expected %t but got %t", v.expected, actual) } + } + for k, v := range tests { + t.Run(k, func(t *testing.T) { + run(t, v) + }) } } diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go index 2b4a752d16c..def28682f0b 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go @@ -22,6 +22,7 @@ import ( v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/rand" corelisters "k8s.io/client-go/listers/core/v1" @@ -30,6 +31,7 @@ import ( csitrans "k8s.io/csi-translation-lib" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" volumeutil "k8s.io/kubernetes/pkg/volume/util" ) @@ -54,6 +56,8 @@ type CSILimits struct { randomVolumeIDPrefix string translator InTreeToCSITranslator + + 
enableGenericEphemeralVolume bool } var _ framework.FilterPlugin = &CSILimits{} @@ -96,7 +100,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v } newVolumes := make(map[string]string) - if err := pl.filterAttachableVolumes(csiNode, pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil { + if err := pl.filterAttachableVolumes(pod, csiNode, true /* new pod */, newVolumes); err != nil { return framework.AsStatus(err) } @@ -113,7 +117,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v attachedVolumes := make(map[string]string) for _, existingPod := range nodeInfo.Pods { - if err := pl.filterAttachableVolumes(csiNode, existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, attachedVolumes); err != nil { + if err := pl.filterAttachableVolumes(existingPod.Pod, csiNode, false /* existing pod */, attachedVolumes); err != nil { return framework.AsStatus(err) } } @@ -144,25 +148,55 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v } func (pl *CSILimits) filterAttachableVolumes( - csiNode *storagev1.CSINode, volumes []v1.Volume, namespace string, result map[string]string) error { - for _, vol := range volumes { - // CSI volumes can only be used as persistent volumes - if vol.PersistentVolumeClaim == nil { + pod *v1.Pod, csiNode *storagev1.CSINode, newPod bool, result map[string]string) error { + for _, vol := range pod.Spec.Volumes { + // CSI volumes can only be used through a PVC. + pvcName := "" + ephemeral := false + switch { + case vol.PersistentVolumeClaim != nil: + pvcName = vol.PersistentVolumeClaim.ClaimName + case vol.Ephemeral != nil: + if newPod && !pl.enableGenericEphemeralVolume { + return fmt.Errorf( + "volume %s is a generic ephemeral volume, but that feature is disabled in kube-scheduler", + vol.Name, + ) + } + // Generic ephemeral inline volumes also use a PVC, + // just with a computed name and certain ownership. + // That is checked below once the pvc object is + // retrieved. + pvcName = pod.Name + "-" + vol.Name + ephemeral = true + default: continue } - pvcName := vol.PersistentVolumeClaim.ClaimName if pvcName == "" { return fmt.Errorf("PersistentVolumeClaim had no name") } - pvc, err := pl.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName) + pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName) if err != nil { - klog.V(5).InfoS("Unable to look up PVC info", "PVC", fmt.Sprintf("%s/%s", namespace, pvcName)) + if newPod { + // The PVC is required to proceed with + // scheduling of a new pod because it cannot + // run without it. Bail out immediately. + return fmt.Errorf("looking up PVC %s/%s: %v", pod.Namespace, pvcName, err) + } + // If the PVC is invalid, we don't count the volume because + // there's no guarantee that it belongs to the running predicate. + klog.V(5).InfoS("Unable to look up PVC info", "PVC", fmt.Sprintf("%s/%s", pod.Namespace, pvcName)) continue } + // The PVC for an ephemeral volume must be owned by the pod. + if ephemeral && !metav1.IsControlledBy(pvc, pod) { + return fmt.Errorf("PVC %s/%s is not owned by pod", pod.Namespace, pvcName) + } + driverName, volumeHandle := pl.getCSIDriverInfo(csiNode, pvc) if driverName == "" || volumeHandle == "" { klog.V(5).Info("Could not find a CSI driver name or volume handle, not counting volume") @@ -276,7 +310,7 @@ func (pl *CSILimits) getCSIDriverInfoFromSC(csiNode *storagev1.CSINode, pvc *v1. } // NewCSI initializes a new plugin and returns it. 
-func NewCSI(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func NewCSI(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() pvLister := informerFactory.Core().V1().PersistentVolumes().Lister() pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister() @@ -284,12 +318,13 @@ func NewCSI(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) scLister := informerFactory.Storage().V1().StorageClasses().Lister() return &CSILimits{ - csiNodeLister: csiNodesLister, - pvLister: pvLister, - pvcLister: pvcLister, - scLister: scLister, - randomVolumeIDPrefix: rand.String(32), - translator: csitrans.New(), + csiNodeLister: csiNodesLister, + pvLister: pvLister, + pvcLister: pvcLister, + scLister: scLister, + randomVolumeIDPrefix: rand.String(32), + translator: csitrans.New(), + enableGenericEphemeralVolume: fts.EnableGenericEphemeralVolume, }, nil } diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go index b92620acacc..f94808d2a86 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go @@ -47,6 +47,10 @@ const ( hostpathInTreePluginName = "kubernetes.io/hostpath" ) +var ( + scName = "csi-sc" +) + // getVolumeLimitKey returns a ResourceName by filter type func getVolumeLimitKey(filterType string) v1.ResourceName { switch filterType { @@ -236,14 +240,112 @@ func TestCSILimits(t *testing.T) { }, } + ephemeralVolumePod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "abc", + UID: "12345", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "xyz", + VolumeSource: v1.VolumeSource{ + Ephemeral: &v1.EphemeralVolumeSource{}, + }, + }, + }, + }, + } + controller := true + ephemeralClaim := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ephemeralVolumePod.Namespace, + Name: ephemeralVolumePod.Name + "-" + ephemeralVolumePod.Spec.Volumes[0].Name, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "Pod", + Name: ephemeralVolumePod.Name, + UID: ephemeralVolumePod.UID, + Controller: &controller, + }, + }, + }, + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: &scName, + }, + } + conflictingClaim := ephemeralClaim.DeepCopy() + conflictingClaim.OwnerReferences = nil + + ephemeralTwoVolumePod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "abc", + UID: "12345II", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "x", + VolumeSource: v1.VolumeSource{ + Ephemeral: &v1.EphemeralVolumeSource{}, + }, + }, + { + Name: "y", + VolumeSource: v1.VolumeSource{ + Ephemeral: &v1.EphemeralVolumeSource{}, + }, + }, + }, + }, + } + ephemeralClaimX := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ephemeralTwoVolumePod.Namespace, + Name: ephemeralTwoVolumePod.Name + "-" + ephemeralTwoVolumePod.Spec.Volumes[0].Name, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "Pod", + Name: ephemeralTwoVolumePod.Name, + UID: ephemeralTwoVolumePod.UID, + Controller: &controller, + }, + }, + }, + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: &scName, + }, + } + ephemeralClaimY := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ephemeralTwoVolumePod.Namespace, + Name: ephemeralTwoVolumePod.Name + "-" + ephemeralTwoVolumePod.Spec.Volumes[1].Name, + OwnerReferences: 
[]metav1.OwnerReference{ + { + Kind: "Pod", + Name: ephemeralTwoVolumePod.Name, + UID: ephemeralTwoVolumePod.UID, + Controller: &controller, + }, + }, + }, + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: &scName, + }, + } + tests := []struct { newPod *v1.Pod existingPods []*v1.Pod + extraClaims []v1.PersistentVolumeClaim filterName string maxVols int driverNames []string test string migrationEnabled bool + ephemeralEnabled bool limitSource string wantStatus *framework.Status }{ @@ -443,6 +545,97 @@ func TestCSILimits(t *testing.T) { limitSource: "csinode", test: "should not count in-tree and count csi volumes if migration is disabled (when scheduling in-tree volumes)", }, + // ephemeral volumes + { + newPod: ephemeralVolumePod, + filterName: "csi", + driverNames: []string{ebsCSIDriverName}, + test: "ephemeral volume feature disabled", + wantStatus: framework.NewStatus(framework.Error, "volume xyz is a generic ephemeral volume, but that feature is disabled in kube-scheduler"), + }, + { + newPod: ephemeralVolumePod, + filterName: "csi", + ephemeralEnabled: true, + driverNames: []string{ebsCSIDriverName}, + test: "ephemeral volume missing", + wantStatus: framework.NewStatus(framework.Error, `looking up PVC test/abc-xyz: persistentvolumeclaim "abc-xyz" not found`), + }, + { + newPod: ephemeralVolumePod, + filterName: "csi", + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*conflictingClaim}, + driverNames: []string{ebsCSIDriverName}, + test: "ephemeral volume not owned", + wantStatus: framework.NewStatus(framework.Error, "PVC test/abc-xyz is not owned by pod"), + }, + { + newPod: ephemeralVolumePod, + filterName: "csi", + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim}, + driverNames: []string{ebsCSIDriverName}, + test: "ephemeral volume unbound", + }, + { + newPod: ephemeralVolumePod, + filterName: "csi", + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim}, + driverNames: []string{ebsCSIDriverName}, + existingPods: []*v1.Pod{runningPod, csiEBSTwoVolPod}, + maxVols: 2, + limitSource: "node", + test: "ephemeral doesn't when node volume limit <= pods CSI volume", + wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded), + }, + { + newPod: csiEBSOneVolPod, + filterName: "csi", + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaimX, *ephemeralClaimY}, + driverNames: []string{ebsCSIDriverName}, + existingPods: []*v1.Pod{runningPod, ephemeralTwoVolumePod}, + maxVols: 2, + limitSource: "node", + test: "ephemeral doesn't when node volume limit <= pods ephemeral CSI volume", + wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded), + }, + { + newPod: csiEBSOneVolPod, + filterName: "csi", + ephemeralEnabled: false, + extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim}, + driverNames: []string{ebsCSIDriverName}, + existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod}, + maxVols: 3, + limitSource: "node", + test: "persistent doesn't when node volume limit <= pods ephemeral CSI volume + persistent volume, ephemeral disabled", + wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded), + }, + { + newPod: csiEBSOneVolPod, + filterName: "csi", + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim}, + driverNames: []string{ebsCSIDriverName}, + existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod}, + maxVols: 3, + 
limitSource: "node", + test: "persistent doesn't when node volume limit <= pods ephemeral CSI volume + persistent volume", + wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded), + }, + { + newPod: csiEBSOneVolPod, + filterName: "csi", + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim}, + driverNames: []string{ebsCSIDriverName}, + existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod}, + maxVols: 4, + test: "persistent okay when node volume limit > pods ephemeral CSI volume + persistent volume", + }, } // running attachable predicate tests with feature gate and limit present on nodes @@ -457,14 +650,15 @@ func TestCSILimits(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, false)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, false)() } - p := &CSILimits{ csiNodeLister: getFakeCSINodeLister(csiNode), pvLister: getFakeCSIPVLister(test.filterName, test.driverNames...), - pvcLister: getFakeCSIPVCLister(test.filterName, "csi-sc", test.driverNames...), - scLister: getFakeCSIStorageClassLister("csi-sc", test.driverNames[0]), + pvcLister: append(getFakeCSIPVCLister(test.filterName, scName, test.driverNames...), test.extraClaims...), + scLister: getFakeCSIStorageClassLister(scName, test.driverNames[0]), randomVolumeIDPrefix: rand.String(32), translator: csitrans.New(), + + enableGenericEphemeralVolume: test.ephemeralEnabled, } gotStatus := p.Filter(context.Background(), nil, test.newPod, node) if !reflect.DeepEqual(gotStatus, test.wantStatus) { diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go index 7910c1f7bec..78e8458b794 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" @@ -35,6 +36,7 @@ import ( csilibplugins "k8s.io/csi-translation-lib/plugins" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" volumeutil "k8s.io/kubernetes/pkg/volume/util" ) @@ -68,36 +70,36 @@ const ( const AzureDiskName = names.AzureDiskLimits // NewAzureDisk returns function that initializes a new plugin and returns it. -func NewAzureDisk(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func NewAzureDisk(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() - return newNonCSILimitsWithInformerFactory(azureDiskVolumeFilterType, informerFactory), nil + return newNonCSILimitsWithInformerFactory(azureDiskVolumeFilterType, informerFactory, fts), nil } // CinderName is the name of the plugin used in the plugin registry and configurations. const CinderName = names.CinderLimits // NewCinder returns function that initializes a new plugin and returns it. 
-func NewCinder(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func NewCinder(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() - return newNonCSILimitsWithInformerFactory(cinderVolumeFilterType, informerFactory), nil + return newNonCSILimitsWithInformerFactory(cinderVolumeFilterType, informerFactory, fts), nil } // EBSName is the name of the plugin used in the plugin registry and configurations. const EBSName = names.EBSLimits // NewEBS returns function that initializes a new plugin and returns it. -func NewEBS(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func NewEBS(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() - return newNonCSILimitsWithInformerFactory(ebsVolumeFilterType, informerFactory), nil + return newNonCSILimitsWithInformerFactory(ebsVolumeFilterType, informerFactory, fts), nil } // GCEPDName is the name of the plugin used in the plugin registry and configurations. const GCEPDName = names.GCEPDLimits // NewGCEPD returns function that initializes a new plugin and returns it. -func NewGCEPD(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func NewGCEPD(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() - return newNonCSILimitsWithInformerFactory(gcePDVolumeFilterType, informerFactory), nil + return newNonCSILimitsWithInformerFactory(gcePDVolumeFilterType, informerFactory, fts), nil } // nonCSILimits contains information to check the max number of volumes for a plugin. @@ -115,6 +117,8 @@ type nonCSILimits struct { // It is used to prefix volumeID generated inside the predicate() method to // avoid conflicts with any real volume. 
randomVolumeIDPrefix string + + enableGenericEphemeralVolume bool } var _ framework.FilterPlugin = &nonCSILimits{} @@ -124,13 +128,14 @@ var _ framework.EnqueueExtensions = &nonCSILimits{} func newNonCSILimitsWithInformerFactory( filterName string, informerFactory informers.SharedInformerFactory, + fts feature.Features, ) framework.Plugin { pvLister := informerFactory.Core().V1().PersistentVolumes().Lister() pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister() csiNodesLister := informerFactory.Storage().V1().CSINodes().Lister() scLister := informerFactory.Storage().V1().StorageClasses().Lister() - return newNonCSILimits(filterName, csiNodesLister, scLister, pvLister, pvcLister) + return newNonCSILimits(filterName, csiNodesLister, scLister, pvLister, pvcLister, fts) } // newNonCSILimits creates a plugin which evaluates whether a pod can fit based on the @@ -149,6 +154,7 @@ func newNonCSILimits( scLister storagelisters.StorageClassLister, pvLister corelisters.PersistentVolumeLister, pvcLister corelisters.PersistentVolumeClaimLister, + fts feature.Features, ) framework.Plugin { var filter VolumeFilter var volumeLimitKey v1.ResourceName @@ -185,6 +191,8 @@ func newNonCSILimits( pvcLister: pvcLister, scLister: scLister, randomVolumeIDPrefix: rand.String(32), + + enableGenericEphemeralVolume: fts.EnableGenericEphemeralVolume, } return pl @@ -213,7 +221,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod } newVolumes := make(sets.String) - if err := pl.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil { + if err := pl.filterVolumes(pod, true /* new pod */, newVolumes); err != nil { return framework.AsStatus(err) } @@ -246,7 +254,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod // count unique volumes existingVolumes := make(sets.String) for _, existingPod := range nodeInfo.Pods { - if err := pl.filterVolumes(existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, existingVolumes); err != nil { + if err := pl.filterVolumes(existingPod.Pod, false /* existing pod */, existingVolumes); err != nil { return framework.AsStatus(err) } } @@ -270,57 +278,90 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod return nil } -func (pl *nonCSILimits) filterVolumes(volumes []v1.Volume, namespace string, filteredVolumes sets.String) error { +func (pl *nonCSILimits) filterVolumes(pod *v1.Pod, newPod bool, filteredVolumes sets.String) error { + volumes := pod.Spec.Volumes for i := range volumes { vol := &volumes[i] if id, ok := pl.filter.FilterVolume(vol); ok { filteredVolumes.Insert(id) - } else if vol.PersistentVolumeClaim != nil { - pvcName := vol.PersistentVolumeClaim.ClaimName - if pvcName == "" { - return fmt.Errorf("PersistentVolumeClaim had no name") - } + continue + } - // Until we know real ID of the volume use namespace/pvcName as substitute - // with a random prefix (calculated and stored inside 'c' during initialization) - // to avoid conflicts with existing volume IDs. - pvID := fmt.Sprintf("%s-%s/%s", pl.randomVolumeIDPrefix, namespace, pvcName) - - pvc, err := pl.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName) - if err != nil { - // If the PVC is invalid, we don't count the volume because - // there's no guarantee that it belongs to the running predicate. 
- klog.V(4).InfoS("Unable to look up PVC info, assuming PVC doesn't match predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", namespace, pvcName), "err", err) - continue + pvcName := "" + ephemeral := false + switch { + case vol.PersistentVolumeClaim != nil: + pvcName = vol.PersistentVolumeClaim.ClaimName + case vol.Ephemeral != nil: + if !pl.enableGenericEphemeralVolume { + return fmt.Errorf( + "volume %s is a generic ephemeral volume, but that feature is disabled in kube-scheduler", + vol.Name, + ) } + // Generic ephemeral inline volumes also use a PVC, + // just with a computed name and certain ownership. + // That is checked below once the pvc object is + // retrieved. + pvcName = pod.Name + "-" + vol.Name + ephemeral = true + default: + continue + } + if pvcName == "" { + return fmt.Errorf("PersistentVolumeClaim had no name") + } - pvName := pvc.Spec.VolumeName - if pvName == "" { - // PVC is not bound. It was either deleted and created again or - // it was forcefully unbound by admin. The pod can still use the - // original PV where it was bound to, so we count the volume if - // it belongs to the running predicate. - if pl.matchProvisioner(pvc) { - klog.V(4).InfoS("PVC is not bound, assuming PVC matches predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", namespace, pvcName)) - filteredVolumes.Insert(pvID) - } - continue - } + // Until we know real ID of the volume use namespace/pvcName as substitute + // with a random prefix (calculated and stored inside 'c' during initialization) + // to avoid conflicts with existing volume IDs. + pvID := fmt.Sprintf("%s-%s/%s", pl.randomVolumeIDPrefix, pod.Namespace, pvcName) - pv, err := pl.pvLister.Get(pvName) - if err != nil { - // If the PV is invalid and PVC belongs to the running predicate, - // log the error and count the PV towards the PV limit. - if pl.matchProvisioner(pvc) { - klog.V(4).InfoS("Unable to look up PV, assuming PV matches predicate when counting limits", "PV", fmt.Sprintf("%s/%s/%s", namespace, pvcName, pvName), "err", err) - filteredVolumes.Insert(pvID) - } - continue + pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName) + if err != nil { + if newPod { + // The PVC is required to proceed with + // scheduling of a new pod because it cannot + // run without it. Bail out immediately. + return fmt.Errorf("looking up PVC %s/%s: %v", pod.Namespace, pvcName, err) } + // If the PVC is invalid, we don't count the volume because + // there's no guarantee that it belongs to the running predicate. + klog.V(4).InfoS("Unable to look up PVC info, assuming PVC doesn't match predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", pod.Namespace, pvcName), "err", err) + continue + } - if id, ok := pl.filter.FilterPersistentVolume(pv); ok { - filteredVolumes.Insert(id) + // The PVC for an ephemeral volume must be owned by the pod. + if ephemeral && !metav1.IsControlledBy(pvc, pod) { + return fmt.Errorf("PVC %s/%s is not owned by pod", pod.Namespace, pvcName) + } + + pvName := pvc.Spec.VolumeName + if pvName == "" { + // PVC is not bound. It was either deleted and created again or + // it was forcefully unbound by admin. The pod can still use the + // original PV where it was bound to, so we count the volume if + // it belongs to the running predicate. 
+ if pl.matchProvisioner(pvc) { + klog.V(4).InfoS("PVC is not bound, assuming PVC matches predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", pod.Namespace, pvcName)) + filteredVolumes.Insert(pvID) } + continue + } + + pv, err := pl.pvLister.Get(pvName) + if err != nil { + // If the PV is invalid and PVC belongs to the running predicate, + // log the error and count the PV towards the PV limit. + if pl.matchProvisioner(pvc) { + klog.V(4).InfoS("Unable to look up PV, assuming PV matches predicate when counting limits", "PV", fmt.Sprintf("%s/%s/%s", pod.Namespace, pvcName, pvName), "err", err) + filteredVolumes.Insert(pvID) + } + continue + } + + if id, ok := pl.filter.FilterPersistentVolume(pv); ok { + filteredVolumes.Insert(id) } } diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go index 35b9310371c..a4ebd9113bc 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go @@ -28,9 +28,114 @@ import ( csilibplugins "k8s.io/csi-translation-lib/plugins" "k8s.io/kubernetes/pkg/scheduler/framework" fakeframework "k8s.io/kubernetes/pkg/scheduler/framework/fake" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" utilpointer "k8s.io/utils/pointer" ) +func TestEphemeralLimits(t *testing.T) { + // We have to specify a valid filter and arbitrarily pick Cinder here. + // It doesn't matter for the test cases. + filterName := cinderVolumeFilterType + driverName := csilibplugins.CinderInTreePluginName + + ephemeralVolumePod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "abc", + UID: "12345", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "xyz", + VolumeSource: v1.VolumeSource{ + Ephemeral: &v1.EphemeralVolumeSource{}, + }, + }, + }, + }, + } + controller := true + ephemeralClaim := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ephemeralVolumePod.Namespace, + Name: ephemeralVolumePod.Name + "-" + ephemeralVolumePod.Spec.Volumes[0].Name, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "Pod", + Name: ephemeralVolumePod.Name, + UID: ephemeralVolumePod.UID, + Controller: &controller, + }, + }, + }, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: "missing", + StorageClassName: &filterName, + }, + } + conflictingClaim := ephemeralClaim.DeepCopy() + conflictingClaim.OwnerReferences = nil + + tests := []struct { + newPod *v1.Pod + existingPods []*v1.Pod + extraClaims []v1.PersistentVolumeClaim + ephemeralEnabled bool + maxVols int + test string + wantStatus *framework.Status + }{ + { + newPod: ephemeralVolumePod, + test: "volume feature disabled", + wantStatus: framework.NewStatus(framework.Error, "volume xyz is a generic ephemeral volume, but that feature is disabled in kube-scheduler"), + }, + { + newPod: ephemeralVolumePod, + ephemeralEnabled: true, + test: "volume missing", + wantStatus: framework.NewStatus(framework.Error, `looking up PVC test/abc-xyz: persistentvolumeclaim "abc-xyz" not found`), + }, + { + newPod: ephemeralVolumePod, + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*conflictingClaim}, + test: "volume not owned", + wantStatus: framework.NewStatus(framework.Error, "PVC test/abc-xyz is not owned by pod"), + }, + { + newPod: ephemeralVolumePod, + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim}, + maxVols: 1, + test: "volume unbound, allowed", + }, + { + newPod: 
ephemeralVolumePod, + ephemeralEnabled: true, + extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim}, + maxVols: 0, + test: "volume unbound, exceeds limit", + wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded), + }, + } + + for _, test := range tests { + t.Run(test.test, func(t *testing.T) { + fts := feature.Features{ + EnableGenericEphemeralVolume: test.ephemeralEnabled, + } + node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), filterName) + p := newNonCSILimits(filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(filterName, driverName), getFakePVLister(filterName), append(getFakePVCLister(filterName), test.extraClaims...), fts).(framework.FilterPlugin) + gotStatus := p.Filter(context.Background(), nil, test.newPod, node) + if !reflect.DeepEqual(gotStatus, test.wantStatus) { + t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) + } + }) + } +} + func TestAzureDiskLimits(t *testing.T) { oneVolPod := &v1.Pod{ Spec: v1.PodSpec{ @@ -360,7 +465,7 @@ func TestAzureDiskLimits(t *testing.T) { for _, test := range tests { t.Run(test.test, func(t *testing.T) { node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName) - p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin) + p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin) gotStatus := p.Filter(context.Background(), nil, test.newPod, node) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) @@ -427,7 +532,7 @@ func TestCinderLimits(t *testing.T) { for _, test := range tests { t.Run(test.test, func(t *testing.T) { node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName) - p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin) + p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin) gotStatus := p.Filter(context.Background(), nil, test.newPod, node) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) @@ -834,7 +939,7 @@ func TestEBSLimits(t *testing.T) { for _, test := range tests { t.Run(test.test, func(t *testing.T) { node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName) - p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin) + p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), 
getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin) gotStatus := p.Filter(context.Background(), nil, test.newPod, node) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) @@ -1172,7 +1277,7 @@ func TestGCEPDLimits(t *testing.T) { for _, test := range tests { t.Run(test.test, func(t *testing.T) { node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName) - p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin) + p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin) gotStatus := p.Filter(context.Background(), nil, test.newPod, node) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go index 6476c7a9bd9..fc33eedde43 100644 --- a/pkg/scheduler/framework/plugins/registry.go +++ b/pkg/scheduler/framework/plugins/registry.go @@ -75,11 +75,11 @@ func NewInTreeRegistry() runtime.Registry { volumebinding.Name: runtime.FactoryAdapter(fts, volumebinding.New), volumerestrictions.Name: runtime.FactoryAdapter(fts, volumerestrictions.New), volumezone.Name: volumezone.New, - nodevolumelimits.CSIName: nodevolumelimits.NewCSI, - nodevolumelimits.EBSName: nodevolumelimits.NewEBS, - nodevolumelimits.GCEPDName: nodevolumelimits.NewGCEPD, - nodevolumelimits.AzureDiskName: nodevolumelimits.NewAzureDisk, - nodevolumelimits.CinderName: nodevolumelimits.NewCinder, + nodevolumelimits.CSIName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI), + nodevolumelimits.EBSName: runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS), + nodevolumelimits.GCEPDName: runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD), + nodevolumelimits.AzureDiskName: runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk), + nodevolumelimits.CinderName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder), interpodaffinity.Name: runtime.FactoryAdapter(fts, interpodaffinity.New), nodelabel.Name: nodelabel.New, serviceaffinity.Name: serviceaffinity.New, diff --git a/test/e2e/storage/csi_mock_volume.go b/test/e2e/storage/csi_mock_volume.go index 8b9864a59ca..aba74f9d7dd 100644 --- a/test/e2e/storage/csi_mock_volume.go +++ b/test/e2e/storage/csi_mock_volume.go @@ -197,7 +197,11 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { framework.ExpectNoError(err, "Failed to register CSIDriver %v", m.config.GetUniqueDriverName()) } - createPod := func(ephemeral bool) (class *storagev1.StorageClass, claim *v1.PersistentVolumeClaim, pod *v1.Pod) { + type volumeType string + csiEphemeral := volumeType("CSI") + genericEphemeral := volumeType("Ephemeral") + pvcReference := volumeType("PVC") + createPod := func(withVolume volumeType) (class *storagev1.StorageClass, claim *v1.PersistentVolumeClaim, pod *v1.Pod) { ginkgo.By("Creating pod") sc := m.driver.GetDynamicProvisionStorageClass(m.config, "") scTest := testsuites.StorageClassTest{ @@ -213,12 +217,21 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { // The mock driver only works when everything 
runs on a single node.
 		nodeSelection := m.config.ClientNodeSelection
-		if ephemeral {
+		switch withVolume {
+		case csiEphemeral:
 			pod = startPausePodInline(f.ClientSet, scTest, nodeSelection, f.Namespace.Name)
-			if pod != nil {
-				m.pods = append(m.pods, pod)
+		case genericEphemeral:
+			class, pod = startPausePodGenericEphemeral(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
+			if class != nil {
+				m.sc[class.Name] = class
 			}
-		} else {
+			claim = &v1.PersistentVolumeClaim{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      pod.Name + "-" + pod.Spec.Volumes[0].Name,
+					Namespace: f.Namespace.Name,
+				},
+			}
+		case pvcReference:
 			class, claim, pod = startPausePod(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
 			if class != nil {
 				m.sc[class.Name] = class
@@ -226,9 +239,9 @@
 			if claim != nil {
 				m.pvcs = append(m.pvcs, claim)
 			}
-			if pod != nil {
-				m.pods = append(m.pods, pod)
-			}
+		}
+		if pod != nil {
+			m.pods = append(m.pods, pod)
 		}
 		return // result variables set above
 	}
@@ -318,6 +331,7 @@
 			name                   string
 			disableAttach          bool
 			deployClusterRegistrar bool
+			volumeType             volumeType
 		}{
 			{
 				name:                   "should not require VolumeAttach for drivers without attachment",
@@ -328,6 +342,11 @@
 				name:                   "should require VolumeAttach for drivers with attachment",
 				deployClusterRegistrar: true,
 			},
+			{
+				name:                   "should require VolumeAttach for ephemeral volume and drivers with attachment",
+				deployClusterRegistrar: true,
+				volumeType:             genericEphemeral,
+			},
 			{
 				name:                   "should preserve attachment policy when no CSIDriver present",
 				deployClusterRegistrar: false,
@@ -340,7 +359,11 @@
 				init(testParameters{registerDriver: test.deployClusterRegistrar, disableAttach: test.disableAttach})
 				defer cleanup()
 
-				_, claim, pod := createPod(false)
+				volumeType := t.volumeType
+				if volumeType == "" {
+					volumeType = pvcReference
+				}
+				_, claim, pod := createPod(volumeType)
 				if pod == nil {
 					return
 				}
@@ -375,7 +398,7 @@
 			init(testParameters{registerDriver: false, disableAttach: true})
 			defer cleanup()
 
-			_, claim, pod := createPod(false /* persistent volume, late binding as specified above */)
+			_, claim, pod := createPod(pvcReference) // late binding as specified above
 			if pod == nil {
 				return
 			}
@@ -497,7 +520,11 @@
 			defer cleanup()
 
-			_, _, pod := createPod(test.expectEphemeral)
+			withVolume := pvcReference
+			if test.expectEphemeral {
+				withVolume = csiEphemeral
+			}
+			_, _, pod := createPod(withVolume)
 			if pod == nil {
 				return
 			}
@@ -539,23 +566,73 @@
 			gomega.Expect(csiNodeAttachLimit).To(gomega.BeNumerically("==", 2))
 
-			_, _, pod1 := createPod(false)
+			_, _, pod1 := createPod(pvcReference)
 			gomega.Expect(pod1).NotTo(gomega.BeNil(), "while creating first pod")
 
 			err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod1.Name, pod1.Namespace)
 			framework.ExpectNoError(err, "Failed to start pod1: %v", err)
 
-			_, _, pod2 := createPod(false)
+			_, _, pod2 := createPod(pvcReference)
 			gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating second pod")
 
 			err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod2.Name, pod2.Namespace)
 			framework.ExpectNoError(err, "Failed to start pod2: %v", err)
 
-			_, _, pod3 := createPod(false)
+			_, _, pod3 := createPod(pvcReference)
 			gomega.Expect(pod3).NotTo(gomega.BeNil(), "while creating third pod")
 
 			err = waitForMaxVolumeCondition(pod3, m.cs)
 			framework.ExpectNoError(err, "while waiting for max volume condition on pod : %+v", pod3)
 		})
+
+		ginkgo.It("should report attach limit for generic ephemeral volume when persistent volume is attached [Slow]", func() {
+			// define volume limit to be 1 for this test
+			var err error
+			init(testParameters{attachLimit: 1})
+			defer cleanup()
+			nodeName := m.config.ClientNodeSelection.Name
+			driverName := m.config.GetUniqueDriverName()
+
+			csiNodeAttachLimit, err := checkCSINodeForLimits(nodeName, driverName, m.cs)
+			framework.ExpectNoError(err, "while checking limits in CSINode: %v", err)
+
+			gomega.Expect(csiNodeAttachLimit).To(gomega.BeNumerically("==", 1))
+
+			_, _, pod1 := createPod(pvcReference)
+			gomega.Expect(pod1).NotTo(gomega.BeNil(), "while creating pod with persistent volume")
+
+			err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod1.Name, pod1.Namespace)
+			framework.ExpectNoError(err, "Failed to start pod1: %v", err)
+
+			_, _, pod2 := createPod(genericEphemeral)
+			gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating pod with ephemeral volume")
+			err = waitForMaxVolumeCondition(pod2, m.cs)
+			framework.ExpectNoError(err, "while waiting for max volume condition on pod : %+v", pod2)
+		})
+
+		ginkgo.It("should report attach limit for persistent volume when generic ephemeral volume is attached [Slow]", func() {
+			// define volume limit to be 1 for this test
+			var err error
+			init(testParameters{attachLimit: 1})
+			defer cleanup()
+			nodeName := m.config.ClientNodeSelection.Name
+			driverName := m.config.GetUniqueDriverName()
+
+			csiNodeAttachLimit, err := checkCSINodeForLimits(nodeName, driverName, m.cs)
+			framework.ExpectNoError(err, "while checking limits in CSINode: %v", err)
+
+			gomega.Expect(csiNodeAttachLimit).To(gomega.BeNumerically("==", 1))
+
+			_, _, pod1 := createPod(genericEphemeral)
+			gomega.Expect(pod1).NotTo(gomega.BeNil(), "while creating pod with ephemeral volume")
+
+			err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod1.Name, pod1.Namespace)
+			framework.ExpectNoError(err, "Failed to start pod1: %v", err)
+
+			_, _, pod2 := createPod(pvcReference)
+			gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating pod with persistent volume")
+			err = waitForMaxVolumeCondition(pod2, m.cs)
+			framework.ExpectNoError(err, "while waiting for max volume condition on pod : %+v", pod2)
+		})
 	})
 
 	ginkgo.Context("CSI Volume expansion", func() {
@@ -603,7 +680,7 @@
 			init(tp)
 			defer cleanup()
 
-			sc, pvc, pod := createPod(false)
+			sc, pvc, pod := createPod(pvcReference)
 			gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod for resizing")
 
 			framework.ExpectEqual(*sc.AllowVolumeExpansion, true, "failed creating sc with allowed expansion")
@@ -696,7 +773,7 @@
 			defer cleanup()
 
-			sc, pvc, pod := createPod(false)
+			sc, pvc, pod := createPod(pvcReference)
 			gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod for resizing")
 
 			framework.ExpectEqual(*sc.AllowVolumeExpansion, true, "failed creating sc with allowed expansion")
@@ -837,7 +914,7 @@
 			})
 			defer cleanup()
 
-			_, claim, pod := createPod(false)
+			_, claim, pod := createPod(pvcReference)
 			if pod == nil {
 				return
 			}
@@ -975,7 +1052,7 @@
 			})
 			defer cleanup()
 
-			_, claim, pod := createPod(false)
+			_, claim, pod :=
createPod(pvcReference) if pod == nil { return } @@ -1121,7 +1198,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { framework.ExpectNoError(err, "create PVC watch") defer pvcWatch.Stop() - sc, claim, pod := createPod(false) + sc, claim, pod := createPod(pvcReference) gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod") bindingMode := storagev1.VolumeBindingImmediate if test.lateBinding { @@ -1331,7 +1408,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { syncDelay := 5 * time.Second time.Sleep(syncDelay) - sc, _, pod := createPod(false /* persistent volume, late binding as specified above */) + sc, _, pod := createPod(pvcReference) // late binding as specified above framework.ExpectEqual(sc.Name, scName, "pre-selected storage class name not used") waitCtx, cancel := context.WithTimeout(context.Background(), f.Timeouts.PodStart) @@ -1530,7 +1607,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { defer cleanup() - _, _, pod := createPod(false) + _, _, pod := createPod(pvcReference) if pod == nil { return } @@ -1993,7 +2070,7 @@ func checkCSINodeForLimits(nodeName string, driverName string, cs clientset.Inte return attachLimit, nil } -func createClaim(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim) { +func createSC(cs clientset.Interface, t testsuites.StorageClassTest, scName, ns string) *storagev1.StorageClass { class := newStorageClass(t, ns, "") if scName != "" { class.Name = scName @@ -2005,12 +2082,17 @@ func createClaim(cs clientset.Interface, t testsuites.StorageClassTest, node e2e framework.ExpectNoError(err, "Failed to create class: %v", err) } + return class +} + +func createClaim(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim) { + class := createSC(cs, t, scName, ns) claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{ ClaimSize: t.ClaimSize, StorageClassName: &(class.Name), VolumeMode: &t.VolumeMode, }, ns) - claim, err = cs.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), claim, metav1.CreateOptions{}) + claim, err := cs.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), claim, metav1.CreateOptions{}) framework.ExpectNoError(err, "Failed to create claim: %v", err) if !t.DelayBinding { @@ -2046,6 +2128,21 @@ func startPausePodInline(cs clientset.Interface, t testsuites.StorageClassTest, return pod } +func startPausePodGenericEphemeral(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.Pod) { + class := createSC(cs, t, scName, ns) + claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{ + ClaimSize: t.ClaimSize, + StorageClassName: &(class.Name), + VolumeMode: &t.VolumeMode, + }, ns) + pod, err := startPausePodWithVolumeSource(cs, v1.VolumeSource{ + Ephemeral: &v1.EphemeralVolumeSource{ + VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{Spec: claim.Spec}}, + }, node, ns) + framework.ExpectNoError(err, "Failed to create pod: %v", err) + return class, pod +} + func startPausePodWithClaim(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string) (*v1.Pod, error) { return startPausePodWithVolumeSource(cs, v1.VolumeSource{
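
The changes above all rely on the same convention for generic ephemeral volumes: the backing PVC is named <pod name>-<volume name>, and the scheduler plugins additionally require that the PVC is controlled by the pod before trusting it. A minimal standalone sketch of that convention follows; the package and helper names are illustrative only and are not part of this patch.

package ephemeralexample // illustrative package name, not part of the patch

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// claimNameForEphemeralVolume computes the name of the PVC that backs a
// generic ephemeral volume: <pod name>-<volume name>, matching the
// pod.Name + "-" + vol.Name expressions used in the kubelet and scheduler
// changes above.
func claimNameForEphemeralVolume(pod *v1.Pod, vol *v1.Volume) string {
	return pod.Name + "-" + vol.Name
}

// checkEphemeralOwnership mirrors the ownership check added to the scheduler
// plugins: a PVC with the computed name may only be used for the ephemeral
// volume if the pod is its controlling owner; otherwise the name is taken by
// an unrelated claim and the pod must not use it.
func checkEphemeralOwnership(pod *v1.Pod, pvc *v1.PersistentVolumeClaim) error {
	if !metav1.IsControlledBy(pvc, pod) {
		return fmt.Errorf("PVC %s/%s is not owned by pod %q", pvc.Namespace, pvc.Name, pod.Name)
	}
	return nil
}

Keeping the name computation and the ownership rule together in one place would avoid the repeated pod.Name + "-" + vol.Name expressions and IsControlledBy calls scattered across kubelet_pods.go, csi.go, and non_csi.go.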