mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Merge pull request #100482 from pohly/generic-ephemeral-volume-checks
generic ephemeral volume checks
This commit is contained in:
commit
e414cf7641
@ -2018,21 +2018,31 @@ func hasHostNamespace(pod *v1.Pod) bool {
|
||||
// hasHostMountPVC returns true if a PVC is referencing a HostPath volume.
|
||||
func (kl *Kubelet) hasHostMountPVC(pod *v1.Pod) bool {
|
||||
for _, volume := range pod.Spec.Volumes {
|
||||
if volume.PersistentVolumeClaim != nil {
|
||||
pvc, err := kl.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Get(context.TODO(), volume.PersistentVolumeClaim.ClaimName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
klog.InfoS("Unable to retrieve pvc", "pvc", klog.KRef(pod.Namespace, volume.PersistentVolumeClaim.ClaimName), "err", err)
|
||||
pvcName := ""
|
||||
switch {
|
||||
case volume.PersistentVolumeClaim != nil:
|
||||
pvcName = volume.PersistentVolumeClaim.ClaimName
|
||||
case volume.Ephemeral != nil:
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) {
|
||||
continue
|
||||
}
|
||||
if pvc != nil {
|
||||
referencedVolume, err := kl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
klog.InfoS("Unable to retrieve pv", "pvName", pvc.Spec.VolumeName, "err", err)
|
||||
continue
|
||||
}
|
||||
if referencedVolume != nil && referencedVolume.Spec.HostPath != nil {
|
||||
return true
|
||||
}
|
||||
pvcName = pod.Name + "-" + volume.Name
|
||||
default:
|
||||
continue
|
||||
}
|
||||
pvc, err := kl.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Get(context.TODO(), pvcName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
klog.InfoS("Unable to retrieve pvc", "pvc", klog.KRef(pod.Namespace, pvcName), "err", err)
|
||||
continue
|
||||
}
|
||||
if pvc != nil {
|
||||
referencedVolume, err := kl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
klog.InfoS("Unable to retrieve pv", "pvName", pvc.Spec.VolumeName, "err", err)
|
||||
continue
|
||||
}
|
||||
if referencedVolume != nil && referencedVolume.Spec.HostPath != nil {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -38,8 +38,10 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
core "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/record"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
netutils "k8s.io/utils/net"
|
||||
|
||||
// TODO: remove this import if
|
||||
@ -47,6 +49,7 @@ import (
|
||||
// to "v1"?
|
||||
|
||||
_ "k8s.io/kubernetes/pkg/apis/core/install"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cri/streaming/portforward"
|
||||
@ -2886,13 +2889,16 @@ func TestGetPortForward(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHasHostMountPVC(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
pvError error
|
||||
pvcError error
|
||||
expected bool
|
||||
podHasPVC bool
|
||||
pvcIsHostPath bool
|
||||
}{
|
||||
type testcase struct {
|
||||
pvError error
|
||||
pvcError error
|
||||
expected bool
|
||||
podHasPVC bool
|
||||
pvcIsHostPath bool
|
||||
podHasEphemeral bool
|
||||
ephemeralEnabled bool
|
||||
}
|
||||
tests := map[string]testcase{
|
||||
"no pvc": {podHasPVC: false, expected: false},
|
||||
"error fetching pvc": {
|
||||
podHasPVC: true,
|
||||
@ -2909,6 +2915,18 @@ func TestHasHostMountPVC(t *testing.T) {
|
||||
pvcIsHostPath: true,
|
||||
expected: true,
|
||||
},
|
||||
"enabled ephemeral host path": {
|
||||
podHasEphemeral: true,
|
||||
pvcIsHostPath: true,
|
||||
ephemeralEnabled: true,
|
||||
expected: true,
|
||||
},
|
||||
"disabled ephemeral host path": {
|
||||
podHasEphemeral: true,
|
||||
pvcIsHostPath: true,
|
||||
ephemeralEnabled: false,
|
||||
expected: false,
|
||||
},
|
||||
"non host path pvc": {
|
||||
podHasPVC: true,
|
||||
pvcIsHostPath: false,
|
||||
@ -2916,7 +2934,8 @@ func TestHasHostMountPVC(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
for k, v := range tests {
|
||||
run := func(t *testing.T, v testcase) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.GenericEphemeralVolume, v.ephemeralEnabled)()
|
||||
testKubelet := newTestKubelet(t, false)
|
||||
defer testKubelet.Cleanup()
|
||||
pod := &v1.Pod{
|
||||
@ -2935,13 +2954,23 @@ func TestHasHostMountPVC(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if v.pvcIsHostPath {
|
||||
volumeToReturn.Spec.PersistentVolumeSource = v1.PersistentVolumeSource{
|
||||
HostPath: &v1.HostPathVolumeSource{},
|
||||
}
|
||||
if v.podHasEphemeral {
|
||||
pod.Spec.Volumes = []v1.Volume{
|
||||
{
|
||||
Name: "xyz",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
Ephemeral: &v1.EphemeralVolumeSource{},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if (v.podHasPVC || v.podHasEphemeral) && v.pvcIsHostPath {
|
||||
volumeToReturn.Spec.PersistentVolumeSource = v1.PersistentVolumeSource{
|
||||
HostPath: &v1.HostPathVolumeSource{},
|
||||
}
|
||||
}
|
||||
|
||||
testKubelet.fakeKubeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
|
||||
@ -2957,9 +2986,14 @@ func TestHasHostMountPVC(t *testing.T) {
|
||||
|
||||
actual := testKubelet.kubelet.hasHostMountPVC(pod)
|
||||
if actual != v.expected {
|
||||
t.Errorf("%s expected %t but got %t", k, v.expected, actual)
|
||||
t.Errorf("expected %t but got %t", v.expected, actual)
|
||||
}
|
||||
}
|
||||
|
||||
for k, v := range tests {
|
||||
t.Run(k, func(t *testing.T) {
|
||||
run(t, v)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storagev1 "k8s.io/api/storage/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/rand"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
@ -30,6 +31,7 @@ import (
|
||||
csitrans "k8s.io/csi-translation-lib"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
|
||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
@ -54,6 +56,8 @@ type CSILimits struct {
|
||||
randomVolumeIDPrefix string
|
||||
|
||||
translator InTreeToCSITranslator
|
||||
|
||||
enableGenericEphemeralVolume bool
|
||||
}
|
||||
|
||||
var _ framework.FilterPlugin = &CSILimits{}
|
||||
@ -96,7 +100,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
|
||||
}
|
||||
|
||||
newVolumes := make(map[string]string)
|
||||
if err := pl.filterAttachableVolumes(csiNode, pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
|
||||
if err := pl.filterAttachableVolumes(pod, csiNode, true /* new pod */, newVolumes); err != nil {
|
||||
return framework.AsStatus(err)
|
||||
}
|
||||
|
||||
@ -113,7 +117,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
|
||||
|
||||
attachedVolumes := make(map[string]string)
|
||||
for _, existingPod := range nodeInfo.Pods {
|
||||
if err := pl.filterAttachableVolumes(csiNode, existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, attachedVolumes); err != nil {
|
||||
if err := pl.filterAttachableVolumes(existingPod.Pod, csiNode, false /* existing pod */, attachedVolumes); err != nil {
|
||||
return framework.AsStatus(err)
|
||||
}
|
||||
}
|
||||
@ -144,25 +148,55 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
|
||||
}
|
||||
|
||||
func (pl *CSILimits) filterAttachableVolumes(
|
||||
csiNode *storagev1.CSINode, volumes []v1.Volume, namespace string, result map[string]string) error {
|
||||
for _, vol := range volumes {
|
||||
// CSI volumes can only be used as persistent volumes
|
||||
if vol.PersistentVolumeClaim == nil {
|
||||
pod *v1.Pod, csiNode *storagev1.CSINode, newPod bool, result map[string]string) error {
|
||||
for _, vol := range pod.Spec.Volumes {
|
||||
// CSI volumes can only be used through a PVC.
|
||||
pvcName := ""
|
||||
ephemeral := false
|
||||
switch {
|
||||
case vol.PersistentVolumeClaim != nil:
|
||||
pvcName = vol.PersistentVolumeClaim.ClaimName
|
||||
case vol.Ephemeral != nil:
|
||||
if newPod && !pl.enableGenericEphemeralVolume {
|
||||
return fmt.Errorf(
|
||||
"volume %s is a generic ephemeral volume, but that feature is disabled in kube-scheduler",
|
||||
vol.Name,
|
||||
)
|
||||
}
|
||||
// Generic ephemeral inline volumes also use a PVC,
|
||||
// just with a computed name and certain ownership.
|
||||
// That is checked below once the pvc object is
|
||||
// retrieved.
|
||||
pvcName = pod.Name + "-" + vol.Name
|
||||
ephemeral = true
|
||||
default:
|
||||
continue
|
||||
}
|
||||
pvcName := vol.PersistentVolumeClaim.ClaimName
|
||||
|
||||
if pvcName == "" {
|
||||
return fmt.Errorf("PersistentVolumeClaim had no name")
|
||||
}
|
||||
|
||||
pvc, err := pl.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
|
||||
pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
|
||||
|
||||
if err != nil {
|
||||
klog.V(5).InfoS("Unable to look up PVC info", "PVC", fmt.Sprintf("%s/%s", namespace, pvcName))
|
||||
if newPod {
|
||||
// The PVC is required to proceed with
|
||||
// scheduling of a new pod because it cannot
|
||||
// run without it. Bail out immediately.
|
||||
return fmt.Errorf("looking up PVC %s/%s: %v", pod.Namespace, pvcName, err)
|
||||
}
|
||||
// If the PVC is invalid, we don't count the volume because
|
||||
// there's no guarantee that it belongs to the running predicate.
|
||||
klog.V(5).InfoS("Unable to look up PVC info", "PVC", fmt.Sprintf("%s/%s", pod.Namespace, pvcName))
|
||||
continue
|
||||
}
|
||||
|
||||
// The PVC for an ephemeral volume must be owned by the pod.
|
||||
if ephemeral && !metav1.IsControlledBy(pvc, pod) {
|
||||
return fmt.Errorf("PVC %s/%s is not owned by pod", pod.Namespace, pvcName)
|
||||
}
|
||||
|
||||
driverName, volumeHandle := pl.getCSIDriverInfo(csiNode, pvc)
|
||||
if driverName == "" || volumeHandle == "" {
|
||||
klog.V(5).Info("Could not find a CSI driver name or volume handle, not counting volume")
|
||||
@ -276,7 +310,7 @@ func (pl *CSILimits) getCSIDriverInfoFromSC(csiNode *storagev1.CSINode, pvc *v1.
|
||||
}
|
||||
|
||||
// NewCSI initializes a new plugin and returns it.
|
||||
func NewCSI(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
|
||||
func NewCSI(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
|
||||
informerFactory := handle.SharedInformerFactory()
|
||||
pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
|
||||
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
||||
@ -284,12 +318,13 @@ func NewCSI(_ runtime.Object, handle framework.Handle) (framework.Plugin, error)
|
||||
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
|
||||
|
||||
return &CSILimits{
|
||||
csiNodeLister: csiNodesLister,
|
||||
pvLister: pvLister,
|
||||
pvcLister: pvcLister,
|
||||
scLister: scLister,
|
||||
randomVolumeIDPrefix: rand.String(32),
|
||||
translator: csitrans.New(),
|
||||
csiNodeLister: csiNodesLister,
|
||||
pvLister: pvLister,
|
||||
pvcLister: pvcLister,
|
||||
scLister: scLister,
|
||||
randomVolumeIDPrefix: rand.String(32),
|
||||
translator: csitrans.New(),
|
||||
enableGenericEphemeralVolume: fts.EnableGenericEphemeralVolume,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -47,6 +47,10 @@ const (
|
||||
hostpathInTreePluginName = "kubernetes.io/hostpath"
|
||||
)
|
||||
|
||||
var (
|
||||
scName = "csi-sc"
|
||||
)
|
||||
|
||||
// getVolumeLimitKey returns a ResourceName by filter type
|
||||
func getVolumeLimitKey(filterType string) v1.ResourceName {
|
||||
switch filterType {
|
||||
@ -236,14 +240,112 @@ func TestCSILimits(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
ephemeralVolumePod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "test",
|
||||
Name: "abc",
|
||||
UID: "12345",
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
Name: "xyz",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
Ephemeral: &v1.EphemeralVolumeSource{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
controller := true
|
||||
ephemeralClaim := &v1.PersistentVolumeClaim{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: ephemeralVolumePod.Namespace,
|
||||
Name: ephemeralVolumePod.Name + "-" + ephemeralVolumePod.Spec.Volumes[0].Name,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
Kind: "Pod",
|
||||
Name: ephemeralVolumePod.Name,
|
||||
UID: ephemeralVolumePod.UID,
|
||||
Controller: &controller,
|
||||
},
|
||||
},
|
||||
},
|
||||
Spec: v1.PersistentVolumeClaimSpec{
|
||||
StorageClassName: &scName,
|
||||
},
|
||||
}
|
||||
conflictingClaim := ephemeralClaim.DeepCopy()
|
||||
conflictingClaim.OwnerReferences = nil
|
||||
|
||||
ephemeralTwoVolumePod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "test",
|
||||
Name: "abc",
|
||||
UID: "12345II",
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
Name: "x",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
Ephemeral: &v1.EphemeralVolumeSource{},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "y",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
Ephemeral: &v1.EphemeralVolumeSource{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
ephemeralClaimX := &v1.PersistentVolumeClaim{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: ephemeralTwoVolumePod.Namespace,
|
||||
Name: ephemeralTwoVolumePod.Name + "-" + ephemeralTwoVolumePod.Spec.Volumes[0].Name,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
Kind: "Pod",
|
||||
Name: ephemeralTwoVolumePod.Name,
|
||||
UID: ephemeralTwoVolumePod.UID,
|
||||
Controller: &controller,
|
||||
},
|
||||
},
|
||||
},
|
||||
Spec: v1.PersistentVolumeClaimSpec{
|
||||
StorageClassName: &scName,
|
||||
},
|
||||
}
|
||||
ephemeralClaimY := &v1.PersistentVolumeClaim{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: ephemeralTwoVolumePod.Namespace,
|
||||
Name: ephemeralTwoVolumePod.Name + "-" + ephemeralTwoVolumePod.Spec.Volumes[1].Name,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
Kind: "Pod",
|
||||
Name: ephemeralTwoVolumePod.Name,
|
||||
UID: ephemeralTwoVolumePod.UID,
|
||||
Controller: &controller,
|
||||
},
|
||||
},
|
||||
},
|
||||
Spec: v1.PersistentVolumeClaimSpec{
|
||||
StorageClassName: &scName,
|
||||
},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
newPod *v1.Pod
|
||||
existingPods []*v1.Pod
|
||||
extraClaims []v1.PersistentVolumeClaim
|
||||
filterName string
|
||||
maxVols int
|
||||
driverNames []string
|
||||
test string
|
||||
migrationEnabled bool
|
||||
ephemeralEnabled bool
|
||||
limitSource string
|
||||
wantStatus *framework.Status
|
||||
}{
|
||||
@ -443,6 +545,97 @@ func TestCSILimits(t *testing.T) {
|
||||
limitSource: "csinode",
|
||||
test: "should not count in-tree and count csi volumes if migration is disabled (when scheduling in-tree volumes)",
|
||||
},
|
||||
// ephemeral volumes
|
||||
{
|
||||
newPod: ephemeralVolumePod,
|
||||
filterName: "csi",
|
||||
driverNames: []string{ebsCSIDriverName},
|
||||
test: "ephemeral volume feature disabled",
|
||||
wantStatus: framework.NewStatus(framework.Error, "volume xyz is a generic ephemeral volume, but that feature is disabled in kube-scheduler"),
|
||||
},
|
||||
{
|
||||
newPod: ephemeralVolumePod,
|
||||
filterName: "csi",
|
||||
ephemeralEnabled: true,
|
||||
driverNames: []string{ebsCSIDriverName},
|
||||
test: "ephemeral volume missing",
|
||||
wantStatus: framework.NewStatus(framework.Error, `looking up PVC test/abc-xyz: persistentvolumeclaim "abc-xyz" not found`),
|
||||
},
|
||||
{
|
||||
newPod: ephemeralVolumePod,
|
||||
filterName: "csi",
|
||||
ephemeralEnabled: true,
|
||||
extraClaims: []v1.PersistentVolumeClaim{*conflictingClaim},
|
||||
driverNames: []string{ebsCSIDriverName},
|
||||
test: "ephemeral volume not owned",
|
||||
wantStatus: framework.NewStatus(framework.Error, "PVC test/abc-xyz is not owned by pod"),
|
||||
},
|
||||
{
|
||||
newPod: ephemeralVolumePod,
|
||||
filterName: "csi",
|
||||
ephemeralEnabled: true,
|
||||
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
|
||||
driverNames: []string{ebsCSIDriverName},
|
||||
test: "ephemeral volume unbound",
|
||||
},
|
||||
{
|
||||
newPod: ephemeralVolumePod,
|
||||
filterName: "csi",
|
||||
ephemeralEnabled: true,
|
||||
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
|
||||
driverNames: []string{ebsCSIDriverName},
|
||||
existingPods: []*v1.Pod{runningPod, csiEBSTwoVolPod},
|
||||
maxVols: 2,
|
||||
limitSource: "node",
|
||||
test: "ephemeral doesn't when node volume limit <= pods CSI volume",
|
||||
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
|
||||
},
|
||||
{
|
||||
newPod: csiEBSOneVolPod,
|
||||
filterName: "csi",
|
||||
ephemeralEnabled: true,
|
||||
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaimX, *ephemeralClaimY},
|
||||
driverNames: []string{ebsCSIDriverName},
|
||||
existingPods: []*v1.Pod{runningPod, ephemeralTwoVolumePod},
|
||||
maxVols: 2,
|
||||
limitSource: "node",
|
||||
test: "ephemeral doesn't when node volume limit <= pods ephemeral CSI volume",
|
||||
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
|
||||
},
|
||||
{
|
||||
newPod: csiEBSOneVolPod,
|
||||
filterName: "csi",
|
||||
ephemeralEnabled: false,
|
||||
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
|
||||
driverNames: []string{ebsCSIDriverName},
|
||||
existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod},
|
||||
maxVols: 3,
|
||||
limitSource: "node",
|
||||
test: "persistent doesn't when node volume limit <= pods ephemeral CSI volume + persistent volume, ephemeral disabled",
|
||||
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
|
||||
},
|
||||
{
|
||||
newPod: csiEBSOneVolPod,
|
||||
filterName: "csi",
|
||||
ephemeralEnabled: true,
|
||||
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
|
||||
driverNames: []string{ebsCSIDriverName},
|
||||
existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod},
|
||||
maxVols: 3,
|
||||
limitSource: "node",
|
||||
test: "persistent doesn't when node volume limit <= pods ephemeral CSI volume + persistent volume",
|
||||
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
|
||||
},
|
||||
{
|
||||
newPod: csiEBSOneVolPod,
|
||||
filterName: "csi",
|
||||
ephemeralEnabled: true,
|
||||
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
|
||||
driverNames: []string{ebsCSIDriverName},
|
||||
existingPods: []*v1.Pod{runningPod, ephemeralVolumePod, csiEBSTwoVolPod},
|
||||
maxVols: 4,
|
||||
test: "persistent okay when node volume limit > pods ephemeral CSI volume + persistent volume",
|
||||
},
|
||||
}
|
||||
|
||||
// running attachable predicate tests with feature gate and limit present on nodes
|
||||
@ -457,14 +650,15 @@ func TestCSILimits(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, false)()
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, false)()
|
||||
}
|
||||
|
||||
p := &CSILimits{
|
||||
csiNodeLister: getFakeCSINodeLister(csiNode),
|
||||
pvLister: getFakeCSIPVLister(test.filterName, test.driverNames...),
|
||||
pvcLister: getFakeCSIPVCLister(test.filterName, "csi-sc", test.driverNames...),
|
||||
scLister: getFakeCSIStorageClassLister("csi-sc", test.driverNames[0]),
|
||||
pvcLister: append(getFakeCSIPVCLister(test.filterName, scName, test.driverNames...), test.extraClaims...),
|
||||
scLister: getFakeCSIStorageClassLister(scName, test.driverNames[0]),
|
||||
randomVolumeIDPrefix: rand.String(32),
|
||||
translator: csitrans.New(),
|
||||
|
||||
enableGenericEphemeralVolume: test.ephemeralEnabled,
|
||||
}
|
||||
gotStatus := p.Filter(context.Background(), nil, test.newPod, node)
|
||||
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storage "k8s.io/api/storage/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/rand"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
@ -35,6 +36,7 @@ import (
|
||||
csilibplugins "k8s.io/csi-translation-lib/plugins"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
|
||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
@ -68,36 +70,36 @@ const (
|
||||
const AzureDiskName = names.AzureDiskLimits
|
||||
|
||||
// NewAzureDisk returns function that initializes a new plugin and returns it.
|
||||
func NewAzureDisk(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
|
||||
func NewAzureDisk(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
|
||||
informerFactory := handle.SharedInformerFactory()
|
||||
return newNonCSILimitsWithInformerFactory(azureDiskVolumeFilterType, informerFactory), nil
|
||||
return newNonCSILimitsWithInformerFactory(azureDiskVolumeFilterType, informerFactory, fts), nil
|
||||
}
|
||||
|
||||
// CinderName is the name of the plugin used in the plugin registry and configurations.
|
||||
const CinderName = names.CinderLimits
|
||||
|
||||
// NewCinder returns function that initializes a new plugin and returns it.
|
||||
func NewCinder(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
|
||||
func NewCinder(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
|
||||
informerFactory := handle.SharedInformerFactory()
|
||||
return newNonCSILimitsWithInformerFactory(cinderVolumeFilterType, informerFactory), nil
|
||||
return newNonCSILimitsWithInformerFactory(cinderVolumeFilterType, informerFactory, fts), nil
|
||||
}
|
||||
|
||||
// EBSName is the name of the plugin used in the plugin registry and configurations.
|
||||
const EBSName = names.EBSLimits
|
||||
|
||||
// NewEBS returns function that initializes a new plugin and returns it.
|
||||
func NewEBS(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
|
||||
func NewEBS(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
|
||||
informerFactory := handle.SharedInformerFactory()
|
||||
return newNonCSILimitsWithInformerFactory(ebsVolumeFilterType, informerFactory), nil
|
||||
return newNonCSILimitsWithInformerFactory(ebsVolumeFilterType, informerFactory, fts), nil
|
||||
}
|
||||
|
||||
// GCEPDName is the name of the plugin used in the plugin registry and configurations.
|
||||
const GCEPDName = names.GCEPDLimits
|
||||
|
||||
// NewGCEPD returns function that initializes a new plugin and returns it.
|
||||
func NewGCEPD(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
|
||||
func NewGCEPD(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
|
||||
informerFactory := handle.SharedInformerFactory()
|
||||
return newNonCSILimitsWithInformerFactory(gcePDVolumeFilterType, informerFactory), nil
|
||||
return newNonCSILimitsWithInformerFactory(gcePDVolumeFilterType, informerFactory, fts), nil
|
||||
}
|
||||
|
||||
// nonCSILimits contains information to check the max number of volumes for a plugin.
|
||||
@ -115,6 +117,8 @@ type nonCSILimits struct {
|
||||
// It is used to prefix volumeID generated inside the predicate() method to
|
||||
// avoid conflicts with any real volume.
|
||||
randomVolumeIDPrefix string
|
||||
|
||||
enableGenericEphemeralVolume bool
|
||||
}
|
||||
|
||||
var _ framework.FilterPlugin = &nonCSILimits{}
|
||||
@ -124,13 +128,14 @@ var _ framework.EnqueueExtensions = &nonCSILimits{}
|
||||
func newNonCSILimitsWithInformerFactory(
|
||||
filterName string,
|
||||
informerFactory informers.SharedInformerFactory,
|
||||
fts feature.Features,
|
||||
) framework.Plugin {
|
||||
pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
|
||||
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
||||
csiNodesLister := informerFactory.Storage().V1().CSINodes().Lister()
|
||||
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
|
||||
|
||||
return newNonCSILimits(filterName, csiNodesLister, scLister, pvLister, pvcLister)
|
||||
return newNonCSILimits(filterName, csiNodesLister, scLister, pvLister, pvcLister, fts)
|
||||
}
|
||||
|
||||
// newNonCSILimits creates a plugin which evaluates whether a pod can fit based on the
|
||||
@ -149,6 +154,7 @@ func newNonCSILimits(
|
||||
scLister storagelisters.StorageClassLister,
|
||||
pvLister corelisters.PersistentVolumeLister,
|
||||
pvcLister corelisters.PersistentVolumeClaimLister,
|
||||
fts feature.Features,
|
||||
) framework.Plugin {
|
||||
var filter VolumeFilter
|
||||
var volumeLimitKey v1.ResourceName
|
||||
@ -185,6 +191,8 @@ func newNonCSILimits(
|
||||
pvcLister: pvcLister,
|
||||
scLister: scLister,
|
||||
randomVolumeIDPrefix: rand.String(32),
|
||||
|
||||
enableGenericEphemeralVolume: fts.EnableGenericEphemeralVolume,
|
||||
}
|
||||
|
||||
return pl
|
||||
@ -213,7 +221,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod
|
||||
}
|
||||
|
||||
newVolumes := make(sets.String)
|
||||
if err := pl.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
|
||||
if err := pl.filterVolumes(pod, true /* new pod */, newVolumes); err != nil {
|
||||
return framework.AsStatus(err)
|
||||
}
|
||||
|
||||
@ -246,7 +254,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod
|
||||
// count unique volumes
|
||||
existingVolumes := make(sets.String)
|
||||
for _, existingPod := range nodeInfo.Pods {
|
||||
if err := pl.filterVolumes(existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, existingVolumes); err != nil {
|
||||
if err := pl.filterVolumes(existingPod.Pod, false /* existing pod */, existingVolumes); err != nil {
|
||||
return framework.AsStatus(err)
|
||||
}
|
||||
}
|
||||
@ -270,57 +278,90 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pl *nonCSILimits) filterVolumes(volumes []v1.Volume, namespace string, filteredVolumes sets.String) error {
|
||||
func (pl *nonCSILimits) filterVolumes(pod *v1.Pod, newPod bool, filteredVolumes sets.String) error {
|
||||
volumes := pod.Spec.Volumes
|
||||
for i := range volumes {
|
||||
vol := &volumes[i]
|
||||
if id, ok := pl.filter.FilterVolume(vol); ok {
|
||||
filteredVolumes.Insert(id)
|
||||
} else if vol.PersistentVolumeClaim != nil {
|
||||
pvcName := vol.PersistentVolumeClaim.ClaimName
|
||||
if pvcName == "" {
|
||||
return fmt.Errorf("PersistentVolumeClaim had no name")
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Until we know real ID of the volume use namespace/pvcName as substitute
|
||||
// with a random prefix (calculated and stored inside 'c' during initialization)
|
||||
// to avoid conflicts with existing volume IDs.
|
||||
pvID := fmt.Sprintf("%s-%s/%s", pl.randomVolumeIDPrefix, namespace, pvcName)
|
||||
|
||||
pvc, err := pl.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
|
||||
if err != nil {
|
||||
// If the PVC is invalid, we don't count the volume because
|
||||
// there's no guarantee that it belongs to the running predicate.
|
||||
klog.V(4).InfoS("Unable to look up PVC info, assuming PVC doesn't match predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", namespace, pvcName), "err", err)
|
||||
continue
|
||||
pvcName := ""
|
||||
ephemeral := false
|
||||
switch {
|
||||
case vol.PersistentVolumeClaim != nil:
|
||||
pvcName = vol.PersistentVolumeClaim.ClaimName
|
||||
case vol.Ephemeral != nil:
|
||||
if !pl.enableGenericEphemeralVolume {
|
||||
return fmt.Errorf(
|
||||
"volume %s is a generic ephemeral volume, but that feature is disabled in kube-scheduler",
|
||||
vol.Name,
|
||||
)
|
||||
}
|
||||
// Generic ephemeral inline volumes also use a PVC,
|
||||
// just with a computed name and certain ownership.
|
||||
// That is checked below once the pvc object is
|
||||
// retrieved.
|
||||
pvcName = pod.Name + "-" + vol.Name
|
||||
ephemeral = true
|
||||
default:
|
||||
continue
|
||||
}
|
||||
if pvcName == "" {
|
||||
return fmt.Errorf("PersistentVolumeClaim had no name")
|
||||
}
|
||||
|
||||
pvName := pvc.Spec.VolumeName
|
||||
if pvName == "" {
|
||||
// PVC is not bound. It was either deleted and created again or
|
||||
// it was forcefully unbound by admin. The pod can still use the
|
||||
// original PV where it was bound to, so we count the volume if
|
||||
// it belongs to the running predicate.
|
||||
if pl.matchProvisioner(pvc) {
|
||||
klog.V(4).InfoS("PVC is not bound, assuming PVC matches predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", namespace, pvcName))
|
||||
filteredVolumes.Insert(pvID)
|
||||
}
|
||||
continue
|
||||
}
|
||||
// Until we know real ID of the volume use namespace/pvcName as substitute
|
||||
// with a random prefix (calculated and stored inside 'c' during initialization)
|
||||
// to avoid conflicts with existing volume IDs.
|
||||
pvID := fmt.Sprintf("%s-%s/%s", pl.randomVolumeIDPrefix, pod.Namespace, pvcName)
|
||||
|
||||
pv, err := pl.pvLister.Get(pvName)
|
||||
if err != nil {
|
||||
// If the PV is invalid and PVC belongs to the running predicate,
|
||||
// log the error and count the PV towards the PV limit.
|
||||
if pl.matchProvisioner(pvc) {
|
||||
klog.V(4).InfoS("Unable to look up PV, assuming PV matches predicate when counting limits", "PV", fmt.Sprintf("%s/%s/%s", namespace, pvcName, pvName), "err", err)
|
||||
filteredVolumes.Insert(pvID)
|
||||
}
|
||||
continue
|
||||
pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
|
||||
if err != nil {
|
||||
if newPod {
|
||||
// The PVC is required to proceed with
|
||||
// scheduling of a new pod because it cannot
|
||||
// run without it. Bail out immediately.
|
||||
return fmt.Errorf("looking up PVC %s/%s: %v", pod.Namespace, pvcName, err)
|
||||
}
|
||||
// If the PVC is invalid, we don't count the volume because
|
||||
// there's no guarantee that it belongs to the running predicate.
|
||||
klog.V(4).InfoS("Unable to look up PVC info, assuming PVC doesn't match predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", pod.Namespace, pvcName), "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if id, ok := pl.filter.FilterPersistentVolume(pv); ok {
|
||||
filteredVolumes.Insert(id)
|
||||
// The PVC for an ephemeral volume must be owned by the pod.
|
||||
if ephemeral && !metav1.IsControlledBy(pvc, pod) {
|
||||
return fmt.Errorf("PVC %s/%s is not owned by pod", pod.Namespace, pvcName)
|
||||
}
|
||||
|
||||
pvName := pvc.Spec.VolumeName
|
||||
if pvName == "" {
|
||||
// PVC is not bound. It was either deleted and created again or
|
||||
// it was forcefully unbound by admin. The pod can still use the
|
||||
// original PV where it was bound to, so we count the volume if
|
||||
// it belongs to the running predicate.
|
||||
if pl.matchProvisioner(pvc) {
|
||||
klog.V(4).InfoS("PVC is not bound, assuming PVC matches predicate when counting limits", "PVC", fmt.Sprintf("%s/%s", pod.Namespace, pvcName))
|
||||
filteredVolumes.Insert(pvID)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
pv, err := pl.pvLister.Get(pvName)
|
||||
if err != nil {
|
||||
// If the PV is invalid and PVC belongs to the running predicate,
|
||||
// log the error and count the PV towards the PV limit.
|
||||
if pl.matchProvisioner(pvc) {
|
||||
klog.V(4).InfoS("Unable to look up PV, assuming PV matches predicate when counting limits", "PV", fmt.Sprintf("%s/%s/%s", pod.Namespace, pvcName, pvName), "err", err)
|
||||
filteredVolumes.Insert(pvID)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if id, ok := pl.filter.FilterPersistentVolume(pv); ok {
|
||||
filteredVolumes.Insert(id)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -28,9 +28,114 @@ import (
|
||||
csilibplugins "k8s.io/csi-translation-lib/plugins"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
fakeframework "k8s.io/kubernetes/pkg/scheduler/framework/fake"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
||||
utilpointer "k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
func TestEphemeralLimits(t *testing.T) {
|
||||
// We have to specify a valid filter and arbitrarily pick Cinder here.
|
||||
// It doesn't matter for the test cases.
|
||||
filterName := cinderVolumeFilterType
|
||||
driverName := csilibplugins.CinderInTreePluginName
|
||||
|
||||
ephemeralVolumePod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "test",
|
||||
Name: "abc",
|
||||
UID: "12345",
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
Name: "xyz",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
Ephemeral: &v1.EphemeralVolumeSource{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
controller := true
|
||||
ephemeralClaim := &v1.PersistentVolumeClaim{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: ephemeralVolumePod.Namespace,
|
||||
Name: ephemeralVolumePod.Name + "-" + ephemeralVolumePod.Spec.Volumes[0].Name,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
Kind: "Pod",
|
||||
Name: ephemeralVolumePod.Name,
|
||||
UID: ephemeralVolumePod.UID,
|
||||
Controller: &controller,
|
||||
},
|
||||
},
|
||||
},
|
||||
Spec: v1.PersistentVolumeClaimSpec{
|
||||
VolumeName: "missing",
|
||||
StorageClassName: &filterName,
|
||||
},
|
||||
}
|
||||
conflictingClaim := ephemeralClaim.DeepCopy()
|
||||
conflictingClaim.OwnerReferences = nil
|
||||
|
||||
tests := []struct {
|
||||
newPod *v1.Pod
|
||||
existingPods []*v1.Pod
|
||||
extraClaims []v1.PersistentVolumeClaim
|
||||
ephemeralEnabled bool
|
||||
maxVols int
|
||||
test string
|
||||
wantStatus *framework.Status
|
||||
}{
|
||||
{
|
||||
newPod: ephemeralVolumePod,
|
||||
test: "volume feature disabled",
|
||||
wantStatus: framework.NewStatus(framework.Error, "volume xyz is a generic ephemeral volume, but that feature is disabled in kube-scheduler"),
|
||||
},
|
||||
{
|
||||
newPod: ephemeralVolumePod,
|
||||
ephemeralEnabled: true,
|
||||
test: "volume missing",
|
||||
wantStatus: framework.NewStatus(framework.Error, `looking up PVC test/abc-xyz: persistentvolumeclaim "abc-xyz" not found`),
|
||||
},
|
||||
{
|
||||
newPod: ephemeralVolumePod,
|
||||
ephemeralEnabled: true,
|
||||
extraClaims: []v1.PersistentVolumeClaim{*conflictingClaim},
|
||||
test: "volume not owned",
|
||||
wantStatus: framework.NewStatus(framework.Error, "PVC test/abc-xyz is not owned by pod"),
|
||||
},
|
||||
{
|
||||
newPod: ephemeralVolumePod,
|
||||
ephemeralEnabled: true,
|
||||
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
|
||||
maxVols: 1,
|
||||
test: "volume unbound, allowed",
|
||||
},
|
||||
{
|
||||
newPod: ephemeralVolumePod,
|
||||
ephemeralEnabled: true,
|
||||
extraClaims: []v1.PersistentVolumeClaim{*ephemeralClaim},
|
||||
maxVols: 0,
|
||||
test: "volume unbound, exceeds limit",
|
||||
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.test, func(t *testing.T) {
|
||||
fts := feature.Features{
|
||||
EnableGenericEphemeralVolume: test.ephemeralEnabled,
|
||||
}
|
||||
node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), filterName)
|
||||
p := newNonCSILimits(filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(filterName, driverName), getFakePVLister(filterName), append(getFakePVCLister(filterName), test.extraClaims...), fts).(framework.FilterPlugin)
|
||||
gotStatus := p.Filter(context.Background(), nil, test.newPod, node)
|
||||
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
|
||||
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAzureDiskLimits(t *testing.T) {
|
||||
oneVolPod := &v1.Pod{
|
||||
Spec: v1.PodSpec{
|
||||
@ -360,7 +465,7 @@ func TestAzureDiskLimits(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.test, func(t *testing.T) {
|
||||
node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName)
|
||||
p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin)
|
||||
p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin)
|
||||
gotStatus := p.Filter(context.Background(), nil, test.newPod, node)
|
||||
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
|
||||
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
|
||||
@ -427,7 +532,7 @@ func TestCinderLimits(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.test, func(t *testing.T) {
|
||||
node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName)
|
||||
p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin)
|
||||
p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin)
|
||||
gotStatus := p.Filter(context.Background(), nil, test.newPod, node)
|
||||
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
|
||||
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
|
||||
@ -834,7 +939,7 @@ func TestEBSLimits(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.test, func(t *testing.T) {
|
||||
node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName)
|
||||
p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin)
|
||||
p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin)
|
||||
gotStatus := p.Filter(context.Background(), nil, test.newPod, node)
|
||||
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
|
||||
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
|
||||
@ -1172,7 +1277,7 @@ func TestGCEPDLimits(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.test, func(t *testing.T) {
|
||||
node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, int64(test.maxVols), test.filterName)
|
||||
p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName)).(framework.FilterPlugin)
|
||||
p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin)
|
||||
gotStatus := p.Filter(context.Background(), nil, test.newPod, node)
|
||||
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
|
||||
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
|
||||
|
@ -75,11 +75,11 @@ func NewInTreeRegistry() runtime.Registry {
|
||||
volumebinding.Name: runtime.FactoryAdapter(fts, volumebinding.New),
|
||||
volumerestrictions.Name: runtime.FactoryAdapter(fts, volumerestrictions.New),
|
||||
volumezone.Name: volumezone.New,
|
||||
nodevolumelimits.CSIName: nodevolumelimits.NewCSI,
|
||||
nodevolumelimits.EBSName: nodevolumelimits.NewEBS,
|
||||
nodevolumelimits.GCEPDName: nodevolumelimits.NewGCEPD,
|
||||
nodevolumelimits.AzureDiskName: nodevolumelimits.NewAzureDisk,
|
||||
nodevolumelimits.CinderName: nodevolumelimits.NewCinder,
|
||||
nodevolumelimits.CSIName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI),
|
||||
nodevolumelimits.EBSName: runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS),
|
||||
nodevolumelimits.GCEPDName: runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD),
|
||||
nodevolumelimits.AzureDiskName: runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk),
|
||||
nodevolumelimits.CinderName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder),
|
||||
interpodaffinity.Name: runtime.FactoryAdapter(fts, interpodaffinity.New),
|
||||
nodelabel.Name: nodelabel.New,
|
||||
serviceaffinity.Name: serviceaffinity.New,
|
||||
|
@ -197,7 +197,11 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
framework.ExpectNoError(err, "Failed to register CSIDriver %v", m.config.GetUniqueDriverName())
|
||||
}
|
||||
|
||||
createPod := func(ephemeral bool) (class *storagev1.StorageClass, claim *v1.PersistentVolumeClaim, pod *v1.Pod) {
|
||||
type volumeType string
|
||||
csiEphemeral := volumeType("CSI")
|
||||
genericEphemeral := volumeType("Ephemeral")
|
||||
pvcReference := volumeType("PVC")
|
||||
createPod := func(withVolume volumeType) (class *storagev1.StorageClass, claim *v1.PersistentVolumeClaim, pod *v1.Pod) {
|
||||
ginkgo.By("Creating pod")
|
||||
sc := m.driver.GetDynamicProvisionStorageClass(m.config, "")
|
||||
scTest := testsuites.StorageClassTest{
|
||||
@ -213,12 +217,21 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
|
||||
// The mock driver only works when everything runs on a single node.
|
||||
nodeSelection := m.config.ClientNodeSelection
|
||||
if ephemeral {
|
||||
switch withVolume {
|
||||
case csiEphemeral:
|
||||
pod = startPausePodInline(f.ClientSet, scTest, nodeSelection, f.Namespace.Name)
|
||||
if pod != nil {
|
||||
m.pods = append(m.pods, pod)
|
||||
case genericEphemeral:
|
||||
class, pod = startPausePodGenericEphemeral(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
|
||||
if class != nil {
|
||||
m.sc[class.Name] = class
|
||||
}
|
||||
} else {
|
||||
claim = &v1.PersistentVolumeClaim{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: pod.Name + "-" + pod.Spec.Volumes[0].Name,
|
||||
Namespace: f.Namespace.Name,
|
||||
},
|
||||
}
|
||||
case pvcReference:
|
||||
class, claim, pod = startPausePod(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
|
||||
if class != nil {
|
||||
m.sc[class.Name] = class
|
||||
@ -226,9 +239,9 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
if claim != nil {
|
||||
m.pvcs = append(m.pvcs, claim)
|
||||
}
|
||||
if pod != nil {
|
||||
m.pods = append(m.pods, pod)
|
||||
}
|
||||
}
|
||||
if pod != nil {
|
||||
m.pods = append(m.pods, pod)
|
||||
}
|
||||
return // result variables set above
|
||||
}
|
||||
@ -318,6 +331,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
name string
|
||||
disableAttach bool
|
||||
deployClusterRegistrar bool
|
||||
volumeType volumeType
|
||||
}{
|
||||
{
|
||||
name: "should not require VolumeAttach for drivers without attachment",
|
||||
@ -328,6 +342,11 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
name: "should require VolumeAttach for drivers with attachment",
|
||||
deployClusterRegistrar: true,
|
||||
},
|
||||
{
|
||||
name: "should require VolumeAttach for ephemermal volume and drivers with attachment",
|
||||
deployClusterRegistrar: true,
|
||||
volumeType: genericEphemeral,
|
||||
},
|
||||
{
|
||||
name: "should preserve attachment policy when no CSIDriver present",
|
||||
deployClusterRegistrar: false,
|
||||
@ -340,7 +359,11 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
init(testParameters{registerDriver: test.deployClusterRegistrar, disableAttach: test.disableAttach})
|
||||
defer cleanup()
|
||||
|
||||
_, claim, pod := createPod(false)
|
||||
volumeType := t.volumeType
|
||||
if volumeType == "" {
|
||||
volumeType = pvcReference
|
||||
}
|
||||
_, claim, pod := createPod(volumeType)
|
||||
if pod == nil {
|
||||
return
|
||||
}
|
||||
@ -375,7 +398,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
init(testParameters{registerDriver: false, disableAttach: true})
|
||||
defer cleanup()
|
||||
|
||||
_, claim, pod := createPod(false /* persistent volume, late binding as specified above */)
|
||||
_, claim, pod := createPod(pvcReference) // late binding as specified above
|
||||
if pod == nil {
|
||||
return
|
||||
}
|
||||
@ -497,7 +520,11 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
|
||||
defer cleanup()
|
||||
|
||||
_, _, pod := createPod(test.expectEphemeral)
|
||||
withVolume := pvcReference
|
||||
if test.expectEphemeral {
|
||||
withVolume = csiEphemeral
|
||||
}
|
||||
_, _, pod := createPod(withVolume)
|
||||
if pod == nil {
|
||||
return
|
||||
}
|
||||
@ -539,23 +566,73 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
|
||||
gomega.Expect(csiNodeAttachLimit).To(gomega.BeNumerically("==", 2))
|
||||
|
||||
_, _, pod1 := createPod(false)
|
||||
_, _, pod1 := createPod(pvcReference)
|
||||
gomega.Expect(pod1).NotTo(gomega.BeNil(), "while creating first pod")
|
||||
|
||||
err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod1.Name, pod1.Namespace)
|
||||
framework.ExpectNoError(err, "Failed to start pod1: %v", err)
|
||||
|
||||
_, _, pod2 := createPod(false)
|
||||
_, _, pod2 := createPod(pvcReference)
|
||||
gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating second pod")
|
||||
|
||||
err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod2.Name, pod2.Namespace)
|
||||
framework.ExpectNoError(err, "Failed to start pod2: %v", err)
|
||||
|
||||
_, _, pod3 := createPod(false)
|
||||
_, _, pod3 := createPod(pvcReference)
|
||||
gomega.Expect(pod3).NotTo(gomega.BeNil(), "while creating third pod")
|
||||
err = waitForMaxVolumeCondition(pod3, m.cs)
|
||||
framework.ExpectNoError(err, "while waiting for max volume condition on pod : %+v", pod3)
|
||||
})
|
||||
|
||||
ginkgo.It("should report attach limit for generic ephemeral volume when persistent volume is attached [Slow]", func() {
|
||||
// define volume limit to be 2 for this test
|
||||
var err error
|
||||
init(testParameters{attachLimit: 1})
|
||||
defer cleanup()
|
||||
nodeName := m.config.ClientNodeSelection.Name
|
||||
driverName := m.config.GetUniqueDriverName()
|
||||
|
||||
csiNodeAttachLimit, err := checkCSINodeForLimits(nodeName, driverName, m.cs)
|
||||
framework.ExpectNoError(err, "while checking limits in CSINode: %v", err)
|
||||
|
||||
gomega.Expect(csiNodeAttachLimit).To(gomega.BeNumerically("==", 1))
|
||||
|
||||
_, _, pod1 := createPod(pvcReference)
|
||||
gomega.Expect(pod1).NotTo(gomega.BeNil(), "while creating pod with persistent volume")
|
||||
|
||||
err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod1.Name, pod1.Namespace)
|
||||
framework.ExpectNoError(err, "Failed to start pod1: %v", err)
|
||||
|
||||
_, _, pod2 := createPod(genericEphemeral)
|
||||
gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating pod with ephemeral volume")
|
||||
err = waitForMaxVolumeCondition(pod2, m.cs)
|
||||
framework.ExpectNoError(err, "while waiting for max volume condition on pod : %+v", pod2)
|
||||
})
|
||||
|
||||
ginkgo.It("should report attach limit for persistent volume when generic ephemeral volume is attached [Slow]", func() {
|
||||
// define volume limit to be 2 for this test
|
||||
var err error
|
||||
init(testParameters{attachLimit: 1})
|
||||
defer cleanup()
|
||||
nodeName := m.config.ClientNodeSelection.Name
|
||||
driverName := m.config.GetUniqueDriverName()
|
||||
|
||||
csiNodeAttachLimit, err := checkCSINodeForLimits(nodeName, driverName, m.cs)
|
||||
framework.ExpectNoError(err, "while checking limits in CSINode: %v", err)
|
||||
|
||||
gomega.Expect(csiNodeAttachLimit).To(gomega.BeNumerically("==", 1))
|
||||
|
||||
_, _, pod1 := createPod(genericEphemeral)
|
||||
gomega.Expect(pod1).NotTo(gomega.BeNil(), "while creating pod with persistent volume")
|
||||
|
||||
err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod1.Name, pod1.Namespace)
|
||||
framework.ExpectNoError(err, "Failed to start pod1: %v", err)
|
||||
|
||||
_, _, pod2 := createPod(pvcReference)
|
||||
gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating pod with ephemeral volume")
|
||||
err = waitForMaxVolumeCondition(pod2, m.cs)
|
||||
framework.ExpectNoError(err, "while waiting for max volume condition on pod : %+v", pod2)
|
||||
})
|
||||
})
|
||||
|
||||
ginkgo.Context("CSI Volume expansion", func() {
|
||||
@ -603,7 +680,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
init(tp)
|
||||
defer cleanup()
|
||||
|
||||
sc, pvc, pod := createPod(false)
|
||||
sc, pvc, pod := createPod(pvcReference)
|
||||
gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod for resizing")
|
||||
|
||||
framework.ExpectEqual(*sc.AllowVolumeExpansion, true, "failed creating sc with allowed expansion")
|
||||
@ -696,7 +773,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
|
||||
defer cleanup()
|
||||
|
||||
sc, pvc, pod := createPod(false)
|
||||
sc, pvc, pod := createPod(pvcReference)
|
||||
gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod for resizing")
|
||||
|
||||
framework.ExpectEqual(*sc.AllowVolumeExpansion, true, "failed creating sc with allowed expansion")
|
||||
@ -837,7 +914,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
})
|
||||
defer cleanup()
|
||||
|
||||
_, claim, pod := createPod(false)
|
||||
_, claim, pod := createPod(pvcReference)
|
||||
if pod == nil {
|
||||
return
|
||||
}
|
||||
@ -975,7 +1052,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
})
|
||||
defer cleanup()
|
||||
|
||||
_, claim, pod := createPod(false)
|
||||
_, claim, pod := createPod(pvcReference)
|
||||
if pod == nil {
|
||||
return
|
||||
}
|
||||
@ -1121,7 +1198,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
framework.ExpectNoError(err, "create PVC watch")
|
||||
defer pvcWatch.Stop()
|
||||
|
||||
sc, claim, pod := createPod(false)
|
||||
sc, claim, pod := createPod(pvcReference)
|
||||
gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod")
|
||||
bindingMode := storagev1.VolumeBindingImmediate
|
||||
if test.lateBinding {
|
||||
@ -1331,7 +1408,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
syncDelay := 5 * time.Second
|
||||
time.Sleep(syncDelay)
|
||||
|
||||
sc, _, pod := createPod(false /* persistent volume, late binding as specified above */)
|
||||
sc, _, pod := createPod(pvcReference) // late binding as specified above
|
||||
framework.ExpectEqual(sc.Name, scName, "pre-selected storage class name not used")
|
||||
|
||||
waitCtx, cancel := context.WithTimeout(context.Background(), f.Timeouts.PodStart)
|
||||
@ -1530,7 +1607,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
||||
|
||||
defer cleanup()
|
||||
|
||||
_, _, pod := createPod(false)
|
||||
_, _, pod := createPod(pvcReference)
|
||||
if pod == nil {
|
||||
return
|
||||
}
|
||||
@ -1993,7 +2070,7 @@ func checkCSINodeForLimits(nodeName string, driverName string, cs clientset.Inte
|
||||
return attachLimit, nil
|
||||
}
|
||||
|
||||
func createClaim(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim) {
|
||||
func createSC(cs clientset.Interface, t testsuites.StorageClassTest, scName, ns string) *storagev1.StorageClass {
|
||||
class := newStorageClass(t, ns, "")
|
||||
if scName != "" {
|
||||
class.Name = scName
|
||||
@ -2005,12 +2082,17 @@ func createClaim(cs clientset.Interface, t testsuites.StorageClassTest, node e2e
|
||||
framework.ExpectNoError(err, "Failed to create class: %v", err)
|
||||
}
|
||||
|
||||
return class
|
||||
}
|
||||
|
||||
func createClaim(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim) {
|
||||
class := createSC(cs, t, scName, ns)
|
||||
claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
|
||||
ClaimSize: t.ClaimSize,
|
||||
StorageClassName: &(class.Name),
|
||||
VolumeMode: &t.VolumeMode,
|
||||
}, ns)
|
||||
claim, err = cs.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), claim, metav1.CreateOptions{})
|
||||
claim, err := cs.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), claim, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err, "Failed to create claim: %v", err)
|
||||
|
||||
if !t.DelayBinding {
|
||||
@ -2046,6 +2128,21 @@ func startPausePodInline(cs clientset.Interface, t testsuites.StorageClassTest,
|
||||
return pod
|
||||
}
|
||||
|
||||
func startPausePodGenericEphemeral(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.Pod) {
|
||||
class := createSC(cs, t, scName, ns)
|
||||
claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
|
||||
ClaimSize: t.ClaimSize,
|
||||
StorageClassName: &(class.Name),
|
||||
VolumeMode: &t.VolumeMode,
|
||||
}, ns)
|
||||
pod, err := startPausePodWithVolumeSource(cs, v1.VolumeSource{
|
||||
Ephemeral: &v1.EphemeralVolumeSource{
|
||||
VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{Spec: claim.Spec}},
|
||||
}, node, ns)
|
||||
framework.ExpectNoError(err, "Failed to create pod: %v", err)
|
||||
return class, pod
|
||||
}
|
||||
|
||||
func startPausePodWithClaim(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
|
||||
return startPausePodWithVolumeSource(cs,
|
||||
v1.VolumeSource{
|
||||
|
Loading…
Reference in New Issue
Block a user