Remove volume from found during reconstruction if mounted

Add unit tests for removing reconstructed volumes from ASOW
This commit is contained in:
Hemant Kumar 2022-06-30 07:29:10 -04:00
parent b455270f6e
commit 6d43345c06
4 changed files with 190 additions and 7 deletions

View File

@ -227,7 +227,9 @@ type actualStateOfWorld struct {
// state by default.
// The key in this map is the name of the volume and the value is an object
// containing more information about the attached volume.
attachedVolumes map[v1.UniqueVolumeName]attachedVolume
attachedVolumes map[v1.UniqueVolumeName]attachedVolume
// foundDuringReconstruction is a map of volumes which were discovered
// from kubelet root directory when kubelet was restarted.
foundDuringReconstruction map[v1.UniqueVolumeName]map[volumetypes.UniquePodName]types.UID
// volumePluginMgr is the volume plugin manager used to create volume
@ -366,14 +368,15 @@ func (asw *actualStateOfWorld) AddVolumeViaReconstruction(opts operationexecutor
}
func (asw *actualStateOfWorld) IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool {
asw.RLock()
defer asw.RUnlock()
volumeState := asw.GetVolumeMountState(volumeName, podName)
// only uncertain volumes are reconstructed
if volumeState != operationexecutor.VolumeMountUncertain {
return false
}
asw.RLock()
defer asw.RUnlock()
podMap, ok := asw.foundDuringReconstruction[volumeName]
if !ok {
return false
@ -571,6 +574,11 @@ func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.M
// If pod exists, reset remountRequired value
podObj.remountRequired = false
podObj.volumeMountStateForPod = markVolumeOpts.VolumeMountState
// if volume is mounted successfully, then it should be removed from foundDuringReconstruction map
if markVolumeOpts.VolumeMountState == operationexecutor.VolumeMounted {
delete(asw.foundDuringReconstruction[volumeName], podName)
}
if mounter != nil {
// The mounter stored in the object may have old information,
// use the newest one.

View File

@ -17,7 +17,9 @@ limitations under the License.
package cache
import (
"fmt"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"testing"
"github.com/stretchr/testify/require"
@ -458,6 +460,149 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) {
verifyVolumeMountedElsewhere(t, podName2, generatedVolumeName2, true /*expectedMountedElsewhere */, asw)
}
func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) {
tests := []struct {
name string
opCallback func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error
verifyCallback func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error
}{
{
name: "marking volume mounted should remove volume from found during reconstruction",
opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
volumeOpts.VolumeMountState = operationexecutor.VolumeMounted
return asw.MarkVolumeAsMounted(volumeOpts)
},
verifyCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
ok := asw.IsVolumeReconstructed(volumeOpts.VolumeName, volumeOpts.PodName)
if ok {
return fmt.Errorf("found unexpected volume in reconstructed volume list")
}
return nil
},
},
{
name: "removing volume from pod should remove volume from found during reconstruction",
opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
return asw.MarkVolumeAsUnmounted(volumeOpts.PodName, volumeOpts.VolumeName)
},
verifyCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
ok := asw.IsVolumeReconstructed(volumeOpts.VolumeName, volumeOpts.PodName)
if ok {
return fmt.Errorf("found unexpected volume in reconstructed volume list")
}
return nil
},
},
{
name: "removing volume entirely from ASOW should remove volume from found during reconstruction",
opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
err := asw.MarkVolumeAsUnmounted(volumeOpts.PodName, volumeOpts.VolumeName)
if err != nil {
return err
}
asw.MarkVolumeAsDetached(volumeOpts.VolumeName, "")
return nil
},
verifyCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
ok := asw.IsVolumeReconstructed(volumeOpts.VolumeName, volumeOpts.PodName)
if ok {
return fmt.Errorf("found unexpected volume in reconstructed volume list")
}
aswInstance, _ := asw.(*actualStateOfWorld)
_, found := aswInstance.foundDuringReconstruction[volumeOpts.VolumeName]
if found {
return fmt.Errorf("found unexpected volume in reconstructed map")
}
return nil
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
volumePluginMgr, plugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr)
devicePath := "fake/device/path"
pod1 := getTestPod("pod1", "pod1uid", "volume-name-1", "fake-device1")
volumeSpec1 := &volume.Spec{Volume: &pod1.Spec.Volumes[0]}
generatedVolumeName1, err := util.GetUniqueVolumeNameFromSpec(
plugin, volumeSpec1)
require.NoError(t, err)
err = asw.MarkVolumeAsAttached(generatedVolumeName1, volumeSpec1, "" /* nodeName */, devicePath)
if err != nil {
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
podName1 := util.GetUniquePodName(pod1)
mounter1, err := plugin.NewMounter(volumeSpec1, pod1, volume.VolumeOptions{})
if err != nil {
t.Fatalf("NewMounter failed. Expected: <no error> Actual: <%v>", err)
}
mapper1, err := plugin.NewBlockVolumeMapper(volumeSpec1, pod1, volume.VolumeOptions{})
if err != nil {
t.Fatalf("NewBlockVolumeMapper failed. Expected: <no error> Actual: <%v>", err)
}
markVolumeOpts1 := operationexecutor.MarkVolumeOpts{
PodName: podName1,
PodUID: pod1.UID,
VolumeName: generatedVolumeName1,
Mounter: mounter1,
BlockVolumeMapper: mapper1,
OuterVolumeSpecName: volumeSpec1.Name(),
VolumeSpec: volumeSpec1,
VolumeMountState: operationexecutor.VolumeMountUncertain,
}
err = asw.AddVolumeViaReconstruction(markVolumeOpts1)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
// make sure state is as we expect it to be
verifyVolumeExistsAsw(t, generatedVolumeName1, true /* shouldExist */, asw)
verifyVolumeDoesntExistInUnmountedVolumes(t, generatedVolumeName1, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName1, asw)
verifyVolumeExistsWithSpecNameInVolumeAsw(t, podName1, volumeSpec1.Name(), asw)
verifyVolumeSpecNameInVolumeAsw(t, podName1, []*volume.Spec{volumeSpec1}, asw)
verifyVolumeFoundInReconstruction(t, podName1, generatedVolumeName1, asw)
if tc.opCallback != nil {
err = tc.opCallback(asw, markVolumeOpts1)
if err != nil {
t.Fatalf("for test %s: %v", tc.name, err)
}
}
err = tc.verifyCallback(asw, markVolumeOpts1)
if err != nil {
t.Fatalf("for test %s verification failed: %v", tc.name, err)
}
})
}
}
func getTestPod(podName, podUID, outerVolumeName, pdName string) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
UID: types.UID(podUID),
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: outerVolumeName,
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: pdName,
},
},
},
},
},
}
return pod
}
// Calls AddPodToVolume() to add pod to empty data struct
// Verifies call fails with "volume does not exist" error.
func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) {
@ -873,3 +1018,10 @@ func verifyVolumeSpecNameInVolumeAsw(
}
}
}
func verifyVolumeFoundInReconstruction(t *testing.T, podToCheck volumetypes.UniquePodName, volumeToCheck v1.UniqueVolumeName, asw ActualStateOfWorld) {
isRecontructed := asw.IsVolumeReconstructed(volumeToCheck, podToCheck)
if !isRecontructed {
t.Fatalf("ASW IsVolumeReconstructed result invalid. expected <true> Actual <false>")
}
}

View File

@ -178,6 +178,10 @@ func (rc *reconciler) reconcile() {
// Ensure devices that should be detached/unmounted are detached/unmounted.
rc.unmountDetachDevices()
// After running the above operations if skippedDuringReconstruction is not empty
// then ensure that all volumes which were discovered and skipped during reconstruction
// are added to actualStateOfWorld in uncertain state.
// This should be called only ONCE after reconstruction.
if len(rc.skippedDuringReconstruction) > 0 {
rc.processReconstructedVolumes()
}
@ -279,7 +283,18 @@ func (rc *reconciler) processReconstructedVolumes() {
volumeNotMounted := rc.actualStateOfWorld.PodRemovedFromVolume(podName, volume.volumeName)
// if volume is not mounted then lets mark volume mounted in uncertain state in ASOW
if volumeNotMounted {
err := rc.markVolumeState(volume, operationexecutor.VolumeMountUncertain)
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(volume.podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: operationexecutor.VolumeMountUncertain,
}
err := rc.actualStateOfWorld.AddVolumeViaReconstruction(markVolumeOpts)
uncertainVolumeCount += 1
if err != nil {
klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
@ -429,6 +444,8 @@ type reconstructedVolume struct {
blockVolumeMapper volumepkg.BlockVolumeMapper
}
// globalVolumeInfo stores reconstructed volume information
// for each pod that was using that volume.
type globalVolumeInfo struct {
volumeName v1.UniqueVolumeName
volumeSpec *volumepkg.Spec

View File

@ -19,14 +19,15 @@ package reconciler
import (
"crypto/md5"
"fmt"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/kubernetes/pkg/volume/csimigration"
"os"
"path"
"path/filepath"
"testing"
"time"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/kubernetes/pkg/volume/csimigration"
"github.com/stretchr/testify/assert"
"k8s.io/mount-utils"
@ -2297,6 +2298,11 @@ func TestSyncStates(t *testing.T) {
if len(mountedPods) != 1 {
return fmt.Errorf("expected 1 pods to in mounted volume list got %d", len(mountedPods))
}
mountedPodVolume := mountedPods[0]
addedViaReconstruction := rcInstance.actualStateOfWorld.IsVolumeReconstructed(mountedPodVolume.VolumeName, mountedPodVolume.PodName)
if !addedViaReconstruction {
return fmt.Errorf("expected volume %s to be marked as added via reconstruction", mountedPodVolume.VolumeName)
}
return nil
},
},