Add code to handle Setup With Status tracking

This commit is contained in:
Hemant Kumar 2019-09-25 16:00:11 -04:00
parent dc9e64c31e
commit 321e99367a
31 changed files with 263 additions and 54 deletions

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/legacy-cloud-providers/aws"
utilstrings "k8s.io/utils/strings"
)
@ -369,6 +370,12 @@ func (b *awsElasticBlockStoreMounter) SetUp(mounterArgs volume.MounterArgs) erro
return b.SetUpAt(b.GetPath(), mounterArgs)
}
// SetupWithStatusTracking attaches the disk and bind mounts to the volume path.
func (b *awsElasticBlockStoreMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
// SetUpAt attaches the disk and bind mounts to the volume path.
func (b *awsElasticBlockStoreMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
// TODO: handle failed mounts here.

View File

@ -29,6 +29,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
type azureDiskMounter struct {
@ -69,6 +70,11 @@ func (m *azureDiskMounter) SetUp(mounterArgs volume.MounterArgs) error {
return m.SetUpAt(m.GetPath(), mounterArgs)
}
func (m *azureDiskMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := m.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
func (m *azureDiskMounter) GetPath() string {
return getPath(m.dataDisk.podUID, m.dataDisk.volumeName, m.plugin.host)
}

View File

@ -36,6 +36,7 @@ import (
volumehelpers "k8s.io/cloud-provider/volume/helpers"
"k8s.io/kubernetes/pkg/volume"
volutil "k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/legacy-cloud-providers/azure"
)
@ -239,6 +240,12 @@ func (b *azureFileMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
// SetUp attaches the disk and bind mounts to the volume path.
func (b *azureFileMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
func (b *azureFileMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
klog.V(4).Infof("AzureFile mount set up: %s %v %v", dir, !notMnt, err)

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
// ProbeVolumePlugins is the primary entrypoint for volume plugins.
@ -223,6 +224,12 @@ func (cephfsVolume *cephfsMounter) SetUp(mounterArgs volume.MounterArgs) error {
return cephfsVolume.SetUpAt(cephfsVolume.GetPath(), mounterArgs)
}
// SetUp attaches the disk and bind mounts to the volume path.
func (cephfsVolume *cephfsMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := cephfsVolume.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
// SetUpAt attaches the disk and bind mounts to the volume path.
func (cephfsVolume *cephfsMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
notMnt, err := cephfsVolume.mounter.IsLikelyNotMountPoint(dir)

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/legacy-cloud-providers/openstack"
)
@ -393,6 +394,11 @@ func (b *cinderVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
func (b *cinderVolumeMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
// SetUp bind mounts to the volume path.
func (b *cinderVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
klog.V(5).Infof("Cinder SetUp %s to %s", b.pdName, dir)

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
// ProbeVolumePlugins is the entry point for plugin detection in a package.
@ -184,6 +185,11 @@ func (b *configMapVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
func (b *configMapVolumeMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
func (b *configMapVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
klog.V(3).Infof("Setting up volume %v for pod %v at %v", b.volName, b.pod.UID, dir)

View File

@ -220,37 +220,44 @@ func (c *csiAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
return deviceMountPath, nil
}
func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) (err error) {
func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
_, err := c.MountDeviceWithStatusTracking(spec, devicePath, deviceMountPath)
return err
}
func (c *csiAttacher) MountDeviceWithStatusTracking(spec *volume.Spec, devicePath string, deviceMountPath string) (volumetypes.OperationStatus, error) {
klog.V(4).Infof(log("attacher.MountDevice(%s, %s)", devicePath, deviceMountPath))
// lets default to operation as finished state
opExitStatus := volumetypes.OperationFinished
if deviceMountPath == "" {
return errors.New(log("attacher.MountDevice failed, deviceMountPath is empty"))
return opExitStatus, errors.New(log("attacher.MountDevice failed, deviceMountPath is empty"))
}
mounted, err := isDirMounted(c.plugin, deviceMountPath)
if err != nil {
klog.Error(log("attacher.MountDevice failed while checking mount status for dir [%s]", deviceMountPath))
return err
return opExitStatus, err
}
if mounted {
klog.V(4).Info(log("attacher.MountDevice skipping mount, dir already mounted [%s]", deviceMountPath))
return nil
return opExitStatus, nil
}
// Setup
if spec == nil {
return errors.New(log("attacher.MountDevice failed, spec is nil"))
return opExitStatus, errors.New(log("attacher.MountDevice failed, spec is nil"))
}
csiSource, err := getPVSourceFromSpec(spec)
if err != nil {
return errors.New(log("attacher.MountDevice failed to get CSIPersistentVolumeSource: %v", err))
return opExitStatus, errors.New(log("attacher.MountDevice failed to get CSIPersistentVolumeSource: %v", err))
}
// Store volume metadata for UnmountDevice. Keep it around even if the
// driver does not support NodeStage, UnmountDevice still needs it.
if err = os.MkdirAll(deviceMountPath, 0750); err != nil {
return errors.New(log("attacher.MountDevice failed to create dir %#v: %v", deviceMountPath, err))
return opExitStatus, errors.New(log("attacher.MountDevice failed to create dir %#v: %v", deviceMountPath, err))
}
klog.V(4).Info(log("created target path successfully [%s]", deviceMountPath))
dataDir := filepath.Dir(deviceMountPath)
@ -263,11 +270,12 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
if cleanErr := os.RemoveAll(dataDir); cleanErr != nil {
klog.Error(log("failed to remove dir after error [%s]: %v", dataDir, cleanErr))
}
return err
return opExitStatus, err
}
defer func() {
// Only for non-timedout errors remove the mount directory
if err != nil && !volumetypes.IsOperationTimeOutError(err) {
// Only if there was an error and volume operation was considered
// finished, we should remove the directory.
if err != nil && opExitStatus == volumetypes.OperationFinished {
// clean up metadata
klog.Errorf(log("attacher.MountDevice failed: %v", err))
if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
@ -279,7 +287,7 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
if c.csiClient == nil {
c.csiClient, err = newCsiDriverClient(csiDriverName(csiSource.Driver))
if err != nil {
return errors.New(log("attacher.MountDevice failed to create newCsiDriverClient: %v", err))
return opExitStatus, errors.New(log("attacher.MountDevice failed to create newCsiDriverClient: %v", err))
}
}
csi := c.csiClient
@ -289,12 +297,12 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
// Check whether "STAGE_UNSTAGE_VOLUME" is set
stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
if err != nil {
return err
return opExitStatus, err
}
if !stageUnstageSet {
klog.Infof(log("attacher.MountDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice..."))
// defer does *not* remove the metadata file and it's correct - UnmountDevice needs it there.
return nil
return opExitStatus, nil
}
// Start MountDevice
@ -307,7 +315,9 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
if err != nil {
err = fmt.Errorf("fetching NodeStageSecretRef %s/%s failed: %v",
csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err)
return err
// if we failed to fetch secret then that could be a transient error
opExitStatus = volumetypes.OperationStateNoChange
return opExitStatus, err
}
}
@ -334,16 +344,14 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
mountOptions)
if err != nil {
return err
if volumetypes.IsOperationTimeOutError(err) {
opExitStatus = volumetypes.OperationInProgress
}
return opExitStatus, err
}
klog.V(4).Infof(log("attacher.MountDevice successfully requested NodeStageVolume [%s]", deviceMountPath))
return nil
}
func (c *csiAttacher) MountDeviceWithStatusTracking(spec *volume.Spec, devicePath string, deviceMountPath string) (volumetypes.OperationStatus, error) {
err := c.MountDevice(spec, devicePath, deviceMountPath)
return volumetypes.OperationFinished, err
return opExitStatus, err
}
var _ volume.Detacher = &csiAttacher{}

View File

@ -103,29 +103,41 @@ func (c *csiMountMgr) SetUp(mounterArgs volume.MounterArgs) error {
return c.SetUpAt(c.GetPath(), mounterArgs)
}
func (c *csiMountMgr) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := c.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
_, err := c.setupUtil(dir, mounterArgs)
return err
}
func (c *csiMountMgr) setupUtil(dir string, mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
klog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir))
// default to finished operation status
opExitStatus := volumetypes.OperationFinished
mounted, err := isDirMounted(c.plugin, dir)
if err != nil {
return errors.New(log("mounter.SetUpAt failed while checking mount status for dir [%s]: %v", dir, err))
return opExitStatus, errors.New(log("mounter.SetUpAt failed while checking mount status for dir [%s]: %v", dir, err))
}
if mounted {
klog.V(4).Info(log("mounter.SetUpAt skipping mount, dir already mounted [%s]", dir))
return nil
return opExitStatus, nil
}
csi, err := c.csiClientGetter.Get()
if err != nil {
return errors.New(log("mounter.SetUpAt failed to get CSI client: %v", err))
return opExitStatus, errors.New(log("mounter.SetUpAt failed to get CSI client: %v", err))
}
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
volSrc, pvSrc, err := getSourceFromSpec(c.spec)
if err != nil {
return errors.New(log("mounter.SetupAt failed to get CSI persistent source: %v", err))
return opExitStatus, errors.New(log("mounter.SetupAt failed to get CSI persistent source: %v", err))
}
driverName := c.driverName
@ -146,10 +158,10 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
switch {
case volSrc != nil:
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
return fmt.Errorf("CSIInlineVolume feature required")
return opExitStatus, fmt.Errorf("CSIInlineVolume feature required")
}
if c.volumeLifecycleMode != storage.VolumeLifecycleEphemeral {
return fmt.Errorf("unexpected volume mode: %s", c.volumeLifecycleMode)
return opExitStatus, fmt.Errorf("unexpected volume mode: %s", c.volumeLifecycleMode)
}
if volSrc.FSType != nil {
fsType = *volSrc.FSType
@ -164,7 +176,7 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
}
case pvSrc != nil:
if c.volumeLifecycleMode != storage.VolumeLifecyclePersistent {
return fmt.Errorf("unexpected driver mode: %s", c.volumeLifecycleMode)
return opExitStatus, fmt.Errorf("unexpected driver mode: %s", c.volumeLifecycleMode)
}
fsType = pvSrc.FSType
@ -185,13 +197,13 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
// Check for STAGE_UNSTAGE_VOLUME set and populate deviceMountPath if so
stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
if err != nil {
return errors.New(log("mounter.SetUpAt failed to check for STAGE_UNSTAGE_VOLUME capability: %v", err))
return opExitStatus, errors.New(log("mounter.SetUpAt failed to check for STAGE_UNSTAGE_VOLUME capability: %v", err))
}
if stageUnstageSet {
deviceMountPath, err = makeDeviceMountPath(c.plugin, c.spec)
if err != nil {
return errors.New(log("mounter.SetUpAt failed to make device mount path: %v", err))
return opExitStatus, errors.New(log("mounter.SetUpAt failed to make device mount path: %v", err))
}
}
@ -200,18 +212,20 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
nodeName := string(c.plugin.host.GetNodeName())
c.publishContext, err = c.plugin.getPublishContext(c.k8s, volumeHandle, string(driverName), nodeName)
if err != nil {
return err
// we could have a transient error associated with fetching publish context
opExitStatus = volumetypes.OperationStateNoChange
return opExitStatus, err
}
publishContext = c.publishContext
}
default:
return fmt.Errorf("volume source not found in volume.Spec")
return opExitStatus, fmt.Errorf("volume source not found in volume.Spec")
}
// create target_dir before call to NodePublish
if err := os.MkdirAll(dir, 0750); err != nil {
return errors.New(log("mounter.SetUpAt failed to create dir %#v: %v", dir, err))
return opExitStatus, errors.New(log("mounter.SetUpAt failed to create dir %#v: %v", dir, err))
}
klog.V(4).Info(log("created target path successfully [%s]", dir))
@ -219,7 +233,8 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
if secretRef != nil {
nodePublishSecrets, err = getCredentialsFromSecret(c.k8s, secretRef)
if err != nil {
return fmt.Errorf("fetching NodePublishSecretRef %s/%s failed: %v",
opExitStatus = volumetypes.OperationStateNoChange
return opExitStatus, fmt.Errorf("fetching NodePublishSecretRef %s/%s failed: %v",
secretRef.Namespace, secretRef.Name, err)
}
@ -228,7 +243,8 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
// Inject pod information into volume_attributes
podAttrs, err := c.podAttributes()
if err != nil {
return errors.New(log("mounter.SetUpAt failed to assemble volume attributes: %v", err))
opExitStatus = volumetypes.OperationStateNoChange
return opExitStatus, errors.New(log("mounter.SetUpAt failed to assemble volume attributes: %v", err))
}
if podAttrs != nil {
if volAttribs == nil {
@ -255,13 +271,16 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
)
if err != nil {
// If error is not of type time out then we can remove the mount directory
if volumetypes.IsOperationTimeOutError(err) {
opExitStatus = volumetypes.OperationInProgress
}
// If operation finished with error then we can remove the mount directory.
if opExitStatus == volumetypes.OperationFinished {
if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil {
klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr))
}
}
return errors.New(log("mounter.SetupAt failed: %v", err))
return opExitStatus, errors.New(log("mounter.SetupAt failed: %v", err))
}
c.supportsSELinux, err = c.kubeVolHost.GetHostUtil().GetSELinuxSupport(dir)
@ -276,22 +295,16 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
err = c.applyFSGroup(fsType, mounterArgs.FsGroup)
if err != nil {
// If we are here that means volume was mounted correctly and it must at least be unmounted
// before it can be used by someone else.
opExitStatus = volumetypes.OperationInProgress
// attempt to rollback mount.
fsGrpErr := fmt.Errorf("applyFSGroup failed for vol %s: %v", c.volumeID, err)
if unpubErr := csi.NodeUnpublishVolume(ctx, c.volumeID, dir); unpubErr != nil {
klog.Error(log("NodeUnpublishVolume failed for [%s]: %v", c.volumeID, unpubErr))
return fsGrpErr
}
if unmountErr := removeMountDir(c.plugin, dir); unmountErr != nil {
klog.Error(log("removeMountDir failed for [%s]: %v", dir, unmountErr))
return fsGrpErr
}
return fsGrpErr
return opExitStatus, fsGrpErr
}
klog.V(4).Infof(log("mounter.SetUp successfully requested NodePublish [%s]", dir))
return nil
return opExitStatus, nil
}
func (c *csiMountMgr) podAttributes() (map[string]string, error) {

View File

@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/fieldpath"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
utilstrings "k8s.io/utils/strings"
)
@ -174,6 +175,11 @@ func (b *downwardAPIVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
func (b *downwardAPIVolumeMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
func (b *downwardAPIVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
klog.V(3).Infof("Setting up a downwardAPI volume %v for pod %v/%v at %v", b.volName, b.pod.Namespace, b.pod.Name, dir)
// Wrap EmptyDir. Here we rely on the idempotency of the wrapped plugin to avoid repeatedly mounting

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/fsquota"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
// TODO: in the near future, this will be changed to be more restrictive
@ -196,6 +197,11 @@ func (ed *emptyDir) SetUp(mounterArgs volume.MounterArgs) error {
return ed.SetUpAt(ed.GetPath(), mounterArgs)
}
func (ed *emptyDir) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := ed.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
// SetUpAt creates new directory.
func (ed *emptyDir) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
notMnt, err := ed.mounter.IsLikelyNotMountPoint(dir)

View File

@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
)
@ -373,6 +374,11 @@ func (b *fcDiskMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
func (b *fcDiskMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
func (b *fcDiskMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
// diskSetUp checks mountpoints and prevent repeated calls
err := diskSetUp(b.manager, *b, dir, b.mounter, mounterArgs.FsGroup)

View File

@ -21,6 +21,7 @@ import (
"strconv"
"k8s.io/kubernetes/pkg/volume"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/utils/exec"
)
@ -43,6 +44,12 @@ func (f *flexVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
return f.SetUpAt(f.GetPath(), mounterArgs)
}
// SetUp creates new directory.
func (f *flexVolumeMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := f.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
// SetUpAt creates new directory.
func (f *flexVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
// Mount only once.

View File

@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/util/env"
"k8s.io/kubernetes/pkg/volume"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
// ProbeVolumePlugins is the primary entrypoint for volume plugins.
@ -235,6 +236,11 @@ func (b *flockerVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
func (b *flockerVolumeMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
// newFlockerClient uses environment variables and pod attributes to return a
// flocker client capable of talking with the Flocker control service.
func (p *flockerPlugin) newFlockerClient(hostIP string) (*flockerapi.Client, error) {

View File

@ -40,6 +40,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
gcecloud "k8s.io/legacy-cloud-providers/gce"
)
@ -372,6 +373,12 @@ func (b *gcePersistentDiskMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
// SetUp bind mounts the disk global mount to the volume path.
func (b *gcePersistentDiskMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
// SetUp bind mounts the disk global mount to the give volume path.
func (b *gcePersistentDiskMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
// TODO: handle failed mounts here.

View File

@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/utils/exec"
utilstrings "k8s.io/utils/strings"
)
@ -179,6 +180,12 @@ func (b *gitRepoVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
// SetUp creates new directory and clones a git repo.
func (b *gitRepoVolumeMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
// SetUpAt creates new directory and clones a git repo.
func (b *gitRepoVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
if volumeutil.IsReady(b.getMetaDir()) {

View File

@ -47,6 +47,7 @@ import (
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/volume"
volutil "k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
// ProbeVolumePlugins is the primary entrypoint for volume plugins.
@ -273,6 +274,12 @@ func (b *glusterfsMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
// SetUp attaches the disk and bind mounts to the volume path.
func (b *glusterfsMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
func (b *glusterfsMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
klog.V(4).Infof("mount setup: %s %v %v", dir, !notMnt, err)

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/hostutil"
"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/validation"
)
@ -238,6 +239,12 @@ func (b *hostPathMounter) SetUp(mounterArgs volume.MounterArgs) error {
return checkType(b.GetPath(), b.pathType, b.hu)
}
// SetUpWithStatusTracking calls setup and returns additional information about operation state
func (b *hostPathMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
// SetUpAt does not make sense for host paths - probably programmer error.
func (b *hostPathMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
return fmt.Errorf("SetUpAt() does not make sense for host paths")

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
ioutil "k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
)
@ -342,6 +343,11 @@ func (b *iscsiDiskMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
func (b *iscsiDiskMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
func (b *iscsiDiskMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
// diskSetUp checks mountpoints and prevent repeated calls
err := diskSetUp(b.manager, *b, dir, b.mounter, mounterArgs.FsGroup)

View File

@ -479,6 +479,12 @@ func (m *localVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
return m.SetUpAt(m.GetPath(), mounterArgs)
}
// SetUp bind mounts the directory to the volume path
func (m *localVolumeMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := m.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
// SetUpAt bind mounts the directory to the volume path and sets up volume ownership
func (m *localVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
m.plugin.volumeLocks.LockKey(m.globalPath)

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
@ -241,6 +242,11 @@ func (nfsMounter *nfsMounter) SetUp(mounterArgs volume.MounterArgs) error {
return nfsMounter.SetUpAt(nfsMounter.GetPath(), mounterArgs)
}
func (nfsMounter *nfsMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := nfsMounter.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
func (nfsMounter *nfsMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
notMnt, err := mount.IsNotMountPoint(nfsMounter.mounter, dir)
klog.V(4).Infof("NFS mount set up: %s %v %v", dir, !notMnt, err)

View File

@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
const (
@ -299,6 +300,11 @@ func (b *portworxVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
func (b *portworxVolumeMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
// SetUpAt attaches the disk and bind mounts to the volume path.
func (b *portworxVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/volume/downwardapi"
"k8s.io/kubernetes/pkg/volume/secret"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
utilstrings "k8s.io/utils/strings"
)
@ -188,6 +189,11 @@ func (s *projectedVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
return s.SetUpAt(s.GetPath(), mounterArgs)
}
func (s *projectedVolumeMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := s.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
func (s *projectedVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
klog.V(3).Infof("Setting up volume %v for pod %v at %v", s.volName, s.pod.UID, dir)

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
// ProbeVolumePlugins is the primary entrypoint for volume plugins.
@ -239,6 +240,12 @@ func (mounter *quobyteMounter) SetUp(mounterArgs volume.MounterArgs) error {
return mounter.SetUpAt(pluginDir, mounterArgs)
}
// SetUpWithStatusTracking attaches the disk and bind mounts to the volume path.
func (mounter *quobyteMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := mounter.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
func (mounter *quobyteMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
// Check if Quobyte is already mounted on the host in the Plugin Dir
// if so we can use this mountpoint instead of creating a new one

View File

@ -40,6 +40,7 @@ import (
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
volutil "k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
)
@ -840,6 +841,11 @@ func (b *rbdMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
func (b *rbdMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
func (b *rbdMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
// diskSetUp checks mountpoints and prevent repeated calls
klog.V(4).Infof("rbd: attempting to setup at %s", dir)

View File

@ -35,6 +35,7 @@ import (
volumehelpers "k8s.io/cloud-provider/volume/helpers"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
type sioVolume struct {
@ -82,6 +83,11 @@ func (v *sioVolume) SetUp(mounterArgs volume.MounterArgs) error {
return v.SetUpAt(v.GetPath(), mounterArgs)
}
func (v *sioVolume) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := v.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
// SetUp bind mounts the disk global mount to the volume path.
func (v *sioVolume) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
v.plugin.volumeMtx.LockKey(v.volSpecName)

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
// ProbeVolumePlugins is the entry point for plugin detection in a package.
@ -179,6 +180,11 @@ func (b *secretVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
func (b *secretVolumeMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
func (b *secretVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
klog.V(3).Infof("Setting up volume %v for pod %v at %v", b.volName, b.pod.UID, dir)

View File

@ -36,6 +36,7 @@ import (
volumehelpers "k8s.io/cloud-provider/volume/helpers"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
// ProbeVolumePlugins is the primary entrypoint for volume plugins.
@ -376,6 +377,11 @@ func (b *storageosMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
func (b *storageosMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
// SetUp bind mounts the disk global mount to the give volume path.
func (b *storageosMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)

View File

@ -847,6 +847,11 @@ func (fv *FakeVolume) SetUp(mounterArgs MounterArgs) error {
return fv.SetUpAt(fv.getPath(), mounterArgs)
}
func (fv *FakeVolume) SetUpWithStatusTracking(mounterArgs MounterArgs) (volumetypes.OperationStatus, error) {
err := fv.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
func (fv *FakeVolume) GetSetUpCallCount() int {
fv.RLock()
defer fv.RUnlock()

View File

@ -580,10 +580,16 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
devicePath,
deviceMountPath)
if err != nil {
if operationState == volumetypes.OperationInProgress {
switch operationState {
case volumetypes.OperationInProgress:
markDeviceUncertainError := actualStateOfWorld.MarkDeviceAsUncertain(volumeToMount.VolumeName, devicePath, deviceMountPath)
if markDeviceUncertainError != nil {
klog.Infof("MountVolume.MarkDeviceAsUncertain failed with %v", markDeviceUncertainError)
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUncertain failed", markDeviceUncertainError).Error())
}
case volumetypes.OperationFinished:
markDeviceUnmountError := actualStateOfWorld.MarkDeviceAsUnmounted(volumeToMount.VolumeName)
if markDeviceUnmountError != nil {
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUnmounted failed", markDeviceUnmountError).Error())
}
}
// On failure, return error. Caller will log and retry.
@ -623,7 +629,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
}
// Execute mount
mountErr := volumeMounter.SetUp(volume.MounterArgs{
opExitStatus, mountErr := volumeMounter.SetUpWithStatusTracking(volume.MounterArgs{
FsGroup: fsGroup,
DesiredSize: volumeToMount.DesiredSizeLimit,
})
@ -639,11 +645,18 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
VolumeMountState: VolumeMounted,
}
if mountErr != nil {
if volumetypes.IsOperationTimeOutError(mountErr) {
switch opExitStatus {
case volumetypes.OperationInProgress:
markOpts.VolumeMountState = VolumeMountUncertain
t := actualStateOfWorld.MarkVolumeMountAsUncertain(markOpts)
if t != nil {
klog.Errorf("MountVolume.MarkVolumeMountAsUncertain failed: %v", t)
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", t).Error())
}
case volumetypes.OperationFinished:
markOpts.VolumeMountState = VolumeNotMounted
t := actualStateOfWorld.MarkVolumeAsUnmounted(volumeToMount.PodName, volumeToMount.VolumeName)
if t != nil {
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeAsUnmounted failed", t).Error())
}
}
// On failure, return error. Caller will log and retry.

View File

@ -130,6 +130,11 @@ type Mounter interface {
// accessed by the pod. This may be called more than once, so
// implementations must be idempotent.
SetUp(mounterArgs MounterArgs) error
// SetupWithStatusTracking is similar to SetUp function except it
// also return operation status as a return value
SetUpWithStatusTracking(mounterArgs MounterArgs) (volumetypes.OperationStatus, error)
// SetUpAt prepares and mounts/unpacks the volume to the
// specified directory path, which may or may not exist yet.
// The mount point and its content should be owned by

View File

@ -38,7 +38,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
)
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
// This is the primary entrypoint for volume plugins.
func ProbeVolumePlugins() []volume.VolumePlugin {
@ -217,6 +217,12 @@ func (b *vsphereVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
// SetUp attaches the disk and bind mounts to the volume path.
func (b *vsphereVolumeMounter) SetUpWithStatusTracking(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
err := b.SetUp(mounterArgs)
return volumetypes.OperationFinished, err
}
// Checks prior to mount operations to verify that the required components (binaries, etc.)
// to mount the volume are available on the underlying node.
// If not, it returns an error