Merge pull request #110670 from gnufied/fix-pod-deletion-terminating

Fix pod stuck in termination state when mount fails or gets skipped after kubelet restart
This commit is contained in:
Kubernetes Prow Robot 2022-07-27 06:31:29 -07:00 committed by GitHub
commit 9ad4c5c0a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 709 additions and 47 deletions

View File

@ -196,9 +196,10 @@ func NewActualStateOfWorld(
nodeName types.NodeName,
volumePluginMgr *volume.VolumePluginMgr) ActualStateOfWorld {
return &actualStateOfWorld{
nodeName: nodeName,
attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume),
volumePluginMgr: volumePluginMgr,
nodeName: nodeName,
attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume),
foundDuringReconstruction: make(map[v1.UniqueVolumeName]map[volumetypes.UniquePodName]types.UID),
volumePluginMgr: volumePluginMgr,
}
}
@ -227,6 +228,9 @@ type actualStateOfWorld struct {
// The key in this map is the name of the volume and the value is an object
// containing more information about the attached volume.
attachedVolumes map[v1.UniqueVolumeName]attachedVolume
// foundDuringReconstruction is a map of volumes which were discovered
// from kubelet root directory when kubelet was restarted.
foundDuringReconstruction map[v1.UniqueVolumeName]map[volumetypes.UniquePodName]types.UID
// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
@ -346,6 +350,102 @@ func (asw *actualStateOfWorld) MarkVolumeAsDetached(
asw.DeleteVolume(volumeName)
}
// IsVolumeReconstructed reports whether the given volume/pod pair is currently
// tracked as having been discovered during reconstruction. It returns true only
// when the pod's mount state for the volume is uncertain AND the pair is present
// in foundDuringReconstruction.
func (asw *actualStateOfWorld) IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool {
	// Reconstructed volumes are always recorded as uncertain, so any other
	// state rules the pair out without consulting the reconstruction map.
	if asw.GetVolumeMountState(volumeName, podName) != operationexecutor.VolumeMountUncertain {
		return false
	}

	asw.RLock()
	defer asw.RUnlock()

	// Indexing through a missing (nil) inner map is safe for reads and simply
	// reports "not present".
	_, reconstructed := asw.foundDuringReconstruction[volumeName][podName]
	return reconstructed
}
// CheckAndMarkVolumeAsUncertainViaReconstruction records the pod described by
// opts as using the volume in the uncertain mount state and remembers the pair
// in foundDuringReconstruction.
//
// It returns (false, nil) and changes nothing when:
//   - the volume is not present in attachedVolumes, or
//   - the pod is already recorded for the volume as mounted or uncertain.
//
// Otherwise the pod entry is (re)created in the uncertain state and (true, nil)
// is returned. The error result is always nil in this implementation.
func (asw *actualStateOfWorld) CheckAndMarkVolumeAsUncertainViaReconstruction(opts operationexecutor.MarkVolumeOpts) (bool, error) {
	asw.Lock()
	defer asw.Unlock()

	volumeObj, volumeExists := asw.attachedVolumes[opts.VolumeName]
	if !volumeExists {
		return false, nil
	}

	if existingPod, podExists := volumeObj.mountedPods[opts.PodName]; podExists {
		switch existingPod.volumeMountStateForPod {
		case operationexecutor.VolumeMountUncertain, operationexecutor.VolumeMounted:
			// A mounted or already-uncertain entry carries better (or equal)
			// information than reconstruction can provide; leave it alone.
			return false, nil
		}
	}

	// volumeObj is a copy of the map value, but mountedPods is a map and so
	// still refers to the storage held by asw.attachedVolumes.
	volumeObj.mountedPods[opts.PodName] = mountedPod{
		podName:                opts.PodName,
		podUID:                 opts.PodUID,
		mounter:                opts.Mounter,
		blockVolumeMapper:      opts.BlockVolumeMapper,
		outerVolumeSpecName:    opts.OuterVolumeSpecName,
		volumeGidValue:         opts.VolumeGidVolume,
		volumeSpec:             opts.VolumeSpec,
		remountRequired:        false,
		volumeMountStateForPod: operationexecutor.VolumeMountUncertain,
	}

	// Remember that this pod/volume pair entered the ASOW via reconstruction.
	podMap, ok := asw.foundDuringReconstruction[opts.VolumeName]
	if !ok {
		podMap = map[volumetypes.UniquePodName]types.UID{}
		asw.foundDuringReconstruction[opts.VolumeName] = podMap
	}
	podMap[opts.PodName] = opts.PodUID
	return true, nil
}
// CheckAndMarkDeviceUncertainViaReconstruction flips the device mount state of
// an attached volume to uncertain and records deviceMountPath for it. The
// volume must already be present in attachedVolumes and its device must be in
// the DeviceNotMounted state; in any other situation nothing is changed and
// false is returned. It returns true when the state was updated.
func (asw *actualStateOfWorld) CheckAndMarkDeviceUncertainViaReconstruction(volumeName v1.UniqueVolumeName, deviceMountPath string) bool {
	asw.Lock()
	defer asw.Unlock()

	attached, known := asw.attachedVolumes[volumeName]
	if !known {
		// The volume has to be marked as attached before its device state
		// can be touched.
		return false
	}
	if attached.deviceMountState != operationexecutor.DeviceNotMounted {
		return false
	}

	// Only deviceMountPath is changed here because devicePath at this stage
	// is determined from the node object.
	attached.deviceMountPath = deviceMountPath
	attached.deviceMountState = operationexecutor.DeviceMountUncertain
	asw.attachedVolumes[volumeName] = attached
	return true
}
// MarkVolumeAsMounted records the pod/volume mount described by markVolumeOpts
// by delegating to AddPodToVolume and returning its error unchanged.
func (asw *actualStateOfWorld) MarkVolumeAsMounted(markVolumeOpts operationexecutor.MarkVolumeOpts) error {
return asw.AddPodToVolume(markVolumeOpts)
}
@ -535,6 +635,11 @@ func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.M
// If pod exists, reset remountRequired value
podObj.remountRequired = false
podObj.volumeMountStateForPod = markVolumeOpts.VolumeMountState
// if volume is mounted successfully, then it should be removed from foundDuringReconstruction map
if markVolumeOpts.VolumeMountState == operationexecutor.VolumeMounted {
delete(asw.foundDuringReconstruction[volumeName], podName)
}
if mounter != nil {
// The mounter stored in the object may have old information,
// use the newest one.
@ -641,6 +746,12 @@ func (asw *actualStateOfWorld) DeletePodFromVolume(
delete(asw.attachedVolumes[volumeName].mountedPods, podName)
}
// if there were reconstructed volumes, we should remove them
_, podExists = asw.foundDuringReconstruction[volumeName]
if podExists {
delete(asw.foundDuringReconstruction[volumeName], podName)
}
return nil
}
@ -661,6 +772,7 @@ func (asw *actualStateOfWorld) DeleteVolume(volumeName v1.UniqueVolumeName) erro
}
delete(asw.attachedVolumes, volumeName)
delete(asw.foundDuringReconstruction, volumeName)
return nil
}
@ -739,7 +851,6 @@ func (asw *actualStateOfWorld) PodRemovedFromVolume(
return false
}
}
return true
}

View File

@ -17,7 +17,9 @@ limitations under the License.
package cache
import (
"fmt"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"testing"
"github.com/stretchr/testify/require"
@ -458,6 +460,151 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) {
verifyVolumeMountedElsewhere(t, podName2, generatedVolumeName2, true /*expectedMountedElsewhere */, asw)
}
// TestActualStateOfWorld_FoundDuringReconstruction checks that a volume recorded
// as found during reconstruction is dropped from that bookkeeping once it is
// marked mounted, unmounted from the pod, or removed from the ASOW entirely.
//
// Every case starts from the same state: the volume is attached and the pod is
// added in the uncertain state via CheckAndMarkVolumeAsUncertainViaReconstruction.
func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) {
	// verifyNotReconstructed is the check shared by all cases: after the
	// operation the pair must no longer be reported as reconstructed.
	verifyNotReconstructed := func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
		if asw.IsVolumeReconstructed(volumeOpts.VolumeName, volumeOpts.PodName) {
			return fmt.Errorf("found unexpected volume in reconstructed volume list")
		}
		return nil
	}

	tests := []struct {
		name           string
		opCallback     func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error
		verifyCallback func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error
	}{
		{
			name: "marking volume mounted should remove volume from found during reconstruction",
			opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
				volumeOpts.VolumeMountState = operationexecutor.VolumeMounted
				return asw.MarkVolumeAsMounted(volumeOpts)
			},
			verifyCallback: verifyNotReconstructed,
		},
		{
			name: "removing volume from pod should remove volume from found during reconstruction",
			opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
				return asw.MarkVolumeAsUnmounted(volumeOpts.PodName, volumeOpts.VolumeName)
			},
			verifyCallback: verifyNotReconstructed,
		},
		{
			name: "removing volume entirely from ASOW should remove volume from found during reconstruction",
			opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
				err := asw.MarkVolumeAsUnmounted(volumeOpts.PodName, volumeOpts.VolumeName)
				if err != nil {
					return err
				}
				asw.MarkVolumeAsDetached(volumeOpts.VolumeName, "")
				return nil
			},
			verifyCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
				if err := verifyNotReconstructed(asw, volumeOpts); err != nil {
					return err
				}
				// Inspect the internal map directly: the whole volume entry
				// must be gone, not just the pod entry.
				aswInstance, ok := asw.(*actualStateOfWorld)
				if !ok {
					return fmt.Errorf("unexpected ActualStateOfWorld implementation %T", asw)
				}
				if _, found := aswInstance.foundDuringReconstruction[volumeOpts.VolumeName]; found {
					return fmt.Errorf("found unexpected volume in reconstructed map")
				}
				return nil
			},
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			volumePluginMgr, plugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
			asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr)
			devicePath := "fake/device/path"

			pod1 := getTestPod("pod1", "pod1uid", "volume-name-1", "fake-device1")
			volumeSpec1 := &volume.Spec{Volume: &pod1.Spec.Volumes[0]}
			generatedVolumeName1, err := util.GetUniqueVolumeNameFromSpec(
				plugin, volumeSpec1)
			require.NoError(t, err)

			err = asw.MarkVolumeAsAttached(generatedVolumeName1, volumeSpec1, "" /* nodeName */, devicePath)
			if err != nil {
				t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
			}
			podName1 := util.GetUniquePodName(pod1)

			mounter1, err := plugin.NewMounter(volumeSpec1, pod1, volume.VolumeOptions{})
			if err != nil {
				t.Fatalf("NewMounter failed. Expected: <no error> Actual: <%v>", err)
			}

			mapper1, err := plugin.NewBlockVolumeMapper(volumeSpec1, pod1, volume.VolumeOptions{})
			if err != nil {
				t.Fatalf("NewBlockVolumeMapper failed. Expected: <no error> Actual: <%v>", err)
			}

			markVolumeOpts1 := operationexecutor.MarkVolumeOpts{
				PodName:             podName1,
				PodUID:              pod1.UID,
				VolumeName:          generatedVolumeName1,
				Mounter:             mounter1,
				BlockVolumeMapper:   mapper1,
				OuterVolumeSpecName: volumeSpec1.Name(),
				VolumeSpec:          volumeSpec1,
				VolumeMountState:    operationexecutor.VolumeMountUncertain,
			}
			_, err = asw.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts1)
			if err != nil {
				t.Fatalf("CheckAndMarkVolumeAsUncertainViaReconstruction failed. Expected: <no error> Actual: <%v>", err)
			}

			// make sure state is as we expect it to be
			verifyVolumeExistsAsw(t, generatedVolumeName1, true /* shouldExist */, asw)
			verifyVolumeDoesntExistInUnmountedVolumes(t, generatedVolumeName1, asw)
			verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName1, asw)
			verifyVolumeExistsWithSpecNameInVolumeAsw(t, podName1, volumeSpec1.Name(), asw)
			verifyVolumeSpecNameInVolumeAsw(t, podName1, []*volume.Spec{volumeSpec1}, asw)
			verifyVolumeFoundInReconstruction(t, podName1, generatedVolumeName1, asw)

			if tc.opCallback != nil {
				err = tc.opCallback(asw, markVolumeOpts1)
				if err != nil {
					t.Fatalf("for test %s: %v", tc.name, err)
				}
			}
			err = tc.verifyCallback(asw, markVolumeOpts1)
			if err != nil {
				t.Fatalf("for test %s verification failed: %v", tc.name, err)
			}
		})
	}
}
// getTestPod builds a pod with the given name and UID whose spec contains a
// single volume named outerVolumeName, backed by a GCE persistent disk named
// pdName.
func getTestPod(podName, podUID, outerVolumeName, pdName string) *v1.Pod {
	gcePD := &v1.GCEPersistentDiskVolumeSource{PDName: pdName}
	testVolume := v1.Volume{
		Name:         outerVolumeName,
		VolumeSource: v1.VolumeSource{GCEPersistentDisk: gcePD},
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: podName,
			UID:  types.UID(podUID),
		},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{testVolume},
		},
	}
}
// Calls AddPodToVolume() to add pod to empty data struct
// Verifies call fails with "volume does not exist" error.
func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) {
@ -873,3 +1020,10 @@ func verifyVolumeSpecNameInVolumeAsw(
}
}
}
// verifyVolumeFoundInReconstruction fails the test immediately unless asw
// reports the given volume/pod pair as reconstructed.
func verifyVolumeFoundInReconstruction(t *testing.T, podToCheck volumetypes.UniquePodName, volumeToCheck v1.UniqueVolumeName, asw ActualStateOfWorld) {
	// Attribute failures to the calling test line rather than to this helper.
	t.Helper()
	if !asw.IsVolumeReconstructed(volumeToCheck, podToCheck) {
		t.Fatal("ASW IsVolumeReconstructed result invalid. expected <true> Actual <false>")
	}
}

View File

@ -255,7 +255,6 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
continue
}
klog.V(4).InfoS("Removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName)
dswp.desiredStateOfWorld.DeletePodFromVolume(
volumeToMount.PodName, volumeToMount.VolumeName)
dswp.deleteProcessedPod(volumeToMount.PodName)

View File

@ -129,6 +129,7 @@ func NewReconciler(
operationExecutor: operationExecutor,
mounter: mounter,
hostutil: hostutil,
skippedDuringReconstruction: map[v1.UniqueVolumeName]*globalVolumeInfo{},
volumePluginMgr: volumePluginMgr,
kubeletPodsDir: kubeletPodsDir,
timeOfLastSync: time.Time{},
@ -148,6 +149,7 @@ type reconciler struct {
mounter mount.Interface
hostutil hostutil.HostUtils
volumePluginMgr *volumepkg.VolumePluginMgr
skippedDuringReconstruction map[v1.UniqueVolumeName]*globalVolumeInfo
kubeletPodsDir string
timeOfLastSync time.Time
}
@ -185,6 +187,13 @@ func (rc *reconciler) reconcile() {
// Ensure devices that should be detached/unmounted are detached/unmounted.
rc.unmountDetachDevices()
// After running the above operations if skippedDuringReconstruction is not empty
// then ensure that all volumes which were discovered and skipped during reconstruction
// are added to actualStateOfWorld in uncertain state.
if len(rc.skippedDuringReconstruction) > 0 {
rc.processReconstructedVolumes()
}
}
func (rc *reconciler) unmountVolumes() {
@ -259,6 +268,69 @@ func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, po
}
}
// processReconstructedVolumes handles volumes that were skipped during
// reconstruction because they were present in DSOW and were therefore expected
// to be mounted normally and make it into ASOW on their own.
//
// If that mount did not happen (for example because it failed), each pod's
// volume is added to ASOW in the uncertain state here, and, when at least one
// pod was added and the volume has a device to mount, the device is marked
// uncertain as well, so that a later reconciliation pass can deal with it.
//
// Only volumes already known to ASOW as attached are processed and removed from
// skippedDuringReconstruction; the rest stay queued for a later call.
func (rc *reconciler) processReconstructedVolumes() {
	for volumeName, glblVolumeInfo := range rc.skippedDuringReconstruction {
		// check if volume is marked as attached to the node
		// for now lets only process volumes which are at least known as attached to the node
		// this should help with most volume types (including secret, configmap etc)
		if !rc.actualStateOfWorld.VolumeExists(volumeName) {
			klog.V(4).InfoS("Volume is not marked as attached to the node. Skipping processing of the volume", "volumeName", volumeName)
			continue
		}

		// Only delete volumes which were marked as attached here. This ensures
		// that volumes not yet marked as attached keep waiting instead of
		// being dropped. Deleting the current key while ranging is safe in Go.
		delete(rc.skippedDuringReconstruction, volumeName)

		uncertainVolumeCount := 0
		for podName, volume := range glblVolumeInfo.podVolumes {
			markVolumeOpts := operationexecutor.MarkVolumeOpts{
				PodName:             volume.podName,
				PodUID:              types.UID(podName),
				VolumeName:          volume.volumeName,
				Mounter:             volume.mounter,
				BlockVolumeMapper:   volume.blockVolumeMapper,
				OuterVolumeSpecName: volume.outerVolumeSpecName,
				VolumeGidVolume:     volume.volumeGidValue,
				VolumeSpec:          volume.volumeSpec,
				VolumeMountState:    operationexecutor.VolumeMountUncertain,
			}

			volumeAdded, err := rc.actualStateOfWorld.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts)
			if volumeAdded {
				uncertainVolumeCount++
			}
			// Report the error whether or not the volume was added, so a
			// failure is never silently dropped.
			if err != nil {
				klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
				continue
			}
			if volumeAdded {
				klog.V(4).InfoS("Volume is marked as mounted in uncertain state and added to the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
			}
		}

		// Nothing was added for this volume, so its device state is left alone.
		if uncertainVolumeCount == 0 {
			continue
		}
		// Only volumes that have a device to mount need their device marked
		// as uncertain.
		if glblVolumeInfo.deviceMounter == nil && glblVolumeInfo.blockVolumeMapper == nil {
			continue
		}

		deviceMountPath, err := getDeviceMountPath(glblVolumeInfo)
		if err != nil {
			klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", glblVolumeInfo.volumeName)
			continue
		}
		deviceMounted := rc.actualStateOfWorld.CheckAndMarkDeviceUncertainViaReconstruction(glblVolumeInfo.volumeName, deviceMountPath)
		if !deviceMounted {
			klog.V(3).InfoS("Could not mark device as mounted in uncertain state", "volumeName", glblVolumeInfo.volumeName)
		}
	}
}
func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) {
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
//// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens
@ -344,7 +416,7 @@ func (rc *reconciler) unmountDetachDevices() {
// it will try to clean up the mount paths with operation executor.
func (rc *reconciler) sync() {
defer rc.updateLastSyncTime()
rc.syncStates()
rc.syncStates(rc.kubeletPodsDir)
}
func (rc *reconciler) updateLastSyncTime() {
@ -376,19 +448,38 @@ type reconstructedVolume struct {
blockVolumeMapper volumepkg.BlockVolumeMapper
}
// globalVolumeInfo stores reconstructed volume information
// for each pod that was using that volume. One instance describes a single
// volume (keyed elsewhere by volumeName) together with the per-pod
// reconstructed entries that reference it.
type globalVolumeInfo struct {
// volumeName is the unique name of the reconstructed volume.
volumeName v1.UniqueVolumeName
// volumeSpec is the reconstructed spec; it is also what getDeviceMountPath
// hands to the mapper/device mounter to compute the device mount path.
volumeSpec *volumepkg.Spec
// devicePath is the device path reported when the volume is marked as
// attached and when its device is marked as mounted.
devicePath string
// mounter is the volume mounter taken from the reconstructed volume.
mounter volumepkg.Mounter
// deviceMounter, when non-nil, marks the volume as having a device to mount
// and supplies the device mount path for filesystem volumes.
deviceMounter volumepkg.DeviceMounter
// blockVolumeMapper, when non-nil, marks the volume as having a device to
// mount and supplies the global map path; it takes precedence over
// deviceMounter in getDeviceMountPath.
blockVolumeMapper volumepkg.BlockVolumeMapper
// podVolumes holds the reconstructed volume entry of every pod using this
// volume, keyed by unique pod name. It is allocated lazily by addPodVolume.
podVolumes map[volumetypes.UniquePodName]*reconstructedVolume
}
// addPodVolume registers rcv under its pod name in podVolumes, creating the map
// on first use. An existing entry for the same pod name is overwritten.
func (gvi *globalVolumeInfo) addPodVolume(rcv *reconstructedVolume) {
	if gvi.podVolumes != nil {
		gvi.podVolumes[rcv.podName] = rcv
		return
	}
	gvi.podVolumes = map[volumetypes.UniquePodName]*reconstructedVolume{
		rcv.podName: rcv,
	}
}
// syncStates scans the volume directories under the given pod directory.
// If the volume is not in desired state of world, this function will reconstruct
// the volume related information and put it in both the actual and desired state of worlds.
// For some volume plugins that cannot support reconstruction, it will clean up the existing
// mount points since the volume is no longer needed (removed from desired state)
func (rc *reconciler) syncStates() {
func (rc *reconciler) syncStates(kubeletPodDir string) {
// Get volumes information by reading the pod's directory
podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
podVolumes, err := getVolumesFromPodDir(kubeletPodDir)
if err != nil {
klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction")
return
}
volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume)
volumesNeedUpdate := make(map[v1.UniqueVolumeName]*globalVolumeInfo)
volumeNeedReport := []v1.UniqueVolumeName{}
for _, volume := range podVolumes {
if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
@ -411,6 +502,18 @@ func (rc *reconciler) syncStates() {
rc.cleanupMounts(volume)
continue
}
gvl := &globalVolumeInfo{
volumeName: reconstructedVolume.volumeName,
volumeSpec: reconstructedVolume.volumeSpec,
devicePath: reconstructedVolume.devicePath,
deviceMounter: reconstructedVolume.deviceMounter,
blockVolumeMapper: reconstructedVolume.blockVolumeMapper,
mounter: reconstructedVolume.mounter,
}
if cachedInfo, ok := volumesNeedUpdate[reconstructedVolume.volumeName]; ok {
gvl = cachedInfo
}
gvl.addPodVolume(reconstructedVolume)
if volumeInDSW {
// Some pod needs the volume. And it exists on disk. Some previous
// kubelet must have created the directory, therefore it must have
@ -418,6 +521,7 @@ func (rc *reconciler) syncStates() {
// this new kubelet so reconcile() calls SetUp and re-mounts the
// volume if it's necessary.
volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
rc.skippedDuringReconstruction[reconstructedVolume.volumeName] = gvl
klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
@ -426,7 +530,7 @@ func (rc *reconciler) syncStates() {
klog.InfoS("Volume is in pending operation, skip cleaning up mounts")
}
klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume)
volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
volumesNeedUpdate[reconstructedVolume.volumeName] = gvl
}
if len(volumesNeedUpdate) > 0 {
@ -600,7 +704,7 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
}
// updateDevicePath gets the node status to retrieve volume device path information.
func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) {
func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) {
node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{})
if fetchErr != nil {
klog.ErrorS(fetchErr, "UpdateStates in reconciler: could not get node status with error")
@ -618,19 +722,19 @@ func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName
// getDeviceMountPath returns device mount path for block volume which
// implements BlockVolumeMapper or filesystem volume which implements
// DeviceMounter
func getDeviceMountPath(volume *reconstructedVolume) (string, error) {
if volume.blockVolumeMapper != nil {
// for block volume, we return its global map path
return volume.blockVolumeMapper.GetGlobalMapPath(volume.volumeSpec)
} else if volume.deviceMounter != nil {
// for filesystem volume, we return its device mount path if the plugin implements DeviceMounter
return volume.deviceMounter.GetDeviceMountPath(volume.volumeSpec)
func getDeviceMountPath(gvi *globalVolumeInfo) (string, error) {
if gvi.blockVolumeMapper != nil {
// for block gvi, we return its global map path
return gvi.blockVolumeMapper.GetGlobalMapPath(gvi.volumeSpec)
} else if gvi.deviceMounter != nil {
// for filesystem gvi, we return its device mount path if the plugin implements DeviceMounter
return gvi.deviceMounter.GetDeviceMountPath(gvi.volumeSpec)
} else {
return "", fmt.Errorf("blockVolumeMapper or deviceMounter required")
}
}
func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) error {
func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) error {
// Get the node status to retrieve volume device path information.
// Skip reporting devicePath in node objects if kubeClient is nil.
// In standalone mode, kubelet is not expected to mount any attachable volume types or secret, configmaps etc.
@ -638,49 +742,56 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re
rc.updateDevicePath(volumesNeedUpdate)
}
for _, volume := range volumesNeedUpdate {
for _, gvl := range volumesNeedUpdate {
err := rc.actualStateOfWorld.MarkVolumeAsAttached(
//TODO: the devicePath might not be correct for some volume plugins: see issue #54108
volume.volumeName, volume.volumeSpec, "" /* nodeName */, volume.devicePath)
gvl.volumeName, gvl.volumeSpec, "" /* nodeName */, gvl.devicePath)
if err != nil {
klog.ErrorS(err, "Could not add volume information to actual state of world", "pod", klog.KObj(volume.pod))
klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName)
continue
}
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(volume.podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: operationexecutor.VolumeMounted,
for _, volume := range gvl.podVolumes {
err = rc.markVolumeState(volume, operationexecutor.VolumeMounted)
if err != nil {
klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
continue
}
klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
}
err = rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
if err != nil {
klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
continue
}
klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
// If the volume has device to mount, we mark its device as mounted.
if volume.deviceMounter != nil || volume.blockVolumeMapper != nil {
deviceMountPath, err := getDeviceMountPath(volume)
if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil {
deviceMountPath, err := getDeviceMountPath(gvl)
if err != nil {
klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", volume.volumeName, "pod", klog.KObj(volume.pod))
klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName)
continue
}
err = rc.actualStateOfWorld.MarkDeviceAsMounted(volume.volumeName, volume.devicePath, deviceMountPath)
err = rc.actualStateOfWorld.MarkDeviceAsMounted(gvl.volumeName, gvl.devicePath, deviceMountPath)
if err != nil {
klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "pod", klog.KObj(volume.pod))
klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", gvl.volumeName)
continue
}
klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", gvl.volumeName)
}
}
return nil
}
// markVolumeState records the reconstructed pod volume in the actual state of
// world with the given mount state, via MarkVolumeAsMounted. The unique pod
// name doubles as the pod UID in the options passed along. The error from
// MarkVolumeAsMounted is returned unchanged.
func (rc *reconciler) markVolumeState(volume *reconstructedVolume, volumeState operationexecutor.VolumeMountState) error {
	return rc.actualStateOfWorld.MarkVolumeAsMounted(operationexecutor.MarkVolumeOpts{
		PodName:             volume.podName,
		PodUID:              types.UID(volume.podName),
		VolumeName:          volume.volumeName,
		Mounter:             volume.mounter,
		BlockVolumeMapper:   volume.blockVolumeMapper,
		OuterVolumeSpecName: volume.outerVolumeSpecName,
		VolumeGidVolume:     volume.volumeGidValue,
		VolumeSpec:          volume.volumeSpec,
		VolumeMountState:    volumeState,
	})
}
// getVolumesFromPodDir scans through the volumes directories under the given pod directory.
// It returns a list of pod volume information including pod's uid, volume's plugin name, mount path,
// and volume spec name.

View File

@ -19,11 +19,15 @@ package reconciler
import (
"crypto/md5"
"fmt"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/kubernetes/pkg/volume/csimigration"
"os"
"path"
"path/filepath"
"testing"
"time"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/kubernetes/pkg/volume/csimigration"
"github.com/stretchr/testify/assert"
"k8s.io/mount-utils"
@ -2181,3 +2185,249 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
<-finished
waitForMount(t, fakePlugin, generatedVolumeName, asw)
}
// getFakeNode returns a node named after the package-level nodeName whose
// status reports a single attached volume, "fake-plugin/fake-device1", at
// device path "/fake/path".
func getFakeNode() *v1.Node {
	attached := v1.AttachedVolume{
		Name:       "fake-plugin/fake-device1",
		DevicePath: "/fake/path",
	}

	fakeNode := &v1.Node{}
	fakeNode.ObjectMeta.Name = string(nodeName)
	fakeNode.Status.VolumesAttached = []v1.AttachedVolume{attached}
	return fakeNode
}
// getInlineFakePod builds a pod with the given name and UID that declares one
// inline volume: outerName is the volume's name in the pod spec and innerName
// is the name of the GCE persistent disk backing it.
func getInlineFakePod(podName, podUUID, outerName, innerName string) *v1.Pod {
	inlineVolume := v1.Volume{
		Name: outerName,
		VolumeSource: v1.VolumeSource{
			GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: innerName},
		},
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: podName,
			UID:  k8stypes.UID(podUUID),
		},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{inlineVolume},
		},
	}
}
// getReconciler wires up a Reconciler for tests, rooted at kubeletDir.
//
// It creates a fake volume plugin manager for a fake node, fresh desired and
// actual state of world caches, a test kube client and an operation executor,
// and a fake mounter that reports every entry of volumePaths as a mount point.
// The reconciler's pods directory is <kubeletDir>/pods. The fake plugin is
// returned alongside the reconciler so tests can inspect the calls made on it.
func getReconciler(kubeletDir string, t *testing.T, volumePaths []string) (Reconciler, *volumetesting.FakeVolumePlugin) {
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNodeAndRoot(t, getFakeNode(), kubeletDir)
	podsDir := filepath.Join(kubeletDir, "pods")

	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createTestClient()

	generator := operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		&record.FakeRecorder{},
		volumetesting.NewBlockVolumePathHandler())
	oex := operationexecutor.NewOperationExecutor(generator)

	// One fake mount point per requested path; the slice stays non-nil even
	// when no paths are given.
	mountPoints := make([]mount.MountPoint, 0, len(volumePaths))
	for i := range volumePaths {
		mountPoints = append(mountPoints, mount.MountPoint{Path: volumePaths[i]})
	}

	rc := NewReconciler(
		kubeClient,
		true, /* controllerAttachDetachEnabled */
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		mount.NewFakeMounter(mountPoints),
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		podsDir)
	return rc, fakePlugin
}
func TestSyncStates(t *testing.T) {
type podInfo struct {
podName string
podUID string
outerVolumeName string
innerVolumeName string
}
defaultPodInfo := podInfo{
podName: "pod1",
podUID: "pod1uid",
outerVolumeName: "volume-name",
innerVolumeName: "volume-name",
}
tests := []struct {
name string
volumePaths []string
createMountPoint bool
podInfos []podInfo
postSyncStatCallback func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error
verifyFunc func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error
}{
{
name: "when two pods are using same volume and both are deleted",
volumePaths: []string{
path.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"),
path.Join("pod2", "volumes", "fake-plugin", "pvc-abcdef"),
},
createMountPoint: true,
podInfos: []podInfo{},
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
if len(mountedPods) != 2 {
return fmt.Errorf("expected 2 pods to in asw got %d", len(mountedPods))
}
return nil
},
},
{
name: "when two pods are using same volume and one of them is deleted",
volumePaths: []string{
path.Join("pod1uid", "volumes", "fake-plugin", "volume-name"),
path.Join("pod2uid", "volumes", "fake-plugin", "volume-name"),
},
createMountPoint: true,
podInfos: []podInfo{defaultPodInfo},
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
// for pod that is deleted, volume is considered as mounted
mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
if len(mountedPods) != 1 {
return fmt.Errorf("expected 1 pods to in asw got %d", len(mountedPods))
}
if types.UniquePodName("pod2uid") != mountedPods[0].PodName {
return fmt.Errorf("expected mounted pod to be %s got %s", "pod2uid", mountedPods[0].PodName)
}
return nil
},
},
{
name: "when reconstruction fails for a volume, volumes should be cleaned up",
volumePaths: []string{
path.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"),
},
createMountPoint: false,
podInfos: []podInfo{},
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
return retryWithExponentialBackOff(reconcilerSyncWaitDuration, func() (bool, error) {
err := volumetesting.VerifyTearDownCallCount(1, fakePlugin)
if err != nil {
return false, nil
}
return true, nil
})
},
},
{
name: "when volume exists in dsow, volume should be recorded in skipped during reconstruction",
volumePaths: []string{
path.Join("pod1uid", "volumes", "fake-plugin", "volume-name"),
},
createMountPoint: true,
podInfos: []podInfo{defaultPodInfo},
postSyncStatCallback: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
skippedVolumes := rcInstance.skippedDuringReconstruction
if len(skippedVolumes) != 1 {
return fmt.Errorf("expected 1 pods to in skippedDuringReconstruction got %d", len(skippedVolumes))
}
rcInstance.processReconstructedVolumes()
return nil
},
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
mountedPods := rcInstance.actualStateOfWorld.GetAllMountedVolumes()
if len(mountedPods) != 1 {
return fmt.Errorf("expected 1 pods to in mounted volume list got %d", len(mountedPods))
}
mountedPodVolume := mountedPods[0]
addedViaReconstruction := rcInstance.actualStateOfWorld.IsVolumeReconstructed(mountedPodVolume.VolumeName, mountedPodVolume.PodName)
if !addedViaReconstruction {
return fmt.Errorf("expected volume %s to be marked as added via reconstruction", mountedPodVolume.VolumeName)
}
// check device mount state
attachedVolumes := rcInstance.actualStateOfWorld.GetAttachedVolumes()
if len(attachedVolumes) != 1 {
return fmt.Errorf("expected 1 volume to be unmounted, got %d", len(attachedVolumes))
}
firstAttachedVolume := attachedVolumes[0]
if !firstAttachedVolume.DeviceMayBeMounted() {
return fmt.Errorf("expected %s volume to be mounted in uncertain state", firstAttachedVolume.VolumeName)
}
// also skippedVolumes map should be empty
skippedVolumes := rcInstance.skippedDuringReconstruction
if len(skippedVolumes) > 0 {
return fmt.Errorf("expected 0 pods in skipped volumes found %d", len(skippedVolumes))
}
return nil
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tmpKubeletDir, err := os.MkdirTemp("", "")
if err != nil {
t.Fatalf("can't make a temp directory for kubeletPods: %v", err)
}
defer os.RemoveAll(tmpKubeletDir)
// create kubelet pod directory
tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods")
os.MkdirAll(tmpKubeletPodDir, 0755)
mountPaths := []string{}
// create pod and volume directories so as reconciler can find them.
for _, volumePath := range tc.volumePaths {
vp := filepath.Join(tmpKubeletPodDir, volumePath)
if tc.createMountPoint {
mountPaths = append(mountPaths, vp)
}
os.MkdirAll(vp, 0755)
}
rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths)
rcInstance, _ := rc.(*reconciler)
for _, tpodInfo := range tc.podInfos {
pod := getInlineFakePod(tpodInfo.podName, tpodInfo.podUID, tpodInfo.outerVolumeName, tpodInfo.innerVolumeName)
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := util.GetUniquePodName(pod)
volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
if err != nil {
t.Fatalf("error adding volume %s to dsow: %v", volumeSpec.Name(), err)
}
rcInstance.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, "")
}
rcInstance.syncStates(tmpKubeletPodDir)
if tc.postSyncStatCallback != nil {
err := tc.postSyncStatCallback(rcInstance, fakePlugin)
if err != nil {
t.Errorf("test %s, postSyncStatCallback failed: %v", tc.name, err)
}
}
if err := tc.verifyFunc(rcInstance, fakePlugin); err != nil {
t.Errorf("test %s failed: %v", tc.name, err)
}
})
}
}

View File

@ -1661,6 +1661,19 @@ func GetTestKubeletVolumePluginMgrWithNode(t *testing.T, node *v1.Node) (*volume
return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
}
// GetTestKubeletVolumePluginMgrWithNodeAndRoot creates a fake kubelet volume
// host that uses rootDir as its root directory, associates node with that
// host, and returns the host's plugin manager along with the first probed
// plugin as a *FakeVolumePlugin.
func GetTestKubeletVolumePluginMgrWithNodeAndRoot(t *testing.T, node *v1.Node, rootDir string) (*volume.VolumePluginMgr, *FakeVolumePlugin) {
	probedPlugins := ProbeVolumePlugins(volume.VolumeConfig{})

	// The fake host is created without a kube client (nil).
	host := NewFakeKubeletVolumeHost(t, rootDir, nil /* kubeClient */, probedPlugins)
	host.WithNode(node)

	pluginMgr := host.GetPluginMgr()
	// Unchecked assertion: panics if the first probed plugin is not a
	// *FakeVolumePlugin.
	fakePlugin := probedPlugins[0].(*FakeVolumePlugin)
	return pluginMgr, fakePlugin
}
// CreateTestPVC returns a provisionable PVC for tests
func CreateTestPVC(capacity string, accessModes []v1.PersistentVolumeAccessMode) *v1.PersistentVolumeClaim {
claim := v1.PersistentVolumeClaim{

View File

@ -215,6 +215,23 @@ type ActualStateOfWorldMounterUpdater interface {
// MarkForInUseExpansionError marks the volume to have in-use error during expansion.
// volume expansion must not be retried for this volume
MarkForInUseExpansionError(volumeName v1.UniqueVolumeName)
// CheckAndMarkVolumeAsUncertainViaReconstruction only adds volume to actual state of the world
// if volume was not already there. This avoids overwriting any previously stored
// state. It returns an error if there was an error adding the volume to ASOW.
// It returns true, if this operation resulted in volume being added to ASOW
// otherwise it returns false.
CheckAndMarkVolumeAsUncertainViaReconstruction(opts MarkVolumeOpts) (bool, error)
// CheckAndMarkDeviceUncertainViaReconstruction only adds device to actual state of the world
// if device was not already there. This avoids overwriting any previously stored
// state. We only supply deviceMountPath because devicePath is already determined from
// VerifyControllerAttachedVolume function.
CheckAndMarkDeviceUncertainViaReconstruction(volumeName v1.UniqueVolumeName, deviceMountPath string) bool
// IsVolumeReconstructed returns true if volume currently added to actual state of the world
// was found during reconstruction.
IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool
}
// ActualStateOfWorldAttacherUpdater defines a set of operations updating the

View File

@ -785,11 +785,19 @@ func (og *operationGenerator) markDeviceErrorState(volumeToMount VolumeToMount,
func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount, markOpts MarkVolumeOpts, mountError error, actualStateOfWorld ActualStateOfWorldMounterUpdater) {
if volumetypes.IsOperationFinishedError(mountError) &&
actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeMountUncertain {
// if volume was previously reconstructed we are not going to change its state as unmounted even
// if mount operation fails.
if actualStateOfWorld.IsVolumeReconstructed(volumeToMount.VolumeName, volumeToMount.PodName) {
klog.V(3).InfoS("MountVolume.markVolumeErrorState leaving volume uncertain", "volumeName", volumeToMount.VolumeName)
return
}
t := actualStateOfWorld.MarkVolumeAsUnmounted(volumeToMount.PodName, volumeToMount.VolumeName)
if t != nil {
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeAsUnmounted failed", t).Error())
}
return
}
if volumetypes.IsUncertainProgressError(mountError) &&
@ -799,7 +807,6 @@ func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount,
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", t).Error())
}
}
}
func (og *operationGenerator) GenerateUnmountVolumeFunc(