diff --git a/pkg/kubelet/volumemanager/cache/BUILD b/pkg/kubelet/volumemanager/cache/BUILD index 76a34e9f614..cfe44cf64a9 100644 --- a/pkg/kubelet/volumemanager/cache/BUILD +++ b/pkg/kubelet/volumemanager/cache/BUILD @@ -40,6 +40,7 @@ go_test( "//pkg/volume:go_default_library", "//pkg/volume/testing:go_default_library", "//pkg/volume/util:go_default_library", + "//pkg/volume/util/operationexecutor:go_default_library", "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index 08fe393e5a2..5d89aa5febb 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -59,7 +59,7 @@ type ActualStateOfWorld interface { // volume, reset the pod's remountRequired value. // If a volume with the name volumeName does not exist in the list of // attached volumes, an error is returned. - AddPodToVolume(podName volumetypes.UniquePodName, podUID types.UID, volumeName v1.UniqueVolumeName, mounter volume.Mounter, blockVolumeMapper volume.BlockVolumeMapper, outerVolumeSpecName string, volumeGidValue string, volumeSpec *volume.Spec) error + AddPodToVolume(operationexecutor.MarkVolumeMountedOpts) error // MarkRemountRequired marks each volume that is successfully attached and // mounted for the specified pod as requiring remount (if the plugin for the @@ -68,13 +68,13 @@ type ActualStateOfWorld interface { // pod update. MarkRemountRequired(podName volumetypes.UniquePodName) - // SetVolumeGloballyMounted sets the GloballyMounted value for the given - // volume. When set to true this value indicates that the volume is mounted - // to the underlying device at a global mount point. This global mount point - // must unmounted prior to detach. + // SetDeviceMountState sets device mount state for the given volume. When deviceMountState is set to DeviceGloballyMounted + // then device is mounted at a global mount point. When it is set to DeviceMountUncertain then also it means volume + // MAY be globally mounted at a global mount point. In both cases - the volume must be unmounted from + // global mount point prior to detach. // If a volume with the name volumeName does not exist in the list of // attached volumes, an error is returned. - SetVolumeGloballyMounted(volumeName v1.UniqueVolumeName, globallyMounted bool, devicePath, deviceMountPath string) error + SetDeviceMountState(volumeName v1.UniqueVolumeName, deviceMountState operationexecutor.DeviceMountState, devicePath, deviceMountPath string) error // DeletePodFromVolume removes the given pod from the given volume in the // cache indicating the volume has been successfully unmounted from the pod. @@ -127,6 +127,10 @@ type ActualStateOfWorld interface { // actual state of the world. GetMountedVolumes() []MountedVolume + // GetAllMountedVolumes returns list of all mounted volumes including + // those that are in VolumeMounted state and VolumeMountUncertain state. + GetAllMountedVolumes() []MountedVolume + // GetMountedVolumesForPod generates and returns a list of volumes that are // successfully attached and mounted for the specified pod based on the // current actual state of the world. @@ -165,10 +169,13 @@ type MountedVolume struct { type AttachedVolume struct { operationexecutor.AttachedVolume - // GloballyMounted indicates that the volume is mounted to the underlying - // device at a global mount point. This global mount point must unmounted - // prior to detach. - GloballyMounted bool + // DeviceMountState indicates if device has been globally mounted or is not. + DeviceMountState operationexecutor.DeviceMountState +} + +func (av AttachedVolume) DeviceMayBeMounted() bool { + return av.DeviceMountState == operationexecutor.DeviceGloballyMounted || + av.DeviceMountState == operationexecutor.DeviceMountUncertain } // NewActualStateOfWorld returns a new instance of ActualStateOfWorld. @@ -250,6 +257,10 @@ type attachedVolume struct { // prior to detach. globallyMounted bool + // deviceMountState stores information that tells us if device is mounted + // globally or not + deviceMountState operationexecutor.DeviceMountState + // devicePath contains the path on the node where the volume is attached for // attachable volumes devicePath string @@ -301,6 +312,12 @@ type mountedPod struct { // fsResizeRequired indicates the underlying volume has been successfully // mounted to this pod but its size has been expanded after that. fsResizeRequired bool + + // volumeMounted stores state of volume mount for the pod. if it is: + // - VolumeMounted: means volume for pod has been successfully mounted + // - VolumeMountUncertain: means volume for pod may not be mounted, but it must be unmounted + // - VolumeNotMounted: means volume for pod has not been mounted + volumeMounted operationexecutor.VolumeMountState } func (asw *actualStateOfWorld) MarkVolumeAsAttached( @@ -318,24 +335,8 @@ func (asw *actualStateOfWorld) MarkVolumeAsDetached( asw.DeleteVolume(volumeName) } -func (asw *actualStateOfWorld) MarkVolumeAsMounted( - podName volumetypes.UniquePodName, - podUID types.UID, - volumeName v1.UniqueVolumeName, - mounter volume.Mounter, - blockVolumeMapper volume.BlockVolumeMapper, - outerVolumeSpecName string, - volumeGidValue string, - volumeSpec *volume.Spec) error { - return asw.AddPodToVolume( - podName, - podUID, - volumeName, - mounter, - blockVolumeMapper, - outerVolumeSpecName, - volumeGidValue, - volumeSpec) +func (asw *actualStateOfWorld) MarkVolumeAsMounted(markVolumeOpts operationexecutor.MarkVolumeMountedOpts) error { + return asw.AddPodToVolume(markVolumeOpts) } func (asw *actualStateOfWorld) AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) { @@ -354,12 +355,17 @@ func (asw *actualStateOfWorld) MarkVolumeAsUnmounted( func (asw *actualStateOfWorld) MarkDeviceAsMounted( volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error { - return asw.SetVolumeGloballyMounted(volumeName, true /* globallyMounted */, devicePath, deviceMountPath) + return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceGloballyMounted, devicePath, deviceMountPath) +} + +func (asw *actualStateOfWorld) MarkDeviceAsUncertain( + volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error { + return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceMountUncertain, devicePath, deviceMountPath) } func (asw *actualStateOfWorld) MarkDeviceAsUnmounted( volumeName v1.UniqueVolumeName) error { - return asw.SetVolumeGloballyMounted(volumeName, false /* globallyMounted */, "", "") + return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceNotMounted, "", "") } // addVolume adds the given volume to the cache indicating the specified @@ -405,7 +411,7 @@ func (asw *actualStateOfWorld) addVolume( mountedPods: make(map[volumetypes.UniquePodName]mountedPod), pluginName: volumePlugin.GetPluginName(), pluginIsAttachable: pluginIsAttachable, - globallyMounted: false, + deviceMountState: operationexecutor.DeviceNotMounted, devicePath: devicePath, } } else { @@ -420,15 +426,15 @@ func (asw *actualStateOfWorld) addVolume( return nil } -func (asw *actualStateOfWorld) AddPodToVolume( - podName volumetypes.UniquePodName, - podUID types.UID, - volumeName v1.UniqueVolumeName, - mounter volume.Mounter, - blockVolumeMapper volume.BlockVolumeMapper, - outerVolumeSpecName string, - volumeGidValue string, - volumeSpec *volume.Spec) error { +func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.MarkVolumeMountedOpts) error { + podName := markVolumeOpts.PodName + podUID := markVolumeOpts.PodUID + volumeName := markVolumeOpts.VolumeName + mounter := markVolumeOpts.Mounter + blockVolumeMapper := markVolumeOpts.BlockVolumeMapper + outerVolumeSpecName := markVolumeOpts.OuterVolumeSpecName + volumeGidValue := markVolumeOpts.VolumeGidVolume + volumeSpec := markVolumeOpts.VolumeSpec asw.Lock() defer asw.Unlock() @@ -449,13 +455,13 @@ func (asw *actualStateOfWorld) AddPodToVolume( outerVolumeSpecName: outerVolumeSpecName, volumeGidValue: volumeGidValue, volumeSpec: volumeSpec, + volumeMounted: markVolumeOpts.VolumeMountState, } } // If pod exists, reset remountRequired value podObj.remountRequired = false asw.attachedVolumes[volumeName].mountedPods[podName] = podObj - return nil } @@ -554,8 +560,8 @@ func (asw *actualStateOfWorld) MarkFSResizeRequired( } } -func (asw *actualStateOfWorld) SetVolumeGloballyMounted( - volumeName v1.UniqueVolumeName, globallyMounted bool, devicePath, deviceMountPath string) error { +func (asw *actualStateOfWorld) SetDeviceMountState( + volumeName v1.UniqueVolumeName, deviceMountState operationexecutor.DeviceMountState, devicePath, deviceMountPath string) error { asw.Lock() defer asw.Unlock() @@ -566,7 +572,7 @@ func (asw *actualStateOfWorld) SetVolumeGloballyMounted( volumeName) } - volumeObj.globallyMounted = globallyMounted + volumeObj.deviceMountState = deviceMountState volumeObj.deviceMountPath = deviceMountPath if devicePath != "" { volumeObj.devicePath = devicePath @@ -668,9 +674,29 @@ func (asw *actualStateOfWorld) GetMountedVolumes() []MountedVolume { mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */) for _, volumeObj := range asw.attachedVolumes { for _, podObj := range volumeObj.mountedPods { - mountedVolume = append( - mountedVolume, - getMountedVolume(&podObj, &volumeObj)) + if podObj.volumeMounted == operationexecutor.VolumeMounted { + mountedVolume = append( + mountedVolume, + getMountedVolume(&podObj, &volumeObj)) + } + } + } + return mountedVolume +} + +func (asw *actualStateOfWorld) GetAllMountedVolumes() []MountedVolume { + asw.RLock() + defer asw.RUnlock() + mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */) + for _, volumeObj := range asw.attachedVolumes { + for _, podObj := range volumeObj.mountedPods { + if podObj.volumeMounted == operationexecutor.VolumeMounted || + podObj.volumeMounted == operationexecutor.VolumeMountUncertain { + mountedVolume = append( + mountedVolume, + getMountedVolume(&podObj, &volumeObj)) + } + } } @@ -683,10 +709,12 @@ func (asw *actualStateOfWorld) GetMountedVolumesForPod( defer asw.RUnlock() mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */) for _, volumeObj := range asw.attachedVolumes { - if podObj, podExists := volumeObj.mountedPods[podName]; podExists { - mountedVolume = append( - mountedVolume, - getMountedVolume(&podObj, &volumeObj)) + for mountedPodName, podObj := range volumeObj.mountedPods { + if mountedPodName == podName && podObj.volumeMounted == operationexecutor.VolumeMounted { + mountedVolume = append( + mountedVolume, + getMountedVolume(&podObj, &volumeObj)) + } } } @@ -699,7 +727,7 @@ func (asw *actualStateOfWorld) GetGloballyMountedVolumes() []AttachedVolume { globallyMountedVolumes := make( []AttachedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */) for _, volumeObj := range asw.attachedVolumes { - if volumeObj.globallyMounted { + if volumeObj.deviceMountState == operationexecutor.DeviceGloballyMounted { globallyMountedVolumes = append( globallyMountedVolumes, asw.newAttachedVolume(&volumeObj)) @@ -749,7 +777,7 @@ func (asw *actualStateOfWorld) newAttachedVolume( DevicePath: attachedVolume.devicePath, DeviceMountPath: attachedVolume.deviceMountPath, PluginName: attachedVolume.pluginName}, - GloballyMounted: attachedVolume.globallyMounted, + DeviceMountState: attachedVolume.deviceMountState, } } diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go index fd703f8d3df..88ae2e283e8 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/volume" volumetesting "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util" + "k8s.io/kubernetes/pkg/volume/util/operationexecutor" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) @@ -220,9 +221,16 @@ func Test_AddPodToVolume_Positive_ExistingVolumeNewNode(t *testing.T) { } // Act - err = asw.AddPodToVolume( - podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec) - + markVolumeOpts := operationexecutor.MarkVolumeMountedOpts{ + PodName: podName, + PodUID: pod.UID, + VolumeName: generatedVolumeName, + Mounter: mounter, + BlockVolumeMapper: mapper, + OuterVolumeSpecName: volumeSpec.Name(), + VolumeSpec: volumeSpec, + } + err = asw.AddPodToVolume(markVolumeOpts) // Assert if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) @@ -287,16 +295,22 @@ func Test_AddPodToVolume_Positive_ExistingVolumeExistingNode(t *testing.T) { t.Fatalf("NewBlockVolumeMapper failed. Expected: Actual: <%v>", err) } - err = asw.AddPodToVolume( - podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec) + markVolumeOpts := operationexecutor.MarkVolumeMountedOpts{ + PodName: podName, + PodUID: pod.UID, + VolumeName: generatedVolumeName, + Mounter: mounter, + BlockVolumeMapper: mapper, + OuterVolumeSpecName: volumeSpec.Name(), + VolumeSpec: volumeSpec, + } + err = asw.AddPodToVolume(markVolumeOpts) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } // Act - err = asw.AddPodToVolume( - podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec) - + err = asw.AddPodToVolume(markVolumeOpts) // Assert if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) @@ -388,8 +402,16 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) { t.Fatalf("NewBlockVolumeMapper failed. Expected: Actual: <%v>", err) } - err = asw.AddPodToVolume( - podName1, pod1.UID, generatedVolumeName1, mounter1, mapper1, volumeSpec1.Name(), "" /* volumeGidValue */, volumeSpec1) + markVolumeOpts1 := operationexecutor.MarkVolumeMountedOpts{ + PodName: podName1, + PodUID: pod1.UID, + VolumeName: generatedVolumeName1, + Mounter: mounter1, + BlockVolumeMapper: mapper1, + OuterVolumeSpecName: volumeSpec1.Name(), + VolumeSpec: volumeSpec1, + } + err = asw.AddPodToVolume(markVolumeOpts1) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } @@ -406,8 +428,16 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) { t.Fatalf("NewBlockVolumeMapper failed. Expected: Actual: <%v>", err) } - err = asw.AddPodToVolume( - podName2, pod2.UID, generatedVolumeName1, mounter2, mapper2, volumeSpec2.Name(), "" /* volumeGidValue */, volumeSpec2) + markVolumeOpts2 := operationexecutor.MarkVolumeMountedOpts{ + PodName: podName2, + PodUID: pod2.UID, + VolumeName: generatedVolumeName1, + Mounter: mounter2, + BlockVolumeMapper: mapper2, + OuterVolumeSpecName: volumeSpec2.Name(), + VolumeSpec: volumeSpec2, + } + err = asw.AddPodToVolume(markVolumeOpts2) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } @@ -421,7 +451,6 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) { verifyVolumeExistsWithSpecNameInVolumeAsw(t, podName2, volumeSpec2.Name(), asw) verifyVolumeSpecNameInVolumeAsw(t, podName1, []*volume.Spec{volumeSpec1}, asw) verifyVolumeSpecNameInVolumeAsw(t, podName2, []*volume.Spec{volumeSpec2}, asw) - } // Calls AddPodToVolume() to add pod to empty data struct @@ -484,9 +513,16 @@ func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) { } // Act - err = asw.AddPodToVolume( - podName, pod.UID, volumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec) - + markVolumeOpts := operationexecutor.MarkVolumeMountedOpts{ + PodName: podName, + PodUID: pod.UID, + VolumeName: volumeName, + Mounter: mounter, + BlockVolumeMapper: mapper, + OuterVolumeSpecName: volumeSpec.Name(), + VolumeSpec: volumeSpec, + } + err = asw.AddPodToVolume(markVolumeOpts) // Assert if err == nil { t.Fatalf("AddPodToVolume did not fail. Expected: <\"no volume with the name ... exists in the list of attached volumes\"> Actual: ") @@ -556,6 +592,71 @@ func Test_MarkDeviceAsMounted_Positive_NewVolume(t *testing.T) { verifyVolumeExistsInGloballyMountedVolumes(t, generatedVolumeName, asw) } +func TestGetMountedVolumesForPod(t *testing.T) { + // Arrange + volumePluginMgr, plugin := volumetesting.GetTestVolumePluginMgr(t) + asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr) + devicePath := "fake/device/path" + + pod1 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "volume-name-1", + VolumeSource: v1.VolumeSource{ + GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{ + PDName: "fake-device1", + }, + }, + }, + }, + }, + } + volumeSpec1 := &volume.Spec{Volume: &pod1.Spec.Volumes[0]} + generatedVolumeName1, err := util.GetUniqueVolumeNameFromSpec( + plugin, volumeSpec1) + require.NoError(t, err) + + err = asw.MarkVolumeAsAttached(generatedVolumeName1, volumeSpec1, "" /* nodeName */, devicePath) + if err != nil { + t.Fatalf("MarkVolumeAsAttached failed. Expected: Actual: <%v>", err) + } + podName1 := util.GetUniquePodName(pod1) + + mounter1, err := plugin.NewMounter(volumeSpec1, pod1, volume.VolumeOptions{}) + if err != nil { + t.Fatalf("NewMounter failed. Expected: Actual: <%v>", err) + } + + markVolumeOpts1 := operationexecutor.MarkVolumeMountedOpts{ + PodName: podName1, + PodUID: pod1.UID, + VolumeName: generatedVolumeName1, + Mounter: mounter1, + OuterVolumeSpecName: volumeSpec1.Name(), + VolumeSpec: volumeSpec1, + VolumeMountState: operationexecutor.VolumeMountUncertain, + } + err = asw.AddPodToVolume(markVolumeOpts1) + if err != nil { + t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) + } + mountedVolumes := asw.GetMountedVolumesForPod(podName1) + volumeFound := false + for _, volume := range mountedVolumes { + if volume.InnerVolumeSpecName == volumeSpec1.Name() { + volumeFound = true + } + } + if volumeFound { + t.Fatalf("expected volume %s to be not found in asw", volumeSpec1.Name()) + } +} + func verifyVolumeExistsInGloballyMountedVolumes( t *testing.T, expectedVolumeName v1.UniqueVolumeName, asw ActualStateOfWorld) { globallyMountedVolumes := asw.GetGloballyMountedVolumes() diff --git a/pkg/kubelet/volumemanager/metrics/BUILD b/pkg/kubelet/volumemanager/metrics/BUILD index 385d70102e1..d39ab810a23 100644 --- a/pkg/kubelet/volumemanager/metrics/BUILD +++ b/pkg/kubelet/volumemanager/metrics/BUILD @@ -37,6 +37,7 @@ go_test( "//pkg/volume:go_default_library", "//pkg/volume/testing:go_default_library", "//pkg/volume/util:go_default_library", + "//pkg/volume/util/operationexecutor:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/kubelet/volumemanager/metrics/metrics_test.go b/pkg/kubelet/volumemanager/metrics/metrics_test.go index 3c630d0061c..986fb6f0e65 100644 --- a/pkg/kubelet/volumemanager/metrics/metrics_test.go +++ b/pkg/kubelet/volumemanager/metrics/metrics_test.go @@ -27,6 +27,7 @@ import ( volumetesting "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util" + "k8s.io/kubernetes/pkg/volume/util/operationexecutor" ) func TestMetricCollection(t *testing.T) { @@ -77,8 +78,16 @@ func TestMetricCollection(t *testing.T) { t.Fatalf("MarkVolumeAsAttached failed. Expected: Actual: <%v>", err) } - err = asw.AddPodToVolume( - podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "", volumeSpec) + markVolumeOpts := operationexecutor.MarkVolumeMountedOpts{ + PodName: podName, + PodUID: pod.UID, + VolumeName: generatedVolumeName, + Mounter: mounter, + BlockVolumeMapper: mapper, + OuterVolumeSpecName: volumeSpec.Name(), + VolumeSpec: volumeSpec, + } + err = asw.AddPodToVolume(markVolumeOpts) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } diff --git a/pkg/kubelet/volumemanager/populator/BUILD b/pkg/kubelet/volumemanager/populator/BUILD index 09896dcadd0..cc3044a4bbf 100644 --- a/pkg/kubelet/volumemanager/populator/BUILD +++ b/pkg/kubelet/volumemanager/populator/BUILD @@ -63,6 +63,7 @@ go_test( "//pkg/volume/csimigration:go_default_library", "//pkg/volume/testing:go_default_library", "//pkg/volume/util:go_default_library", + "//pkg/volume/util/operationexecutor:go_default_library", "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index 7d872deb8e8..423ac509cae 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -43,6 +43,7 @@ import ( "k8s.io/kubernetes/pkg/volume/csimigration" volumetesting "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util" + "k8s.io/kubernetes/pkg/volume/util/operationexecutor" "k8s.io/kubernetes/pkg/volume/util/types" ) @@ -854,8 +855,16 @@ func reconcileASW(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, t if err != nil { t.Fatalf("Unexpected error when MarkVolumeAsAttached: %v", err) } - err = asw.MarkVolumeAsMounted(volumeToMount.PodName, volumeToMount.Pod.UID, - volumeToMount.VolumeName, nil, nil, volumeToMount.OuterVolumeSpecName, volumeToMount.VolumeGidValue, volumeToMount.VolumeSpec) + markVolumeOpts := operationexecutor.MarkVolumeMountedOpts{ + PodName: volumeToMount.PodName, + PodUID: volumeToMount.Pod.UID, + VolumeName: volumeToMount.VolumeName, + OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName, + VolumeGidVolume: volumeToMount.VolumeGidValue, + VolumeSpec: volumeToMount.VolumeSpec, + VolumeMountState: operationexecutor.VolumeMounted, + } + err = asw.MarkVolumeAsMounted(markVolumeOpts) if err != nil { t.Fatalf("Unexpected error when MarkVolumeAsMounted: %v", err) } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 3524446d5d4..cd000065a68 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -164,9 +164,19 @@ func (rc *reconciler) reconcile() { // referenced by a pod that was deleted and is now referenced by another // pod is unmounted from the first pod before being mounted to the new // pod. + rc.unmountVolumes() + // Next we mount required volumes. This function could also trigger + // detach if kubelet is responsible for detaching volumes. + rc.mountAttachVolumes() + + // Ensure devices that should be detached/unmounted are detached/unmounted. + rc.unmountDetachDevices() +} + +func (rc *reconciler) unmountVolumes() { // Ensure volumes that should be unmounted are unmounted. - for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() { + for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() { if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) { // Volume is mounted, unmount it klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", "")) @@ -184,7 +194,9 @@ func (rc *reconciler) reconcile() { } } } +} +func (rc *reconciler) mountAttachVolumes() { // Ensure volumes that should be attached/mounted are attached/mounted. for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() { volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) @@ -274,13 +286,14 @@ func (rc *reconciler) reconcile() { } } } +} - // Ensure devices that should be detached/unmounted are detached/unmounted. +func (rc *reconciler) unmountDetachDevices() { for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() { // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting. if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) && !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) { - if attachedVolume.GloballyMounted { + if attachedVolume.DeviceMayBeMounted() { // Volume is globally mounted to device, unmount it klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", "")) err := rc.operationExecutor.UnmountDevice( @@ -625,15 +638,18 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re klog.Errorf("Could not add volume information to actual state of world: %v", err) continue } - err = rc.actualStateOfWorld.MarkVolumeAsMounted( - volume.podName, - types.UID(volume.podName), - volume.volumeName, - volume.mounter, - volume.blockVolumeMapper, - volume.outerVolumeSpecName, - volume.volumeGidValue, - volume.volumeSpec) + markVolumeOpts := operationexecutor.MarkVolumeMountedOpts{ + PodName: volume.podName, + PodUID: types.UID(volume.podName), + VolumeName: volume.volumeName, + Mounter: volume.mounter, + BlockVolumeMapper: volume.blockVolumeMapper, + OuterVolumeSpecName: volume.outerVolumeSpecName, + VolumeGidVolume: volume.volumeGidValue, + VolumeSpec: volume.volumeSpec, + VolumeMountState: operationexecutor.VolumeMounted, + } + err = rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) if err != nil { klog.Errorf("Could not add pod to volume information to actual state of world: %v", err) continue diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go index 3fe144a073f..300cf7159b8 100644 --- a/pkg/kubelet/volumemanager/volume_manager.go +++ b/pkg/kubelet/volumemanager/volume_manager.go @@ -430,6 +430,7 @@ func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, e // getUnmountedVolumes fetches the current list of mounted volumes from // the actual state of the world, and uses it to process the list of // expectedVolumes. It returns a list of unmounted volumes. +// The list also includes volume that may be mounted in uncertain state. func (vm *volumeManager) getUnmountedVolumes(podName types.UniquePodName, expectedVolumes []string) []string { mountedVolumes := sets.NewString() for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) { diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index c95029e6fd7..92a4e4305a3 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -160,11 +160,23 @@ func NewOperationExecutor( } } +type MarkVolumeMountedOpts struct { + PodName volumetypes.UniquePodName + PodUID types.UID + VolumeName v1.UniqueVolumeName + Mounter volume.Mounter + BlockVolumeMapper volume.BlockVolumeMapper + OuterVolumeSpecName string + VolumeGidVolume string + VolumeSpec *volume.Spec + VolumeMountState VolumeMountState +} + // ActualStateOfWorldMounterUpdater defines a set of operations updating the actual // state of the world cache after successful mount/unmount. type ActualStateOfWorldMounterUpdater interface { // Marks the specified volume as mounted to the specified pod - MarkVolumeAsMounted(podName volumetypes.UniquePodName, podUID types.UID, volumeName v1.UniqueVolumeName, mounter volume.Mounter, blockVolumeMapper volume.BlockVolumeMapper, outerVolumeSpecName string, volumeGidValue string, volumeSpec *volume.Spec) error + MarkVolumeAsMounted(markVolumeOpts MarkVolumeMountedOpts) error // Marks the specified volume as unmounted from the specified pod MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error @@ -172,6 +184,8 @@ type ActualStateOfWorldMounterUpdater interface { // Marks the specified volume as having been globally mounted. MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error + MarkDeviceAsUncertain(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error + // Marks the specified volume as having its global mount unmounted. MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error @@ -354,6 +368,32 @@ type VolumeToMount struct { DesiredSizeLimit *resource.Quantity } +// DeviceMountState represents device mount state in a global path. +type DeviceMountState string + +const ( + // DeviceGloballymounted means device has been globally mounted successfully + DeviceGloballyMounted DeviceMountState = "DeviceGloballyMounted" + + // Uncertain means device may not be mounted but a mount operation may be + // in-progress which can cause device mount to succeed. + DeviceMountUncertain DeviceMountState = "DeviceMountUncertain" + + // DeviceNotMounted means device has not been mounted globally. + DeviceNotMounted DeviceMountState = "DeviceNotMounted" +) + +// VolumeMountState represents volume mount state in a path local to the pod. +type VolumeMountState string + +const ( + VolumeMounted VolumeMountState = "VolumeMounted" + + VolumeMountUncertain VolumeMountState = "VolumeMountUncertain" + + VolumeNotMounted VolumeMountState = "VolumeNotMounted" +) + // GenerateMsgDetailed returns detailed msgs for volumes to mount func (volume *VolumeToMount) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) { detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID) diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 91b86de990d..425a5b6f79a 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -648,15 +648,17 @@ func (og *operationGenerator) GenerateMountVolumeFunc( } // Update actual state of world - markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted( - volumeToMount.PodName, - volumeToMount.Pod.UID, - volumeToMount.VolumeName, - volumeMounter, - nil, - volumeToMount.OuterVolumeSpecName, - volumeToMount.VolumeGidValue, - volumeToMount.VolumeSpec) + markOpts := MarkVolumeMountedOpts{ + PodName: volumeToMount.PodName, + PodUID: volumeToMount.Pod.UID, + VolumeName: volumeToMount.VolumeName, + Mounter: volumeMounter, + OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName, + VolumeGidVolume: volumeToMount.VolumeGidValue, + VolumeSpec: volumeToMount.VolumeSpec, + VolumeMountState: VolumeMounted, + } + markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts) if markVolMountedErr != nil { // On failure, return error. Caller will log and retry. return volumeToMount.GenerateError("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr) @@ -982,16 +984,18 @@ func (og *operationGenerator) GenerateMapVolumeFunc( return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError) } - // Update actual state of world - markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted( - volumeToMount.PodName, - volumeToMount.Pod.UID, - volumeToMount.VolumeName, - nil, - blockVolumeMapper, - volumeToMount.OuterVolumeSpecName, - volumeToMount.VolumeGidValue, - volumeToMount.VolumeSpec) + markVolumeOpts := MarkVolumeMountedOpts{ + PodName: volumeToMount.PodName, + PodUID: volumeToMount.Pod.UID, + VolumeName: volumeToMount.VolumeName, + BlockVolumeMapper: blockVolumeMapper, + OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName, + VolumeGidVolume: volumeToMount.VolumeGidValue, + VolumeSpec: volumeToMount.VolumeSpec, + VolumeMountState: VolumeMounted, + } + + markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) if markVolMountedErr != nil { // On failure, return error. Caller will log and retry. return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr)