mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Merge pull request #93710 from Jiawei0227/attachable2non
Detect volume attach-ability in the middle of attaching
This commit is contained in:
commit
e23d83eead
@ -154,6 +154,22 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
|
|||||||
klog.V(1).Infof("Removing pod %q (UID %q) from dsw because it does not exist in pod informer.", dswPodKey, dswPodUID)
|
klog.V(1).Infof("Removing pod %q (UID %q) from dsw because it does not exist in pod informer.", dswPodKey, dswPodUID)
|
||||||
dswp.desiredStateOfWorld.DeletePod(dswPodUID, dswPodToAdd.VolumeName, dswPodToAdd.NodeName)
|
dswp.desiredStateOfWorld.DeletePod(dswPodUID, dswPodToAdd.VolumeName, dswPodToAdd.NodeName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check if the existing volumes changes its attachability
|
||||||
|
for _, volumeToAttach := range dswp.desiredStateOfWorld.GetVolumesToAttach() {
|
||||||
|
// IsAttachableVolume() will result in a fetch of CSIDriver object if the volume plugin type is CSI.
|
||||||
|
// The result is returned from CSIDriverLister which is from local cache. So this is not an expensive call.
|
||||||
|
volumeAttachable := volutil.IsAttachableVolume(volumeToAttach.VolumeSpec, dswp.volumePluginMgr)
|
||||||
|
if !volumeAttachable {
|
||||||
|
klog.Infof("Volume %v changes from attachable to non-attachable.", volumeToAttach.VolumeName)
|
||||||
|
for _, scheduledPod := range volumeToAttach.ScheduledPods {
|
||||||
|
podUID := volutil.GetUniquePodName(scheduledPod)
|
||||||
|
dswp.desiredStateOfWorld.DeletePod(podUID, volumeToAttach.VolumeName, volumeToAttach.NodeName)
|
||||||
|
klog.V(4).Infof("Removing podUID: %v, volume: %v on node: %v from desired state of world"+
|
||||||
|
" because of the change of volume attachability.", podUID, volumeToAttach.VolumeName, volumeToAttach.NodeName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dswp *desiredStateOfWorldPopulator) findAndAddActivePods() {
|
func (dswp *desiredStateOfWorldPopulator) findAndAddActivePods() {
|
||||||
|
@ -20,7 +20,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
k8stypes "k8s.io/apimachinery/pkg/types"
|
k8stypes "k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
@ -140,3 +140,86 @@ func TestFindAndAddActivePods_FindAndRemoveDeletedPods(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFindAndRemoveNonattachableVolumes(t *testing.T) {
|
||||||
|
fakeVolumePluginMgr, fakeVolumePlugin := volumetesting.GetTestVolumePluginMgr(t)
|
||||||
|
fakeClient := &fake.Clientset{}
|
||||||
|
|
||||||
|
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, controller.NoResyncPeriodFunc())
|
||||||
|
fakePodInformer := fakeInformerFactory.Core().V1().Pods()
|
||||||
|
|
||||||
|
fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr)
|
||||||
|
|
||||||
|
pod := &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "dswp-test-pod",
|
||||||
|
UID: "dswp-test-pod-uid",
|
||||||
|
Namespace: "dswp-test",
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
NodeName: "dswp-test-host",
|
||||||
|
Volumes: []v1.Volume{
|
||||||
|
{
|
||||||
|
Name: "dswp-test-volume-name",
|
||||||
|
VolumeSource: v1.VolumeSource{
|
||||||
|
CSI: &v1.CSIVolumeSource{
|
||||||
|
Driver: "dswp-test-fake-csi-driver",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: v1.PodStatus{
|
||||||
|
Phase: v1.PodPhase("Running"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
fakePodInformer.Informer().GetStore().Add(pod)
|
||||||
|
|
||||||
|
generatedVolumeName := "fake-plugin/dswp-test-fake-csi-driver"
|
||||||
|
pvcLister := fakeInformerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
||||||
|
pvLister := fakeInformerFactory.Core().V1().PersistentVolumes().Lister()
|
||||||
|
|
||||||
|
csiTranslator := csitrans.New()
|
||||||
|
dswp := &desiredStateOfWorldPopulator{
|
||||||
|
loopSleepDuration: 100 * time.Millisecond,
|
||||||
|
listPodsRetryDuration: 3 * time.Second,
|
||||||
|
desiredStateOfWorld: fakesDSW,
|
||||||
|
volumePluginMgr: fakeVolumePluginMgr,
|
||||||
|
podLister: fakePodInformer.Lister(),
|
||||||
|
pvcLister: pvcLister,
|
||||||
|
pvLister: pvLister,
|
||||||
|
csiMigratedPluginManager: csimigration.NewPluginManager(csiTranslator),
|
||||||
|
intreeToCSITranslator: csiTranslator,
|
||||||
|
}
|
||||||
|
|
||||||
|
//add the given node to the list of nodes managed by dsw
|
||||||
|
dswp.desiredStateOfWorld.AddNode(k8stypes.NodeName(pod.Spec.NodeName), false /*keepTerminatedPodVolumes*/)
|
||||||
|
|
||||||
|
dswp.findAndAddActivePods()
|
||||||
|
|
||||||
|
expectedVolumeName := v1.UniqueVolumeName(generatedVolumeName)
|
||||||
|
|
||||||
|
//check if the given volume referenced by the pod is added to dsw
|
||||||
|
volumeExists := dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName))
|
||||||
|
if !volumeExists {
|
||||||
|
t.Fatalf(
|
||||||
|
"VolumeExists(%q) failed. Expected: <true> Actual: <%v>",
|
||||||
|
expectedVolumeName,
|
||||||
|
volumeExists)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Change the CSI volume plugin attachability
|
||||||
|
fakeVolumePlugin.NonAttachable = true
|
||||||
|
|
||||||
|
dswp.findAndRemoveDeletedPods()
|
||||||
|
|
||||||
|
// The volume should not exist after it becomes non-attachable
|
||||||
|
volumeExists = dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, k8stypes.NodeName(pod.Spec.NodeName))
|
||||||
|
if volumeExists {
|
||||||
|
t.Fatalf(
|
||||||
|
"VolumeExists(%q) failed. Expected: <false> Actual: <%v>",
|
||||||
|
expectedVolumeName,
|
||||||
|
volumeExists)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -120,6 +120,9 @@ type DesiredStateOfWorld interface {
|
|||||||
|
|
||||||
// GetPodsWithErrors returns names of pods that have stored errors.
|
// GetPodsWithErrors returns names of pods that have stored errors.
|
||||||
GetPodsWithErrors() []types.UniquePodName
|
GetPodsWithErrors() []types.UniquePodName
|
||||||
|
|
||||||
|
// MarkVolumeAttachability updates the volume's attachability for a given volume
|
||||||
|
MarkVolumeAttachability(volumeName v1.UniqueVolumeName, attachable bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// VolumeToMount represents a volume that is attached to this node and needs to
|
// VolumeToMount represents a volume that is attached to this node and needs to
|
||||||
@ -233,8 +236,8 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
|
|||||||
|
|
||||||
// The unique volume name used depends on whether the volume is attachable/device-mountable
|
// The unique volume name used depends on whether the volume is attachable/device-mountable
|
||||||
// or not.
|
// or not.
|
||||||
attachable := dsw.isAttachableVolume(volumeSpec)
|
attachable := util.IsAttachableVolume(volumeSpec, dsw.volumePluginMgr)
|
||||||
deviceMountable := dsw.isDeviceMountableVolume(volumeSpec)
|
deviceMountable := util.IsDeviceMountableVolume(volumeSpec, dsw.volumePluginMgr)
|
||||||
if attachable || deviceMountable {
|
if attachable || deviceMountable {
|
||||||
// For attachable/device-mountable volumes, use the unique volume name as reported by
|
// For attachable/device-mountable volumes, use the unique volume name as reported by
|
||||||
// the plugin.
|
// the plugin.
|
||||||
@ -410,31 +413,6 @@ func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount {
|
|||||||
return volumesToMount
|
return volumesToMount
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dsw *desiredStateOfWorld) isAttachableVolume(volumeSpec *volume.Spec) bool {
|
|
||||||
attachableVolumePlugin, _ :=
|
|
||||||
dsw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
|
|
||||||
if attachableVolumePlugin != nil {
|
|
||||||
volumeAttacher, err := attachableVolumePlugin.NewAttacher()
|
|
||||||
if err == nil && volumeAttacher != nil {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (dsw *desiredStateOfWorld) isDeviceMountableVolume(volumeSpec *volume.Spec) bool {
|
|
||||||
deviceMountableVolumePlugin, _ := dsw.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeSpec)
|
|
||||||
if deviceMountableVolumePlugin != nil {
|
|
||||||
volumeDeviceMounter, err := deviceMountableVolumePlugin.NewDeviceMounter()
|
|
||||||
if err == nil && volumeDeviceMounter != nil {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (dsw *desiredStateOfWorld) AddErrorToPod(podName types.UniquePodName, err string) {
|
func (dsw *desiredStateOfWorld) AddErrorToPod(podName types.UniquePodName, err string) {
|
||||||
dsw.Lock()
|
dsw.Lock()
|
||||||
defer dsw.Unlock()
|
defer dsw.Unlock()
|
||||||
@ -469,3 +447,14 @@ func (dsw *desiredStateOfWorld) GetPodsWithErrors() []types.UniquePodName {
|
|||||||
}
|
}
|
||||||
return pods
|
return pods
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (dsw *desiredStateOfWorld) MarkVolumeAttachability(volumeName v1.UniqueVolumeName, attachable bool) {
|
||||||
|
dsw.Lock()
|
||||||
|
defer dsw.Unlock()
|
||||||
|
volumeObj, volumeExists := dsw.volumesToMount[volumeName]
|
||||||
|
if !volumeExists {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
volumeObj.pluginIsAttachable = attachable
|
||||||
|
dsw.volumesToMount[volumeName] = volumeObj
|
||||||
|
}
|
||||||
|
@ -60,6 +60,7 @@ go_test(
|
|||||||
"//pkg/kubelet/status:go_default_library",
|
"//pkg/kubelet/status:go_default_library",
|
||||||
"//pkg/kubelet/status/testing:go_default_library",
|
"//pkg/kubelet/status/testing:go_default_library",
|
||||||
"//pkg/kubelet/volumemanager/cache:go_default_library",
|
"//pkg/kubelet/volumemanager/cache:go_default_library",
|
||||||
|
"//pkg/volume:go_default_library",
|
||||||
"//pkg/volume/csimigration:go_default_library",
|
"//pkg/volume/csimigration:go_default_library",
|
||||||
"//pkg/volume/testing:go_default_library",
|
"//pkg/volume/testing:go_default_library",
|
||||||
"//pkg/volume/util:go_default_library",
|
"//pkg/volume/util:go_default_library",
|
||||||
|
@ -91,7 +91,8 @@ func NewDesiredStateOfWorldPopulator(
|
|||||||
kubeContainerRuntime kubecontainer.Runtime,
|
kubeContainerRuntime kubecontainer.Runtime,
|
||||||
keepTerminatedPodVolumes bool,
|
keepTerminatedPodVolumes bool,
|
||||||
csiMigratedPluginManager csimigration.PluginManager,
|
csiMigratedPluginManager csimigration.PluginManager,
|
||||||
intreeToCSITranslator csimigration.InTreeToCSITranslator) DesiredStateOfWorldPopulator {
|
intreeToCSITranslator csimigration.InTreeToCSITranslator,
|
||||||
|
volumePluginMgr *volume.VolumePluginMgr) DesiredStateOfWorldPopulator {
|
||||||
return &desiredStateOfWorldPopulator{
|
return &desiredStateOfWorldPopulator{
|
||||||
kubeClient: kubeClient,
|
kubeClient: kubeClient,
|
||||||
loopSleepDuration: loopSleepDuration,
|
loopSleepDuration: loopSleepDuration,
|
||||||
@ -108,6 +109,7 @@ func NewDesiredStateOfWorldPopulator(
|
|||||||
hasAddedPodsLock: sync.RWMutex{},
|
hasAddedPodsLock: sync.RWMutex{},
|
||||||
csiMigratedPluginManager: csiMigratedPluginManager,
|
csiMigratedPluginManager: csiMigratedPluginManager,
|
||||||
intreeToCSITranslator: intreeToCSITranslator,
|
intreeToCSITranslator: intreeToCSITranslator,
|
||||||
|
volumePluginMgr: volumePluginMgr,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -127,6 +129,7 @@ type desiredStateOfWorldPopulator struct {
|
|||||||
hasAddedPodsLock sync.RWMutex
|
hasAddedPodsLock sync.RWMutex
|
||||||
csiMigratedPluginManager csimigration.PluginManager
|
csiMigratedPluginManager csimigration.PluginManager
|
||||||
intreeToCSITranslator csimigration.InTreeToCSITranslator
|
intreeToCSITranslator csimigration.InTreeToCSITranslator
|
||||||
|
volumePluginMgr *volume.VolumePluginMgr
|
||||||
}
|
}
|
||||||
|
|
||||||
type processedPods struct {
|
type processedPods struct {
|
||||||
@ -222,6 +225,20 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
|
|||||||
for _, volumeToMount := range dswp.desiredStateOfWorld.GetVolumesToMount() {
|
for _, volumeToMount := range dswp.desiredStateOfWorld.GetVolumesToMount() {
|
||||||
pod, podExists := dswp.podManager.GetPodByUID(volumeToMount.Pod.UID)
|
pod, podExists := dswp.podManager.GetPodByUID(volumeToMount.Pod.UID)
|
||||||
if podExists {
|
if podExists {
|
||||||
|
|
||||||
|
// check if the attachability has changed for this volume
|
||||||
|
if volumeToMount.PluginIsAttachable {
|
||||||
|
attachableVolumePlugin, err := dswp.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
|
||||||
|
// only this means the plugin is truly non-attachable
|
||||||
|
if err == nil && attachableVolumePlugin == nil {
|
||||||
|
// It is not possible right now for a CSI plugin to be both attachable and non-deviceMountable
|
||||||
|
// So the uniqueVolumeName should remain the same after the attachability change
|
||||||
|
dswp.desiredStateOfWorld.MarkVolumeAttachability(volumeToMount.VolumeName, false)
|
||||||
|
klog.Infof("Volume %v changes from attachable to non-attachable.", volumeToMount.VolumeName)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Skip running pods
|
// Skip running pods
|
||||||
if !dswp.isPodTerminated(pod) {
|
if !dswp.isPodTerminated(pod) {
|
||||||
continue
|
continue
|
||||||
|
@ -40,6 +40,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||||
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
|
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
|
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
|
||||||
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
"k8s.io/kubernetes/pkg/volume/csimigration"
|
"k8s.io/kubernetes/pkg/volume/csimigration"
|
||||||
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
|
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
|
||||||
"k8s.io/kubernetes/pkg/volume/util"
|
"k8s.io/kubernetes/pkg/volume/util"
|
||||||
@ -373,6 +374,86 @@ func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFindAndRemoveNonattachableVolumes(t *testing.T) {
|
||||||
|
// create dswp
|
||||||
|
mode := v1.PersistentVolumeFilesystem
|
||||||
|
pv := &v1.PersistentVolume{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "dswp-test-volume-name",
|
||||||
|
},
|
||||||
|
Spec: v1.PersistentVolumeSpec{
|
||||||
|
ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "file-bound"},
|
||||||
|
VolumeMode: &mode,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
pvc := &v1.PersistentVolumeClaim{
|
||||||
|
Spec: v1.PersistentVolumeClaimSpec{
|
||||||
|
VolumeName: "dswp-test-volume-name",
|
||||||
|
},
|
||||||
|
Status: v1.PersistentVolumeClaimStatus{
|
||||||
|
Phase: v1.ClaimBound,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
fakeVolumePluginMgr, fakeVolumePlugin := volumetesting.GetTestVolumePluginMgr(t)
|
||||||
|
dswp, fakePodManager, fakesDSW := createDswpWithVolumeWithCustomPluginMgr(t, pv, pvc, fakeVolumePluginMgr)
|
||||||
|
|
||||||
|
// create pod
|
||||||
|
containers := []v1.Container{
|
||||||
|
{
|
||||||
|
VolumeMounts: []v1.VolumeMount{
|
||||||
|
{
|
||||||
|
Name: "dswp-test-volume-name",
|
||||||
|
MountPath: "/mnt",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers)
|
||||||
|
|
||||||
|
fakePodManager.AddPod(pod)
|
||||||
|
|
||||||
|
podName := util.GetUniquePodName(pod)
|
||||||
|
|
||||||
|
generatedVolumeName := "fake-plugin/" + pod.Spec.Volumes[0].Name
|
||||||
|
|
||||||
|
dswp.findAndAddNewPods()
|
||||||
|
|
||||||
|
if !dswp.pods.processedPods[podName] {
|
||||||
|
t.Fatalf("Failed to record that the volumes for the specified pod: %s have been processed by the populator", podName)
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedVolumeName := v1.UniqueVolumeName(generatedVolumeName)
|
||||||
|
|
||||||
|
volumeExists := fakesDSW.VolumeExists(expectedVolumeName)
|
||||||
|
if !volumeExists {
|
||||||
|
t.Fatalf(
|
||||||
|
"VolumeExists(%q) failed. Expected: <true> Actual: <%v>",
|
||||||
|
expectedVolumeName,
|
||||||
|
volumeExists)
|
||||||
|
}
|
||||||
|
|
||||||
|
// change the volume plugin from attachable to non-attachable
|
||||||
|
fakeVolumePlugin.NonAttachable = true
|
||||||
|
|
||||||
|
// The volume should still exist
|
||||||
|
verifyVolumeExistsInVolumesToMount(
|
||||||
|
t, v1.UniqueVolumeName(generatedVolumeName), false /* expectReportedInUse */, fakesDSW)
|
||||||
|
|
||||||
|
dswp.findAndRemoveDeletedPods()
|
||||||
|
// After the volume plugin changes to nonattachable, the corresponding volume attachable field should change.
|
||||||
|
volumesToMount := fakesDSW.GetVolumesToMount()
|
||||||
|
for _, volume := range volumesToMount {
|
||||||
|
if volume.VolumeName == expectedVolumeName {
|
||||||
|
if volume.PluginIsAttachable {
|
||||||
|
t.Fatalf(
|
||||||
|
"Volume %v in the list of desired state of world volumes to mount is still attachable. Expected not",
|
||||||
|
expectedVolumeName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestFindAndAddNewPods_FindAndRemoveDeletedPods_Valid_Block_VolumeDevices(t *testing.T) {
|
func TestFindAndAddNewPods_FindAndRemoveDeletedPods_Valid_Block_VolumeDevices(t *testing.T) {
|
||||||
// create dswp
|
// create dswp
|
||||||
mode := v1.PersistentVolumeBlock
|
mode := v1.PersistentVolumeBlock
|
||||||
@ -997,6 +1078,12 @@ func createPodWithVolume(pod, pv, pvc string, containers []v1.Container) *v1.Pod
|
|||||||
|
|
||||||
func createDswpWithVolume(t *testing.T, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) (*desiredStateOfWorldPopulator, kubepod.Manager, cache.DesiredStateOfWorld) {
|
func createDswpWithVolume(t *testing.T, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) (*desiredStateOfWorldPopulator, kubepod.Manager, cache.DesiredStateOfWorld) {
|
||||||
fakeVolumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
|
fakeVolumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
|
||||||
|
dswp, fakePodManager, fakesDSW := createDswpWithVolumeWithCustomPluginMgr(t, pv, pvc, fakeVolumePluginMgr)
|
||||||
|
return dswp, fakePodManager, fakesDSW
|
||||||
|
}
|
||||||
|
|
||||||
|
func createDswpWithVolumeWithCustomPluginMgr(t *testing.T, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim,
|
||||||
|
fakeVolumePluginMgr *volume.VolumePluginMgr) (*desiredStateOfWorldPopulator, kubepod.Manager, cache.DesiredStateOfWorld) {
|
||||||
fakeClient := &fake.Clientset{}
|
fakeClient := &fake.Clientset{}
|
||||||
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
|
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
|
||||||
return true, pvc, nil
|
return true, pvc, nil
|
||||||
@ -1031,6 +1118,7 @@ func createDswpWithVolume(t *testing.T, pv *v1.PersistentVolume, pvc *v1.Persist
|
|||||||
keepTerminatedPodVolumes: false,
|
keepTerminatedPodVolumes: false,
|
||||||
csiMigratedPluginManager: csimigration.NewPluginManager(csiTranslator),
|
csiMigratedPluginManager: csimigration.NewPluginManager(csiTranslator),
|
||||||
intreeToCSITranslator: csiTranslator,
|
intreeToCSITranslator: csiTranslator,
|
||||||
|
volumePluginMgr: fakeVolumePluginMgr,
|
||||||
}
|
}
|
||||||
return dswp, fakePodManager, fakesDSW
|
return dswp, fakePodManager, fakesDSW
|
||||||
}
|
}
|
||||||
|
@ -193,7 +193,8 @@ func NewVolumeManager(
|
|||||||
kubeContainerRuntime,
|
kubeContainerRuntime,
|
||||||
keepTerminatedPodVolumes,
|
keepTerminatedPodVolumes,
|
||||||
csiMigratedPluginManager,
|
csiMigratedPluginManager,
|
||||||
intreeToCSITranslator)
|
intreeToCSITranslator,
|
||||||
|
volumePluginMgr)
|
||||||
vm.reconciler = reconciler.NewReconciler(
|
vm.reconciler = reconciler.NewReconciler(
|
||||||
kubeClient,
|
kubeClient,
|
||||||
controllerAttachDetachEnabled,
|
controllerAttachDetachEnabled,
|
||||||
|
@ -381,6 +381,9 @@ type FakeVolumePlugin struct {
|
|||||||
ProvisionDelaySeconds int
|
ProvisionDelaySeconds int
|
||||||
SupportsRemount bool
|
SupportsRemount bool
|
||||||
|
|
||||||
|
// default to false which means it is attachable by default
|
||||||
|
NonAttachable bool
|
||||||
|
|
||||||
// Add callbacks as needed
|
// Add callbacks as needed
|
||||||
WaitForAttachHook func(spec *Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error)
|
WaitForAttachHook func(spec *Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error)
|
||||||
UnmountDeviceHook func(globalMountPath string) error
|
UnmountDeviceHook func(globalMountPath string) error
|
||||||
@ -444,6 +447,8 @@ func (plugin *FakeVolumePlugin) GetVolumeName(spec *Spec) (string, error) {
|
|||||||
} else if spec.PersistentVolume != nil &&
|
} else if spec.PersistentVolume != nil &&
|
||||||
spec.PersistentVolume.Spec.GCEPersistentDisk != nil {
|
spec.PersistentVolume.Spec.GCEPersistentDisk != nil {
|
||||||
volumeName = spec.PersistentVolume.Spec.GCEPersistentDisk.PDName
|
volumeName = spec.PersistentVolume.Spec.GCEPersistentDisk.PDName
|
||||||
|
} else if spec.Volume != nil && spec.Volume.CSI != nil {
|
||||||
|
volumeName = spec.Volume.CSI.Driver
|
||||||
}
|
}
|
||||||
if volumeName == "" {
|
if volumeName == "" {
|
||||||
volumeName = spec.Name()
|
volumeName = spec.Name()
|
||||||
@ -604,7 +609,7 @@ func (plugin *FakeVolumePlugin) GetNewDetacherCallCount() int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (plugin *FakeVolumePlugin) CanAttach(spec *Spec) (bool, error) {
|
func (plugin *FakeVolumePlugin) CanAttach(spec *Spec) (bool, error) {
|
||||||
return true, nil
|
return !plugin.NonAttachable, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (plugin *FakeVolumePlugin) CanDeviceMount(spec *Spec) (bool, error) {
|
func (plugin *FakeVolumePlugin) CanDeviceMount(spec *Spec) (bool, error) {
|
||||||
|
@ -342,7 +342,7 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
|
|||||||
uncertainNode = derr.CurrentNode
|
uncertainNode = derr.CurrentNode
|
||||||
}
|
}
|
||||||
addErr := actualStateOfWorld.MarkVolumeAsUncertain(
|
addErr := actualStateOfWorld.MarkVolumeAsUncertain(
|
||||||
v1.UniqueVolumeName(""),
|
volumeToAttach.VolumeName,
|
||||||
volumeToAttach.VolumeSpec,
|
volumeToAttach.VolumeSpec,
|
||||||
uncertainNode)
|
uncertainNode)
|
||||||
if addErr != nil {
|
if addErr != nil {
|
||||||
@ -418,10 +418,9 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(
|
|||||||
var err error
|
var err error
|
||||||
|
|
||||||
if volumeToDetach.VolumeSpec != nil {
|
if volumeToDetach.VolumeSpec != nil {
|
||||||
attachableVolumePlugin, err =
|
attachableVolumePlugin, err = findDetachablePluginBySpec(volumeToDetach.VolumeSpec, og.volumePluginMgr)
|
||||||
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec)
|
|
||||||
if err != nil || attachableVolumePlugin == nil {
|
if err != nil || attachableVolumePlugin == nil {
|
||||||
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err)
|
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.findDetachablePluginBySpec failed", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
volumeName, err =
|
volumeName, err =
|
||||||
@ -1731,3 +1730,25 @@ func isDeviceOpened(deviceToDetach AttachedVolume, hostUtil hostutil.HostUtils)
|
|||||||
}
|
}
|
||||||
return deviceOpened, nil
|
return deviceOpened, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// findDetachablePluginBySpec is a variant of VolumePluginMgr.FindAttachablePluginByName() function.
|
||||||
|
// The difference is that it bypass the CanAttach() check for CSI plugin, i.e. it assumes all CSI plugin supports detach.
|
||||||
|
// The intention here is that a CSI plugin volume can end up in an Uncertain state, so that a detach
|
||||||
|
// operation will help it to detach no matter it actually has the ability to attach/detach.
|
||||||
|
func findDetachablePluginBySpec(spec *volume.Spec, pm *volume.VolumePluginMgr) (volume.AttachableVolumePlugin, error) {
|
||||||
|
volumePlugin, err := pm.FindPluginBySpec(spec)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if attachableVolumePlugin, ok := volumePlugin.(volume.AttachableVolumePlugin); ok {
|
||||||
|
if attachableVolumePlugin.GetPluginName() == "kubernetes.io/csi" {
|
||||||
|
return attachableVolumePlugin, nil
|
||||||
|
}
|
||||||
|
if canAttach, err := attachableVolumePlugin.CanAttach(spec); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if canAttach {
|
||||||
|
return attachableVolumePlugin, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
@ -702,3 +702,29 @@ func IsMultiAttachAllowed(volumeSpec *volume.Spec) bool {
|
|||||||
// we don't know if it's supported or not and let the attacher fail later in cases it's not supported
|
// we don't know if it's supported or not and let the attacher fail later in cases it's not supported
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsAttachableVolume checks if the given volumeSpec is an attachable volume or not
|
||||||
|
func IsAttachableVolume(volumeSpec *volume.Spec, volumePluginMgr *volume.VolumePluginMgr) bool {
|
||||||
|
attachableVolumePlugin, _ := volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
|
||||||
|
if attachableVolumePlugin != nil {
|
||||||
|
volumeAttacher, err := attachableVolumePlugin.NewAttacher()
|
||||||
|
if err == nil && volumeAttacher != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsDeviceMountableVolume checks if the given volumeSpec is an device mountable volume or not
|
||||||
|
func IsDeviceMountableVolume(volumeSpec *volume.Spec, volumePluginMgr *volume.VolumePluginMgr) bool {
|
||||||
|
deviceMountableVolumePlugin, _ := volumePluginMgr.FindDeviceMountablePluginBySpec(volumeSpec)
|
||||||
|
if deviceMountableVolumePlugin != nil {
|
||||||
|
volumeDeviceMounter, err := deviceMountableVolumePlugin.NewDeviceMounter()
|
||||||
|
if err == nil && volumeDeviceMounter != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
@ -42,7 +42,9 @@ import (
|
|||||||
cachetools "k8s.io/client-go/tools/cache"
|
cachetools "k8s.io/client-go/tools/cache"
|
||||||
watchtools "k8s.io/client-go/tools/watch"
|
watchtools "k8s.io/client-go/tools/watch"
|
||||||
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
|
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/events"
|
||||||
"k8s.io/kubernetes/test/e2e/framework"
|
"k8s.io/kubernetes/test/e2e/framework"
|
||||||
|
e2eevents "k8s.io/kubernetes/test/e2e/framework/events"
|
||||||
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
|
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
|
||||||
e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
|
e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
|
||||||
"k8s.io/kubernetes/test/e2e/storage/drivers"
|
"k8s.io/kubernetes/test/e2e/storage/drivers"
|
||||||
@ -58,6 +60,7 @@ const (
|
|||||||
csiNodeLimitUpdateTimeout = 5 * time.Minute
|
csiNodeLimitUpdateTimeout = 5 * time.Minute
|
||||||
csiPodUnschedulableTimeout = 5 * time.Minute
|
csiPodUnschedulableTimeout = 5 * time.Minute
|
||||||
csiResizeWaitPeriod = 5 * time.Minute
|
csiResizeWaitPeriod = 5 * time.Minute
|
||||||
|
csiVolumeAttachmentTimeout = 7 * time.Minute
|
||||||
// how long to wait for Resizing Condition on PVC to appear
|
// how long to wait for Resizing Condition on PVC to appear
|
||||||
csiResizingConditionWait = 2 * time.Minute
|
csiResizingConditionWait = 2 * time.Minute
|
||||||
|
|
||||||
@ -307,6 +310,77 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
ginkgo.Context("CSI CSIDriver deployment after pod creation using non-attachable mock driver", func() {
|
||||||
|
ginkgo.It("should bringup pod after deploying CSIDriver attach=false [Slow]", func() {
|
||||||
|
var err error
|
||||||
|
init(testParameters{registerDriver: false, disableAttach: true})
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
_, claim, pod := createPod(false /* persistent volume, late binding as specified above */)
|
||||||
|
if pod == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ginkgo.By("Checking if attaching failed and pod cannot start")
|
||||||
|
eventSelector := fields.Set{
|
||||||
|
"involvedObject.kind": "Pod",
|
||||||
|
"involvedObject.name": pod.Name,
|
||||||
|
"involvedObject.namespace": pod.Namespace,
|
||||||
|
"reason": events.FailedAttachVolume,
|
||||||
|
}.AsSelector().String()
|
||||||
|
msg := "AttachVolume.Attach failed for volume"
|
||||||
|
|
||||||
|
err = e2eevents.WaitTimeoutForEvent(m.cs, pod.Namespace, eventSelector, msg, framework.PodStartTimeout)
|
||||||
|
if err != nil {
|
||||||
|
podErr := e2epod.WaitTimeoutForPodRunningInNamespace(m.cs, pod.Name, pod.Namespace, 10*time.Second)
|
||||||
|
framework.ExpectError(podErr, "Pod should not be in running status because attaching should failed")
|
||||||
|
// Events are unreliable, don't depend on the event. It's used only to speed up the test.
|
||||||
|
framework.Logf("Attach should fail and the corresponding event should show up, error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// VolumeAttachment should be created because the default value for CSI attachable is true
|
||||||
|
ginkgo.By("Checking if VolumeAttachment was created for the pod")
|
||||||
|
handle := getVolumeHandle(m.cs, claim)
|
||||||
|
attachmentHash := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", handle, m.provisioner, m.config.ClientNodeSelection.Name)))
|
||||||
|
attachmentName := fmt.Sprintf("csi-%x", attachmentHash)
|
||||||
|
_, err = m.cs.StorageV1().VolumeAttachments().Get(context.TODO(), attachmentName, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
framework.ExpectNoError(err, "Expected VolumeAttachment but none was found")
|
||||||
|
} else {
|
||||||
|
framework.ExpectNoError(err, "Failed to find VolumeAttachment")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ginkgo.By("Deploy CSIDriver object with attachRequired=false")
|
||||||
|
driverNamespace := m.config.DriverNamespace
|
||||||
|
|
||||||
|
canAttach := false
|
||||||
|
o := utils.PatchCSIOptions{
|
||||||
|
OldDriverName: "csi-mock",
|
||||||
|
NewDriverName: "csi-mock-" + f.UniqueName,
|
||||||
|
CanAttach: &canAttach,
|
||||||
|
}
|
||||||
|
cleanupCSIDriver, err := utils.CreateFromManifests(f, driverNamespace, func(item interface{}) error {
|
||||||
|
return utils.PatchCSIDeployment(f, o, item)
|
||||||
|
}, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driverinfo.yaml")
|
||||||
|
if err != nil {
|
||||||
|
framework.Failf("fail to deploy CSIDriver object: %v", err)
|
||||||
|
}
|
||||||
|
m.testCleanups = append(m.testCleanups, cleanupCSIDriver)
|
||||||
|
|
||||||
|
ginkgo.By("Wait for the pod in running status")
|
||||||
|
err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
|
||||||
|
framework.ExpectNoError(err, "Failed to start pod: %v", err)
|
||||||
|
|
||||||
|
ginkgo.By(fmt.Sprintf("Wait for the volumeattachment to be deleted up to %v", csiVolumeAttachmentTimeout))
|
||||||
|
// This step can be slow because we have to wait either a NodeUpdate event happens or
|
||||||
|
// the detachment for this volume timeout so that we can do a force detach.
|
||||||
|
err = waitForVolumeAttachmentTerminated(attachmentName, m.cs)
|
||||||
|
framework.ExpectNoError(err, "Failed to delete VolumeAttachment: %v", err)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
ginkgo.Context("CSI workload information using mock driver", func() {
|
ginkgo.Context("CSI workload information using mock driver", func() {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
@ -1186,6 +1260,24 @@ func waitForMaxVolumeCondition(pod *v1.Pod, cs clientset.Interface) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func waitForVolumeAttachmentTerminated(attachmentName string, cs clientset.Interface) error {
|
||||||
|
waitErr := wait.PollImmediate(10*time.Second, csiVolumeAttachmentTimeout, func() (bool, error) {
|
||||||
|
_, err := cs.StorageV1().VolumeAttachments().Get(context.TODO(), attachmentName, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
// if the volumeattachment object is not found, it means it has been terminated.
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
})
|
||||||
|
if waitErr != nil {
|
||||||
|
return fmt.Errorf("error waiting volume attachment %v to terminate: %v", attachmentName, waitErr)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func checkCSINodeForLimits(nodeName string, driverName string, cs clientset.Interface) (int32, error) {
|
func checkCSINodeForLimits(nodeName string, driverName string, cs clientset.Interface) (int32, error) {
|
||||||
var attachLimit int32
|
var attachLimit int32
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user