Keep track of each pod that uses a volume during reconstruction

Add tests for volume cleanup during reconstruction
Hemant Kumar 2022-06-02 12:26:58 -04:00
parent 42786afae0
commit c8b85fb470
4 changed files with 211 additions and 40 deletions
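For readers skimming the diff: during reconstruction the reconciler now keeps one entry per volume and records every pod that uses that volume under it, instead of keeping a single flat reconstructedVolume per volume name. The following is a minimal, self-contained sketch of that bookkeeping; the type and field names are simplified stand-ins for the kubelet's own types (v1.UniqueVolumeName, volumetypes.UniquePodName, reconstructedVolume), not the real API.

```go
// Sketch of the per-volume grouping this commit introduces (stand-in types).
package main

import "fmt"

type uniqueVolumeName string
type uniquePodName string

// stand-in for the kubelet's reconstructedVolume: one per pod/volume pair
type reconstructedVolume struct {
	podName    uniquePodName
	volumeName uniqueVolumeName
	devicePath string
}

// stand-in for the new globalVolumeInfo: volume-level data plus all pods using it
type globalVolumeInfo struct {
	volumeName uniqueVolumeName
	devicePath string
	podVolumes map[uniquePodName]*reconstructedVolume
}

func (gvi *globalVolumeInfo) addPodVolume(rcv *reconstructedVolume) {
	if gvi.podVolumes == nil {
		gvi.podVolumes = map[uniquePodName]*reconstructedVolume{}
	}
	gvi.podVolumes[rcv.podName] = rcv
}

func main() {
	// Two pods mounting the same PVC, as in the new "two pods ... same volume" test case.
	reconstructed := []*reconstructedVolume{
		{podName: "pod1", volumeName: "fake-plugin/pvc-abcdef", devicePath: "/fake/path"},
		{podName: "pod2", volumeName: "fake-plugin/pvc-abcdef", devicePath: "/fake/path"},
	}

	volumesNeedUpdate := map[uniqueVolumeName]*globalVolumeInfo{}
	for _, rcv := range reconstructed {
		gvi := &globalVolumeInfo{volumeName: rcv.volumeName, devicePath: rcv.devicePath}
		if cached, ok := volumesNeedUpdate[rcv.volumeName]; ok {
			gvi = cached // reuse the existing per-volume entry
		}
		gvi.addPodVolume(rcv)
		volumesNeedUpdate[rcv.volumeName] = gvi
	}

	// Prints "1 2": one volume entry carrying both pods.
	fmt.Println(len(volumesNeedUpdate), len(volumesNeedUpdate["fake-plugin/pvc-abcdef"].podVolumes))
}
```

This single-volume, two-pod shape is exactly what the new TestSyncStates case with two pods sharing a PVC expects to find in the actual state of world.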


@@ -251,7 +251,6 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
             continue
         }
         klog.V(4).InfoS("Removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName)
         dswp.desiredStateOfWorld.DeletePodFromVolume(
             volumeToMount.PodName, volumeToMount.VolumeName)
         dswp.deleteProcessedPod(volumeToMount.PodName)


@@ -334,7 +334,7 @@ func (rc *reconciler) unmountDetachDevices() {
 // it will try to clean up the mount paths with operation executor.
 func (rc *reconciler) sync() {
     defer rc.updateLastSyncTime()
-    rc.syncStates()
+    rc.syncStates(rc.kubeletPodsDir)
 }
 
 func (rc *reconciler) updateLastSyncTime() {
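Threading the pod directory through as a parameter, instead of reading rc.kubeletPodsDir inside syncStates, is what lets the new TestSyncStates below drive reconstruction directly: the test calls rcInstance.syncStates(tmpKubeletPodDir) against a per-test temporary directory without running the full reconciler loop.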
@@ -366,19 +366,36 @@ type reconstructedVolume struct {
     blockVolumeMapper volumepkg.BlockVolumeMapper
 }
 
+type globalVolumeInfo struct {
+    volumeName        v1.UniqueVolumeName
+    volumeSpec        *volumepkg.Spec
+    devicePath        string
+    mounter           volumepkg.Mounter
+    deviceMounter     volumepkg.DeviceMounter
+    blockVolumeMapper volumepkg.BlockVolumeMapper
+    podVolumes        map[volumetypes.UniquePodName]*reconstructedVolume
+}
+
+func (gvi *globalVolumeInfo) addPodVolume(rcv *reconstructedVolume) {
+    if gvi.podVolumes == nil {
+        gvi.podVolumes = map[volumetypes.UniquePodName]*reconstructedVolume{}
+    }
+    gvi.podVolumes[rcv.podName] = rcv
+}
+
 // syncStates scans the volume directories under the given pod directory.
 // If the volume is not in desired state of world, this function will reconstruct
 // the volume related information and put it in both the actual and desired state of worlds.
 // For some volume plugins that cannot support reconstruction, it will clean up the existing
 // mount points since the volume is no longer needed (removed from desired state)
-func (rc *reconciler) syncStates() {
+func (rc *reconciler) syncStates(kubeletPodDir string) {
     // Get volumes information by reading the pod's directory
-    podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
+    podVolumes, err := getVolumesFromPodDir(kubeletPodDir)
     if err != nil {
         klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction")
         return
     }
-    volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume)
+    volumesNeedUpdate := make(map[v1.UniqueVolumeName]*globalVolumeInfo)
     volumeNeedReport := []v1.UniqueVolumeName{}
     for _, volume := range podVolumes {
         if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
@@ -416,7 +433,19 @@ func (rc *reconciler) syncStates() {
             klog.InfoS("Volume is in pending operation, skip cleaning up mounts")
         }
         klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume)
-        volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
+        gvl := &globalVolumeInfo{
+            volumeName:        reconstructedVolume.volumeName,
+            volumeSpec:        reconstructedVolume.volumeSpec,
+            devicePath:        reconstructedVolume.devicePath,
+            deviceMounter:     reconstructedVolume.deviceMounter,
+            blockVolumeMapper: reconstructedVolume.blockVolumeMapper,
+            mounter:           reconstructedVolume.mounter,
+        }
+        if cachedInfo, ok := volumesNeedUpdate[reconstructedVolume.volumeName]; ok {
+            gvl = cachedInfo
+        }
+        gvl.addPodVolume(reconstructedVolume)
+        volumesNeedUpdate[reconstructedVolume.volumeName] = gvl
     }
 
     if len(volumesNeedUpdate) > 0 {
@@ -590,7 +619,7 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
 }
 
 // updateDevicePath gets the node status to retrieve volume device path information.
-func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) {
+func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) {
     node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{})
     if fetchErr != nil {
         klog.ErrorS(fetchErr, "UpdateStates in reconciler: could not get node status with error")
@@ -608,19 +637,19 @@ func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName
 // getDeviceMountPath returns device mount path for block volume which
 // implements BlockVolumeMapper or filesystem volume which implements
 // DeviceMounter
-func getDeviceMountPath(volume *reconstructedVolume) (string, error) {
-    if volume.blockVolumeMapper != nil {
-        // for block volumes, we return the global map path
-        return volume.blockVolumeMapper.GetGlobalMapPath(volume.volumeSpec)
-    } else if volume.deviceMounter != nil {
-        // for filesystem volumes, we return the device mount path if the plugin implements DeviceMounter
-        return volume.deviceMounter.GetDeviceMountPath(volume.volumeSpec)
+func getDeviceMountPath(gvi *globalVolumeInfo) (string, error) {
+    if gvi.blockVolumeMapper != nil {
+        // for block volumes, we return the global map path
+        return gvi.blockVolumeMapper.GetGlobalMapPath(gvi.volumeSpec)
+    } else if gvi.deviceMounter != nil {
+        // for filesystem volumes, we return the device mount path if the plugin implements DeviceMounter
+        return gvi.deviceMounter.GetDeviceMountPath(gvi.volumeSpec)
     } else {
         return "", fmt.Errorf("blockVolumeMapper or deviceMounter required")
     }
 }
 
-func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) error {
+func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) error {
     // Get the node status to retrieve volume device path information.
     // Skip reporting devicePath in node objects if kubeClient is nil.
     // In standalone mode, kubelet is not expected to mount any attachable volume types or secret, configmaps etc.
@@ -628,14 +657,15 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re
         rc.updateDevicePath(volumesNeedUpdate)
     }
 
-    for _, volume := range volumesNeedUpdate {
+    for _, gvl := range volumesNeedUpdate {
         err := rc.actualStateOfWorld.MarkVolumeAsAttached(
             //TODO: the devicePath might not be correct for some volume plugins: see issue #54108
-            volume.volumeName, volume.volumeSpec, "" /* nodeName */, volume.devicePath)
+            gvl.volumeName, gvl.volumeSpec, "" /* nodeName */, gvl.devicePath)
         if err != nil {
-            klog.ErrorS(err, "Could not add volume information to actual state of world", "pod", klog.KObj(volume.pod))
+            klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName)
             continue
         }
+        for _, volume := range gvl.podVolumes {
             markVolumeOpts := operationexecutor.MarkVolumeOpts{
                 PodName: volume.podName,
                 PodUID:  types.UID(volume.podName),
@@ -653,19 +683,20 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re
                 continue
             }
             klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
+        }
         // If the volume has device to mount, we mark its device as mounted.
-        if volume.deviceMounter != nil || volume.blockVolumeMapper != nil {
-            deviceMountPath, err := getDeviceMountPath(volume)
+        if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil {
+            deviceMountPath, err := getDeviceMountPath(gvl)
             if err != nil {
-                klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", volume.volumeName, "pod", klog.KObj(volume.pod))
+                klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName)
                 continue
             }
-            err = rc.actualStateOfWorld.MarkDeviceAsMounted(volume.volumeName, volume.devicePath, deviceMountPath)
+            err = rc.actualStateOfWorld.MarkDeviceAsMounted(gvl.volumeName, gvl.devicePath, deviceMountPath)
             if err != nil {
-                klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "pod", klog.KObj(volume.pod))
+                klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", gvl.volumeName)
                 continue
             }
-            klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
+            klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", gvl.volumeName)
         }
     }
     return nil
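Read together, the two hunks above reshape updateStates: volume-level work (MarkVolumeAsAttached, and MarkDeviceAsMounted where there is a device) happens once per globalVolumeInfo, while pod-level mount bookkeeping happens once per entry in podVolumes. A condensed sketch of that control flow follows, using stand-in types and method names (fakeASW, hasDevice, and markVolumeMountedForPod are illustrative, not the kubelet's actual-state-of-world API):

```go
package main

import "fmt"

// fakeASW stands in for the actual-state-of-world cache; it just records calls.
type fakeASW struct{ events []string }

func (a *fakeASW) markVolumeAsAttached(vol, devicePath string) {
	a.events = append(a.events, "attached:"+vol)
}

func (a *fakeASW) markVolumeMountedForPod(vol, pod string) {
	a.events = append(a.events, "mounted:"+vol+"/"+pod)
}

func (a *fakeASW) markDeviceAsMounted(vol, devicePath, mountPath string) {
	a.events = append(a.events, "deviceMounted:"+vol)
}

type reconstructedVolume struct{ podName string }

type globalVolumeInfo struct {
	volumeName string
	devicePath string
	hasDevice  bool // stands in for deviceMounter != nil || blockVolumeMapper != nil
	podVolumes map[string]*reconstructedVolume
}

// updateStates mirrors the new shape: volume-level bookkeeping once per volume,
// pod-level bookkeeping once per pod recorded under that volume.
func updateStates(asw *fakeASW, volumesNeedUpdate map[string]*globalVolumeInfo) {
	for _, gvl := range volumesNeedUpdate {
		asw.markVolumeAsAttached(gvl.volumeName, gvl.devicePath)
		for _, volume := range gvl.podVolumes {
			asw.markVolumeMountedForPod(gvl.volumeName, volume.podName)
		}
		if gvl.hasDevice {
			asw.markDeviceAsMounted(gvl.volumeName, gvl.devicePath, "/global/mount/path")
		}
	}
}

func main() {
	asw := &fakeASW{}
	updateStates(asw, map[string]*globalVolumeInfo{
		"fake-plugin/pvc-abcdef": {
			volumeName: "fake-plugin/pvc-abcdef",
			devicePath: "/fake/path",
			hasDevice:  true,
			podVolumes: map[string]*reconstructedVolume{
				"pod1": {podName: "pod1"},
				"pod2": {podName: "pod2"},
			},
		},
	})
	fmt.Println(asw.events) // one attach, two per-pod mount markers, one device mount
}
```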


@@ -21,6 +21,9 @@ import (
     "fmt"
 
     csitrans "k8s.io/csi-translation-lib"
     "k8s.io/kubernetes/pkg/volume/csimigration"
+    "os"
+    "path"
+    "path/filepath"
     "testing"
     "time"
@@ -2181,3 +2184,128 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
     <-finished
     waitForMount(t, fakePlugin, generatedVolumeName, asw)
 }
+
+func getFakeNode() *v1.Node {
+    return &v1.Node{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: string(nodeName),
+        },
+        Status: v1.NodeStatus{
+            VolumesAttached: []v1.AttachedVolume{
+                {
+                    Name:       "fake-plugin/fake-device1",
+                    DevicePath: "/fake/path",
+                },
+            },
+        },
+    }
+}
+
+func getReconciler(kubeletDir string, t *testing.T, volumePaths []string) (Reconciler, *volumetesting.FakeVolumePlugin) {
+    node := getFakeNode()
+    volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNodeAndRoot(t, node, kubeletDir)
+    tmpKubeletPodDir := filepath.Join(kubeletDir, "pods")
+    dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
+    asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
+    kubeClient := createTestClient()
+    fakeRecorder := &record.FakeRecorder{}
+    fakeHandler := volumetesting.NewBlockVolumePathHandler()
+    oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
+        kubeClient,
+        volumePluginMgr,
+        fakeRecorder,
+        fakeHandler))
+    mountPoints := []mount.MountPoint{}
+    for _, volumePath := range volumePaths {
+        mountPoints = append(mountPoints, mount.MountPoint{Path: volumePath})
+    }
+    rc := NewReconciler(
+        kubeClient,
+        true, /* controllerAttachDetachEnabled */
+        reconcilerLoopSleepDuration,
+        waitForAttachTimeout,
+        nodeName,
+        dsw,
+        asw,
+        hasAddedPods,
+        oex,
+        mount.NewFakeMounter(mountPoints),
+        hostutil.NewFakeHostUtil(nil),
+        volumePluginMgr,
+        tmpKubeletPodDir)
+    return rc, fakePlugin
+}
+
+func TestSyncStates(t *testing.T) {
+    tests := []struct {
+        name             string
+        volumePaths      []string
+        createMountPoint bool
+        verifyFunc       func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error
+    }{
+        {
+            name: "when two pods are using same volume and both are deleted",
+            volumePaths: []string{
+                path.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"),
+                path.Join("pod2", "volumes", "fake-plugin", "pvc-abcdef"),
+            },
+            createMountPoint: true,
+            verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
+                mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
+                if len(mountedPods) != 2 {
+                    return fmt.Errorf("expected 2 pods in asw, got %d", len(mountedPods))
+                }
+                return nil
+            },
+        },
+        {
+            name: "when reconstruction fails for a volume, volumes should be cleaned up",
+            volumePaths: []string{
+                path.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"),
+            },
+            createMountPoint: false,
+            verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
+                return retryWithExponentialBackOff(reconcilerSyncWaitDuration, func() (bool, error) {
+                    err := volumetesting.VerifyTearDownCallCount(1, fakePlugin)
+                    if err != nil {
+                        return false, nil
+                    }
+                    return true, nil
+                })
+            },
+        },
+    }
+
+    for _, tc := range tests {
+        t.Run(tc.name, func(t *testing.T) {
+            tmpKubeletDir, err := os.MkdirTemp("", "")
+            if err != nil {
+                t.Fatalf("can't make a temp directory for kubeletPods: %v", err)
+            }
+            defer os.RemoveAll(tmpKubeletDir)
+
+            // create kubelet pod directory
+            tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods")
+            os.MkdirAll(tmpKubeletPodDir, 0755)
+
+            mountPaths := []string{}
+
+            // create pod and volume directories so that the reconciler can find them.
+            for _, volumePath := range tc.volumePaths {
+                vp := filepath.Join(tmpKubeletPodDir, volumePath)
+                if tc.createMountPoint {
+                    mountPaths = append(mountPaths, vp)
+                }
+                os.MkdirAll(vp, 0755)
+            }
+
+            rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths)
+            rcInstance, _ := rc.(*reconciler)
+            rcInstance.syncStates(tmpKubeletPodDir)
+
+            if err := tc.verifyFunc(rcInstance, fakePlugin); err != nil {
+                t.Errorf("test %s failed: %v", tc.name, err)
+            }
+        })
+    }
+}
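retryWithExponentialBackOff and reconcilerSyncWaitDuration are pre-existing helpers in this test file that the diff does not show. For orientation only, the retry helper can be thought of as a thin wrapper over k8s.io/apimachinery's wait.ExponentialBackoff, roughly like the sketch below; the concrete backoff parameters here are assumptions, not the file's actual values.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// retryWithExponentialBackOff polls fn with exponentially growing delays until it
// reports done, returns an error, or the retry budget is exhausted.
// NOTE: parameter values below are illustrative assumptions.
func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
	backoff := wait.Backoff{
		Duration: initialDuration, // first delay
		Factor:   3,               // multiply the delay each step
		Jitter:   0,
		Steps:    6, // give up after 6 attempts
	}
	return wait.ExponentialBackoff(backoff, fn)
}

func main() {
	attempts := 0
	err := retryWithExponentialBackOff(10*time.Millisecond, func() (bool, error) {
		attempts++
		return attempts >= 3, nil // succeed on the third attempt
	})
	fmt.Println("attempts:", attempts, "err:", err)
}
```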


@@ -1660,6 +1660,19 @@ func GetTestKubeletVolumePluginMgrWithNode(t *testing.T, node *v1.Node) (*volume
     return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
 }
 
+func GetTestKubeletVolumePluginMgrWithNodeAndRoot(t *testing.T, node *v1.Node, rootDir string) (*VolumePluginMgr, *FakeVolumePlugin) {
+    plugins := ProbeVolumePlugins(VolumeConfig{})
+    v := NewFakeKubeletVolumeHost(
+        t,
+        rootDir, /* rootDir */
+        nil,     /* kubeClient */
+        plugins, /* plugins */
+    )
+    v.WithNode(node)
+    return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
+}
+
 // CreateTestPVC returns a provisionable PVC for tests
 func CreateTestPVC(capacity string, accessModes []v1.PersistentVolumeAccessMode) *v1.PersistentVolumeClaim {
     claim := v1.PersistentVolumeClaim{
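The reconciler test above consumes this new helper through getReconciler, passing the fake node from getFakeNode and the per-test temporary kubelet directory as rootDir, so that the fake volume host resolves plugin paths under the same temporary directory the test populates.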