Merge pull request #56454 from mtanino/volumehandler-refactor

Automatic merge from submit-queue (batch tested with PRs 59767, 56454, 59237, 59730, 55479). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Block Volume: Refactor volumehandler in operationexecutor

**What this PR does / why we need it**:

Based on discussion with @saad-ali at #51494, we need refactor volumehandler in operationexecutor
for Block Volume feature. We don't need to add volumehandler as separated object.

```
VolumeHandler does not need to be a separate object that is constructed inline like this. 
You can create a new operation, e.g. UnmountOperation to which you pass the spec,
and it can return either a UnmountVolume or UnmapVolume.
```

**Which issue(s) this PR fixes** : no related issue.

**Special notes for your reviewer**:

@saad-ali @msau42 

**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2018-02-12 15:44:33 -08:00 committed by GitHub
commit 4afdc33c57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 231 additions and 314 deletions

View File

@ -166,12 +166,10 @@ func (rc *reconciler) reconcile() {
// Ensure volumes that should be unmounted are unmounted.
for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
volumeHandler, err := operationexecutor.NewVolumeHandler(mountedVolume.VolumeSpec, rc.operationExecutor)
if err != nil {
glog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.NewVolumeHandler for UnmountVolume failed"), err).Error())
continue
}
err = volumeHandler.UnmountVolumeHandler(mountedVolume.MountedVolume, rc.actualStateOfWorld)
// Volume is mounted, unmount it
glog.V(12).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
@ -236,12 +234,12 @@ func (rc *reconciler) reconcile() {
if isRemount {
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
}
volumeHandler, err := operationexecutor.NewVolumeHandler(volumeToMount.VolumeSpec, rc.operationExecutor)
if err != nil {
glog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.NewVolumeHandler for MountVolume failed"), err).Error())
continue
}
err = volumeHandler.MountVolumeHandler(rc.waitForAttachTimeout, volumeToMount.VolumeToMount, rc.actualStateOfWorld, isRemount, remountingLogStr)
glog.V(12).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
err := rc.operationExecutor.MountVolume(
rc.waitForAttachTimeout,
volumeToMount.VolumeToMount,
rc.actualStateOfWorld,
isRemount)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
@ -265,12 +263,10 @@ func (rc *reconciler) reconcile() {
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {
if attachedVolume.GloballyMounted {
volumeHandler, err := operationexecutor.NewVolumeHandler(attachedVolume.VolumeSpec, rc.operationExecutor)
if err != nil {
glog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.NewVolumeHandler for UnmountDevice failed"), err).Error())
continue
}
err = volumeHandler.UnmountDeviceHandler(attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
// Volume is globally mounted to device, unmount it
glog.V(12).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
err := rc.operationExecutor.UnmountDevice(
attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
@ -403,14 +399,9 @@ func (rc *reconciler) cleanupMounts(volume podVolume) {
PluginName: volume.pluginName,
PodUID: types.UID(volume.podName),
}
volumeHandler, err := operationexecutor.NewVolumeHandlerWithMode(volume.volumeMode, rc.operationExecutor)
if err != nil {
glog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.NewVolumeHandler for UnmountVolume failed"), err).Error())
return
}
// TODO: Currently cleanupMounts only includes UnmountVolume operation. In the next PR, we will add
// to unmount both volume and device in the same routine.
err = volumeHandler.UnmountVolumeHandler(mountedVolume, rc.actualStateOfWorld)
err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld)
if err != nil {
glog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("volumeHandler.UnmountVolumeHandler for UnmountVolume failed"), err).Error())
return
@ -435,15 +426,12 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
UID: types.UID(volume.podName),
},
}
volumeHandler, err := operationexecutor.NewVolumeHandlerWithMode(volume.volumeMode, rc.operationExecutor)
if err != nil {
return nil, err
}
mapperPlugin, err := rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName)
if err != nil {
return nil, err
}
volumeSpec, err := volumeHandler.ReconstructVolumeHandler(
volumeSpec, err := rc.operationExecutor.ReconstructVolumeOperation(
volume.volumeMode,
plugin,
mapperPlugin,
pod.UID,
@ -466,7 +454,7 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
uniqueVolumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(volume.podName, plugin, volumeSpec)
}
// Check existence of mount point for filesystem volume or symbolic link for block volume
isExist, checkErr := volumeHandler.CheckVolumeExistence(volume.mountPath, volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin)
isExist, checkErr := rc.operationExecutor.CheckVolumeExistenceOperation(volumeSpec, volume.mountPath, volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin)
if checkErr != nil {
return nil, err
}

View File

@ -835,6 +835,8 @@ func Test_GenerateMapVolumeFunc_Plugin_Not_Found(t *testing.T) {
},
}
// Enable BlockVolume feature gate
utilfeature.DefaultFeatureGate.Set("BlockVolume=true")
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
volumePluginMgr := &volume.VolumePluginMgr{}
@ -854,14 +856,20 @@ func Test_GenerateMapVolumeFunc_Plugin_Not_Found(t *testing.T) {
},
Spec: v1.PodSpec{},
}
volumeToMount := operationexecutor.VolumeToMount{Pod: pod, VolumeSpec: &volume.Spec{}}
err := oex.MapVolume(waitForAttachTimeout, volumeToMount, asw)
volumeMode := v1.PersistentVolumeBlock
tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
volumeToMount := operationexecutor.VolumeToMount{
Pod: pod,
VolumeSpec: tmpSpec}
err := oex.MountVolume(waitForAttachTimeout, volumeToMount, asw, false)
// Assert
if assert.Error(t, err) {
assert.Contains(t, err.Error(), tc.expectedErrMsg)
}
})
}
// Rollback feature gate to false.
utilfeature.DefaultFeatureGate.Set("BlockVolume=false")
}
func Test_GenerateUnmapVolumeFunc_Plugin_Not_Found(t *testing.T) {
@ -882,6 +890,8 @@ func Test_GenerateUnmapVolumeFunc_Plugin_Not_Found(t *testing.T) {
},
}
// Enable BlockVolume feature gate
utilfeature.DefaultFeatureGate.Set("BlockVolume=true")
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
volumePluginMgr := &volume.VolumePluginMgr{}
@ -893,14 +903,20 @@ func Test_GenerateUnmapVolumeFunc_Plugin_Not_Found(t *testing.T) {
nil, /* fakeRecorder */
false, /* checkNodeCapabilitiesBeforeMount */
nil))
volumeToUnmount := operationexecutor.MountedVolume{PluginName: "fake-file-plugin"}
err := oex.UnmapVolume(volumeToUnmount, asw)
volumeMode := v1.PersistentVolumeBlock
tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
volumeToUnmount := operationexecutor.MountedVolume{
PluginName: "fake-file-plugin",
VolumeSpec: tmpSpec}
err := oex.UnmountVolume(volumeToUnmount, asw)
// Assert
if assert.Error(t, err) {
assert.Contains(t, err.Error(), tc.expectedErrMsg)
}
})
}
// Rollback feature gate to false.
utilfeature.DefaultFeatureGate.Set("BlockVolume=false")
}
func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) {
@ -912,15 +928,17 @@ func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) {
"volumePlugin is nil": {
volumePlugins: []volume.VolumePlugin{},
expectErr: true,
expectedErrMsg: "UnmapDevice.FindMapperPluginBySpec failed",
expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed",
},
"blockVolumePlugin is nil": {
volumePlugins: volumetesting.NewFakeFileVolumePlugin(),
expectErr: true,
expectedErrMsg: "UnmapDevice.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
},
}
// Enable BlockVolume feature gate
utilfeature.DefaultFeatureGate.Set("BlockVolume=true")
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
volumePluginMgr := &volume.VolumePluginMgr{}
@ -933,15 +951,18 @@ func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) {
false, /* checkNodeCapabilitiesBeforeMount */
nil))
var mounter mount.Interface
plugins := volumetesting.NewFakeFileVolumePlugin()
deviceToDetach := operationexecutor.AttachedVolume{VolumeSpec: &volume.Spec{}, PluginName: plugins[0].GetPluginName()}
err := oex.UnmapDevice(deviceToDetach, asw, mounter)
volumeMode := v1.PersistentVolumeBlock
tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
deviceToDetach := operationexecutor.AttachedVolume{VolumeSpec: tmpSpec, PluginName: "fake-file-plugin"}
err := oex.UnmountDevice(deviceToDetach, asw, mounter)
// Assert
if assert.Error(t, err) {
assert.Contains(t, err.Error(), tc.expectedErrMsg)
}
})
}
// Rollback feature gate to false.
utilfeature.DefaultFeatureGate.Set("BlockVolume=false")
}
func waitForMount(

View File

@ -28,9 +28,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
@ -83,7 +81,8 @@ type OperationExecutor interface {
// Status.VolumesInUse list (operation fails with error if it is).
DetachVolume(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
// MountVolume mounts the volume to the pod specified in volumeToMount.
// If a volume has 'Filesystem' volumeMode, MountVolume mounts the
// volume to the pod specified in volumeToMount.
// Specifically it will:
// * Wait for the device to finish attaching (for attachable volumes only).
// * Mount device to global mount path (for attachable volumes only).
@ -95,38 +94,36 @@ type OperationExecutor interface {
// The parameter "isRemount" is informational and used to adjust logging
// verbosity. An initial mount is more log-worthy than a remount, for
// example.
MountVolume(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) error
// UnmountVolume unmounts the volume from the pod specified in
// volumeToUnmount and updates the actual state of the world to reflect that.
UnmountVolume(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
// UnmountDevice unmounts the volumes global mount path from the device (for
// attachable volumes only, freeing it for detach. It then updates the
// actual state of the world to reflect that.
UnmountDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error
// MapVolume is used when the volumeMode is 'Block'.
// This method creates a symbolic link to the volume from both the pod
// specified in volumeToMount and global map path.
//
// For 'Block' volumeMode, this method creates a symbolic link to
// the volume from both the pod specified in volumeToMount and global map path.
// Specifically it will:
// * Wait for the device to finish attaching (for attachable volumes only).
// * Update actual state of world to reflect volume is globally mounted/mapped.
// * Map volume to global map path using symbolic link.
// * Map the volume to the pod device map path using symbolic link.
// * Update actual state of world to reflect volume is mounted/mapped to the pod path.
MapVolume(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
MountVolume(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) error
// UnmapVolume unmaps symbolic link to the volume from both the pod device
// map path in volumeToUnmount and global map path.
// If a volume has 'Filesystem' volumeMode, UnmountVolume unmounts the
// volume from the pod specified in volumeToUnmount and updates the actual
// state of the world to reflect that.
//
// For 'Block' volumeMode, this method unmaps symbolic link to the volume
// from both the pod device map path in volumeToUnmount and global map path.
// And then, updates the actual state of the world to reflect that.
UnmapVolume(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
UnmountVolume(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
// UnmapDevice checks number of symbolic links under global map path.
// If number of reference is zero, remove global map path directory and
// free a volume for detach.
// If a volume has 'Filesystem' volumeMode, UnmountDevice unmounts the
// volumes global mount path from the device (for attachable volumes only,
// freeing it for detach. It then updates the actual state of the world to
// reflect that.
//
// For 'Block' volumeMode, this method checks number of symbolic links under
// global map path. If number of reference is zero, remove global map path
// directory and free a volume for detach.
// It then updates the actual state of the world to reflect that.
UnmapDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error
UnmountDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error
// VerifyControllerAttachedVolume checks if the specified volume is present
// in the specified nodes AttachedVolumes Status field. It uses kubeClient
@ -145,6 +142,10 @@ type OperationExecutor interface {
IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool
// Expand Volume will grow size available to PVC
ExpandVolume(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) error
// ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin
ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, mountPath string, pluginName string) (*volume.Spec, error)
// CheckVolumeExistenceOperation checks volume existence
CheckVolumeExistenceOperation(volumeSpec *volume.Spec, mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, attachable volume.AttachableVolumePlugin) (bool, error)
}
// NewOperationExecutor returns a new instance of OperationExecutor.
@ -707,13 +708,30 @@ func (oe *operationExecutor) MountVolume(
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
isRemount bool) error {
generatedOperations, err := oe.operationGenerator.GenerateMountVolumeFunc(
waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)
fsVolume, err := volumehelper.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
if err != nil {
return err
}
var generatedOperations volumetypes.GeneratedOperations
if fsVolume {
// Filesystem volume case
// Mount/remount a volume when a volume is attached
generatedOperations, err = oe.operationGenerator.GenerateMountVolumeFunc(
waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)
} else {
// Block volume case
// Creates a map to device if a volume is attached
generatedOperations, err = oe.operationGenerator.GenerateMapVolumeFunc(
waitForAttachTimeout, volumeToMount, actualStateOfWorld)
}
if err != nil {
return err
}
// Avoid executing mount/map from multiple pods referencing the
// same volume in parallel
podName := nestedpendingoperations.EmptyUniquePodName
// TODO: remove this -- not necessary
if !volumeToMount.PluginIsAttachable {
// Non-attachable volume plugins can execute mount for multiple pods
@ -729,14 +747,26 @@ func (oe *operationExecutor) MountVolume(
func (oe *operationExecutor) UnmountVolume(
volumeToUnmount MountedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
generatedOperations, err :=
oe.operationGenerator.GenerateUnmountVolumeFunc(volumeToUnmount, actualStateOfWorld)
fsVolume, err := volumehelper.CheckVolumeModeFilesystem(volumeToUnmount.VolumeSpec)
if err != nil {
return err
}
// All volume plugins can execute mount for multiple pods referencing the
var generatedOperations volumetypes.GeneratedOperations
if fsVolume {
// Filesystem volume case
// Unmount a volume if a volume is mounted
generatedOperations, err = oe.operationGenerator.GenerateUnmountVolumeFunc(
volumeToUnmount, actualStateOfWorld)
} else {
// Block volume case
// Unmap a volume if a volume is mapped
generatedOperations, err = oe.operationGenerator.GenerateUnmapVolumeFunc(
volumeToUnmount, actualStateOfWorld)
}
if err != nil {
return err
}
// All volume plugins can execute unmount/unmap for multiple pods referencing the
// same volume in parallel
podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
@ -748,14 +778,31 @@ func (oe *operationExecutor) UnmountDevice(
deviceToDetach AttachedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
mounter mount.Interface) error {
generatedOperations, err :=
oe.operationGenerator.GenerateUnmountDeviceFunc(deviceToDetach, actualStateOfWorld, mounter)
fsVolume, err := volumehelper.CheckVolumeModeFilesystem(deviceToDetach.VolumeSpec)
if err != nil {
return err
}
var generatedOperations volumetypes.GeneratedOperations
if fsVolume {
// Filesystem volume case
// Unmount and detach a device if a volume isn't referenced
generatedOperations, err = oe.operationGenerator.GenerateUnmountDeviceFunc(
deviceToDetach, actualStateOfWorld, mounter)
} else {
// Block volume case
// Detach a device and remove loopback if a volume isn't referenced
generatedOperations, err = oe.operationGenerator.GenerateUnmapDeviceFunc(
deviceToDetach, actualStateOfWorld, mounter)
}
if err != nil {
return err
}
// Avoid executing unmount/unmap device from multiple pods referencing
// the same volume in parallel
podName := nestedpendingoperations.EmptyUniquePodName
return oe.pendingOperations.Run(
deviceToDetach.VolumeName, "" /* podName */, generatedOperations)
deviceToDetach.VolumeName, podName, generatedOperations)
}
func (oe *operationExecutor) ExpandVolume(pvcWithResizeRequest *expandcache.PVCWithResizeRequest, resizeMap expandcache.VolumeResizeMap) error {
@ -769,65 +816,6 @@ func (oe *operationExecutor) ExpandVolume(pvcWithResizeRequest *expandcache.PVCW
return oe.pendingOperations.Run(uniqueVolumeKey, "", generatedOperations)
}
func (oe *operationExecutor) MapVolume(
waitForAttachTimeout time.Duration,
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
generatedOperations, err := oe.operationGenerator.GenerateMapVolumeFunc(
waitForAttachTimeout, volumeToMount, actualStateOfWorld)
if err != nil {
return err
}
// Avoid executing map from multiple pods referencing the
// same volume in parallel
podName := nestedpendingoperations.EmptyUniquePodName
// TODO: remove this -- not necessary
if !volumeToMount.PluginIsAttachable {
// Non-attachable volume plugins can execute mount for multiple pods
// referencing the same volume in parallel
podName = volumehelper.GetUniquePodName(volumeToMount.Pod)
}
return oe.pendingOperations.Run(
volumeToMount.VolumeName, podName, generatedOperations)
}
func (oe *operationExecutor) UnmapVolume(
volumeToUnmount MountedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
generatedOperations, err :=
oe.operationGenerator.GenerateUnmapVolumeFunc(volumeToUnmount, actualStateOfWorld)
if err != nil {
return err
}
// All volume plugins can execute unmap for multiple pods referencing the
// same volume in parallel
podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
return oe.pendingOperations.Run(
volumeToUnmount.VolumeName, podName, generatedOperations)
}
func (oe *operationExecutor) UnmapDevice(
deviceToDetach AttachedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
mounter mount.Interface) error {
generatedOperations, err :=
oe.operationGenerator.GenerateUnmapDeviceFunc(deviceToDetach, actualStateOfWorld, mounter)
if err != nil {
return err
}
// Avoid executing unmap device from multiple pods referencing
// the same volume in parallel
podName := nestedpendingoperations.EmptyUniquePodName
return oe.pendingOperations.Run(
deviceToDetach.VolumeName, podName, generatedOperations)
}
func (oe *operationExecutor) VerifyControllerAttachedVolume(
volumeToMount VolumeToMount,
nodeName types.NodeName,
@ -842,177 +830,30 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume(
volumeToMount.VolumeName, "" /* podName */, generatedOperations)
}
// VolumeStateHandler defines a set of operations for handling mount/unmount/detach/reconstruct volume-related operations
type VolumeStateHandler interface {
// Volume is attached, mount/map it
MountVolumeHandler(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool, remountingLogStr string) error
// Volume is mounted/mapped, unmount/unmap it
UnmountVolumeHandler(mountedVolume MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
// Volume is not referenced from pod, unmount/unmap and detach it
UnmountDeviceHandler(attachedVolume AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error
// Reconstruct volume from mount path
ReconstructVolumeHandler(plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, mountPath string, pluginName string) (*volume.Spec, error)
// check mount path if volume still exists
CheckVolumeExistence(mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, attachable volume.AttachableVolumePlugin) (bool, error)
}
// ReconstructVolumeOperation return a func to create volumeSpec from mount path
func (oe *operationExecutor) ReconstructVolumeOperation(
volumeMode v1.PersistentVolumeMode,
plugin volume.VolumePlugin,
mapperPlugin volume.BlockVolumePlugin,
uid types.UID,
podName volumetypes.UniquePodName,
volumeSpecName string,
mountPath string,
pluginName string) (*volume.Spec, error) {
// NewVolumeHandler return a new instance of volumeHandler depens on a volumeMode
func NewVolumeHandler(volumeSpec *volume.Spec, oe OperationExecutor) (VolumeStateHandler, error) {
// TODO: remove feature gate check after no longer needed
var volumeHandler VolumeStateHandler
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
volumeMode, err := volumehelper.GetVolumeMode(volumeSpec)
// Filesystem Volume case
if volumeMode == v1.PersistentVolumeFilesystem {
// Create volumeSpec from mount path
glog.V(12).Infof("Starting operationExecutor.ReconstructVolumepodName")
volumeSpec, err := plugin.ConstructVolumeSpec(volumeSpecName, mountPath)
if err != nil {
return nil, err
}
if volumeMode == v1.PersistentVolumeFilesystem {
volumeHandler = NewFilesystemVolumeHandler(oe)
} else {
volumeHandler = NewBlockVolumeHandler(oe)
}
} else {
volumeHandler = NewFilesystemVolumeHandler(oe)
return volumeSpec, nil
}
return volumeHandler, nil
}
// NewVolumeHandlerWithMode return a new instance of volumeHandler depens on a volumeMode
func NewVolumeHandlerWithMode(volumeMode v1.PersistentVolumeMode, oe OperationExecutor) (VolumeStateHandler, error) {
var volumeHandler VolumeStateHandler
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
if volumeMode == v1.PersistentVolumeFilesystem {
volumeHandler = NewFilesystemVolumeHandler(oe)
} else {
volumeHandler = NewBlockVolumeHandler(oe)
}
} else {
volumeHandler = NewFilesystemVolumeHandler(oe)
}
return volumeHandler, nil
}
// NewFilesystemVolumeHandler returns a new instance of FilesystemVolumeHandler.
func NewFilesystemVolumeHandler(operationExecutor OperationExecutor) FilesystemVolumeHandler {
return FilesystemVolumeHandler{
oe: operationExecutor}
}
// NewBlockVolumeHandler returns a new instance of BlockVolumeHandler.
func NewBlockVolumeHandler(operationExecutor OperationExecutor) BlockVolumeHandler {
return BlockVolumeHandler{
oe: operationExecutor}
}
// FilesystemVolumeHandler is VolumeHandler for Filesystem volume
type FilesystemVolumeHandler struct {
oe OperationExecutor
}
// BlockVolumeHandler is VolumeHandler for Block volume
type BlockVolumeHandler struct {
oe OperationExecutor
}
// MountVolumeHandler mount/remount a volume when a volume is attached
// This method is handler for filesystem volume
func (f FilesystemVolumeHandler) MountVolumeHandler(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool, remountingLogStr string) error {
glog.V(12).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
err := f.oe.MountVolume(
waitForAttachTimeout,
volumeToMount,
actualStateOfWorld,
isRemount)
return err
}
// UnmountVolumeHandler unmount a volume if a volume is mounted
// This method is handler for filesystem volume
func (f FilesystemVolumeHandler) UnmountVolumeHandler(mountedVolume MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
glog.V(12).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
err := f.oe.UnmountVolume(
mountedVolume,
actualStateOfWorld)
return err
}
// UnmountDeviceHandler unmount and detach a device if a volume isn't referenced
// This method is handler for filesystem volume
func (f FilesystemVolumeHandler) UnmountDeviceHandler(attachedVolume AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error {
glog.V(12).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
err := f.oe.UnmountDevice(
attachedVolume,
actualStateOfWorld,
mounter)
return err
}
// ReconstructVolumeHandler create volumeSpec from mount path
// This method is handler for filesystem volume
func (f FilesystemVolumeHandler) ReconstructVolumeHandler(plugin volume.VolumePlugin, _ volume.BlockVolumePlugin, _ types.UID, _ volumetypes.UniquePodName, volumeSpecName string, mountPath string, _ string) (*volume.Spec, error) {
glog.V(4).Infof("Starting operationExecutor.ReconstructVolumepodName volume spec name %s, mount path %s", volumeSpecName, mountPath)
volumeSpec, err := plugin.ConstructVolumeSpec(volumeSpecName, mountPath)
if err != nil {
return nil, err
}
return volumeSpec, nil
}
// CheckVolumeExistence checks mount path directory if volume still exists, return true if volume is there
// Also return true for non-attachable volume case without mount point check
// This method is handler for filesystem volume
func (f FilesystemVolumeHandler) CheckVolumeExistence(mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, attachable volume.AttachableVolumePlugin) (bool, error) {
if attachable != nil {
var isNotMount bool
var mountCheckErr error
if isNotMount, mountCheckErr = mounter.IsLikelyNotMountPoint(mountPath); mountCheckErr != nil {
return false, fmt.Errorf("Could not check whether the volume %q (spec.Name: %q) pod %q (UID: %q) is mounted with: %v",
uniqueVolumeName,
volumeName,
podName,
podUID,
mountCheckErr)
}
return !isNotMount, nil
}
return true, nil
}
// MountVolumeHandler creates a map to device if a volume is attached
// This method is handler for block volume
func (b BlockVolumeHandler) MountVolumeHandler(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, _ bool, _ string) error {
glog.V(12).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MapVolume", ""))
err := b.oe.MapVolume(
waitForAttachTimeout,
volumeToMount,
actualStateOfWorld)
return err
}
// UnmountVolumeHandler unmap a volume if a volume is mapped
// This method is handler for block volume
func (b BlockVolumeHandler) UnmountVolumeHandler(mountedVolume MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
glog.V(12).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmapVolume", ""))
err := b.oe.UnmapVolume(
mountedVolume,
actualStateOfWorld)
return err
}
// UnmountDeviceHandler detach a device and remove loopback if a volume isn't referenced
// This method is handler for block volume
func (b BlockVolumeHandler) UnmountDeviceHandler(attachedVolume AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error {
glog.V(12).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmapDevice", ""))
err := b.oe.UnmapDevice(
attachedVolume,
actualStateOfWorld,
mounter)
return err
}
// ReconstructVolumeHandler create volumeSpec from mount path
// This method is handler for block volume
func (b BlockVolumeHandler) ReconstructVolumeHandler(_ volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, mountPath string, pluginName string) (*volume.Spec, error) {
// Block Volume case
// Create volumeSpec from mount path
glog.V(12).Infof("Starting operationExecutor.ReconstructVolume")
if mapperPlugin == nil {
return nil, fmt.Errorf("Could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)",
@ -1031,12 +872,47 @@ func (b BlockVolumeHandler) ReconstructVolumeHandler(_ volume.VolumePlugin, mapp
return volumeSpec, nil
}
// CheckVolumeExistence checks mount path directory if volume still exists, then return
// true if volume is there. Either plugin is attachable or non-attachable, the plugin
// should have symbolic link associated to raw block device under pod device map
// if volume exists.
// This method is handler for block volume
func (b BlockVolumeHandler) CheckVolumeExistence(mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, _ volume.AttachableVolumePlugin) (bool, error) {
// CheckVolumeExistenceOperation return a func() to check mount path directory if volume still exists
func (oe *operationExecutor) CheckVolumeExistenceOperation(
volumeSpec *volume.Spec,
mountPath, volumeName string,
mounter mount.Interface,
uniqueVolumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName,
podUID types.UID,
attachable volume.AttachableVolumePlugin) (bool, error) {
fsVolume, err := volumehelper.CheckVolumeModeFilesystem(volumeSpec)
if err != nil {
return false, err
}
// Filesystem Volume case
// For attachable volume case, check mount path directory if volume is still existing and mounted.
// Return true if volume is mounted.
if fsVolume {
if attachable != nil {
var isNotMount bool
var mountCheckErr error
if isNotMount, mountCheckErr = mounter.IsLikelyNotMountPoint(mountPath); mountCheckErr != nil {
return false, fmt.Errorf("Could not check whether the volume %q (spec.Name: %q) pod %q (UID: %q) is mounted with: %v",
uniqueVolumeName,
volumeName,
podName,
podUID,
mountCheckErr)
}
return !isNotMount, nil
}
// For non-attachable volume case, skip check and return true without mount point check
// since plugins may not have volume mount point.
return true, nil
}
// Block Volume case
// Check mount path directory if volume still exists, then return true if volume
// is there. Either plugin is attachable or non-attachable, the plugin should
// have symbolic link associated to raw block device under pod device map
// if volume exists.
blkutil := util.NewBlockVolumePathHandler()
var islinkExist bool
var checkErr error

View File

@ -231,12 +231,14 @@ func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing
}
}
func TestOperationExecutor_MapVolume_ConcurrentMapForNonAttachablePlugins(t *testing.T) {
func TestOperationExecutor_MountVolume_ConcurrentMountForNonAttachablePlugins_VolumeMode_Block(t *testing.T) {
// Arrange
ch, quit, oe := setup()
volumesToMount := make([]VolumeToMount, numVolumesToMap)
secretName := "secret-volume"
volumeName := v1.UniqueVolumeName(secretName)
volumeMode := v1.PersistentVolumeBlock
tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
// Act
for i := range volumesToMount {
@ -247,8 +249,9 @@ func TestOperationExecutor_MapVolume_ConcurrentMapForNonAttachablePlugins(t *tes
VolumeName: volumeName,
PluginIsAttachable: false, // this field determines whether the plugin is attachable
ReportedInUse: true,
VolumeSpec: tmpSpec,
}
oe.MapVolume(0 /* waitForAttachTimeOut */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */)
oe.MountVolume(0 /* waitForAttachTimeOut */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false)
}
// Assert
@ -257,12 +260,14 @@ func TestOperationExecutor_MapVolume_ConcurrentMapForNonAttachablePlugins(t *tes
}
}
func TestOperationExecutor_MapVolume_ConcurrentMapForAttachablePlugins(t *testing.T) {
func TestOperationExecutor_MountVolume_ConcurrentMountForAttachablePlugins_VolumeMode_Block(t *testing.T) {
// Arrange
ch, quit, oe := setup()
volumesToMount := make([]VolumeToMount, numVolumesToAttach)
pdName := "pd-volume"
volumeName := v1.UniqueVolumeName(pdName)
volumeMode := v1.PersistentVolumeBlock
tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
// Act
for i := range volumesToMount {
@ -273,8 +278,9 @@ func TestOperationExecutor_MapVolume_ConcurrentMapForAttachablePlugins(t *testin
VolumeName: volumeName,
PluginIsAttachable: true, // this field determines whether the plugin is attachable
ReportedInUse: true,
VolumeSpec: tmpSpec,
}
oe.MapVolume(0 /* waitForAttachTimeout */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */)
oe.MountVolume(0 /* waitForAttachTimeout */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false)
}
// Assert
@ -283,12 +289,14 @@ func TestOperationExecutor_MapVolume_ConcurrentMapForAttachablePlugins(t *testin
}
}
func TestOperationExecutor_UnmapVolume_ConcurrentUnmapForAllPlugins(t *testing.T) {
func TestOperationExecutor_UnmountVolume_ConcurrentUnmountForAllPlugins_VolumeMode_Block(t *testing.T) {
// Arrange
ch, quit, oe := setup()
volumesToUnmount := make([]MountedVolume, numAttachableVolumesToUnmap+numNonAttachableVolumesToUnmap)
pdName := "pd-volume"
secretName := "secret-volume"
volumeMode := v1.PersistentVolumeBlock
tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
// Act
for i := 0; i < numNonAttachableVolumesToUnmap+numAttachableVolumesToUnmap; i++ {
@ -299,6 +307,7 @@ func TestOperationExecutor_UnmapVolume_ConcurrentUnmapForAllPlugins(t *testing.T
PodName: volumetypes.UniquePodName(podName),
VolumeName: v1.UniqueVolumeName(secretName),
PodUID: pod.UID,
VolumeSpec: tmpSpec,
}
} else {
pod := getTestPodWithGCEPD(podName, pdName)
@ -306,9 +315,10 @@ func TestOperationExecutor_UnmapVolume_ConcurrentUnmapForAllPlugins(t *testing.T
PodName: volumetypes.UniquePodName(podName),
VolumeName: v1.UniqueVolumeName(pdName),
PodUID: pod.UID,
VolumeSpec: tmpSpec,
}
}
oe.UnmapVolume(volumesToUnmount[i], nil /* actualStateOfWorldMounterUpdater */)
oe.UnmountVolume(volumesToUnmount[i], nil /* actualStateOfWorldMounterUpdater */)
}
// Assert
@ -317,19 +327,22 @@ func TestOperationExecutor_UnmapVolume_ConcurrentUnmapForAllPlugins(t *testing.T
}
}
func TestOperationExecutor_UnmapDeviceConcurrently(t *testing.T) {
func TestOperationExecutor_UnmountDeviceConcurrently_VolumeMode_Block(t *testing.T) {
// Arrange
ch, quit, oe := setup()
attachedVolumes := make([]AttachedVolume, numDevicesToUnmap)
pdName := "pd-volume"
volumeMode := v1.PersistentVolumeBlock
tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
// Act
for i := range attachedVolumes {
attachedVolumes[i] = AttachedVolume{
VolumeName: v1.UniqueVolumeName(pdName),
NodeName: "node-name",
VolumeSpec: tmpSpec,
}
oe.UnmapDevice(attachedVolumes[i], nil /* actualStateOfWorldMounterUpdater */, nil /* mount.Interface */)
oe.UnmountDevice(attachedVolumes[i], nil /* actualStateOfWorldMounterUpdater */, nil /* mount.Interface */)
}
// Assert

View File

@ -1017,10 +1017,10 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc(
blockVolumePlugin, err :=
og.volumePluginMgr.FindMapperPluginByName(deviceToDetach.PluginName)
if err != nil {
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed", err)
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginByName failed", err)
}
if blockVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
}
blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper(

View File

@ -10,10 +10,12 @@ go_library(
srcs = ["volumehelper.go"],
importpath = "k8s.io/kubernetes/pkg/volume/util/volumehelper",
deps = [
"//pkg/features:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)

View File

@ -23,6 +23,8 @@ import (
"strings"
"k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/types"
@ -157,3 +159,18 @@ func GetPersistentVolumeClaimVolumeMode(claim *v1.PersistentVolumeClaim) (v1.Per
}
return "", fmt.Errorf("cannot get volumeMode from pvc: %v", claim.Name)
}
// CheckVolumeModeFilesystem checks VolumeMode.
// If the mode is Filesystem, return true otherwise return false.
func CheckVolumeModeFilesystem(volumeSpec *volume.Spec) (bool, error) {
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
volumeMode, err := GetVolumeMode(volumeSpec)
if err != nil {
return true, err
}
if volumeMode == v1.PersistentVolumeBlock {
return false, nil
}
}
return true, nil
}