From 0c52b6606e6f779ddd0cd22a4ca372b1abf8da60 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Fri, 27 Sep 2019 15:25:19 -0400 Subject: [PATCH] Refactor NodeStage function Timeout operations should result in Fix unit tests for uncertainDeviceGlobalMounts --- pkg/kubelet/BUILD | 1 + .../reconciler/reconciler_test.go | 221 ++++++++++-------- pkg/volume/csi/BUILD | 1 + pkg/volume/csi/csi_mounter_test.go | 2 +- pkg/volume/csi/fake/BUILD | 2 + pkg/volume/flexvolume/attacher.go | 4 +- pkg/volume/testing/testing.go | 26 ++- pkg/volume/util/types/types.go | 1 + 8 files changed, 161 insertions(+), 97 deletions(-) diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index f942cf4c2aa..f6681956119 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -224,6 +224,7 @@ go_test( "//pkg/volume/util:go_default_library", "//pkg/volume/util/hostutil:go_default_library", "//pkg/volume/util/subpath:go_default_library", + "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index dfafd8985a1..ae8071145cf 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -51,9 +51,10 @@ const ( reconcilerLoopSleepDuration time.Duration = 1 * time.Nanosecond // waitForAttachTimeout is the maximum amount of time a // operationexecutor.Mount call will wait for a volume to be attached. 
- waitForAttachTimeout time.Duration = 1 * time.Second - nodeName k8stypes.NodeName = k8stypes.NodeName("mynodename") - kubeletPodsDir string = "fake-dir" + waitForAttachTimeout time.Duration = 1 * time.Second + nodeName k8stypes.NodeName = k8stypes.NodeName("mynodename") + kubeletPodsDir string = "fake-dir" + testOperationBackOffDuration time.Duration = 100 * time.Millisecond ) func hasAddedPods() bool { return true } @@ -1134,7 +1135,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { // resize operation and clear the fsResizeRequired flag for volume. go reconciler.Run(wait.NeverStop) - waitErr := retryWithExponentialBackOff(500*time.Millisecond, func() (done bool, err error) { + waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) { mounted, _, err := asw.PodExistsInVolume(podName, volumeName) return mounted && err == nil, nil }) @@ -1147,97 +1148,131 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { func Test_UncertainDeviceGlobalMounts(t *testing.T) { fsMode := v1.PersistentVolumeFilesystem - pv := &v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: volumetesting.TimeoutOnMountDeviceVolumeName, - UID: "pvuid", + var tests = []struct { + name string + deviceState operationexecutor.DeviceMountState // when DeviceMountUncertain, the test also waits for the uncertain global mount + unmountDeviceCallCount int // expected number of UnmountDevice calls, verified after detach + volumeName string + }{ + { + name: "timed out operations should result in device marked as uncertain", + deviceState: operationexecutor.DeviceMountUncertain, + unmountDeviceCallCount: 1, + volumeName: volumetesting.TimeoutOnMountDeviceVolumeName, }, - Spec: v1.PersistentVolumeSpec{ - ClaimRef: &v1.ObjectReference{Name: "pvc"}, - VolumeMode: &fsMode, + { + name: "failed operation should result in not-mounted device", + deviceState: operationexecutor.DeviceNotMounted, + unmountDeviceCallCount: 0, + volumeName: volumetesting.FailMountDeviceVolumeName, + }, + { + name: "timeout followed by failed operation should result in 
non-mounted device", + deviceState: operationexecutor.DeviceNotMounted, + unmountDeviceCallCount: 0, + volumeName: volumetesting.TimeoutAndFailOnMountDeviceVolumeName, }, } - pvc := &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pvc", - UID: "pvcuid", - }, - Spec: v1.PersistentVolumeClaimSpec{ - VolumeName: volumetesting.TimeoutOnMountDeviceVolumeName, - }, - } - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - UID: "pod1uid", - }, - Spec: v1.PodSpec{ - Volumes: []v1.Volume{ - { - Name: "volume-name", - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: pvc.Name, + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: tc.volumeName, + UID: "pvuid", + }, + Spec: v1.PersistentVolumeSpec{ + ClaimRef: &v1.ObjectReference{Name: "pvc"}, + VolumeMode: &fsMode, + }, + } + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc", + UID: "pvcuid", + }, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: tc.volumeName, + }, + } + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "volume-name", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Name, + }, + }, }, }, }, - }, - }, + } + + volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) + kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ + Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)), + DevicePath: "fake/path", + }) + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + oex := 
operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + kubeClient, + volumePluginMgr, + fakeRecorder, + false, /* checkNodeCapabilitiesBeforeMount */ + fakeHandler)) + + reconciler := NewReconciler( + kubeClient, + true, /* controllerAttachDetachEnabled */ + reconcilerLoopSleepDuration, + waitForAttachTimeout, + nodeName, + dsw, + asw, + hasAddedPods, + oex, + &mount.FakeMounter{}, + hostutil.NewFakeHostUtil(nil), + volumePluginMgr, + kubeletPodsDir) + volumeSpec := &volume.Spec{PersistentVolume: pv} + podName := util.GetUniquePodName(pod) + volumeName, err := dsw.AddPodToVolume( + podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) + // Assert + if err != nil { + t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err) + } + dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName}) + + // Start the reconciler to fill ASW. + stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) + go func() { + reconciler.Run(stopChan) + close(stoppedChan) + }() + waitForVolumeToExistInASW(t, volumeName, asw) + if tc.deviceState == operationexecutor.DeviceMountUncertain { + waitForUncertainGlobalMount(t, volumeName, asw) + } + + dsw.DeletePodFromVolume(podName, volumeName) + waitForDetach(t, volumeName, asw) + + if err := volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin); err != nil { + t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) + } + }) } - - volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) - dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) - asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) - kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ - Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", volumetesting.TimeoutOnMountDeviceVolumeName)), - DevicePath: "fake/path", - }) - fakeRecorder := &record.FakeRecorder{} - fakeHandler := volumetesting.NewBlockVolumePathHandler() - oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( - kubeClient, - volumePluginMgr, - fakeRecorder, - false, 
/* checkNodeCapabilitiesBeforeMount */ - fakeHandler)) - - reconciler := NewReconciler( - kubeClient, - true, /* controllerAttachDetachEnabled */ - reconcilerLoopSleepDuration, - waitForAttachTimeout, - nodeName, - dsw, - asw, - hasAddedPods, - oex, - &mount.FakeMounter{}, - hostutil.NewFakeHostUtil(nil), - volumePluginMgr, - kubeletPodsDir) - volumeSpec := &volume.Spec{PersistentVolume: pv} - podName := util.GetUniquePodName(pod) - volumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) - // Assert - if err != nil { - t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err) - } - dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName}) - - // Start the reconciler to fill ASW. - stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) - go func() { - reconciler.Run(stopChan) - close(stoppedChan) - }() - waitForVolumeToExistInASW(t, volumeName, asw) - waitForUncertainGlobalMount(t, volumeName, asw) - - dsw.DeletePodFromVolume(podName, volumeName) - waitForDetach(t, volumeName, asw) - - volumetesting.VerifyUnmountDeviceCallCount(1, fakePlugin) } func Test_UncertainVolumeMountState(t *testing.T) { @@ -1339,7 +1374,7 @@ func Test_UncertainVolumeMountState(t *testing.T) { func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) { // check if volume is globally mounted in uncertain state err := retryWithExponentialBackOff( - time.Duration(500*time.Millisecond), + testOperationBackOffDuration, func() (bool, error) { unmountedVolumes := asw.GetUnmountedVolumes() for _, v := range unmountedVolumes { @@ -1359,7 +1394,7 @@ func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, a func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) { // check if volume is locally pod mounted in uncertain state err := retryWithExponentialBackOff( - time.Duration(500*time.Millisecond), + 
testOperationBackOffDuration, func() (bool, error) { allMountedVolumes := asw.GetAllMountedVolumes() for _, v := range allMountedVolumes { @@ -1382,7 +1417,7 @@ func waitForMount( volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) { err := retryWithExponentialBackOff( - time.Duration(500*time.Millisecond), + testOperationBackOffDuration, func() (bool, error) { mountedVolumes := asw.GetMountedVolumes() for _, mountedVolume := range mountedVolumes { @@ -1402,7 +1437,7 @@ func waitForMount( func waitForVolumeToExistInASW(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) { err := retryWithExponentialBackOff( - time.Duration(500*time.Millisecond), + testOperationBackOffDuration, func() (bool, error) { if asw.VolumeExists(volumeName) { return true, nil @@ -1420,7 +1455,7 @@ func waitForDetach( volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) { err := retryWithExponentialBackOff( - time.Duration(500*time.Millisecond), + testOperationBackOffDuration, func() (bool, error) { if asw.VolumeExists(volumeName) { return false, nil diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index db1aa80e0d7..f81314c7565 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -66,6 +66,7 @@ go_test( "//pkg/volume/csi/fake:go_default_library", "//pkg/volume/testing:go_default_library", "//pkg/volume/util:go_default_library", + "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", diff --git a/pkg/volume/csi/csi_mounter_test.go b/pkg/volume/csi/csi_mounter_test.go index fcde71c0c13..c397ddf2bae 100644 --- a/pkg/volume/csi/csi_mounter_test.go +++ b/pkg/volume/csi/csi_mounter_test.go @@ -499,7 +499,7 @@ func TestMounterSetupWithStatusTracking(t *testing.T) { } if !tc.shouldFail && err != nil { - t.Fatalf("expected successs got mounter.Setup failed with: %v", err) + 
t.Fatalf("expected success got mounter.Setup failed with: %v", err) } }) } diff --git a/pkg/volume/csi/fake/BUILD b/pkg/volume/csi/fake/BUILD index c2c6b44d4f5..0d50b64946d 100644 --- a/pkg/volume/csi/fake/BUILD +++ b/pkg/volume/csi/fake/BUILD @@ -11,6 +11,8 @@ go_library( deps = [ "//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library", "//vendor/google.golang.org/grpc:go_default_library", + "//vendor/google.golang.org/grpc/codes:go_default_library", + "//vendor/google.golang.org/grpc/status:go_default_library", ], ) diff --git a/pkg/volume/flexvolume/attacher.go b/pkg/volume/flexvolume/attacher.go index e83da3ff2d3..07e9582a134 100644 --- a/pkg/volume/flexvolume/attacher.go +++ b/pkg/volume/flexvolume/attacher.go @@ -98,8 +98,8 @@ func (a *flexVolumeAttacher) MountDevice(spec *volume.Spec, devicePath string, d return err } -func (attacher *flexVolumeAttacher) MountDeviceWithStatusTracking(spec *volume.Spec, devicePath string, deviceMountPath string) (volumetypes.OperationStatus, error) { - err := attacher.MountDevice(spec, devicePath, deviceMountPath) +func (a *flexVolumeAttacher) MountDeviceWithStatusTracking(spec *volume.Spec, devicePath string, deviceMountPath string) (volumetypes.OperationStatus, error) { + err := a.MountDevice(spec, devicePath, deviceMountPath) return volumetypes.OperationFinished, err } diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 7544859913b..083c7a6fab6 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -71,6 +71,10 @@ const ( TimeoutOnSetupVolumeName = "timeout-setup-volume" // TimeoutOnMountDeviceVolumeName will cause MountDevice call to timeout but Setup will finish. 
TimeoutOnMountDeviceVolumeName = "timeout-mount-device-volume" + // TimeoutAndFailOnMountDeviceVolumeName will cause the first MountDevice call to time out and the second call to fail; later calls on the same FakeVolume succeed. + TimeoutAndFailOnMountDeviceVolumeName = "timeout-and-fail-mount-device-name" + // FailMountDeviceVolumeName will cause every MountDevice call on the volume to fail. + FailMountDeviceVolumeName = "fail-mount-device-volume-name" ) // fakeVolumeHost is useful for testing volume plugins. @@ -388,6 +392,7 @@ func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume { UnmountDeviceHook: plugin.UnmountDeviceHook, } volume.VolumesAttached = make(map[string]types.NodeName) + volume.DeviceMountState = make(map[string]volumetypes.OperationStatus) *list = append(*list, volume) return volume } @@ -789,7 +794,8 @@ type FakeVolume struct { VolName string Plugin *FakeVolumePlugin MetricsNil - VolumesAttached map[string]types.NodeName + VolumesAttached map[string]types.NodeName + DeviceMountState map[string]volumetypes.OperationStatus // Add callbacks as needed WaitForAttachHook func(spec *Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error) @@ -1056,8 +1062,26 @@ func (fv *FakeVolume) MountDevice(spec *Spec, devicePath string, deviceMountPath fv.Lock() defer fv.Unlock() if spec.Name() == TimeoutOnMountDeviceVolumeName { + fv.DeviceMountState[spec.Name()] = volumetypes.OperationInProgress return volumetypes.NewOperationTimedOutError("error mounting device") } + + if spec.Name() == FailMountDeviceVolumeName { + fv.DeviceMountState[spec.Name()] = volumetypes.OperationFinished + return fmt.Errorf("error mounting disk: %s", devicePath) + } + + if spec.Name() == TimeoutAndFailOnMountDeviceVolumeName { + oldState, ok := fv.DeviceMountState[spec.Name()] + if !ok { + fv.DeviceMountState[spec.Name()] = volumetypes.OperationInProgress + return volumetypes.NewOperationTimedOutError("error mounting state") + } + if oldState == volumetypes.OperationInProgress { + 
fv.DeviceMountState[spec.Name()] = volumetypes.OperationFinished + return fmt.Errorf("error mounting disk: %s", devicePath) + } + } fv.MountDeviceCallCount++ return nil } diff --git a/pkg/volume/util/types/types.go b/pkg/volume/util/types/types.go index baaa488137d..284427fcabb 100644 --- a/pkg/volume/util/types/types.go +++ b/pkg/volume/util/types/types.go @@ -51,6 +51,7 @@ func (o *GeneratedOperations) Run() (eventErr, detailedErr error) { return o.OperationFunc() } +// OperationStatus is used to store status of a volume operation type OperationStatus string const (