From 973583e781b4c9cf1409d6fd801264d3853ed00b Mon Sep 17 00:00:00 2001 From: mtanino Date: Tue, 6 Feb 2018 15:44:07 -0500 Subject: [PATCH 1/2] Refactor volumehandler in operationexecutor --- .../volumemanager/reconciler/reconciler.go | 48 +-- .../reconciler/reconciler_test.go | 39 +- .../operationexecutor/operation_executor.go | 406 ++++++------------ .../operation_executor_test.go | 29 +- .../operationexecutor/operation_generator.go | 4 +- pkg/volume/util/volumehelper/volumehelper.go | 17 + 6 files changed, 229 insertions(+), 314 deletions(-) diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index f7293c6d76d..7d6846fcbc0 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -166,12 +166,10 @@ func (rc *reconciler) reconcile() { // Ensure volumes that should be unmounted are unmounted. for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() { if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) { - volumeHandler, err := operationexecutor.NewVolumeHandler(mountedVolume.VolumeSpec, rc.operationExecutor) - if err != nil { - glog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.NewVolumeHandler for UnmountVolume failed"), err).Error()) - continue - } - err = volumeHandler.UnmountVolumeHandler(mountedVolume.MountedVolume, rc.actualStateOfWorld) + // Volume is mounted, unmount it + glog.V(12).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", "")) + err := rc.operationExecutor.UnmountVolume( + mountedVolume.MountedVolume, rc.actualStateOfWorld) if err != nil && !nestedpendingoperations.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { @@ -236,12 +234,12 @@ func (rc *reconciler) reconcile() { if isRemount { remountingLogStr = "Volume is already mounted to pod, but remount was requested." } - volumeHandler, err := operationexecutor.NewVolumeHandler(volumeToMount.VolumeSpec, rc.operationExecutor) - if err != nil { - glog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.NewVolumeHandler for MountVolume failed"), err).Error()) - continue - } - err = volumeHandler.MountVolumeHandler(rc.waitForAttachTimeout, volumeToMount.VolumeToMount, rc.actualStateOfWorld, isRemount, remountingLogStr) + glog.V(12).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr)) + err := rc.operationExecutor.MountVolume( + rc.waitForAttachTimeout, + volumeToMount.VolumeToMount, + rc.actualStateOfWorld, + isRemount) if err != nil && !nestedpendingoperations.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { @@ -265,12 +263,10 @@ func (rc *reconciler) reconcile() { if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) && !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) { if attachedVolume.GloballyMounted { - volumeHandler, err := operationexecutor.NewVolumeHandler(attachedVolume.VolumeSpec, rc.operationExecutor) - if err != nil { - glog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.NewVolumeHandler for UnmountDevice failed"), err).Error()) - continue - } - err = volumeHandler.UnmountDeviceHandler(attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter) + // Volume is globally mounted to device, unmount it + glog.V(12).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", "")) + err := rc.operationExecutor.UnmountDevice( + attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter) if err != nil && !nestedpendingoperations.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { @@ -403,14 +399,9 @@ func (rc *reconciler) cleanupMounts(volume podVolume) { PluginName: volume.pluginName, PodUID: types.UID(volume.podName), } - volumeHandler, err := operationexecutor.NewVolumeHandlerWithMode(volume.volumeMode, rc.operationExecutor) - if err != nil { - glog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.NewVolumeHandler for UnmountVolume failed"), err).Error()) - return - } // TODO: Currently cleanupMounts only includes UnmountVolume operation. In the next PR, we will add // to unmount both volume and device in the same routine. - err = volumeHandler.UnmountVolumeHandler(mountedVolume, rc.actualStateOfWorld) + err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld) if err != nil { glog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("volumeHandler.UnmountVolumeHandler for UnmountVolume failed"), err).Error()) return @@ -435,15 +426,12 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, UID: types.UID(volume.podName), }, } - volumeHandler, err := operationexecutor.NewVolumeHandlerWithMode(volume.volumeMode, rc.operationExecutor) - if err != nil { - return nil, err - } mapperPlugin, err := rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName) if err != nil { return nil, err } - volumeSpec, err := volumeHandler.ReconstructVolumeHandler( + volumeSpec, err := rc.operationExecutor.ReconstructVolumeOperation( + volume.volumeMode, plugin, mapperPlugin, pod.UID, @@ -466,7 +454,7 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, uniqueVolumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(volume.podName, plugin, volumeSpec) } // Check existence of mount point for filesystem volume or symbolic link for block volume - isExist, checkErr := volumeHandler.CheckVolumeExistence(volume.mountPath, volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin) + isExist, checkErr := rc.operationExecutor.CheckVolumeExistenceOperation(volumeSpec, volume.mountPath, volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin) if checkErr != nil { return nil, err } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index 187171ebda9..22b49d02485 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -835,6 +835,8 @@ func Test_GenerateMapVolumeFunc_Plugin_Not_Found(t *testing.T) { }, } + // Enable BlockVolume feature gate + utilfeature.DefaultFeatureGate.Set("BlockVolume=true") for name, tc := range testCases { t.Run(name, func(t *testing.T) { volumePluginMgr := &volume.VolumePluginMgr{} @@ -854,14 +856,20 @@ func Test_GenerateMapVolumeFunc_Plugin_Not_Found(t *testing.T) { }, Spec: v1.PodSpec{}, } - volumeToMount := operationexecutor.VolumeToMount{Pod: pod, VolumeSpec: &volume.Spec{}} - err := oex.MapVolume(waitForAttachTimeout, volumeToMount, asw) + volumeMode := v1.PersistentVolumeBlock + tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}} + volumeToMount := operationexecutor.VolumeToMount{ + Pod: pod, + VolumeSpec: tmpSpec} + err := oex.MountVolume(waitForAttachTimeout, volumeToMount, asw, false) // Assert if assert.Error(t, err) { assert.Contains(t, err.Error(), tc.expectedErrMsg) } }) } + // Rollback feature gate to false. + utilfeature.DefaultFeatureGate.Set("BlockVolume=false") } func Test_GenerateUnmapVolumeFunc_Plugin_Not_Found(t *testing.T) { @@ -882,6 +890,8 @@ func Test_GenerateUnmapVolumeFunc_Plugin_Not_Found(t *testing.T) { }, } + // Enable BlockVolume feature gate + utilfeature.DefaultFeatureGate.Set("BlockVolume=true") for name, tc := range testCases { t.Run(name, func(t *testing.T) { volumePluginMgr := &volume.VolumePluginMgr{} @@ -893,14 +903,20 @@ func Test_GenerateUnmapVolumeFunc_Plugin_Not_Found(t *testing.T) { nil, /* fakeRecorder */ false, /* checkNodeCapabilitiesBeforeMount */ nil)) - volumeToUnmount := operationexecutor.MountedVolume{PluginName: "fake-file-plugin"} - err := oex.UnmapVolume(volumeToUnmount, asw) + volumeMode := v1.PersistentVolumeBlock + tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}} + volumeToUnmount := operationexecutor.MountedVolume{ + PluginName: "fake-file-plugin", + VolumeSpec: tmpSpec} + err := oex.UnmountVolume(volumeToUnmount, asw) // Assert if assert.Error(t, err) { assert.Contains(t, err.Error(), tc.expectedErrMsg) } }) } + // Rollback feature gate to false. + utilfeature.DefaultFeatureGate.Set("BlockVolume=false") } func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) { @@ -912,15 +928,17 @@ func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) { "volumePlugin is nil": { volumePlugins: []volume.VolumePlugin{}, expectErr: true, - expectedErrMsg: "UnmapDevice.FindMapperPluginBySpec failed", + expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed", }, "blockVolumePlugin is nil": { volumePlugins: volumetesting.NewFakeFileVolumePlugin(), expectErr: true, - expectedErrMsg: "UnmapDevice.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", + expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.", }, } + // Enable BlockVolume feature gate + utilfeature.DefaultFeatureGate.Set("BlockVolume=true") for name, tc := range testCases { t.Run(name, func(t *testing.T) { volumePluginMgr := &volume.VolumePluginMgr{} @@ -933,15 +951,18 @@ func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) { false, /* checkNodeCapabilitiesBeforeMount */ nil)) var mounter mount.Interface - plugins := volumetesting.NewFakeFileVolumePlugin() - deviceToDetach := operationexecutor.AttachedVolume{VolumeSpec: &volume.Spec{}, PluginName: plugins[0].GetPluginName()} - err := oex.UnmapDevice(deviceToDetach, asw, mounter) + volumeMode := v1.PersistentVolumeBlock + tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}} + deviceToDetach := operationexecutor.AttachedVolume{VolumeSpec: tmpSpec, PluginName: "fake-file-plugin"} + err := oex.UnmountDevice(deviceToDetach, asw, mounter) // Assert if assert.Error(t, err) { assert.Contains(t, err.Error(), tc.expectedErrMsg) } }) } + // Rollback feature gate to false. + utilfeature.DefaultFeatureGate.Set("BlockVolume=false") } func waitForMount( diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index dec31299efd..486add34da3 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -28,9 +28,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" - utilfeature "k8s.io/apiserver/pkg/util/feature" expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache" - "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" @@ -83,7 +81,8 @@ type OperationExecutor interface { // Status.VolumesInUse list (operation fails with error if it is). DetachVolume(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error - // MountVolume mounts the volume to the pod specified in volumeToMount. + // If a volume has 'Filesystem' volumeMode, MountVolume mounts the + // volume to the pod specified in volumeToMount. // Specifically it will: // * Wait for the device to finish attaching (for attachable volumes only). // * Mount device to global mount path (for attachable volumes only). @@ -95,38 +94,36 @@ type OperationExecutor interface { // The parameter "isRemount" is informational and used to adjust logging // verbosity. An initial mount is more log-worthy than a remount, for // example. - MountVolume(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) error - - // UnmountVolume unmounts the volume from the pod specified in - // volumeToUnmount and updates the actual state of the world to reflect that. - UnmountVolume(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error - - // UnmountDevice unmounts the volumes global mount path from the device (for - // attachable volumes only, freeing it for detach. It then updates the - // actual state of the world to reflect that. - UnmountDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error - - // MapVolume is used when the volumeMode is 'Block'. - // This method creates a symbolic link to the volume from both the pod - // specified in volumeToMount and global map path. + // + // For 'Block' volumeMode, this method creates a symbolic link to + // the volume from both the pod specified in volumeToMount and global map path. // Specifically it will: // * Wait for the device to finish attaching (for attachable volumes only). // * Update actual state of world to reflect volume is globally mounted/mapped. // * Map volume to global map path using symbolic link. // * Map the volume to the pod device map path using symbolic link. // * Update actual state of world to reflect volume is mounted/mapped to the pod path. - MapVolume(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error + MountVolume(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) error - // UnmapVolume unmaps symbolic link to the volume from both the pod device - // map path in volumeToUnmount and global map path. + // If a volume has 'Filesystem' volumeMode, UnmountVolume unmounts the + // volume from the pod specified in volumeToUnmount and updates the actual + // state of the world to reflect that. + // + // For 'Block' volumeMode, this method unmaps symbolic link to the volume + // from both the pod device map path in volumeToUnmount and global map path. // And then, updates the actual state of the world to reflect that. - UnmapVolume(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error + UnmountVolume(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error - // UnmapDevice checks number of symbolic links under global map path. - // If number of reference is zero, remove global map path directory and - // free a volume for detach. + // If a volume has 'Filesystem' volumeMode, UnmountDevice unmounts the + // volumes global mount path from the device (for attachable volumes only, + // freeing it for detach. It then updates the actual state of the world to + // reflect that. + // + // For 'Block' volumeMode, this method checks number of symbolic links under + // global map path. If number of reference is zero, remove global map path + // directory and free a volume for detach. // It then updates the actual state of the world to reflect that. - UnmapDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error + UnmountDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error // VerifyControllerAttachedVolume checks if the specified volume is present // in the specified nodes AttachedVolumes Status field. It uses kubeClient @@ -145,6 +142,10 @@ type OperationExecutor interface { IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool // Expand Volume will grow size available to PVC ExpandVolume(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) error + // ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin + ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, mountPath string, pluginName string) (*volume.Spec, error) + // CheckVolumeExistenceOperation checks volume existence + CheckVolumeExistenceOperation(volumeSpec *volume.Spec, mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, attachable volume.AttachableVolumePlugin) (bool, error) } // NewOperationExecutor returns a new instance of OperationExecutor. @@ -707,13 +708,30 @@ func (oe *operationExecutor) MountVolume( volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) error { - generatedOperations, err := oe.operationGenerator.GenerateMountVolumeFunc( - waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount) + fsVolume, err := volumehelper.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec) if err != nil { return err } + var generatedOperations volumetypes.GeneratedOperations + if fsVolume { + // Filesystem volume case + // Mount/remount a volume when a volume is attached + generatedOperations, err = oe.operationGenerator.GenerateMountVolumeFunc( + waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount) + } else { + // Block volume case + // Creates a map to device if a volume is attached + generatedOperations, err = oe.operationGenerator.GenerateMapVolumeFunc( + waitForAttachTimeout, volumeToMount, actualStateOfWorld) + } + if err != nil { + return err + } + // Avoid executing mount/map from multiple pods referencing the + // same volume in parallel podName := nestedpendingoperations.EmptyUniquePodName + // TODO: remove this -- not necessary if !volumeToMount.PluginIsAttachable { // Non-attachable volume plugins can execute mount for multiple pods @@ -729,14 +747,26 @@ func (oe *operationExecutor) MountVolume( func (oe *operationExecutor) UnmountVolume( volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { - - generatedOperations, err := - oe.operationGenerator.GenerateUnmountVolumeFunc(volumeToUnmount, actualStateOfWorld) + fsVolume, err := volumehelper.CheckVolumeModeFilesystem(volumeToUnmount.VolumeSpec) if err != nil { return err } - - // All volume plugins can execute mount for multiple pods referencing the + var generatedOperations volumetypes.GeneratedOperations + if fsVolume { + // Filesystem volume case + // Unmount a volume if a volume is mounted + generatedOperations, err = oe.operationGenerator.GenerateUnmountVolumeFunc( + volumeToUnmount, actualStateOfWorld) + } else { + // Block volume case + // Unmap a volume if a volume is mapped + generatedOperations, err = oe.operationGenerator.GenerateUnmapVolumeFunc( + volumeToUnmount, actualStateOfWorld) + } + if err != nil { + return err + } + // All volume plugins can execute unmount/unmap for multiple pods referencing the // same volume in parallel podName := volumetypes.UniquePodName(volumeToUnmount.PodUID) @@ -748,14 +778,31 @@ func (oe *operationExecutor) UnmountDevice( deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error { - generatedOperations, err := - oe.operationGenerator.GenerateUnmountDeviceFunc(deviceToDetach, actualStateOfWorld, mounter) + fsVolume, err := volumehelper.CheckVolumeModeFilesystem(deviceToDetach.VolumeSpec) if err != nil { return err } + var generatedOperations volumetypes.GeneratedOperations + if fsVolume { + // Filesystem volume case + // Unmount and detach a device if a volume isn't referenced + generatedOperations, err = oe.operationGenerator.GenerateUnmountDeviceFunc( + deviceToDetach, actualStateOfWorld, mounter) + } else { + // Block volume case + // Detach a device and remove loopback if a volume isn't referenced + generatedOperations, err = oe.operationGenerator.GenerateUnmapDeviceFunc( + deviceToDetach, actualStateOfWorld, mounter) + } + if err != nil { + return err + } + // Avoid executing unmount/unmap device from multiple pods referencing + // the same volume in parallel + podName := nestedpendingoperations.EmptyUniquePodName return oe.pendingOperations.Run( - deviceToDetach.VolumeName, "" /* podName */, generatedOperations) + deviceToDetach.VolumeName, podName, generatedOperations) } func (oe *operationExecutor) ExpandVolume(pvcWithResizeRequest *expandcache.PVCWithResizeRequest, resizeMap expandcache.VolumeResizeMap) error { @@ -769,65 +816,6 @@ func (oe *operationExecutor) ExpandVolume(pvcWithResizeRequest *expandcache.PVCW return oe.pendingOperations.Run(uniqueVolumeKey, "", generatedOperations) } -func (oe *operationExecutor) MapVolume( - waitForAttachTimeout time.Duration, - volumeToMount VolumeToMount, - actualStateOfWorld ActualStateOfWorldMounterUpdater) error { - generatedOperations, err := oe.operationGenerator.GenerateMapVolumeFunc( - waitForAttachTimeout, volumeToMount, actualStateOfWorld) - if err != nil { - return err - } - - // Avoid executing map from multiple pods referencing the - // same volume in parallel - podName := nestedpendingoperations.EmptyUniquePodName - // TODO: remove this -- not necessary - if !volumeToMount.PluginIsAttachable { - // Non-attachable volume plugins can execute mount for multiple pods - // referencing the same volume in parallel - podName = volumehelper.GetUniquePodName(volumeToMount.Pod) - } - - return oe.pendingOperations.Run( - volumeToMount.VolumeName, podName, generatedOperations) -} - -func (oe *operationExecutor) UnmapVolume( - volumeToUnmount MountedVolume, - actualStateOfWorld ActualStateOfWorldMounterUpdater) error { - generatedOperations, err := - oe.operationGenerator.GenerateUnmapVolumeFunc(volumeToUnmount, actualStateOfWorld) - if err != nil { - return err - } - - // All volume plugins can execute unmap for multiple pods referencing the - // same volume in parallel - podName := volumetypes.UniquePodName(volumeToUnmount.PodUID) - - return oe.pendingOperations.Run( - volumeToUnmount.VolumeName, podName, generatedOperations) -} - -func (oe *operationExecutor) UnmapDevice( - deviceToDetach AttachedVolume, - actualStateOfWorld ActualStateOfWorldMounterUpdater, - mounter mount.Interface) error { - generatedOperations, err := - oe.operationGenerator.GenerateUnmapDeviceFunc(deviceToDetach, actualStateOfWorld, mounter) - if err != nil { - return err - } - - // Avoid executing unmap device from multiple pods referencing - // the same volume in parallel - podName := nestedpendingoperations.EmptyUniquePodName - - return oe.pendingOperations.Run( - deviceToDetach.VolumeName, podName, generatedOperations) -} - func (oe *operationExecutor) VerifyControllerAttachedVolume( volumeToMount VolumeToMount, nodeName types.NodeName, @@ -842,177 +830,30 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume( volumeToMount.VolumeName, "" /* podName */, generatedOperations) } -// VolumeStateHandler defines a set of operations for handling mount/unmount/detach/reconstruct volume-related operations -type VolumeStateHandler interface { - // Volume is attached, mount/map it - MountVolumeHandler(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool, remountingLogStr string) error - // Volume is mounted/mapped, unmount/unmap it - UnmountVolumeHandler(mountedVolume MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error - // Volume is not referenced from pod, unmount/unmap and detach it - UnmountDeviceHandler(attachedVolume AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error - // Reconstruct volume from mount path - ReconstructVolumeHandler(plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, mountPath string, pluginName string) (*volume.Spec, error) - // check mount path if volume still exists - CheckVolumeExistence(mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, attachable volume.AttachableVolumePlugin) (bool, error) -} +// ReconstructVolumeOperation return a func to create volumeSpec from mount path +func (oe *operationExecutor) ReconstructVolumeOperation( + volumeMode v1.PersistentVolumeMode, + plugin volume.VolumePlugin, + mapperPlugin volume.BlockVolumePlugin, + uid types.UID, + podName volumetypes.UniquePodName, + volumeSpecName string, + mountPath string, + pluginName string) (*volume.Spec, error) { -// NewVolumeHandler return a new instance of volumeHandler depens on a volumeMode -func NewVolumeHandler(volumeSpec *volume.Spec, oe OperationExecutor) (VolumeStateHandler, error) { - - // TODO: remove feature gate check after no longer needed - var volumeHandler VolumeStateHandler - if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) { - volumeMode, err := volumehelper.GetVolumeMode(volumeSpec) + // Filesystem Volume case + if volumeMode == v1.PersistentVolumeFilesystem { + // Create volumeSpec from mount path + glog.V(12).Infof("Starting operationExecutor.ReconstructVolumepodName") + volumeSpec, err := plugin.ConstructVolumeSpec(volumeSpecName, mountPath) if err != nil { return nil, err } - if volumeMode == v1.PersistentVolumeFilesystem { - volumeHandler = NewFilesystemVolumeHandler(oe) - } else { - volumeHandler = NewBlockVolumeHandler(oe) - } - } else { - volumeHandler = NewFilesystemVolumeHandler(oe) + return volumeSpec, nil } - return volumeHandler, nil -} -// NewVolumeHandlerWithMode return a new instance of volumeHandler depens on a volumeMode -func NewVolumeHandlerWithMode(volumeMode v1.PersistentVolumeMode, oe OperationExecutor) (VolumeStateHandler, error) { - var volumeHandler VolumeStateHandler - if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) { - if volumeMode == v1.PersistentVolumeFilesystem { - volumeHandler = NewFilesystemVolumeHandler(oe) - } else { - volumeHandler = NewBlockVolumeHandler(oe) - } - } else { - volumeHandler = NewFilesystemVolumeHandler(oe) - } - return volumeHandler, nil -} - -// NewFilesystemVolumeHandler returns a new instance of FilesystemVolumeHandler. -func NewFilesystemVolumeHandler(operationExecutor OperationExecutor) FilesystemVolumeHandler { - return FilesystemVolumeHandler{ - oe: operationExecutor} -} - -// NewBlockVolumeHandler returns a new instance of BlockVolumeHandler. -func NewBlockVolumeHandler(operationExecutor OperationExecutor) BlockVolumeHandler { - return BlockVolumeHandler{ - oe: operationExecutor} -} - -// FilesystemVolumeHandler is VolumeHandler for Filesystem volume -type FilesystemVolumeHandler struct { - oe OperationExecutor -} - -// BlockVolumeHandler is VolumeHandler for Block volume -type BlockVolumeHandler struct { - oe OperationExecutor -} - -// MountVolumeHandler mount/remount a volume when a volume is attached -// This method is handler for filesystem volume -func (f FilesystemVolumeHandler) MountVolumeHandler(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool, remountingLogStr string) error { - glog.V(12).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr)) - err := f.oe.MountVolume( - waitForAttachTimeout, - volumeToMount, - actualStateOfWorld, - isRemount) - return err -} - -// UnmountVolumeHandler unmount a volume if a volume is mounted -// This method is handler for filesystem volume -func (f FilesystemVolumeHandler) UnmountVolumeHandler(mountedVolume MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { - glog.V(12).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", "")) - err := f.oe.UnmountVolume( - mountedVolume, - actualStateOfWorld) - return err -} - -// UnmountDeviceHandler unmount and detach a device if a volume isn't referenced -// This method is handler for filesystem volume -func (f FilesystemVolumeHandler) UnmountDeviceHandler(attachedVolume AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error { - glog.V(12).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", "")) - err := f.oe.UnmountDevice( - attachedVolume, - actualStateOfWorld, - mounter) - return err -} - -// ReconstructVolumeHandler create volumeSpec from mount path -// This method is handler for filesystem volume -func (f FilesystemVolumeHandler) ReconstructVolumeHandler(plugin volume.VolumePlugin, _ volume.BlockVolumePlugin, _ types.UID, _ volumetypes.UniquePodName, volumeSpecName string, mountPath string, _ string) (*volume.Spec, error) { - glog.V(4).Infof("Starting operationExecutor.ReconstructVolumepodName volume spec name %s, mount path %s", volumeSpecName, mountPath) - volumeSpec, err := plugin.ConstructVolumeSpec(volumeSpecName, mountPath) - if err != nil { - return nil, err - } - return volumeSpec, nil -} - -// CheckVolumeExistence checks mount path directory if volume still exists, return true if volume is there -// Also return true for non-attachable volume case without mount point check -// This method is handler for filesystem volume -func (f FilesystemVolumeHandler) CheckVolumeExistence(mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, attachable volume.AttachableVolumePlugin) (bool, error) { - if attachable != nil { - var isNotMount bool - var mountCheckErr error - if isNotMount, mountCheckErr = mounter.IsLikelyNotMountPoint(mountPath); mountCheckErr != nil { - return false, fmt.Errorf("Could not check whether the volume %q (spec.Name: %q) pod %q (UID: %q) is mounted with: %v", - uniqueVolumeName, - volumeName, - podName, - podUID, - mountCheckErr) - } - return !isNotMount, nil - } - return true, nil -} - -// MountVolumeHandler creates a map to device if a volume is attached -// This method is handler for block volume -func (b BlockVolumeHandler) MountVolumeHandler(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, _ bool, _ string) error { - glog.V(12).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MapVolume", "")) - err := b.oe.MapVolume( - waitForAttachTimeout, - volumeToMount, - actualStateOfWorld) - return err -} - -// UnmountVolumeHandler unmap a volume if a volume is mapped -// This method is handler for block volume -func (b BlockVolumeHandler) UnmountVolumeHandler(mountedVolume MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { - glog.V(12).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmapVolume", "")) - err := b.oe.UnmapVolume( - mountedVolume, - actualStateOfWorld) - return err -} - -// UnmountDeviceHandler detach a device and remove loopback if a volume isn't referenced -// This method is handler for block volume -func (b BlockVolumeHandler) UnmountDeviceHandler(attachedVolume AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error { - glog.V(12).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmapDevice", "")) - err := b.oe.UnmapDevice( - attachedVolume, - actualStateOfWorld, - mounter) - return err -} - -// ReconstructVolumeHandler create volumeSpec from mount path -// This method is handler for block volume -func (b BlockVolumeHandler) ReconstructVolumeHandler(_ volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, mountPath string, pluginName string) (*volume.Spec, error) { + // Block Volume case + // Create volumeSpec from mount path glog.V(12).Infof("Starting operationExecutor.ReconstructVolume") if mapperPlugin == nil { return nil, fmt.Errorf("Could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)", @@ -1031,12 +872,47 @@ func (b BlockVolumeHandler) ReconstructVolumeHandler(_ volume.VolumePlugin, mapp return volumeSpec, nil } -// CheckVolumeExistence checks mount path directory if volume still exists, then return -// true if volume is there. Either plugin is attachable or non-attachable, the plugin -// should have symbolic link associated to raw block device under pod device map -// if volume exists. -// This method is handler for block volume -func (b BlockVolumeHandler) CheckVolumeExistence(mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, _ volume.AttachableVolumePlugin) (bool, error) { +// CheckVolumeExistenceOperation return a func() to check mount path directory if volume still exists +func (oe *operationExecutor) CheckVolumeExistenceOperation( + volumeSpec *volume.Spec, + mountPath, volumeName string, + mounter mount.Interface, + uniqueVolumeName v1.UniqueVolumeName, + podName volumetypes.UniquePodName, + podUID types.UID, + attachable volume.AttachableVolumePlugin) (bool, error) { + fsVolume, err := volumehelper.CheckVolumeModeFilesystem(volumeSpec) + if err != nil { + return false, err + } + + // Filesystem Volume case + // For attachable volume case, check mount path directory if volume is still existing and mounted. + // Return true if volume is mounted. + if fsVolume { + if attachable != nil { + var isNotMount bool + var mountCheckErr error + if isNotMount, mountCheckErr = mounter.IsLikelyNotMountPoint(mountPath); mountCheckErr != nil { + return false, fmt.Errorf("Could not check whether the volume %q (spec.Name: %q) pod %q (UID: %q) is mounted with: %v", + uniqueVolumeName, + volumeName, + podName, + podUID, + mountCheckErr) + } + return !isNotMount, nil + } + // For non-attachable volume case, skip check and return true without mount point check + // since plugins may not have volume mount point. + return true, nil + } + + // Block Volume case + // Check mount path directory if volume still exists, then return true if volume + // is there. Either plugin is attachable or non-attachable, the plugin should + // have symbolic link associated to raw block device under pod device map + // if volume exists. blkutil := util.NewBlockVolumePathHandler() var islinkExist bool var checkErr error diff --git a/pkg/volume/util/operationexecutor/operation_executor_test.go b/pkg/volume/util/operationexecutor/operation_executor_test.go index 6aabd54b1fb..2453a6e630f 100644 --- a/pkg/volume/util/operationexecutor/operation_executor_test.go +++ b/pkg/volume/util/operationexecutor/operation_executor_test.go @@ -231,12 +231,14 @@ func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing } } -func TestOperationExecutor_MapVolume_ConcurrentMapForNonAttachablePlugins(t *testing.T) { +func TestOperationExecutor_MountVolume_ConcurrentMountForNonAttachablePlugins_VolumeMode_Block(t *testing.T) { // Arrange ch, quit, oe := setup() volumesToMount := make([]VolumeToMount, numVolumesToMap) secretName := "secret-volume" volumeName := v1.UniqueVolumeName(secretName) + volumeMode := v1.PersistentVolumeBlock + tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}} // Act for i := range volumesToMount { @@ -247,8 +249,9 @@ func TestOperationExecutor_MapVolume_ConcurrentMapForNonAttachablePlugins(t *tes VolumeName: volumeName, PluginIsAttachable: false, // this field determines whether the plugin is attachable ReportedInUse: true, + VolumeSpec: tmpSpec, } - oe.MapVolume(0 /* waitForAttachTimeOut */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */) + oe.MountVolume(0 /* waitForAttachTimeOut */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false) } // Assert @@ -257,12 +260,14 @@ func TestOperationExecutor_MapVolume_ConcurrentMapForNonAttachablePlugins(t *tes } } -func TestOperationExecutor_MapVolume_ConcurrentMapForAttachablePlugins(t *testing.T) { +func TestOperationExecutor_MountVolume_ConcurrentMountForAttachablePlugins_VolumeMode_Block(t *testing.T) { // Arrange ch, quit, oe := setup() volumesToMount := make([]VolumeToMount, numVolumesToAttach) pdName := "pd-volume" volumeName := v1.UniqueVolumeName(pdName) + volumeMode := v1.PersistentVolumeBlock + tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}} // Act for i := range volumesToMount { @@ -273,8 +278,9 @@ func TestOperationExecutor_MapVolume_ConcurrentMapForAttachablePlugins(t *testin VolumeName: volumeName, PluginIsAttachable: true, // this field determines whether the plugin is attachable ReportedInUse: true, + VolumeSpec: tmpSpec, } - oe.MapVolume(0 /* waitForAttachTimeout */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */) + oe.MountVolume(0 /* waitForAttachTimeout */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false) } // Assert @@ -283,12 +289,14 @@ func TestOperationExecutor_MapVolume_ConcurrentMapForAttachablePlugins(t *testin } } -func TestOperationExecutor_UnmapVolume_ConcurrentUnmapForAllPlugins(t *testing.T) { +func TestOperationExecutor_UnmountVolume_ConcurrentUnmountForAllPlugins_VolumeMode_Block(t *testing.T) { // Arrange ch, quit, oe := setup() volumesToUnmount := make([]MountedVolume, numAttachableVolumesToUnmap+numNonAttachableVolumesToUnmap) pdName := "pd-volume" secretName := "secret-volume" + volumeMode := v1.PersistentVolumeBlock + tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}} // Act for i := 0; i < numNonAttachableVolumesToUnmap+numAttachableVolumesToUnmap; i++ { @@ -299,6 +307,7 @@ func TestOperationExecutor_UnmapVolume_ConcurrentUnmapForAllPlugins(t *testing.T PodName: volumetypes.UniquePodName(podName), VolumeName: v1.UniqueVolumeName(secretName), PodUID: pod.UID, + VolumeSpec: tmpSpec, } } else { pod := getTestPodWithGCEPD(podName, pdName) @@ -306,9 +315,10 @@ func TestOperationExecutor_UnmapVolume_ConcurrentUnmapForAllPlugins(t *testing.T PodName: volumetypes.UniquePodName(podName), VolumeName: v1.UniqueVolumeName(pdName), PodUID: pod.UID, + VolumeSpec: tmpSpec, } } - oe.UnmapVolume(volumesToUnmount[i], nil /* actualStateOfWorldMounterUpdater */) + oe.UnmountVolume(volumesToUnmount[i], nil /* actualStateOfWorldMounterUpdater */) } // Assert @@ -317,19 +327,22 @@ func TestOperationExecutor_UnmapVolume_ConcurrentUnmapForAllPlugins(t *testing.T } } -func TestOperationExecutor_UnmapDeviceConcurrently(t *testing.T) { +func TestOperationExecutor_UnmountDeviceConcurrently_VolumeMode_Block(t *testing.T) { // Arrange ch, quit, oe := setup() attachedVolumes := make([]AttachedVolume, numDevicesToUnmap) pdName := "pd-volume" + volumeMode := v1.PersistentVolumeBlock + tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}} // Act for i := range attachedVolumes { attachedVolumes[i] = AttachedVolume{ VolumeName: v1.UniqueVolumeName(pdName), NodeName: "node-name", + VolumeSpec: tmpSpec, } - oe.UnmapDevice(attachedVolumes[i], nil /* actualStateOfWorldMounterUpdater */, nil /* mount.Interface */) + oe.UnmountDevice(attachedVolumes[i], nil /* actualStateOfWorldMounterUpdater */, nil /* mount.Interface */) } // Assert diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 848c3b87087..71d71ae5ede 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -1017,10 +1017,10 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( blockVolumePlugin, err := og.volumePluginMgr.FindMapperPluginByName(deviceToDetach.PluginName) if err != nil { - return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginByName failed", err) } if blockVolumePlugin == nil { - return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) } blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper( diff --git a/pkg/volume/util/volumehelper/volumehelper.go b/pkg/volume/util/volumehelper/volumehelper.go index 74b14be5de8..d8c83540123 100644 --- a/pkg/volume/util/volumehelper/volumehelper.go +++ b/pkg/volume/util/volumehelper/volumehelper.go @@ -23,6 +23,8 @@ import ( "strings" "k8s.io/api/core/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util/types" @@ -157,3 +159,18 @@ func GetPersistentVolumeClaimVolumeMode(claim *v1.PersistentVolumeClaim) (v1.Per } return "", fmt.Errorf("cannot get volumeMode from pvc: %v", claim.Name) } + +// CheckVolumeModeFilesystem checks VolumeMode. +// If the mode is Filesystem, return true otherwise return false. +func CheckVolumeModeFilesystem(volumeSpec *volume.Spec) (bool, error) { + if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) { + volumeMode, err := GetVolumeMode(volumeSpec) + if err != nil { + return true, err + } + if volumeMode == v1.PersistentVolumeBlock { + return false, nil + } + } + return true, nil +} From bc86537f184da3b6344816578c05b73a9a8ab5dc Mon Sep 17 00:00:00 2001 From: mtanino Date: Fri, 9 Feb 2018 14:09:32 -0500 Subject: [PATCH 2/2] Autogenerated files --- pkg/volume/util/volumehelper/BUILD | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/volume/util/volumehelper/BUILD b/pkg/volume/util/volumehelper/BUILD index 3a57c7dbdfb..507792a9636 100644 --- a/pkg/volume/util/volumehelper/BUILD +++ b/pkg/volume/util/volumehelper/BUILD @@ -10,10 +10,12 @@ go_library( srcs = ["volumehelper.go"], importpath = "k8s.io/kubernetes/pkg/volume/util/volumehelper", deps = [ + "//pkg/features:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/util/types:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", ], )