diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index 629a69d7a49..a7b06af94ea 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -24,11 +24,10 @@ import ( "fmt" "sync" - "k8s.io/klog" - "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/klog" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" @@ -568,7 +567,9 @@ func (asw *actualStateOfWorld) SetVolumeGloballyMounted( volumeObj.globallyMounted = globallyMounted volumeObj.deviceMountPath = deviceMountPath - volumeObj.devicePath = devicePath + if devicePath != "" { + volumeObj.devicePath = devicePath + } asw.attachedVolumes[volumeName] = volumeObj return nil } diff --git a/pkg/kubelet/volumemanager/reconciler/BUILD b/pkg/kubelet/volumemanager/reconciler/BUILD index 2a264aa4ee3..dbb3ee4b86a 100644 --- a/pkg/kubelet/volumemanager/reconciler/BUILD +++ b/pkg/kubelet/volumemanager/reconciler/BUILD @@ -56,6 +56,7 @@ go_test( "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index 38eaba2f03c..b51ddaf72fb 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" + "k8s.io/klog" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" "k8s.io/kubernetes/pkg/util/mount" @@ -1129,7 +1130,7 @@ func createTestClient() *fake.Clientset { VolumesAttached: []v1.AttachedVolume{ { Name: "fake-plugin/fake-device1", - DevicePath: "fake/path", + DevicePath: "/fake/path", }, }}, }, nil @@ -1170,3 +1171,96 @@ func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolume }) return fakeClient } + +func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) { + // Arrange + volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) + + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) + kubeClient := createTestClient() + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + kubeClient, + volumePluginMgr, + fakeRecorder, + false, /* checkNodeCapabilitiesBeforeMount */ + fakeHandler)) + reconciler := NewReconciler( + kubeClient, + true, /* controllerAttachDetachEnabled */ + reconcilerLoopSleepDuration, + reconcilerSyncStatesSleepPeriod, + waitForAttachTimeout, + nodeName, + dsw, + asw, + hasAddedPods, + oex, + &mount.FakeMounter{}, + volumePluginMgr, + kubeletPodsDir) + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "volume-name", + VolumeSource: v1.VolumeSource{ + GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{ + PDName: "fake-device1", + }, + }, + }, + }, + }, + } + + // Some steps are executes out of order in callbacks, follow the numbers. + + // 1. Add a volume to DSW and wait until it's mounted + volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]} + podName := util.GetUniquePodName(pod) + generatedVolumeName, err := dsw.AddPodToVolume( + podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) + dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName}) + + if err != nil { + t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) + } + runReconciler(reconciler) + waitForMount(t, fakePlugin, generatedVolumeName, asw) + + finished := make(chan interface{}) + fakePlugin.UnmountDeviceHook = func(mountPath string) error { + // Act: + // 3. While a volume is being unmounted, add it back to the desired state of world + klog.Infof("UnmountDevice called") + generatedVolumeName, err = dsw.AddPodToVolume( + podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) + dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName}) + return nil + } + + fakePlugin.WaitForAttachHook = func(spec *volume.Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error) { + // Assert + // 4. When the volume is mounted again, expect that UnmountDevice operation did not clear devicePath + if devicePath == "" { + t.Errorf("Expected WaitForAttach called with devicePath from Node.Status") + close(finished) + return "", fmt.Errorf("Expected devicePath from Node.Status") + } + close(finished) + return devicePath, nil + } + + // 2. Delete the volume from DSW (and wait for callbacks) + dsw.DeletePodFromVolume(podName, generatedVolumeName) + + <-finished + waitForMount(t, fakePlugin, generatedVolumeName, asw) +} diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 224db3e3b86..4c116aa5934 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -250,6 +250,10 @@ type FakeVolumePlugin struct { LimitKey string ProvisionDelaySeconds int + // Add callbacks as needed + WaitForAttachHook func(spec *Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error) + UnmountDeviceHook func(globalMountPath string) error + Mounters []*FakeVolume Unmounters []*FakeVolume Attachers []*FakeVolume @@ -269,7 +273,10 @@ var _ DeviceMountableVolumePlugin = &FakeVolumePlugin{} var _ FSResizableVolumePlugin = &FakeVolumePlugin{} func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume { - volume := &FakeVolume{} + volume := &FakeVolume{ + WaitForAttachHook: plugin.WaitForAttachHook, + UnmountDeviceHook: plugin.UnmountDeviceHook, + } *list = append(*list, volume) return volume } @@ -551,6 +558,10 @@ type FakeVolume struct { Plugin *FakeVolumePlugin MetricsNil + // Add callbacks as needed + WaitForAttachHook func(spec *Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error) + UnmountDeviceHook func(globalMountPath string) error + SetUpCallCount int TearDownCallCount int AttachCallCount int @@ -724,6 +735,9 @@ func (fv *FakeVolume) WaitForAttach(spec *Spec, devicePath string, pod *v1.Pod, fv.Lock() defer fv.Unlock() fv.WaitForAttachCallCount++ + if fv.WaitForAttachHook != nil { + return fv.WaitForAttachHook(spec, devicePath, pod, spectimeout) + } return "/dev/sdb", nil } @@ -776,6 +790,9 @@ func (fv *FakeVolume) UnmountDevice(globalMountPath string) error { fv.Lock() defer fv.Unlock() fv.UnmountDeviceCallCount++ + if fv.UnmountDeviceHook != nil { + return fv.UnmountDeviceHook(globalMountPath) + } return nil }