From 86a5bd98b644b4b79f9efa1da406ab505ca51a7f Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Mon, 2 Mar 2020 12:54:02 +0100 Subject: [PATCH 1/5] Add uncertain map state to block volumes Volume mount should be marked as uncertain after NodeStage / NodePublish timeout or similar error, when the driver can continue with the operation in background. --- pkg/volume/csi/csi_block.go | 4 +-- .../operationexecutor/operation_generator.go | 35 ++++++++++++------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go index ee2ae319ee4..ceacdc4a140 100644 --- a/pkg/volume/csi/csi_block.go +++ b/pkg/volume/csi/csi_block.go @@ -189,7 +189,7 @@ func (m *csiBlockMapper) stageVolumeForBlock( nil /* MountOptions */) if err != nil { - return "", errors.New(log("blockMapper.stageVolumeForBlock failed: %v", err)) + return "", err } klog.V(4).Infof(log("blockMapper.stageVolumeForBlock successfully requested NodeStageVolume [%s]", stagingPath)) @@ -249,7 +249,7 @@ func (m *csiBlockMapper) publishVolumeForBlock( ) if err != nil { - return "", errors.New(log("blockMapper.publishVolumeForBlock failed: %v", err)) + return "", err } return publishPath, nil diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 5aeb095226b..9f86cf93f8e 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -929,7 +929,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( volumeAttacher, _ = attachableVolumePlugin.NewAttacher() } - mapVolumeFunc := func() (error, error) { + mapVolumeFunc := func() (simpleErr error, detailedErr error) { var devicePath string // Set up global map path under the given plugin directory using symbolic link globalMapPath, err := @@ -956,6 +956,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( if customBlockVolumeMapper, ok := 
blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok { mapErr := customBlockVolumeMapper.SetUpDevice() if mapErr != nil { + og.markDeviceErrorState(volumeToMount, devicePath, globalMapPath, mapErr, actualStateOfWorld) // On failure, return error. Caller will log and retry. return volumeToMount.GenerateError("MapVolume.SetUpDevice failed", mapErr) } @@ -970,15 +971,36 @@ func (og *operationGenerator) GenerateMapVolumeFunc( return volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr) } + markVolumeOpts := MarkVolumeOpts{ + PodName: volumeToMount.PodName, + PodUID: volumeToMount.Pod.UID, + VolumeName: volumeToMount.VolumeName, + BlockVolumeMapper: blockVolumeMapper, + OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName, + VolumeGidVolume: volumeToMount.VolumeGidValue, + VolumeSpec: volumeToMount.VolumeSpec, + VolumeMountState: VolumeMounted, + } + // Call MapPodDevice if blockVolumeMapper implements CustomBlockVolumeMapper if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok { // Execute driver specific map pluginDevicePath, mapErr := customBlockVolumeMapper.MapPodDevice() if mapErr != nil { // On failure, return error. Caller will log and retry. + og.markVolumeErrorState(volumeToMount, markVolumeOpts, mapErr, actualStateOfWorld) return volumeToMount.GenerateError("MapVolume.MapPodDevice failed", mapErr) } + // From now on, the volume is mapped. Mark it as uncertain on error, + // so it is unmapped when corresponding pod is deleted.
+ defer func() { + if simpleErr != nil { + errText := simpleErr.Error() + og.markVolumeErrorState(volumeToMount, markVolumeOpts, volumetypes.NewUncertainProgressError(errText), actualStateOfWorld) + } + }() + // if pluginDevicePath is provided, assume attacher may not provide device // or attachment flow uses SetupDevice to get device path if len(pluginDevicePath) != 0 { @@ -1044,17 +1066,6 @@ func (og *operationGenerator) GenerateMapVolumeFunc( return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError) } - markVolumeOpts := MarkVolumeOpts{ - PodName: volumeToMount.PodName, - PodUID: volumeToMount.Pod.UID, - VolumeName: volumeToMount.VolumeName, - BlockVolumeMapper: blockVolumeMapper, - OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName, - VolumeGidVolume: volumeToMount.VolumeGidValue, - VolumeSpec: volumeToMount.VolumeSpec, - VolumeMountState: VolumeMounted, - } - markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) if markVolMountedErr != nil { // On failure, return error. Caller will log and retry. From f6fc73573c7308004bfa62f5c6dbda0fef941a89 Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Mon, 2 Mar 2020 12:54:02 +0100 Subject: [PATCH 2/5] Call NodeUnpublish after NodePublish timeout When NodePublish times out and user deletes corresponding pod, the driver may continue publishing the volume. In order to "cancel" this operation, Kubernetes must issue NodeUnpublish and wait until it finishes. Therefore, NodeUnpublish should be called even if the target directory (created by the driver) does not exist yet. 
--- pkg/volume/csi/csi_block.go | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go index ceacdc4a140..2d8d31fd897 100644 --- a/pkg/volume/csi/csi_block.go +++ b/pkg/volume/csi/csi_block.go @@ -503,19 +503,8 @@ func (m *csiBlockMapper) UnmapPodDevice() error { ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) defer cancel() - // Call NodeUnpublishVolume - if _, err := os.Stat(publishPath); err != nil { - if os.IsNotExist(err) { - klog.V(4).Infof(log("blockMapper.UnmapPodDevice publishPath(%s) has already been deleted, skip calling NodeUnpublishVolume", publishPath)) - } else { - return err - } - } else { - err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath) - if err != nil { - return err - } - } - - return nil + // Call NodeUnpublishVolume. + // Even if publishPath does not exist - previous NodePublish may have timed out + // and Kubernetes makes sure that the operation is finished. + return m.unpublishVolumeForBlock(ctx, csiClient, publishPath) } From c11427fef5b03018c9b4823b35fb9dfe541f339d Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Mon, 2 Mar 2020 12:54:02 +0100 Subject: [PATCH 3/5] Call NodeUnstage after NodeStage timeout When NodeStage times out and does not prepare destination device and user deletes corresponding pod, the driver may continue staging the volume in background. Kubernetes must call NodeUnstage to "cancel" this operation. Therefore TearDownDevice should be called even when the target directory does not exist (yet). 
--- pkg/volume/util/operationexecutor/operation_generator.go | 8 +++++++- pkg/volume/util/volumepathhandler/volume_path_handler.go | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 9f86cf93f8e..5a98946c6f3 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -20,6 +20,7 @@ import ( "context" goerrors "errors" "fmt" + "os" "path/filepath" "strings" "time" @@ -1202,7 +1203,12 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( globalMapPath := deviceToDetach.DeviceMountPath refs, err := og.blkUtil.GetDeviceBindMountRefs(deviceToDetach.DevicePath, globalMapPath) if err != nil { - return deviceToDetach.GenerateError("UnmapDevice.GetDeviceBindMountRefs check failed", err) + if os.IsNotExist(err) { + // Looks like SetupDevice did not complete. Fall through to TearDownDevice and mark the device as unmounted. 
+ refs = nil + } else { + return deviceToDetach.GenerateError("UnmapDevice.GetDeviceBindMountRefs check failed", err) + } } if len(refs) > 0 { err = fmt.Errorf("The device %q is still referenced from other Pods %v", globalMapPath, refs) diff --git a/pkg/volume/util/volumepathhandler/volume_path_handler.go b/pkg/volume/util/volumepathhandler/volume_path_handler.go index 9c0983bbf73..6f198302dc0 100644 --- a/pkg/volume/util/volumepathhandler/volume_path_handler.go +++ b/pkg/volume/util/volumepathhandler/volume_path_handler.go @@ -283,7 +283,7 @@ func (v VolumePathHandler) GetDeviceBindMountRefs(devPath string, mapPath string var refs []string files, err := ioutil.ReadDir(mapPath) if err != nil { - return nil, fmt.Errorf("directory cannot read %v", err) + return nil, err } for _, file := range files { if file.Mode()&os.ModeDevice != os.ModeDevice { From 853678713337ec1eee20f4fe16cd32f0775346f9 Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Mon, 2 Mar 2020 12:54:02 +0100 Subject: [PATCH 4/5] Add unit tests --- .../reconciler/reconciler_test.go | 486 ++++++++++-------- pkg/volume/testing/testing.go | 107 ++++ 2 files changed, 369 insertions(+), 224 deletions(-) diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index 9fde0713d92..f6a07157ee8 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -1148,7 +1148,6 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { } func Test_UncertainDeviceGlobalMounts(t *testing.T) { - fsMode := v1.PersistentVolumeFilesystem var tests = []struct { name string deviceState operationexecutor.DeviceMountState @@ -1190,129 +1189,140 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) { }, } - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { + for _, mode := range []v1.PersistentVolumeMode{v1.PersistentVolumeBlock, 
v1.PersistentVolumeFilesystem} { + for _, tc := range tests { + testName := fmt.Sprintf("%s [%s]", tc.name, mode) + t.Run(testName+"[", func(t *testing.T) { - pv := &v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: tc.volumeName, - UID: "pvuid", - }, - Spec: v1.PersistentVolumeSpec{ - ClaimRef: &v1.ObjectReference{Name: "pvc"}, - VolumeMode: &fsMode, - }, - } - pvc := &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pvc", - UID: "pvcuid", - }, - Spec: v1.PersistentVolumeClaimSpec{ - VolumeName: tc.volumeName, - }, - } - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - UID: "pod1uid", - }, - Spec: v1.PodSpec{ - Volumes: []v1.Volume{ - { - Name: "volume-name", - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: pvc.Name, + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: tc.volumeName, + UID: "pvuid", + }, + Spec: v1.PersistentVolumeSpec{ + ClaimRef: &v1.ObjectReference{Name: "pvc"}, + VolumeMode: &mode, + }, + } + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc", + UID: "pvcuid", + }, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: tc.volumeName, + VolumeMode: &mode, + }, + } + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "volume-name", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Name, + }, }, }, }, }, - }, - } + } - volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) - fakePlugin.SupportsRemount = tc.supportRemount + volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) + fakePlugin.SupportsRemount = tc.supportRemount - dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) - asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) - kubeClient := createtestClientWithPVPVC(pv, pvc, 
v1.AttachedVolume{ - Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)), - DevicePath: "fake/path", + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) + kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ + Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)), + DevicePath: "fake/path", + }) + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + kubeClient, + volumePluginMgr, + fakeRecorder, + false, /* checkNodeCapabilitiesBeforeMount */ + fakeHandler)) + + reconciler := NewReconciler( + kubeClient, + true, /* controllerAttachDetachEnabled */ + reconcilerLoopSleepDuration, + waitForAttachTimeout, + nodeName, + dsw, + asw, + hasAddedPods, + oex, + &mount.FakeMounter{}, + hostutil.NewFakeHostUtil(nil), + volumePluginMgr, + kubeletPodsDir) + volumeSpec := &volume.Spec{PersistentVolume: pv} + podName := util.GetUniquePodName(pod) + volumeName, err := dsw.AddPodToVolume( + podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) + // Assert + if err != nil { + t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) + } + dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName}) + + // Start the reconciler to fill ASW. 
+ stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) + go func() { + reconciler.Run(stopChan) + close(stoppedChan) + }() + waitForVolumeToExistInASW(t, volumeName, asw) + if tc.volumeName == volumetesting.TimeoutAndFailOnMountDeviceVolumeName { + // Wait upto 10s for reconciler to catch up + time.Sleep(reconcilerSyncWaitDuration) + } + + if tc.volumeName == volumetesting.SuccessAndFailOnMountDeviceName || + tc.volumeName == volumetesting.SuccessAndTimeoutDeviceName { + // wait for mount and then break it via remount + waitForMount(t, fakePlugin, volumeName, asw) + asw.MarkRemountRequired(podName) + time.Sleep(reconcilerSyncWaitDuration) + } + + if tc.deviceState == operationexecutor.DeviceMountUncertain { + waitForUncertainGlobalMount(t, volumeName, asw) + } + + if tc.deviceState == operationexecutor.DeviceGloballyMounted { + waitForMount(t, fakePlugin, volumeName, asw) + } + + dsw.DeletePodFromVolume(podName, volumeName) + waitForDetach(t, volumeName, asw) + if mode == v1.PersistentVolumeFilesystem { + err = volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin) + } else { + if tc.unmountDeviceCallCount == 0 { + err = volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin) + } else { + err = volumetesting.VerifyTearDownDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin) + } + } + if err != nil { + t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) + } }) - fakeRecorder := &record.FakeRecorder{} - fakeHandler := volumetesting.NewBlockVolumePathHandler() - oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( - kubeClient, - volumePluginMgr, - fakeRecorder, - false, /* checkNodeCapabilitiesBeforeMount */ - fakeHandler)) - - reconciler := NewReconciler( - kubeClient, - true, /* controllerAttachDetachEnabled */ - reconcilerLoopSleepDuration, - waitForAttachTimeout, - nodeName, - dsw, - asw, - hasAddedPods, - oex, - &mount.FakeMounter{}, - hostutil.NewFakeHostUtil(nil), - 
volumePluginMgr, - kubeletPodsDir) - volumeSpec := &volume.Spec{PersistentVolume: pv} - podName := util.GetUniquePodName(pod) - volumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) - // Assert - if err != nil { - t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) - } - dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName}) - - // Start the reconciler to fill ASW. - stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) - go func() { - reconciler.Run(stopChan) - close(stoppedChan) - }() - waitForVolumeToExistInASW(t, volumeName, asw) - if tc.volumeName == volumetesting.TimeoutAndFailOnMountDeviceVolumeName { - // Wait upto 10s for reconciler to catchup - time.Sleep(reconcilerSyncWaitDuration) - } - - if tc.volumeName == volumetesting.SuccessAndFailOnMountDeviceName || - tc.volumeName == volumetesting.SuccessAndTimeoutDeviceName { - // wait for mount and then break it via remount - waitForMount(t, fakePlugin, volumeName, asw) - asw.MarkRemountRequired(podName) - time.Sleep(reconcilerSyncWaitDuration) - } - - if tc.deviceState == operationexecutor.DeviceMountUncertain { - waitForUncertainGlobalMount(t, volumeName, asw) - } - - if tc.deviceState == operationexecutor.DeviceGloballyMounted { - waitForMount(t, fakePlugin, volumeName, asw) - } - - dsw.DeletePodFromVolume(podName, volumeName) - waitForDetach(t, volumeName, asw) - err = volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin) - if err != nil { - t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) - } - }) + } } } func Test_UncertainVolumeMountState(t *testing.T) { - fsMode := v1.PersistentVolumeFilesystem var tests = []struct { name string volumeState operationexecutor.VolumeMountState @@ -1331,14 +1341,14 @@ func Test_UncertainVolumeMountState(t *testing.T) { { name: "failed operation should result in not-mounted volume", volumeState: operationexecutor.VolumeNotMounted, - 
unmountDeviceCallCount: 0, + unmountDeviceCallCount: 1, unmountVolumeCount: 0, volumeName: volumetesting.FailOnSetupVolumeName, }, { name: "timeout followed by failed operation should result in non-mounted volume", volumeState: operationexecutor.VolumeNotMounted, - unmountDeviceCallCount: 0, + unmountDeviceCallCount: 1, unmountVolumeCount: 0, volumeName: volumetesting.TimeoutAndFailOnSetupVolumeName, }, @@ -1360,123 +1370,151 @@ func Test_UncertainVolumeMountState(t *testing.T) { }, } - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - pv := &v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: tc.volumeName, - UID: "pvuid", - }, - Spec: v1.PersistentVolumeSpec{ - ClaimRef: &v1.ObjectReference{Name: "pvc"}, - VolumeMode: &fsMode, - }, - } - pvc := &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pvc", - UID: "pvcuid", - }, - Spec: v1.PersistentVolumeClaimSpec{ - VolumeName: tc.volumeName, - }, - } - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - UID: "pod1uid", - }, - Spec: v1.PodSpec{ - Volumes: []v1.Volume{ - { - Name: "volume-name", - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: pvc.Name, + for _, mode := range []v1.PersistentVolumeMode{v1.PersistentVolumeBlock, v1.PersistentVolumeFilesystem} { + for _, tc := range tests { + testName := fmt.Sprintf("%s [%s]", tc.name, mode) + t.Run(testName, func(t *testing.T) { + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: tc.volumeName, + UID: "pvuid", + }, + Spec: v1.PersistentVolumeSpec{ + ClaimRef: &v1.ObjectReference{Name: "pvc"}, + VolumeMode: &mode, + }, + } + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc", + UID: "pvcuid", + }, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: tc.volumeName, + VolumeMode: &mode, + }, + } + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + 
Volumes: []v1.Volume{ + { + Name: "volume-name", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Name, + }, }, }, }, }, - }, - } + } - volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) - fakePlugin.SupportsRemount = tc.supportRemount - dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) - asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) - kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ - Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)), - DevicePath: "fake/path", + volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) + fakePlugin.SupportsRemount = tc.supportRemount + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) + kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ + Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)), + DevicePath: "fake/path", + }) + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + kubeClient, + volumePluginMgr, + fakeRecorder, + false, /* checkNodeCapabilitiesBeforeMount */ + fakeHandler)) + + reconciler := NewReconciler( + kubeClient, + true, /* controllerAttachDetachEnabled */ + reconcilerLoopSleepDuration, + waitForAttachTimeout, + nodeName, + dsw, + asw, + hasAddedPods, + oex, + &mount.FakeMounter{}, + hostutil.NewFakeHostUtil(nil), + volumePluginMgr, + kubeletPodsDir) + volumeSpec := &volume.Spec{PersistentVolume: pv} + podName := util.GetUniquePodName(pod) + volumeName, err := dsw.AddPodToVolume( + podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) + // Assert + if err != nil { + t.Fatalf("AddPodToVolume failed. 
Expected: Actual: <%v>", err) + } + dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName}) + + // Start the reconciler to fill ASW. + stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) + go func() { + reconciler.Run(stopChan) + close(stoppedChan) + }() + waitForVolumeToExistInASW(t, volumeName, asw) + if tc.volumeName == volumetesting.TimeoutAndFailOnSetupVolumeName { + // Wait upto 10s for reconciler to catchup + time.Sleep(reconcilerSyncWaitDuration) + } + + if tc.volumeName == volumetesting.SuccessAndFailOnSetupVolumeName || + tc.volumeName == volumetesting.SuccessAndTimeoutSetupVolumeName { + // wait for mount and then break it via remount + waitForMount(t, fakePlugin, volumeName, asw) + asw.MarkRemountRequired(podName) + time.Sleep(reconcilerSyncWaitDuration) + } + + if tc.volumeState == operationexecutor.VolumeMountUncertain { + waitForUncertainPodMount(t, volumeName, asw) + } + + if tc.volumeState == operationexecutor.VolumeMounted { + waitForMount(t, fakePlugin, volumeName, asw) + } + + dsw.DeletePodFromVolume(podName, volumeName) + waitForDetach(t, volumeName, asw) + + if mode == v1.PersistentVolumeFilesystem { + if err := volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin); err != nil { + t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) + } + if err := volumetesting.VerifyTearDownCallCount(tc.unmountVolumeCount, fakePlugin); err != nil { + t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) + } + } else { + if tc.unmountVolumeCount == 0 { + if err := volumetesting.VerifyZeroUnmapPodDeviceCallCount(fakePlugin); err != nil { + t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) + } + } else { + if err := volumetesting.VerifyUnmapPodDeviceCallCount(tc.unmountVolumeCount, fakePlugin); err != nil { + t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) + } + } + if tc.unmountDeviceCallCount == 0 { + if err := volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin); err 
!= nil { + t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) + } + } else { + if err := volumetesting.VerifyTearDownDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin); err != nil { + t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) + } + } + } }) - fakeRecorder := &record.FakeRecorder{} - fakeHandler := volumetesting.NewBlockVolumePathHandler() - oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( - kubeClient, - volumePluginMgr, - fakeRecorder, - false, /* checkNodeCapabilitiesBeforeMount */ - fakeHandler)) - - reconciler := NewReconciler( - kubeClient, - true, /* controllerAttachDetachEnabled */ - reconcilerLoopSleepDuration, - waitForAttachTimeout, - nodeName, - dsw, - asw, - hasAddedPods, - oex, - &mount.FakeMounter{}, - hostutil.NewFakeHostUtil(nil), - volumePluginMgr, - kubeletPodsDir) - volumeSpec := &volume.Spec{PersistentVolume: pv} - podName := util.GetUniquePodName(pod) - volumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) - // Assert - if err != nil { - t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) - } - dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName}) - - // Start the reconciler to fill ASW. 
- stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) - go func() { - reconciler.Run(stopChan) - close(stoppedChan) - }() - waitForVolumeToExistInASW(t, volumeName, asw) - if tc.volumeName == volumetesting.TimeoutAndFailOnSetupVolumeName { - // Wait upto 10s for reconciler to catchup - time.Sleep(reconcilerSyncWaitDuration) - } - - if tc.volumeName == volumetesting.SuccessAndFailOnSetupVolumeName || - tc.volumeName == volumetesting.SuccessAndTimeoutSetupVolumeName { - // wait for mount and then break it via remount - waitForMount(t, fakePlugin, volumeName, asw) - asw.MarkRemountRequired(podName) - time.Sleep(reconcilerSyncWaitDuration) - } - - if tc.volumeState == operationexecutor.VolumeMountUncertain { - waitForUncertainPodMount(t, volumeName, asw) - } - - if tc.volumeState == operationexecutor.VolumeMounted { - waitForMount(t, fakePlugin, volumeName, asw) - } - - dsw.DeletePodFromVolume(podName, volumeName) - waitForDetach(t, volumeName, asw) - - volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin) - volumetesting.VerifyTearDownCallCount(tc.unmountVolumeCount, fakePlugin) - }) + } } - } func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) { diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index e414ddec813..055b54496f6 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -959,7 +959,42 @@ func (fv *FakeVolume) TearDownAt(dir string) error { func (fv *FakeVolume) SetUpDevice() error { fv.Lock() defer fv.Unlock() + if fv.VolName == TimeoutOnMountDeviceVolumeName { + fv.DeviceMountState[fv.VolName] = deviceMountUncertain + return volumetypes.NewUncertainProgressError("mount failed") + } + if fv.VolName == FailMountDeviceVolumeName { + fv.DeviceMountState[fv.VolName] = deviceNotMounted + return fmt.Errorf("error mapping disk: %s", fv.VolName) + } + + if fv.VolName == TimeoutAndFailOnMountDeviceVolumeName { + _, ok 
:= fv.DeviceMountState[fv.VolName] + if !ok { + fv.DeviceMountState[fv.VolName] = deviceMountUncertain + return volumetypes.NewUncertainProgressError("timed out mounting error") + } + fv.DeviceMountState[fv.VolName] = deviceNotMounted + return fmt.Errorf("error mapping disk: %s", fv.VolName) + } + + if fv.VolName == SuccessAndTimeoutDeviceName { + _, ok := fv.DeviceMountState[fv.VolName] + if ok { + fv.DeviceMountState[fv.VolName] = deviceMountUncertain + return volumetypes.NewUncertainProgressError("error mounting state") + } + } + if fv.VolName == SuccessAndFailOnMountDeviceName { + _, ok := fv.DeviceMountState[fv.VolName] + if ok { + return fmt.Errorf("error mapping disk: %s", fv.VolName) + } + } + + fv.DeviceMountState[fv.VolName] = deviceMounted fv.SetUpDeviceCallCount++ + return nil } @@ -1044,6 +1079,45 @@ func (fv *FakeVolume) GetUnmapPodDeviceCallCount() int { func (fv *FakeVolume) MapPodDevice() (string, error) { fv.Lock() defer fv.Unlock() + + if fv.VolName == TimeoutOnSetupVolumeName { + fv.VolumeMountState[fv.VolName] = volumeMountUncertain + return "", volumetypes.NewUncertainProgressError("time out on setup") + } + + if fv.VolName == FailOnSetupVolumeName { + fv.VolumeMountState[fv.VolName] = volumeNotMounted + return "", fmt.Errorf("mounting volume failed") + } + + if fv.VolName == TimeoutAndFailOnSetupVolumeName { + _, ok := fv.VolumeMountState[fv.VolName] + if !ok { + fv.VolumeMountState[fv.VolName] = volumeMountUncertain + return "", volumetypes.NewUncertainProgressError("time out on setup") + } + fv.VolumeMountState[fv.VolName] = volumeNotMounted + return "", fmt.Errorf("mounting volume failed") + + } + + if fv.VolName == SuccessAndFailOnSetupVolumeName { + _, ok := fv.VolumeMountState[fv.VolName] + if ok { + fv.VolumeMountState[fv.VolName] = volumeNotMounted + return "", fmt.Errorf("mounting volume failed") + } + } + + if fv.VolName == SuccessAndTimeoutSetupVolumeName { + _, ok := fv.VolumeMountState[fv.VolName] + if ok { + 
fv.VolumeMountState[fv.VolName] = volumeMountUncertain + return "", volumetypes.NewUncertainProgressError("time out on setup") + } + } + + fv.VolumeMountState[fv.VolName] = volumeMounted fv.MapPodDeviceCallCount++ return "", nil } @@ -1624,6 +1698,39 @@ func VerifyZeroTearDownDeviceCallCount(fakeVolumePlugin *FakeVolumePlugin) error return nil } +// VerifyUnmapPodDeviceCallCount ensures that at least one of the Unmappers for this +// plugin has the expected number of UnmapPodDevice calls. Otherwise it +// returns an error. +func VerifyUnmapPodDeviceCallCount( + expectedUnmapPodDeviceCallCount int, + fakeVolumePlugin *FakeVolumePlugin) error { + for _, unmapper := range fakeVolumePlugin.GetBlockVolumeUnmapper() { + actualCallCount := unmapper.GetUnmapPodDeviceCallCount() + if actualCallCount >= expectedUnmapPodDeviceCallCount { + return nil + } + } + + return fmt.Errorf( + "No Unmapper have expected UnmapPodDeviceCallCount. Expected: <%v>.", + expectedUnmapPodDeviceCallCount) +} + +// VerifyZeroUnmapPodDeviceCallCount ensures that all Mappers for this plugin have a +// zero UnmapPodDevice calls. Otherwise it returns an error. +func VerifyZeroUnmapPodDeviceCallCount(fakeVolumePlugin *FakeVolumePlugin) error { + for _, unmapper := range fakeVolumePlugin.GetBlockVolumeUnmapper() { + actualCallCount := unmapper.GetUnmapPodDeviceCallCount() + if actualCallCount != 0 { + return fmt.Errorf( + "At least one unmapper has non-zero UnmapPodDeviceCallCount: <%v>.", + actualCallCount) + } + } + + return nil +} + // VerifyGetGlobalMapPathCallCount ensures that at least one of the Mappers for this // plugin has the expectedGlobalMapPathCallCount number of calls. Otherwise it returns // an error. From afcbb683865473a701cc491229039b8c2ae42f23 Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Mon, 2 Mar 2020 12:54:03 +0100 Subject: [PATCH 5/5] Fix unit test to fail with proper final gRPC code Plain "errors.New" is interpreted as transient error. 
--- pkg/volume/csi/BUILD | 2 ++ pkg/volume/csi/csi_block_test.go | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index 0d18e1832fb..0f96d0a94c4 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -88,6 +88,8 @@ go_test( "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/google.golang.org/grpc/codes:go_default_library", + "//vendor/google.golang.org/grpc/status:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go index e856da79427..04348e9b46f 100644 --- a/pkg/volume/csi/csi_block_test.go +++ b/pkg/volume/csi/csi_block_test.go @@ -18,12 +18,14 @@ package csi import ( "context" - "errors" "fmt" "os" "path/filepath" "testing" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + api "k8s.io/api/core/v1" "k8s.io/api/storage/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -299,7 +301,7 @@ func TestBlockMapperSetupDeviceError(t *testing.T) { csiMapper.csiClient = setupClient(t, true) fClient := csiMapper.csiClient.(*fakeCsiDriverClient) - fClient.nodeClient.SetNextError(errors.New("mock final error")) + fClient.nodeClient.SetNextError(status.Error(codes.InvalidArgument, "mock final error")) attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName)) attachment := makeTestAttachment(attachID, nodeName, pvName)