Add code for introducing uncertain state of mounts

Add a comment about volumestate
This commit is contained in:
Hemant Kumar 2019-09-05 21:44:17 -04:00
parent ea385aa5e9
commit a795f3de88
11 changed files with 316 additions and 105 deletions

View File

@ -40,6 +40,7 @@ go_test(
"//pkg/volume:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -59,7 +59,7 @@ type ActualStateOfWorld interface {
// volume, reset the pod's remountRequired value.
// If a volume with the name volumeName does not exist in the list of
// attached volumes, an error is returned.
AddPodToVolume(podName volumetypes.UniquePodName, podUID types.UID, volumeName v1.UniqueVolumeName, mounter volume.Mounter, blockVolumeMapper volume.BlockVolumeMapper, outerVolumeSpecName string, volumeGidValue string, volumeSpec *volume.Spec) error
AddPodToVolume(operationexecutor.MarkVolumeMountedOpts) error
// MarkRemountRequired marks each volume that is successfully attached and
// mounted for the specified pod as requiring remount (if the plugin for the
@ -68,13 +68,13 @@ type ActualStateOfWorld interface {
// pod update.
MarkRemountRequired(podName volumetypes.UniquePodName)
// SetVolumeGloballyMounted sets the GloballyMounted value for the given
// volume. When set to true this value indicates that the volume is mounted
// to the underlying device at a global mount point. This global mount point
// must unmounted prior to detach.
// SetDeviceMountState sets device mount state for the given volume. When deviceMountState is set to DeviceGloballyMounted
// then device is mounted at a global mount point. When it is set to DeviceMountUncertain then also it means volume
// MAY be globally mounted at a global mount point. In both cases - the volume must be unmounted from
// global mount point prior to detach.
// If a volume with the name volumeName does not exist in the list of
// attached volumes, an error is returned.
SetVolumeGloballyMounted(volumeName v1.UniqueVolumeName, globallyMounted bool, devicePath, deviceMountPath string) error
SetDeviceMountState(volumeName v1.UniqueVolumeName, deviceMountState operationexecutor.DeviceMountState, devicePath, deviceMountPath string) error
// DeletePodFromVolume removes the given pod from the given volume in the
// cache indicating the volume has been successfully unmounted from the pod.
@ -127,6 +127,10 @@ type ActualStateOfWorld interface {
// actual state of the world.
GetMountedVolumes() []MountedVolume
// GetAllMountedVolumes returns list of all mounted volumes including
// those that are in VolumeMounted state and VolumeMountUncertain state.
GetAllMountedVolumes() []MountedVolume
// GetMountedVolumesForPod generates and returns a list of volumes that are
// successfully attached and mounted for the specified pod based on the
// current actual state of the world.
@ -165,10 +169,13 @@ type MountedVolume struct {
type AttachedVolume struct {
operationexecutor.AttachedVolume
// GloballyMounted indicates that the volume is mounted to the underlying
// device at a global mount point. This global mount point must unmounted
// prior to detach.
GloballyMounted bool
// DeviceMountState indicates if device has been globally mounted or is not.
DeviceMountState operationexecutor.DeviceMountState
}
func (av AttachedVolume) DeviceMayBeMounted() bool {
return av.DeviceMountState == operationexecutor.DeviceGloballyMounted ||
av.DeviceMountState == operationexecutor.DeviceMountUncertain
}
// NewActualStateOfWorld returns a new instance of ActualStateOfWorld.
@ -250,6 +257,10 @@ type attachedVolume struct {
// prior to detach.
globallyMounted bool
// deviceMountState stores information that tells us if device is mounted
// globally or not
deviceMountState operationexecutor.DeviceMountState
// devicePath contains the path on the node where the volume is attached for
// attachable volumes
devicePath string
@ -301,6 +312,12 @@ type mountedPod struct {
// fsResizeRequired indicates the underlying volume has been successfully
// mounted to this pod but its size has been expanded after that.
fsResizeRequired bool
// volumeMounted stores state of volume mount for the pod. if it is:
// - VolumeMounted: means volume for pod has been successfully mounted
// - VolumeMountUncertain: means volume for pod may not be mounted, but it must be unmounted
// - VolumeNotMounted: means volume for pod has not been mounted
volumeMounted operationexecutor.VolumeMountState
}
func (asw *actualStateOfWorld) MarkVolumeAsAttached(
@ -318,24 +335,8 @@ func (asw *actualStateOfWorld) MarkVolumeAsDetached(
asw.DeleteVolume(volumeName)
}
func (asw *actualStateOfWorld) MarkVolumeAsMounted(
podName volumetypes.UniquePodName,
podUID types.UID,
volumeName v1.UniqueVolumeName,
mounter volume.Mounter,
blockVolumeMapper volume.BlockVolumeMapper,
outerVolumeSpecName string,
volumeGidValue string,
volumeSpec *volume.Spec) error {
return asw.AddPodToVolume(
podName,
podUID,
volumeName,
mounter,
blockVolumeMapper,
outerVolumeSpecName,
volumeGidValue,
volumeSpec)
func (asw *actualStateOfWorld) MarkVolumeAsMounted(markVolumeOpts operationexecutor.MarkVolumeMountedOpts) error {
return asw.AddPodToVolume(markVolumeOpts)
}
func (asw *actualStateOfWorld) AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) {
@ -354,12 +355,17 @@ func (asw *actualStateOfWorld) MarkVolumeAsUnmounted(
func (asw *actualStateOfWorld) MarkDeviceAsMounted(
volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error {
return asw.SetVolumeGloballyMounted(volumeName, true /* globallyMounted */, devicePath, deviceMountPath)
return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceGloballyMounted, devicePath, deviceMountPath)
}
func (asw *actualStateOfWorld) MarkDeviceAsUncertain(
volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error {
return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceMountUncertain, devicePath, deviceMountPath)
}
func (asw *actualStateOfWorld) MarkDeviceAsUnmounted(
volumeName v1.UniqueVolumeName) error {
return asw.SetVolumeGloballyMounted(volumeName, false /* globallyMounted */, "", "")
return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceNotMounted, "", "")
}
// addVolume adds the given volume to the cache indicating the specified
@ -405,7 +411,7 @@ func (asw *actualStateOfWorld) addVolume(
mountedPods: make(map[volumetypes.UniquePodName]mountedPod),
pluginName: volumePlugin.GetPluginName(),
pluginIsAttachable: pluginIsAttachable,
globallyMounted: false,
deviceMountState: operationexecutor.DeviceNotMounted,
devicePath: devicePath,
}
} else {
@ -420,15 +426,15 @@ func (asw *actualStateOfWorld) addVolume(
return nil
}
func (asw *actualStateOfWorld) AddPodToVolume(
podName volumetypes.UniquePodName,
podUID types.UID,
volumeName v1.UniqueVolumeName,
mounter volume.Mounter,
blockVolumeMapper volume.BlockVolumeMapper,
outerVolumeSpecName string,
volumeGidValue string,
volumeSpec *volume.Spec) error {
func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.MarkVolumeMountedOpts) error {
podName := markVolumeOpts.PodName
podUID := markVolumeOpts.PodUID
volumeName := markVolumeOpts.VolumeName
mounter := markVolumeOpts.Mounter
blockVolumeMapper := markVolumeOpts.BlockVolumeMapper
outerVolumeSpecName := markVolumeOpts.OuterVolumeSpecName
volumeGidValue := markVolumeOpts.VolumeGidVolume
volumeSpec := markVolumeOpts.VolumeSpec
asw.Lock()
defer asw.Unlock()
@ -449,13 +455,13 @@ func (asw *actualStateOfWorld) AddPodToVolume(
outerVolumeSpecName: outerVolumeSpecName,
volumeGidValue: volumeGidValue,
volumeSpec: volumeSpec,
volumeMounted: markVolumeOpts.VolumeMountState,
}
}
// If pod exists, reset remountRequired value
podObj.remountRequired = false
asw.attachedVolumes[volumeName].mountedPods[podName] = podObj
return nil
}
@ -554,8 +560,8 @@ func (asw *actualStateOfWorld) MarkFSResizeRequired(
}
}
func (asw *actualStateOfWorld) SetVolumeGloballyMounted(
volumeName v1.UniqueVolumeName, globallyMounted bool, devicePath, deviceMountPath string) error {
func (asw *actualStateOfWorld) SetDeviceMountState(
volumeName v1.UniqueVolumeName, deviceMountState operationexecutor.DeviceMountState, devicePath, deviceMountPath string) error {
asw.Lock()
defer asw.Unlock()
@ -566,7 +572,7 @@ func (asw *actualStateOfWorld) SetVolumeGloballyMounted(
volumeName)
}
volumeObj.globallyMounted = globallyMounted
volumeObj.deviceMountState = deviceMountState
volumeObj.deviceMountPath = deviceMountPath
if devicePath != "" {
volumeObj.devicePath = devicePath
@ -668,9 +674,29 @@ func (asw *actualStateOfWorld) GetMountedVolumes() []MountedVolume {
mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
for _, volumeObj := range asw.attachedVolumes {
for _, podObj := range volumeObj.mountedPods {
mountedVolume = append(
mountedVolume,
getMountedVolume(&podObj, &volumeObj))
if podObj.volumeMounted == operationexecutor.VolumeMounted {
mountedVolume = append(
mountedVolume,
getMountedVolume(&podObj, &volumeObj))
}
}
}
return mountedVolume
}
func (asw *actualStateOfWorld) GetAllMountedVolumes() []MountedVolume {
asw.RLock()
defer asw.RUnlock()
mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
for _, volumeObj := range asw.attachedVolumes {
for _, podObj := range volumeObj.mountedPods {
if podObj.volumeMounted == operationexecutor.VolumeMounted ||
podObj.volumeMounted == operationexecutor.VolumeMountUncertain {
mountedVolume = append(
mountedVolume,
getMountedVolume(&podObj, &volumeObj))
}
}
}
@ -683,10 +709,12 @@ func (asw *actualStateOfWorld) GetMountedVolumesForPod(
defer asw.RUnlock()
mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
for _, volumeObj := range asw.attachedVolumes {
if podObj, podExists := volumeObj.mountedPods[podName]; podExists {
mountedVolume = append(
mountedVolume,
getMountedVolume(&podObj, &volumeObj))
for mountedPodName, podObj := range volumeObj.mountedPods {
if mountedPodName == podName && podObj.volumeMounted == operationexecutor.VolumeMounted {
mountedVolume = append(
mountedVolume,
getMountedVolume(&podObj, &volumeObj))
}
}
}
@ -699,7 +727,7 @@ func (asw *actualStateOfWorld) GetGloballyMountedVolumes() []AttachedVolume {
globallyMountedVolumes := make(
[]AttachedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
for _, volumeObj := range asw.attachedVolumes {
if volumeObj.globallyMounted {
if volumeObj.deviceMountState == operationexecutor.DeviceGloballyMounted {
globallyMountedVolumes = append(
globallyMountedVolumes,
asw.newAttachedVolume(&volumeObj))
@ -749,7 +777,7 @@ func (asw *actualStateOfWorld) newAttachedVolume(
DevicePath: attachedVolume.devicePath,
DeviceMountPath: attachedVolume.deviceMountPath,
PluginName: attachedVolume.pluginName},
GloballyMounted: attachedVolume.globallyMounted,
DeviceMountState: attachedVolume.deviceMountState,
}
}

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/volume"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
@ -220,9 +221,16 @@ func Test_AddPodToVolume_Positive_ExistingVolumeNewNode(t *testing.T) {
}
// Act
err = asw.AddPodToVolume(
podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec)
markVolumeOpts := operationexecutor.MarkVolumeMountedOpts{
PodName: podName,
PodUID: pod.UID,
VolumeName: generatedVolumeName,
Mounter: mounter,
BlockVolumeMapper: mapper,
OuterVolumeSpecName: volumeSpec.Name(),
VolumeSpec: volumeSpec,
}
err = asw.AddPodToVolume(markVolumeOpts)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
@ -287,16 +295,22 @@ func Test_AddPodToVolume_Positive_ExistingVolumeExistingNode(t *testing.T) {
t.Fatalf("NewBlockVolumeMapper failed. Expected: <no error> Actual: <%v>", err)
}
err = asw.AddPodToVolume(
podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec)
markVolumeOpts := operationexecutor.MarkVolumeMountedOpts{
PodName: podName,
PodUID: pod.UID,
VolumeName: generatedVolumeName,
Mounter: mounter,
BlockVolumeMapper: mapper,
OuterVolumeSpecName: volumeSpec.Name(),
VolumeSpec: volumeSpec,
}
err = asw.AddPodToVolume(markVolumeOpts)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
// Act
err = asw.AddPodToVolume(
podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec)
err = asw.AddPodToVolume(markVolumeOpts)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
@ -388,8 +402,16 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) {
t.Fatalf("NewBlockVolumeMapper failed. Expected: <no error> Actual: <%v>", err)
}
err = asw.AddPodToVolume(
podName1, pod1.UID, generatedVolumeName1, mounter1, mapper1, volumeSpec1.Name(), "" /* volumeGidValue */, volumeSpec1)
markVolumeOpts1 := operationexecutor.MarkVolumeMountedOpts{
PodName: podName1,
PodUID: pod1.UID,
VolumeName: generatedVolumeName1,
Mounter: mounter1,
BlockVolumeMapper: mapper1,
OuterVolumeSpecName: volumeSpec1.Name(),
VolumeSpec: volumeSpec1,
}
err = asw.AddPodToVolume(markVolumeOpts1)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
@ -406,8 +428,16 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) {
t.Fatalf("NewBlockVolumeMapper failed. Expected: <no error> Actual: <%v>", err)
}
err = asw.AddPodToVolume(
podName2, pod2.UID, generatedVolumeName1, mounter2, mapper2, volumeSpec2.Name(), "" /* volumeGidValue */, volumeSpec2)
markVolumeOpts2 := operationexecutor.MarkVolumeMountedOpts{
PodName: podName2,
PodUID: pod2.UID,
VolumeName: generatedVolumeName1,
Mounter: mounter2,
BlockVolumeMapper: mapper2,
OuterVolumeSpecName: volumeSpec2.Name(),
VolumeSpec: volumeSpec2,
}
err = asw.AddPodToVolume(markVolumeOpts2)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
@ -421,7 +451,6 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) {
verifyVolumeExistsWithSpecNameInVolumeAsw(t, podName2, volumeSpec2.Name(), asw)
verifyVolumeSpecNameInVolumeAsw(t, podName1, []*volume.Spec{volumeSpec1}, asw)
verifyVolumeSpecNameInVolumeAsw(t, podName2, []*volume.Spec{volumeSpec2}, asw)
}
// Calls AddPodToVolume() to add pod to empty data struct
@ -484,9 +513,16 @@ func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) {
}
// Act
err = asw.AddPodToVolume(
podName, pod.UID, volumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec)
markVolumeOpts := operationexecutor.MarkVolumeMountedOpts{
PodName: podName,
PodUID: pod.UID,
VolumeName: volumeName,
Mounter: mounter,
BlockVolumeMapper: mapper,
OuterVolumeSpecName: volumeSpec.Name(),
VolumeSpec: volumeSpec,
}
err = asw.AddPodToVolume(markVolumeOpts)
// Assert
if err == nil {
t.Fatalf("AddPodToVolume did not fail. Expected: <\"no volume with the name ... exists in the list of attached volumes\"> Actual: <no error>")
@ -556,6 +592,71 @@ func Test_MarkDeviceAsMounted_Positive_NewVolume(t *testing.T) {
verifyVolumeExistsInGloballyMountedVolumes(t, generatedVolumeName, asw)
}
func TestGetMountedVolumesForPod(t *testing.T) {
// Arrange
volumePluginMgr, plugin := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr)
devicePath := "fake/device/path"
pod1 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "volume-name-1",
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: "fake-device1",
},
},
},
},
},
}
volumeSpec1 := &volume.Spec{Volume: &pod1.Spec.Volumes[0]}
generatedVolumeName1, err := util.GetUniqueVolumeNameFromSpec(
plugin, volumeSpec1)
require.NoError(t, err)
err = asw.MarkVolumeAsAttached(generatedVolumeName1, volumeSpec1, "" /* nodeName */, devicePath)
if err != nil {
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
podName1 := util.GetUniquePodName(pod1)
mounter1, err := plugin.NewMounter(volumeSpec1, pod1, volume.VolumeOptions{})
if err != nil {
t.Fatalf("NewMounter failed. Expected: <no error> Actual: <%v>", err)
}
markVolumeOpts1 := operationexecutor.MarkVolumeMountedOpts{
PodName: podName1,
PodUID: pod1.UID,
VolumeName: generatedVolumeName1,
Mounter: mounter1,
OuterVolumeSpecName: volumeSpec1.Name(),
VolumeSpec: volumeSpec1,
VolumeMountState: operationexecutor.VolumeMountUncertain,
}
err = asw.AddPodToVolume(markVolumeOpts1)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
mountedVolumes := asw.GetMountedVolumesForPod(podName1)
volumeFound := false
for _, volume := range mountedVolumes {
if volume.InnerVolumeSpecName == volumeSpec1.Name() {
volumeFound = true
}
}
if volumeFound {
t.Fatalf("expected volume %s to be not found in asw", volumeSpec1.Name())
}
}
func verifyVolumeExistsInGloballyMountedVolumes(
t *testing.T, expectedVolumeName v1.UniqueVolumeName, asw ActualStateOfWorld) {
globallyMountedVolumes := asw.GetGloballyMountedVolumes()

View File

@ -37,6 +37,7 @@ go_test(
"//pkg/volume:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@ -27,6 +27,7 @@ import (
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
func TestMetricCollection(t *testing.T) {
@ -77,8 +78,16 @@ func TestMetricCollection(t *testing.T) {
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
err = asw.AddPodToVolume(
podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "", volumeSpec)
markVolumeOpts := operationexecutor.MarkVolumeMountedOpts{
PodName: podName,
PodUID: pod.UID,
VolumeName: generatedVolumeName,
Mounter: mounter,
BlockVolumeMapper: mapper,
OuterVolumeSpecName: volumeSpec.Name(),
VolumeSpec: volumeSpec,
}
err = asw.AddPodToVolume(markVolumeOpts)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}

View File

@ -63,6 +63,7 @@ go_test(
"//pkg/volume/csimigration:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",

View File

@ -43,6 +43,7 @@ import (
"k8s.io/kubernetes/pkg/volume/csimigration"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/types"
)
@ -854,8 +855,16 @@ func reconcileASW(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, t
if err != nil {
t.Fatalf("Unexpected error when MarkVolumeAsAttached: %v", err)
}
err = asw.MarkVolumeAsMounted(volumeToMount.PodName, volumeToMount.Pod.UID,
volumeToMount.VolumeName, nil, nil, volumeToMount.OuterVolumeSpecName, volumeToMount.VolumeGidValue, volumeToMount.VolumeSpec)
markVolumeOpts := operationexecutor.MarkVolumeMountedOpts{
PodName: volumeToMount.PodName,
PodUID: volumeToMount.Pod.UID,
VolumeName: volumeToMount.VolumeName,
OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
VolumeGidVolume: volumeToMount.VolumeGidValue,
VolumeSpec: volumeToMount.VolumeSpec,
VolumeMountState: operationexecutor.VolumeMounted,
}
err = asw.MarkVolumeAsMounted(markVolumeOpts)
if err != nil {
t.Fatalf("Unexpected error when MarkVolumeAsMounted: %v", err)
}

View File

@ -164,9 +164,19 @@ func (rc *reconciler) reconcile() {
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
rc.unmountVolumes()
// Next we mount required volumes. This function could also trigger
// detach if kubelet is responsible for detaching volumes.
rc.mountAttachVolumes()
// Ensure devices that should be detached/unmounted are detached/unmounted.
rc.unmountDetachDevices()
}
func (rc *reconciler) unmountVolumes() {
// Ensure volumes that should be unmounted are unmounted.
for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
// Volume is mounted, unmount it
klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
@ -184,7 +194,9 @@ func (rc *reconciler) reconcile() {
}
}
}
}
func (rc *reconciler) mountAttachVolumes() {
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
@ -274,13 +286,14 @@ func (rc *reconciler) reconcile() {
}
}
}
}
// Ensure devices that should be detached/unmounted are detached/unmounted.
func (rc *reconciler) unmountDetachDevices() {
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {
if attachedVolume.GloballyMounted {
if attachedVolume.DeviceMayBeMounted() {
// Volume is globally mounted to device, unmount it
klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
err := rc.operationExecutor.UnmountDevice(
@ -625,15 +638,18 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re
klog.Errorf("Could not add volume information to actual state of world: %v", err)
continue
}
err = rc.actualStateOfWorld.MarkVolumeAsMounted(
volume.podName,
types.UID(volume.podName),
volume.volumeName,
volume.mounter,
volume.blockVolumeMapper,
volume.outerVolumeSpecName,
volume.volumeGidValue,
volume.volumeSpec)
markVolumeOpts := operationexecutor.MarkVolumeMountedOpts{
PodName: volume.podName,
PodUID: types.UID(volume.podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: operationexecutor.VolumeMounted,
}
err = rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
if err != nil {
klog.Errorf("Could not add pod to volume information to actual state of world: %v", err)
continue

View File

@ -430,6 +430,7 @@ func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, e
// getUnmountedVolumes fetches the current list of mounted volumes from
// the actual state of the world, and uses it to process the list of
// expectedVolumes. It returns a list of unmounted volumes.
// The list also includes volume that may be mounted in uncertain state.
func (vm *volumeManager) getUnmountedVolumes(podName types.UniquePodName, expectedVolumes []string) []string {
mountedVolumes := sets.NewString()
for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {

View File

@ -160,11 +160,23 @@ func NewOperationExecutor(
}
}
type MarkVolumeMountedOpts struct {
PodName volumetypes.UniquePodName
PodUID types.UID
VolumeName v1.UniqueVolumeName
Mounter volume.Mounter
BlockVolumeMapper volume.BlockVolumeMapper
OuterVolumeSpecName string
VolumeGidVolume string
VolumeSpec *volume.Spec
VolumeMountState VolumeMountState
}
// ActualStateOfWorldMounterUpdater defines a set of operations updating the actual
// state of the world cache after successful mount/unmount.
type ActualStateOfWorldMounterUpdater interface {
// Marks the specified volume as mounted to the specified pod
MarkVolumeAsMounted(podName volumetypes.UniquePodName, podUID types.UID, volumeName v1.UniqueVolumeName, mounter volume.Mounter, blockVolumeMapper volume.BlockVolumeMapper, outerVolumeSpecName string, volumeGidValue string, volumeSpec *volume.Spec) error
MarkVolumeAsMounted(markVolumeOpts MarkVolumeMountedOpts) error
// Marks the specified volume as unmounted from the specified pod
MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error
@ -172,6 +184,8 @@ type ActualStateOfWorldMounterUpdater interface {
// Marks the specified volume as having been globally mounted.
MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error
MarkDeviceAsUncertain(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error
// Marks the specified volume as having its global mount unmounted.
MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error
@ -354,6 +368,32 @@ type VolumeToMount struct {
DesiredSizeLimit *resource.Quantity
}
// DeviceMountState represents device mount state in a global path.
type DeviceMountState string
const (
// DeviceGloballymounted means device has been globally mounted successfully
DeviceGloballyMounted DeviceMountState = "DeviceGloballyMounted"
// Uncertain means device may not be mounted but a mount operation may be
// in-progress which can cause device mount to succeed.
DeviceMountUncertain DeviceMountState = "DeviceMountUncertain"
// DeviceNotMounted means device has not been mounted globally.
DeviceNotMounted DeviceMountState = "DeviceNotMounted"
)
// VolumeMountState represents volume mount state in a path local to the pod.
type VolumeMountState string
const (
VolumeMounted VolumeMountState = "VolumeMounted"
VolumeMountUncertain VolumeMountState = "VolumeMountUncertain"
VolumeNotMounted VolumeMountState = "VolumeNotMounted"
)
// GenerateMsgDetailed returns detailed msgs for volumes to mount
func (volume *VolumeToMount) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)

View File

@ -648,15 +648,17 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
}
// Update actual state of world
markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(
volumeToMount.PodName,
volumeToMount.Pod.UID,
volumeToMount.VolumeName,
volumeMounter,
nil,
volumeToMount.OuterVolumeSpecName,
volumeToMount.VolumeGidValue,
volumeToMount.VolumeSpec)
markOpts := MarkVolumeMountedOpts{
PodName: volumeToMount.PodName,
PodUID: volumeToMount.Pod.UID,
VolumeName: volumeToMount.VolumeName,
Mounter: volumeMounter,
OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
VolumeGidVolume: volumeToMount.VolumeGidValue,
VolumeSpec: volumeToMount.VolumeSpec,
VolumeMountState: VolumeMounted,
}
markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts)
if markVolMountedErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr)
@ -982,16 +984,18 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError)
}
// Update actual state of world
markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(
volumeToMount.PodName,
volumeToMount.Pod.UID,
volumeToMount.VolumeName,
nil,
blockVolumeMapper,
volumeToMount.OuterVolumeSpecName,
volumeToMount.VolumeGidValue,
volumeToMount.VolumeSpec)
markVolumeOpts := MarkVolumeMountedOpts{
PodName: volumeToMount.PodName,
PodUID: volumeToMount.Pod.UID,
VolumeName: volumeToMount.VolumeName,
BlockVolumeMapper: blockVolumeMapper,
OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
VolumeGidVolume: volumeToMount.VolumeGidValue,
VolumeSpec: volumeToMount.VolumeSpec,
VolumeMountState: VolumeMounted,
}
markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
if markVolMountedErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr)