diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index 5d89aa5febb..182657e1c11 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -173,6 +173,7 @@ type AttachedVolume struct { DeviceMountState operationexecutor.DeviceMountState } +// DeviceMayBeMounted returns true if device may be mounted in global path. func (av AttachedVolume) DeviceMayBeMounted() bool { return av.DeviceMountState == operationexecutor.DeviceGloballyMounted || av.DeviceMountState == operationexecutor.DeviceMountUncertain @@ -252,11 +253,6 @@ type attachedVolume struct { // this volume implements the volume.Attacher interface pluginIsAttachable bool - // globallyMounted indicates that the volume is mounted to the underlying - // device at a global mount point. This global mount point must be unmounted - // prior to detach. - globallyMounted bool - // deviceMountState stores information that tells us if device is mounted // globally or not deviceMountState operationexecutor.DeviceMountState @@ -313,11 +309,11 @@ type mountedPod struct { // mounted to this pod but its size has been expanded after that. fsResizeRequired bool - // volumeMounted stores state of volume mount for the pod. if it is: + // volumeMountStateForPod stores state of volume mount for the pod. 
if it is: // - VolumeMounted: means volume for pod has been successfully mounted // - VolumeMountUncertain: means volume for pod may not be mounted, but it must be unmounted // - VolumeNotMounted: means volume for pod has not been mounted - volumeMounted operationexecutor.VolumeMountState + volumeMountStateForPod operationexecutor.VolumeMountState } func (asw *actualStateOfWorld) MarkVolumeAsAttached( @@ -363,6 +359,11 @@ func (asw *actualStateOfWorld) MarkDeviceAsUncertain( return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceMountUncertain, devicePath, deviceMountPath) } +func (asw *actualStateOfWorld) MarkVolumeMountAsUncertain(markVolumeOpts operationexecutor.MarkVolumeMountedOpts) error { + markVolumeOpts.VolumeMountState = operationexecutor.VolumeMountUncertain + return asw.AddPodToVolume(markVolumeOpts) +} + func (asw *actualStateOfWorld) MarkDeviceAsUnmounted( volumeName v1.UniqueVolumeName) error { return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceNotMounted, "", "") @@ -448,14 +449,14 @@ func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.M podObj, podExists := volumeObj.mountedPods[podName] if !podExists { podObj = mountedPod{ - podName: podName, - podUID: podUID, - mounter: mounter, - blockVolumeMapper: blockVolumeMapper, - outerVolumeSpecName: outerVolumeSpecName, - volumeGidValue: volumeGidValue, - volumeSpec: volumeSpec, - volumeMounted: markVolumeOpts.VolumeMountState, + podName: podName, + podUID: podUID, + mounter: mounter, + blockVolumeMapper: blockVolumeMapper, + outerVolumeSpecName: outerVolumeSpecName, + volumeGidValue: volumeGidValue, + volumeSpec: volumeSpec, + volumeMountStateForPod: markVolumeOpts.VolumeMountState, } } @@ -674,7 +675,7 @@ func (asw *actualStateOfWorld) GetMountedVolumes() []MountedVolume { mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */) for _, volumeObj := range asw.attachedVolumes { for _, podObj := range 
volumeObj.mountedPods { - if podObj.volumeMounted == operationexecutor.VolumeMounted { + if podObj.volumeMountStateForPod == operationexecutor.VolumeMounted { mountedVolume = append( mountedVolume, getMountedVolume(&podObj, &volumeObj)) @@ -684,14 +685,15 @@ func (asw *actualStateOfWorld) GetMountedVolumes() []MountedVolume { return mountedVolume } +// GetAllMountedVolumes returns all volumes which could be locally mounted for a pod. func (asw *actualStateOfWorld) GetAllMountedVolumes() []MountedVolume { asw.RLock() defer asw.RUnlock() mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */) for _, volumeObj := range asw.attachedVolumes { for _, podObj := range volumeObj.mountedPods { - if podObj.volumeMounted == operationexecutor.VolumeMounted || - podObj.volumeMounted == operationexecutor.VolumeMountUncertain { + if podObj.volumeMountStateForPod == operationexecutor.VolumeMounted || + podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain { mountedVolume = append( mountedVolume, getMountedVolume(&podObj, &volumeObj)) @@ -710,7 +712,7 @@ func (asw *actualStateOfWorld) GetMountedVolumesForPod( mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */) for _, volumeObj := range asw.attachedVolumes { for mountedPodName, podObj := range volumeObj.mountedPods { - if mountedPodName == podName && podObj.volumeMounted == operationexecutor.VolumeMounted { + if mountedPodName == podName && podObj.volumeMountStateForPod == operationexecutor.VolumeMounted { mountedVolume = append( mountedVolume, getMountedVolume(&podObj, &volumeObj)) diff --git a/pkg/kubelet/volumemanager/metrics/metrics_test.go b/pkg/kubelet/volumemanager/metrics/metrics_test.go index 986fb6f0e65..2af2c17cdf5 100644 --- a/pkg/kubelet/volumemanager/metrics/metrics_test.go +++ b/pkg/kubelet/volumemanager/metrics/metrics_test.go @@ -86,6 +86,7 @@ func TestMetricCollection(t *testing.T) { BlockVolumeMapper: mapper, OuterVolumeSpecName: 
volumeSpec.Name(), VolumeSpec: volumeSpec, + VolumeMountState: operationexecutor.VolumeMounted, } err = asw.AddPodToVolume(markVolumeOpts) if err != nil { diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index cd000065a68..ec46ea492ac 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -167,7 +167,9 @@ func (rc *reconciler) reconcile() { rc.unmountVolumes() // Next we mount required volumes. This function could also trigger - // detach if kubelet is responsible for detaching volumes. + // attach if kubelet is responsible for attaching volumes. + // If underlying PVC was resized while in-use then this function also handles volume + // resizing. rc.mountAttachVolumes() // Ensure devices that should be detached/unmounted are detached/unmounted. diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index 0fccd8d7567..dfafd8985a1 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -336,7 +336,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) { // Act dsw.DeletePodFromVolume(podName, generatedVolumeName) - waitForDetach(t, fakePlugin, generatedVolumeName, asw) + waitForDetach(t, generatedVolumeName, asw) // Assert assert.NoError(t, volumetesting.VerifyTearDownCallCount( @@ -428,7 +428,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) { // Act dsw.DeletePodFromVolume(podName, generatedVolumeName) - waitForDetach(t, fakePlugin, generatedVolumeName, asw) + waitForDetach(t, generatedVolumeName, asw) // Assert assert.NoError(t, volumetesting.VerifyTearDownCallCount( @@ -739,7 +739,7 @@ func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) { // Act dsw.DeletePodFromVolume(podName, generatedVolumeName) - waitForDetach(t, 
fakePlugin, generatedVolumeName, asw) + waitForDetach(t, generatedVolumeName, asw) // Assert assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount( @@ -855,7 +855,7 @@ func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) { // Act dsw.DeletePodFromVolume(podName, generatedVolumeName) - waitForDetach(t, fakePlugin, generatedVolumeName, asw) + waitForDetach(t, generatedVolumeName, asw) // Assert assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount( @@ -1145,6 +1145,237 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { } } +func Test_UncertainDeviceGlobalMounts(t *testing.T) { + fsMode := v1.PersistentVolumeFilesystem + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: volumetesting.TimeoutOnMountDeviceVolumeName, + UID: "pvuid", + }, + Spec: v1.PersistentVolumeSpec{ + ClaimRef: &v1.ObjectReference{Name: "pvc"}, + VolumeMode: &fsMode, + }, + } + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc", + UID: "pvcuid", + }, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: volumetesting.TimeoutOnMountDeviceVolumeName, + }, + } + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "volume-name", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Name, + }, + }, + }, + }, + }, + } + + volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) + kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ + Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", volumetesting.TimeoutOnMountDeviceVolumeName)), + DevicePath: "fake/path", + }) + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + oex := 
operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + kubeClient, + volumePluginMgr, + fakeRecorder, + false, /* checkNodeCapabilitiesBeforeMount */ + fakeHandler)) + + reconciler := NewReconciler( + kubeClient, + true, /* controllerAttachDetachEnabled */ + reconcilerLoopSleepDuration, + waitForAttachTimeout, + nodeName, + dsw, + asw, + hasAddedPods, + oex, + &mount.FakeMounter{}, + hostutil.NewFakeHostUtil(nil), + volumePluginMgr, + kubeletPodsDir) + volumeSpec := &volume.Spec{PersistentVolume: pv} + podName := util.GetUniquePodName(pod) + volumeName, err := dsw.AddPodToVolume( + podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) + // Assert + if err != nil { + t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) + } + dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName}) + + // Start the reconciler to fill ASW. + stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) + go func() { + reconciler.Run(stopChan) + close(stoppedChan) + }() + waitForVolumeToExistInASW(t, volumeName, asw) + waitForUncertainGlobalMount(t, volumeName, asw) + + dsw.DeletePodFromVolume(podName, volumeName) + waitForDetach(t, volumeName, asw) + + volumetesting.VerifyUnmountDeviceCallCount(1, fakePlugin) +} + +func Test_UncertainVolumeMountState(t *testing.T) { + fsMode := v1.PersistentVolumeFilesystem + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: volumetesting.TimeoutOnSetupVolumeName, + UID: "pvuid", + }, + Spec: v1.PersistentVolumeSpec{ + ClaimRef: &v1.ObjectReference{Name: "pvc"}, + VolumeMode: &fsMode, + }, + } + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc", + UID: "pvcuid", + }, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: volumetesting.TimeoutOnSetupVolumeName, + }, + } + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "volume-name", + VolumeSource: 
v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Name, + }, + }, + }, + }, + }, + } + + volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) + kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ + Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", volumetesting.TimeoutOnSetupVolumeName)), + DevicePath: "fake/path", + }) + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + kubeClient, + volumePluginMgr, + fakeRecorder, + false, /* checkNodeCapabilitiesBeforeMount */ + fakeHandler)) + + reconciler := NewReconciler( + kubeClient, + true, /* controllerAttachDetachEnabled */ + reconcilerLoopSleepDuration, + waitForAttachTimeout, + nodeName, + dsw, + asw, + hasAddedPods, + oex, + &mount.FakeMounter{}, + hostutil.NewFakeHostUtil(nil), + volumePluginMgr, + kubeletPodsDir) + volumeSpec := &volume.Spec{PersistentVolume: pv} + podName := util.GetUniquePodName(pod) + volumeName, err := dsw.AddPodToVolume( + podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) + // Assert + if err != nil { + t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) + } + dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName}) + + // Start the reconciler to fill ASW. 
+ stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) + go func() { + reconciler.Run(stopChan) + close(stoppedChan) + }() + waitForVolumeToExistInASW(t, volumeName, asw) + waitForUncertainPodMount(t, volumeName, asw) + + dsw.DeletePodFromVolume(podName, volumeName) + waitForDetach(t, volumeName, asw) + + volumetesting.VerifyUnmountDeviceCallCount(1, fakePlugin) + volumetesting.VerifyTearDownCallCount(1, fakePlugin) +} + +func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) { + // check if volume is globally mounted in uncertain state + err := retryWithExponentialBackOff( + time.Duration(500*time.Millisecond), + func() (bool, error) { + unmountedVolumes := asw.GetUnmountedVolumes() + for _, v := range unmountedVolumes { + if v.VolumeName == volumeName && v.DeviceMountState == operationexecutor.DeviceMountUncertain { + return true, nil + } + } + return false, nil + }, + ) + + if err != nil { + t.Fatalf("expected volumes %s to be mounted in uncertain state globally", volumeName) + } +} + +func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) { + // check if volume is locally pod mounted in uncertain state + err := retryWithExponentialBackOff( + time.Duration(500*time.Millisecond), + func() (bool, error) { + allMountedVolumes := asw.GetAllMountedVolumes() + for _, v := range allMountedVolumes { + if v.VolumeName == volumeName { + return true, nil + } + } + return false, nil + }, + ) + + if err != nil { + t.Fatalf("expected volumes %s to be mounted in uncertain state for pod", volumeName) + } +} + func waitForMount( t *testing.T, fakePlugin *volumetesting.FakeVolumePlugin, @@ -1169,9 +1400,23 @@ func waitForMount( } } +func waitForVolumeToExistInASW(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) { + err := retryWithExponentialBackOff( + time.Duration(500*time.Millisecond), + func() (bool, error) { + if 
asw.VolumeExists(volumeName) { + return true, nil + } + return false, nil + }, + ) + if err != nil { + t.Fatalf("Timed out waiting for volume %q to be exist in asw.", volumeName) + } +} + func waitForDetach( t *testing.T, - fakePlugin *volumetesting.FakeVolumePlugin, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) { err := retryWithExponentialBackOff( diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index 1b4bf551c8f..db1aa80e0d7 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -20,6 +20,7 @@ go_library( "//pkg/volume:go_default_library", "//pkg/volume/csi/nodeinfomanager:go_default_library", "//pkg/volume/util:go_default_library", + "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", @@ -37,6 +38,8 @@ go_library( "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", "//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library", "//vendor/google.golang.org/grpc:go_default_library", + "//vendor/google.golang.org/grpc/codes:go_default_library", + "//vendor/google.golang.org/grpc/status:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/strings:go_default_library", ], diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index 1ca94ddfdfa..b25aba98068 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -36,6 +36,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/volume" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) const ( @@ -265,7 +266,8 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo return err } defer func() { - if err != nil { + // Only for non-timedout errors remove the mount directory + if err != nil && 
!volumetypes.IsOperationTimeOutError(err) { // clean up metadata klog.Errorf(log("attacher.MountDevice failed: %v", err)) if err := removeMountDir(c.plugin, deviceMountPath); err != nil { diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go index 99c3181a0aa..6f54f871e8f 100644 --- a/pkg/volume/csi/csi_client.go +++ b/pkg/volume/csi/csi_client.go @@ -28,11 +28,14 @@ import ( csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" api "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" "k8s.io/kubernetes/pkg/volume" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) type csiClient interface { @@ -213,6 +216,7 @@ func (c *csiDriverClient) NodePublishVolume( if targetPath == "" { return errors.New("missing target path") } + if c.nodeV1ClientCreator == nil { return errors.New("failed to call NodePublishVolume. 
nodeV1ClientCreator is nil") @@ -255,7 +259,10 @@ func (c *csiDriverClient) NodePublishVolume( } _, err = nodeClient.NodePublishVolume(ctx, req) - return err + if err != nil && !isFinalError(err) { + return volumetypes.NewOperationTimedOutError(err.Error()) + } + return err } func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, volumeID, volumePath string, newSize resource.Quantity) (resource.Quantity, error) { @@ -374,6 +381,9 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context, } _, err = nodeClient.NodeStageVolume(ctx, req) + if err != nil && !isFinalError(err) { + return volumetypes.NewOperationTimedOutError(err.Error()) + } return err } @@ -613,3 +623,27 @@ func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, } return metrics, nil } + +func isFinalError(err error) bool { + // Sources: + // https://github.com/grpc/grpc/blob/master/doc/statuscodes.md + // https://github.com/container-storage-interface/spec/blob/master/spec.md + st, ok := status.FromError(err) + if !ok { + // This is not gRPC error. The operation must have failed before gRPC + // method was called, otherwise we would get gRPC error. + // We don't know if any previous volume operation is in progress, be on the safe side. + return false + } + switch st.Code() { + case codes.Canceled, // gRPC: Client Application cancelled the request + codes.DeadlineExceeded, // gRPC: Timeout + codes.Unavailable, // gRPC: Server shutting down, TCP connection broken - previous volume operation may be still in progress. + codes.ResourceExhausted, // gRPC: Server temporarily out of resources - previous volume operation may be still in progress. + codes.Aborted: // CSI: Operation pending for volume + return false + } + // All other errors mean that operation either did not + // even start or failed. It is for sure not in progress.
+ return true +} diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go index 2cce6de579c..7fb73725da3 100644 --- a/pkg/volume/csi/csi_mounter.go +++ b/pkg/volume/csi/csi_mounter.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" utilstrings "k8s.io/utils/strings" ) @@ -254,8 +255,11 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error ) if err != nil { - if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil { - klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr)) + // If error is not of type time out then we can remove the mount directory + if !volumetypes.IsOperationTimeOutError(err) { + if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil { + klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr)) + } } return errors.New(log("mounter.SetupAt failed: %v", err)) } diff --git a/pkg/volume/testing/BUILD b/pkg/volume/testing/BUILD index 8d9502fd331..b585365f1ee 100644 --- a/pkg/volume/testing/BUILD +++ b/pkg/volume/testing/BUILD @@ -18,6 +18,7 @@ go_library( "//pkg/volume/util/hostutil:go_default_library", "//pkg/volume/util/recyclerclient:go_default_library", "//pkg/volume/util/subpath:go_default_library", + "//pkg/volume/util/types:go_default_library", "//pkg/volume/util/volumepathhandler:go_default_library", "//staging/src/k8s.io/api/authentication/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 0134d1b2077..e74750cb067 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -51,6 +51,7 @@ import ( "k8s.io/kubernetes/pkg/volume/util/hostutil"
"k8s.io/kubernetes/pkg/volume/util/recyclerclient" "k8s.io/kubernetes/pkg/volume/util/subpath" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" "k8s.io/kubernetes/pkg/volume/util/volumepathhandler" ) @@ -66,6 +67,10 @@ const ( TimeoutAttachNode = "timeout-attach-node" // The node is marked as multi-attach which means it is allowed to attach the volume to multiple nodes. MultiAttachNode = "multi-attach-node" + // TimeoutOnSetupVolumeName will cause Setup call to timeout but volume will finish mounting. + TimeoutOnSetupVolumeName = "timeout-setup-volume" + // TimeoutOnMountDeviceVolumeName will cause MountDevice call to timeout but Setup will finish. + TimeoutOnMountDeviceVolumeName = "timeout-mount-device-volume" ) // fakeVolumeHost is useful for testing volume plugins. @@ -835,6 +840,9 @@ func (fv *FakeVolume) CanMount() error { func (fv *FakeVolume) SetUp(mounterArgs MounterArgs) error { fv.Lock() defer fv.Unlock() + if fv.VolName == TimeoutOnSetupVolumeName { + return volumetypes.NewOperationTimedOutError("time out on setup") + } fv.SetUpCallCount++ return fv.SetUpAt(fv.getPath(), mounterArgs) } @@ -1039,6 +1047,9 @@ func (fv *FakeVolume) GetDeviceMountPath(spec *Spec) (string, error) { func (fv *FakeVolume) MountDevice(spec *Spec, devicePath string, deviceMountPath string) error { fv.Lock() defer fv.Unlock() + if spec.Name() == TimeoutOnMountDeviceVolumeName { + return volumetypes.NewOperationTimedOutError("error mounting device") + } fv.MountDeviceCallCount++ return nil } @@ -1049,6 +1060,12 @@ func (fv *FakeVolume) GetMountDeviceCallCount() int { return fv.MountDeviceCallCount } +func (fv *FakeVolume) GetUnmountDeviceCallCount() int { + fv.RLock() + defer fv.RUnlock() + return fv.UnmountDeviceCallCount +} + func (fv *FakeVolume) Detach(volumeName string, nodeName types.NodeName) error { fv.Lock() defer fv.Unlock() @@ -1304,6 +1321,19 @@ func VerifyMountDeviceCallCount( expectedMountDeviceCallCount) } +func VerifyUnmountDeviceCallCount(expectedCallCount 
int, fakeVolumePlugin *FakeVolumePlugin) error { + for _, attacher := range fakeVolumePlugin.GetAttachers() { + actualCallCount := attacher.GetUnmountDeviceCallCount() + if actualCallCount >= expectedCallCount { + return nil + } + } + + return fmt.Errorf( + "No Attachers have expected UnmountDeviceCallCount. Expected: <%v>.", + expectedCallCount) +} + // VerifyZeroMountDeviceCallCount ensures that all Attachers for this plugin // have a zero MountDeviceCallCount. Otherwise it returns an error. func VerifyZeroMountDeviceCallCount(fakeVolumePlugin *FakeVolumePlugin) error { diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 92a4e4305a3..010299cee41 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -160,6 +160,7 @@ func NewOperationExecutor( } } +// MarkVolumeMountedOpts is a struct to pass arguments to MountVolume functions type MarkVolumeMountedOpts struct { PodName volumetypes.UniquePodName PodUID types.UID @@ -181,9 +182,13 @@ type ActualStateOfWorldMounterUpdater interface { // Marks the specified volume as unmounted from the specified pod MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error + // MarkVolumeMountAsUncertain marks state of volume mount for the pod uncertain + MarkVolumeMountAsUncertain(markVolumeOpts MarkVolumeMountedOpts) error + // Marks the specified volume as having been globally mounted. MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error + // MarkDeviceAsUncertain marks device state in global mount path as uncertain MarkDeviceAsUncertain(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error // Marks the specified volume as having its global mount unmounted.
@@ -372,10 +377,10 @@ type VolumeToMount struct { type DeviceMountState string const ( - // DeviceGloballymounted means device has been globally mounted successfully + // DeviceGloballyMounted means device has been globally mounted successfully DeviceGloballyMounted DeviceMountState = "DeviceGloballyMounted" - // Uncertain means device may not be mounted but a mount operation may be + // DeviceMountUncertain means device may not be mounted but a mount operation may be // in-progress which can cause device mount to succeed. DeviceMountUncertain DeviceMountState = "DeviceMountUncertain" @@ -387,10 +392,13 @@ const ( type VolumeMountState string const ( + // VolumeMounted means volume has been mounted in pod's local path VolumeMounted VolumeMountState = "VolumeMounted" + // VolumeMountUncertain means volume may or may not be mounted in pods' local path VolumeMountUncertain VolumeMountState = "VolumeMountUncertain" + // VolumeNotMounted means volume has not been mounted in pod's local path VolumeNotMounted VolumeMountState = "VolumeNotMounted" ) diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 425a5b6f79a..2956ada9e66 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -580,6 +580,12 @@ func (og *operationGenerator) GenerateMountVolumeFunc( devicePath, deviceMountPath) if err != nil { + if volumetypes.IsOperationTimeOutError(err) { + markDeviceUncertainError := actualStateOfWorld.MarkDeviceAsUncertain(volumeToMount.VolumeName, devicePath, deviceMountPath) + if markDeviceUncertainError != nil { + klog.Infof("MountVolume.MarkDeviceAsUncertain failed with %v", markDeviceUncertainError) + } + } // On failure, return error. Caller will log and retry. 
return volumeToMount.GenerateError("MountVolume.MountDevice failed", err) } @@ -621,7 +627,25 @@ func (og *operationGenerator) GenerateMountVolumeFunc( FsGroup: fsGroup, DesiredSize: volumeToMount.DesiredSizeLimit, }) + // Update actual state of world + markOpts := MarkVolumeMountedOpts{ + PodName: volumeToMount.PodName, + PodUID: volumeToMount.Pod.UID, + VolumeName: volumeToMount.VolumeName, + Mounter: volumeMounter, + OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName, + VolumeGidVolume: volumeToMount.VolumeGidValue, + VolumeSpec: originalSpec, + VolumeMountState: VolumeMounted, + } if mountErr != nil { + if volumetypes.IsOperationTimeOutError(mountErr) { + markOpts.VolumeMountState = VolumeMountUncertain + t := actualStateOfWorld.MarkVolumeMountAsUncertain(markOpts) + if t != nil { + klog.Errorf("MountVolume.MarkVolumeMountAsUncertain failed: %v", t) + } + } // On failure, return error. Caller will log and retry. return volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr) } diff --git a/pkg/volume/util/types/types.go b/pkg/volume/util/types/types.go index 2811cd1dc6c..425e5b47f6d 100644 --- a/pkg/volume/util/types/types.go +++ b/pkg/volume/util/types/types.go @@ -51,6 +51,31 @@ func (o *GeneratedOperations) Run() (eventErr, detailedErr error) { return o.OperationFunc() } +// OperationTimedOutError indicates a particular volume operation has timed out. +type OperationTimedOutError struct { + msg string +} + +func (err *OperationTimedOutError) Error() string { + return err.msg +} + +// NewOperationTimedOutError returns a new instance of OperationTimedOutError +func NewOperationTimedOutError(msg string) *OperationTimedOutError { + return &OperationTimedOutError{ + msg: msg, + } +} + +// IsOperationTimeOutError returns true if volume operation could have timed out for client but possibly +// still running or being processed by the volume plugin. 
+func IsOperationTimeOutError(err error) bool { + if _, ok := err.(*OperationTimedOutError); ok { + return true + } + return false +} + const ( // VolumeResizerKey is key that will be used to store resizer used // for resizing PVC. The generated key/value pair will be added