Add code to mark volume as uncertain

Update bazel files. Add tests for volume mounts in uncertain state.
parent a795f3de88
commit 34a6007dfe
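The diff below threads an "uncertain" state through the kubelet volume manager: when a MountDevice or SetUp call times out, the volume is recorded as DeviceMountUncertain or VolumeMountUncertain instead of being treated as not mounted, so the reconciler still runs unmount/teardown for it later. The standalone Go sketch below (not the kubelet code itself; every name in it is illustrative) shows the shape of that pattern.

package main

import (
	"errors"
	"fmt"
)

// MountState mirrors the idea of the states added by this change: a mount is
// known to be mounted, known to be unmounted, or "uncertain" because the
// operation timed out while possibly still running in the background.
type MountState string

const (
	NotMounted     MountState = "NotMounted"
	Mounted        MountState = "Mounted"
	MountUncertain MountState = "MountUncertain"
)

// timeoutError stands in for an operation that timed out on the caller side
// but may still complete later (illustrative, not the real error type).
type timeoutError struct{ msg string }

func (e *timeoutError) Error() string { return e.msg }

func isTimeout(err error) bool {
	var t *timeoutError
	return errors.As(err, &t)
}

// mount simulates a volume plugin call that times out.
func mount(volume string) error {
	return &timeoutError{msg: "timed out mounting " + volume}
}

func main() {
	state := NotMounted
	if err := mount("vol-1"); err != nil {
		if isTimeout(err) {
			// The plugin may still finish the mount, so record it as uncertain
			// rather than NotMounted; cleanup must still attempt an unmount.
			state = MountUncertain
		}
		fmt.Println("mount error:", err, "state:", state)
	}
	// Cleanup path: anything not known to be unmounted gets torn down.
	if state == Mounted || state == MountUncertain {
		fmt.Println("unmounting vol-1 because state is", state)
	}
}

The point of the extra state is that a timed-out mount may still complete in the background; treating it as NotMounted could leak a mounted volume, while treating it as uncertain guarantees a cleanup attempt.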
@@ -173,6 +173,7 @@ type AttachedVolume struct {
	DeviceMountState operationexecutor.DeviceMountState
}

// DeviceMayBeMounted returns true if device may be mounted in global path.
func (av AttachedVolume) DeviceMayBeMounted() bool {
	return av.DeviceMountState == operationexecutor.DeviceGloballyMounted ||
		av.DeviceMountState == operationexecutor.DeviceMountUncertain
@@ -252,11 +253,6 @@ type attachedVolume struct {
	// this volume implements the volume.Attacher interface
	pluginIsAttachable bool

	// globallyMounted indicates that the volume is mounted to the underlying
	// device at a global mount point. This global mount point must be unmounted
	// prior to detach.
	globallyMounted bool

	// deviceMountState stores information that tells us if device is mounted
	// globally or not
	deviceMountState operationexecutor.DeviceMountState
@@ -313,11 +309,11 @@ type mountedPod struct {
	// mounted to this pod but its size has been expanded after that.
	fsResizeRequired bool

	// volumeMounted stores state of volume mount for the pod. if it is:
	// volumeMountStateForPod stores state of volume mount for the pod. if it is:
	// - VolumeMounted: means volume for pod has been successfully mounted
	// - VolumeMountUncertain: means volume for pod may not be mounted, but it must be unmounted
	// - VolumeNotMounted: means volume for pod has not been mounted
	volumeMounted operationexecutor.VolumeMountState
	volumeMountStateForPod operationexecutor.VolumeMountState
}

func (asw *actualStateOfWorld) MarkVolumeAsAttached(
@@ -363,6 +359,11 @@ func (asw *actualStateOfWorld) MarkDeviceAsUncertain(
	return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceMountUncertain, devicePath, deviceMountPath)
}

func (asw *actualStateOfWorld) MarkVolumeMountAsUncertain(markVolumeOpts operationexecutor.MarkVolumeMountedOpts) error {
	markVolumeOpts.VolumeMountState = operationexecutor.VolumeMountUncertain
	return asw.AddPodToVolume(markVolumeOpts)
}

func (asw *actualStateOfWorld) MarkDeviceAsUnmounted(
	volumeName v1.UniqueVolumeName) error {
	return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceNotMounted, "", "")
@@ -448,14 +449,14 @@ func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.M
	podObj, podExists := volumeObj.mountedPods[podName]
	if !podExists {
		podObj = mountedPod{
			podName:             podName,
			podUID:              podUID,
			mounter:             mounter,
			blockVolumeMapper:   blockVolumeMapper,
			outerVolumeSpecName: outerVolumeSpecName,
			volumeGidValue:      volumeGidValue,
			volumeSpec:          volumeSpec,
			volumeMounted:       markVolumeOpts.VolumeMountState,
			podName:                podName,
			podUID:                 podUID,
			mounter:                mounter,
			blockVolumeMapper:      blockVolumeMapper,
			outerVolumeSpecName:    outerVolumeSpecName,
			volumeGidValue:         volumeGidValue,
			volumeSpec:             volumeSpec,
			volumeMountStateForPod: markVolumeOpts.VolumeMountState,
		}
	}
@@ -674,7 +675,7 @@ func (asw *actualStateOfWorld) GetMountedVolumes() []MountedVolume {
	mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
	for _, volumeObj := range asw.attachedVolumes {
		for _, podObj := range volumeObj.mountedPods {
			if podObj.volumeMounted == operationexecutor.VolumeMounted {
			if podObj.volumeMountStateForPod == operationexecutor.VolumeMounted {
				mountedVolume = append(
					mountedVolume,
					getMountedVolume(&podObj, &volumeObj))
@@ -684,14 +685,15 @@ func (asw *actualStateOfWorld) GetMountedVolumes() []MountedVolume {
	return mountedVolume
}

// GetAllMountedVolumes returns all volumes which could be locally mounted for a pod.
func (asw *actualStateOfWorld) GetAllMountedVolumes() []MountedVolume {
	asw.RLock()
	defer asw.RUnlock()
	mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
	for _, volumeObj := range asw.attachedVolumes {
		for _, podObj := range volumeObj.mountedPods {
			if podObj.volumeMounted == operationexecutor.VolumeMounted ||
				podObj.volumeMounted == operationexecutor.VolumeMountUncertain {
			if podObj.volumeMountStateForPod == operationexecutor.VolumeMounted ||
				podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain {
				mountedVolume = append(
					mountedVolume,
					getMountedVolume(&podObj, &volumeObj))
@@ -710,7 +712,7 @@ func (asw *actualStateOfWorld) GetMountedVolumesForPod(
	mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
	for _, volumeObj := range asw.attachedVolumes {
		for mountedPodName, podObj := range volumeObj.mountedPods {
			if mountedPodName == podName && podObj.volumeMounted == operationexecutor.VolumeMounted {
			if mountedPodName == podName && podObj.volumeMountStateForPod == operationexecutor.VolumeMounted {
				mountedVolume = append(
					mountedVolume,
					getMountedVolume(&podObj, &volumeObj))
@@ -86,6 +86,7 @@ func TestMetricCollection(t *testing.T) {
		BlockVolumeMapper:   mapper,
		OuterVolumeSpecName: volumeSpec.Name(),
		VolumeSpec:          volumeSpec,
		VolumeMountState:    operationexecutor.VolumeMounted,
	}
	err = asw.AddPodToVolume(markVolumeOpts)
	if err != nil {
@@ -167,7 +167,9 @@ func (rc *reconciler) reconcile() {
	rc.unmountVolumes()

	// Next we mount required volumes. This function could also trigger
	// detach if kubelet is responsible for detaching volumes.
	// attach if kubelet is responsible for attaching volumes.
	// If underlying PVC was resized while in-use then this function also handles volume
	// resizing.
	rc.mountAttachVolumes()

	// Ensure devices that should be detached/unmounted are detached/unmounted.
@@ -336,7 +336,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {

	// Act
	dsw.DeletePodFromVolume(podName, generatedVolumeName)
	waitForDetach(t, fakePlugin, generatedVolumeName, asw)
	waitForDetach(t, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyTearDownCallCount(
@@ -428,7 +428,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {

	// Act
	dsw.DeletePodFromVolume(podName, generatedVolumeName)
	waitForDetach(t, fakePlugin, generatedVolumeName, asw)
	waitForDetach(t, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyTearDownCallCount(
@@ -739,7 +739,7 @@ func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) {

	// Act
	dsw.DeletePodFromVolume(podName, generatedVolumeName)
	waitForDetach(t, fakePlugin, generatedVolumeName, asw)
	waitForDetach(t, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(
@@ -855,7 +855,7 @@ func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) {

	// Act
	dsw.DeletePodFromVolume(podName, generatedVolumeName)
	waitForDetach(t, fakePlugin, generatedVolumeName, asw)
	waitForDetach(t, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(
@@ -1145,6 +1145,237 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
	}
}

func Test_UncertainDeviceGlobalMounts(t *testing.T) {
	fsMode := v1.PersistentVolumeFilesystem
	pv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name: volumetesting.TimeoutOnMountDeviceVolumeName,
			UID:  "pvuid",
		},
		Spec: v1.PersistentVolumeSpec{
			ClaimRef:   &v1.ObjectReference{Name: "pvc"},
			VolumeMode: &fsMode,
		},
	}
	pvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pvc",
			UID:  "pvcuid",
		},
		Spec: v1.PersistentVolumeClaimSpec{
			VolumeName: volumetesting.TimeoutOnMountDeviceVolumeName,
		},
	}
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pod1",
			UID:  "pod1uid",
		},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{
				{
					Name: "volume-name",
					VolumeSource: v1.VolumeSource{
						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
							ClaimName: pvc.Name,
						},
					},
				},
			},
		},
	}

	volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
		Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", volumetesting.TimeoutOnMountDeviceVolumeName)),
		DevicePath: "fake/path",
	})
	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		false, /* checkNodeCapabilitiesBeforeMount */
		fakeHandler))

	reconciler := NewReconciler(
		kubeClient,
		true, /* controllerAttachDetachEnabled */
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		&mount.FakeMounter{},
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		kubeletPodsDir)
	volumeSpec := &volume.Spec{PersistentVolume: pv}
	podName := util.GetUniquePodName(pod)
	volumeName, err := dsw.AddPodToVolume(
		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
	// Assert
	if err != nil {
		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
	}
	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})

	// Start the reconciler to fill ASW.
	stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
	go func() {
		reconciler.Run(stopChan)
		close(stoppedChan)
	}()
	waitForVolumeToExistInASW(t, volumeName, asw)
	waitForUncertainGlobalMount(t, volumeName, asw)

	dsw.DeletePodFromVolume(podName, volumeName)
	waitForDetach(t, volumeName, asw)

	volumetesting.VerifyUnmountDeviceCallCount(1, fakePlugin)
}

func Test_UncertainVolumeMountState(t *testing.T) {
	fsMode := v1.PersistentVolumeFilesystem
	pv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name: volumetesting.TimeoutOnSetupVolumeName,
			UID:  "pvuid",
		},
		Spec: v1.PersistentVolumeSpec{
			ClaimRef:   &v1.ObjectReference{Name: "pvc"},
			VolumeMode: &fsMode,
		},
	}
	pvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pvc",
			UID:  "pvcuid",
		},
		Spec: v1.PersistentVolumeClaimSpec{
			VolumeName: volumetesting.TimeoutOnSetupVolumeName,
		},
	}
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pod1",
			UID:  "pod1uid",
		},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{
				{
					Name: "volume-name",
					VolumeSource: v1.VolumeSource{
						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
							ClaimName: pvc.Name,
						},
					},
				},
			},
		},
	}

	volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
		Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", volumetesting.TimeoutOnSetupVolumeName)),
		DevicePath: "fake/path",
	})
	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		false, /* checkNodeCapabilitiesBeforeMount */
		fakeHandler))

	reconciler := NewReconciler(
		kubeClient,
		true, /* controllerAttachDetachEnabled */
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		&mount.FakeMounter{},
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		kubeletPodsDir)
	volumeSpec := &volume.Spec{PersistentVolume: pv}
	podName := util.GetUniquePodName(pod)
	volumeName, err := dsw.AddPodToVolume(
		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
	// Assert
	if err != nil {
		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
	}
	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})

	// Start the reconciler to fill ASW.
	stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
	go func() {
		reconciler.Run(stopChan)
		close(stoppedChan)
	}()
	waitForVolumeToExistInASW(t, volumeName, asw)
	waitForUncertainPodMount(t, volumeName, asw)

	dsw.DeletePodFromVolume(podName, volumeName)
	waitForDetach(t, volumeName, asw)

	volumetesting.VerifyUnmountDeviceCallCount(1, fakePlugin)
	volumetesting.VerifyTearDownCallCount(1, fakePlugin)
}

func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
	// check if volume is globally mounted in uncertain state
	err := retryWithExponentialBackOff(
		time.Duration(500*time.Millisecond),
		func() (bool, error) {
			unmountedVolumes := asw.GetUnmountedVolumes()
			for _, v := range unmountedVolumes {
				if v.VolumeName == volumeName && v.DeviceMountState == operationexecutor.DeviceMountUncertain {
					return true, nil
				}
			}
			return false, nil
		},
	)

	if err != nil {
		t.Fatalf("expected volumes %s to be mounted in uncertain state globally", volumeName)
	}
}

func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
	// check if volume is locally pod mounted in uncertain state
	err := retryWithExponentialBackOff(
		time.Duration(500*time.Millisecond),
		func() (bool, error) {
			allMountedVolumes := asw.GetAllMountedVolumes()
			for _, v := range allMountedVolumes {
				if v.VolumeName == volumeName {
					return true, nil
				}
			}
			return false, nil
		},
	)

	if err != nil {
		t.Fatalf("expected volumes %s to be mounted in uncertain state for pod", volumeName)
	}
}

func waitForMount(
	t *testing.T,
	fakePlugin *volumetesting.FakeVolumePlugin,
@@ -1169,9 +1400,23 @@ func waitForMount(
	}
}

func waitForVolumeToExistInASW(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
	err := retryWithExponentialBackOff(
		time.Duration(500*time.Millisecond),
		func() (bool, error) {
			if asw.VolumeExists(volumeName) {
				return true, nil
			}
			return false, nil
		},
	)
	if err != nil {
		t.Fatalf("Timed out waiting for volume %q to be exist in asw.", volumeName)
	}
}

func waitForDetach(
	t *testing.T,
	fakePlugin *volumetesting.FakeVolumePlugin,
	volumeName v1.UniqueVolumeName,
	asw cache.ActualStateOfWorld) {
	err := retryWithExponentialBackOff(
@@ -20,6 +20,7 @@ go_library(
        "//pkg/volume:go_default_library",
        "//pkg/volume/csi/nodeinfomanager:go_default_library",
        "//pkg/volume/util:go_default_library",
        "//pkg/volume/util/types:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/api/storage/v1:go_default_library",
        "//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
@@ -37,6 +38,8 @@ go_library(
        "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
        "//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library",
        "//vendor/google.golang.org/grpc:go_default_library",
        "//vendor/google.golang.org/grpc/codes:go_default_library",
        "//vendor/google.golang.org/grpc/status:go_default_library",
        "//vendor/k8s.io/klog:go_default_library",
        "//vendor/k8s.io/utils/strings:go_default_library",
    ],
@@ -36,6 +36,7 @@ import (
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/volume"
	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)

const (
@@ -265,7 +266,8 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
		return err
	}
	defer func() {
		if err != nil {
		// Only for non-timedout errors remove the mount directory
		if err != nil && !volumetypes.IsOperationTimeOutError(err) {
			// clean up metadata
			klog.Errorf(log("attacher.MountDevice failed: %v", err))
			if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
@@ -28,11 +28,14 @@ import (

	csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	api "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/volume"
	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)

type csiClient interface {
@@ -213,6 +216,7 @@ func (c *csiDriverClient) NodePublishVolume(
	if targetPath == "" {
		return errors.New("missing target path")
	}

	if c.nodeV1ClientCreator == nil {
		return errors.New("failed to call NodePublishVolume. nodeV1ClientCreator is nil")
@@ -255,7 +259,10 @@ func (c *csiDriverClient) NodePublishVolume(
	}

	_, err = nodeClient.NodePublishVolume(ctx, req)
	return err
	if err != nil && !isFinalError(err) {
		return volumetypes.NewOperationTimedOutError(err.Error())
	}
	return nil
}

func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, volumeID, volumePath string, newSize resource.Quantity) (resource.Quantity, error) {
@@ -374,6 +381,9 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
	}

	_, err = nodeClient.NodeStageVolume(ctx, req)
	if err != nil && !isFinalError(err) {
		return volumetypes.NewOperationTimedOutError(err.Error())
	}
	return err
}
@@ -613,3 +623,27 @@ func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string,
	}
	return metrics, nil
}

func isFinalError(err error) bool {
	// Sources:
	// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
	// https://github.com/container-storage-interface/spec/blob/master/spec.md
	st, ok := status.FromError(err)
	if !ok {
		// This is not gRPC error. The operation must have failed before gRPC
		// method was called, otherwise we would get gRPC error.
		// We don't know if any previous CreateVolume is in progress, be on the safe side.
		return false
	}
	switch st.Code() {
	case codes.Canceled, // gRPC: Client Application cancelled the request
		codes.DeadlineExceeded, // gRPC: Timeout
		codes.Unavailable, // gRPC: Server shutting down, TCP connection broken - previous CreateVolume() may be still in progress.
		codes.ResourceExhausted, // gRPC: Server temporarily out of resources - previous CreateVolume() may be still in progress.
		codes.Aborted: // CSI: Operation pending for volume
		return false
	}
	// All other errors mean that provisioning either did not
	// even start or failed. It is for sure not in progress.
	return true
}
@@ -36,6 +36,7 @@ import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/volume"
	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
	utilstrings "k8s.io/utils/strings"
)
@@ -254,8 +255,11 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
	)

	if err != nil {
		if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil {
			klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr))
		// If error is not of type time out then we can remove the mount directory
		if !volumetypes.IsOperationTimeOutError(err) {
			if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil {
				klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr))
			}
		}
		return errors.New(log("mounter.SetupAt failed: %v", err))
	}
@@ -18,6 +18,7 @@ go_library(
        "//pkg/volume/util/hostutil:go_default_library",
        "//pkg/volume/util/recyclerclient:go_default_library",
        "//pkg/volume/util/subpath:go_default_library",
        "//pkg/volume/util/types:go_default_library",
        "//pkg/volume/util/volumepathhandler:go_default_library",
        "//staging/src/k8s.io/api/authentication/v1:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -51,6 +51,7 @@ import (
	"k8s.io/kubernetes/pkg/volume/util/hostutil"
	"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
	"k8s.io/kubernetes/pkg/volume/util/subpath"
	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
	"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
)
@@ -66,6 +67,10 @@ const (
	TimeoutAttachNode = "timeout-attach-node"
	// The node is marked as multi-attach which means it is allowed to attach the volume to multiple nodes.
	MultiAttachNode = "multi-attach-node"
	// TimeoutOnSetupVolumeName will cause Setup call to timeout but volume will finish mounting.
	TimeoutOnSetupVolumeName = "timeout-setup-volume"
	// TimeoutOnMountDeviceVolumeName will cause MountDevice call to timeout but Setup will finish.
	TimeoutOnMountDeviceVolumeName = "timeout-mount-device-volume"
)

// fakeVolumeHost is useful for testing volume plugins.
@@ -835,6 +840,9 @@ func (fv *FakeVolume) CanMount() error {
func (fv *FakeVolume) SetUp(mounterArgs MounterArgs) error {
	fv.Lock()
	defer fv.Unlock()
	if fv.VolName == TimeoutOnSetupVolumeName {
		return volumetypes.NewOperationTimedOutError("time out on setup")
	}
	fv.SetUpCallCount++
	return fv.SetUpAt(fv.getPath(), mounterArgs)
}
@@ -1039,6 +1047,9 @@ func (fv *FakeVolume) GetDeviceMountPath(spec *Spec) (string, error) {
func (fv *FakeVolume) MountDevice(spec *Spec, devicePath string, deviceMountPath string) error {
	fv.Lock()
	defer fv.Unlock()
	if spec.Name() == TimeoutOnMountDeviceVolumeName {
		return volumetypes.NewOperationTimedOutError("error mounting device")
	}
	fv.MountDeviceCallCount++
	return nil
}
@@ -1049,6 +1060,12 @@ func (fv *FakeVolume) GetMountDeviceCallCount() int {
	return fv.MountDeviceCallCount
}

func (fv *FakeVolume) GetUnmountDeviceCallCount() int {
	fv.RLock()
	defer fv.RUnlock()
	return fv.UnmountDeviceCallCount
}

func (fv *FakeVolume) Detach(volumeName string, nodeName types.NodeName) error {
	fv.Lock()
	defer fv.Unlock()
@@ -1304,6 +1321,19 @@ func VerifyMountDeviceCallCount(
		expectedMountDeviceCallCount)
}

func VerifyUnmountDeviceCallCount(expectedCallCount int, fakeVolumePlugin *FakeVolumePlugin) error {
	for _, attacher := range fakeVolumePlugin.GetAttachers() {
		actualCallCount := attacher.GetUnmountDeviceCallCount()
		if actualCallCount >= expectedCallCount {
			return nil
		}
	}

	return fmt.Errorf(
		"No Attachers have expected MountunDeviceCallCount. Expected: <%v>.",
		expectedCallCount)
}

// VerifyZeroMountDeviceCallCount ensures that all Attachers for this plugin
// have a zero MountDeviceCallCount. Otherwise it returns an error.
func VerifyZeroMountDeviceCallCount(fakeVolumePlugin *FakeVolumePlugin) error {
@@ -160,6 +160,7 @@ func NewOperationExecutor(
	}
}

// MarkVolumeMountedOpts is an struct to pass arguments to MountVolume functions
type MarkVolumeMountedOpts struct {
	PodName volumetypes.UniquePodName
	PodUID  types.UID
@@ -181,9 +182,13 @@ type ActualStateOfWorldMounterUpdater interface {
	// Marks the specified volume as unmounted from the specified pod
	MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error

	// MarkVolumeMountAsUncertain marks state of volume mount for the pod uncertain
	MarkVolumeMountAsUncertain(markVolumeOpts MarkVolumeMountedOpts) error

	// Marks the specified volume as having been globally mounted.
	MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error

	// MarkDeviceAsUncertain marks device state in global mount path as uncertain
	MarkDeviceAsUncertain(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error

	// Marks the specified volume as having its global mount unmounted.
@@ -372,10 +377,10 @@ type VolumeToMount struct {
type DeviceMountState string

const (
	// DeviceGloballymounted means device has been globally mounted successfully
	// DeviceGloballyMounted means device has been globally mounted successfully
	DeviceGloballyMounted DeviceMountState = "DeviceGloballyMounted"

	// Uncertain means device may not be mounted but a mount operation may be
	// DeviceMountUncertain means device may not be mounted but a mount operation may be
	// in-progress which can cause device mount to succeed.
	DeviceMountUncertain DeviceMountState = "DeviceMountUncertain"

@@ -387,10 +392,13 @@ const (
type VolumeMountState string

const (
	// VolumeMounted means volume has been mounted in pod's local path
	VolumeMounted VolumeMountState = "VolumeMounted"

	// VolumeMountUncertain means volume may or may not be mounted in pods' local path
	VolumeMountUncertain VolumeMountState = "VolumeMountUncertain"

	// VolumeNotMounted means volume has not been mounted in pod's local path
	VolumeNotMounted VolumeMountState = "VolumeNotMounted"
)
@@ -580,6 +580,12 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
			devicePath,
			deviceMountPath)
		if err != nil {
			if volumetypes.IsOperationTimeOutError(err) {
				markDeviceUncertainError := actualStateOfWorld.MarkDeviceAsUncertain(volumeToMount.VolumeName, devicePath, deviceMountPath)
				if markDeviceUncertainError != nil {
					klog.Infof("MountVolume.MarkDeviceAsUncertain failed with %v", markDeviceUncertainError)
				}
			}
			// On failure, return error. Caller will log and retry.
			return volumeToMount.GenerateError("MountVolume.MountDevice failed", err)
		}
@@ -621,7 +627,25 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
		FsGroup:     fsGroup,
		DesiredSize: volumeToMount.DesiredSizeLimit,
	})
	// Update actual state of world
	markOpts := MarkVolumeMountedOpts{
		PodName:             volumeToMount.PodName,
		PodUID:              volumeToMount.Pod.UID,
		VolumeName:          volumeToMount.VolumeName,
		Mounter:             volumeMounter,
		OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
		VolumeGidVolume:     volumeToMount.VolumeGidValue,
		VolumeSpec:          originalSpec,
		VolumeMountState:    VolumeMounted,
	}
	if mountErr != nil {
		if volumetypes.IsOperationTimeOutError(mountErr) {
			markOpts.VolumeMountState = VolumeMountUncertain
			t := actualStateOfWorld.MarkVolumeMountAsUncertain(markOpts)
			if t != nil {
				klog.Errorf("MountVolume.MarkVolumeMountAsUncertain failed: %v", t)
			}
		}
		// On failure, return error. Caller will log and retry.
		return volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr)
	}
@@ -51,6 +51,31 @@ func (o *GeneratedOperations) Run() (eventErr, detailedErr error) {
	return o.OperationFunc()
}

// OperationTimedOutError indicates a particular volume operation has timed out.
type OperationTimedOutError struct {
	msg string
}

func (err *OperationTimedOutError) Error() string {
	return err.msg
}

// NewOperationTimedOutError returns a new instance of OperationTimedOutError
func NewOperationTimedOutError(msg string) *OperationTimedOutError {
	return &OperationTimedOutError{
		msg: msg,
	}
}

// IsOperationTimeOutError returns true if volume operation could have timed out for client but possibly
// still running or being processed by the volume plugin.
func IsOperationTimeOutError(err error) bool {
	if _, ok := err.(*OperationTimedOutError); ok {
		return true
	}
	return false
}

const (
	// VolumeResizerKey is key that will be used to store resizer used
	// for resizing PVC. The generated key/value pair will be added