Use CheckAndMarkAsUncertainViaReconstruction for uncertain volumes

Also only remove volumes from skippedDuringReconstruction only if
volume was marked as attached.
This commit is contained in:
Hemant Kumar 2022-07-13 13:23:47 -04:00
parent 6d43345c06
commit 835e8ccc76
7 changed files with 197 additions and 78 deletions

View File

@ -350,23 +350,6 @@ func (asw *actualStateOfWorld) MarkVolumeAsDetached(
asw.DeleteVolume(volumeName)
}
func (asw *actualStateOfWorld) AddVolumeViaReconstruction(opts operationexecutor.MarkVolumeOpts) error {
err := asw.MarkVolumeAsMounted(opts)
if err != nil {
return err
}
asw.Lock()
defer asw.Unlock()
podMap, ok := asw.foundDuringReconstruction[opts.VolumeName]
if !ok {
podMap = map[volumetypes.UniquePodName]types.UID{}
}
podMap[opts.PodName] = opts.PodUID
asw.foundDuringReconstruction[opts.VolumeName] = podMap
return nil
}
func (asw *actualStateOfWorld) IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool {
volumeState := asw.GetVolumeMountState(volumeName, podName)
@ -385,6 +368,84 @@ func (asw *actualStateOfWorld) IsVolumeReconstructed(volumeName v1.UniqueVolumeN
return foundPod
}
func (asw *actualStateOfWorld) CheckAndMarkVolumeAsUncertainViaReconstruction(opts operationexecutor.MarkVolumeOpts) (bool, error) {
asw.Lock()
defer asw.Unlock()
volumeObj, volumeExists := asw.attachedVolumes[opts.VolumeName]
if !volumeExists {
return false, nil
}
podObj, podExists := volumeObj.mountedPods[opts.PodName]
if podExists {
// if volume mount was uncertain we should keep trying to unmount the volume
if podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain {
return false, nil
}
if podObj.volumeMountStateForPod == operationexecutor.VolumeMounted {
return false, nil
}
}
podName := opts.PodName
podUID := opts.PodUID
volumeName := opts.VolumeName
mounter := opts.Mounter
blockVolumeMapper := opts.BlockVolumeMapper
outerVolumeSpecName := opts.OuterVolumeSpecName
volumeGidValue := opts.VolumeGidVolume
volumeSpec := opts.VolumeSpec
podObj = mountedPod{
podName: podName,
podUID: podUID,
mounter: mounter,
blockVolumeMapper: blockVolumeMapper,
outerVolumeSpecName: outerVolumeSpecName,
volumeGidValue: volumeGidValue,
volumeSpec: volumeSpec,
remountRequired: false,
volumeMountStateForPod: operationexecutor.VolumeMountUncertain,
}
if mounter != nil {
// The mounter stored in the object may have old information,
// use the newest one.
podObj.mounter = mounter
}
asw.attachedVolumes[volumeName].mountedPods[podName] = podObj
podMap, ok := asw.foundDuringReconstruction[opts.VolumeName]
if !ok {
podMap = map[volumetypes.UniquePodName]types.UID{}
}
podMap[opts.PodName] = opts.PodUID
asw.foundDuringReconstruction[opts.VolumeName] = podMap
return true, nil
}
func (asw *actualStateOfWorld) CheckAndMarkDeviceUncertainViaReconstruction(volumeName v1.UniqueVolumeName, deviceMountPath string) bool {
asw.Lock()
defer asw.Unlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
// CheckAndMarkDeviceUncertainViaReconstruction requires volume to be marked as attached, so if
// volume does not exist in ASOW or is in any state other than DeviceNotMounted we should return
if !volumeExists || volumeObj.deviceMountState != operationexecutor.DeviceNotMounted {
return false
}
volumeObj.deviceMountState = operationexecutor.DeviceMountUncertain
// we are only changing deviceMountPath because devicePath at at this stage is
// determined from node object.
volumeObj.deviceMountPath = deviceMountPath
asw.attachedVolumes[volumeName] = volumeObj
return true
}
func (asw *actualStateOfWorld) MarkVolumeAsMounted(markVolumeOpts operationexecutor.MarkVolumeOpts) error {
return asw.AddPodToVolume(markVolumeOpts)
}

View File

@ -460,6 +460,8 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) {
verifyVolumeMountedElsewhere(t, podName2, generatedVolumeName2, true /*expectedMountedElsewhere */, asw)
}
// Test if volumes that were recorded to be read from disk during reconstruction
// are handled correctly by the ASOW.
func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) {
tests := []struct {
name string
@ -555,7 +557,7 @@ func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) {
VolumeSpec: volumeSpec1,
VolumeMountState: operationexecutor.VolumeMountUncertain,
}
err = asw.AddVolumeViaReconstruction(markVolumeOpts1)
_, err = asw.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts1)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}

View File

@ -181,7 +181,6 @@ func (rc *reconciler) reconcile() {
// After running the above operations if skippedDuringReconstruction is not empty
// then ensure that all volumes which were discovered and skipped during reconstruction
// are added to actualStateOfWorld in uncertain state.
// This should be called only ONCE after reconstruction.
if len(rc.skippedDuringReconstruction) > 0 {
rc.processReconstructedVolumes()
}
@ -265,42 +264,43 @@ func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, po
// But if mount operation fails for some reason then we still need to mark the volume as uncertain
// and wait for the next reconciliation loop to deal with it.
func (rc *reconciler) processReconstructedVolumes() {
if rc.kubeClient != nil {
rc.updateDevicePath(rc.skippedDuringReconstruction)
}
for volumeName, glblVolumeInfo := range rc.skippedDuringReconstruction {
// check if volume is marked as attached to the node
// for now lets only process volumes which are at least known as attached to the node
// this should help with most volume types (including secret, configmap etc)
if !rc.actualStateOfWorld.VolumeExists(volumeName) {
klog.V(4).InfoS("Volume is not marked as attached to the node. Skipping processing of the volume", "volumeName", volumeName)
delete(rc.skippedDuringReconstruction, volumeName)
continue
}
uncertainVolumeCount := 0
// only delete volumes which were marked as attached here.
// This should ensure that - we will wait for volumes which were not marked as attached
// before adding them in uncertain state during reconstruction.
delete(rc.skippedDuringReconstruction, volumeName)
for podName, volume := range glblVolumeInfo.podVolumes {
volumeNotMounted := rc.actualStateOfWorld.PodRemovedFromVolume(podName, volume.volumeName)
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: operationexecutor.VolumeMountUncertain,
}
volumeAdded, err := rc.actualStateOfWorld.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts)
// if volume is not mounted then lets mark volume mounted in uncertain state in ASOW
if volumeNotMounted {
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(volume.podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: operationexecutor.VolumeMountUncertain,
}
err := rc.actualStateOfWorld.AddVolumeViaReconstruction(markVolumeOpts)
if volumeAdded {
uncertainVolumeCount += 1
if err != nil {
klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
continue
}
klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
klog.V(4).InfoS("Volume is marked as mounted in uncertain state and added to the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
}
}
@ -312,19 +312,13 @@ func (rc *reconciler) processReconstructedVolumes() {
klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", glblVolumeInfo.volumeName)
continue
}
currentMountState := rc.actualStateOfWorld.GetDeviceMountState(glblVolumeInfo.volumeName)
if currentMountState == operationexecutor.DeviceNotMounted {
err = rc.actualStateOfWorld.MarkDeviceAsUncertain(glblVolumeInfo.volumeName, glblVolumeInfo.devicePath, deviceMountPath)
if err != nil {
klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", glblVolumeInfo.volumeName)
continue
}
klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", glblVolumeInfo.volumeName)
deviceMounted := rc.actualStateOfWorld.CheckAndMarkDeviceUncertainViaReconstruction(glblVolumeInfo.volumeName, deviceMountPath)
if !deviceMounted {
klog.V(3).InfoS("Could not mark device as mounted in uncertain state", "volumeName", glblVolumeInfo.volumeName)
}
}
}
}
rc.skippedDuringReconstruction = make(map[v1.UniqueVolumeName]*globalVolumeInfo)
}
func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) {

View File

@ -2202,6 +2202,28 @@ func getFakeNode() *v1.Node {
}
}
func getInlineFakePod(podName, podUUID, outerName, innerName string) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
UID: k8stypes.UID(podUUID),
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: outerName,
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: innerName,
},
},
},
},
},
}
return pod
}
func getReconciler(kubeletDir string, t *testing.T, volumePaths []string) (Reconciler, *volumetesting.FakeVolumePlugin) {
node := getFakeNode()
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNodeAndRoot(t, node, kubeletDir)
@ -2239,11 +2261,23 @@ func getReconciler(kubeletDir string, t *testing.T, volumePaths []string) (Recon
}
func TestSyncStates(t *testing.T) {
type podInfo struct {
podName string
podUID string
outerVolumeName string
innerVolumeName string
}
defaultPodInfo := podInfo{
podName: "pod1",
podUID: "pod1uid",
outerVolumeName: "volume-name",
innerVolumeName: "volume-name",
}
tests := []struct {
name string
volumePaths []string
createMountPoint bool
addToDSOW bool
podInfos []podInfo
postSyncStatCallback func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error
verifyFunc func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error
}{
@ -2254,6 +2288,7 @@ func TestSyncStates(t *testing.T) {
path.Join("pod2", "volumes", "fake-plugin", "pvc-abcdef"),
},
createMountPoint: true,
podInfos: []podInfo{},
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
if len(mountedPods) != 2 {
@ -2262,12 +2297,33 @@ func TestSyncStates(t *testing.T) {
return nil
},
},
{
name: "when two pods are using same volume and one of them is deleted",
volumePaths: []string{
path.Join("pod1uid", "volumes", "fake-plugin", "volume-name"),
path.Join("pod2uid", "volumes", "fake-plugin", "volume-name"),
},
createMountPoint: true,
podInfos: []podInfo{defaultPodInfo},
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
// for pod that is deleted, volume is considered as mounted
mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
if len(mountedPods) != 1 {
return fmt.Errorf("expected 1 pods to in asw got %d", len(mountedPods))
}
if types.UniquePodName("pod2uid") != mountedPods[0].PodName {
return fmt.Errorf("expected mounted pod to be %s got %s", "pod2uid", mountedPods[0].PodName)
}
return nil
},
},
{
name: "when reconstruction fails for a volume, volumes should be cleaned up",
volumePaths: []string{
path.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"),
},
createMountPoint: false,
podInfos: []podInfo{},
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
return retryWithExponentialBackOff(reconcilerSyncWaitDuration, func() (bool, error) {
err := volumetesting.VerifyTearDownCallCount(1, fakePlugin)
@ -2284,7 +2340,7 @@ func TestSyncStates(t *testing.T) {
path.Join("pod1uid", "volumes", "fake-plugin", "volume-name"),
},
createMountPoint: true,
addToDSOW: true,
podInfos: []podInfo{defaultPodInfo},
postSyncStatCallback: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
skippedVolumes := rcInstance.skippedDuringReconstruction
if len(skippedVolumes) != 1 {
@ -2303,6 +2359,22 @@ func TestSyncStates(t *testing.T) {
if !addedViaReconstruction {
return fmt.Errorf("expected volume %s to be marked as added via reconstruction", mountedPodVolume.VolumeName)
}
// check device mount state
attachedVolumes := rcInstance.actualStateOfWorld.GetAttachedVolumes()
if len(attachedVolumes) != 1 {
return fmt.Errorf("expected 1 volume to be unmounted, got %d", len(attachedVolumes))
}
firstAttachedVolume := attachedVolumes[0]
if !firstAttachedVolume.DeviceMayBeMounted() {
return fmt.Errorf("expected %s volume to be mounted in uncertain state", firstAttachedVolume.VolumeName)
}
// also skippedVolumes map should be empty
skippedVolumes := rcInstance.skippedDuringReconstruction
if len(skippedVolumes) > 0 {
return fmt.Errorf("expected 0 pods in skipped volumes found %d", len(skippedVolumes))
}
return nil
},
},
@ -2315,28 +2387,6 @@ func TestSyncStates(t *testing.T) {
}
defer os.RemoveAll(tmpKubeletDir)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "volume-name",
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: "volume-name",
},
},
},
},
},
}
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := util.GetUniquePodName(pod)
// create kubelet pod directory
tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods")
os.MkdirAll(tmpKubeletPodDir, 0755)
@ -2355,7 +2405,10 @@ func TestSyncStates(t *testing.T) {
rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths)
rcInstance, _ := rc.(*reconciler)
if tc.addToDSOW {
for _, tpodInfo := range tc.podInfos {
pod := getInlineFakePod(tpodInfo.podName, tpodInfo.podUID, tpodInfo.outerVolumeName, tpodInfo.innerVolumeName)
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := util.GetUniquePodName(pod)
volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
if err != nil {

View File

@ -1660,8 +1660,8 @@ func GetTestKubeletVolumePluginMgrWithNode(t *testing.T, node *v1.Node) (*volume
return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
}
func GetTestKubeletVolumePluginMgrWithNodeAndRoot(t *testing.T, node *v1.Node, rootDir string) (*VolumePluginMgr, *FakeVolumePlugin) {
plugins := ProbeVolumePlugins(VolumeConfig{})
func GetTestKubeletVolumePluginMgrWithNodeAndRoot(t *testing.T, node *v1.Node, rootDir string) (*volume.VolumePluginMgr, *FakeVolumePlugin) {
plugins := ProbeVolumePlugins(volume.VolumeConfig{})
v := NewFakeKubeletVolumeHost(
t,
rootDir, /* rootDir */

View File

@ -216,9 +216,18 @@ type ActualStateOfWorldMounterUpdater interface {
// volume expansion must not be retried for this volume
MarkForInUseExpansionError(volumeName v1.UniqueVolumeName)
// AddVolumeViaReconstruction adds the volume to actual state of the world and also
// marks the volume as one found during reconstruction.
AddVolumeViaReconstruction(opts MarkVolumeOpts) error
// CheckAndMarkVolumeAsUncertainViaReconstruction only adds volume to actual state of the world
// if volume was not already there. This avoid overwriting in any previously stored
// state. It returns error if there was an error adding the volume to ASOW.
// It returns true, if this operation resulted in volume being added to ASOW
// otherwise it returns false.
CheckAndMarkVolumeAsUncertainViaReconstruction(opts MarkVolumeOpts) (bool, error)
// CheckAndMarkDeviceUncertainViaReconstruction only adds device to actual state of the world
// if device was not already there. This avoids overwriting in any previously stored
// state. We only supply deviceMountPath because devicePath is already determined from
// VerifyControllerAttachedVolume function.
CheckAndMarkDeviceUncertainViaReconstruction(volumeName v1.UniqueVolumeName, deviceMountPath string) bool
// IsVolumeReconstructed returns true if volume currently added to actual state of the world
// was found during reconstruction.

View File

@ -788,7 +788,7 @@ func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount,
// if volume was previously reconstructed we are not going to change its state as unmounted even
// if mount operation fails.
if actualStateOfWorld.IsVolumeReconstructed(volumeToMount.VolumeName, volumeToMount.PodName) {
klog.V(3).Infof("MountVolume.markVolumeErrorState leaving volume uncertain", "volumeName", volumeToMount.VolumeName)
klog.V(3).InfoS("MountVolume.markVolumeErrorState leaving volume uncertain", "volumeName", volumeToMount.VolumeName)
return
}