consider ephemeral volumes for host path and node limits check

When adding the ephemeral volume feature, the special case for
PersistentVolumeClaim volume sources in kubelet's host path and node
limits checks was overlooked. An ephemeral volume source is another
way of referencing a claim and has to be treated the same way.
Patrick Ohly 2021-03-23 10:23:06 +01:00
parent dc2fe6d56c
commit 1e26115df5
8 changed files with 629 additions and 129 deletions
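The change relies on the naming and ownership contract for generic ephemeral volumes: the PVC backing such a volume is named "<pod name>-<volume name>" and must be controlled by the pod that declares it. The sketch below restates that contract in isolation; it is a minimal illustration, and the helper names ephemeralClaimName and claimBelongsToPod are made up for this example rather than taken from the commit.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ephemeralClaimName mirrors the naming rule used throughout this commit:
// the claim created for a generic ephemeral volume is "<pod name>-<volume name>".
func ephemeralClaimName(pod *v1.Pod, vol *v1.Volume) string {
    return pod.Name + "-" + vol.Name
}

// claimBelongsToPod mirrors the ownership check added for the scheduler plugins:
// the claim must be controlled by the pod, otherwise it is rejected.
func claimBelongsToPod(pod *v1.Pod, pvc *v1.PersistentVolumeClaim) bool {
    return metav1.IsControlledBy(pvc, pod)
}

func main() {
    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: "test", UID: "12345"}}
    vol := v1.Volume{Name: "xyz", VolumeSource: v1.VolumeSource{Ephemeral: &v1.EphemeralVolumeSource{}}}
    fmt.Println(ephemeralClaimName(pod, &vol)) // prints "abc-xyz", the claim name the kubelet and scheduler look up
}

With that contract in mind, the kubelet's host-path check and both scheduler volume-limit plugins below first resolve an ephemeral volume to its claim name and then apply the same logic they already used for PersistentVolumeClaim sources.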

View File

@@ -2018,21 +2018,31 @@ func hasHostNamespace(pod *v1.Pod) bool {
// hasHostMountPVC returns true if a PVC is referencing a HostPath volume.
func (kl *Kubelet) hasHostMountPVC(pod *v1.Pod) bool {
    for _, volume := range pod.Spec.Volumes {
-       if volume.PersistentVolumeClaim != nil {
-           pvc, err := kl.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Get(context.TODO(), volume.PersistentVolumeClaim.ClaimName, metav1.GetOptions{})
-           if err != nil {
-               klog.InfoS("Unable to retrieve pvc", "pvc", klog.KRef(pod.Namespace, volume.PersistentVolumeClaim.ClaimName), "err", err)
-               continue
-           }
-           if pvc != nil {
-               referencedVolume, err := kl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{})
-               if err != nil {
-                   klog.InfoS("Unable to retrieve pv", "pvName", pvc.Spec.VolumeName, "err", err)
-                   continue
-               }
-               if referencedVolume != nil && referencedVolume.Spec.HostPath != nil {
-                   return true
-               }
-           }
        pvcName := ""
        switch {
        case volume.PersistentVolumeClaim != nil:
            pvcName = volume.PersistentVolumeClaim.ClaimName
        case volume.Ephemeral != nil:
            if !utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) {
                continue
            }
            pvcName = pod.Name + "-" + volume.Name
        default:
            continue
        }
        pvc, err := kl.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Get(context.TODO(), pvcName, metav1.GetOptions{})
        if err != nil {
            klog.InfoS("Unable to retrieve pvc", "pvc", klog.KRef(pod.Namespace, pvcName), "err", err)
            continue
        }
        if pvc != nil {
            referencedVolume, err := kl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{})
            if err != nil {
                klog.InfoS("Unable to retrieve pv", "pvName", pvc.Spec.VolumeName, "err", err)
                continue
            }
            if referencedVolume != nil && referencedVolume.Spec.HostPath != nil {
                return true
            }
        }
    }

View File

@@ -38,8 +38,10 @@ import (
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/diff"
    utilfeature "k8s.io/apiserver/pkg/util/feature"
    core "k8s.io/client-go/testing"
    "k8s.io/client-go/tools/record"
    featuregatetesting "k8s.io/component-base/featuregate/testing"
    netutils "k8s.io/utils/net"

    // TODO: remove this import if
@@ -47,6 +49,7 @@ import (
    // to "v1"?
    _ "k8s.io/kubernetes/pkg/apis/core/install"
    "k8s.io/kubernetes/pkg/features"
    kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
    "k8s.io/kubernetes/pkg/kubelet/cri/streaming/portforward"
@@ -2886,13 +2889,16 @@ func TestGetPortForward(t *testing.T) {
}

func TestHasHostMountPVC(t *testing.T) {
-   tests := map[string]struct {
    type testcase struct {
        pvError          error
        pvcError         error
        expected         bool
        podHasPVC        bool
        pvcIsHostPath    bool
-   }{
        podHasEphemeral  bool
        ephemeralEnabled bool
    }
    tests := map[string]testcase{
        "no pvc": {podHasPVC: false, expected: false},
        "error fetching pvc": {
            podHasPVC: true,
@@ -2909,6 +2915,18 @@ func TestHasHostMountPVC(t *testing.T) {
            pvcIsHostPath: true,
            expected:      true,
        },
        "enabled ephemeral host path": {
            podHasEphemeral:  true,
            pvcIsHostPath:    true,
            ephemeralEnabled: true,
            expected:         true,
        },
        "disabled ephemeral host path": {
            podHasEphemeral:  true,
            pvcIsHostPath:    true,
            ephemeralEnabled: false,
            expected:         false,
        },
        "non host path pvc": {
            podHasPVC:     true,
            pvcIsHostPath: false,
@@ -2916,7 +2934,8 @@ func TestHasHostMountPVC(t *testing.T) {
        },
    }

-   for k, v := range tests {
    run := func(t *testing.T, v testcase) {
        defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.GenericEphemeralVolume, v.ephemeralEnabled)()
        testKubelet := newTestKubelet(t, false)
        defer testKubelet.Cleanup()
        pod := &v1.Pod{
@@ -2935,13 +2954,23 @@ func TestHasHostMountPVC(t *testing.T) {
                },
            },
        }
        }
        if v.podHasEphemeral {
            pod.Spec.Volumes = []v1.Volume{
                {
                    Name: "xyz",
                    VolumeSource: v1.VolumeSource{
                        Ephemeral: &v1.EphemeralVolumeSource{},
                    },
                },
            }
        }
-       if v.pvcIsHostPath {
        if (v.podHasPVC || v.podHasEphemeral) && v.pvcIsHostPath {
            volumeToReturn.Spec.PersistentVolumeSource = v1.PersistentVolumeSource{
                HostPath: &v1.HostPathVolumeSource{},
            }
        }

        testKubelet.fakeKubeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
@@ -2957,9 +2986,14 @@ func TestHasHostMountPVC(t *testing.T) {
        actual := testKubelet.kubelet.hasHostMountPVC(pod)
        if actual != v.expected {
-           t.Errorf("%s expected %t but got %t", k, v.expected, actual)
            t.Errorf("expected %t but got %t", v.expected, actual)
        }
    }

    for k, v := range tests {
        t.Run(k, func(t *testing.T) {
            run(t, v)
        })
    }
}

View File

@@ -22,6 +22,7 @@ import (
    v1 "k8s.io/api/core/v1"
    storagev1 "k8s.io/api/storage/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/rand"
    corelisters "k8s.io/client-go/listers/core/v1"
@@ -30,6 +31,7 @@ import (
    csitrans "k8s.io/csi-translation-lib"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/scheduler/framework"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
    volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
@@ -54,6 +56,8 @@ type CSILimits struct {
    randomVolumeIDPrefix string

    translator InTreeToCSITranslator

    enableGenericEphemeralVolume bool
}

var _ framework.FilterPlugin = &CSILimits{}
@@ -96,7 +100,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
    }

    newVolumes := make(map[string]string)
-   if err := pl.filterAttachableVolumes(csiNode, pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
    if err := pl.filterAttachableVolumes(pod, csiNode, true /* new pod */, newVolumes); err != nil {
        return framework.AsStatus(err)
    }
@@ -113,7 +117,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
    attachedVolumes := make(map[string]string)
    for _, existingPod := range nodeInfo.Pods {
-       if err := pl.filterAttachableVolumes(csiNode, existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, attachedVolumes); err != nil {
        if err := pl.filterAttachableVolumes(existingPod.Pod, csiNode, false /* existing pod */, attachedVolumes); err != nil {
            return framework.AsStatus(err)
        }
    }
@@ -144,25 +148,47 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
}

func (pl *CSILimits) filterAttachableVolumes(
-   csiNode *storagev1.CSINode, volumes []v1.Volume, namespace string, result map[string]string) error {
-   for _, vol := range volumes {
-       // CSI volumes can only be used as persistent volumes
-       if vol.PersistentVolumeClaim == nil {
-           continue
-       }
-       pvcName := vol.PersistentVolumeClaim.ClaimName
    pod *v1.Pod, csiNode *storagev1.CSINode, newPod bool, result map[string]string) error {
    for _, vol := range pod.Spec.Volumes {
        // CSI volumes can only be used through a PVC.
        pvcName := ""
        ephemeral := false
        switch {
        case vol.PersistentVolumeClaim != nil:
            pvcName = vol.PersistentVolumeClaim.ClaimName
        case vol.Ephemeral != nil:
            if newPod && !pl.enableGenericEphemeralVolume {
                return fmt.Errorf(
                    "volume %s is a generic ephemeral volume, but that feature is disabled in kube-scheduler",
                    vol.Name,
                )
            }
            // Generic ephemeral inline volumes also use a PVC,
            // just with a computed name and certain ownership.
            // That is checked below once the pvc object is
            // retrieved.
            pvcName = pod.Name + "-" + vol.Name
            ephemeral = true
        default:
            continue
        }

        if pvcName == "" {
            return fmt.Errorf("PersistentVolumeClaim had no name")
        }
-       pvc, err := pl.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
        pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)

        if err != nil {
-           klog.V(5).InfoS("Unable to look up PVC info", "PVC", fmt.Sprintf("%s/%s", namespace, pvcName))
            klog.V(5).InfoS("Unable to look up PVC info", "PVC", fmt.Sprintf("%s/%s", pod.Namespace, pvcName))
            continue
        }

        // The PVC for an ephemeral volume must be owned by the pod.
        if ephemeral && !metav1.IsControlledBy(pvc, pod) {
            return fmt.Errorf("PVC %s/%s is not owned by pod", pod.Namespace, pvcName)
        }

        driverName, volumeHandle := pl.getCSIDriverInfo(csiNode, pvc)
        if driverName == "" || volumeHandle == "" {
            klog.V(5).Info("Could not find a CSI driver name or volume handle, not counting volume")
@@ -276,7 +302,7 @@ func (pl *CSILimits) getCSIDriverInfoFromSC(csiNode *storagev1.CSINode, pvc *v1.
}

// NewCSI initializes a new plugin and returns it.
-func NewCSI(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
func NewCSI(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
    informerFactory := handle.SharedInformerFactory()
    pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
    pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
@@ -284,12 +310,13 @@ func NewCSI(_ runtime.Object, handle framework.Handle) (framework.Plugin, error)
    scLister := informerFactory.Storage().V1().StorageClasses().Lister()

    return &CSILimits{
        csiNodeLister:        csiNodesLister,
        pvLister:             pvLister,
        pvcLister:            pvcLister,
        scLister:             scLister,
        randomVolumeIDPrefix: rand.String(32),
        translator:           csitrans.New(),

        enableGenericEphemeralVolume: fts.EnableGenericEphemeralVolume,
    }, nil
}

View File

@@ -47,6 +47,10 @@ const (
    hostpathInTreePluginName = "kubernetes.io/hostpath"
)

var (
    scName = "csi-sc"
)

// getVolumeLimitKey returns a ResourceName by filter type
func getVolumeLimitKey(filterType string) v1.ResourceName {
    switch filterType {
@@ -236,14 +240,112 @@ func TestCSILimits(t *testing.T) {
        },
    }
ephemeralVolumePod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
Name: "abc",
UID: "12345",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "xyz",
VolumeSource: v1.VolumeSource{
Ephemeral: &v1.EphemeralVolumeSource{},
},
},
},
},
}
controller := true
ephemeralClaim := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: ephemeralVolumePod.Namespace,
Name: ephemeralVolumePod.Name + "-" + ephemeralVolumePod.Spec.Volumes[0].Name,
OwnerReferences: []metav1.OwnerReference{
{
Kind: "Pod",
Name: ephemeralVolumePod.Name,
UID: ephemeralVolumePod.UID,
Controller: &controller,
},
},
},
Spec: v1.PersistentVolumeClaimSpec{
StorageClassName: &scName,
},
}
conflictingClaim := ephemeralClaim.DeepCopy()
conflictingClaim.OwnerReferences = nil
ephemeralTwoVolumePod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
Name: "abc",
UID: "12345II",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "x",
VolumeSource: v1.VolumeSource{
Ephemeral: &v1.EphemeralVolumeSource{},
},
},
{
Name: "y",
VolumeSource: v1.VolumeSource{
Ephemeral: &v1.EphemeralVolumeSource{},
},
},
},
},
}
ephemeralClaimX := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: ephemeralTwoVolumePod.Namespace,
Name: ephemeralTwoVolumePod.Name + "-" + ephemeralTwoVolumePod.Spec.Volumes[0].Name,
OwnerReferences: []metav1.OwnerReference{
{
Kind: "Pod",
Name: ephemeralTwoVolumePod.Name,
UID: ephemeralTwoVolumePod.UID,
Controller: &controller,
},
},
},
Spec: v1.PersistentVolumeClaimSpec{
StorageClassName: &scName,
},
}
ephemeralClaimY := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: ephemeralTwoVolumePod.Namespace,
Name: ephemeralTwoVolumePod.Name + "-" + ephemeralTwoVolumePod.Spec.Volumes[1].Name,
OwnerReferences: []metav1.OwnerReference{
{
Kind: "Pod",
Name: ephemeralTwoVolumePod.Name,
UID: ephemeralTwoVolumePod.UID,
Controller: &controller,
},
},
},
Spec: v1.PersistentVolumeClaimSpec{
StorageClassName: &scName,
},
}
    tests := []struct {
        newPod           *v1.Pod
        existingPods     []*v1.Pod
        extraClaims      []v1.PersistentVolumeClaim
        filterName       string
        maxVols          int
        driverNames      []string
        test             string
        migrationEnabled bool
        ephemeralEnabled bool
        limitSource      string
        wantStatus       *framework.Status
    }{
@@ -443,6 +545,96 @@ func TestCSILimits(t *testing.T) {
            limitSource: "csinode",
            test:        "should not count in-tree and count csi volumes if migration is disabled (when scheduling in-tree volumes)",
        },
// ephemeral volumes
{
newPod: ephemeralVolumePod,
filterName: "csi",
driverNames: []string{ebsCSIDriverName},
test: "ephemeral volume feature disabled",
wantStatus: framework.NewStatus(framework.Error, "volume xyz is a generic ephemeral volume, but that feature is disabled in kube-scheduler"),
},
{
newPod: ephemeralVolumePod,
filterName: "csi",
ephemeralEnabled: true,
driverNames: []string{ebsCSIDriverName},
test: "ephemeral volume missing",
},
{
newPod: ephemeralVolumePod,
filterName: "csi",
ephemeralEnabled: true,
extraClaims: []v1.PersistentVolumeClaim{*conflictingClaim},
driverNames: []string{ebsCSIDriverName},
test: "ephemeral volume not owned",
wantStatus: framework.NewStatus(framework.Error, "PVC test/abc-xyz is not owned by pod"),
},
{
newPod: ephemeralVolumePod,
filterName: "csi",
ephemeralEnabled: true,
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
driverNames: []string{ebsCSIDriverName},
test: "ephemeral volume unbound",
},
{
newPod: ephemeralVolumePod,
filterName: "csi",
ephemeralEnabled: true,
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
driverNames: []string{ebsCSIDriverName},
existingPods: []*v1.Pod{runningPod, csiEBSTwoVolPod},
maxVols: 2,
limitSource: "node",
test: "ephemeral doesn't when node volume limit <= pods CSI volume",
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
},
{
newPod: csiEBSOneVolPod,
filterName: "csi",
ephemeralEnabled: true,
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaimX, *ephemeralClaimY},
driverNames: []string{ebsCSIDriverName},
existingPods: []*v1.Pod{runningPod, ephemeralTwoVolumePod},
maxVols: 2,
limitSource: "node",
test: "ephemeral doesn't when node volume limit <= pods ephemeral CSI volume",
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
},
{
newPod: csiEBSOneVolPod,
filterName: "csi",
ephemeralEnabled: false,
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
driverNames: []string{ebsCSIDriverName},
existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod},
maxVols: 3,
limitSource: "node",
test: "persistent doesn't when node volume limit <= pods ephemeral CSI volume + persistent volume, ephemeral disabled",
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
},
{
newPod: csiEBSOneVolPod,
filterName: "csi",
ephemeralEnabled: true,
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
driverNames: []string{ebsCSIDriverName},
existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod},
maxVols: 3,
limitSource: "node",
test: "persistent doesn't when node volume limit <= pods ephemeral CSI volume + persistent volume",
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
},
{
newPod: csiEBSOneVolPod,
filterName: "csi",
ephemeralEnabled: true,
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
driverNames: []string{ebsCSIDriverName},
existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod},
maxVols: 4,
test: "persistent okay when node volume limit > pods ephemeral CSI volume + persistent volume",
},
    }

    // running attachable predicate tests with feature gate and limit present on nodes
@@ -457,14 +649,15 @@ func TestCSILimits(t *testing.T) {
                defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, false)()
                defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, false)()
            }

            p := &CSILimits{
                csiNodeLister:        getFakeCSINodeLister(csiNode),
                pvLister:             getFakeCSIPVLister(test.filterName, test.driverNames...),
-               pvcLister:            getFakeCSIPVCLister(test.filterName, "csi-sc", test.driverNames...),
-               scLister:             getFakeCSIStorageClassLister("csi-sc", test.driverNames[0]),
                pvcLister:            append(getFakeCSIPVCLister(test.filterName, scName, test.driverNames...), test.extraClaims...),
                scLister:             getFakeCSIStorageClassLister(scName, test.driverNames[0]),
                randomVolumeIDPrefix: rand.String(32),
                translator:           csitrans.New(),

                enableGenericEphemeralVolume: test.ephemeralEnabled,
            }
            gotStatus := p.Filter(context.Background(), nil, test.newPod, node)
            if !reflect.DeepEqual(gotStatus, test.wantStatus) {

View File

@@ -26,6 +26,7 @@ import (
    v1 "k8s.io/api/core/v1"
    storage "k8s.io/api/storage/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/rand"
    "k8s.io/apimachinery/pkg/util/sets"
@@ -35,6 +36,7 @@ import (
    csilibplugins "k8s.io/csi-translation-lib/plugins"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/scheduler/framework"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
    volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
@@ -68,36 +70,36 @@ const (
const AzureDiskName = names.AzureDiskLimits

// NewAzureDisk returns function that initializes a new plugin and returns it.
-func NewAzureDisk(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
func NewAzureDisk(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
    informerFactory := handle.SharedInformerFactory()
-   return newNonCSILimitsWithInformerFactory(azureDiskVolumeFilterType, informerFactory), nil
    return newNonCSILimitsWithInformerFactory(azureDiskVolumeFilterType, informerFactory, fts), nil
}

// CinderName is the name of the plugin used in the plugin registry and configurations.
const CinderName = names.CinderLimits

// NewCinder returns function that initializes a new plugin and returns it.
-func NewCinder(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
func NewCinder(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
    informerFactory := handle.SharedInformerFactory()
-   return newNonCSILimitsWithInformerFactory(cinderVolumeFilterType, informerFactory), nil
    return newNonCSILimitsWithInformerFactory(cinderVolumeFilterType, informerFactory, fts), nil
}

// EBSName is the name of the plugin used in the plugin registry and configurations.
const EBSName = names.EBSLimits

// NewEBS returns function that initializes a new plugin and returns it.
-func NewEBS(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
func NewEBS(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
    informerFactory := handle.SharedInformerFactory()
-   return newNonCSILimitsWithInformerFactory(ebsVolumeFilterType, informerFactory), nil
    return newNonCSILimitsWithInformerFactory(ebsVolumeFilterType, informerFactory, fts), nil
}

// GCEPDName is the name of the plugin used in the plugin registry and configurations.
const GCEPDName = names.GCEPDLimits

// NewGCEPD returns function that initializes a new plugin and returns it.
-func NewGCEPD(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
func NewGCEPD(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
    informerFactory := handle.SharedInformerFactory()
-   return newNonCSILimitsWithInformerFactory(gcePDVolumeFilterType, informerFactory), nil
    return newNonCSILimitsWithInformerFactory(gcePDVolumeFilterType, informerFactory, fts), nil
}

// nonCSILimits contains information to check the max number of volumes for a plugin.
@@ -115,6 +117,8 @@ type nonCSILimits struct {
    // It is used to prefix volumeID generated inside the predicate() method to
    // avoid conflicts with any real volume.
    randomVolumeIDPrefix string

    enableGenericEphemeralVolume bool
}

var _ framework.FilterPlugin = &nonCSILimits{}
@@ -124,13 +128,14 @@ var _ framework.EnqueueExtensions = &nonCSILimits{}
func newNonCSILimitsWithInformerFactory(
    filterName string,
    informerFactory informers.SharedInformerFactory,
    fts feature.Features,
) framework.Plugin {
    pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
    pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
    csiNodesLister := informerFactory.Storage().V1().CSINodes().Lister()
    scLister := informerFactory.Storage().V1().StorageClasses().Lister()

-   return newNonCSILimits(filterName, csiNodesLister, scLister, pvLister, pvcLister)
    return newNonCSILimits(filterName, csiNodesLister, scLister, pvLister, pvcLister, fts)
}

// newNonCSILimits creates a plugin which evaluates whether a pod can fit based on the
@@ -149,6 +154,7 @@ func newNonCSILimits(
    scLister storagelisters.StorageClassLister,
    pvLister corelisters.PersistentVolumeLister,
    pvcLister corelisters.PersistentVolumeClaimLister,
    fts feature.Features,
) framework.Plugin {
    var filter VolumeFilter
    var volumeLimitKey v1.ResourceName
@@ -185,6 +191,8 @@ func newNonCSILimits(
        pvcLister:            pvcLister,
        scLister:             scLister,
        randomVolumeIDPrefix: rand.String(32),

        enableGenericEphemeralVolume: fts.EnableGenericEphemeralVolume,
    }

    return pl
@@ -213,7 +221,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod
    }

    newVolumes := make(sets.String)
-   if err := pl.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
    if err := pl.filterVolumes(pod, newVolumes); err != nil {
        return framework.AsStatus(err)
    }
@@ -246,7 +254,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod
    // count unique volumes
    existingVolumes := make(sets.String)
    for _, existingPod := range nodeInfo.Pods {
-       if err := pl.filterVolumes(existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, existingVolumes); err != nil {
        if err := pl.filterVolumes(existingPod.Pod, existingVolumes); err != nil {
            return framework.AsStatus(err)
        }
    }
@@ -270,57 +278,84 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod
    return nil
}

-func (pl *nonCSILimits) filterVolumes(volumes []v1.Volume, namespace string, filteredVolumes sets.String) error {
-   for i := range volumes {
-       vol := &volumes[i]
-       if id, ok := pl.filter.FilterVolume(vol); ok {
-           filteredVolumes.Insert(id)
-       } else if vol.PersistentVolumeClaim != nil {
-           pvcName := vol.PersistentVolumeClaim.ClaimName
-           if pvcName == "" {
-               return fmt.Errorf("PersistentVolumeClaim had no name")
-           }
-           // Until we know real ID of the volume use namespace/pvcName as substitute
-           // with a random prefix (calculated and stored inside 'c' during initialization)
-           // to avoid conflicts with existing volume IDs.
-           pvID := fmt.Sprintf("%s-%s/%s", pl.randomVolumeIDPrefix, namespace, pvcName)
-           pvc, err := pl.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
-           if err != nil {
-               // If the PVC is invalid, we don't count the volume because
-               // there's no guarantee that it belongs to the running predicate.
-               klog.V(4).InfoS("Unable to look up PVC info, assuming PVC doesn't match predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", namespace, pvcName), "err", err)
-               continue
-           }
-           pvName := pvc.Spec.VolumeName
-           if pvName == "" {
-               // PVC is not bound. It was either deleted and created again or
-               // it was forcefully unbound by admin. The pod can still use the
-               // original PV where it was bound to, so we count the volume if
-               // it belongs to the running predicate.
-               if pl.matchProvisioner(pvc) {
-                   klog.V(4).InfoS("PVC is not bound, assuming PVC matches predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", namespace, pvcName))
-                   filteredVolumes.Insert(pvID)
-               }
-               continue
-           }
-           pv, err := pl.pvLister.Get(pvName)
-           if err != nil {
-               // If the PV is invalid and PVC belongs to the running predicate,
-               // log the error and count the PV towards the PV limit.
-               if pl.matchProvisioner(pvc) {
-                   klog.V(4).InfoS("Unable to look up PV, assuming PV matches predicate when counting limits", "PV", fmt.Sprintf("%s/%s/%s", namespace, pvcName, pvName), "err", err)
-                   filteredVolumes.Insert(pvID)
-               }
-               continue
-           }
-           if id, ok := pl.filter.FilterPersistentVolume(pv); ok {
-               filteredVolumes.Insert(id)
-           }
-       }
func (pl *nonCSILimits) filterVolumes(pod *v1.Pod, filteredVolumes sets.String) error {
    volumes := pod.Spec.Volumes
    for i := range volumes {
        vol := &volumes[i]
        if id, ok := pl.filter.FilterVolume(vol); ok {
            filteredVolumes.Insert(id)
            continue
        }

        pvcName := ""
        ephemeral := false
        switch {
        case vol.PersistentVolumeClaim != nil:
            pvcName = vol.PersistentVolumeClaim.ClaimName
        case vol.Ephemeral != nil:
            if !pl.enableGenericEphemeralVolume {
                return fmt.Errorf(
                    "volume %s is a generic ephemeral volume, but that feature is disabled in kube-scheduler",
                    vol.Name,
                )
            }
            // Generic ephemeral inline volumes also use a PVC,
            // just with a computed name and certain ownership.
            // That is checked below once the pvc object is
            // retrieved.
            pvcName = pod.Name + "-" + vol.Name
            ephemeral = true
        default:
            continue
        }
        if pvcName == "" {
            return fmt.Errorf("PersistentVolumeClaim had no name")
        }

        // Until we know real ID of the volume use namespace/pvcName as substitute
        // with a random prefix (calculated and stored inside 'c' during initialization)
        // to avoid conflicts with existing volume IDs.
        pvID := fmt.Sprintf("%s-%s/%s", pl.randomVolumeIDPrefix, pod.Namespace, pvcName)

        pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
        if err != nil {
            // If the PVC is invalid, we don't count the volume because
            // there's no guarantee that it belongs to the running predicate.
            klog.V(4).InfoS("Unable to look up PVC info, assuming PVC doesn't match predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", pod.Namespace, pvcName), "err", err)
            continue
        }

        // The PVC for an ephemeral volume must be owned by the pod.
        if ephemeral && !metav1.IsControlledBy(pvc, pod) {
            return fmt.Errorf("PVC %s/%s is not owned by pod", pod.Namespace, pvcName)
        }

        pvName := pvc.Spec.VolumeName
        if pvName == "" {
            // PVC is not bound. It was either deleted and created again or
            // it was forcefully unbound by admin. The pod can still use the
            // original PV where it was bound to, so we count the volume if
            // it belongs to the running predicate.
            if pl.matchProvisioner(pvc) {
                klog.V(4).InfoS("PVC is not bound, assuming PVC matches predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", pod.Namespace, pvcName))
                filteredVolumes.Insert(pvID)
            }
            continue
        }

        pv, err := pl.pvLister.Get(pvName)
        if err != nil {
            // If the PV is invalid and PVC belongs to the running predicate,
            // log the error and count the PV towards the PV limit.
            if pl.matchProvisioner(pvc) {
                klog.V(4).InfoS("Unable to look up PV, assuming PV matches predicate when counting limits", "PV", fmt.Sprintf("%s/%s/%s", pod.Namespace, pvcName, pvName), "err", err)
                filteredVolumes.Insert(pvID)
            }
            continue
        }

        if id, ok := pl.filter.FilterPersistentVolume(pv); ok {
            filteredVolumes.Insert(id)
        }
    }

View File

@@ -28,9 +28,113 @@ import (
    csilibplugins "k8s.io/csi-translation-lib/plugins"
    "k8s.io/kubernetes/pkg/scheduler/framework"
    fakeframework "k8s.io/kubernetes/pkg/scheduler/framework/fake"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
    utilpointer "k8s.io/utils/pointer"
)

func TestEphemeralLimits(t *testing.T) {
// We have to specify a valid filter and arbitrarily pick Cinder here.
// It doesn't matter for the test cases.
filterName := cinderVolumeFilterType
driverName := csilibplugins.CinderInTreePluginName
ephemeralVolumePod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
Name: "abc",
UID: "12345",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "xyz",
VolumeSource: v1.VolumeSource{
Ephemeral: &v1.EphemeralVolumeSource{},
},
},
},
},
}
controller := true
ephemeralClaim := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: ephemeralVolumePod.Namespace,
Name: ephemeralVolumePod.Name + "-" + ephemeralVolumePod.Spec.Volumes[0].Name,
OwnerReferences: []metav1.OwnerReference{
{
Kind: "Pod",
Name: ephemeralVolumePod.Name,
UID: ephemeralVolumePod.UID,
Controller: &controller,
},
},
},
Spec: v1.PersistentVolumeClaimSpec{
VolumeName: "missing",
StorageClassName: &filterName,
},
}
conflictingClaim := ephemeralClaim.DeepCopy()
conflictingClaim.OwnerReferences = nil
tests := []struct {
newPod *v1.Pod
existingPods []*v1.Pod
extraClaims []v1.PersistentVolumeClaim
ephemeralEnabled bool
maxVols int
test string
wantStatus *framework.Status
}{
{
newPod: ephemeralVolumePod,
test: "volume feature disabled",
wantStatus: framework.NewStatus(framework.Error, "volume xyz is a generic ephemeral volume, but that feature is disabled in kube-scheduler"),
},
{
newPod: ephemeralVolumePod,
ephemeralEnabled: true,
test: "volume missing",
},
{
newPod: ephemeralVolumePod,
ephemeralEnabled: true,
extraClaims: []v1.PersistentVolumeClaim{*conflictingClaim},
test: "volume not owned",
wantStatus: framework.NewStatus(framework.Error, "PVC test/abc-xyz is not owned by pod"),
},
{
newPod: ephemeralVolumePod,
ephemeralEnabled: true,
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
maxVols: 1,
test: "volume unbound, allowed",
},
{
newPod: ephemeralVolumePod,
ephemeralEnabled: true,
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
maxVols: 0,
test: "volume unbound, exceeds limit",
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
},
}
for _, test := range tests {
t.Run(test.test, func(t *testing.T) {
fts := feature.Features{
EnableGenericEphemeralVolume: test.ephemeralEnabled,
}
node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), filterName)
p := newNonCSILimits(filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(filterName, driverName), getFakePVLister(filterName), append(getFakePVCLister(filterName), test.extraClaims...), fts).(framework.FilterPlugin)
gotStatus := p.Filter(context.Background(), nil, test.newPod, node)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
}
})
}
}
func TestAzureDiskLimits(t *testing.T) {
    oneVolPod := &v1.Pod{
        Spec: v1.PodSpec{
@@ -360,7 +464,7 @@ func TestAzureDiskLimits(t *testing.T) {
    for _, test := range tests {
        t.Run(test.test, func(t *testing.T) {
            node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName)
-           p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin)
            p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin)
            gotStatus := p.Filter(context.Background(), nil, test.newPod, node)
            if !reflect.DeepEqual(gotStatus, test.wantStatus) {
                t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
@@ -427,7 +531,7 @@ func TestCinderLimits(t *testing.T) {
    for _, test := range tests {
        t.Run(test.test, func(t *testing.T) {
            node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName)
-           p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin)
            p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin)
            gotStatus := p.Filter(context.Background(), nil, test.newPod, node)
            if !reflect.DeepEqual(gotStatus, test.wantStatus) {
                t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
@@ -834,7 +938,7 @@ func TestEBSLimits(t *testing.T) {
    for _, test := range tests {
        t.Run(test.test, func(t *testing.T) {
            node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName)
-           p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin)
            p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin)
            gotStatus := p.Filter(context.Background(), nil, test.newPod, node)
            if !reflect.DeepEqual(gotStatus, test.wantStatus) {
                t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
@@ -1172,7 +1276,7 @@ func TestGCEPDLimits(t *testing.T) {
    for _, test := range tests {
        t.Run(test.test, func(t *testing.T) {
            node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName)
-           p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin)
            p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin)
            gotStatus := p.Filter(context.Background(), nil, test.newPod, node)
            if !reflect.DeepEqual(gotStatus, test.wantStatus) {
                t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)

View File

@@ -75,11 +75,11 @@ func NewInTreeRegistry() runtime.Registry {
        volumebinding.Name:             runtime.FactoryAdapter(fts, volumebinding.New),
        volumerestrictions.Name:        runtime.FactoryAdapter(fts, volumerestrictions.New),
        volumezone.Name:                volumezone.New,
-       nodevolumelimits.CSIName:       nodevolumelimits.NewCSI,
-       nodevolumelimits.EBSName:       nodevolumelimits.NewEBS,
-       nodevolumelimits.GCEPDName:     nodevolumelimits.NewGCEPD,
-       nodevolumelimits.AzureDiskName: nodevolumelimits.NewAzureDisk,
-       nodevolumelimits.CinderName:    nodevolumelimits.NewCinder,
        nodevolumelimits.CSIName:       runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI),
        nodevolumelimits.EBSName:       runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS),
        nodevolumelimits.GCEPDName:     runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD),
        nodevolumelimits.AzureDiskName: runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk),
        nodevolumelimits.CinderName:    runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder),
        interpodaffinity.Name:          runtime.FactoryAdapter(fts, interpodaffinity.New),
        nodelabel.Name:                 nodelabel.New,
        serviceaffinity.Name:           serviceaffinity.New,

View File

@@ -197,7 +197,11 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
        framework.ExpectNoError(err, "Failed to register CSIDriver %v", m.config.GetUniqueDriverName())
    }

    type volumeType string
    csiEphemeral := volumeType("CSI")
    genericEphemeral := volumeType("Ephemeral")
    pvcReference := volumeType("PVC")
-   createPod := func(ephemeral bool) (class *storagev1.StorageClass, claim *v1.PersistentVolumeClaim, pod *v1.Pod) {
    createPod := func(withVolume volumeType) (class *storagev1.StorageClass, claim *v1.PersistentVolumeClaim, pod *v1.Pod) {
        ginkgo.By("Creating pod")
        sc := m.driver.GetDynamicProvisionStorageClass(m.config, "")
        scTest := testsuites.StorageClassTest{
@@ -213,12 +217,21 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
        // The mock driver only works when everything runs on a single node.
        nodeSelection := m.config.ClientNodeSelection
-       if ephemeral {
-           pod = startPausePodInline(f.ClientSet, scTest, nodeSelection, f.Namespace.Name)
-           if pod != nil {
-               m.pods = append(m.pods, pod)
-           }
-       } else {
        switch withVolume {
        case csiEphemeral:
            pod = startPausePodInline(f.ClientSet, scTest, nodeSelection, f.Namespace.Name)
        case genericEphemeral:
            class, pod = startPausePodGenericEphemeral(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
            if class != nil {
                m.sc[class.Name] = class
            }
            claim = &v1.PersistentVolumeClaim{
                ObjectMeta: metav1.ObjectMeta{
                    Name:      pod.Name + "-" + pod.Spec.Volumes[0].Name,
                    Namespace: f.Namespace.Name,
                },
            }
        case pvcReference:
            class, claim, pod = startPausePod(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
            if class != nil {
                m.sc[class.Name] = class
@@ -226,9 +239,9 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
            if claim != nil {
                m.pvcs = append(m.pvcs, claim)
            }
-           if pod != nil {
-               m.pods = append(m.pods, pod)
-           }
        }
        if pod != nil {
            m.pods = append(m.pods, pod)
        }
        return // result variables set above
    }
@@ -318,6 +331,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
            name                   string
            disableAttach          bool
            deployClusterRegistrar bool
            volumeType             volumeType
        }{
            {
                name:                   "should not require VolumeAttach for drivers without attachment",
@@ -328,6 +342,11 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
                name:                   "should require VolumeAttach for drivers with attachment",
                deployClusterRegistrar: true,
            },
            {
                name:                   "should require VolumeAttach for ephemermal volume and drivers with attachment",
                deployClusterRegistrar: true,
                volumeType:             genericEphemeral,
            },
            {
                name:                   "should preserve attachment policy when no CSIDriver present",
                deployClusterRegistrar: false,
@@ -340,7 +359,11 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
                init(testParameters{registerDriver: test.deployClusterRegistrar, disableAttach: test.disableAttach})
                defer cleanup()

-               _, claim, pod := createPod(false)
                volumeType := t.volumeType
                if volumeType == "" {
                    volumeType = pvcReference
                }
                _, claim, pod := createPod(volumeType)
                if pod == nil {
                    return
                }
@@ -375,7 +398,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
            init(testParameters{registerDriver: false, disableAttach: true})
            defer cleanup()

-           _, claim, pod := createPod(false /* persistent volume, late binding as specified above */)
            _, claim, pod := createPod(pvcReference) // late binding as specified above
            if pod == nil {
                return
            }
@@ -497,7 +520,11 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
            defer cleanup()

-           _, _, pod := createPod(test.expectEphemeral)
            withVolume := pvcReference
            if test.expectEphemeral {
                withVolume = csiEphemeral
            }
            _, _, pod := createPod(withVolume)
            if pod == nil {
                return
            }
@@ -539,23 +566,73 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
            gomega.Expect(csiNodeAttachLimit).To(gomega.BeNumerically("==", 2))

-           _, _, pod1 := createPod(false)
            _, _, pod1 := createPod(pvcReference)
            gomega.Expect(pod1).NotTo(gomega.BeNil(), "while creating first pod")

            err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod1.Name, pod1.Namespace)
            framework.ExpectNoError(err, "Failed to start pod1: %v", err)

-           _, _, pod2 := createPod(false)
            _, _, pod2 := createPod(pvcReference)
            gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating second pod")

            err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod2.Name, pod2.Namespace)
            framework.ExpectNoError(err, "Failed to start pod2: %v", err)

-           _, _, pod3 := createPod(false)
            _, _, pod3 := createPod(pvcReference)
            gomega.Expect(pod3).NotTo(gomega.BeNil(), "while creating third pod")

            err = waitForMaxVolumeCondition(pod3, m.cs)
            framework.ExpectNoError(err, "while waiting for max volume condition on pod : %+v", pod3)
        })
ginkgo.It("should report attach limit for generic ephemeral volume when persistent volume is attached [Slow]", func() {
// define volume limit to be 2 for this test
var err error
init(testParameters{attachLimit: 1})
defer cleanup()
nodeName := m.config.ClientNodeSelection.Name
driverName := m.config.GetUniqueDriverName()
csiNodeAttachLimit, err := checkCSINodeForLimits(nodeName, driverName, m.cs)
framework.ExpectNoError(err, "while checking limits in CSINode: %v", err)
gomega.Expect(csiNodeAttachLimit).To(gomega.BeNumerically("==", 1))
_, _, pod1 := createPod(pvcReference)
gomega.Expect(pod1).NotTo(gomega.BeNil(), "while creating pod with persistent volume")
err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod1.Name, pod1.Namespace)
framework.ExpectNoError(err, "Failed to start pod1: %v", err)
_, _, pod2 := createPod(genericEphemeral)
gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating pod with ephemeral volume")
err = waitForMaxVolumeCondition(pod2, m.cs)
framework.ExpectNoError(err, "while waiting for max volume condition on pod : %+v", pod2)
})
ginkgo.It("should report attach limit for persistent volume when generic ephemeral volume is attached [Slow]", func() {
// define volume limit to be 2 for this test
var err error
init(testParameters{attachLimit: 1})
defer cleanup()
nodeName := m.config.ClientNodeSelection.Name
driverName := m.config.GetUniqueDriverName()
csiNodeAttachLimit, err := checkCSINodeForLimits(nodeName, driverName, m.cs)
framework.ExpectNoError(err, "while checking limits in CSINode: %v", err)
gomega.Expect(csiNodeAttachLimit).To(gomega.BeNumerically("==", 1))
_, _, pod1 := createPod(genericEphemeral)
gomega.Expect(pod1).NotTo(gomega.BeNil(), "while creating pod with persistent volume")
err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod1.Name, pod1.Namespace)
framework.ExpectNoError(err, "Failed to start pod1: %v", err)
_, _, pod2 := createPod(pvcReference)
gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating pod with ephemeral volume")
err = waitForMaxVolumeCondition(pod2, m.cs)
framework.ExpectNoError(err, "while waiting for max volume condition on pod : %+v", pod2)
})
    })

    ginkgo.Context("CSI Volume expansion", func() {
@@ -603,7 +680,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
            init(tp)
            defer cleanup()

-           sc, pvc, pod := createPod(false)
            sc, pvc, pod := createPod(pvcReference)
            gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod for resizing")

            framework.ExpectEqual(*sc.AllowVolumeExpansion, true, "failed creating sc with allowed expansion")
@@ -696,7 +773,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
            defer cleanup()

-           sc, pvc, pod := createPod(false)
            sc, pvc, pod := createPod(pvcReference)
            gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod for resizing")

            framework.ExpectEqual(*sc.AllowVolumeExpansion, true, "failed creating sc with allowed expansion")
@@ -837,7 +914,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
            })
            defer cleanup()

-           _, claim, pod := createPod(false)
            _, claim, pod := createPod(pvcReference)
            if pod == nil {
                return
            }
@@ -975,7 +1052,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
            })
            defer cleanup()

-           _, claim, pod := createPod(false)
            _, claim, pod := createPod(pvcReference)
            if pod == nil {
                return
            }
@@ -1121,7 +1198,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
            framework.ExpectNoError(err, "create PVC watch")
            defer pvcWatch.Stop()

-           sc, claim, pod := createPod(false)
            sc, claim, pod := createPod(pvcReference)
            gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod")
            bindingMode := storagev1.VolumeBindingImmediate
            if test.lateBinding {
@@ -1331,7 +1408,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
            syncDelay := 5 * time.Second
            time.Sleep(syncDelay)

-           sc, _, pod := createPod(false /* persistent volume, late binding as specified above */)
            sc, _, pod := createPod(pvcReference) // late binding as specified above
            framework.ExpectEqual(sc.Name, scName, "pre-selected storage class name not used")

            waitCtx, cancel := context.WithTimeout(context.Background(), f.Timeouts.PodStart)
@@ -1530,7 +1607,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
            defer cleanup()

-           _, _, pod := createPod(false)
            _, _, pod := createPod(pvcReference)
            if pod == nil {
                return
            }
@@ -1993,7 +2070,7 @@ func checkCSINodeForLimits(nodeName string, driverName string, cs clientset.Inte
    return attachLimit, nil
}

-func createClaim(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim) {
func createSC(cs clientset.Interface, t testsuites.StorageClassTest, scName, ns string) *storagev1.StorageClass {
    class := newStorageClass(t, ns, "")
    if scName != "" {
        class.Name = scName
@@ -2005,12 +2082,17 @@
        framework.ExpectNoError(err, "Failed to create class: %v", err)
    }

    return class
}

func createClaim(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim) {
    class := createSC(cs, t, scName, ns)
    claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
        ClaimSize:        t.ClaimSize,
        StorageClassName: &(class.Name),
        VolumeMode:       &t.VolumeMode,
    }, ns)
-   claim, err = cs.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), claim, metav1.CreateOptions{})
    claim, err := cs.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), claim, metav1.CreateOptions{})
    framework.ExpectNoError(err, "Failed to create claim: %v", err)

    if !t.DelayBinding {
@@ -2046,6 +2128,21 @@ func startPausePodInline(cs clientset.Interface, t testsuites.StorageClassTest,
    return pod
}

func startPausePodGenericEphemeral(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.Pod) {
class := createSC(cs, t, scName, ns)
claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
ClaimSize: t.ClaimSize,
StorageClassName: &(class.Name),
VolumeMode: &t.VolumeMode,
}, ns)
pod, err := startPausePodWithVolumeSource(cs, v1.VolumeSource{
Ephemeral: &v1.EphemeralVolumeSource{
VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{Spec: claim.Spec}},
}, node, ns)
framework.ExpectNoError(err, "Failed to create pod: %v", err)
return class, pod
}
func startPausePodWithClaim(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
    return startPausePodWithVolumeSource(cs,
        v1.VolumeSource{