diff --git a/pkg/controller/volume/attach_detach_controller.go b/pkg/controller/volume/attach_detach_controller.go
index 63f615793e9..7997f5e8f66 100644
--- a/pkg/controller/volume/attach_detach_controller.go
+++ b/pkg/controller/volume/attach_detach_controller.go
@@ -50,7 +50,7 @@ const (
 	// attach detach controller will wait for a volume to be safely unmounted
 	// from its node. Once this time has expired, the controller will assume the
 	// node or kubelet is unresponsive and will detach the volume anyway.
-	reconcilerMaxWaitForUnmountDuration time.Duration = 3 * time.Minute
+	reconcilerMaxWaitForUnmountDuration time.Duration = 6 * time.Minute
 
 	// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
 	// DesiredStateOfWorldPopulator loop waits between successive executions
diff --git a/pkg/controller/volume/reconciler/reconciler.go b/pkg/controller/volume/reconciler/reconciler.go
index b948ee259b4..914d93250c7 100644
--- a/pkg/controller/volume/reconciler/reconciler.go
+++ b/pkg/controller/volume/reconciler/reconciler.go
@@ -109,7 +109,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
 			if !attachedVolume.MountedByNode {
 				glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
-				err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, rc.actualStateOfWorld)
+				err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, true /* verifySafeToDetach */, rc.actualStateOfWorld)
 				if err == nil {
 					glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
 				}
@@ -129,7 +129,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
 				// If volume is not safe to detach (is mounted), wait a max amount of time before detaching anyway.
 				if timeElapsed > rc.maxWaitForUnmountDuration {
 					glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q. Volume is not safe to detach, but maxWaitForUnmountDuration expired.", attachedVolume.VolumeName, attachedVolume.NodeName)
-					err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, rc.actualStateOfWorld)
+					err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
 					if err == nil {
 						glog.Infof("Started DetachVolume for volume %q from node %q due to maxWaitForUnmountDuration expiry.", attachedVolume.VolumeName, attachedVolume.NodeName)
 					}
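Taken together, these two hunks give the controller reconciler two distinct detach paths: a verified detach once the node reports the volume unmounted, and a forced detach after `reconcilerMaxWaitForUnmountDuration` (now 6 minutes) expires. The following standalone sketch illustrates the decision; the types here are simplified stand-ins, not the controller's real cache types:

```go
package main

import (
	"fmt"
	"time"
)

// attachedVolume is an illustrative reduction of the controller's cache entry.
type attachedVolume struct {
	name            string
	mountedByNode   bool      // node still reports the volume as mounted
	detachRequested time.Time // when the reconciler first wanted it detached
}

// decideDetach mirrors the two DetachVolume call sites above: detach with
// verification when the node says the volume is unmounted, and detach
// without verification once the wait budget is exhausted.
func decideDetach(v attachedVolume, maxWait time.Duration, now time.Time) (detach, verifySafeToDetach bool) {
	if !v.mountedByNode {
		return true, true // safe path: re-check Node.Status.VolumesInUse first
	}
	if now.Sub(v.detachRequested) > maxWait {
		return true, false // timeout path: assume the kubelet is unresponsive
	}
	return false, false // keep waiting for the kubelet to unmount
}

func main() {
	v := attachedVolume{name: "vol1", mountedByNode: true, detachRequested: time.Now().Add(-7 * time.Minute)}
	detach, verify := decideDetach(v, 6*time.Minute, time.Now())
	fmt.Printf("detach=%v verifySafeToDetach=%v\n", detach, verify) // detach=true verifySafeToDetach=false
}
```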
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 4f897185166..c3769d88aed 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -3404,7 +3404,11 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
 		return err
 	}
 	// Update the current status on the API server
-	_, err = kl.kubeClient.Core().Nodes().UpdateStatus(node)
+	updatedNode, err := kl.kubeClient.Core().Nodes().UpdateStatus(node)
+	if err == nil {
+		kl.volumeManager.MarkVolumesAsReportedInUse(
+			updatedNode.Status.VolumesInUse)
+	}
 	return err
 }
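The kubelet change closes the feedback loop: only after the node status update is accepted by the API server does the kubelet tell its volume manager which volumes were actually persisted to `Status.VolumesInUse`. A minimal sketch of that ordering, with hypothetical stand-ins for the client and volume manager:

```go
package main

import (
	"errors"
	"fmt"
)

type node struct{ VolumesInUse []string }

// nodeClient is a hypothetical stand-in for kl.kubeClient.Core().Nodes().
type nodeClient struct{ fail bool }

func (c nodeClient) UpdateStatus(n node) (node, error) {
	if c.fail {
		return node{}, errors.New("update rejected")
	}
	return n, nil
}

// volumeManager is a stand-in for kl.volumeManager.
type volumeManager struct{ reported []string }

func (vm *volumeManager) MarkVolumesAsReportedInUse(vols []string) { vm.reported = vols }

// tryUpdateNodeStatus mirrors the diff: volumes are marked as reported only
// when the API server acknowledged the status update, so the desired state
// of the world never records a report that was not persisted.
func tryUpdateNodeStatus(c nodeClient, vm *volumeManager, n node) error {
	updated, err := c.UpdateStatus(n)
	if err == nil {
		vm.MarkVolumesAsReportedInUse(updated.VolumesInUse)
	}
	return err
}

func main() {
	vm := &volumeManager{}
	_ = tryUpdateNodeStatus(nodeClient{}, vm, node{VolumesInUse: []string{"fake/vol1"}})
	fmt.Println(vm.reported) // [fake/vol1]
}
```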
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index b503382bd73..552df6990b3 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -597,7 +597,7 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
 	}
 
 	// Verify volumes detached and no longer reported as in use
-	err = waitForVolumeDetach(kubelet.volumeManager)
+	err = waitForVolumeDetach(api.UniqueVolumeName("fake/vol1"), kubelet.volumeManager)
 	if err != nil {
 		t.Error(err)
 	}
@@ -611,7 +611,6 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
 	if err != nil {
 		t.Error(err)
 	}
-
 }
 
 func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
@@ -657,6 +656,13 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
 	}()
 
 	kubelet.podManager.SetPods([]*api.Pod{pod})
+
+	// Fake node status update
+	go simulateVolumeInUseUpdate(
+		api.UniqueVolumeName("fake/vol1"),
+		stopCh,
+		kubelet.volumeManager)
+
 	err := kubelet.volumeManager.WaitForAttachAndMount(pod)
 	if err != nil {
 		t.Errorf("Expected success: %v", err)
@@ -747,6 +753,12 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
 	// Add pod
 	kubelet.podManager.SetPods([]*api.Pod{pod})
 
+	// Fake node status update
+	go simulateVolumeInUseUpdate(
+		api.UniqueVolumeName("fake/vol1"),
+		stopCh,
+		kubelet.volumeManager)
+
 	// Verify volumes attached
 	err := kubelet.volumeManager.WaitForAttachAndMount(pod)
 	if err != nil {
@@ -815,7 +827,7 @@
 	}
 
 	// Verify volumes detached and no longer reported as in use
-	err = waitForVolumeDetach(kubelet.volumeManager)
+	err = waitForVolumeDetach(api.UniqueVolumeName("fake/vol1"), kubelet.volumeManager)
 	if err != nil {
 		t.Error(err)
 	}
@@ -828,7 +840,6 @@
 	if err != nil {
 		t.Error(err)
 	}
-
 }
 
 func TestPodVolumesExist(t *testing.T) {
@@ -4987,19 +4998,15 @@ func waitForVolumeUnmount(
 }
 
 func waitForVolumeDetach(
+	volumeName api.UniqueVolumeName,
 	volumeManager kubeletvolume.VolumeManager) error {
 	attachedVolumes := []api.UniqueVolumeName{}
 	err := retryWithExponentialBackOff(
 		time.Duration(50*time.Millisecond),
 		func() (bool, error) {
 			// Verify volumes detached
-			attachedVolumes = volumeManager.GetVolumesInUse()
-
-			if len(attachedVolumes) != 0 {
-				return false, nil
-			}
-
-			return true, nil
+			volumeAttached := volumeManager.VolumeIsAttached(volumeName)
+			return !volumeAttached, nil
		},
	)
@@ -5020,3 +5027,20 @@ func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.Conditio
 	}
 	return wait.ExponentialBackoff(backoff, fn)
 }
+
+func simulateVolumeInUseUpdate(
+	volumeName api.UniqueVolumeName,
+	stopCh <-chan struct{},
+	volumeManager kubeletvolume.VolumeManager) {
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			volumeManager.MarkVolumesAsReportedInUse(
+				[]api.UniqueVolumeName{volumeName})
+		case <-stopCh:
+			return
+		}
+	}
+}
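Because mounting is now gated on the volume appearing in the node's `VolumesInUse` status, tests that bypass the real status-update loop must run `simulateVolumeInUseUpdate` in the background. The helper is just a stop-aware ticker; here is a dependency-free sketch of the same pattern (names are illustrative, not the kubelet test helpers):

```go
package main

import (
	"fmt"
	"time"
)

// simulateReport periodically invokes report(vols) until stopCh is closed,
// the same ticker-plus-select shape as simulateVolumeInUseUpdate above.
func simulateReport(vols []string, stopCh <-chan struct{}, report func([]string)) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop() // always release the ticker's resources
	for {
		select {
		case <-ticker.C:
			report(vols)
		case <-stopCh:
			return
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	go simulateReport([]string{"fake/vol1"}, stopCh, func(v []string) {
		fmt.Println("reported in use:", v)
	})
	time.Sleep(250 * time.Millisecond) // let the ticker fire a couple of times
	close(stopCh)                      // stops the goroutine, as the tests do via stopCh
	time.Sleep(50 * time.Millisecond)
}
```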
diff --git a/pkg/kubelet/volume/cache/actual_state_of_world.go b/pkg/kubelet/volume/cache/actual_state_of_world.go
index e0a9a5d53b6..4dd5f5444fa 100644
--- a/pkg/kubelet/volume/cache/actual_state_of_world.go
+++ b/pkg/kubelet/volume/cache/actual_state_of_world.go
@@ -117,6 +117,11 @@ type ActualStateOfWorld interface {
 	// volumes that do not need to update contents should not fail.
 	PodExistsInVolume(podName volumetypes.UniquePodName, volumeName api.UniqueVolumeName) (bool, string, error)
 
+	// VolumeExists returns true if the given volume exists in the list of
+	// attached volumes in the cache, indicating the volume is attached to this
+	// node.
+	VolumeExists(volumeName api.UniqueVolumeName) bool
+
 	// GetMountedVolumes generates and returns a list of volumes and the pods
 	// they are successfully attached and mounted for based on the current
 	// actual state of the world.
@@ -127,12 +132,17 @@ type ActualStateOfWorld interface {
 	// current actual state of the world.
 	GetMountedVolumesForPod(podName volumetypes.UniquePodName) []MountedVolume
 
-	// GetAttachedVolumes generates and returns a list of all attached volumes.
-	GetAttachedVolumes() []AttachedVolume
+	// GetGloballyMountedVolumes generates and returns a list of all attached
+	// volumes that are globally mounted. This list can be used to determine
+	// which volumes should be reported as "in use" in the node's VolumesInUse
+	// status field. Globally mounted here refers to the shared plugin mount
+	// point for the attachable volume from which the pod-specific mount points
+	// are created (via bind mount).
+	GetGloballyMountedVolumes() []AttachedVolume
 
 	// GetUnmountedVolumes generates and returns a list of attached volumes that
 	// have no mountedPods. This list can be used to determine which volumes are
-	// no longer referenced and may be detached.
+	// no longer referenced and may be globally unmounted and detached.
 	GetUnmountedVolumes() []AttachedVolume
 }
 
@@ -492,6 +502,15 @@ func (asw *actualStateOfWorld) PodExistsInVolume(
 	return podExists, volumeObj.devicePath, nil
 }
 
+func (asw *actualStateOfWorld) VolumeExists(
+	volumeName api.UniqueVolumeName) bool {
+	asw.RLock()
+	defer asw.RUnlock()
+
+	_, volumeExists := asw.attachedVolumes[volumeName]
+	return volumeExists
+}
+
 func (asw *actualStateOfWorld) GetMountedVolumes() []MountedVolume {
 	asw.RLock()
 	defer asw.RUnlock()
@@ -525,17 +544,20 @@ func (asw *actualStateOfWorld) GetMountedVolumesForPod(
 	return mountedVolume
 }
 
-func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume {
+func (asw *actualStateOfWorld) GetGloballyMountedVolumes() []AttachedVolume {
 	asw.RLock()
 	defer asw.RUnlock()
-	unmountedVolumes := make([]AttachedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
+	globallyMountedVolumes := make(
+		[]AttachedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
 	for _, volumeObj := range asw.attachedVolumes {
-		unmountedVolumes = append(
-			unmountedVolumes,
-			asw.getAttachedVolume(&volumeObj))
+		if volumeObj.globallyMounted {
+			globallyMountedVolumes = append(
+				globallyMountedVolumes,
+				asw.newAttachedVolume(&volumeObj))
+		}
 	}
-	return unmountedVolumes
+	return globallyMountedVolumes
 }
 
 func (asw *actualStateOfWorld) GetUnmountedVolumes() []AttachedVolume {
@@ -546,14 +568,14 @@ func (asw *actualStateOfWorld) GetUnmountedVolumes() []AttachedVolume {
 		if len(volumeObj.mountedPods) == 0 {
 			unmountedVolumes = append(
 				unmountedVolumes,
-				asw.getAttachedVolume(&volumeObj))
+				asw.newAttachedVolume(&volumeObj))
 		}
 	}
 	return unmountedVolumes
 }
 
-func (asw *actualStateOfWorld) getAttachedVolume(
+func (asw *actualStateOfWorld) newAttachedVolume(
 	attachedVolume *attachedVolume) AttachedVolume {
 	return AttachedVolume{
 		AttachedVolume: operationexecutor.AttachedVolume{
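The cache now distinguishes three volume states: attached (`VolumeExists`), attached with no pod references (`GetUnmountedVolumes`), and globally mounted at the shared plugin mount point (`GetGloballyMountedVolumes`). A minimal sketch of how such a cache can filter on a per-volume flag — the field names are illustrative, not the real `attachedVolume` struct:

```go
package main

import (
	"fmt"
	"sync"
)

// cacheEntry is an illustrative reduction of the cache's attachedVolume.
type cacheEntry struct {
	globallyMounted bool
	mountedPods     int
}

type cache struct {
	sync.RWMutex
	volumes map[string]cacheEntry
}

func (c *cache) VolumeExists(name string) bool {
	c.RLock()
	defer c.RUnlock()
	_, ok := c.volumes[name]
	return ok
}

// GloballyMounted returns only the volumes whose shared device mount has
// completed — the set the kubelet keeps reporting in VolumesInUse until the
// device is unmounted, even after the last pod reference is gone.
func (c *cache) GloballyMounted() []string {
	c.RLock()
	defer c.RUnlock()
	out := make([]string, 0, len(c.volumes))
	for name, v := range c.volumes {
		if v.globallyMounted {
			out = append(out, name)
		}
	}
	return out
}

func main() {
	c := &cache{volumes: map[string]cacheEntry{
		"fake/vol1": {globallyMounted: true},
		"fake/vol2": {globallyMounted: false},
	}}
	fmt.Println(c.VolumeExists("fake/vol1"), c.GloballyMounted())
}
```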
diff --git a/pkg/kubelet/volume/cache/actual_state_of_world_test.go b/pkg/kubelet/volume/cache/actual_state_of_world_test.go
index d74fca6c13d..7a1cf27b840 100644
--- a/pkg/kubelet/volume/cache/actual_state_of_world_test.go
+++ b/pkg/kubelet/volume/cache/actual_state_of_world_test.go
@@ -27,7 +27,8 @@ import (
 )
 
 // Calls AddVolume() once to add volume
-// Verifies newly added volume exists in GetAttachedVolumes()
+// Verifies newly added volume exists in GetUnmountedVolumes()
+// Verifies newly added volume doesn't exist in GetGloballyMountedVolumes()
 func Test_AddVolume_Positive_NewVolume(t *testing.T) {
 	// Arrange
 	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
@@ -61,12 +62,15 @@ func Test_AddVolume_Positive_NewVolume(t *testing.T) {
 		t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
 	}
 
-	verifyVolumeExistsInAttachedVolumes(t, generatedVolumeName, asw)
+	verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
+	verifyVolumeExistsInUnmountedVolumes(t, generatedVolumeName, asw)
+	verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
 }
 
 // Calls AddVolume() twice to add the same volume
-// Verifies newly added volume exists in GetAttachedVolumes() and second call
-// doesn't fail
+// Verifies second call doesn't fail
+// Verifies newly added volume exists in GetUnmountedVolumes()
+// Verifies newly added volume doesn't exist in GetGloballyMountedVolumes()
 func Test_AddVolume_Positive_ExistingVolume(t *testing.T) {
 	// Arrange
 	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
@@ -105,7 +109,9 @@ func Test_AddVolume_Positive_ExistingVolume(t *testing.T) {
 		t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
 	}
 
-	verifyVolumeExistsInAttachedVolumes(t, generatedVolumeName, asw)
+	verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
+	verifyVolumeExistsInUnmountedVolumes(t, generatedVolumeName, asw)
+	verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
 }
 
 // Populates data struct with a volume
@@ -160,7 +166,9 @@ func Test_AddPodToVolume_Positive_ExistingVolumeNewNode(t *testing.T) {
 		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
 	}
 
-	verifyVolumeExistsInAttachedVolumes(t, generatedVolumeName, asw)
+	verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
+	verifyVolumeDoesntExistInUnmountedVolumes(t, generatedVolumeName, asw)
+	verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
 	verifyPodExistsInVolumeAsw(t, podName, generatedVolumeName, "fake/device/path" /* expectedDevicePath */, asw)
 }
 
@@ -223,7 +231,9 @@ func Test_AddPodToVolume_Positive_ExistingVolumeExistingNode(t *testing.T) {
 		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
 	}
 
-	verifyVolumeExistsInAttachedVolumes(t, generatedVolumeName, asw)
+	verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
+	verifyVolumeDoesntExistInUnmountedVolumes(t, generatedVolumeName, asw)
+	verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
 	verifyPodExistsInVolumeAsw(t, podName, generatedVolumeName, "fake/device/path" /* expectedDevicePath */, asw)
 }
 
@@ -280,7 +290,9 @@ func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) {
 		t.Fatalf("AddPodToVolume did not fail. Expected: <\"no volume with the name ... exists in the list of attached volumes\"> Actual: <none>")
 	}
 
-	verifyVolumeDoesntExistInAttachedVolumes(t, volumeName, asw)
+	verifyVolumeExistsAsw(t, volumeName, false /* shouldExist */, asw)
+	verifyVolumeDoesntExistInUnmountedVolumes(t, volumeName, asw)
+	verifyVolumeDoesntExistInGloballyMountedVolumes(t, volumeName, asw)
 	verifyPodDoesntExistInVolumeAsw(
 		t,
 		podName,
@@ -289,28 +301,116 @@
 		asw)
 }
 
-func verifyVolumeExistsInAttachedVolumes(
+// Calls AddVolume() once to add volume
+// Calls MarkDeviceAsMounted() to mark volume as globally mounted.
+// Verifies newly added volume exists in GetUnmountedVolumes()
+// Verifies newly added volume exists in GetGloballyMountedVolumes()
+func Test_MarkDeviceAsMounted_Positive_NewVolume(t *testing.T) {
+	// Arrange
+	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
+	asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr)
+	pod := &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			Name: "pod1",
+			UID:  "pod1uid",
+		},
+		Spec: api.PodSpec{
+			Volumes: []api.Volume{
+				{
+					Name: "volume-name",
+					VolumeSource: api.VolumeSource{
+						GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
+							PDName: "fake-device1",
+						},
+					},
+				},
+			},
+		},
+	}
+	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
+	devicePath := "fake/device/path"
+	generatedVolumeName, err := asw.AddVolume(volumeSpec, devicePath)
+	if err != nil {
+		t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
+	}
+
+	// Act
+	err = asw.MarkDeviceAsMounted(generatedVolumeName)
+
+	// Assert
+	if err != nil {
+		t.Fatalf("MarkDeviceAsMounted failed. Expected: <no error> Actual: <%v>", err)
+	}
+
+	verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
+	verifyVolumeExistsInUnmountedVolumes(t, generatedVolumeName, asw)
+	verifyVolumeExistsInGloballyMountedVolumes(t, generatedVolumeName, asw)
+}
+
+func verifyVolumeExistsInGloballyMountedVolumes(
 	t *testing.T, expectedVolumeName api.UniqueVolumeName, asw ActualStateOfWorld) {
-	attachedVolumes := asw.GetAttachedVolumes()
-	for _, volume := range attachedVolumes {
+	globallyMountedVolumes := asw.GetGloballyMountedVolumes()
+	for _, volume := range globallyMountedVolumes {
 		if volume.VolumeName == expectedVolumeName {
 			return
 		}
 	}
 
 	t.Fatalf(
-		"Could not find volume %v in the list of attached volumes for actual state of world %+v",
+		"Could not find volume %v in the list of GloballyMountedVolumes for actual state of world %+v",
 		expectedVolumeName,
-		attachedVolumes)
+		globallyMountedVolumes)
 }
 
-func verifyVolumeDoesntExistInAttachedVolumes(
+func verifyVolumeDoesntExistInGloballyMountedVolumes(
 	t *testing.T, volumeToCheck api.UniqueVolumeName, asw ActualStateOfWorld) {
-	attachedVolumes := asw.GetAttachedVolumes()
-	for _, volume := range attachedVolumes {
+	globallyMountedVolumes := asw.GetGloballyMountedVolumes()
+	for _, volume := range globallyMountedVolumes {
 		if volume.VolumeName == volumeToCheck {
 			t.Fatalf(
-				"Found volume %v in the list of attached volumes. Expected it not to exist.",
+				"Found volume %v in the list of GloballyMountedVolumes. Expected it not to exist.",
+				volumeToCheck)
+		}
+	}
+}
+
+func verifyVolumeExistsAsw(
+	t *testing.T,
+	expectedVolumeName api.UniqueVolumeName,
+	shouldExist bool,
+	asw ActualStateOfWorld) {
+	volumeExists := asw.VolumeExists(expectedVolumeName)
+	if shouldExist != volumeExists {
+		t.Fatalf(
+			"VolumeExists(%q) response incorrect. Expected: <%v> Actual: <%v>",
+			expectedVolumeName,
+			shouldExist,
+			volumeExists)
+	}
+}
+
+func verifyVolumeExistsInUnmountedVolumes(
+	t *testing.T, expectedVolumeName api.UniqueVolumeName, asw ActualStateOfWorld) {
+	unmountedVolumes := asw.GetUnmountedVolumes()
+	for _, volume := range unmountedVolumes {
+		if volume.VolumeName == expectedVolumeName {
+			return
+		}
+	}
+
+	t.Fatalf(
+		"Could not find volume %v in the list of UnmountedVolumes for actual state of world %+v",
+		expectedVolumeName,
+		unmountedVolumes)
+}
+
+func verifyVolumeDoesntExistInUnmountedVolumes(
+	t *testing.T, volumeToCheck api.UniqueVolumeName, asw ActualStateOfWorld) {
+	unmountedVolumes := asw.GetUnmountedVolumes()
+	for _, volume := range unmountedVolumes {
+		if volume.VolumeName == volumeToCheck {
+			t.Fatalf(
+				"Found volume %v in the list of UnmountedVolumes. Expected it not to exist.",
 				volumeToCheck)
 		}
 	}
 }
diff --git a/pkg/kubelet/volume/cache/desired_state_of_world.go b/pkg/kubelet/volume/cache/desired_state_of_world.go
index 004f029630c..673897d8e71 100644
--- a/pkg/kubelet/volume/cache/desired_state_of_world.go
+++ b/pkg/kubelet/volume/cache/desired_state_of_world.go
@@ -53,6 +53,16 @@ type DesiredStateOfWorld interface {
 	// volume, this is a no-op.
 	AddPodToVolume(podName types.UniquePodName, pod *api.Pod, volumeSpec *volume.Spec, outerVolumeSpecName string, volumeGidValue string) (api.UniqueVolumeName, error)
 
+	// MarkVolumesReportedInUse sets the ReportedInUse value to true for the
+	// reportedVolumes. For volumes not in the reportedVolumes list, the
+	// ReportedInUse value is reset to false. The default ReportedInUse value
+	// for a newly created volume is false.
+	// When set to true this value indicates that the volume was successfully
+	// added to the VolumesInUse field in the node's status.
+	// If a volume in the reportedVolumes list does not exist in the list of
+	// volumes that should be attached to this node, it is skipped without error.
+	MarkVolumesReportedInUse(reportedVolumes []api.UniqueVolumeName)
+
 	// DeletePodFromVolume removes the given pod from the given volume in the
 	// cache indicating the specified pod no longer requires the specified
 	// volume.
@@ -128,6 +138,10 @@ type volumeToMount struct {
 
 	// volumeGidValue contains the value of the GID annotation, if present.
 	volumeGidValue string
+
+	// reportedInUse indicates that the volume was successfully added to the
+	// VolumesInUse field in the node's status.
+	reportedInUse bool
 }
 
 // The pod object represents a pod that references the underlying volume and
@@ -186,6 +200,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
 			podsToMount:        make(map[types.UniquePodName]podToMount),
 			pluginIsAttachable: dsw.isAttachableVolume(volumeSpec),
 			volumeGidValue:     volumeGidValue,
+			reportedInUse:      false,
 		}
 		dsw.volumesToMount[volumeName] = volumeObj
 	}
@@ -203,6 +218,25 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
 	return volumeName, nil
 }
 
+func (dsw *desiredStateOfWorld) MarkVolumesReportedInUse(
+	reportedVolumes []api.UniqueVolumeName) {
+	dsw.Lock()
+	defer dsw.Unlock()
+
+	reportedVolumesMap := make(
+		map[api.UniqueVolumeName]bool, len(reportedVolumes) /* capacity */)
+
+	for _, reportedVolume := range reportedVolumes {
+		reportedVolumesMap[reportedVolume] = true
+	}
+
+	for volumeName, volumeObj := range dsw.volumesToMount {
+		_, volumeReported := reportedVolumesMap[volumeName]
+		volumeObj.reportedInUse = volumeReported
+		dsw.volumesToMount[volumeName] = volumeObj
+	}
+}
+
 func (dsw *desiredStateOfWorld) DeletePodFromVolume(
 	podName types.UniquePodName, volumeName api.UniqueVolumeName) {
 	dsw.Lock()
@@ -266,7 +300,8 @@ func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount {
 					VolumeSpec:          podObj.spec,
 					PluginIsAttachable:  volumeObj.pluginIsAttachable,
 					OuterVolumeSpecName: podObj.outerVolumeSpecName,
-					VolumeGidValue:      volumeObj.volumeGidValue}})
+					VolumeGidValue:      volumeObj.volumeGidValue,
+					ReportedInUse:       volumeObj.reportedInUse}})
 		}
 	}
 	return volumesToMount
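Note that `MarkVolumesReportedInUse` is a wholesale set-and-reset: every call rewrites the flag for every volume in the desired state, so a volume dropped from one status report is immediately considered unreported again. A self-contained sketch of that semantics:

```go
package main

import "fmt"

// markReported reproduces the map-based reset: flags[v] ends up true exactly
// for the volumes named in reported, and false for everything else.
func markReported(flags map[string]bool, reported []string) {
	set := make(map[string]bool, len(reported))
	for _, v := range reported {
		set[v] = true
	}
	for name := range flags {
		flags[name] = set[name] // missing keys read as false, resetting the flag
	}
}

func main() {
	flags := map[string]bool{"vol1": false, "vol2": false, "vol3": false}
	markReported(flags, []string{"vol2"})
	fmt.Println(flags) // map[vol1:false vol2:true vol3:false]
	markReported(flags, []string{"vol3"})
	fmt.Println(flags) // vol2 is reset to false, vol3 is now true
}
```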
diff --git a/pkg/kubelet/volume/cache/desired_state_of_world_test.go b/pkg/kubelet/volume/cache/desired_state_of_world_test.go
index 19262707848..41a3c2235c5 100644
--- a/pkg/kubelet/volume/cache/desired_state_of_world_test.go
+++ b/pkg/kubelet/volume/cache/desired_state_of_world_test.go
@@ -64,8 +64,9 @@ func Test_AddPodToVolume_Positive_NewPodNewVolume(t *testing.T) {
 		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
 	}
 
-	verifyVolumeExists(t, generatedVolumeName, dsw)
-	verifyVolumeExistsInVolumesToMount(t, generatedVolumeName, dsw)
+	verifyVolumeExistsDsw(t, generatedVolumeName, dsw)
+	verifyVolumeExistsInVolumesToMount(
+		t, generatedVolumeName, false /* expectReportedInUse */, dsw)
 	verifyPodExistsInVolumeDsw(t, podName, generatedVolumeName, dsw)
 }
 
@@ -107,8 +108,9 @@ func Test_AddPodToVolume_Positive_ExistingPodExistingVolume(t *testing.T) {
 		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
 	}
 
-	verifyVolumeExists(t, generatedVolumeName, dsw)
-	verifyVolumeExistsInVolumesToMount(t, generatedVolumeName, dsw)
+	verifyVolumeExistsDsw(t, generatedVolumeName, dsw)
+	verifyVolumeExistsInVolumesToMount(
+		t, generatedVolumeName, false /* expectReportedInUse */, dsw)
 	verifyPodExistsInVolumeDsw(t, podName, generatedVolumeName, dsw)
 }
@@ -145,8 +147,9 @@ func Test_DeletePodFromVolume_Positive_PodExistsVolumeExists(t *testing.T) {
 	if err != nil {
 		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
 	}
-	verifyVolumeExists(t, generatedVolumeName, dsw)
-	verifyVolumeExistsInVolumesToMount(t, generatedVolumeName, dsw)
+	verifyVolumeExistsDsw(t, generatedVolumeName, dsw)
+	verifyVolumeExistsInVolumesToMount(
+		t, generatedVolumeName, false /* expectReportedInUse */, dsw)
 	verifyPodExistsInVolumeDsw(t, podName, generatedVolumeName, dsw)
 
 	// Act
@@ -158,7 +161,140 @@ func Test_DeletePodFromVolume_Positive_PodExistsVolumeExists(t *testing.T) {
 	verifyPodDoesntExistInVolumeDsw(t, podName, generatedVolumeName, dsw)
 }
 
-func verifyVolumeExists(
+// Calls AddPodToVolume() to add three new volumes to data struct
+// Verifies newly added pod/volume exists via PodExistsInVolume()
+// VolumeExists() and GetVolumesToMount()
+// Marks only the second volume as reported in use.
+// Verifies only that volume is marked reported in use
+// Marks only the third volume as reported in use.
+// Verifies only that volume is marked reported in use
+func Test_MarkVolumesReportedInUse_Positive_NewPodNewVolume(t *testing.T) {
+	// Arrange
+	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
+	dsw := NewDesiredStateOfWorld(volumePluginMgr)
+
+	pod1 := &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			Name: "pod1",
+			UID:  "pod1uid",
+		},
+		Spec: api.PodSpec{
+			Volumes: []api.Volume{
+				{
+					Name: "volume1-name",
+					VolumeSource: api.VolumeSource{
+						GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
+							PDName: "fake-device1",
+						},
+					},
+				},
+			},
+		},
+	}
+
+	volume1Spec := &volume.Spec{Volume: &pod1.Spec.Volumes[0]}
+	pod1Name := volumehelper.GetUniquePodName(pod1)
+
+	pod2 := &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			Name: "pod2",
+			UID:  "pod2uid",
+		},
+		Spec: api.PodSpec{
+			Volumes: []api.Volume{
+				{
+					Name: "volume2-name",
+					VolumeSource: api.VolumeSource{
+						GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
+							PDName: "fake-device2",
+						},
+					},
+				},
+			},
+		},
+	}
+
+	volume2Spec := &volume.Spec{Volume: &pod2.Spec.Volumes[0]}
+	pod2Name := volumehelper.GetUniquePodName(pod2)
+
+	pod3 := &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			Name: "pod3",
+			UID:  "pod3uid",
+		},
+		Spec: api.PodSpec{
+			Volumes: []api.Volume{
+				{
+					Name: "volume3-name",
+					VolumeSource: api.VolumeSource{
+						GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
+							PDName: "fake-device3",
+						},
+					},
+				},
+			},
+		},
+	}
+
+	volume3Spec := &volume.Spec{Volume: &pod3.Spec.Volumes[0]}
+	pod3Name := volumehelper.GetUniquePodName(pod3)
+
+	generatedVolume1Name, err := dsw.AddPodToVolume(
+		pod1Name, pod1, volume1Spec, volume1Spec.Name(), "" /* volumeGidValue */)
+	if err != nil {
+		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
+	}
+
+	generatedVolume2Name, err := dsw.AddPodToVolume(
+		pod2Name, pod2, volume2Spec, volume2Spec.Name(), "" /* volumeGidValue */)
+	if err != nil {
+		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
+	}
+
+	generatedVolume3Name, err := dsw.AddPodToVolume(
+		pod3Name, pod3, volume3Spec, volume3Spec.Name(), "" /* volumeGidValue */)
+	if err != nil {
+		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
+	}
+
+	// Act
+	volumesReportedInUse := []api.UniqueVolumeName{generatedVolume2Name}
+	dsw.MarkVolumesReportedInUse(volumesReportedInUse)
+
+	// Assert
+	verifyVolumeExistsDsw(t, generatedVolume1Name, dsw)
+	verifyVolumeExistsInVolumesToMount(
+		t, generatedVolume1Name, false /* expectReportedInUse */, dsw)
+	verifyPodExistsInVolumeDsw(t, pod1Name, generatedVolume1Name, dsw)
+	verifyVolumeExistsDsw(t, generatedVolume2Name, dsw)
+	verifyVolumeExistsInVolumesToMount(
+		t, generatedVolume2Name, true /* expectReportedInUse */, dsw)
+	verifyPodExistsInVolumeDsw(t, pod2Name, generatedVolume2Name, dsw)
+	verifyVolumeExistsDsw(t, generatedVolume3Name, dsw)
+	verifyVolumeExistsInVolumesToMount(
+		t, generatedVolume3Name, false /* expectReportedInUse */, dsw)
+	verifyPodExistsInVolumeDsw(t, pod3Name, generatedVolume3Name, dsw)
+
+	// Act
+	volumesReportedInUse = []api.UniqueVolumeName{generatedVolume3Name}
+	dsw.MarkVolumesReportedInUse(volumesReportedInUse)
+
+	// Assert
+	verifyVolumeExistsDsw(t, generatedVolume1Name, dsw)
+	verifyVolumeExistsInVolumesToMount(
+		t, generatedVolume1Name, false /* expectReportedInUse */, dsw)
+	verifyPodExistsInVolumeDsw(t, pod1Name, generatedVolume1Name, dsw)
+	verifyVolumeExistsDsw(t, generatedVolume2Name, dsw)
+	verifyVolumeExistsInVolumesToMount(
+		t, generatedVolume2Name, false /* expectReportedInUse */, dsw)
+	verifyPodExistsInVolumeDsw(t, pod2Name, generatedVolume2Name, dsw)
+	verifyVolumeExistsDsw(t, generatedVolume3Name, dsw)
+	verifyVolumeExistsInVolumesToMount(
+		t, generatedVolume3Name, true /* expectReportedInUse */, dsw)
+	verifyPodExistsInVolumeDsw(t, pod3Name, generatedVolume3Name, dsw)
+}
+
+func verifyVolumeExistsDsw(
 	t *testing.T, expectedVolumeName api.UniqueVolumeName, dsw DesiredStateOfWorld) {
 	volumeExists := dsw.VolumeExists(expectedVolumeName)
 	if !volumeExists {
@@ -181,10 +317,21 @@ func verifyVolumeDoesntExist(
 }
 
 func verifyVolumeExistsInVolumesToMount(
-	t *testing.T, expectedVolumeName api.UniqueVolumeName, dsw DesiredStateOfWorld) {
+	t *testing.T,
+	expectedVolumeName api.UniqueVolumeName,
+	expectReportedInUse bool,
+	dsw DesiredStateOfWorld) {
 	volumesToMount := dsw.GetVolumesToMount()
 	for _, volume := range volumesToMount {
 		if volume.VolumeName == expectedVolumeName {
+			if volume.ReportedInUse != expectReportedInUse {
+				t.Fatalf(
+					"Found volume %v in the list of VolumesToMount, but ReportedInUse incorrect. Expected: <%v> Actual: <%v>",
+					expectedVolumeName,
+					expectReportedInUse,
+					volume.ReportedInUse)
+			}
+
 			return
 		}
 	}
diff --git a/pkg/kubelet/volume/reconciler/reconciler.go b/pkg/kubelet/volume/reconciler/reconciler.go
index de620d8a762..2f27bb9cab1 100644
--- a/pkg/kubelet/volume/reconciler/reconciler.go
+++ b/pkg/kubelet/volume/reconciler/reconciler.go
@@ -295,7 +295,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
 				attachedVolume.VolumeName, attachedVolume.VolumeSpec.Name())
 			err := rc.operationExecutor.DetachVolume(
-				attachedVolume.AttachedVolume, rc.actualStateOfWorld)
+				attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
 			if err != nil &&
 				!goroutinemap.IsAlreadyExists(err) &&
 				!goroutinemap.IsExponentialBackoff(err) {
diff --git a/pkg/kubelet/volume/reconciler/reconciler_test.go b/pkg/kubelet/volume/reconciler/reconciler_test.go
index 6337e757c38..3f9c71fd4b6 100644
--- a/pkg/kubelet/volume/reconciler/reconciler_test.go
+++ b/pkg/kubelet/volume/reconciler/reconciler_test.go
@@ -116,7 +116,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
 	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
 	podName := volumehelper.GetUniquePodName(pod)
-	_, err := dsw.AddPodToVolume(
+	generatedVolumeName, err := dsw.AddPodToVolume(
 		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
 
 	// Assert
@@ -126,7 +126,7 @@
 	// Act
 	go reconciler.Run(wait.NeverStop)
-	waitForAttach(t, fakePlugin, asw)
+	waitForMount(t, fakePlugin, generatedVolumeName, asw)
 
 	// Assert
 	assert.NoError(t, volumetesting.VerifyAttachCallCount(
@@ -183,8 +183,9 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
 	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
 	podName := volumehelper.GetUniquePodName(pod)
-	_, err := dsw.AddPodToVolume(
+	generatedVolumeName, err := dsw.AddPodToVolume(
 		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
+	dsw.MarkVolumesReportedInUse([]api.UniqueVolumeName{generatedVolumeName})
 
 	// Assert
 	if err != nil {
@@ -193,7 +194,7 @@
 	// Act
 	go reconciler.Run(wait.NeverStop)
-	waitForAttach(t, fakePlugin, asw)
+	waitForMount(t, fakePlugin, generatedVolumeName, asw)
 
 	// Assert
 	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
@@ -259,7 +260,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
 	// Act
 	go reconciler.Run(wait.NeverStop)
-	waitForAttach(t, fakePlugin, asw)
+	waitForMount(t, fakePlugin, generatedVolumeName, asw)
 
 	// Assert
 	assert.NoError(t, volumetesting.VerifyAttachCallCount(
@@ -275,7 +276,7 @@
 	// Act
 	dsw.DeletePodFromVolume(podName, generatedVolumeName)
-	waitForDetach(t, fakePlugin, asw)
+	waitForDetach(t, fakePlugin, generatedVolumeName, asw)
 
 	// Assert
 	assert.NoError(t, volumetesting.VerifyTearDownCallCount(
@@ -338,7 +339,8 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
 	// Act
 	go reconciler.Run(wait.NeverStop)
-	waitForAttach(t, fakePlugin, asw)
+	dsw.MarkVolumesReportedInUse([]api.UniqueVolumeName{generatedVolumeName})
+	waitForMount(t, fakePlugin, generatedVolumeName, asw)
 
 	// Assert
 	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
@@ -353,7 +355,7 @@
 	// Act
 	dsw.DeletePodFromVolume(podName, generatedVolumeName)
-	waitForDetach(t, fakePlugin, asw)
+	waitForDetach(t, fakePlugin, generatedVolumeName, asw)
 
 	// Assert
 	assert.NoError(t, volumetesting.VerifyTearDownCallCount(
@@ -361,16 +363,19 @@
 	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
 }
 
-func waitForAttach(
+func waitForMount(
 	t *testing.T,
 	fakePlugin *volumetesting.FakeVolumePlugin,
+	volumeName api.UniqueVolumeName,
 	asw cache.ActualStateOfWorld) {
 	err := retryWithExponentialBackOff(
 		time.Duration(5*time.Millisecond),
 		func() (bool, error) {
 			mountedVolumes := asw.GetMountedVolumes()
-			if len(mountedVolumes) > 0 {
-				return true, nil
+			for _, mountedVolume := range mountedVolumes {
+				if mountedVolume.VolumeName == volumeName {
+					return true, nil
+				}
 			}
 
 			return false, nil
@@ -378,28 +383,28 @@
 	)
 
 	if err != nil {
-		t.Fatalf("Timed out waiting for len of asw.GetMountedVolumes() to become non-zero.")
+		t.Fatalf("Timed out waiting for volume %q to be mounted.", volumeName)
 	}
 }
 
 func waitForDetach(
 	t *testing.T,
 	fakePlugin *volumetesting.FakeVolumePlugin,
+	volumeName api.UniqueVolumeName,
 	asw cache.ActualStateOfWorld) {
 	err := retryWithExponentialBackOff(
 		time.Duration(5*time.Millisecond),
 		func() (bool, error) {
-			attachedVolumes := asw.GetAttachedVolumes()
-			if len(attachedVolumes) == 0 {
-				return true, nil
+			if asw.VolumeExists(volumeName) {
+				return false, nil
 			}
-			return false, nil
+			return true, nil
 		},
 	)
 
 	if err != nil {
-		t.Fatalf("Timed out waiting for len of asw.attachedVolumes() to become zero.")
+		t.Fatalf("Timed out waiting for volume %q to be detached.", volumeName)
 	}
 }
diff --git a/pkg/kubelet/volume/volume_manager.go b/pkg/kubelet/volume/volume_manager.go
index b706f811e46..3cd1d8bd263 100644
--- a/pkg/kubelet/volume/volume_manager.go
+++ b/pkg/kubelet/volume/volume_manager.go
@@ -102,10 +102,24 @@ type VolumeManager interface {
 	// pod object is bad, and should be avoided.
 	GetVolumesForPodAndAppendSupplementalGroups(pod *api.Pod) container.VolumeMap
 
-	// Returns a list of all volumes that are currently attached according to
-	// the actual state of the world cache and implement the volume.Attacher
-	// interface.
+	// Returns a list of all volumes that implement the volume.Attacher
+	// interface and are currently in use according to the actual and desired
+	// state of the world caches. A volume is considered "in use" as soon as it
+	// is added to the desired state of world, indicating it *should* be
+	// attached to this node, and remains "in use" until it is removed from both
+	// the desired state of the world and the actual state of the world, or it
+	// has been unmounted (as indicated in actual state of world).
+	// TODO(#27653): VolumesInUse should be handled gracefully on kubelet
+	// restarts.
 	GetVolumesInUse() []api.UniqueVolumeName
+
+	// VolumeIsAttached returns true if the given volume is attached to this
+	// node.
+	VolumeIsAttached(volumeName api.UniqueVolumeName) bool
+
+	// Marks the specified volumes as having successfully been reported as "in
+	// use" in the node's volume status.
+	MarkVolumesAsReportedInUse(volumesReportedAsInUse []api.UniqueVolumeName)
 }
 
 // NewVolumeManager returns a new concrete instance implementing the
@@ -209,16 +223,47 @@ func (vm *volumeManager) GetVolumesForPodAndAppendSupplementalGroups(
 }
 
 func (vm *volumeManager) GetVolumesInUse() []api.UniqueVolumeName {
-	attachedVolumes := vm.actualStateOfWorld.GetAttachedVolumes()
-	volumesInUse :=
-		make([]api.UniqueVolumeName, 0 /* len */, len(attachedVolumes) /* cap */)
-	for _, attachedVolume := range attachedVolumes {
-		if attachedVolume.PluginIsAttachable {
-			volumesInUse = append(volumesInUse, attachedVolume.VolumeName)
+	// Report volumes in desired state of world and actual state of world so
+	// that volumes are marked in use as soon as the decision is made that the
+	// volume *should* be attached to this node, and remain in use until the
+	// volume is safely unmounted.
+	desiredVolumes := vm.desiredStateOfWorld.GetVolumesToMount()
+	mountedVolumes := vm.actualStateOfWorld.GetGloballyMountedVolumes()
+	volumesToReportInUse :=
+		make(
+			[]api.UniqueVolumeName,
+			0, /* len */
+			len(desiredVolumes)+len(mountedVolumes) /* cap */)
+	desiredVolumesMap :=
+		make(
+			map[api.UniqueVolumeName]bool,
+			len(desiredVolumes)+len(mountedVolumes) /* cap */)
+
+	for _, volume := range desiredVolumes {
+		if volume.PluginIsAttachable {
+			desiredVolumesMap[volume.VolumeName] = true
+			volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
 		}
 	}
-	return volumesInUse
+	for _, volume := range mountedVolumes {
+		if volume.PluginIsAttachable {
+			if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
+				volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
+			}
+		}
+	}
+
+	return volumesToReportInUse
+}
+
+func (vm *volumeManager) VolumeIsAttached(
+	volumeName api.UniqueVolumeName) bool {
+	return vm.actualStateOfWorld.VolumeExists(volumeName)
+}
+
+func (vm *volumeManager) MarkVolumesAsReportedInUse(
+	volumesReportedAsInUse []api.UniqueVolumeName) {
+	vm.desiredStateOfWorld.MarkVolumesReportedInUse(volumesReportedAsInUse)
 }
 
 // getVolumesForPodHelper is a helper method that implements the common logic for
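`GetVolumesInUse` now reports the union of two sets, deduplicated with a map: attachable volumes that *should* be attached (desired state) plus attachable volumes that are still globally mounted (actual state). A minimal sketch of the union, simplified to string slices:

```go
package main

import "fmt"

// volumesInUse unions desired and mounted while preserving order and
// skipping duplicates, mirroring the GetVolumesInUse implementation above.
func volumesInUse(desired, mounted []string) []string {
	out := make([]string, 0, len(desired)+len(mounted))
	seen := make(map[string]bool, len(desired)+len(mounted))
	for _, v := range desired {
		seen[v] = true
		out = append(out, v)
	}
	for _, v := range mounted {
		if !seen[v] {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	// "vol1" is still desired and mounted; "vol2" is only mounted (its pod is
	// gone but the device is not yet unmounted), so it must stay in use.
	fmt.Println(volumesInUse([]string{"vol1"}, []string{"vol1", "vol2"}))
}
```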
diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go
index 25b4fb3b165..30eebab13e9 100644
--- a/pkg/volume/util/operationexecutor/operation_executor.go
+++ b/pkg/volume/util/operationexecutor/operation_executor.go
@@ -60,8 +60,10 @@ type OperationExecutor interface {
 	// DetachVolume detaches the volume from the node specified in
 	// volumeToDetach, and updates the actual state of the world to reflect
-	// that.
-	DetachVolume(volumeToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
+	// that. If verifySafeToDetach is set, a call is made to fetch the node
+	// object and verify that the volume does not exist in the node's
+	// Status.VolumesInUse list (the operation fails with an error if it does).
+	DetachVolume(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
 
 	// MountVolume mounts the volume to the pod specified in volumeToMount.
 	// Specifically it will:
@@ -183,6 +185,10 @@ type VolumeToMount struct {
 	// DevicePath contains the path on the node where the volume is attached.
 	// For non-attachable volumes this is empty.
 	DevicePath string
+
+	// ReportedInUse indicates that the volume was successfully added to the
+	// VolumesInUse field in the node's status.
+	ReportedInUse bool
 }
 
 // AttachedVolume represents a volume that is attached to a node.
@@ -335,9 +341,10 @@ func (oe *operationExecutor) AttachVolume(
 func (oe *operationExecutor) DetachVolume(
 	volumeToDetach AttachedVolume,
+	verifySafeToDetach bool,
 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
 	detachFunc, err :=
-		oe.generateDetachVolumeFunc(volumeToDetach, actualStateOfWorld)
+		oe.generateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld)
 	if err != nil {
 		return err
 	}
@@ -465,6 +472,7 @@ func (oe *operationExecutor) generateAttachVolumeFunc(
 func (oe *operationExecutor) generateDetachVolumeFunc(
 	volumeToDetach AttachedVolume,
+	verifySafeToDetach bool,
 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
 	// Get attacher plugin
 	attachableVolumePlugin, err :=
@@ -500,6 +508,44 @@ func (oe *operationExecutor) generateDetachVolumeFunc(
 	}
 
 	return func() error {
+		if verifySafeToDetach {
+			// Fetch current node object
+			node, fetchErr := oe.kubeClient.Core().Nodes().Get(volumeToDetach.NodeName)
+			if fetchErr != nil {
+				// On failure, return error. Caller will log and retry.
+				return fmt.Errorf(
+					"DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q with: %v",
+					volumeToDetach.VolumeName,
+					volumeToDetach.VolumeSpec.Name(),
+					volumeToDetach.NodeName,
+					fetchErr)
+			}
+
+			if node == nil {
+				// On failure, return error. Caller will log and retry.
+				return fmt.Errorf(
+					"DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q. Error: node object retrieved from API server is nil.",
+					volumeToDetach.VolumeName,
+					volumeToDetach.VolumeSpec.Name(),
+					volumeToDetach.NodeName)
+			}
+
+			for _, inUseVolume := range node.Status.VolumesInUse {
+				if inUseVolume == volumeToDetach.VolumeName {
+					return fmt.Errorf("DetachVolume failed for volume %q (spec.Name: %q) from node %q. Error: volume is still in use by node, according to Node status.",
+						volumeToDetach.VolumeName,
+						volumeToDetach.VolumeSpec.Name(),
+						volumeToDetach.NodeName)
+				}
+			}
+
+			// Volume is not marked as in use by the node; it is safe to detach.
+			glog.Infof("Verified volume is safe to detach for volume %q (spec.Name: %q) from node %q.",
+				volumeToDetach.VolumeName,
+				volumeToDetach.VolumeSpec.Name(),
+				volumeToDetach.NodeName)
+		}
+
 		// Execute detach
 		detachErr := volumeDetacher.Detach(volumeName, volumeToDetach.NodeName)
 		if detachErr != nil {
@@ -864,6 +910,20 @@ func (oe *operationExecutor) generateVerifyControllerAttachedVolumeFunc(
 			return nil
 		}
 
+		if !volumeToMount.ReportedInUse {
+			// If the given volume has not yet been added to the list of
+			// VolumesInUse in the node's volume status, do not proceed, return
+			// error. Caller will log and retry. The node status is updated
+			// periodically by kubelet, so it may take as much as 10 seconds
+			// before this clears.
+			// Issue #28141 to enable on demand status updates.
+			return fmt.Errorf("Volume %q (spec.Name: %q) pod %q (UID: %q) has not yet been added to the list of VolumesInUse in the node's volume status.",
+				volumeToMount.VolumeName,
+				volumeToMount.VolumeSpec.Name(),
+				volumeToMount.PodName,
+				volumeToMount.Pod.UID)
+		}
+
 		// Fetch current node object
 		node, fetchErr := oe.kubeClient.Core().Nodes().Get(nodeName)
 		if fetchErr != nil {
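The two new guards in the operation executor are mirror images: detach (controller side) refuses to proceed while the volume is still listed in `Node.Status.VolumesInUse`, and mount (kubelet side) refuses to proceed until the volume *is* listed there. A compact sketch of both checks against a plain slice, with abbreviated error strings:

```go
package main

import "fmt"

// safeToDetach mirrors the verifySafeToDetach loop: detaching is refused
// while the node still reports the volume in use.
func safeToDetach(volume string, volumesInUse []string) error {
	for _, v := range volumesInUse {
		if v == volume {
			return fmt.Errorf("detach of %q refused: still in use per node status", volume)
		}
	}
	return nil
}

// okToMount mirrors the ReportedInUse gate: mounting is refused until the
// volume has been persisted to the node's VolumesInUse status.
func okToMount(reportedInUse bool, volume string) error {
	if !reportedInUse {
		return fmt.Errorf("mount of %q refused: not yet reported in VolumesInUse", volume)
	}
	return nil
}

func main() {
	inUse := []string{"fake/vol1"}
	fmt.Println(safeToDetach("fake/vol1", inUse)) // refused: kubelet still holds it
	fmt.Println(safeToDetach("fake/vol2", inUse)) // <nil>: safe to detach
	fmt.Println(okToMount(false, "fake/vol1"))    // refused until the status update lands
}
```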