Merge pull request #82492 from gnufied/fix-uncertain-mounts

Fix uncertain mounts
This commit is contained in:
Kubernetes Prow Robot 2019-12-17 14:49:57 -08:00 committed by GitHub
commit 40df9f82d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1393 additions and 251 deletions

View File

@ -40,6 +40,7 @@ go_test(
"//pkg/volume:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -24,7 +24,7 @@ import (
"fmt"
"sync"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog"
@ -59,7 +59,7 @@ type ActualStateOfWorld interface {
// volume, reset the pod's remountRequired value.
// If a volume with the name volumeName does not exist in the list of
// attached volumes, an error is returned.
AddPodToVolume(podName volumetypes.UniquePodName, podUID types.UID, volumeName v1.UniqueVolumeName, mounter volume.Mounter, blockVolumeMapper volume.BlockVolumeMapper, outerVolumeSpecName string, volumeGidValue string, volumeSpec *volume.Spec) error
AddPodToVolume(operationexecutor.MarkVolumeOpts) error
// MarkRemountRequired marks each volume that is successfully attached and
// mounted for the specified pod as requiring remount (if the plugin for the
@ -68,13 +68,13 @@ type ActualStateOfWorld interface {
// pod update.
MarkRemountRequired(podName volumetypes.UniquePodName)
// SetVolumeGloballyMounted sets the GloballyMounted value for the given
// volume. When set to true this value indicates that the volume is mounted
// to the underlying device at a global mount point. This global mount point
// must unmounted prior to detach.
// SetDeviceMountState sets device mount state for the given volume. When deviceMountState is set to DeviceGloballyMounted
// then device is mounted at a global mount point. When it is set to DeviceMountUncertain then also it means volume
// MAY be globally mounted at a global mount point. In both cases - the volume must be unmounted from
// global mount point prior to detach.
// If a volume with the name volumeName does not exist in the list of
// attached volumes, an error is returned.
SetVolumeGloballyMounted(volumeName v1.UniqueVolumeName, globallyMounted bool, devicePath, deviceMountPath string) error
SetDeviceMountState(volumeName v1.UniqueVolumeName, deviceMountState operationexecutor.DeviceMountState, devicePath, deviceMountPath string) error
// DeletePodFromVolume removes the given pod from the given volume in the
// cache indicating the volume has been successfully unmounted from the pod.
@ -127,6 +127,10 @@ type ActualStateOfWorld interface {
// actual state of the world.
GetMountedVolumes() []MountedVolume
// GetAllMountedVolumes returns list of all possibly mounted volumes including
// those that are in VolumeMounted state and VolumeMountUncertain state.
GetAllMountedVolumes() []MountedVolume
// GetMountedVolumesForPod generates and returns a list of volumes that are
// successfully attached and mounted for the specified pod based on the
// current actual state of the world.
@ -165,10 +169,15 @@ type MountedVolume struct {
type AttachedVolume struct {
operationexecutor.AttachedVolume
// GloballyMounted indicates that the volume is mounted to the underlying
// device at a global mount point. This global mount point must unmounted
// prior to detach.
GloballyMounted bool
// DeviceMountState indicates if device has been globally mounted or is not.
DeviceMountState operationexecutor.DeviceMountState
}
// DeviceMayBeMounted returns true if device is mounted in global path or is in
// uncertain state.
func (av AttachedVolume) DeviceMayBeMounted() bool {
return av.DeviceMountState == operationexecutor.DeviceGloballyMounted ||
av.DeviceMountState == operationexecutor.DeviceMountUncertain
}
// NewActualStateOfWorld returns a new instance of ActualStateOfWorld.
@ -245,10 +254,9 @@ type attachedVolume struct {
// this volume implements the volume.Attacher interface
pluginIsAttachable bool
// globallyMounted indicates that the volume is mounted to the underlying
// device at a global mount point. This global mount point must be unmounted
// prior to detach.
globallyMounted bool
// deviceMountState stores information that tells us if device is mounted
// globally or not
deviceMountState operationexecutor.DeviceMountState
// devicePath contains the path on the node where the volume is attached for
// attachable volumes
@ -301,6 +309,11 @@ type mountedPod struct {
// fsResizeRequired indicates the underlying volume has been successfully
// mounted to this pod but its size has been expanded after that.
fsResizeRequired bool
// volumeMountStateForPod stores state of volume mount for the pod. if it is:
// - VolumeMounted: means volume for pod has been successfully mounted
// - VolumeMountUncertain: means volume for pod may not be mounted, but it must be unmounted
volumeMountStateForPod operationexecutor.VolumeMountState
}
func (asw *actualStateOfWorld) MarkVolumeAsAttached(
@ -318,24 +331,8 @@ func (asw *actualStateOfWorld) MarkVolumeAsDetached(
asw.DeleteVolume(volumeName)
}
func (asw *actualStateOfWorld) MarkVolumeAsMounted(
podName volumetypes.UniquePodName,
podUID types.UID,
volumeName v1.UniqueVolumeName,
mounter volume.Mounter,
blockVolumeMapper volume.BlockVolumeMapper,
outerVolumeSpecName string,
volumeGidValue string,
volumeSpec *volume.Spec) error {
return asw.AddPodToVolume(
podName,
podUID,
volumeName,
mounter,
blockVolumeMapper,
outerVolumeSpecName,
volumeGidValue,
volumeSpec)
func (asw *actualStateOfWorld) MarkVolumeAsMounted(markVolumeOpts operationexecutor.MarkVolumeOpts) error {
return asw.AddPodToVolume(markVolumeOpts)
}
func (asw *actualStateOfWorld) AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) {
@ -354,12 +351,50 @@ func (asw *actualStateOfWorld) MarkVolumeAsUnmounted(
func (asw *actualStateOfWorld) MarkDeviceAsMounted(
volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error {
return asw.SetVolumeGloballyMounted(volumeName, true /* globallyMounted */, devicePath, deviceMountPath)
return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceGloballyMounted, devicePath, deviceMountPath)
}
func (asw *actualStateOfWorld) MarkDeviceAsUncertain(
volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error {
return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceMountUncertain, devicePath, deviceMountPath)
}
func (asw *actualStateOfWorld) MarkVolumeMountAsUncertain(markVolumeOpts operationexecutor.MarkVolumeOpts) error {
markVolumeOpts.VolumeMountState = operationexecutor.VolumeMountUncertain
return asw.AddPodToVolume(markVolumeOpts)
}
func (asw *actualStateOfWorld) MarkDeviceAsUnmounted(
volumeName v1.UniqueVolumeName) error {
return asw.SetVolumeGloballyMounted(volumeName, false /* globallyMounted */, "", "")
return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceNotMounted, "", "")
}
func (asw *actualStateOfWorld) GetDeviceMountState(volumeName v1.UniqueVolumeName) operationexecutor.DeviceMountState {
asw.RLock()
defer asw.RUnlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return operationexecutor.DeviceNotMounted
}
return volumeObj.deviceMountState
}
func (asw *actualStateOfWorld) GetVolumeMountState(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) operationexecutor.VolumeMountState {
asw.RLock()
defer asw.RUnlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return operationexecutor.VolumeNotMounted
}
podObj, podExists := volumeObj.mountedPods[podName]
if !podExists {
return operationexecutor.VolumeNotMounted
}
return podObj.volumeMountStateForPod
}
// addVolume adds the given volume to the cache indicating the specified
@ -405,7 +440,7 @@ func (asw *actualStateOfWorld) addVolume(
mountedPods: make(map[volumetypes.UniquePodName]mountedPod),
pluginName: volumePlugin.GetPluginName(),
pluginIsAttachable: pluginIsAttachable,
globallyMounted: false,
deviceMountState: operationexecutor.DeviceNotMounted,
devicePath: devicePath,
}
} else {
@ -420,15 +455,15 @@ func (asw *actualStateOfWorld) addVolume(
return nil
}
func (asw *actualStateOfWorld) AddPodToVolume(
podName volumetypes.UniquePodName,
podUID types.UID,
volumeName v1.UniqueVolumeName,
mounter volume.Mounter,
blockVolumeMapper volume.BlockVolumeMapper,
outerVolumeSpecName string,
volumeGidValue string,
volumeSpec *volume.Spec) error {
func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.MarkVolumeOpts) error {
podName := markVolumeOpts.PodName
podUID := markVolumeOpts.PodUID
volumeName := markVolumeOpts.VolumeName
mounter := markVolumeOpts.Mounter
blockVolumeMapper := markVolumeOpts.BlockVolumeMapper
outerVolumeSpecName := markVolumeOpts.OuterVolumeSpecName
volumeGidValue := markVolumeOpts.VolumeGidVolume
volumeSpec := markVolumeOpts.VolumeSpec
asw.Lock()
defer asw.Unlock()
@ -442,20 +477,21 @@ func (asw *actualStateOfWorld) AddPodToVolume(
podObj, podExists := volumeObj.mountedPods[podName]
if !podExists {
podObj = mountedPod{
podName: podName,
podUID: podUID,
mounter: mounter,
blockVolumeMapper: blockVolumeMapper,
outerVolumeSpecName: outerVolumeSpecName,
volumeGidValue: volumeGidValue,
volumeSpec: volumeSpec,
podName: podName,
podUID: podUID,
mounter: mounter,
blockVolumeMapper: blockVolumeMapper,
outerVolumeSpecName: outerVolumeSpecName,
volumeGidValue: volumeGidValue,
volumeSpec: volumeSpec,
volumeMountStateForPod: markVolumeOpts.VolumeMountState,
}
}
// If pod exists, reset remountRequired value
podObj.remountRequired = false
podObj.volumeMountStateForPod = markVolumeOpts.VolumeMountState
asw.attachedVolumes[volumeName].mountedPods[podName] = podObj
return nil
}
@ -554,8 +590,8 @@ func (asw *actualStateOfWorld) MarkFSResizeRequired(
}
}
func (asw *actualStateOfWorld) SetVolumeGloballyMounted(
volumeName v1.UniqueVolumeName, globallyMounted bool, devicePath, deviceMountPath string) error {
func (asw *actualStateOfWorld) SetDeviceMountState(
volumeName v1.UniqueVolumeName, deviceMountState operationexecutor.DeviceMountState, devicePath, deviceMountPath string) error {
asw.Lock()
defer asw.Unlock()
@ -566,7 +602,7 @@ func (asw *actualStateOfWorld) SetVolumeGloballyMounted(
volumeName)
}
volumeObj.globallyMounted = globallyMounted
volumeObj.deviceMountState = deviceMountState
volumeObj.deviceMountPath = deviceMountPath
if devicePath != "" {
volumeObj.devicePath = devicePath
@ -628,6 +664,10 @@ func (asw *actualStateOfWorld) PodExistsInVolume(
podObj, podExists := volumeObj.mountedPods[podName]
if podExists {
// if volume mount was uncertain we should keep trying to mount the volume
if podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain {
return false, volumeObj.devicePath, nil
}
if podObj.remountRequired {
return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName)
}
@ -668,9 +708,30 @@ func (asw *actualStateOfWorld) GetMountedVolumes() []MountedVolume {
mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
for _, volumeObj := range asw.attachedVolumes {
for _, podObj := range volumeObj.mountedPods {
mountedVolume = append(
mountedVolume,
getMountedVolume(&podObj, &volumeObj))
if podObj.volumeMountStateForPod == operationexecutor.VolumeMounted {
mountedVolume = append(
mountedVolume,
getMountedVolume(&podObj, &volumeObj))
}
}
}
return mountedVolume
}
// GetAllMountedVolumes returns all volumes which could be locally mounted for a pod.
func (asw *actualStateOfWorld) GetAllMountedVolumes() []MountedVolume {
asw.RLock()
defer asw.RUnlock()
mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
for _, volumeObj := range asw.attachedVolumes {
for _, podObj := range volumeObj.mountedPods {
if podObj.volumeMountStateForPod == operationexecutor.VolumeMounted ||
podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain {
mountedVolume = append(
mountedVolume,
getMountedVolume(&podObj, &volumeObj))
}
}
}
@ -683,10 +744,12 @@ func (asw *actualStateOfWorld) GetMountedVolumesForPod(
defer asw.RUnlock()
mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
for _, volumeObj := range asw.attachedVolumes {
if podObj, podExists := volumeObj.mountedPods[podName]; podExists {
mountedVolume = append(
mountedVolume,
getMountedVolume(&podObj, &volumeObj))
for mountedPodName, podObj := range volumeObj.mountedPods {
if mountedPodName == podName && podObj.volumeMountStateForPod == operationexecutor.VolumeMounted {
mountedVolume = append(
mountedVolume,
getMountedVolume(&podObj, &volumeObj))
}
}
}
@ -699,7 +762,7 @@ func (asw *actualStateOfWorld) GetGloballyMountedVolumes() []AttachedVolume {
globallyMountedVolumes := make(
[]AttachedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
for _, volumeObj := range asw.attachedVolumes {
if volumeObj.globallyMounted {
if volumeObj.deviceMountState == operationexecutor.DeviceGloballyMounted {
globallyMountedVolumes = append(
globallyMountedVolumes,
asw.newAttachedVolume(&volumeObj))
@ -749,7 +812,7 @@ func (asw *actualStateOfWorld) newAttachedVolume(
DevicePath: attachedVolume.devicePath,
DeviceMountPath: attachedVolume.deviceMountPath,
PluginName: attachedVolume.pluginName},
GloballyMounted: attachedVolume.globallyMounted,
DeviceMountState: attachedVolume.deviceMountState,
}
}

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/volume"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
@ -220,9 +221,16 @@ func Test_AddPodToVolume_Positive_ExistingVolumeNewNode(t *testing.T) {
}
// Act
err = asw.AddPodToVolume(
podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec)
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: podName,
PodUID: pod.UID,
VolumeName: generatedVolumeName,
Mounter: mounter,
BlockVolumeMapper: mapper,
OuterVolumeSpecName: volumeSpec.Name(),
VolumeSpec: volumeSpec,
}
err = asw.AddPodToVolume(markVolumeOpts)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
@ -287,16 +295,22 @@ func Test_AddPodToVolume_Positive_ExistingVolumeExistingNode(t *testing.T) {
t.Fatalf("NewBlockVolumeMapper failed. Expected: <no error> Actual: <%v>", err)
}
err = asw.AddPodToVolume(
podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec)
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: podName,
PodUID: pod.UID,
VolumeName: generatedVolumeName,
Mounter: mounter,
BlockVolumeMapper: mapper,
OuterVolumeSpecName: volumeSpec.Name(),
VolumeSpec: volumeSpec,
}
err = asw.AddPodToVolume(markVolumeOpts)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
// Act
err = asw.AddPodToVolume(
podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec)
err = asw.AddPodToVolume(markVolumeOpts)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
@ -388,8 +402,16 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) {
t.Fatalf("NewBlockVolumeMapper failed. Expected: <no error> Actual: <%v>", err)
}
err = asw.AddPodToVolume(
podName1, pod1.UID, generatedVolumeName1, mounter1, mapper1, volumeSpec1.Name(), "" /* volumeGidValue */, volumeSpec1)
markVolumeOpts1 := operationexecutor.MarkVolumeOpts{
PodName: podName1,
PodUID: pod1.UID,
VolumeName: generatedVolumeName1,
Mounter: mounter1,
BlockVolumeMapper: mapper1,
OuterVolumeSpecName: volumeSpec1.Name(),
VolumeSpec: volumeSpec1,
}
err = asw.AddPodToVolume(markVolumeOpts1)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
@ -406,8 +428,16 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) {
t.Fatalf("NewBlockVolumeMapper failed. Expected: <no error> Actual: <%v>", err)
}
err = asw.AddPodToVolume(
podName2, pod2.UID, generatedVolumeName1, mounter2, mapper2, volumeSpec2.Name(), "" /* volumeGidValue */, volumeSpec2)
markVolumeOpts2 := operationexecutor.MarkVolumeOpts{
PodName: podName2,
PodUID: pod2.UID,
VolumeName: generatedVolumeName1,
Mounter: mounter2,
BlockVolumeMapper: mapper2,
OuterVolumeSpecName: volumeSpec2.Name(),
VolumeSpec: volumeSpec2,
}
err = asw.AddPodToVolume(markVolumeOpts2)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
@ -421,7 +451,6 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) {
verifyVolumeExistsWithSpecNameInVolumeAsw(t, podName2, volumeSpec2.Name(), asw)
verifyVolumeSpecNameInVolumeAsw(t, podName1, []*volume.Spec{volumeSpec1}, asw)
verifyVolumeSpecNameInVolumeAsw(t, podName2, []*volume.Spec{volumeSpec2}, asw)
}
// Calls AddPodToVolume() to add pod to empty data struct
@ -484,9 +513,16 @@ func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) {
}
// Act
err = asw.AddPodToVolume(
podName, pod.UID, volumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec)
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: podName,
PodUID: pod.UID,
VolumeName: volumeName,
Mounter: mounter,
BlockVolumeMapper: mapper,
OuterVolumeSpecName: volumeSpec.Name(),
VolumeSpec: volumeSpec,
}
err = asw.AddPodToVolume(markVolumeOpts)
// Assert
if err == nil {
t.Fatalf("AddPodToVolume did not fail. Expected: <\"no volume with the name ... exists in the list of attached volumes\"> Actual: <no error>")
@ -556,6 +592,76 @@ func Test_MarkDeviceAsMounted_Positive_NewVolume(t *testing.T) {
verifyVolumeExistsInGloballyMountedVolumes(t, generatedVolumeName, asw)
}
func TestUncertainVolumeMounts(t *testing.T) {
// Arrange
volumePluginMgr, plugin := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr)
devicePath := "fake/device/path"
pod1 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "volume-name-1",
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: "fake-device1",
},
},
},
},
},
}
volumeSpec1 := &volume.Spec{Volume: &pod1.Spec.Volumes[0]}
generatedVolumeName1, err := util.GetUniqueVolumeNameFromSpec(
plugin, volumeSpec1)
require.NoError(t, err)
err = asw.MarkVolumeAsAttached(generatedVolumeName1, volumeSpec1, "" /* nodeName */, devicePath)
if err != nil {
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
podName1 := util.GetUniquePodName(pod1)
mounter1, err := plugin.NewMounter(volumeSpec1, pod1, volume.VolumeOptions{})
if err != nil {
t.Fatalf("NewMounter failed. Expected: <no error> Actual: <%v>", err)
}
markVolumeOpts1 := operationexecutor.MarkVolumeOpts{
PodName: podName1,
PodUID: pod1.UID,
VolumeName: generatedVolumeName1,
Mounter: mounter1,
OuterVolumeSpecName: volumeSpec1.Name(),
VolumeSpec: volumeSpec1,
VolumeMountState: operationexecutor.VolumeMountUncertain,
}
err = asw.AddPodToVolume(markVolumeOpts1)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
mountedVolumes := asw.GetMountedVolumesForPod(podName1)
volumeFound := false
for _, volume := range mountedVolumes {
if volume.InnerVolumeSpecName == volumeSpec1.Name() {
volumeFound = true
}
}
if volumeFound {
t.Fatalf("expected volume %s to be not found in asw", volumeSpec1.Name())
}
volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1)
if volExists {
t.Fatalf("expected volume %s to not exist in asw", generatedVolumeName1)
}
}
func verifyVolumeExistsInGloballyMountedVolumes(
t *testing.T, expectedVolumeName v1.UniqueVolumeName, asw ActualStateOfWorld) {
globallyMountedVolumes := asw.GetGloballyMountedVolumes()

View File

@ -37,6 +37,7 @@ go_test(
"//pkg/volume:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@ -27,6 +27,7 @@ import (
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
func TestMetricCollection(t *testing.T) {
@ -77,8 +78,17 @@ func TestMetricCollection(t *testing.T) {
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
err = asw.AddPodToVolume(
podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "", volumeSpec)
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: podName,
PodUID: pod.UID,
VolumeName: generatedVolumeName,
Mounter: mounter,
BlockVolumeMapper: mapper,
OuterVolumeSpecName: volumeSpec.Name(),
VolumeSpec: volumeSpec,
VolumeMountState: operationexecutor.VolumeMounted,
}
err = asw.AddPodToVolume(markVolumeOpts)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}

View File

@ -63,6 +63,7 @@ go_test(
"//pkg/volume/csimigration:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",

View File

@ -43,6 +43,7 @@ import (
"k8s.io/kubernetes/pkg/volume/csimigration"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/types"
)
@ -854,8 +855,16 @@ func reconcileASW(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, t
if err != nil {
t.Fatalf("Unexpected error when MarkVolumeAsAttached: %v", err)
}
err = asw.MarkVolumeAsMounted(volumeToMount.PodName, volumeToMount.Pod.UID,
volumeToMount.VolumeName, nil, nil, volumeToMount.OuterVolumeSpecName, volumeToMount.VolumeGidValue, volumeToMount.VolumeSpec)
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volumeToMount.PodName,
PodUID: volumeToMount.Pod.UID,
VolumeName: volumeToMount.VolumeName,
OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
VolumeGidVolume: volumeToMount.VolumeGidValue,
VolumeSpec: volumeToMount.VolumeSpec,
VolumeMountState: operationexecutor.VolumeMounted,
}
err = asw.MarkVolumeAsMounted(markVolumeOpts)
if err != nil {
t.Fatalf("Unexpected error when MarkVolumeAsMounted: %v", err)
}

View File

@ -164,9 +164,21 @@ func (rc *reconciler) reconcile() {
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
rc.unmountVolumes()
// Next we mount required volumes. This function could also trigger
// attach if kubelet is responsible for attaching volumes.
// If underlying PVC was resized while in-use then this function also handles volume
// resizing.
rc.mountAttachVolumes()
// Ensure devices that should be detached/unmounted are detached/unmounted.
rc.unmountDetachDevices()
}
func (rc *reconciler) unmountVolumes() {
// Ensure volumes that should be unmounted are unmounted.
for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
// Volume is mounted, unmount it
klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
@ -184,7 +196,9 @@ func (rc *reconciler) reconcile() {
}
}
}
}
func (rc *reconciler) mountAttachVolumes() {
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
@ -274,13 +288,14 @@ func (rc *reconciler) reconcile() {
}
}
}
}
// Ensure devices that should be detached/unmounted are detached/unmounted.
func (rc *reconciler) unmountDetachDevices() {
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {
if attachedVolume.GloballyMounted {
if attachedVolume.DeviceMayBeMounted() {
// Volume is globally mounted to device, unmount it
klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
err := rc.operationExecutor.UnmountDevice(
@ -625,15 +640,18 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re
klog.Errorf("Could not add volume information to actual state of world: %v", err)
continue
}
err = rc.actualStateOfWorld.MarkVolumeAsMounted(
volume.podName,
types.UID(volume.podName),
volume.volumeName,
volume.mounter,
volume.blockVolumeMapper,
volume.outerVolumeSpecName,
volume.volumeGidValue,
volume.volumeSpec)
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(volume.podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: operationexecutor.VolumeMounted,
}
err = rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
if err != nil {
klog.Errorf("Could not add pod to volume information to actual state of world: %v", err)
continue

View File

@ -51,9 +51,11 @@ const (
reconcilerLoopSleepDuration time.Duration = 1 * time.Nanosecond
// waitForAttachTimeout is the maximum amount of time a
// operationexecutor.Mount call will wait for a volume to be attached.
waitForAttachTimeout time.Duration = 1 * time.Second
nodeName k8stypes.NodeName = k8stypes.NodeName("mynodename")
kubeletPodsDir string = "fake-dir"
waitForAttachTimeout time.Duration = 1 * time.Second
nodeName k8stypes.NodeName = k8stypes.NodeName("mynodename")
kubeletPodsDir string = "fake-dir"
testOperationBackOffDuration time.Duration = 100 * time.Millisecond
reconcilerSyncWaitDuration time.Duration = 10 * time.Second
)
func hasAddedPods() bool { return true }
@ -336,7 +338,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
// Act
dsw.DeletePodFromVolume(podName, generatedVolumeName)
waitForDetach(t, fakePlugin, generatedVolumeName, asw)
waitForDetach(t, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyTearDownCallCount(
@ -428,7 +430,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
// Act
dsw.DeletePodFromVolume(podName, generatedVolumeName)
waitForDetach(t, fakePlugin, generatedVolumeName, asw)
waitForDetach(t, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyTearDownCallCount(
@ -739,7 +741,7 @@ func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) {
// Act
dsw.DeletePodFromVolume(podName, generatedVolumeName)
waitForDetach(t, fakePlugin, generatedVolumeName, asw)
waitForDetach(t, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(
@ -855,7 +857,7 @@ func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) {
// Act
dsw.DeletePodFromVolume(podName, generatedVolumeName)
waitForDetach(t, fakePlugin, generatedVolumeName, asw)
waitForDetach(t, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(
@ -1134,7 +1136,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
// resize operation and clear the fsResizeRequired flag for volume.
go reconciler.Run(wait.NeverStop)
waitErr := retryWithExponentialBackOff(500*time.Millisecond, func() (done bool, err error) {
waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) {
mounted, _, err := asw.PodExistsInVolume(podName, volumeName)
return mounted && err == nil, nil
})
@ -1145,13 +1147,385 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
}
}
func Test_UncertainDeviceGlobalMounts(t *testing.T) {
fsMode := v1.PersistentVolumeFilesystem
var tests = []struct {
name string
deviceState operationexecutor.DeviceMountState
unmountDeviceCallCount int
volumeName string
supportRemount bool
}{
{
name: "timed out operations should result in device marked as uncertain",
deviceState: operationexecutor.DeviceMountUncertain,
unmountDeviceCallCount: 1,
volumeName: volumetesting.TimeoutOnMountDeviceVolumeName,
},
{
name: "failed operation should result in not-mounted device",
deviceState: operationexecutor.DeviceNotMounted,
unmountDeviceCallCount: 0,
volumeName: volumetesting.FailMountDeviceVolumeName,
},
{
name: "timeout followed by failed operation should result in non-mounted device",
deviceState: operationexecutor.DeviceNotMounted,
unmountDeviceCallCount: 0,
volumeName: volumetesting.TimeoutAndFailOnMountDeviceVolumeName,
},
{
name: "success followed by timeout operation should result in mounted device",
deviceState: operationexecutor.DeviceGloballyMounted,
unmountDeviceCallCount: 1,
volumeName: volumetesting.SuccessAndTimeoutDeviceName,
supportRemount: true,
},
{
name: "success followed by failed operation should result in mounted device",
deviceState: operationexecutor.DeviceGloballyMounted,
unmountDeviceCallCount: 1,
volumeName: volumetesting.SuccessAndFailOnMountDeviceName,
supportRemount: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: tc.volumeName,
UID: "pvuid",
},
Spec: v1.PersistentVolumeSpec{
ClaimRef: &v1.ObjectReference{Name: "pvc"},
VolumeMode: &fsMode,
},
}
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc",
UID: "pvcuid",
},
Spec: v1.PersistentVolumeClaimSpec{
VolumeName: tc.volumeName,
},
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "volume-name",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc.Name,
},
},
},
},
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
fakePlugin.SupportsRemount = tc.supportRemount
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
DevicePath: "fake/path",
})
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
kubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
reconciler := NewReconciler(
kubeClient,
true, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
waitForAttachTimeout,
nodeName,
dsw,
asw,
hasAddedPods,
oex,
&mount.FakeMounter{},
hostutil.NewFakeHostUtil(nil),
volumePluginMgr,
kubeletPodsDir)
volumeSpec := &volume.Spec{PersistentVolume: pv}
podName := util.GetUniquePodName(pod)
volumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
// Start the reconciler to fill ASW.
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
go func() {
reconciler.Run(stopChan)
close(stoppedChan)
}()
waitForVolumeToExistInASW(t, volumeName, asw)
if tc.volumeName == volumetesting.TimeoutAndFailOnMountDeviceVolumeName {
// Wait upto 10s for reconciler to catchup
time.Sleep(reconcilerSyncWaitDuration)
}
if tc.volumeName == volumetesting.SuccessAndFailOnMountDeviceName ||
tc.volumeName == volumetesting.SuccessAndTimeoutDeviceName {
// wait for mount and then break it via remount
waitForMount(t, fakePlugin, volumeName, asw)
asw.MarkRemountRequired(podName)
time.Sleep(reconcilerSyncWaitDuration)
}
if tc.deviceState == operationexecutor.DeviceMountUncertain {
waitForUncertainGlobalMount(t, volumeName, asw)
}
if tc.deviceState == operationexecutor.DeviceGloballyMounted {
waitForMount(t, fakePlugin, volumeName, asw)
}
dsw.DeletePodFromVolume(podName, volumeName)
waitForDetach(t, volumeName, asw)
err = volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
if err != nil {
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
}
})
}
}
func Test_UncertainVolumeMountState(t *testing.T) {
fsMode := v1.PersistentVolumeFilesystem
var tests = []struct {
name string
volumeState operationexecutor.VolumeMountState
unmountDeviceCallCount int
unmountVolumeCount int
volumeName string
supportRemount bool
}{
{
name: "timed out operations should result in volume marked as uncertain",
volumeState: operationexecutor.VolumeMountUncertain,
unmountDeviceCallCount: 1,
unmountVolumeCount: 1,
volumeName: volumetesting.TimeoutOnSetupVolumeName,
},
{
name: "failed operation should result in not-mounted volume",
volumeState: operationexecutor.VolumeNotMounted,
unmountDeviceCallCount: 0,
unmountVolumeCount: 0,
volumeName: volumetesting.FailOnSetupVolumeName,
},
{
name: "timeout followed by failed operation should result in non-mounted volume",
volumeState: operationexecutor.VolumeNotMounted,
unmountDeviceCallCount: 0,
unmountVolumeCount: 0,
volumeName: volumetesting.TimeoutAndFailOnSetupVolumeName,
},
{
name: "success followed by timeout operation should result in mounted volume",
volumeState: operationexecutor.VolumeMounted,
unmountDeviceCallCount: 1,
unmountVolumeCount: 1,
volumeName: volumetesting.SuccessAndTimeoutSetupVolumeName,
supportRemount: true,
},
{
name: "success followed by failed operation should result in mounted volume",
volumeState: operationexecutor.VolumeMounted,
unmountDeviceCallCount: 1,
unmountVolumeCount: 1,
volumeName: volumetesting.SuccessAndFailOnSetupVolumeName,
supportRemount: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: tc.volumeName,
UID: "pvuid",
},
Spec: v1.PersistentVolumeSpec{
ClaimRef: &v1.ObjectReference{Name: "pvc"},
VolumeMode: &fsMode,
},
}
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc",
UID: "pvcuid",
},
Spec: v1.PersistentVolumeClaimSpec{
VolumeName: tc.volumeName,
},
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "volume-name",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc.Name,
},
},
},
},
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
fakePlugin.SupportsRemount = tc.supportRemount
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
DevicePath: "fake/path",
})
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
kubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
reconciler := NewReconciler(
kubeClient,
true, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
waitForAttachTimeout,
nodeName,
dsw,
asw,
hasAddedPods,
oex,
&mount.FakeMounter{},
hostutil.NewFakeHostUtil(nil),
volumePluginMgr,
kubeletPodsDir)
volumeSpec := &volume.Spec{PersistentVolume: pv}
podName := util.GetUniquePodName(pod)
volumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
// Start the reconciler to fill ASW.
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
go func() {
reconciler.Run(stopChan)
close(stoppedChan)
}()
waitForVolumeToExistInASW(t, volumeName, asw)
if tc.volumeName == volumetesting.TimeoutAndFailOnSetupVolumeName {
// Wait upto 10s for reconciler to catchup
time.Sleep(reconcilerSyncWaitDuration)
}
if tc.volumeName == volumetesting.SuccessAndFailOnSetupVolumeName ||
tc.volumeName == volumetesting.SuccessAndTimeoutSetupVolumeName {
// wait for mount and then break it via remount
waitForMount(t, fakePlugin, volumeName, asw)
asw.MarkRemountRequired(podName)
time.Sleep(reconcilerSyncWaitDuration)
}
if tc.volumeState == operationexecutor.VolumeMountUncertain {
waitForUncertainPodMount(t, volumeName, asw)
}
if tc.volumeState == operationexecutor.VolumeMounted {
waitForMount(t, fakePlugin, volumeName, asw)
}
dsw.DeletePodFromVolume(podName, volumeName)
waitForDetach(t, volumeName, asw)
volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
volumetesting.VerifyTearDownCallCount(tc.unmountVolumeCount, fakePlugin)
})
}
}
func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
// check if volume is globally mounted in uncertain state
err := retryWithExponentialBackOff(
testOperationBackOffDuration,
func() (bool, error) {
unmountedVolumes := asw.GetUnmountedVolumes()
for _, v := range unmountedVolumes {
if v.VolumeName == volumeName && v.DeviceMountState == operationexecutor.DeviceMountUncertain {
return true, nil
}
}
return false, nil
},
)
if err != nil {
t.Fatalf("expected volumes %s to be mounted in uncertain state globally", volumeName)
}
}
func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
// check if volume is locally pod mounted in uncertain state
err := retryWithExponentialBackOff(
testOperationBackOffDuration,
func() (bool, error) {
allMountedVolumes := asw.GetAllMountedVolumes()
for _, v := range allMountedVolumes {
if v.VolumeName == volumeName {
return true, nil
}
}
return false, nil
},
)
if err != nil {
t.Fatalf("expected volumes %s to be mounted in uncertain state for pod", volumeName)
}
}
func waitForMount(
t *testing.T,
fakePlugin *volumetesting.FakeVolumePlugin,
volumeName v1.UniqueVolumeName,
asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(500*time.Millisecond),
testOperationBackOffDuration,
func() (bool, error) {
mountedVolumes := asw.GetMountedVolumes()
for _, mountedVolume := range mountedVolumes {
@ -1169,13 +1543,27 @@ func waitForMount(
}
}
func waitForVolumeToExistInASW(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
testOperationBackOffDuration,
func() (bool, error) {
if asw.VolumeExists(volumeName) {
return true, nil
}
return false, nil
},
)
if err != nil {
t.Fatalf("Timed out waiting for volume %q to be exist in asw.", volumeName)
}
}
func waitForDetach(
t *testing.T,
fakePlugin *volumetesting.FakeVolumePlugin,
volumeName v1.UniqueVolumeName,
asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(500*time.Millisecond),
testOperationBackOffDuration,
func() (bool, error) {
if asw.VolumeExists(volumeName) {
return false, nil

View File

@ -430,6 +430,7 @@ func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, e
// getUnmountedVolumes fetches the current list of mounted volumes from
// the actual state of the world, and uses it to process the list of
// expectedVolumes. It returns a list of unmounted volumes.
// The list also includes volume that may be mounted in uncertain state.
func (vm *volumeManager) getUnmountedVolumes(podName types.UniquePodName, expectedVolumes []string) []string {
mountedVolumes := sets.NewString()
for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {

View File

@ -20,6 +20,7 @@ go_library(
"//pkg/volume:go_default_library",
"//pkg/volume/csi/nodeinfomanager:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
@ -37,6 +38,8 @@ go_library(
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/google.golang.org/grpc/codes:go_default_library",
"//vendor/google.golang.org/grpc/status:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/strings:go_default_library",
],
@ -63,6 +66,7 @@ go_test(
"//pkg/volume/csi/fake:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",

View File

@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/volume"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
const (
@ -219,7 +220,7 @@ func (c *csiAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
return deviceMountPath, nil
}
func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) (err error) {
func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
klog.V(4).Infof(log("attacher.MountDevice(%s, %s)", devicePath, deviceMountPath))
if deviceMountPath == "" {
@ -246,6 +247,43 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
return errors.New(log("attacher.MountDevice failed to get CSIPersistentVolumeSource: %v", err))
}
// lets check if node/unstage is supported
if c.csiClient == nil {
c.csiClient, err = newCsiDriverClient(csiDriverName(csiSource.Driver))
if err != nil {
return errors.New(log("attacher.MountDevice failed to create newCsiDriverClient: %v", err))
}
}
csi := c.csiClient
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
// Check whether "STAGE_UNSTAGE_VOLUME" is set
stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
if err != nil {
return err
}
// Get secrets and publish context required for mountDevice
nodeName := string(c.plugin.host.GetNodeName())
publishContext, err := c.plugin.getPublishContext(c.k8s, csiSource.VolumeHandle, csiSource.Driver, nodeName)
if err != nil {
return volumetypes.NewTransientOperationFailure(err.Error())
}
nodeStageSecrets := map[string]string{}
// we only require secrets if csiSource has them and volume has NodeStage capability
if csiSource.NodeStageSecretRef != nil && stageUnstageSet {
nodeStageSecrets, err = getCredentialsFromSecret(c.k8s, csiSource.NodeStageSecretRef)
if err != nil {
err = fmt.Errorf("fetching NodeStageSecretRef %s/%s failed: %v",
csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err)
// if we failed to fetch secret then that could be a transient error
return volumetypes.NewTransientOperationFailure(err.Error())
}
}
// Store volume metadata for UnmountDevice. Keep it around even if the
// driver does not support NodeStage, UnmountDevice still needs it.
if err = os.MkdirAll(deviceMountPath, 0750); err != nil {
@ -265,7 +303,9 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
return err
}
defer func() {
if err != nil {
// Only if there was an error and volume operation was considered
// finished, we should remove the directory.
if err != nil && volumetypes.IsOperationFinishedError(err) {
// clean up metadata
klog.Errorf(log("attacher.MountDevice failed: %v", err))
if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
@ -274,41 +314,12 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
}
}()
if c.csiClient == nil {
c.csiClient, err = newCsiDriverClient(csiDriverName(csiSource.Driver))
if err != nil {
return errors.New(log("attacher.MountDevice failed to create newCsiDriverClient: %v", err))
}
}
csi := c.csiClient
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
// Check whether "STAGE_UNSTAGE_VOLUME" is set
stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
if err != nil {
return err
}
if !stageUnstageSet {
klog.Infof(log("attacher.MountDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice..."))
// defer does *not* remove the metadata file and it's correct - UnmountDevice needs it there.
return nil
}
// Start MountDevice
nodeName := string(c.plugin.host.GetNodeName())
publishContext, err := c.plugin.getPublishContext(c.k8s, csiSource.VolumeHandle, csiSource.Driver, nodeName)
nodeStageSecrets := map[string]string{}
if csiSource.NodeStageSecretRef != nil {
nodeStageSecrets, err = getCredentialsFromSecret(c.k8s, csiSource.NodeStageSecretRef)
if err != nil {
err = fmt.Errorf("fetching NodeStageSecretRef %s/%s failed: %v",
csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err)
return err
}
}
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
accessMode := v1.ReadWriteOnce
if spec.PersistentVolume.Spec.AccessModes != nil {
@ -336,7 +347,7 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
}
klog.V(4).Infof(log("attacher.MountDevice successfully requested NodeStageVolume [%s]", deviceMountPath))
return nil
return err
}
var _ volume.Detacher = &csiAttacher{}

View File

@ -44,7 +44,9 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
fakecsi "k8s.io/kubernetes/pkg/volume/csi/fake"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
var (
@ -1054,73 +1056,107 @@ func TestAttacherGetDeviceMountPath(t *testing.T) {
func TestAttacherMountDevice(t *testing.T) {
pvName := "test-pv"
nonFinalError := volumetypes.NewUncertainProgressError("")
transientError := volumetypes.NewTransientOperationFailure("")
testCases := []struct {
testName string
volName string
devicePath string
deviceMountPath string
stageUnstageSet bool
shouldFail bool
spec *volume.Spec
testName string
volName string
devicePath string
deviceMountPath string
stageUnstageSet bool
shouldFail bool
createAttachment bool
exitError error
spec *volume.Spec
}{
{
testName: "normal PV",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "path2",
stageUnstageSet: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
testName: "normal PV",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "path2",
stageUnstageSet: true,
createAttachment: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
},
{
testName: "normal PV with mount options",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "path2",
stageUnstageSet: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPVWithMountOptions(pvName, 10, testDriver, "test-vol1", []string{"test-op"}), false),
testName: "normal PV with mount options",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "path2",
stageUnstageSet: true,
createAttachment: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPVWithMountOptions(pvName, 10, testDriver, "test-vol1", []string{"test-op"}), false),
},
{
testName: "no vol name",
volName: "",
devicePath: "path1",
deviceMountPath: "path2",
stageUnstageSet: true,
shouldFail: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, ""), false),
testName: "normal PV but with missing attachment should result in no-change",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "path2",
stageUnstageSet: true,
createAttachment: false,
shouldFail: true,
exitError: transientError,
spec: volume.NewSpecFromPersistentVolume(makeTestPVWithMountOptions(pvName, 10, testDriver, "test-vol1", []string{"test-op"}), false),
},
{
testName: "no device path",
volName: "test-vol1",
devicePath: "",
deviceMountPath: "path2",
stageUnstageSet: true,
shouldFail: false,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
testName: "no vol name",
volName: "",
devicePath: "path1",
deviceMountPath: "path2",
stageUnstageSet: true,
shouldFail: true,
createAttachment: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, ""), false),
},
{
testName: "no device mount path",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "",
stageUnstageSet: true,
shouldFail: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
testName: "no device path",
volName: "test-vol1",
devicePath: "",
deviceMountPath: "path2",
stageUnstageSet: true,
shouldFail: false,
createAttachment: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
},
{
testName: "stage_unstage cap not set",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "path2",
stageUnstageSet: false,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
testName: "no device mount path",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "",
stageUnstageSet: true,
shouldFail: true,
createAttachment: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
},
{
testName: "failure with volume source",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "path2",
shouldFail: true,
spec: volume.NewSpecFromVolume(makeTestVol(pvName, testDriver)),
testName: "stage_unstage cap not set",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "path2",
stageUnstageSet: false,
createAttachment: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
},
{
testName: "failure with volume source",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "path2",
shouldFail: true,
createAttachment: true,
spec: volume.NewSpecFromVolume(makeTestVol(pvName, testDriver)),
},
{
testName: "pv with nodestage timeout should result in in-progress device",
volName: fakecsi.NodeStageTimeOut_VolumeID,
devicePath: "path1",
deviceMountPath: "path2",
stageUnstageSet: true,
createAttachment: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, fakecsi.NodeStageTimeOut_VolumeID), false),
exitError: nonFinalError,
shouldFail: true,
},
}
@ -1146,18 +1182,20 @@ func TestAttacherMountDevice(t *testing.T) {
nodeName := string(csiAttacher.plugin.host.GetNodeName())
attachID := getAttachmentName(tc.volName, testDriver, nodeName)
// Set up volume attachment
attachment := makeTestAttachment(attachID, nodeName, pvName)
_, err := csiAttacher.k8s.StorageV1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to attach: %v", err)
if tc.createAttachment {
// Set up volume attachment
attachment := makeTestAttachment(attachID, nodeName, pvName)
_, err := csiAttacher.k8s.StorageV1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to attach: %v", err)
}
go func() {
fakeWatcher.Delete(attachment)
}()
}
go func() {
fakeWatcher.Delete(attachment)
}()
// Run
err = csiAttacher.MountDevice(tc.spec, tc.devicePath, tc.deviceMountPath)
err := csiAttacher.MountDevice(tc.spec, tc.devicePath, tc.deviceMountPath)
// Verify
if err != nil {
@ -1170,6 +1208,10 @@ func TestAttacherMountDevice(t *testing.T) {
t.Errorf("test should fail, but no error occurred")
}
if tc.exitError != nil && reflect.TypeOf(tc.exitError) != reflect.TypeOf(err) {
t.Fatalf("expected exitError: %v got: %v", tc.exitError, err)
}
// Verify call goes through all the way
numStaged := 1
if !tc.stageUnstageSet {

View File

@ -28,11 +28,14 @@ import (
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/volume"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
type csiClient interface {
@ -213,6 +216,7 @@ func (c *csiDriverClient) NodePublishVolume(
if targetPath == "" {
return errors.New("missing target path")
}
if c.nodeV1ClientCreator == nil {
return errors.New("failed to call NodePublishVolume. nodeV1ClientCreator is nil")
@ -255,6 +259,9 @@ func (c *csiDriverClient) NodePublishVolume(
}
_, err = nodeClient.NodePublishVolume(ctx, req)
if err != nil && !isFinalError(err) {
return volumetypes.NewUncertainProgressError(err.Error())
}
return err
}
@ -374,6 +381,9 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
}
_, err = nodeClient.NodeStageVolume(ctx, req)
if err != nil && !isFinalError(err) {
return volumetypes.NewUncertainProgressError(err.Error())
}
return err
}
@ -613,3 +623,27 @@ func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string,
}
return metrics, nil
}
func isFinalError(err error) bool {
// Sources:
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
// https://github.com/container-storage-interface/spec/blob/master/spec.md
st, ok := status.FromError(err)
if !ok {
// This is not gRPC error. The operation must have failed before gRPC
// method was called, otherwise we would get gRPC error.
// We don't know if any previous volume operation is in progress, be on the safe side.
return false
}
switch st.Code() {
case codes.Canceled, // gRPC: Client Application cancelled the request
codes.DeadlineExceeded, // gRPC: Timeout
codes.Unavailable, // gRPC: Server shutting down, TCP connection broken - previous volume operation may be still in progress.
codes.ResourceExhausted, // gRPC: Server temporarily out of resources - previous volume operation may be still in progress.
codes.Aborted: // CSI: Operation pending for volume
return false
}
// All other errors mean that operation either did not
// even start or failed. It is for sure not in progress.
return true
}

View File

@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi/fake"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
type fakeCsiDriverClient struct {
@ -156,6 +157,9 @@ func (c *fakeCsiDriverClient) NodePublishVolume(
}
_, err := c.nodeClient.NodePublishVolume(ctx, req)
if err != nil && !isFinalError(err) {
return volumetypes.NewUncertainProgressError(err.Error())
}
return err
}
@ -201,6 +205,9 @@ func (c *fakeCsiDriverClient) NodeStageVolume(ctx context.Context,
}
_, err := c.nodeClient.NodeStageVolume(ctx, req)
if err != nil && !isFinalError(err) {
return volumetypes.NewUncertainProgressError(err.Error())
}
return err
}

View File

@ -36,6 +36,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
utilstrings "k8s.io/utils/strings"
)
@ -117,7 +118,8 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
csi, err := c.csiClientGetter.Get()
if err != nil {
return errors.New(log("mounter.SetUpAt failed to get CSI client: %v", err))
return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to get CSI client: %v", err))
}
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
@ -199,7 +201,8 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
nodeName := string(c.plugin.host.GetNodeName())
c.publishContext, err = c.plugin.getPublishContext(c.k8s, volumeHandle, string(driverName), nodeName)
if err != nil {
return err
// we could have a transient error associated with fetching publish context
return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to fetch publishContext: %v", err))
}
publishContext = c.publishContext
}
@ -218,8 +221,8 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
if secretRef != nil {
nodePublishSecrets, err = getCredentialsFromSecret(c.k8s, secretRef)
if err != nil {
return fmt.Errorf("fetching NodePublishSecretRef %s/%s failed: %v",
secretRef.Namespace, secretRef.Name, err)
return volumetypes.NewTransientOperationFailure(fmt.Sprintf("fetching NodePublishSecretRef %s/%s failed: %v",
secretRef.Namespace, secretRef.Name, err))
}
}
@ -227,7 +230,7 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
// Inject pod information into volume_attributes
podAttrs, err := c.podAttributes()
if err != nil {
return errors.New(log("mounter.SetUpAt failed to assemble volume attributes: %v", err))
return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to assemble volume attributes: %v", err))
}
if podAttrs != nil {
if volAttribs == nil {
@ -254,10 +257,13 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
)
if err != nil {
if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil {
klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr))
// If operation finished with error then we can remove the mount directory.
if volumetypes.IsOperationFinishedError(err) {
if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil {
klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr))
}
}
return errors.New(log("mounter.SetupAt failed: %v", err))
return err
}
c.supportsSELinux, err = c.kubeVolHost.GetHostUtil().GetSELinuxSupport(dir)
@ -269,21 +275,13 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
// The following logic is derived from https://github.com/kubernetes/kubernetes/issues/66323
// if fstype is "", then skip fsgroup (could be indication of non-block filesystem)
// if fstype is provided and pv.AccessMode == ReadWriteOnly, then apply fsgroup
err = c.applyFSGroup(fsType, mounterArgs.FsGroup)
if err != nil {
// attempt to rollback mount.
fsGrpErr := fmt.Errorf("applyFSGroup failed for vol %s: %v", c.volumeID, err)
if unpubErr := csi.NodeUnpublishVolume(ctx, c.volumeID, dir); unpubErr != nil {
klog.Error(log("NodeUnpublishVolume failed for [%s]: %v", c.volumeID, unpubErr))
return fsGrpErr
}
if unmountErr := removeMountDir(c.plugin, dir); unmountErr != nil {
klog.Error(log("removeMountDir failed for [%s]: %v", dir, unmountErr))
return fsGrpErr
}
return fsGrpErr
// At this point mount operation is successful:
// 1. Since volume can not be used by the pod because of invalid permissions, we must return error
// 2. Since mount is successful, we must record volume as mounted in uncertain state, so it can be
// cleaned up.
return volumetypes.NewUncertainProgressError(fmt.Sprintf("applyFSGroup failed for vol %s: %v", c.volumeID, err))
}
klog.V(4).Infof(log("mounter.SetUp successfully requested NodePublish [%s]", dir))

View File

@ -36,7 +36,9 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
fakecsi "k8s.io/kubernetes/pkg/volume/csi/fake"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
var (
@ -396,6 +398,113 @@ func TestMounterSetUpSimple(t *testing.T) {
}
}
func TestMounterSetupWithStatusTracking(t *testing.T) {
fakeClient := fakeclient.NewSimpleClientset()
plug, tmpDir := newTestPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir)
nonFinalError := volumetypes.NewUncertainProgressError("non-final-error")
transientError := volumetypes.NewTransientOperationFailure("transient-error")
testCases := []struct {
name string
podUID types.UID
spec func(string, []string) *volume.Spec
shouldFail bool
exitError error
createAttachment bool
}{
{
name: "setup with correct persistent volume source should result in finish exit status",
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
spec: func(fsType string, options []string) *volume.Spec {
pvSrc := makeTestPV("pv1", 20, testDriver, "vol1")
pvSrc.Spec.CSI.FSType = fsType
pvSrc.Spec.MountOptions = options
return volume.NewSpecFromPersistentVolume(pvSrc, false)
},
createAttachment: true,
},
{
name: "setup with missing attachment should result in nochange",
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
spec: func(fsType string, options []string) *volume.Spec {
return volume.NewSpecFromPersistentVolume(makeTestPV("pv3", 20, testDriver, "vol4"), false)
},
exitError: transientError,
createAttachment: false,
shouldFail: true,
},
{
name: "setup with timeout errors on NodePublish",
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
spec: func(fsType string, options []string) *volume.Spec {
return volume.NewSpecFromPersistentVolume(makeTestPV("pv4", 20, testDriver, fakecsi.NodePublishTimeOut_VolumeID), false)
},
createAttachment: true,
exitError: nonFinalError,
shouldFail: true,
},
{
name: "setup with missing secrets should result in nochange exit",
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
spec: func(fsType string, options []string) *volume.Spec {
pv := makeTestPV("pv5", 20, testDriver, "vol6")
pv.Spec.PersistentVolumeSource.CSI.NodePublishSecretRef = &api.SecretReference{
Name: "foo",
Namespace: "default",
}
return volume.NewSpecFromPersistentVolume(pv, false)
},
exitError: transientError,
createAttachment: true,
shouldFail: true,
},
}
for _, tc := range testCases {
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
t.Run(tc.name, func(t *testing.T) {
mounter, err := plug.NewMounter(
tc.spec("ext4", []string{}),
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: tc.podUID, Namespace: testns}},
volume.VolumeOptions{},
)
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
csiMounter.csiClient = setupClient(t, true)
if csiMounter.volumeLifecycleMode != storagev1beta1.VolumeLifecyclePersistent {
t.Fatal("unexpected volume mode: ", csiMounter.volumeLifecycleMode)
}
if tc.createAttachment {
attachID := getAttachmentName(csiMounter.volumeID, string(csiMounter.driverName), string(plug.host.GetNodeName()))
attachment := makeTestAttachment(attachID, "test-node", csiMounter.spec.Name())
_, err = csiMounter.k8s.StorageV1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to setup VolumeAttachment: %v", err)
}
}
err = csiMounter.SetUp(volume.MounterArgs{})
if tc.exitError != nil && reflect.TypeOf(tc.exitError) != reflect.TypeOf(err) {
t.Fatalf("expected exitError: %+v got: %+v", tc.exitError, err)
}
if tc.shouldFail && err == nil {
t.Fatalf("expected failure but Setup succeeded")
}
if !tc.shouldFail && err != nil {
t.Fatalf("expected success got mounter.Setup failed with: %v", err)
}
})
}
}
func TestMounterSetUpWithInline(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)()

View File

@ -11,6 +11,8 @@ go_library(
deps = [
"//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/google.golang.org/grpc/codes:go_default_library",
"//vendor/google.golang.org/grpc/status:go_default_library",
],
)

View File

@ -21,9 +21,17 @@ import (
"errors"
"strings"
"google.golang.org/grpc"
csipb "github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
// NodePublishTimeout_VolumeID is volume id that will result in NodePublish operation to timeout
NodePublishTimeOut_VolumeID = "node-publish-timeout"
// NodeStageTimeOut_VolumeID is a volume id that will result in NodeStage operation to timeout
NodeStageTimeOut_VolumeID = "node-stage-timeout"
)
// IdentityClient is a CSI identity client used for testing
@ -158,6 +166,12 @@ func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePubli
if !strings.Contains(fsTypes, fsType) {
return nil, errors.New("invalid fstype")
}
if req.GetVolumeId() == NodePublishTimeOut_VolumeID {
timeoutErr := status.Errorf(codes.DeadlineExceeded, "timeout exceeded")
return nil, timeoutErr
}
f.nodePublishedVolumes[req.GetVolumeId()] = CSIVolume{
VolumeHandle: req.GetVolumeId(),
Path: req.GetTargetPath(),
@ -214,6 +228,11 @@ func (f *NodeClient) NodeStageVolume(ctx context.Context, req *csipb.NodeStageVo
return nil, errors.New("invalid fstype")
}
if req.GetVolumeId() == NodeStageTimeOut_VolumeID {
timeoutErr := status.Errorf(codes.DeadlineExceeded, "timeout exceeded")
return nil, timeoutErr
}
f.nodeStagedVolumes[req.GetVolumeId()] = csiVol
return &csipb.NodeStageVolumeResponse{}, nil
}

View File

@ -18,6 +18,7 @@ go_library(
"//pkg/volume/util/hostutil:go_default_library",
"//pkg/volume/util/recyclerclient:go_default_library",
"//pkg/volume/util/subpath:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//pkg/volume/util/volumepathhandler:go_default_library",
"//staging/src/k8s.io/api/authentication/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -51,6 +51,7 @@ import (
"k8s.io/kubernetes/pkg/volume/util/hostutil"
"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
"k8s.io/kubernetes/pkg/volume/util/subpath"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
)
@ -66,6 +67,35 @@ const (
TimeoutAttachNode = "timeout-attach-node"
// The node is marked as multi-attach which means it is allowed to attach the volume to multiple nodes.
MultiAttachNode = "multi-attach-node"
// TimeoutOnSetupVolumeName will cause Setup call to timeout but volume will finish mounting.
TimeoutOnSetupVolumeName = "timeout-setup-volume"
// FailOnSetupVolumeName will cause setup call to fail
FailOnSetupVolumeName = "fail-setup-volume"
//TimeoutAndFailOnSetupVolumeName will first timeout and then fail the setup
TimeoutAndFailOnSetupVolumeName = "timeout-and-fail-setup-volume"
// SuccessAndTimeoutSetupVolumeName will cause first mount operation to succeed but subsequent attempts to timeout
SuccessAndTimeoutSetupVolumeName = "success-and-timeout-setup-volume-name"
// SuccessAndFailOnSetupVolumeName will cause first mount operation to succeed but subsequent attempts to fail
SuccessAndFailOnSetupVolumeName = "success-and-failed-setup-device-name"
// TimeoutOnMountDeviceVolumeName will cause MountDevice call to timeout but Setup will finish.
TimeoutOnMountDeviceVolumeName = "timeout-mount-device-volume"
// TimeoutAndFailOnMountDeviceVolumeName will cause first MountDevice call to timeout but second call will fail
TimeoutAndFailOnMountDeviceVolumeName = "timeout-and-fail-mount-device-name"
// FailMountDeviceVolumeName will cause MountDevice operation on volume to fail
FailMountDeviceVolumeName = "fail-mount-device-volume-name"
// SuccessAndTimeoutDeviceName will cause first mount operation to succeed but subsequent attempts to timeout
SuccessAndTimeoutDeviceName = "success-and-timeout-device-name"
// SuccessAndFailOnMountDeviceName will cause first mount operation to succeed but subsequent attempts to fail
SuccessAndFailOnMountDeviceName = "success-and-failed-mount-device-name"
deviceNotMounted = "deviceNotMounted"
deviceMountUncertain = "deviceMountUncertain"
deviceMounted = "deviceMounted"
volumeNotMounted = "volumeNotMounted"
volumeMountUncertain = "volumeMountUncertain"
volumeMounted = "volumeMounted"
)
// fakeVolumeHost is useful for testing volume plugins.
@ -345,6 +375,7 @@ type FakeVolumePlugin struct {
VolumeLimitsError error
LimitKey string
ProvisionDelaySeconds int
SupportsRemount bool
// Add callbacks as needed
WaitForAttachHook func(spec *Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error)
@ -383,6 +414,8 @@ func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume {
UnmountDeviceHook: plugin.UnmountDeviceHook,
}
volume.VolumesAttached = make(map[string]types.NodeName)
volume.DeviceMountState = make(map[string]string)
volume.VolumeMountState = make(map[string]string)
*list = append(*list, volume)
return volume
}
@ -420,7 +453,7 @@ func (plugin *FakeVolumePlugin) CanSupport(spec *Spec) bool {
}
func (plugin *FakeVolumePlugin) RequiresRemount() bool {
return false
return plugin.SupportsRemount
}
func (plugin *FakeVolumePlugin) SupportsMountOption() bool {
@ -784,7 +817,9 @@ type FakeVolume struct {
VolName string
Plugin *FakeVolumePlugin
MetricsNil
VolumesAttached map[string]types.NodeName
VolumesAttached map[string]types.NodeName
DeviceMountState map[string]string
VolumeMountState map[string]string
// Add callbacks as needed
WaitForAttachHook func(spec *Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error)
@ -835,7 +870,50 @@ func (fv *FakeVolume) CanMount() error {
func (fv *FakeVolume) SetUp(mounterArgs MounterArgs) error {
fv.Lock()
defer fv.Unlock()
err := fv.setupInternal(mounterArgs)
fv.SetUpCallCount++
return err
}
func (fv *FakeVolume) setupInternal(mounterArgs MounterArgs) error {
if fv.VolName == TimeoutOnSetupVolumeName {
fv.VolumeMountState[fv.VolName] = volumeMountUncertain
return volumetypes.NewUncertainProgressError("time out on setup")
}
if fv.VolName == FailOnSetupVolumeName {
fv.VolumeMountState[fv.VolName] = volumeNotMounted
return fmt.Errorf("mounting volume failed")
}
if fv.VolName == TimeoutAndFailOnSetupVolumeName {
_, ok := fv.VolumeMountState[fv.VolName]
if !ok {
fv.VolumeMountState[fv.VolName] = volumeMountUncertain
return volumetypes.NewUncertainProgressError("time out on setup")
}
fv.VolumeMountState[fv.VolName] = volumeNotMounted
return fmt.Errorf("mounting volume failed")
}
if fv.VolName == SuccessAndFailOnSetupVolumeName {
_, ok := fv.VolumeMountState[fv.VolName]
if ok {
fv.VolumeMountState[fv.VolName] = volumeNotMounted
return fmt.Errorf("mounting volume failed")
}
}
if fv.VolName == SuccessAndTimeoutSetupVolumeName {
_, ok := fv.VolumeMountState[fv.VolName]
if ok {
fv.VolumeMountState[fv.VolName] = volumeMountUncertain
return volumetypes.NewUncertainProgressError("time out on setup")
}
}
fv.VolumeMountState[fv.VolName] = volumeNotMounted
return fv.SetUpAt(fv.getPath(), mounterArgs)
}
@ -1036,19 +1114,64 @@ func (fv *FakeVolume) GetDeviceMountPath(spec *Spec) (string, error) {
return "", nil
}
func (fv *FakeVolume) MountDevice(spec *Spec, devicePath string, deviceMountPath string) error {
func (fv *FakeVolume) mountDeviceInternal(spec *Spec, devicePath string, deviceMountPath string) error {
fv.Lock()
defer fv.Unlock()
if spec.Name() == TimeoutOnMountDeviceVolumeName {
fv.DeviceMountState[spec.Name()] = deviceMountUncertain
return volumetypes.NewUncertainProgressError("mount failed")
}
if spec.Name() == FailMountDeviceVolumeName {
fv.DeviceMountState[spec.Name()] = deviceNotMounted
return fmt.Errorf("error mounting disk: %s", devicePath)
}
if spec.Name() == TimeoutAndFailOnMountDeviceVolumeName {
_, ok := fv.DeviceMountState[spec.Name()]
if !ok {
fv.DeviceMountState[spec.Name()] = deviceMountUncertain
return volumetypes.NewUncertainProgressError("timed out mounting error")
}
fv.DeviceMountState[spec.Name()] = deviceNotMounted
return fmt.Errorf("error mounting disk: %s", devicePath)
}
if spec.Name() == SuccessAndTimeoutDeviceName {
_, ok := fv.DeviceMountState[spec.Name()]
if ok {
fv.DeviceMountState[spec.Name()] = deviceMountUncertain
return volumetypes.NewUncertainProgressError("error mounting state")
}
}
if spec.Name() == SuccessAndFailOnMountDeviceName {
_, ok := fv.DeviceMountState[spec.Name()]
if ok {
return fmt.Errorf("error mounting disk: %s", devicePath)
}
}
fv.DeviceMountState[spec.Name()] = deviceMounted
fv.MountDeviceCallCount++
return nil
}
func (fv *FakeVolume) MountDevice(spec *Spec, devicePath string, deviceMountPath string) error {
return fv.mountDeviceInternal(spec, devicePath, deviceMountPath)
}
func (fv *FakeVolume) GetMountDeviceCallCount() int {
fv.RLock()
defer fv.RUnlock()
return fv.MountDeviceCallCount
}
func (fv *FakeVolume) GetUnmountDeviceCallCount() int {
fv.RLock()
defer fv.RUnlock()
return fv.UnmountDeviceCallCount
}
func (fv *FakeVolume) Detach(volumeName string, nodeName types.NodeName) error {
fv.Lock()
defer fv.Unlock()
@ -1304,6 +1427,28 @@ func VerifyMountDeviceCallCount(
expectedMountDeviceCallCount)
}
func VerifyUnmountDeviceCallCount(expectedCallCount int, fakeVolumePlugin *FakeVolumePlugin) error {
detachers := fakeVolumePlugin.GetDetachers()
if len(detachers) == 0 && (expectedCallCount == 0) {
return nil
}
actualCallCount := 0
for _, detacher := range detachers {
actualCallCount = detacher.GetUnmountDeviceCallCount()
if expectedCallCount == 0 && actualCallCount == expectedCallCount {
return nil
}
if (expectedCallCount > 0) && (actualCallCount >= expectedCallCount) {
return nil
}
}
return fmt.Errorf(
"Expected DeviceUnmount Call %d, got %d",
expectedCallCount, actualCallCount)
}
// VerifyZeroMountDeviceCallCount ensures that all Attachers for this plugin
// have a zero MountDeviceCallCount. Otherwise it returns an error.
func VerifyZeroMountDeviceCallCount(fakeVolumePlugin *FakeVolumePlugin) error {
@ -1358,9 +1503,18 @@ func VerifyZeroSetUpCallCount(fakeVolumePlugin *FakeVolumePlugin) error {
func VerifyTearDownCallCount(
expectedTearDownCallCount int,
fakeVolumePlugin *FakeVolumePlugin) error {
for _, unmounter := range fakeVolumePlugin.GetUnmounters() {
unmounters := fakeVolumePlugin.GetUnmounters()
if len(unmounters) == 0 && (expectedTearDownCallCount == 0) {
return nil
}
for _, unmounter := range unmounters {
actualCallCount := unmounter.GetTearDownCallCount()
if actualCallCount >= expectedTearDownCallCount {
if expectedTearDownCallCount == 0 && actualCallCount == expectedTearDownCallCount {
return nil
}
if (expectedTearDownCallCount > 0) && (actualCallCount >= expectedTearDownCallCount) {
return nil
}
}

View File

@ -160,23 +160,48 @@ func NewOperationExecutor(
}
}
// MarkVolumeOpts is an struct to pass arguments to MountVolume functions
type MarkVolumeOpts struct {
PodName volumetypes.UniquePodName
PodUID types.UID
VolumeName v1.UniqueVolumeName
Mounter volume.Mounter
BlockVolumeMapper volume.BlockVolumeMapper
OuterVolumeSpecName string
VolumeGidVolume string
VolumeSpec *volume.Spec
VolumeMountState VolumeMountState
}
// ActualStateOfWorldMounterUpdater defines a set of operations updating the actual
// state of the world cache after successful mount/unmount.
type ActualStateOfWorldMounterUpdater interface {
// Marks the specified volume as mounted to the specified pod
MarkVolumeAsMounted(podName volumetypes.UniquePodName, podUID types.UID, volumeName v1.UniqueVolumeName, mounter volume.Mounter, blockVolumeMapper volume.BlockVolumeMapper, outerVolumeSpecName string, volumeGidValue string, volumeSpec *volume.Spec) error
MarkVolumeAsMounted(markVolumeOpts MarkVolumeOpts) error
// Marks the specified volume as unmounted from the specified pod
MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error
// MarkVolumeMountAsUncertain marks state of volume mount for the pod uncertain
MarkVolumeMountAsUncertain(markVolumeOpts MarkVolumeOpts) error
// Marks the specified volume as having been globally mounted.
MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error
// MarkDeviceAsUncertain marks device state in global mount path as uncertain
MarkDeviceAsUncertain(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error
// Marks the specified volume as having its global mount unmounted.
MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error
// Marks the specified volume's file system resize request is finished.
MarkVolumeAsResized(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error
// GetDeviceMountState returns mount state of the device in global path
GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState
// GetVolumeMountState returns mount state of the volume for the Pod
GetVolumeMountState(volumName v1.UniqueVolumeName, podName volumetypes.UniquePodName) VolumeMountState
}
// ActualStateOfWorldAttacherUpdater defines a set of operations updating the
@ -354,6 +379,35 @@ type VolumeToMount struct {
DesiredSizeLimit *resource.Quantity
}
// DeviceMountState represents device mount state in a global path.
type DeviceMountState string
const (
// DeviceGloballyMounted means device has been globally mounted successfully
DeviceGloballyMounted DeviceMountState = "DeviceGloballyMounted"
// DeviceMountUncertain means device may not be mounted but a mount operation may be
// in-progress which can cause device mount to succeed.
DeviceMountUncertain DeviceMountState = "DeviceMountUncertain"
// DeviceNotMounted means device has not been mounted globally.
DeviceNotMounted DeviceMountState = "DeviceNotMounted"
)
// VolumeMountState represents volume mount state in a path local to the pod.
type VolumeMountState string
const (
// VolumeMounted means volume has been mounted in pod's local path
VolumeMounted VolumeMountState = "VolumeMounted"
// VolumeMountUncertain means volume may or may not be mounted in pods' local path
VolumeMountUncertain VolumeMountState = "VolumeMountUncertain"
// VolumeNotMounted means volume has not be mounted in pod's local path
VolumeNotMounted VolumeMountState = "VolumeNotMounted"
)
// GenerateMsgDetailed returns detailed msgs for volumes to mount
func (volume *VolumeToMount) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)

View File

@ -580,6 +580,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
devicePath,
deviceMountPath)
if err != nil {
og.markDeviceErrorState(volumeToMount, devicePath, deviceMountPath, err, actualStateOfWorld)
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MountVolume.MountDevice failed", err)
}
@ -621,7 +622,19 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
FsGroup: fsGroup,
DesiredSize: volumeToMount.DesiredSizeLimit,
})
// Update actual state of world
markOpts := MarkVolumeOpts{
PodName: volumeToMount.PodName,
PodUID: volumeToMount.Pod.UID,
VolumeName: volumeToMount.VolumeName,
Mounter: volumeMounter,
OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
VolumeGidVolume: volumeToMount.VolumeGidValue,
VolumeSpec: volumeToMount.VolumeSpec,
VolumeMountState: VolumeMounted,
}
if mountErr != nil {
og.markVolumeErrorState(volumeToMount, markOpts, mountErr, actualStateOfWorld)
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr)
}
@ -647,16 +660,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
}
}
// Update actual state of world
markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(
volumeToMount.PodName,
volumeToMount.Pod.UID,
volumeToMount.VolumeName,
volumeMounter,
nil,
volumeToMount.OuterVolumeSpecName,
volumeToMount.VolumeGidValue,
volumeToMount.VolumeSpec)
markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts)
if markVolMountedErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr)
@ -679,6 +683,49 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
}
}
func (og *operationGenerator) markDeviceErrorState(volumeToMount VolumeToMount, devicePath, deviceMountPath string, mountError error, actualStateOfWorld ActualStateOfWorldMounterUpdater) {
if volumetypes.IsOperationFinishedError(mountError) &&
actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) == DeviceMountUncertain {
// Only devices which were uncertain can be marked as unmounted
markDeviceUnmountError := actualStateOfWorld.MarkDeviceAsUnmounted(volumeToMount.VolumeName)
if markDeviceUnmountError != nil {
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUnmounted failed", markDeviceUnmountError).Error())
}
return
}
if volumetypes.IsUncertainProgressError(mountError) &&
actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) == DeviceNotMounted {
// only devices which are not mounted can be marked as uncertain. We do not want to mark a device
// which was previously marked as mounted here as uncertain.
markDeviceUncertainError := actualStateOfWorld.MarkDeviceAsUncertain(volumeToMount.VolumeName, devicePath, deviceMountPath)
if markDeviceUncertainError != nil {
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUncertain failed", markDeviceUncertainError).Error())
}
}
}
func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount, markOpts MarkVolumeOpts, mountError error, actualStateOfWorld ActualStateOfWorldMounterUpdater) {
if volumetypes.IsOperationFinishedError(mountError) &&
actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeMountUncertain {
t := actualStateOfWorld.MarkVolumeAsUnmounted(volumeToMount.PodName, volumeToMount.VolumeName)
if t != nil {
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeAsUnmounted failed", t).Error())
}
return
}
if volumetypes.IsUncertainProgressError(mountError) &&
actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeNotMounted {
t := actualStateOfWorld.MarkVolumeMountAsUncertain(markOpts)
if t != nil {
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", t).Error())
}
}
}
func (og *operationGenerator) GenerateUnmountVolumeFunc(
volumeToUnmount MountedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
@ -982,16 +1029,18 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError)
}
// Update actual state of world
markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(
volumeToMount.PodName,
volumeToMount.Pod.UID,
volumeToMount.VolumeName,
nil,
blockVolumeMapper,
volumeToMount.OuterVolumeSpecName,
volumeToMount.VolumeGidValue,
volumeToMount.VolumeSpec)
markVolumeOpts := MarkVolumeOpts{
PodName: volumeToMount.PodName,
PodUID: volumeToMount.Pod.UID,
VolumeName: volumeToMount.VolumeName,
BlockVolumeMapper: blockVolumeMapper,
OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
VolumeGidVolume: volumeToMount.VolumeGidValue,
VolumeSpec: volumeToMount.VolumeSpec,
VolumeMountState: VolumeMounted,
}
markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
if markVolMountedErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr)

View File

@ -51,6 +51,57 @@ func (o *GeneratedOperations) Run() (eventErr, detailedErr error) {
return o.OperationFunc()
}
// TransientOperationFailure indicates operation failed with a transient error
// and may fix itself when retried.
type TransientOperationFailure struct {
msg string
}
func (err *TransientOperationFailure) Error() string {
return err.msg
}
// NewTransientOperationFailure creates an instance of TransientOperationFailure error
func NewTransientOperationFailure(msg string) *TransientOperationFailure {
return &TransientOperationFailure{msg: msg}
}
// UncertainProgressError indicates operation failed with a non-final error
// and operation may be in-progress in background.
type UncertainProgressError struct {
msg string
}
func (err *UncertainProgressError) Error() string {
return err.msg
}
// NewUncertainProgressError creates an instance of UncertainProgressError type
func NewUncertainProgressError(msg string) *UncertainProgressError {
return &UncertainProgressError{msg: msg}
}
// IsOperationFinishedError checks if given error is of type that indicates
// operation is finished with a FINAL error.
func IsOperationFinishedError(err error) bool {
if _, ok := err.(*UncertainProgressError); ok {
return false
}
if _, ok := err.(*TransientOperationFailure); ok {
return false
}
return true
}
// IsUncertainProgressError checks if given error is of type that indicates
// operation might be in-progress in background.
func IsUncertainProgressError(err error) bool {
if _, ok := err.(*UncertainProgressError); ok {
return true
}
return false
}
const (
// VolumeResizerKey is key that will be used to store resizer used
// for resizing PVC. The generated key/value pair will be added

View File

@ -19,7 +19,7 @@ package volume
import (
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -128,7 +128,12 @@ type Mounter interface {
// content should be owned by 'fsGroup' so that it can be
// accessed by the pod. This may be called more than once, so
// implementations must be idempotent.
// It could return following types of errors:
// - TransientOperationFailure
// - UncertainProgressError
// - Error of any other type should be considered a final error
SetUp(mounterArgs MounterArgs) error
// SetUpAt prepares and mounts/unpacks the volume to the
// specified directory path, which may or may not exist yet.
// The mount point and its content should be owned by
@ -247,6 +252,10 @@ type DeviceMounter interface {
// MountDevice mounts the disk to a global path which
// individual pods can then bind mount
// Note that devicePath can be empty if the volume plugin does not implement any of Attach and WaitForAttach methods.
// It could return following types of errors:
// - TransientOperationFailure
// - UncertainProgressError
// - Error of any other type should be considered a final error
MountDevice(spec *Spec, devicePath string, deviceMountPath string) error
}