diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go
index 9fde0713d92..f6a07157ee8 100644
--- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go
+++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go
@@ -1148,7 +1148,6 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
 }
 
 func Test_UncertainDeviceGlobalMounts(t *testing.T) {
-	fsMode := v1.PersistentVolumeFilesystem
 	var tests = []struct {
 		name                   string
 		deviceState            operationexecutor.DeviceMountState
@@ -1190,129 +1189,140 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) {
 		},
 	}
 
-	for _, tc := range tests {
-		t.Run(tc.name, func(t *testing.T) {
+	for _, mode := range []v1.PersistentVolumeMode{v1.PersistentVolumeBlock, v1.PersistentVolumeFilesystem} {
+		for _, tc := range tests {
+			testName := fmt.Sprintf("%s [%s]", tc.name, mode)
+			t.Run(testName, func(t *testing.T) {
 
-			pv := &v1.PersistentVolume{
-				ObjectMeta: metav1.ObjectMeta{
-					Name: tc.volumeName,
-					UID:  "pvuid",
-				},
-				Spec: v1.PersistentVolumeSpec{
-					ClaimRef:   &v1.ObjectReference{Name: "pvc"},
-					VolumeMode: &fsMode,
-				},
-			}
-			pvc := &v1.PersistentVolumeClaim{
-				ObjectMeta: metav1.ObjectMeta{
-					Name: "pvc",
-					UID:  "pvcuid",
-				},
-				Spec: v1.PersistentVolumeClaimSpec{
-					VolumeName: tc.volumeName,
-				},
-			}
-			pod := &v1.Pod{
-				ObjectMeta: metav1.ObjectMeta{
-					Name: "pod1",
-					UID:  "pod1uid",
-				},
-				Spec: v1.PodSpec{
-					Volumes: []v1.Volume{
-						{
-							Name: "volume-name",
-							VolumeSource: v1.VolumeSource{
-								PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
-									ClaimName: pvc.Name,
+				pv := &v1.PersistentVolume{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: tc.volumeName,
+						UID:  "pvuid",
+					},
+					Spec: v1.PersistentVolumeSpec{
+						ClaimRef:   &v1.ObjectReference{Name: "pvc"},
+						VolumeMode: &mode,
+					},
+				}
+				pvc := &v1.PersistentVolumeClaim{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "pvc",
+						UID:  "pvcuid",
+					},
+					Spec: v1.PersistentVolumeClaimSpec{
+
VolumeName: tc.volumeName, + VolumeMode: &mode, + }, + } + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "volume-name", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Name, + }, }, }, }, }, - }, - } + } - volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) - fakePlugin.SupportsRemount = tc.supportRemount + volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) + fakePlugin.SupportsRemount = tc.supportRemount - dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) - asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) - kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ - Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)), - DevicePath: "fake/path", + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) + kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ + Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)), + DevicePath: "fake/path", + }) + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + kubeClient, + volumePluginMgr, + fakeRecorder, + false, /* checkNodeCapabilitiesBeforeMount */ + fakeHandler)) + + reconciler := NewReconciler( + kubeClient, + true, /* controllerAttachDetachEnabled */ + reconcilerLoopSleepDuration, + waitForAttachTimeout, + nodeName, + dsw, + asw, + hasAddedPods, + oex, + &mount.FakeMounter{}, + hostutil.NewFakeHostUtil(nil), + volumePluginMgr, + kubeletPodsDir) + volumeSpec := &volume.Spec{PersistentVolume: pv} + podName := util.GetUniquePodName(pod) + volumeName, err := dsw.AddPodToVolume( + podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) + // 
Assert
+				if err != nil {
+					t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
+				}
+				dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
+
+				// Start the reconciler to fill ASW.
+				stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
+				go func() {
+					reconciler.Run(stopChan)
+					close(stoppedChan)
+				}()
+				waitForVolumeToExistInASW(t, volumeName, asw)
+				if tc.volumeName == volumetesting.TimeoutAndFailOnMountDeviceVolumeName {
+					// Wait up to 10s for reconciler to catch up
+					time.Sleep(reconcilerSyncWaitDuration)
+				}
+
+				if tc.volumeName == volumetesting.SuccessAndFailOnMountDeviceName ||
+					tc.volumeName == volumetesting.SuccessAndTimeoutDeviceName {
+					// wait for mount and then break it via remount
+					waitForMount(t, fakePlugin, volumeName, asw)
+					asw.MarkRemountRequired(podName)
+					time.Sleep(reconcilerSyncWaitDuration)
+				}
+
+				if tc.deviceState == operationexecutor.DeviceMountUncertain {
+					waitForUncertainGlobalMount(t, volumeName, asw)
+				}
+
+				if tc.deviceState == operationexecutor.DeviceGloballyMounted {
+					waitForMount(t, fakePlugin, volumeName, asw)
+				}
+
+				dsw.DeletePodFromVolume(podName, volumeName)
+				waitForDetach(t, volumeName, asw)
+				if mode == v1.PersistentVolumeFilesystem {
+					err = volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
+				} else {
+					if tc.unmountDeviceCallCount == 0 {
+						err = volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin)
+					} else {
+						err = volumetesting.VerifyTearDownDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
+					}
+				}
+				if err != nil {
+					t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
+				}
+			})
-			fakeRecorder := &record.FakeRecorder{}
-			fakeHandler := volumetesting.NewBlockVolumePathHandler()
-			oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
-				kubeClient,
-				volumePluginMgr,
-				fakeRecorder,
-				false, /* checkNodeCapabilitiesBeforeMount */
-				fakeHandler))
-
-			reconciler := NewReconciler(
-				kubeClient,
-				true, /*
controllerAttachDetachEnabled */ - reconcilerLoopSleepDuration, - waitForAttachTimeout, - nodeName, - dsw, - asw, - hasAddedPods, - oex, - &mount.FakeMounter{}, - hostutil.NewFakeHostUtil(nil), - volumePluginMgr, - kubeletPodsDir) - volumeSpec := &volume.Spec{PersistentVolume: pv} - podName := util.GetUniquePodName(pod) - volumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) - // Assert - if err != nil { - t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) - } - dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName}) - - // Start the reconciler to fill ASW. - stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) - go func() { - reconciler.Run(stopChan) - close(stoppedChan) - }() - waitForVolumeToExistInASW(t, volumeName, asw) - if tc.volumeName == volumetesting.TimeoutAndFailOnMountDeviceVolumeName { - // Wait upto 10s for reconciler to catchup - time.Sleep(reconcilerSyncWaitDuration) - } - - if tc.volumeName == volumetesting.SuccessAndFailOnMountDeviceName || - tc.volumeName == volumetesting.SuccessAndTimeoutDeviceName { - // wait for mount and then break it via remount - waitForMount(t, fakePlugin, volumeName, asw) - asw.MarkRemountRequired(podName) - time.Sleep(reconcilerSyncWaitDuration) - } - - if tc.deviceState == operationexecutor.DeviceMountUncertain { - waitForUncertainGlobalMount(t, volumeName, asw) - } - - if tc.deviceState == operationexecutor.DeviceGloballyMounted { - waitForMount(t, fakePlugin, volumeName, asw) - } - - dsw.DeletePodFromVolume(podName, volumeName) - waitForDetach(t, volumeName, asw) - err = volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin) - if err != nil { - t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) - } - }) + } } } func Test_UncertainVolumeMountState(t *testing.T) { - fsMode := v1.PersistentVolumeFilesystem var tests = []struct { name string volumeState operationexecutor.VolumeMountState @@ 
-1331,14 +1341,14 @@ func Test_UncertainVolumeMountState(t *testing.T) { { name: "failed operation should result in not-mounted volume", volumeState: operationexecutor.VolumeNotMounted, - unmountDeviceCallCount: 0, + unmountDeviceCallCount: 1, unmountVolumeCount: 0, volumeName: volumetesting.FailOnSetupVolumeName, }, { name: "timeout followed by failed operation should result in non-mounted volume", volumeState: operationexecutor.VolumeNotMounted, - unmountDeviceCallCount: 0, + unmountDeviceCallCount: 1, unmountVolumeCount: 0, volumeName: volumetesting.TimeoutAndFailOnSetupVolumeName, }, @@ -1360,123 +1370,151 @@ func Test_UncertainVolumeMountState(t *testing.T) { }, } - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - pv := &v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: tc.volumeName, - UID: "pvuid", - }, - Spec: v1.PersistentVolumeSpec{ - ClaimRef: &v1.ObjectReference{Name: "pvc"}, - VolumeMode: &fsMode, - }, - } - pvc := &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pvc", - UID: "pvcuid", - }, - Spec: v1.PersistentVolumeClaimSpec{ - VolumeName: tc.volumeName, - }, - } - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - UID: "pod1uid", - }, - Spec: v1.PodSpec{ - Volumes: []v1.Volume{ - { - Name: "volume-name", - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: pvc.Name, + for _, mode := range []v1.PersistentVolumeMode{v1.PersistentVolumeBlock, v1.PersistentVolumeFilesystem} { + for _, tc := range tests { + testName := fmt.Sprintf("%s [%s]", tc.name, mode) + t.Run(testName, func(t *testing.T) { + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: tc.volumeName, + UID: "pvuid", + }, + Spec: v1.PersistentVolumeSpec{ + ClaimRef: &v1.ObjectReference{Name: "pvc"}, + VolumeMode: &mode, + }, + } + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc", + UID: "pvcuid", + }, + Spec: 
v1.PersistentVolumeClaimSpec{ + VolumeName: tc.volumeName, + VolumeMode: &mode, + }, + } + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "volume-name", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Name, + }, }, }, }, }, - }, - } + } - volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) - fakePlugin.SupportsRemount = tc.supportRemount - dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) - asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) - kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ - Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)), - DevicePath: "fake/path", + volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) + fakePlugin.SupportsRemount = tc.supportRemount + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) + kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ + Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)), + DevicePath: "fake/path", + }) + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + kubeClient, + volumePluginMgr, + fakeRecorder, + false, /* checkNodeCapabilitiesBeforeMount */ + fakeHandler)) + + reconciler := NewReconciler( + kubeClient, + true, /* controllerAttachDetachEnabled */ + reconcilerLoopSleepDuration, + waitForAttachTimeout, + nodeName, + dsw, + asw, + hasAddedPods, + oex, + &mount.FakeMounter{}, + hostutil.NewFakeHostUtil(nil), + volumePluginMgr, + kubeletPodsDir) + volumeSpec := &volume.Spec{PersistentVolume: pv} + podName := util.GetUniquePodName(pod) + volumeName, err := dsw.AddPodToVolume( + podName, pod, volumeSpec, 
volumeSpec.Name(), "" /* volumeGidValue */) + // Assert + if err != nil { + t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) + } + dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName}) + + // Start the reconciler to fill ASW. + stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) + go func() { + reconciler.Run(stopChan) + close(stoppedChan) + }() + waitForVolumeToExistInASW(t, volumeName, asw) + if tc.volumeName == volumetesting.TimeoutAndFailOnSetupVolumeName { + // Wait upto 10s for reconciler to catchup + time.Sleep(reconcilerSyncWaitDuration) + } + + if tc.volumeName == volumetesting.SuccessAndFailOnSetupVolumeName || + tc.volumeName == volumetesting.SuccessAndTimeoutSetupVolumeName { + // wait for mount and then break it via remount + waitForMount(t, fakePlugin, volumeName, asw) + asw.MarkRemountRequired(podName) + time.Sleep(reconcilerSyncWaitDuration) + } + + if tc.volumeState == operationexecutor.VolumeMountUncertain { + waitForUncertainPodMount(t, volumeName, asw) + } + + if tc.volumeState == operationexecutor.VolumeMounted { + waitForMount(t, fakePlugin, volumeName, asw) + } + + dsw.DeletePodFromVolume(podName, volumeName) + waitForDetach(t, volumeName, asw) + + if mode == v1.PersistentVolumeFilesystem { + if err := volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin); err != nil { + t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) + } + if err := volumetesting.VerifyTearDownCallCount(tc.unmountVolumeCount, fakePlugin); err != nil { + t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) + } + } else { + if tc.unmountVolumeCount == 0 { + if err := volumetesting.VerifyZeroUnmapPodDeviceCallCount(fakePlugin); err != nil { + t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) + } + } else { + if err := volumetesting.VerifyUnmapPodDeviceCallCount(tc.unmountVolumeCount, fakePlugin); err != nil { + t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) + } + } + if 
tc.unmountDeviceCallCount == 0 { + if err := volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin); err != nil { + t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) + } + } else { + if err := volumetesting.VerifyTearDownDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin); err != nil { + t.Errorf("Error verifying UnMountDeviceCallCount: %v", err) + } + } + } }) - fakeRecorder := &record.FakeRecorder{} - fakeHandler := volumetesting.NewBlockVolumePathHandler() - oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( - kubeClient, - volumePluginMgr, - fakeRecorder, - false, /* checkNodeCapabilitiesBeforeMount */ - fakeHandler)) - - reconciler := NewReconciler( - kubeClient, - true, /* controllerAttachDetachEnabled */ - reconcilerLoopSleepDuration, - waitForAttachTimeout, - nodeName, - dsw, - asw, - hasAddedPods, - oex, - &mount.FakeMounter{}, - hostutil.NewFakeHostUtil(nil), - volumePluginMgr, - kubeletPodsDir) - volumeSpec := &volume.Spec{PersistentVolume: pv} - podName := util.GetUniquePodName(pod) - volumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) - // Assert - if err != nil { - t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) - } - dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName}) - - // Start the reconciler to fill ASW. 
- stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) - go func() { - reconciler.Run(stopChan) - close(stoppedChan) - }() - waitForVolumeToExistInASW(t, volumeName, asw) - if tc.volumeName == volumetesting.TimeoutAndFailOnSetupVolumeName { - // Wait upto 10s for reconciler to catchup - time.Sleep(reconcilerSyncWaitDuration) - } - - if tc.volumeName == volumetesting.SuccessAndFailOnSetupVolumeName || - tc.volumeName == volumetesting.SuccessAndTimeoutSetupVolumeName { - // wait for mount and then break it via remount - waitForMount(t, fakePlugin, volumeName, asw) - asw.MarkRemountRequired(podName) - time.Sleep(reconcilerSyncWaitDuration) - } - - if tc.volumeState == operationexecutor.VolumeMountUncertain { - waitForUncertainPodMount(t, volumeName, asw) - } - - if tc.volumeState == operationexecutor.VolumeMounted { - waitForMount(t, fakePlugin, volumeName, asw) - } - - dsw.DeletePodFromVolume(podName, volumeName) - waitForDetach(t, volumeName, asw) - - volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin) - volumetesting.VerifyTearDownCallCount(tc.unmountVolumeCount, fakePlugin) - }) + } } - } func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) { diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index 0d18e1832fb..0f96d0a94c4 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -88,6 +88,8 @@ go_test( "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/google.golang.org/grpc/codes:go_default_library", + "//vendor/google.golang.org/grpc/status:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go index ee2ae319ee4..2d8d31fd897 100644 --- a/pkg/volume/csi/csi_block.go +++ 
b/pkg/volume/csi/csi_block.go @@ -189,7 +189,7 @@ func (m *csiBlockMapper) stageVolumeForBlock( nil /* MountOptions */) if err != nil { - return "", errors.New(log("blockMapper.stageVolumeForBlock failed: %v", err)) + return "", err } klog.V(4).Infof(log("blockMapper.stageVolumeForBlock successfully requested NodeStageVolume [%s]", stagingPath)) @@ -249,7 +249,7 @@ func (m *csiBlockMapper) publishVolumeForBlock( ) if err != nil { - return "", errors.New(log("blockMapper.publishVolumeForBlock failed: %v", err)) + return "", err } return publishPath, nil @@ -503,19 +503,8 @@ func (m *csiBlockMapper) UnmapPodDevice() error { ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) defer cancel() - // Call NodeUnpublishVolume - if _, err := os.Stat(publishPath); err != nil { - if os.IsNotExist(err) { - klog.V(4).Infof(log("blockMapper.UnmapPodDevice publishPath(%s) has already been deleted, skip calling NodeUnpublishVolume", publishPath)) - } else { - return err - } - } else { - err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath) - if err != nil { - return err - } - } - - return nil + // Call NodeUnpublishVolume. + // Even if publishPath does not exist - previous NodePublish may have timed out + // and Kubernetes makes sure that the operation is finished. 
+ return m.unpublishVolumeForBlock(ctx, csiClient, publishPath) } diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go index e856da79427..04348e9b46f 100644 --- a/pkg/volume/csi/csi_block_test.go +++ b/pkg/volume/csi/csi_block_test.go @@ -18,12 +18,14 @@ package csi import ( "context" - "errors" "fmt" "os" "path/filepath" "testing" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + api "k8s.io/api/core/v1" "k8s.io/api/storage/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -299,7 +301,7 @@ func TestBlockMapperSetupDeviceError(t *testing.T) { csiMapper.csiClient = setupClient(t, true) fClient := csiMapper.csiClient.(*fakeCsiDriverClient) - fClient.nodeClient.SetNextError(errors.New("mock final error")) + fClient.nodeClient.SetNextError(status.Error(codes.InvalidArgument, "mock final error")) attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName)) attachment := makeTestAttachment(attachID, nodeName, pvName) diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index e414ddec813..055b54496f6 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -959,7 +959,42 @@ func (fv *FakeVolume) TearDownAt(dir string) error { func (fv *FakeVolume) SetUpDevice() error { fv.Lock() defer fv.Unlock() + if fv.VolName == TimeoutOnMountDeviceVolumeName { + fv.DeviceMountState[fv.VolName] = deviceMountUncertain + return volumetypes.NewUncertainProgressError("mount failed") + } + if fv.VolName == FailMountDeviceVolumeName { + fv.DeviceMountState[fv.VolName] = deviceNotMounted + return fmt.Errorf("error mapping disk: %s", fv.VolName) + } + + if fv.VolName == TimeoutAndFailOnMountDeviceVolumeName { + _, ok := fv.DeviceMountState[fv.VolName] + if !ok { + fv.DeviceMountState[fv.VolName] = deviceMountUncertain + return volumetypes.NewUncertainProgressError("timed out mounting error") + } + fv.DeviceMountState[fv.VolName] = deviceNotMounted + return 
fmt.Errorf("error mapping disk: %s", fv.VolName) + } + + if fv.VolName == SuccessAndTimeoutDeviceName { + _, ok := fv.DeviceMountState[fv.VolName] + if ok { + fv.DeviceMountState[fv.VolName] = deviceMountUncertain + return volumetypes.NewUncertainProgressError("error mounting state") + } + } + if fv.VolName == SuccessAndFailOnMountDeviceName { + _, ok := fv.DeviceMountState[fv.VolName] + if ok { + return fmt.Errorf("error mapping disk: %s", fv.VolName) + } + } + + fv.DeviceMountState[fv.VolName] = deviceMounted fv.SetUpDeviceCallCount++ + return nil } @@ -1044,6 +1079,45 @@ func (fv *FakeVolume) GetUnmapPodDeviceCallCount() int { func (fv *FakeVolume) MapPodDevice() (string, error) { fv.Lock() defer fv.Unlock() + + if fv.VolName == TimeoutOnSetupVolumeName { + fv.VolumeMountState[fv.VolName] = volumeMountUncertain + return "", volumetypes.NewUncertainProgressError("time out on setup") + } + + if fv.VolName == FailOnSetupVolumeName { + fv.VolumeMountState[fv.VolName] = volumeNotMounted + return "", fmt.Errorf("mounting volume failed") + } + + if fv.VolName == TimeoutAndFailOnSetupVolumeName { + _, ok := fv.VolumeMountState[fv.VolName] + if !ok { + fv.VolumeMountState[fv.VolName] = volumeMountUncertain + return "", volumetypes.NewUncertainProgressError("time out on setup") + } + fv.VolumeMountState[fv.VolName] = volumeNotMounted + return "", fmt.Errorf("mounting volume failed") + + } + + if fv.VolName == SuccessAndFailOnSetupVolumeName { + _, ok := fv.VolumeMountState[fv.VolName] + if ok { + fv.VolumeMountState[fv.VolName] = volumeNotMounted + return "", fmt.Errorf("mounting volume failed") + } + } + + if fv.VolName == SuccessAndTimeoutSetupVolumeName { + _, ok := fv.VolumeMountState[fv.VolName] + if ok { + fv.VolumeMountState[fv.VolName] = volumeMountUncertain + return "", volumetypes.NewUncertainProgressError("time out on setup") + } + } + + fv.VolumeMountState[fv.VolName] = volumeMounted fv.MapPodDeviceCallCount++ return "", nil } @@ -1624,6 +1698,39 @@ func 
VerifyZeroTearDownDeviceCallCount(fakeVolumePlugin *FakeVolumePlugin) error
 	return nil
 }
 
+// VerifyUnmapPodDeviceCallCount ensures that at least one of the Unmappers for this
+// plugin has the expected number of UnmapPodDevice calls. Otherwise it
+// returns an error.
+func VerifyUnmapPodDeviceCallCount(
+	expectedUnmapPodDeviceCallCount int,
+	fakeVolumePlugin *FakeVolumePlugin) error {
+	for _, unmapper := range fakeVolumePlugin.GetBlockVolumeUnmapper() {
+		actualCallCount := unmapper.GetUnmapPodDeviceCallCount()
+		if actualCallCount >= expectedUnmapPodDeviceCallCount {
+			return nil
+		}
+	}
+
+	return fmt.Errorf(
+		"No Unmapper have expected UnmapPodDeviceCallCount. Expected: <%v>.",
+		expectedUnmapPodDeviceCallCount)
+}
+
+// VerifyZeroUnmapPodDeviceCallCount ensures that all Unmappers for this plugin have
+// zero UnmapPodDevice calls. Otherwise it returns an error.
+func VerifyZeroUnmapPodDeviceCallCount(fakeVolumePlugin *FakeVolumePlugin) error {
+	for _, unmapper := range fakeVolumePlugin.GetBlockVolumeUnmapper() {
+		actualCallCount := unmapper.GetUnmapPodDeviceCallCount()
+		if actualCallCount != 0 {
+			return fmt.Errorf(
+				"At least one unmapper has non-zero UnmapPodDeviceCallCount: <%v>.",
+				actualCallCount)
+		}
+	}
+
+	return nil
+}
+
 // VerifyGetGlobalMapPathCallCount ensures that at least one of the Mappers for this
 // plugin has the expectedGlobalMapPathCallCount number of calls. Otherwise it returns
 // an error.
diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 5aeb095226b..5a98946c6f3 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -20,6 +20,7 @@ import ( "context" goerrors "errors" "fmt" + "os" "path/filepath" "strings" "time" @@ -929,7 +930,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( volumeAttacher, _ = attachableVolumePlugin.NewAttacher() } - mapVolumeFunc := func() (error, error) { + mapVolumeFunc := func() (simpleErr error, detailedErr error) { var devicePath string // Set up global map path under the given plugin directory using symbolic link globalMapPath, err := @@ -956,6 +957,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok { mapErr := customBlockVolumeMapper.SetUpDevice() if mapErr != nil { + og.markDeviceErrorState(volumeToMount, devicePath, globalMapPath, mapErr, actualStateOfWorld) // On failure, return error. Caller will log and retry. 
return volumeToMount.GenerateError("MapVolume.SetUpDevice failed", mapErr)
 			}
 
@@ -970,15 +972,36 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
 			return volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr)
 		}
 
+		markVolumeOpts := MarkVolumeOpts{
+			PodName:             volumeToMount.PodName,
+			PodUID:              volumeToMount.Pod.UID,
+			VolumeName:          volumeToMount.VolumeName,
+			BlockVolumeMapper:   blockVolumeMapper,
+			OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
+			VolumeGidVolume:     volumeToMount.VolumeGidValue,
+			VolumeSpec:          volumeToMount.VolumeSpec,
+			VolumeMountState:    VolumeMounted,
+		}
+
 		// Call MapPodDevice if blockVolumeMapper implements CustomBlockVolumeMapper
 		if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok {
 			// Execute driver specific map
 			pluginDevicePath, mapErr := customBlockVolumeMapper.MapPodDevice()
 			if mapErr != nil {
 				// On failure, return error. Caller will log and retry.
+				og.markVolumeErrorState(volumeToMount, markVolumeOpts, mapErr, actualStateOfWorld)
 				return volumeToMount.GenerateError("MapVolume.MapPodDevice failed", mapErr)
 			}
 
+			// From now on, the volume is mapped. Mark it as uncertain on error,
+			// so it is unmapped when corresponding pod is deleted.
+ defer func() { + if simpleErr != nil { + errText := simpleErr.Error() + og.markVolumeErrorState(volumeToMount, markVolumeOpts, volumetypes.NewUncertainProgressError(errText), actualStateOfWorld) + } + }() + // if pluginDevicePath is provided, assume attacher may not provide device // or attachment flow uses SetupDevice to get device path if len(pluginDevicePath) != 0 { @@ -1044,17 +1067,6 @@ func (og *operationGenerator) GenerateMapVolumeFunc( return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError) } - markVolumeOpts := MarkVolumeOpts{ - PodName: volumeToMount.PodName, - PodUID: volumeToMount.Pod.UID, - VolumeName: volumeToMount.VolumeName, - BlockVolumeMapper: blockVolumeMapper, - OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName, - VolumeGidVolume: volumeToMount.VolumeGidValue, - VolumeSpec: volumeToMount.VolumeSpec, - VolumeMountState: VolumeMounted, - } - markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) if markVolMountedErr != nil { // On failure, return error. Caller will log and retry. @@ -1191,7 +1203,12 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( globalMapPath := deviceToDetach.DeviceMountPath refs, err := og.blkUtil.GetDeviceBindMountRefs(deviceToDetach.DevicePath, globalMapPath) if err != nil { - return deviceToDetach.GenerateError("UnmapDevice.GetDeviceBindMountRefs check failed", err) + if os.IsNotExist(err) { + // Looks like SetupDevice did not complete. Fall through to TearDownDevice and mark the device as unmounted. 
+ refs = nil + } else { + return deviceToDetach.GenerateError("UnmapDevice.GetDeviceBindMountRefs check failed", err) + } } if len(refs) > 0 { err = fmt.Errorf("The device %q is still referenced from other Pods %v", globalMapPath, refs) diff --git a/pkg/volume/util/volumepathhandler/volume_path_handler.go b/pkg/volume/util/volumepathhandler/volume_path_handler.go index 9c0983bbf73..6f198302dc0 100644 --- a/pkg/volume/util/volumepathhandler/volume_path_handler.go +++ b/pkg/volume/util/volumepathhandler/volume_path_handler.go @@ -283,7 +283,7 @@ func (v VolumePathHandler) GetDeviceBindMountRefs(devPath string, mapPath string var refs []string files, err := ioutil.ReadDir(mapPath) if err != nil { - return nil, fmt.Errorf("directory cannot read %v", err) + return nil, err } for _, file := range files { if file.Mode()&os.ModeDevice != os.ModeDevice {