mirror of https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 15:58:37 +00:00
Handle the case of remounts correctly
This commit is contained in:
parent 5feea93163
commit 309c6f863a
@@ -369,6 +369,34 @@ func (asw *actualStateOfWorld) MarkDeviceAsUnmounted(
 	return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceNotMounted, "", "")
 }
 
+func (asw *actualStateOfWorld) GetDeviceMountState(volumeName v1.UniqueVolumeName) operationexecutor.DeviceMountState {
+	asw.RLock()
+	defer asw.RUnlock()
+
+	volumeObj, volumeExists := asw.attachedVolumes[volumeName]
+	if !volumeExists {
+		return operationexecutor.DeviceNotMounted
+	}
+
+	return volumeObj.deviceMountState
+}
+
+func (asw *actualStateOfWorld) GetVolumeMountState(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) operationexecutor.VolumeMountState {
+	asw.RLock()
+	defer asw.RUnlock()
+
+	volumeObj, volumeExists := asw.attachedVolumes[volumeName]
+	if !volumeExists {
+		return operationexecutor.VolumeNotMounted
+	}
+
+	podObj, podExists := volumeObj.mountedPods[podName]
+	if !podExists {
+		return operationexecutor.VolumeNotMounted
+	}
+	return podObj.volumeMountStateForPod
+}
+
 // addVolume adds the given volume to the cache indicating the specified
 // volume is attached to this node. If no volume name is supplied, a unique
 // volume name is generated from the volumeSpec and returned on success. If a
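The two getters added above are what the commit's error handling keys off: before flipping a device or volume into an error state, the operation generator can first ask the cache what state it is already in (see markDeviceErrorState and markVolumeErrorState near the end of this diff). The underlying shape is an RWMutex-guarded map whose missing keys read as "not mounted". A minimal self-contained sketch of that pattern, with toy names rather than the kubelet's actual types:

    package main

    import (
        "fmt"
        "sync"
    )

    type deviceMountState string

    const (
        deviceNotMounted     deviceMountState = "DeviceNotMounted"
        deviceMountUncertain deviceMountState = "DeviceMountUncertain"
    )

    // actualState mimics actualStateOfWorld: reads take the shared lock, and a
    // volume that was never recorded reports the safe default, "not mounted".
    type actualState struct {
        sync.RWMutex
        devices map[string]deviceMountState
    }

    func (s *actualState) GetDeviceMountState(name string) deviceMountState {
        s.RLock()
        defer s.RUnlock()
        state, ok := s.devices[name]
        if !ok {
            return deviceNotMounted
        }
        return state
    }

    func main() {
        s := &actualState{devices: map[string]deviceMountState{"vol-1": deviceMountUncertain}}
        fmt.Println(s.GetDeviceMountState("vol-1")) // DeviceMountUncertain
        fmt.Println(s.GetDeviceMountState("vol-2")) // DeviceNotMounted
    }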
@@ -55,6 +55,7 @@ const (
 	nodeName k8stypes.NodeName = k8stypes.NodeName("mynodename")
 	kubeletPodsDir string = "fake-dir"
 	testOperationBackOffDuration time.Duration = 100 * time.Millisecond
+	reconcilerSyncWaitDuration time.Duration = 10 * time.Second
 )
 
 func hasAddedPods() bool { return true }
@@ -1153,6 +1154,7 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) {
 		deviceState operationexecutor.DeviceMountState
 		unmountDeviceCallCount int
 		volumeName string
+		supportRemount bool
 	}{
 		{
 			name: "timed out operations should result in device marked as uncertain",
@@ -1172,6 +1174,20 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) {
 			unmountDeviceCallCount: 0,
 			volumeName: volumetesting.TimeoutAndFailOnMountDeviceVolumeName,
 		},
+		{
+			name: "success followed by timeout operation should result in mounted device",
+			deviceState: operationexecutor.DeviceGloballyMounted,
+			unmountDeviceCallCount: 1,
+			volumeName: volumetesting.SuccessAndTimeoutDeviceName,
+			supportRemount: true,
+		},
+		{
+			name: "success followed by failed operation should result in mounted device",
+			deviceState: operationexecutor.DeviceGloballyMounted,
+			unmountDeviceCallCount: 1,
+			volumeName: volumetesting.SuccessAndFailOnMountDeviceName,
+			supportRemount: true,
+		},
 	}
 
 	for _, tc := range tests {
@@ -1216,6 +1232,8 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) {
 			}
 
 			volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
+			fakePlugin.SupportsRemount = tc.supportRemount
+
 			dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
 			asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
 			kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
@@ -1262,113 +1280,203 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) {
 				close(stoppedChan)
 			}()
 			waitForVolumeToExistInASW(t, volumeName, asw)
+			if tc.volumeName == volumetesting.TimeoutAndFailOnMountDeviceVolumeName {
+				// Wait upto 10s for reconciler to catchup
+				time.Sleep(reconcilerSyncWaitDuration)
+			}
+
+			if tc.volumeName == volumetesting.SuccessAndFailOnMountDeviceName ||
+				tc.volumeName == volumetesting.SuccessAndTimeoutDeviceName {
+				// wait for mount and then break it via remount
+				waitForMount(t, fakePlugin, volumeName, asw)
+				asw.MarkRemountRequired(podName)
+				time.Sleep(reconcilerSyncWaitDuration)
+			}
+
 			if tc.deviceState == operationexecutor.DeviceMountUncertain {
 				waitForUncertainGlobalMount(t, volumeName, asw)
 			}
 
 			if tc.deviceState == operationexecutor.DeviceGloballyMounted {
 				waitForMount(t, fakePlugin, volumeName, asw)
 			}
 
 			dsw.DeletePodFromVolume(podName, volumeName)
 			waitForDetach(t, volumeName, asw)
 
-			volumetesting.VerifyUnmountDeviceCallCount(1, fakePlugin)
+			err = volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
+			if err != nil {
+				t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
+			}
 		})
 	}
 }
 
 func Test_UncertainVolumeMountState(t *testing.T) {
 	fsMode := v1.PersistentVolumeFilesystem
-	pv := &v1.PersistentVolume{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: volumetesting.TimeoutOnSetupVolumeName,
-			UID: "pvuid",
+	var tests = []struct {
+		name string
+		volumeState operationexecutor.VolumeMountState
+		unmountDeviceCallCount int
+		unmountVolumeCount int
+		volumeName string
+		supportRemount bool
+	}{
+		{
+			name: "timed out operations should result in volume marked as uncertain",
+			volumeState: operationexecutor.VolumeMountUncertain,
+			unmountDeviceCallCount: 1,
+			unmountVolumeCount: 1,
+			volumeName: volumetesting.TimeoutOnSetupVolumeName,
 		},
-		Spec: v1.PersistentVolumeSpec{
-			ClaimRef: &v1.ObjectReference{Name: "pvc"},
-			VolumeMode: &fsMode,
+		{
+			name: "failed operation should result in not-mounted volume",
+			volumeState: operationexecutor.VolumeNotMounted,
+			unmountDeviceCallCount: 0,
+			unmountVolumeCount: 0,
+			volumeName: volumetesting.FailOnSetupVolumeName,
+		},
+		{
+			name: "timeout followed by failed operation should result in non-mounted volume",
+			volumeState: operationexecutor.VolumeNotMounted,
+			unmountDeviceCallCount: 0,
+			unmountVolumeCount: 0,
+			volumeName: volumetesting.TimeoutAndFailOnSetupVolumeName,
+		},
+		{
+			name: "success followed by timeout operation should result in mounted volume",
+			volumeState: operationexecutor.VolumeMounted,
+			unmountDeviceCallCount: 1,
+			unmountVolumeCount: 1,
+			volumeName: volumetesting.SuccessAndTimeoutSetupVolumeName,
+			supportRemount: true,
+		},
+		{
+			name: "success followed by failed operation should result in mounted volume",
+			volumeState: operationexecutor.VolumeMounted,
+			unmountDeviceCallCount: 1,
+			unmountVolumeCount: 1,
+			volumeName: volumetesting.SuccessAndFailOnSetupVolumeName,
+			supportRemount: true,
 		},
 	}
-	pvc := &v1.PersistentVolumeClaim{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: "pvc",
-			UID: "pvcuid",
-		},
-		Spec: v1.PersistentVolumeClaimSpec{
-			VolumeName: volumetesting.TimeoutOnSetupVolumeName,
-		},
-	}
-	pod := &v1.Pod{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: "pod1",
-			UID: "pod1uid",
-		},
-		Spec: v1.PodSpec{
-			Volumes: []v1.Volume{
-				{
-					Name: "volume-name",
-					VolumeSource: v1.VolumeSource{
-						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
-							ClaimName: pvc.Name,
 
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			pv := &v1.PersistentVolume{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: tc.volumeName,
+					UID: "pvuid",
+				},
+				Spec: v1.PersistentVolumeSpec{
+					ClaimRef: &v1.ObjectReference{Name: "pvc"},
+					VolumeMode: &fsMode,
+				},
+			}
+			pvc := &v1.PersistentVolumeClaim{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "pvc",
+					UID: "pvcuid",
+				},
+				Spec: v1.PersistentVolumeClaimSpec{
+					VolumeName: tc.volumeName,
+				},
+			}
+			pod := &v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "pod1",
+					UID: "pod1uid",
+				},
+				Spec: v1.PodSpec{
+					Volumes: []v1.Volume{
+						{
+							Name: "volume-name",
+							VolumeSource: v1.VolumeSource{
+								PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
+									ClaimName: pvc.Name,
+								},
+							},
+						},
+					},
+				},
+			}
+
+			volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
+			fakePlugin.SupportsRemount = tc.supportRemount
+			dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
+			asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
+			kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
+				Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
+				DevicePath: "fake/path",
+			})
+			fakeRecorder := &record.FakeRecorder{}
+			fakeHandler := volumetesting.NewBlockVolumePathHandler()
+			oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
+				kubeClient,
+				volumePluginMgr,
+				fakeRecorder,
+				false, /* checkNodeCapabilitiesBeforeMount */
+				fakeHandler))
+
+			reconciler := NewReconciler(
+				kubeClient,
+				true, /* controllerAttachDetachEnabled */
+				reconcilerLoopSleepDuration,
+				waitForAttachTimeout,
+				nodeName,
+				dsw,
+				asw,
+				hasAddedPods,
+				oex,
+				&mount.FakeMounter{},
+				hostutil.NewFakeHostUtil(nil),
+				volumePluginMgr,
+				kubeletPodsDir)
+			volumeSpec := &volume.Spec{PersistentVolume: pv}
+			podName := util.GetUniquePodName(pod)
+			volumeName, err := dsw.AddPodToVolume(
+				podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
+			// Assert
+			if err != nil {
+				t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
+			}
+			dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
+
+			// Start the reconciler to fill ASW.
+			stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
+			go func() {
+				reconciler.Run(stopChan)
+				close(stoppedChan)
+			}()
+			waitForVolumeToExistInASW(t, volumeName, asw)
+			if tc.volumeName == volumetesting.TimeoutAndFailOnSetupVolumeName {
+				// Wait upto 10s for reconciler to catchup
+				time.Sleep(reconcilerSyncWaitDuration)
+			}
+
+			if tc.volumeName == volumetesting.SuccessAndFailOnSetupVolumeName ||
+				tc.volumeName == volumetesting.SuccessAndTimeoutSetupVolumeName {
+				// wait for mount and then break it via remount
+				waitForMount(t, fakePlugin, volumeName, asw)
+				asw.MarkRemountRequired(podName)
+				time.Sleep(reconcilerSyncWaitDuration)
+			}
+
+			if tc.volumeState == operationexecutor.VolumeMountUncertain {
+				waitForUncertainPodMount(t, volumeName, asw)
+			}
+
+			if tc.volumeState == operationexecutor.VolumeMounted {
+				waitForMount(t, fakePlugin, volumeName, asw)
+			}
+
+			dsw.DeletePodFromVolume(podName, volumeName)
+			waitForDetach(t, volumeName, asw)
+
+			volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
+			volumetesting.VerifyTearDownCallCount(tc.unmountVolumeCount, fakePlugin)
+		})
+	}
-
-	volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
-	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
-	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
-	kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
-		Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", volumetesting.TimeoutOnSetupVolumeName)),
-		DevicePath: "fake/path",
-	})
-	fakeRecorder := &record.FakeRecorder{}
-	fakeHandler := volumetesting.NewBlockVolumePathHandler()
-	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
-		kubeClient,
-		volumePluginMgr,
-		fakeRecorder,
-		false, /* checkNodeCapabilitiesBeforeMount */
-		fakeHandler))
-
-	reconciler := NewReconciler(
-		kubeClient,
-		true, /* controllerAttachDetachEnabled */
-		reconcilerLoopSleepDuration,
-		waitForAttachTimeout,
-		nodeName,
-		dsw,
-		asw,
-		hasAddedPods,
-		oex,
-		&mount.FakeMounter{},
-		hostutil.NewFakeHostUtil(nil),
-		volumePluginMgr,
-		kubeletPodsDir)
-	volumeSpec := &volume.Spec{PersistentVolume: pv}
-	podName := util.GetUniquePodName(pod)
-	volumeName, err := dsw.AddPodToVolume(
-		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
-	// Assert
-	if err != nil {
-		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
-	}
-	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
-
-	// Start the reconciler to fill ASW.
-	stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
-	go func() {
-		reconciler.Run(stopChan)
-		close(stoppedChan)
-	}()
-	waitForVolumeToExistInASW(t, volumeName, asw)
-	waitForUncertainPodMount(t, volumeName, asw)
-
-	dsw.DeletePodFromVolume(podName, volumeName)
-	waitForDetach(t, volumeName, asw)
-
-	volumetesting.VerifyUnmountDeviceCallCount(1, fakePlugin)
-	volumetesting.VerifyTearDownCallCount(1, fakePlugin)
 }
 
 func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
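Both tests drive a real reconciler goroutine, so every assertion is a wait: helpers such as waitForMount, waitForDetach and waitForUncertainPodMount poll the actual state of the world until it reaches the expected state or a deadline passes, and the fixed time.Sleep(reconcilerSyncWaitDuration) calls give the reconciler time to attempt the second, deliberately broken mount. A self-contained sketch of that polling shape (the helpers' exact bodies are not part of this diff):

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // pollUntil retries cond every interval until it succeeds or timeout expires.
    func pollUntil(interval, timeout time.Duration, cond func() bool) error {
        deadline := time.Now().Add(timeout)
        for time.Now().Before(deadline) {
            if cond() {
                return nil
            }
            time.Sleep(interval)
        }
        return errors.New("timed out waiting for condition")
    }

    func main() {
        start := time.Now()
        // Toy condition: becomes true after 50ms, well inside the 1s budget.
        err := pollUntil(10*time.Millisecond, time.Second, func() bool {
            return time.Since(start) > 50*time.Millisecond
        })
        fmt.Println(err) // <nil>
    }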
@@ -100,16 +100,16 @@ func (c *csiMountMgr) CanMount() error {
 }
 
 func (c *csiMountMgr) SetUp(mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
-	opExitStatus, err := c.setupUtil(c.GetPath(), mounterArgs)
+	opExitStatus, err := c.setupInternal(c.GetPath(), mounterArgs)
 	return opExitStatus, err
 }
 
 func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
-	_, err := c.setupUtil(dir, mounterArgs)
+	_, err := c.setupInternal(dir, mounterArgs)
 	return err
 }
 
-func (c *csiMountMgr) setupUtil(dir string, mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
+func (c *csiMountMgr) setupInternal(dir string, mounterArgs volume.MounterArgs) (volumetypes.OperationStatus, error) {
 	klog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir))
 	// default to finished operation status
 	opExitStatus := volumetypes.OperationFinished
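This hunk is a pure rename, setupUtil to setupInternal, matching the naming used by the fake volume later in the diff. The shape it preserves is one internal implementation behind two public entry points, with SetUp surfacing the OperationStatus and SetUpAt discarding it. A toy sketch of that shape, with illustrative names rather than the CSI plugin's real types:

    package main

    import (
        "errors"
        "fmt"
    )

    type operationStatus string

    const operationFinished operationStatus = "Finished"

    type mounter struct{ path string }

    // Both public entry points delegate to one internal implementation, so
    // the status bookkeeping lives in a single place.
    func (m *mounter) SetUp() (operationStatus, error) {
        return m.setupInternal(m.path)
    }

    func (m *mounter) SetUpAt(dir string) error {
        _, err := m.setupInternal(dir)
        return err
    }

    func (m *mounter) setupInternal(dir string) (operationStatus, error) {
        if dir == "" {
            return operationFinished, errors.New("no target directory")
        }
        return operationFinished, nil // default to finished, as in the code above
    }

    func main() {
        m := &mounter{path: "/target/dir"}
        st, err := m.SetUp()
        fmt.Println(st, err) // Finished <nil>
    }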
@@ -69,12 +69,25 @@ const (
 	MultiAttachNode = "multi-attach-node"
 	// TimeoutOnSetupVolumeName will cause Setup call to timeout but volume will finish mounting.
 	TimeoutOnSetupVolumeName = "timeout-setup-volume"
+	// FailOnSetupVolumeName will cause setup call to fail
+	FailOnSetupVolumeName = "fail-setup-volume"
+	//TimeoutAndFailOnSetupVolumeName will first timeout and then fail the setup
+	TimeoutAndFailOnSetupVolumeName = "timeout-and-fail-setup-volume"
+	// SuccessAndTimeoutSetupVolumeName will cause first mount operation to succeed but subsequent attempts to timeout
+	SuccessAndTimeoutSetupVolumeName = "success-and-timeout-setup-volume-name"
+	// SuccessAndFailOnSetupVolumeName will cause first mount operation to succeed but subsequent attempts to fail
+	SuccessAndFailOnSetupVolumeName = "success-and-failed-setup-device-name"
+
 	// TimeoutOnMountDeviceVolumeName will cause MountDevice call to timeout but Setup will finish.
 	TimeoutOnMountDeviceVolumeName = "timeout-mount-device-volume"
 	// TimeoutAndFailOnMountDeviceVolumeName will cause first MountDevice call to timeout but second call will fail
 	TimeoutAndFailOnMountDeviceVolumeName = "timeout-and-fail-mount-device-name"
 	// FailMountDeviceVolumeName will cause MountDevice operation on volume to fail
 	FailMountDeviceVolumeName = "fail-mount-device-volume-name"
+	// SuccessAndTimeoutDeviceName will cause first mount operation to succeed but subsequent attempts to timeout
+	SuccessAndTimeoutDeviceName = "success-and-timeout-device-name"
+	// SuccessAndFailOnMountDeviceName will cause first mount operation to succeed but subsequent attempts to fail
+	SuccessAndFailOnMountDeviceName = "success-and-failed-mount-device-name"
 )
 
 // fakeVolumeHost is useful for testing volume plugins.
@@ -354,6 +367,7 @@ type FakeVolumePlugin struct {
 	VolumeLimitsError error
 	LimitKey string
 	ProvisionDelaySeconds int
+	SupportsRemount bool
 
 	// Add callbacks as needed
 	WaitForAttachHook func(spec *Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error)
@@ -393,6 +407,7 @@ func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume {
 	}
 	volume.VolumesAttached = make(map[string]types.NodeName)
 	volume.DeviceMountState = make(map[string]volumetypes.OperationStatus)
+	volume.VolumeMountState = make(map[string]volumetypes.OperationStatus)
 	*list = append(*list, volume)
 	return volume
 }
@@ -430,7 +445,7 @@ func (plugin *FakeVolumePlugin) CanSupport(spec *Spec) bool {
 }
 
 func (plugin *FakeVolumePlugin) RequiresRemount() bool {
-	return false
+	return plugin.SupportsRemount
 }
 
 func (plugin *FakeVolumePlugin) SupportsMountOption() bool {
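Wiring RequiresRemount to the new SupportsRemount field matters because asw.MarkRemountRequired, which the tests above call, only flags pods whose plugin actually asks for remounts; with the old hard-coded false, the second, deliberately failing SetUp would never run. A simplified, self-contained model of that gate (toy types, not the cache's real ones):

    package main

    import "fmt"

    // Toy model of why the fake plugin must report RequiresRemount() == true
    // for these tests: MarkRemountRequired only flags pods whose plugin asks
    // for remounts (simplified from actualStateOfWorld.MarkRemountRequired).
    type plugin struct{ SupportsRemount bool }

    func (p plugin) RequiresRemount() bool { return p.SupportsRemount }

    type podEntry struct {
        plugin          plugin
        remountRequired bool
    }

    func markRemountRequired(pods map[string]*podEntry, podName string) {
        if entry, ok := pods[podName]; ok && entry.plugin.RequiresRemount() {
            entry.remountRequired = true
        }
    }

    func main() {
        pods := map[string]*podEntry{"pod1": {plugin: plugin{SupportsRemount: true}}}
        markRemountRequired(pods, "pod1")
        fmt.Println(pods["pod1"].remountRequired) // true
    }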
@@ -796,6 +811,7 @@ type FakeVolume struct {
 	MetricsNil
 	VolumesAttached map[string]types.NodeName
 	DeviceMountState map[string]volumetypes.OperationStatus
+	VolumeMountState map[string]volumetypes.OperationStatus
 
 	// Add callbacks as needed
 	WaitForAttachHook func(spec *Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error)
@@ -844,22 +860,58 @@ func (fv *FakeVolume) CanMount() error {
 }
 
 func (fv *FakeVolume) SetUp(mounterArgs MounterArgs) (volumetypes.OperationStatus, error) {
-	internalSetup := func() error {
-		fv.Lock()
-		defer fv.Unlock()
-		if fv.VolName == TimeoutOnSetupVolumeName {
-			return volumetypes.NewOperationTimedOutError("time out on setup")
-		}
-		fv.SetUpCallCount++
-		return fv.SetUpAt(fv.getPath(), mounterArgs)
-	}
-	err := internalSetup()
+	fv.Lock()
+	defer fv.Unlock()
+	err := fv.setupInternal(mounterArgs)
+	fv.SetUpCallCount++
 	if volumetypes.IsOperationTimeOutError(err) {
 		return volumetypes.OperationInProgress, err
 	}
 	return volumetypes.OperationFinished, err
 }
 
+func (fv *FakeVolume) setupInternal(mounterArgs MounterArgs) error {
+	if fv.VolName == TimeoutOnSetupVolumeName {
+		fv.VolumeMountState[fv.VolName] = volumetypes.OperationInProgress
+		return volumetypes.NewOperationTimedOutError("time out on setup")
+	}
+
+	if fv.VolName == FailOnSetupVolumeName {
+		return fmt.Errorf("mounting volume failed")
+	}
+
+	if fv.VolName == TimeoutAndFailOnSetupVolumeName {
+		_, ok := fv.VolumeMountState[fv.VolName]
+		if !ok {
+			fv.VolumeMountState[fv.VolName] = volumetypes.OperationInProgress
+			return volumetypes.NewOperationTimedOutError("time out on setup")
+		}
+		fv.VolumeMountState[fv.VolName] = volumetypes.OperationFinished
+		return fmt.Errorf("mounting volume failed")
+
+	}
+
+	if fv.VolName == SuccessAndFailOnSetupVolumeName {
+		_, ok := fv.VolumeMountState[fv.VolName]
+		if ok {
+			fv.VolumeMountState[fv.VolName] = volumetypes.OperationFinished
+			return fmt.Errorf("mounting volume failed")
+		}
+	}
+
+	if fv.VolName == SuccessAndTimeoutSetupVolumeName {
+		_, ok := fv.VolumeMountState[fv.VolName]
+		if ok {
+			fv.VolumeMountState[fv.VolName] = volumetypes.OperationInProgress
+			return volumetypes.NewOperationTimedOutError("time out on setup")
+		}
+	}
+
+	fv.VolumeMountState[fv.VolName] = volumetypes.OperationFinished
+
+	return fv.SetUpAt(fv.getPath(), mounterArgs)
+}
+
 func (fv *FakeVolume) GetSetUpCallCount() int {
 	fv.RLock()
 	defer fv.RUnlock()
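The fake's SetUp now separates policy from bookkeeping: setupInternal decides whether to fail, time out, or succeed based on the volume name and the recorded VolumeMountState, while SetUp maps a typed timeout error to OperationInProgress and everything else to OperationFinished. A self-contained sketch of that error-classification pattern, with toy stand-ins for the volumetypes helpers:

    package main

    import (
        "errors"
        "fmt"
    )

    // A dedicated error type lets SetUp translate "the operation may still be
    // running" into InProgress, while every other failure maps to Finished.
    type operationTimedOutError struct{ msg string }

    func (e *operationTimedOutError) Error() string { return e.msg }

    func newOperationTimedOutError(msg string) error { return &operationTimedOutError{msg: msg} }

    func isOperationTimeOutError(err error) bool {
        var t *operationTimedOutError
        return errors.As(err, &t)
    }

    func setUp(volName string) (string, error) {
        err := setupInternal(volName)
        if isOperationTimeOutError(err) {
            return "InProgress", err // caller must treat the mount as uncertain
        }
        return "Finished", err
    }

    func setupInternal(volName string) error {
        if volName == "timeout-setup-volume" {
            return newOperationTimedOutError("time out on setup")
        }
        return nil
    }

    func main() {
        fmt.Println(setUp("timeout-setup-volume")) // InProgress time out on setup
        fmt.Println(setUp("ok-volume"))            // Finished <nil>
    }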
@@ -1071,16 +1123,30 @@ func (fv *FakeVolume) mountDeviceInternal(spec *Spec, devicePath string, deviceM
 	}
 
 	if spec.Name() == TimeoutAndFailOnMountDeviceVolumeName {
-		oldState, ok := fv.DeviceMountState[spec.Name()]
+		_, ok := fv.DeviceMountState[spec.Name()]
 		if !ok {
 			fv.DeviceMountState[spec.Name()] = volumetypes.OperationInProgress
 			return volumetypes.NewOperationTimedOutError("timed out mounting error")
 		}
-		if oldState == volumetypes.OperationInProgress {
-			fv.DeviceMountState[spec.Name()] = volumetypes.OperationFinished
-		}
+		fv.DeviceMountState[spec.Name()] = volumetypes.OperationFinished
 		return fmt.Errorf("error mounting disk: %s", devicePath)
 	}
+
+	if spec.Name() == SuccessAndTimeoutDeviceName {
+		_, ok := fv.DeviceMountState[spec.Name()]
+		if ok {
+			fv.DeviceMountState[spec.Name()] = volumetypes.OperationInProgress
+			return volumetypes.NewOperationTimedOutError("error mounting state")
+		}
+	}
+
+	if spec.Name() == SuccessAndFailOnMountDeviceName {
+		_, ok := fv.DeviceMountState[spec.Name()]
+		if ok {
+			return fmt.Errorf("error mounting disk: %s", devicePath)
+		}
+	}
+	fv.DeviceMountState[spec.Name()] = volumetypes.OperationFinished
 	fv.MountDeviceCallCount++
 	return nil
 }
@@ -1361,16 +1427,25 @@ func VerifyMountDeviceCallCount(
 }
 
 func VerifyUnmountDeviceCallCount(expectedCallCount int, fakeVolumePlugin *FakeVolumePlugin) error {
-	for _, attacher := range fakeVolumePlugin.GetAttachers() {
-		actualCallCount := attacher.GetUnmountDeviceCallCount()
-		if actualCallCount >= expectedCallCount {
+	detachers := fakeVolumePlugin.GetDetachers()
+	if len(detachers) == 0 && (expectedCallCount == 0) {
+		return nil
+	}
+	actualCallCount := 0
+	for _, detacher := range detachers {
+		actualCallCount = detacher.GetUnmountDeviceCallCount()
+		if expectedCallCount == 0 && actualCallCount == expectedCallCount {
 			return nil
 		}
+
+		if (expectedCallCount > 0) && (actualCallCount >= expectedCallCount) {
+			return nil
+		}
 	}
+
 	return fmt.Errorf(
-		"No Attachers have expected MountunDeviceCallCount. Expected: <%v>.",
-		expectedCallCount)
+		"Expected DeviceUnmount Call %d, got %d",
+		expectedCallCount, actualCallCount)
 }
 
 // VerifyZeroMountDeviceCallCount ensures that all Attachers for this plugin
@@ -1427,9 +1502,18 @@ func VerifyZeroSetUpCallCount(fakeVolumePlugin *FakeVolumePlugin) error {
 func VerifyTearDownCallCount(
 	expectedTearDownCallCount int,
 	fakeVolumePlugin *FakeVolumePlugin) error {
-	for _, unmounter := range fakeVolumePlugin.GetUnmounters() {
+	unmounters := fakeVolumePlugin.GetUnmounters()
+	if len(unmounters) == 0 && (expectedTearDownCallCount == 0) {
+		return nil
+	}
+
+	for _, unmounter := range unmounters {
 		actualCallCount := unmounter.GetTearDownCallCount()
-		if actualCallCount >= expectedTearDownCallCount {
+		if expectedTearDownCallCount == 0 && actualCallCount == expectedTearDownCallCount {
 			return nil
 		}
+
+		if (expectedTearDownCallCount > 0) && (actualCallCount >= expectedTearDownCallCount) {
+			return nil
+		}
 	}
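Both verifiers now encode the same rule: an expectation of zero calls must hold exactly (and holds vacuously when no detachers or unmounters were ever created, as in the fail-on-setup cases), while a positive expectation is a lower bound, since retries can legitimately add calls. Restated as a self-contained toy:

    package main

    import "fmt"

    // Toy restatement of the verifier semantics above: zero expected calls
    // must match exactly (or hold vacuously with no instances at all), while
    // a positive expectation is satisfied by "at least that many" calls.
    func verifyCallCount(expected int, actualPerInstance []int) error {
        if len(actualPerInstance) == 0 && expected == 0 {
            return nil
        }
        actual := 0
        for _, c := range actualPerInstance {
            actual = c // like the original, only the last instance is reported
            if expected == 0 && actual == 0 {
                return nil
            }
            if expected > 0 && actual >= expected {
                return nil
            }
        }
        return fmt.Errorf("Expected call count %d, got %d", expected, actual)
    }

    func main() {
        fmt.Println(verifyCallCount(0, nil))      // <nil>
        fmt.Println(verifyCallCount(1, []int{2})) // <nil>
        fmt.Println(verifyCallCount(0, []int{1})) // error
    }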
@@ -196,6 +196,12 @@ type ActualStateOfWorldMounterUpdater interface {
 
 	// Marks the specified volume's file system resize request is finished.
 	MarkVolumeAsResized(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error
+
+	// GetDeviceMountState returns mount state of the device in global path
+	GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState
+
+	// GetVolumeMountState returns mount state of the volume for the Pod
+	GetVolumeMountState(volumName v1.UniqueVolumeName, podName volumetypes.UniquePodName) VolumeMountState
 }
 
 // ActualStateOfWorldAttacherUpdater defines a set of operations updating the
@@ -397,6 +403,9 @@
 
+	// VolumeMountUncertain means volume may or may not be mounted in pods' local path
+	VolumeMountUncertain VolumeMountState = "VolumeMountUncertain"
+
 	// VolumeNotMounted means volume has not be mounted in pod's local path
 	VolumeNotMounted VolumeMountState = "VolumeNotMounted"
 )
 
 // GenerateMsgDetailed returns detailed msgs for volumes to mount
@@ -580,18 +580,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
 			devicePath,
 			deviceMountPath)
 		if err != nil {
-			switch operationState {
-			case volumetypes.OperationInProgress:
-				markDeviceUncertainError := actualStateOfWorld.MarkDeviceAsUncertain(volumeToMount.VolumeName, devicePath, deviceMountPath)
-				if markDeviceUncertainError != nil {
-					klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUncertain failed", markDeviceUncertainError).Error())
-				}
-			case volumetypes.OperationFinished:
-				markDeviceUnmountError := actualStateOfWorld.MarkDeviceAsUnmounted(volumeToMount.VolumeName)
-				if markDeviceUnmountError != nil {
-					klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUnmounted failed", markDeviceUnmountError).Error())
-				}
-			}
+			og.markDeviceErrorState(volumeToMount, devicePath, deviceMountPath, operationState, actualStateOfWorld)
 			// On failure, return error. Caller will log and retry.
 			return volumeToMount.GenerateError("MountVolume.MountDevice failed", err)
 		}
@@ -645,19 +634,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
 			VolumeMountState: VolumeMounted,
 		}
 		if mountErr != nil {
-			switch opExitStatus {
-			case volumetypes.OperationInProgress:
-				markOpts.VolumeMountState = VolumeMountUncertain
-				t := actualStateOfWorld.MarkVolumeMountAsUncertain(markOpts)
-				if t != nil {
-					klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", t).Error())
-				}
-			case volumetypes.OperationFinished:
-				t := actualStateOfWorld.MarkVolumeAsUnmounted(volumeToMount.PodName, volumeToMount.VolumeName)
-				if t != nil {
-					klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeAsUnmounted failed", t).Error())
-				}
-			}
+			og.markVolumeErrorState(volumeToMount, markOpts, opExitStatus, actualStateOfWorld)
 			// On failure, return error. Caller will log and retry.
 			return volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr)
 		}
@@ -717,6 +694,47 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
 	}
 }
 
+func (og *operationGenerator) markDeviceErrorState(volumeToMount VolumeToMount, devicePath, deviceMountPath string, operationState volumetypes.OperationStatus, actualStateOfWorld ActualStateOfWorldMounterUpdater) {
+	switch operationState {
+	case volumetypes.OperationInProgress:
+		// only devices which are not mounted can be marked as uncertain. We do not want to mark a device
+		// which was previously marked as mounted here as uncertain.
+		if actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) == DeviceNotMounted {
+			markDeviceUncertainError := actualStateOfWorld.MarkDeviceAsUncertain(volumeToMount.VolumeName, devicePath, deviceMountPath)
+			if markDeviceUncertainError != nil {
+				klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUncertain failed", markDeviceUncertainError).Error())
+			}
+		}
+	case volumetypes.OperationFinished:
+		// Similarly only devices which were uncertain can be marked as unmounted
+		if actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) == DeviceMountUncertain {
+			markDeviceUnmountError := actualStateOfWorld.MarkDeviceAsUnmounted(volumeToMount.VolumeName)
+			if markDeviceUnmountError != nil {
+				klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUnmounted failed", markDeviceUnmountError).Error())
+			}
+		}
+	}
+}
+
+func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount, markOpts MarkVolumeOpts, operationState volumetypes.OperationStatus, actualStateOfWorld ActualStateOfWorldMounterUpdater) {
+	switch operationState {
+	case volumetypes.OperationInProgress:
+		if actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeNotMounted {
+			t := actualStateOfWorld.MarkVolumeMountAsUncertain(markOpts)
+			if t != nil {
+				klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", t).Error())
+			}
+		}
+	case volumetypes.OperationFinished:
+		if actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeMountUncertain {
+			t := actualStateOfWorld.MarkVolumeAsUnmounted(volumeToMount.PodName, volumeToMount.VolumeName)
+			if t != nil {
+				klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeAsUnmounted failed", t).Error())
+			}
+		}
+	}
+}
+
 func (og *operationGenerator) GenerateUnmountVolumeFunc(
 	volumeToUnmount MountedVolume,
 	actualStateOfWorld ActualStateOfWorldMounterUpdater,
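These two helpers are the heart of the fix. Read as a state machine: a timed-out operation (OperationInProgress) may move an entry to uncertain only from not-mounted, so a volume or device that already mounted successfully is never downgraded by a later failed remount; a definite failure (OperationFinished) may reset an entry to not-mounted only from uncertain. A toy encoding of the device-side transitions (the volume side is symmetric):

    package main

    import "fmt"

    type state string

    const (
        notMounted state = "DeviceNotMounted"
        uncertain  state = "DeviceMountUncertain"
        mounted    state = "DeviceGloballyMounted"
    )

    // markErrorState mirrors the guards above: uncertain is only entered from
    // not-mounted, and only an uncertain entry is reset on a definite failure.
    func markErrorState(current state, operationInProgress bool) state {
        if operationInProgress { // timed out: outcome unknown
            if current == notMounted {
                return uncertain
            }
        } else { // finished with error: outcome known
            if current == uncertain {
                return notMounted
            }
        }
        return current // a previously mounted device is never downgraded
    }

    func main() {
        fmt.Println(markErrorState(notMounted, true)) // DeviceMountUncertain
        fmt.Println(markErrorState(mounted, true))    // DeviceGloballyMounted: remount timeout keeps it mounted
        fmt.Println(markErrorState(uncertain, false)) // DeviceNotMounted
    }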
@@ -60,7 +60,7 @@
 
 	// OperationInProgress means volume operation has been started and
 	// is in-progress. This state does not indicate if operation will succeed or fail but
-	// merely it has been started and in in-progress.
+	// merely it has been started and is in-progress.
 	OperationInProgress OperationStatus = "InProgress"
 
 	// OperationStateNoChange indicates it is unchanged from previous state.