Refactor NodeStage function

Timeout operations should result in
Fix unit tests for uncertainDeviceGlobalMounts
This commit is contained in:
Hemant Kumar
2019-09-27 15:25:19 -04:00
parent 57019e0628
commit 0c52b6606e
8 changed files with 161 additions and 97 deletions

View File

@@ -224,6 +224,7 @@ go_test(
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/hostutil:go_default_library",
"//pkg/volume/util/subpath:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",

View File

@@ -51,9 +51,10 @@ const (
reconcilerLoopSleepDuration time.Duration = 1 * time.Nanosecond
// waitForAttachTimeout is the maximum amount of time a
// operationexecutor.Mount call will wait for a volume to be attached.
waitForAttachTimeout time.Duration = 1 * time.Second
nodeName k8stypes.NodeName = k8stypes.NodeName("mynodename")
kubeletPodsDir string = "fake-dir"
waitForAttachTimeout time.Duration = 1 * time.Second
nodeName k8stypes.NodeName = k8stypes.NodeName("mynodename")
kubeletPodsDir string = "fake-dir"
testOperationBackOffDuration time.Duration = 100 * time.Millisecond
)
func hasAddedPods() bool { return true }
@@ -1134,7 +1135,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
// resize operation and clear the fsResizeRequired flag for volume.
go reconciler.Run(wait.NeverStop)
waitErr := retryWithExponentialBackOff(500*time.Millisecond, func() (done bool, err error) {
waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) {
mounted, _, err := asw.PodExistsInVolume(podName, volumeName)
return mounted && err == nil, nil
})
@@ -1147,97 +1148,131 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
func Test_UncertainDeviceGlobalMounts(t *testing.T) {
fsMode := v1.PersistentVolumeFilesystem
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: volumetesting.TimeoutOnMountDeviceVolumeName,
UID: "pvuid",
var tests = []struct {
name string
deviceState operationexecutor.DeviceMountState
unmountDeviceCallCount int
volumeName string
}{
{
name: "timed out operations should result in device marked as uncertain",
deviceState: operationexecutor.DeviceMountUncertain,
unmountDeviceCallCount: 1,
volumeName: volumetesting.TimeoutOnMountDeviceVolumeName,
},
Spec: v1.PersistentVolumeSpec{
ClaimRef: &v1.ObjectReference{Name: "pvc"},
VolumeMode: &fsMode,
{
name: "failed operation should result in not-mounted device",
deviceState: operationexecutor.DeviceNotMounted,
unmountDeviceCallCount: 0,
volumeName: volumetesting.FailMountDeviceVolumeName,
},
{
name: "timeout followed by failed operation should result in non-mounted device",
deviceState: operationexecutor.DeviceNotMounted,
unmountDeviceCallCount: 0,
volumeName: volumetesting.TimeoutAndFailOnMountDeviceVolumeName,
},
}
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc",
UID: "pvcuid",
},
Spec: v1.PersistentVolumeClaimSpec{
VolumeName: volumetesting.TimeoutOnMountDeviceVolumeName,
},
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "volume-name",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc.Name,
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: tc.volumeName,
UID: "pvuid",
},
Spec: v1.PersistentVolumeSpec{
ClaimRef: &v1.ObjectReference{Name: "pvc"},
VolumeMode: &fsMode,
},
}
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc",
UID: "pvcuid",
},
Spec: v1.PersistentVolumeClaimSpec{
VolumeName: tc.volumeName,
},
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "volume-name",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc.Name,
},
},
},
},
},
},
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
DevicePath: "fake/path",
})
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
kubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
reconciler := NewReconciler(
kubeClient,
true, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
waitForAttachTimeout,
nodeName,
dsw,
asw,
hasAddedPods,
oex,
&mount.FakeMounter{},
hostutil.NewFakeHostUtil(nil),
volumePluginMgr,
kubeletPodsDir)
volumeSpec := &volume.Spec{PersistentVolume: pv}
podName := util.GetUniquePodName(pod)
volumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
// Start the reconciler to fill ASW.
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
go func() {
reconciler.Run(stopChan)
close(stoppedChan)
}()
waitForVolumeToExistInASW(t, volumeName, asw)
if tc.deviceState == operationexecutor.DeviceMountUncertain {
waitForUncertainGlobalMount(t, volumeName, asw)
}
dsw.DeletePodFromVolume(podName, volumeName)
waitForDetach(t, volumeName, asw)
volumetesting.VerifyUnmountDeviceCallCount(1, fakePlugin)
})
}
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", volumetesting.TimeoutOnMountDeviceVolumeName)),
DevicePath: "fake/path",
})
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
kubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
reconciler := NewReconciler(
kubeClient,
true, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
waitForAttachTimeout,
nodeName,
dsw,
asw,
hasAddedPods,
oex,
&mount.FakeMounter{},
hostutil.NewFakeHostUtil(nil),
volumePluginMgr,
kubeletPodsDir)
volumeSpec := &volume.Spec{PersistentVolume: pv}
podName := util.GetUniquePodName(pod)
volumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
// Start the reconciler to fill ASW.
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
go func() {
reconciler.Run(stopChan)
close(stoppedChan)
}()
waitForVolumeToExistInASW(t, volumeName, asw)
waitForUncertainGlobalMount(t, volumeName, asw)
dsw.DeletePodFromVolume(podName, volumeName)
waitForDetach(t, volumeName, asw)
volumetesting.VerifyUnmountDeviceCallCount(1, fakePlugin)
}
func Test_UncertainVolumeMountState(t *testing.T) {
@@ -1339,7 +1374,7 @@ func Test_UncertainVolumeMountState(t *testing.T) {
func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
// check if volume is globally mounted in uncertain state
err := retryWithExponentialBackOff(
time.Duration(500*time.Millisecond),
testOperationBackOffDuration,
func() (bool, error) {
unmountedVolumes := asw.GetUnmountedVolumes()
for _, v := range unmountedVolumes {
@@ -1359,7 +1394,7 @@ func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, a
func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
// check if volume is locally pod mounted in uncertain state
err := retryWithExponentialBackOff(
time.Duration(500*time.Millisecond),
testOperationBackOffDuration,
func() (bool, error) {
allMountedVolumes := asw.GetAllMountedVolumes()
for _, v := range allMountedVolumes {
@@ -1382,7 +1417,7 @@ func waitForMount(
volumeName v1.UniqueVolumeName,
asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(500*time.Millisecond),
testOperationBackOffDuration,
func() (bool, error) {
mountedVolumes := asw.GetMountedVolumes()
for _, mountedVolume := range mountedVolumes {
@@ -1402,7 +1437,7 @@ func waitForMount(
func waitForVolumeToExistInASW(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(500*time.Millisecond),
testOperationBackOffDuration,
func() (bool, error) {
if asw.VolumeExists(volumeName) {
return true, nil
@@ -1420,7 +1455,7 @@ func waitForDetach(
volumeName v1.UniqueVolumeName,
asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(500*time.Millisecond),
testOperationBackOffDuration,
func() (bool, error) {
if asw.VolumeExists(volumeName) {
return false, nil

View File

@@ -66,6 +66,7 @@ go_test(
"//pkg/volume/csi/fake:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",

View File

@@ -499,7 +499,7 @@ func TestMounterSetupWithStatusTracking(t *testing.T) {
}
if !tc.shouldFail && err != nil {
t.Fatalf("expected successs got mounter.Setup failed with: %v", err)
t.Fatalf("expected success got mounter.Setup failed with: %v", err)
}
})
}

View File

@@ -11,6 +11,8 @@ go_library(
deps = [
"//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/google.golang.org/grpc/codes:go_default_library",
"//vendor/google.golang.org/grpc/status:go_default_library",
],
)

View File

@@ -98,8 +98,8 @@ func (a *flexVolumeAttacher) MountDevice(spec *volume.Spec, devicePath string, d
return err
}
func (attacher *flexVolumeAttacher) MountDeviceWithStatusTracking(spec *volume.Spec, devicePath string, deviceMountPath string) (volumetypes.OperationStatus, error) {
err := attacher.MountDevice(spec, devicePath, deviceMountPath)
func (a *flexVolumeAttacher) MountDeviceWithStatusTracking(spec *volume.Spec, devicePath string, deviceMountPath string) (volumetypes.OperationStatus, error) {
err := a.MountDevice(spec, devicePath, deviceMountPath)
return volumetypes.OperationFinished, err
}

View File

@@ -71,6 +71,10 @@ const (
TimeoutOnSetupVolumeName = "timeout-setup-volume"
// TimeoutOnMountDeviceVolumeName will cause MountDevice call to timeout but Setup will finish.
TimeoutOnMountDeviceVolumeName = "timeout-mount-device-volume"
// TimeoutAndFailOnMountDeviceVolumeName will cause first MountDevice call to timeout but second call will fail
TimeoutAndFailOnMountDeviceVolumeName = "timeout-and-fail-mount-device-name"
// FailMountDeviceVolumeName will cause MountDevice operation on volume to fail
FailMountDeviceVolumeName = "fail-mount-device-volume-name"
)
// fakeVolumeHost is useful for testing volume plugins.
@@ -388,6 +392,7 @@ func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume {
UnmountDeviceHook: plugin.UnmountDeviceHook,
}
volume.VolumesAttached = make(map[string]types.NodeName)
volume.DeviceMountState = make(map[string]volumetypes.OperationStatus)
*list = append(*list, volume)
return volume
}
@@ -789,7 +794,8 @@ type FakeVolume struct {
VolName string
Plugin *FakeVolumePlugin
MetricsNil
VolumesAttached map[string]types.NodeName
VolumesAttached map[string]types.NodeName
DeviceMountState map[string]volumetypes.OperationStatus
// Add callbacks as needed
WaitForAttachHook func(spec *Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error)
@@ -1056,8 +1062,26 @@ func (fv *FakeVolume) MountDevice(spec *Spec, devicePath string, deviceMountPath
fv.Lock()
defer fv.Unlock()
if spec.Name() == TimeoutOnMountDeviceVolumeName {
fv.DeviceMountState[spec.Name()] = volumetypes.OperationInProgress
return volumetypes.NewOperationTimedOutError("error mounting device")
}
if spec.Name() == FailMountDeviceVolumeName {
fv.DeviceMountState[spec.Name()] = volumetypes.OperationFinished
return fmt.Errorf("error mounting disk: %s", devicePath)
}
if spec.Name() == TimeoutAndFailOnMountDeviceVolumeName {
oldState, ok := fv.DeviceMountState[spec.Name()]
if !ok {
fv.DeviceMountState[spec.Name()] = volumetypes.OperationInProgress
return volumetypes.NewOperationTimedOutError("error mounting state")
}
if oldState == volumetypes.OperationInProgress {
fv.DeviceMountState[spec.Name()] = volumetypes.OperationFinished
return fmt.Errorf("error mounting disk: %s", devicePath)
}
}
fv.MountDeviceCallCount++
return nil
}

View File

@@ -51,6 +51,7 @@ func (o *GeneratedOperations) Run() (eventErr, detailedErr error) {
return o.OperationFunc()
}
// OperationStatus is used to store status of a volume operation
type OperationStatus string
const (