Fix error for inline migrated volumes

Inline migrated volumes report a PV even though they are not backed
by a PV.
Hemant Kumar 2022-04-04 13:14:29 -04:00
parent 00bd91e558
commit 5da524d973
3 changed files with 115 additions and 10 deletions

@@ -343,7 +343,7 @@ func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize(
 	volumeSpec *volume.Spec,
 	uniquePodName volumetypes.UniquePodName,
 	mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume) {
-	if podVolume.PersistentVolumeClaim == nil {
+	if podVolume.PersistentVolumeClaim == nil || pvc == nil {
 		// Only PVC supports resize operation.
 		return
 	}
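
For context, a minimal sketch (mine, not part of this commit; the translator usage mirrors the new test below) of the situation this guard handles: translating an in-tree inline volume yields a spec whose PersistentVolume field is populated, yet no PV or PVC object exists in the API server, so any PVC lookup for it comes back nil.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	csitrans "k8s.io/csi-translation-lib"
	"k8s.io/kubernetes/pkg/volume"
	"k8s.io/kubernetes/pkg/volume/csimigration"
)

func main() {
	// An inline (pod-embedded) GCE PD volume, as in the new test below.
	inline := &v1.Volume{
		Name: "volume-name",
		VolumeSource: v1.VolumeSource{
			GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"},
		},
	}
	spec := &volume.Spec{Volume: inline}

	migrated, err := csimigration.TranslateInTreeSpecToCSI(spec, "default", csitrans.New())
	if err != nil {
		panic(err)
	}

	// The translated spec reports a PV...
	fmt.Println(migrated.PersistentVolume != nil) // true
	// ...but the PV is only synthesized for migration; no real PV/PVC backs it.
	fmt.Println(migrated.InlineVolumeSpecForCSIMigration) // true
}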

@@ -19,6 +19,8 @@ package reconciler
 import (
 	"crypto/md5"
 	"fmt"
+	csitrans "k8s.io/csi-translation-lib"
+	"k8s.io/kubernetes/pkg/volume/csimigration"
 	"testing"
 	"time"
@@ -177,6 +179,105 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
 	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
 }
 
+// Populates desiredStateOfWorld cache with one volume/pod.
+// Calls Run()
+// Verifies there are attach/mount/etc calls and no detach/unmount calls.
+func Test_Run_Positive_VolumeAttachAndMountMigrationEnabled(t *testing.T) {
+	// Arrange
+	intreeToCSITranslator := csitrans.New()
+	node := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: string(nodeName),
+		},
+		Spec: v1.NodeSpec{},
+		Status: v1.NodeStatus{
+			VolumesAttached: []v1.AttachedVolume{
+				{
+					Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", "pd.csi.storage.gke.io-fake-device1")),
+					DevicePath: "fake/path",
+				},
+			},
+		},
+	}
+	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
+	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
+	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
+	kubeClient := createTestClient(v1.AttachedVolume{
+		Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", "pd.csi.storage.gke.io-fake-device1")),
+		DevicePath: "fake/path",
+	})
+	fakeRecorder := &record.FakeRecorder{}
+	fakeHandler := volumetesting.NewBlockVolumePathHandler()
+	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
+		kubeClient,
+		volumePluginMgr,
+		fakeRecorder,
+		fakeHandler))
+	reconciler := NewReconciler(
+		kubeClient,
+		true, /* controllerAttachDetachEnabled */
+		reconcilerLoopSleepDuration,
+		waitForAttachTimeout,
+		nodeName,
+		dsw,
+		asw,
+		hasAddedPods,
+		oex,
+		mount.NewFakeMounter(nil),
+		hostutil.NewFakeHostUtil(nil),
+		volumePluginMgr,
+		kubeletPodsDir)
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "pod1",
+			UID:  "pod1uid",
+		},
+		Spec: v1.PodSpec{
+			Volumes: []v1.Volume{
+				{
+					Name: "volume-name",
+					VolumeSource: v1.VolumeSource{
+						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
+							PDName: "fake-device1",
+						},
+					},
+				},
+			},
+		},
+	}
+
+	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
+	migratedSpec, err := csimigration.TranslateInTreeSpecToCSI(volumeSpec, pod.Namespace, intreeToCSITranslator)
+	if err != nil {
+		t.Fatalf("unexpected error while translating spec %v: %v", volumeSpec, err)
+	}
+
+	podName := util.GetUniquePodName(pod)
+	generatedVolumeName, err := dsw.AddPodToVolume(
+		podName, pod, migratedSpec, migratedSpec.Name(), "" /* volumeGidValue */)
+
+	// Assert
+	if err != nil {
+		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
+	}
+	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
+
+	// Act
+	runReconciler(reconciler)
+	waitForMount(t, fakePlugin, generatedVolumeName, asw)
+
+	// Assert
+	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
+		1 /* expectedWaitForAttachCallCount */, fakePlugin))
+	assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
+		1 /* expectedMountDeviceCallCount */, fakePlugin))
+	assert.NoError(t, volumetesting.VerifySetUpCallCount(
+		1 /* expectedSetUpCallCount */, fakePlugin))
+	assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
+	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
+}
+
 // Populates desiredStateOfWorld cache with one volume/pod.
 // Enables controllerAttachDetachEnabled.
 // Calls Run()
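
An aside on the fixture naming above (my inference from the test fixtures, not something this commit defines): translation appears to name the synthesized PV "<driver>-<pdName>", and the fake plugin prefixes its own name, which is why both the node status and the fake client are seeded with the same fake-plugin/pd.csi.storage.gke.io-fake-device1 entry that the reconciler is expected to generate.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	// Hypothetical illustration of the naming convention used by the fixtures:
	// CSI driver name + PD name, then the fake plugin's prefix.
	specName := fmt.Sprintf("%s-%s", "pd.csi.storage.gke.io", "fake-device1")
	uniqueName := v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", specName))
	fmt.Println(uniqueName) // fake-plugin/pd.csi.storage.gke.io-fake-device1
}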
@@ -1896,21 +1997,25 @@ func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
 	return wait.ExponentialBackoff(backoff, fn)
 }
 
-func createTestClient() *fake.Clientset {
+func createTestClient(attachedVolumes ...v1.AttachedVolume) *fake.Clientset {
 	fakeClient := &fake.Clientset{}
+	if len(attachedVolumes) == 0 {
+		attachedVolumes = append(attachedVolumes, v1.AttachedVolume{
+			Name:       "fake-plugin/fake-device1",
+			DevicePath: "fake/path",
+		})
+	}
 	fakeClient.AddReactor("get", "nodes",
 		func(action core.Action) (bool, runtime.Object, error) {
 			return true, &v1.Node{
 				ObjectMeta: metav1.ObjectMeta{Name: string(nodeName)},
 				Status: v1.NodeStatus{
-					VolumesAttached: []v1.AttachedVolume{
-						{
-							Name:       "fake-plugin/fake-device1",
-							DevicePath: "/fake/path",
-						},
-					}},
+					VolumesAttached: attachedVolumes,
+				},
 			}, nil
-		})
+		},
+	)
 	fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
 		return true, nil, fmt.Errorf("no reaction implemented for %s", action)
 	})
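
A quick usage sketch for the reworked helper (as it would be called from tests in this file; the volume names are the fixtures used above): zero arguments preserve the old default attachment, while the migration test seeds its own.

// Existing callers keep the previous behavior:
kubeClient := createTestClient() // node reports "fake-plugin/fake-device1"

// The migration test overrides the attachment for the translated volume:
kubeClient = createTestClient(v1.AttachedVolume{
	Name:       "fake-plugin/pd.csi.storage.gke.io-fake-device1",
	DevicePath: "fake/path",
})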

@@ -1485,7 +1485,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
 	claimSize := actualStateOfWorld.GetClaimSize(volumeToMount.VolumeName)
 
 	// only fetch claimSize if it was not set previously
-	if volumeToMount.VolumeSpec.PersistentVolume != nil && claimSize == nil {
+	if volumeToMount.VolumeSpec.PersistentVolume != nil && claimSize == nil && !volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration {
 		pv := volumeToMount.VolumeSpec.PersistentVolume
 		pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
 		if err != nil {
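
Finally, a restatement of the updated condition as a standalone predicate (the helper name is mine; the types follow the code above, assuming GetClaimSize returns a *resource.Quantity): the claim-size fetch is skipped whenever the PV on the spec was synthesized by inline-volume translation, since such a PV has no ClaimRef from which to resolve a PVC.

package sketch

import (
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/kubernetes/pkg/volume"
)

// shouldFetchClaimSize mirrors the updated guard: only consult the API server
// for a claim size when the spec is backed by a real PersistentVolume and the
// size has not already been cached.
func shouldFetchClaimSize(spec *volume.Spec, claimSize *resource.Quantity) bool {
	return spec.PersistentVolume != nil &&
		claimSize == nil &&
		!spec.InlineVolumeSpecForCSIMigration
}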