Merge pull request #88660 from jsafrane/block-uncertain

Implement uncertain mount for block volumes
This commit is contained in:
Kubernetes Prow Robot 2020-03-02 11:43:08 -08:00 committed by GitHub
commit 7e2394cbb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 412 additions and 257 deletions

View File

@ -1148,7 +1148,6 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
} }
func Test_UncertainDeviceGlobalMounts(t *testing.T) { func Test_UncertainDeviceGlobalMounts(t *testing.T) {
fsMode := v1.PersistentVolumeFilesystem
var tests = []struct { var tests = []struct {
name string name string
deviceState operationexecutor.DeviceMountState deviceState operationexecutor.DeviceMountState
@ -1190,129 +1189,140 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) {
}, },
} }
for _, tc := range tests { for _, mode := range []v1.PersistentVolumeMode{v1.PersistentVolumeBlock, v1.PersistentVolumeFilesystem} {
t.Run(tc.name, func(t *testing.T) { for _, tc := range tests {
testName := fmt.Sprintf("%s [%s]", tc.name, mode)
t.Run(testName+"[", func(t *testing.T) {
pv := &v1.PersistentVolume{ pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: tc.volumeName, Name: tc.volumeName,
UID: "pvuid", UID: "pvuid",
}, },
Spec: v1.PersistentVolumeSpec{ Spec: v1.PersistentVolumeSpec{
ClaimRef: &v1.ObjectReference{Name: "pvc"}, ClaimRef: &v1.ObjectReference{Name: "pvc"},
VolumeMode: &fsMode, VolumeMode: &mode,
}, },
} }
pvc := &v1.PersistentVolumeClaim{ pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "pvc", Name: "pvc",
UID: "pvcuid", UID: "pvcuid",
}, },
Spec: v1.PersistentVolumeClaimSpec{ Spec: v1.PersistentVolumeClaimSpec{
VolumeName: tc.volumeName, VolumeName: tc.volumeName,
}, VolumeMode: &mode,
} },
pod := &v1.Pod{ }
ObjectMeta: metav1.ObjectMeta{ pod := &v1.Pod{
Name: "pod1", ObjectMeta: metav1.ObjectMeta{
UID: "pod1uid", Name: "pod1",
}, UID: "pod1uid",
Spec: v1.PodSpec{ },
Volumes: []v1.Volume{ Spec: v1.PodSpec{
{ Volumes: []v1.Volume{
Name: "volume-name", {
VolumeSource: v1.VolumeSource{ Name: "volume-name",
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ VolumeSource: v1.VolumeSource{
ClaimName: pvc.Name, PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc.Name,
},
}, },
}, },
}, },
}, },
}, }
}
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
fakePlugin.SupportsRemount = tc.supportRemount fakePlugin.SupportsRemount = tc.supportRemount
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)), Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
DevicePath: "fake/path", DevicePath: "fake/path",
})
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
kubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
reconciler := NewReconciler(
kubeClient,
true, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
waitForAttachTimeout,
nodeName,
dsw,
asw,
hasAddedPods,
oex,
&mount.FakeMounter{},
hostutil.NewFakeHostUtil(nil),
volumePluginMgr,
kubeletPodsDir)
volumeSpec := &volume.Spec{PersistentVolume: pv}
podName := util.GetUniquePodName(pod)
volumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
// Start the reconciler to fill ASW.
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
go func() {
reconciler.Run(stopChan)
close(stoppedChan)
}()
waitForVolumeToExistInASW(t, volumeName, asw)
if tc.volumeName == volumetesting.TimeoutAndFailOnMountDeviceVolumeName {
// Wait upto 10s for reconciler to catch up
time.Sleep(reconcilerSyncWaitDuration)
}
if tc.volumeName == volumetesting.SuccessAndFailOnMountDeviceName ||
tc.volumeName == volumetesting.SuccessAndTimeoutDeviceName {
// wait for mount and then break it via remount
waitForMount(t, fakePlugin, volumeName, asw)
asw.MarkRemountRequired(podName)
time.Sleep(reconcilerSyncWaitDuration)
}
if tc.deviceState == operationexecutor.DeviceMountUncertain {
waitForUncertainGlobalMount(t, volumeName, asw)
}
if tc.deviceState == operationexecutor.DeviceGloballyMounted {
waitForMount(t, fakePlugin, volumeName, asw)
}
dsw.DeletePodFromVolume(podName, volumeName)
waitForDetach(t, volumeName, asw)
if mode == v1.PersistentVolumeFilesystem {
err = volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
} else {
if tc.unmountDeviceCallCount == 0 {
err = volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin)
} else {
err = volumetesting.VerifyTearDownDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
}
}
if err != nil {
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
}
}) })
fakeRecorder := &record.FakeRecorder{} }
fakeHandler := volumetesting.NewBlockVolumePathHandler()
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
kubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
reconciler := NewReconciler(
kubeClient,
true, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
waitForAttachTimeout,
nodeName,
dsw,
asw,
hasAddedPods,
oex,
&mount.FakeMounter{},
hostutil.NewFakeHostUtil(nil),
volumePluginMgr,
kubeletPodsDir)
volumeSpec := &volume.Spec{PersistentVolume: pv}
podName := util.GetUniquePodName(pod)
volumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
// Start the reconciler to fill ASW.
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
go func() {
reconciler.Run(stopChan)
close(stoppedChan)
}()
waitForVolumeToExistInASW(t, volumeName, asw)
if tc.volumeName == volumetesting.TimeoutAndFailOnMountDeviceVolumeName {
// Wait upto 10s for reconciler to catchup
time.Sleep(reconcilerSyncWaitDuration)
}
if tc.volumeName == volumetesting.SuccessAndFailOnMountDeviceName ||
tc.volumeName == volumetesting.SuccessAndTimeoutDeviceName {
// wait for mount and then break it via remount
waitForMount(t, fakePlugin, volumeName, asw)
asw.MarkRemountRequired(podName)
time.Sleep(reconcilerSyncWaitDuration)
}
if tc.deviceState == operationexecutor.DeviceMountUncertain {
waitForUncertainGlobalMount(t, volumeName, asw)
}
if tc.deviceState == operationexecutor.DeviceGloballyMounted {
waitForMount(t, fakePlugin, volumeName, asw)
}
dsw.DeletePodFromVolume(podName, volumeName)
waitForDetach(t, volumeName, asw)
err = volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
if err != nil {
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
}
})
} }
} }
func Test_UncertainVolumeMountState(t *testing.T) { func Test_UncertainVolumeMountState(t *testing.T) {
fsMode := v1.PersistentVolumeFilesystem
var tests = []struct { var tests = []struct {
name string name string
volumeState operationexecutor.VolumeMountState volumeState operationexecutor.VolumeMountState
@ -1331,14 +1341,14 @@ func Test_UncertainVolumeMountState(t *testing.T) {
{ {
name: "failed operation should result in not-mounted volume", name: "failed operation should result in not-mounted volume",
volumeState: operationexecutor.VolumeNotMounted, volumeState: operationexecutor.VolumeNotMounted,
unmountDeviceCallCount: 0, unmountDeviceCallCount: 1,
unmountVolumeCount: 0, unmountVolumeCount: 0,
volumeName: volumetesting.FailOnSetupVolumeName, volumeName: volumetesting.FailOnSetupVolumeName,
}, },
{ {
name: "timeout followed by failed operation should result in non-mounted volume", name: "timeout followed by failed operation should result in non-mounted volume",
volumeState: operationexecutor.VolumeNotMounted, volumeState: operationexecutor.VolumeNotMounted,
unmountDeviceCallCount: 0, unmountDeviceCallCount: 1,
unmountVolumeCount: 0, unmountVolumeCount: 0,
volumeName: volumetesting.TimeoutAndFailOnSetupVolumeName, volumeName: volumetesting.TimeoutAndFailOnSetupVolumeName,
}, },
@ -1360,123 +1370,151 @@ func Test_UncertainVolumeMountState(t *testing.T) {
}, },
} }
for _, tc := range tests { for _, mode := range []v1.PersistentVolumeMode{v1.PersistentVolumeBlock, v1.PersistentVolumeFilesystem} {
t.Run(tc.name, func(t *testing.T) { for _, tc := range tests {
pv := &v1.PersistentVolume{ testName := fmt.Sprintf("%s [%s]", tc.name, mode)
ObjectMeta: metav1.ObjectMeta{ t.Run(testName, func(t *testing.T) {
Name: tc.volumeName, pv := &v1.PersistentVolume{
UID: "pvuid", ObjectMeta: metav1.ObjectMeta{
}, Name: tc.volumeName,
Spec: v1.PersistentVolumeSpec{ UID: "pvuid",
ClaimRef: &v1.ObjectReference{Name: "pvc"}, },
VolumeMode: &fsMode, Spec: v1.PersistentVolumeSpec{
}, ClaimRef: &v1.ObjectReference{Name: "pvc"},
} VolumeMode: &mode,
pvc := &v1.PersistentVolumeClaim{ },
ObjectMeta: metav1.ObjectMeta{ }
Name: "pvc", pvc := &v1.PersistentVolumeClaim{
UID: "pvcuid", ObjectMeta: metav1.ObjectMeta{
}, Name: "pvc",
Spec: v1.PersistentVolumeClaimSpec{ UID: "pvcuid",
VolumeName: tc.volumeName, },
}, Spec: v1.PersistentVolumeClaimSpec{
} VolumeName: tc.volumeName,
pod := &v1.Pod{ VolumeMode: &mode,
ObjectMeta: metav1.ObjectMeta{ },
Name: "pod1", }
UID: "pod1uid", pod := &v1.Pod{
}, ObjectMeta: metav1.ObjectMeta{
Spec: v1.PodSpec{ Name: "pod1",
Volumes: []v1.Volume{ UID: "pod1uid",
{ },
Name: "volume-name", Spec: v1.PodSpec{
VolumeSource: v1.VolumeSource{ Volumes: []v1.Volume{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ {
ClaimName: pvc.Name, Name: "volume-name",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc.Name,
},
}, },
}, },
}, },
}, },
}, }
}
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
fakePlugin.SupportsRemount = tc.supportRemount fakePlugin.SupportsRemount = tc.supportRemount
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)), Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
DevicePath: "fake/path", DevicePath: "fake/path",
})
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
kubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
reconciler := NewReconciler(
kubeClient,
true, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
waitForAttachTimeout,
nodeName,
dsw,
asw,
hasAddedPods,
oex,
&mount.FakeMounter{},
hostutil.NewFakeHostUtil(nil),
volumePluginMgr,
kubeletPodsDir)
volumeSpec := &volume.Spec{PersistentVolume: pv}
podName := util.GetUniquePodName(pod)
volumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
// Start the reconciler to fill ASW.
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
go func() {
reconciler.Run(stopChan)
close(stoppedChan)
}()
waitForVolumeToExistInASW(t, volumeName, asw)
if tc.volumeName == volumetesting.TimeoutAndFailOnSetupVolumeName {
// Wait upto 10s for reconciler to catchup
time.Sleep(reconcilerSyncWaitDuration)
}
if tc.volumeName == volumetesting.SuccessAndFailOnSetupVolumeName ||
tc.volumeName == volumetesting.SuccessAndTimeoutSetupVolumeName {
// wait for mount and then break it via remount
waitForMount(t, fakePlugin, volumeName, asw)
asw.MarkRemountRequired(podName)
time.Sleep(reconcilerSyncWaitDuration)
}
if tc.volumeState == operationexecutor.VolumeMountUncertain {
waitForUncertainPodMount(t, volumeName, asw)
}
if tc.volumeState == operationexecutor.VolumeMounted {
waitForMount(t, fakePlugin, volumeName, asw)
}
dsw.DeletePodFromVolume(podName, volumeName)
waitForDetach(t, volumeName, asw)
if mode == v1.PersistentVolumeFilesystem {
if err := volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin); err != nil {
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
}
if err := volumetesting.VerifyTearDownCallCount(tc.unmountVolumeCount, fakePlugin); err != nil {
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
}
} else {
if tc.unmountVolumeCount == 0 {
if err := volumetesting.VerifyZeroUnmapPodDeviceCallCount(fakePlugin); err != nil {
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
}
} else {
if err := volumetesting.VerifyUnmapPodDeviceCallCount(tc.unmountVolumeCount, fakePlugin); err != nil {
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
}
}
if tc.unmountDeviceCallCount == 0 {
if err := volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin); err != nil {
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
}
} else {
if err := volumetesting.VerifyTearDownDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin); err != nil {
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
}
}
}
}) })
fakeRecorder := &record.FakeRecorder{} }
fakeHandler := volumetesting.NewBlockVolumePathHandler()
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
kubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
reconciler := NewReconciler(
kubeClient,
true, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
waitForAttachTimeout,
nodeName,
dsw,
asw,
hasAddedPods,
oex,
&mount.FakeMounter{},
hostutil.NewFakeHostUtil(nil),
volumePluginMgr,
kubeletPodsDir)
volumeSpec := &volume.Spec{PersistentVolume: pv}
podName := util.GetUniquePodName(pod)
volumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
// Start the reconciler to fill ASW.
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
go func() {
reconciler.Run(stopChan)
close(stoppedChan)
}()
waitForVolumeToExistInASW(t, volumeName, asw)
if tc.volumeName == volumetesting.TimeoutAndFailOnSetupVolumeName {
// Wait upto 10s for reconciler to catchup
time.Sleep(reconcilerSyncWaitDuration)
}
if tc.volumeName == volumetesting.SuccessAndFailOnSetupVolumeName ||
tc.volumeName == volumetesting.SuccessAndTimeoutSetupVolumeName {
// wait for mount and then break it via remount
waitForMount(t, fakePlugin, volumeName, asw)
asw.MarkRemountRequired(podName)
time.Sleep(reconcilerSyncWaitDuration)
}
if tc.volumeState == operationexecutor.VolumeMountUncertain {
waitForUncertainPodMount(t, volumeName, asw)
}
if tc.volumeState == operationexecutor.VolumeMounted {
waitForMount(t, fakePlugin, volumeName, asw)
}
dsw.DeletePodFromVolume(podName, volumeName)
waitForDetach(t, volumeName, asw)
volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
volumetesting.VerifyTearDownCallCount(tc.unmountVolumeCount, fakePlugin)
})
} }
} }
func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) { func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {

View File

@ -88,6 +88,8 @@ go_test(
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library", "//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/google.golang.org/grpc/codes:go_default_library",
"//vendor/google.golang.org/grpc/status:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",
], ],
) )

View File

@ -189,7 +189,7 @@ func (m *csiBlockMapper) stageVolumeForBlock(
nil /* MountOptions */) nil /* MountOptions */)
if err != nil { if err != nil {
return "", errors.New(log("blockMapper.stageVolumeForBlock failed: %v", err)) return "", err
} }
klog.V(4).Infof(log("blockMapper.stageVolumeForBlock successfully requested NodeStageVolume [%s]", stagingPath)) klog.V(4).Infof(log("blockMapper.stageVolumeForBlock successfully requested NodeStageVolume [%s]", stagingPath))
@ -249,7 +249,7 @@ func (m *csiBlockMapper) publishVolumeForBlock(
) )
if err != nil { if err != nil {
return "", errors.New(log("blockMapper.publishVolumeForBlock failed: %v", err)) return "", err
} }
return publishPath, nil return publishPath, nil
@ -503,19 +503,8 @@ func (m *csiBlockMapper) UnmapPodDevice() error {
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel() defer cancel()
// Call NodeUnpublishVolume // Call NodeUnpublishVolume.
if _, err := os.Stat(publishPath); err != nil { // Even if publishPath does not exist - previous NodePublish may have timed out
if os.IsNotExist(err) { // and Kubernetes makes sure that the operation is finished.
klog.V(4).Infof(log("blockMapper.UnmapPodDevice publishPath(%s) has already been deleted, skip calling NodeUnpublishVolume", publishPath)) return m.unpublishVolumeForBlock(ctx, csiClient, publishPath)
} else {
return err
}
} else {
err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath)
if err != nil {
return err
}
}
return nil
} }

View File

@ -18,12 +18,14 @@ package csi
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"testing" "testing"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
"k8s.io/api/storage/v1beta1" "k8s.io/api/storage/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -299,7 +301,7 @@ func TestBlockMapperSetupDeviceError(t *testing.T) {
csiMapper.csiClient = setupClient(t, true) csiMapper.csiClient = setupClient(t, true)
fClient := csiMapper.csiClient.(*fakeCsiDriverClient) fClient := csiMapper.csiClient.(*fakeCsiDriverClient)
fClient.nodeClient.SetNextError(errors.New("mock final error")) fClient.nodeClient.SetNextError(status.Error(codes.InvalidArgument, "mock final error"))
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName)) attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
attachment := makeTestAttachment(attachID, nodeName, pvName) attachment := makeTestAttachment(attachID, nodeName, pvName)

View File

@ -959,7 +959,42 @@ func (fv *FakeVolume) TearDownAt(dir string) error {
func (fv *FakeVolume) SetUpDevice() error { func (fv *FakeVolume) SetUpDevice() error {
fv.Lock() fv.Lock()
defer fv.Unlock() defer fv.Unlock()
if fv.VolName == TimeoutOnMountDeviceVolumeName {
fv.DeviceMountState[fv.VolName] = deviceMountUncertain
return volumetypes.NewUncertainProgressError("mount failed")
}
if fv.VolName == FailMountDeviceVolumeName {
fv.DeviceMountState[fv.VolName] = deviceNotMounted
return fmt.Errorf("error mapping disk: %s", fv.VolName)
}
if fv.VolName == TimeoutAndFailOnMountDeviceVolumeName {
_, ok := fv.DeviceMountState[fv.VolName]
if !ok {
fv.DeviceMountState[fv.VolName] = deviceMountUncertain
return volumetypes.NewUncertainProgressError("timed out mounting error")
}
fv.DeviceMountState[fv.VolName] = deviceNotMounted
return fmt.Errorf("error mapping disk: %s", fv.VolName)
}
if fv.VolName == SuccessAndTimeoutDeviceName {
_, ok := fv.DeviceMountState[fv.VolName]
if ok {
fv.DeviceMountState[fv.VolName] = deviceMountUncertain
return volumetypes.NewUncertainProgressError("error mounting state")
}
}
if fv.VolName == SuccessAndFailOnMountDeviceName {
_, ok := fv.DeviceMountState[fv.VolName]
if ok {
return fmt.Errorf("error mapping disk: %s", fv.VolName)
}
}
fv.DeviceMountState[fv.VolName] = deviceMounted
fv.SetUpDeviceCallCount++ fv.SetUpDeviceCallCount++
return nil return nil
} }
@ -1044,6 +1079,45 @@ func (fv *FakeVolume) GetUnmapPodDeviceCallCount() int {
func (fv *FakeVolume) MapPodDevice() (string, error) { func (fv *FakeVolume) MapPodDevice() (string, error) {
fv.Lock() fv.Lock()
defer fv.Unlock() defer fv.Unlock()
if fv.VolName == TimeoutOnSetupVolumeName {
fv.VolumeMountState[fv.VolName] = volumeMountUncertain
return "", volumetypes.NewUncertainProgressError("time out on setup")
}
if fv.VolName == FailOnSetupVolumeName {
fv.VolumeMountState[fv.VolName] = volumeNotMounted
return "", fmt.Errorf("mounting volume failed")
}
if fv.VolName == TimeoutAndFailOnSetupVolumeName {
_, ok := fv.VolumeMountState[fv.VolName]
if !ok {
fv.VolumeMountState[fv.VolName] = volumeMountUncertain
return "", volumetypes.NewUncertainProgressError("time out on setup")
}
fv.VolumeMountState[fv.VolName] = volumeNotMounted
return "", fmt.Errorf("mounting volume failed")
}
if fv.VolName == SuccessAndFailOnSetupVolumeName {
_, ok := fv.VolumeMountState[fv.VolName]
if ok {
fv.VolumeMountState[fv.VolName] = volumeNotMounted
return "", fmt.Errorf("mounting volume failed")
}
}
if fv.VolName == SuccessAndTimeoutSetupVolumeName {
_, ok := fv.VolumeMountState[fv.VolName]
if ok {
fv.VolumeMountState[fv.VolName] = volumeMountUncertain
return "", volumetypes.NewUncertainProgressError("time out on setup")
}
}
fv.VolumeMountState[fv.VolName] = volumeMounted
fv.MapPodDeviceCallCount++ fv.MapPodDeviceCallCount++
return "", nil return "", nil
} }
@ -1624,6 +1698,39 @@ func VerifyZeroTearDownDeviceCallCount(fakeVolumePlugin *FakeVolumePlugin) error
return nil return nil
} }
// VerifyUnmapPodDeviceCallCount ensures that at least one of the Unmappers for this
// plugin has the expected number of UnmapPodDevice calls. Otherwise it
// returns an error.
func VerifyUnmapPodDeviceCallCount(
expectedUnmapPodDeviceCallCount int,
fakeVolumePlugin *FakeVolumePlugin) error {
for _, unmapper := range fakeVolumePlugin.GetBlockVolumeUnmapper() {
actualCallCount := unmapper.GetUnmapPodDeviceCallCount()
if actualCallCount >= expectedUnmapPodDeviceCallCount {
return nil
}
}
return fmt.Errorf(
"No Unmapper have expected UnmapPodDeviceCallCount. Expected: <%v>.",
expectedUnmapPodDeviceCallCount)
}
// VerifyZeroUnmapPodDeviceCallCount ensures that all Mappers for this plugin have a
// zero UnmapPodDevice calls. Otherwise it returns an error.
func VerifyZeroUnmapPodDeviceCallCount(fakeVolumePlugin *FakeVolumePlugin) error {
for _, unmapper := range fakeVolumePlugin.GetBlockVolumeUnmapper() {
actualCallCount := unmapper.GetUnmapPodDeviceCallCount()
if actualCallCount != 0 {
return fmt.Errorf(
"At least one unmapper has non-zero UnmapPodDeviceCallCount: <%v>.",
actualCallCount)
}
}
return nil
}
// VerifyGetGlobalMapPathCallCount ensures that at least one of the Mappers for this // VerifyGetGlobalMapPathCallCount ensures that at least one of the Mappers for this
// plugin has the expectedGlobalMapPathCallCount number of calls. Otherwise it returns // plugin has the expectedGlobalMapPathCallCount number of calls. Otherwise it returns
// an error. // an error.

View File

@ -20,6 +20,7 @@ import (
"context" "context"
goerrors "errors" goerrors "errors"
"fmt" "fmt"
"os"
"path/filepath" "path/filepath"
"strings" "strings"
"time" "time"
@ -929,7 +930,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
volumeAttacher, _ = attachableVolumePlugin.NewAttacher() volumeAttacher, _ = attachableVolumePlugin.NewAttacher()
} }
mapVolumeFunc := func() (error, error) { mapVolumeFunc := func() (simpleErr error, detailedErr error) {
var devicePath string var devicePath string
// Set up global map path under the given plugin directory using symbolic link // Set up global map path under the given plugin directory using symbolic link
globalMapPath, err := globalMapPath, err :=
@ -956,6 +957,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok { if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok {
mapErr := customBlockVolumeMapper.SetUpDevice() mapErr := customBlockVolumeMapper.SetUpDevice()
if mapErr != nil { if mapErr != nil {
og.markDeviceErrorState(volumeToMount, devicePath, globalMapPath, mapErr, actualStateOfWorld)
// On failure, return error. Caller will log and retry. // On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MapVolume.SetUpDevice failed", mapErr) return volumeToMount.GenerateError("MapVolume.SetUpDevice failed", mapErr)
} }
@ -970,15 +972,36 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
return volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr) return volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr)
} }
markVolumeOpts := MarkVolumeOpts{
PodName: volumeToMount.PodName,
PodUID: volumeToMount.Pod.UID,
VolumeName: volumeToMount.VolumeName,
BlockVolumeMapper: blockVolumeMapper,
OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
VolumeGidVolume: volumeToMount.VolumeGidValue,
VolumeSpec: volumeToMount.VolumeSpec,
VolumeMountState: VolumeMounted,
}
// Call MapPodDevice if blockVolumeMapper implements CustomBlockVolumeMapper // Call MapPodDevice if blockVolumeMapper implements CustomBlockVolumeMapper
if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok { if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok {
// Execute driver specific map // Execute driver specific map
pluginDevicePath, mapErr := customBlockVolumeMapper.MapPodDevice() pluginDevicePath, mapErr := customBlockVolumeMapper.MapPodDevice()
if mapErr != nil { if mapErr != nil {
// On failure, return error. Caller will log and retry. // On failure, return error. Caller will log and retry.
og.markVolumeErrorState(volumeToMount, markVolumeOpts, mapErr, actualStateOfWorld)
return volumeToMount.GenerateError("MapVolume.MapPodDevice failed", mapErr) return volumeToMount.GenerateError("MapVolume.MapPodDevice failed", mapErr)
} }
// From now on, the volume is mapped. Mark it as uncertain on error,
// so it is is unmapped when corresponding pod is deleted.
defer func() {
if simpleErr != nil {
errText := simpleErr.Error()
og.markVolumeErrorState(volumeToMount, markVolumeOpts, volumetypes.NewUncertainProgressError(errText), actualStateOfWorld)
}
}()
// if pluginDevicePath is provided, assume attacher may not provide device // if pluginDevicePath is provided, assume attacher may not provide device
// or attachment flow uses SetupDevice to get device path // or attachment flow uses SetupDevice to get device path
if len(pluginDevicePath) != 0 { if len(pluginDevicePath) != 0 {
@ -1044,17 +1067,6 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError) return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError)
} }
markVolumeOpts := MarkVolumeOpts{
PodName: volumeToMount.PodName,
PodUID: volumeToMount.Pod.UID,
VolumeName: volumeToMount.VolumeName,
BlockVolumeMapper: blockVolumeMapper,
OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
VolumeGidVolume: volumeToMount.VolumeGidValue,
VolumeSpec: volumeToMount.VolumeSpec,
VolumeMountState: VolumeMounted,
}
markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
if markVolMountedErr != nil { if markVolMountedErr != nil {
// On failure, return error. Caller will log and retry. // On failure, return error. Caller will log and retry.
@ -1191,7 +1203,12 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc(
globalMapPath := deviceToDetach.DeviceMountPath globalMapPath := deviceToDetach.DeviceMountPath
refs, err := og.blkUtil.GetDeviceBindMountRefs(deviceToDetach.DevicePath, globalMapPath) refs, err := og.blkUtil.GetDeviceBindMountRefs(deviceToDetach.DevicePath, globalMapPath)
if err != nil { if err != nil {
return deviceToDetach.GenerateError("UnmapDevice.GetDeviceBindMountRefs check failed", err) if os.IsNotExist(err) {
// Looks like SetupDevice did not complete. Fall through to TearDownDevice and mark the device as unmounted.
refs = nil
} else {
return deviceToDetach.GenerateError("UnmapDevice.GetDeviceBindMountRefs check failed", err)
}
} }
if len(refs) > 0 { if len(refs) > 0 {
err = fmt.Errorf("The device %q is still referenced from other Pods %v", globalMapPath, refs) err = fmt.Errorf("The device %q is still referenced from other Pods %v", globalMapPath, refs)

View File

@ -283,7 +283,7 @@ func (v VolumePathHandler) GetDeviceBindMountRefs(devPath string, mapPath string
var refs []string var refs []string
files, err := ioutil.ReadDir(mapPath) files, err := ioutil.ReadDir(mapPath)
if err != nil { if err != nil {
return nil, fmt.Errorf("directory cannot read %v", err) return nil, err
} }
for _, file := range files { for _, file := range files {
if file.Mode()&os.ModeDevice != os.ModeDevice { if file.Mode()&os.ModeDevice != os.ModeDevice {