mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-06 07:57:35 +00:00
BlockVolumesSupport: CRI, VolumeManager and OperationExecutor changes
This patch contains following changes. - container runtime changes for adding block devices - volumemanager changes - operationexecutor changes
This commit is contained in:
@@ -29,7 +29,9 @@ import (
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
@@ -105,6 +107,28 @@ type OperationExecutor interface {
|
||||
// actual state of the world to reflect that.
|
||||
UnmountDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error
|
||||
|
||||
// MapVolume is used when the volumeMode is 'Block'.
|
||||
// This method creates a symbolic link to the volume from both the pod
|
||||
// specified in volumeToMount and global map path.
|
||||
// Specifically it will:
|
||||
// * Wait for the device to finish attaching (for attachable volumes only).
|
||||
// * Update actual state of world to reflect volume is globally mounted/mapped.
|
||||
// * Map volume to global map path using symbolic link.
|
||||
// * Map the volume to the pod device map path using symbolic link.
|
||||
// * Update actual state of world to reflect volume is mounted/mapped to the pod path.
|
||||
MapVolume(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
|
||||
|
||||
// UnmapVolume unmaps symbolic link to the volume from both the pod device
|
||||
// map path in volumeToUnmount and global map path.
|
||||
// And then, updates the actual state of the world to reflect that.
|
||||
UnmapVolume(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
|
||||
|
||||
// UnmapDevice checks number of symbolic links under global map path.
|
||||
// If number of reference is zero, remove global map path directory and
|
||||
// free a volume for detach.
|
||||
// It then updates the actual state of the world to reflect that.
|
||||
UnmapDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error
|
||||
|
||||
// VerifyControllerAttachedVolume checks if the specified volume is present
|
||||
// in the specified nodes AttachedVolumes Status field. It uses kubeClient
|
||||
// to fetch the node object.
|
||||
@@ -139,7 +163,7 @@ func NewOperationExecutor(
|
||||
// state of the world cache after successful mount/unmount.
|
||||
type ActualStateOfWorldMounterUpdater interface {
|
||||
// Marks the specified volume as mounted to the specified pod
|
||||
MarkVolumeAsMounted(podName volumetypes.UniquePodName, podUID types.UID, volumeName v1.UniqueVolumeName, mounter volume.Mounter, outerVolumeSpecName string, volumeGidValue string) error
|
||||
MarkVolumeAsMounted(podName volumetypes.UniquePodName, podUID types.UID, volumeName v1.UniqueVolumeName, mounter volume.Mounter, blockVolumeMapper volume.BlockVolumeMapper, outerVolumeSpecName string, volumeGidValue string) error
|
||||
|
||||
// Marks the specified volume as unmounted from the specified pod
|
||||
MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error
|
||||
@@ -495,8 +519,16 @@ type MountedVolume struct {
|
||||
// by kubelet to create container.VolumeMap.
|
||||
Mounter volume.Mounter
|
||||
|
||||
// BlockVolumeMapper is the volume mapper used to map this volume. It is required
|
||||
// by kubelet to create container.VolumeMap.
|
||||
BlockVolumeMapper volume.BlockVolumeMapper
|
||||
|
||||
// VolumeGidValue contains the value of the GID annotation, if present.
|
||||
VolumeGidValue string
|
||||
|
||||
// VolumeSpec is a volume spec containing the specification for the volume
|
||||
// that should be mounted.
|
||||
VolumeSpec *volume.Spec
|
||||
}
|
||||
|
||||
// GenerateMsgDetailed returns detailed msgs for mounted volumes
|
||||
@@ -733,6 +765,68 @@ func (oe *operationExecutor) ExpandVolume(pvcWithResizeRequest *expandcache.PVCW
|
||||
return oe.pendingOperations.Run(uniqueVolumeKey, "", expandFunc, opCompleteFunc)
|
||||
}
|
||||
|
||||
func (oe *operationExecutor) MapVolume(
|
||||
waitForAttachTimeout time.Duration,
|
||||
volumeToMount VolumeToMount,
|
||||
actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
|
||||
mapFunc, plugin, err := oe.operationGenerator.GenerateMapVolumeFunc(
|
||||
waitForAttachTimeout, volumeToMount, actualStateOfWorld)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Avoid executing map from multiple pods referencing the
|
||||
// same volume in parallel
|
||||
podName := nestedpendingoperations.EmptyUniquePodName
|
||||
// TODO: remove this -- not necessary
|
||||
if !volumeToMount.PluginIsAttachable {
|
||||
// Non-attachable volume plugins can execute mount for multiple pods
|
||||
// referencing the same volume in parallel
|
||||
podName = volumehelper.GetUniquePodName(volumeToMount.Pod)
|
||||
}
|
||||
|
||||
opCompleteFunc := util.OperationCompleteHook(plugin, "map_volume")
|
||||
return oe.pendingOperations.Run(
|
||||
volumeToMount.VolumeName, podName, mapFunc, opCompleteFunc)
|
||||
}
|
||||
|
||||
func (oe *operationExecutor) UnmapVolume(
|
||||
volumeToUnmount MountedVolume,
|
||||
actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
|
||||
unmapFunc, plugin, err :=
|
||||
oe.operationGenerator.GenerateUnmapVolumeFunc(volumeToUnmount, actualStateOfWorld)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// All volume plugins can execute unmap for multiple pods referencing the
|
||||
// same volume in parallel
|
||||
podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
|
||||
|
||||
opCompleteFunc := util.OperationCompleteHook(plugin, "unmap_volume")
|
||||
return oe.pendingOperations.Run(
|
||||
volumeToUnmount.VolumeName, podName, unmapFunc, opCompleteFunc)
|
||||
}
|
||||
|
||||
func (oe *operationExecutor) UnmapDevice(
|
||||
deviceToDetach AttachedVolume,
|
||||
actualStateOfWorld ActualStateOfWorldMounterUpdater,
|
||||
mounter mount.Interface) error {
|
||||
unmapDeviceFunc, plugin, err :=
|
||||
oe.operationGenerator.GenerateUnmapDeviceFunc(deviceToDetach, actualStateOfWorld, mounter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Avoid executing unmap device from multiple pods referencing
|
||||
// the same volume in parallel
|
||||
podName := nestedpendingoperations.EmptyUniquePodName
|
||||
|
||||
opCompleteFunc := util.OperationCompleteHook(plugin, "unmap_device")
|
||||
return oe.pendingOperations.Run(
|
||||
deviceToDetach.VolumeName, podName, unmapDeviceFunc, opCompleteFunc)
|
||||
}
|
||||
|
||||
func (oe *operationExecutor) VerifyControllerAttachedVolume(
|
||||
volumeToMount VolumeToMount,
|
||||
nodeName types.NodeName,
|
||||
@@ -748,6 +842,200 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume(
|
||||
volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc, opCompleteFunc)
|
||||
}
|
||||
|
||||
// VolumeStateHandler defines a set of operations for handling mount/unmount/detach/reconstruct volume-related operations
|
||||
type VolumeStateHandler interface {
|
||||
// Volume is attached, mount/map it
|
||||
MountVolumeHandler(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool, remountingLogStr string) error
|
||||
// Volume is mounted/mapped, unmount/unmap it
|
||||
UnmountVolumeHandler(mountedVolume MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
|
||||
// Volume is not referenced from pod, unmount/unmap and detach it
|
||||
UnmountDeviceHandler(attachedVolume AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error
|
||||
// Reconstruct volume from mount path
|
||||
ReconstructVolumeHandler(plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, mountPath string, pluginName string) (*volume.Spec, error)
|
||||
// check mount path if volume still exists
|
||||
CheckVolumeExistence(mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, attachable volume.AttachableVolumePlugin) (bool, error)
|
||||
}
|
||||
|
||||
// NewVolumeHandler return a new instance of volumeHandler depens on a volumeMode
|
||||
func NewVolumeHandler(volumeSpec *volume.Spec, oe OperationExecutor) (VolumeStateHandler, error) {
|
||||
|
||||
// TODO: remove feature gate check after no longer needed
|
||||
var volumeHandler VolumeStateHandler
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
|
||||
volumeMode, err := volumehelper.GetVolumeMode(volumeSpec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if volumeMode == v1.PersistentVolumeFilesystem {
|
||||
volumeHandler = NewFilesystemVolumeHandler(oe)
|
||||
} else {
|
||||
volumeHandler = NewBlockVolumeHandler(oe)
|
||||
}
|
||||
} else {
|
||||
volumeHandler = NewFilesystemVolumeHandler(oe)
|
||||
}
|
||||
return volumeHandler, nil
|
||||
}
|
||||
|
||||
// NewFilesystemVolumeHandler returns a new instance of FilesystemVolumeHandler.
|
||||
func NewFilesystemVolumeHandler(operationExecutor OperationExecutor) FilesystemVolumeHandler {
|
||||
return FilesystemVolumeHandler{
|
||||
oe: operationExecutor}
|
||||
}
|
||||
|
||||
// NewBlockVolumeHandler returns a new instance of BlockVolumeHandler.
|
||||
func NewBlockVolumeHandler(operationExecutor OperationExecutor) BlockVolumeHandler {
|
||||
return BlockVolumeHandler{
|
||||
oe: operationExecutor}
|
||||
}
|
||||
|
||||
// FilesystemVolumeHandler is VolumeHandler for Filesystem volume
|
||||
type FilesystemVolumeHandler struct {
|
||||
oe OperationExecutor
|
||||
}
|
||||
|
||||
// BlockVolumeHandler is VolumeHandler for Block volume
|
||||
type BlockVolumeHandler struct {
|
||||
oe OperationExecutor
|
||||
}
|
||||
|
||||
// MountVolumeHandler mount/remount a volume when a volume is attached
|
||||
// This method is handler for filesystem volume
|
||||
func (f FilesystemVolumeHandler) MountVolumeHandler(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool, remountingLogStr string) error {
|
||||
glog.V(12).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
|
||||
err := f.oe.MountVolume(
|
||||
waitForAttachTimeout,
|
||||
volumeToMount,
|
||||
actualStateOfWorld,
|
||||
isRemount)
|
||||
return err
|
||||
}
|
||||
|
||||
// UnmountVolumeHandler unmount a volume if a volume is mounted
|
||||
// This method is handler for filesystem volume
|
||||
func (f FilesystemVolumeHandler) UnmountVolumeHandler(mountedVolume MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
|
||||
glog.V(12).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
|
||||
err := f.oe.UnmountVolume(
|
||||
mountedVolume,
|
||||
actualStateOfWorld)
|
||||
return err
|
||||
}
|
||||
|
||||
// UnmountDeviceHandler unmount and detach a device if a volume isn't referenced
|
||||
// This method is handler for filesystem volume
|
||||
func (f FilesystemVolumeHandler) UnmountDeviceHandler(attachedVolume AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error {
|
||||
glog.V(12).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
|
||||
err := f.oe.UnmountDevice(
|
||||
attachedVolume,
|
||||
actualStateOfWorld,
|
||||
mounter)
|
||||
return err
|
||||
}
|
||||
|
||||
// ReconstructVolumeHandler create volumeSpec from mount path
|
||||
// This method is handler for filesystem volume
|
||||
func (f FilesystemVolumeHandler) ReconstructVolumeHandler(plugin volume.VolumePlugin, _ volume.BlockVolumePlugin, _ types.UID, _ volumetypes.UniquePodName, volumeSpecName string, mountPath string, _ string) (*volume.Spec, error) {
|
||||
glog.V(12).Infof("Starting operationExecutor.ReconstructVolumepodName")
|
||||
volumeSpec, err := plugin.ConstructVolumeSpec(volumeSpecName, mountPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return volumeSpec, nil
|
||||
}
|
||||
|
||||
// CheckVolumeExistence checks mount path directory if volume still exists, return true if volume is there
|
||||
// Also return true for non-attachable volume case without mount point check
|
||||
// This method is handler for filesystem volume
|
||||
func (f FilesystemVolumeHandler) CheckVolumeExistence(mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, attachable volume.AttachableVolumePlugin) (bool, error) {
|
||||
if attachable != nil {
|
||||
var isNotMount bool
|
||||
var mountCheckErr error
|
||||
if isNotMount, mountCheckErr = mounter.IsLikelyNotMountPoint(mountPath); mountCheckErr != nil {
|
||||
return false, fmt.Errorf("Could not check whether the volume %q (spec.Name: %q) pod %q (UID: %q) is mounted with: %v",
|
||||
uniqueVolumeName,
|
||||
volumeName,
|
||||
podName,
|
||||
podUID,
|
||||
mountCheckErr)
|
||||
}
|
||||
return !isNotMount, nil
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// MountVolumeHandler creates a map to device if a volume is attached
|
||||
// This method is handler for block volume
|
||||
func (b BlockVolumeHandler) MountVolumeHandler(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, _ bool, _ string) error {
|
||||
glog.V(12).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MapVolume", ""))
|
||||
err := b.oe.MapVolume(
|
||||
waitForAttachTimeout,
|
||||
volumeToMount,
|
||||
actualStateOfWorld)
|
||||
return err
|
||||
}
|
||||
|
||||
// UnmountVolumeHandler unmap a volume if a volume is mapped
|
||||
// This method is handler for block volume
|
||||
func (b BlockVolumeHandler) UnmountVolumeHandler(mountedVolume MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
|
||||
glog.V(12).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmapVolume", ""))
|
||||
err := b.oe.UnmapVolume(
|
||||
mountedVolume,
|
||||
actualStateOfWorld)
|
||||
return err
|
||||
}
|
||||
|
||||
// UnmountDeviceHandler detach a device and remove loopback if a volume isn't referenced
|
||||
// This method is handler for block volume
|
||||
func (b BlockVolumeHandler) UnmountDeviceHandler(attachedVolume AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error {
|
||||
glog.V(12).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmapDevice", ""))
|
||||
err := b.oe.UnmapDevice(
|
||||
attachedVolume,
|
||||
actualStateOfWorld,
|
||||
mounter)
|
||||
return err
|
||||
}
|
||||
|
||||
// ReconstructVolumeHandler create volumeSpec from mount path
|
||||
// This method is handler for block volume
|
||||
func (b BlockVolumeHandler) ReconstructVolumeHandler(_ volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, mountPath string, pluginName string) (*volume.Spec, error) {
|
||||
glog.V(12).Infof("Starting operationExecutor.ReconstructVolume")
|
||||
if mapperPlugin == nil {
|
||||
return nil, fmt.Errorf("Could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)",
|
||||
pluginName,
|
||||
volumeSpecName,
|
||||
podName,
|
||||
uid)
|
||||
}
|
||||
// mountPath contains volumeName on the path. In the case of block volume, {volumeName} is symbolic link
|
||||
// corresponding to raw block device.
|
||||
// ex. mountPath: pods/{podUid}}/{DefaultKubeletVolumeDevicesDirName}/{escapeQualifiedPluginName}/{volumeName}
|
||||
volumeSpec, err := mapperPlugin.ConstructBlockVolumeSpec(uid, volumeSpecName, mountPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return volumeSpec, nil
|
||||
}
|
||||
|
||||
// CheckVolumeExistence checks mount path directory if volume still exists, then return
|
||||
// true if volume is there. Either plugin is attachable or non-attachable, the plugin
|
||||
// should have symbolic link associated to raw block device under pod device map
|
||||
// if volume exists.
|
||||
// This method is handler for block volume
|
||||
func (b BlockVolumeHandler) CheckVolumeExistence(mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, _ volume.AttachableVolumePlugin) (bool, error) {
|
||||
blkutil := util.NewBlockVolumePathHandler()
|
||||
var islinkExist bool
|
||||
var checkErr error
|
||||
if islinkExist, checkErr = blkutil.IsSymlinkExist(mountPath); checkErr != nil {
|
||||
return false, fmt.Errorf("Could not check whether the block volume %q (spec.Name: %q) pod %q (UID: %q) is mapped to: %v",
|
||||
uniqueVolumeName,
|
||||
volumeName,
|
||||
podName,
|
||||
podUID,
|
||||
checkErr)
|
||||
}
|
||||
return islinkExist, nil
|
||||
}
|
||||
|
||||
// TODO: this is a workaround for the unmount device issue caused by gci mounter.
|
||||
// In GCI cluster, if gci mounter is used for mounting, the container started by mounter
|
||||
// script will cause additional mounts created in the container. Since these mounts are
|
||||
|
||||
Reference in New Issue
Block a user