From efaceb28cce760e6be3f7d0b9abf880e92a5bf82 Mon Sep 17 00:00:00 2001 From: Jing Xu Date: Wed, 7 Sep 2016 15:30:16 -0700 Subject: [PATCH] Fix race condition in updating attached volume between master and node This PR tries to fix issue #29324. This cause of this issue is a race condition happens when marking volumes as attached for node status. This PR tries to clean up the logic of when and where to mark volumes as attached/detached. Basically the workflow as follows, 1. When volume is attached sucessfully, the volume and node info is added into nodesToUpdateStatusFor to mark the volume as attached to the node. 2. When detach request comes in, it will check whether it is safe to detach now. If the check passes, remove the volume from volumesToReportAsAttached to indicate the volume is no longer considered as attached now. Afterwards, reconciler tries to update node status and trigger detach operation. If any of these operation fails, the volume is added back to the volumesToReportAsAttached list showing that it is still attached. These steps should make sure that kubelet get the right (might be outdated) information about which volume is attached or not. It also garantees that if detach operation is pending, kubelet should not trigger any mount operations. --- .../attachdetach/attach_detach_controller.go | 1 + .../cache/actual_state_of_world.go | 202 ++++++++++------- .../cache/actual_state_of_world_test.go | 209 ++++++++++++++++-- .../attachdetach/reconciler/reconciler.go | 92 ++++---- .../cache/actual_state_of_world.go | 9 + .../operationexecutor/operation_executor.go | 29 ++- 6 files changed, 384 insertions(+), 158 deletions(-) diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index 08c12374ae7..c1055a7ba0e 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -517,6 +517,7 @@ func (adc *attachDetachController) getPVSpecFromCache( // mounted. func (adc *attachDetachController) processVolumesInUse( nodeName string, volumesInUse []api.UniqueVolumeName) { + glog.V(4).Infof("processVolumesInUse for node %q", nodeName) for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) { mounted := false for _, volumeInUse := range volumesInUse { diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go index 52df9cb36c6..55bff28d2db 100644 --- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go +++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go @@ -26,6 +26,8 @@ import ( "sync" "time" + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" @@ -48,8 +50,6 @@ type ActualStateOfWorld interface { // indicating the specified volume is attached to the specified node. // A unique volume name is generated from the volumeSpec and returned on // success. - // If the volume/node combo already exists, the detachRequestedTime is reset - // to zero. // If volumeSpec is not an attachable volume plugin, an error is returned. // If no volume with the name volumeName exists in the store, the volume is // added. @@ -66,15 +66,6 @@ type ActualStateOfWorld interface { // the specified volume, an error is returned. SetVolumeMountedByNode(volumeName api.UniqueVolumeName, nodeName string, mounted bool) error - // MarkDesireToDetach returns the difference between the current time and - // the DetachRequestedTime for the given volume/node combo. If the - // DetachRequestedTime is zero, it is set to the current time. - // If no volume with the name volumeName exists in the store, an error is - // returned. - // If no node with the name nodeName exists in list of attached nodes for - // the specified volume, an error is returned. - MarkDesireToDetach(volumeName api.UniqueVolumeName, nodeName string) (time.Duration, error) - // ResetNodeStatusUpdateNeeded resets statusUpdateNeeded for the specified // node to false indicating the AttachedVolume field of the Node's Status // object has been updated. @@ -82,6 +73,15 @@ type ActualStateOfWorld interface { // the specified volume, an error is returned. ResetNodeStatusUpdateNeeded(nodeName string) error + // ResetDetachRequestTime resets the detachRequestTime to 0 which indicates there is no detach + // request any more for the volume + ResetDetachRequestTime(volumeName api.UniqueVolumeName, nodeName string) + + // SetDetachRequestTime sets the detachRequestedTime to current time if this is no + // previous request (the previous detachRequestedTime is zero) and return the time elapsed + // since last request + SetDetachRequestTime(volumeName api.UniqueVolumeName, nodeName string) (time.Duration, error) + // DeleteVolumeNode removes the given volume and node from the underlying // store indicating the specified volume is no longer attached to the // specified node. @@ -126,9 +126,9 @@ type AttachedVolume struct { // DetachRequestedTime is used to capture the desire to detach this volume. // When the volume is newly created this value is set to time zero. - // It is set to current time, when MarkDesireToDetach(...) is called, if it + // It is set to current time, when SetDetachRequestTime(...) is called, if it // was previously set to zero (other wise its value remains the same). - // It is reset to zero on AddVolumeNode(...) calls. + // It is reset to zero on ResetDetachRequestTime(...) calls. DetachRequestedTime time.Time } @@ -234,6 +234,20 @@ func (asw *actualStateOfWorld) MarkVolumeAsDetached( asw.DeleteVolumeNode(volumeName, nodeName) } +func (asw *actualStateOfWorld) RemoveVolumeFromReportAsAttached( + volumeName api.UniqueVolumeName, nodeName string) error { + asw.Lock() + defer asw.Unlock() + return asw.removeVolumeFromReportAsAttached(volumeName, nodeName) +} + +func (asw *actualStateOfWorld) AddVolumeToReportAsAttached( + volumeName api.UniqueVolumeName, nodeName string) { + asw.Lock() + defer asw.Unlock() + asw.addVolumeToReportAsAttached(volumeName, nodeName) +} + func (asw *actualStateOfWorld) AddVolumeNode( volumeSpec *volume.Spec, nodeName string, devicePath string) (api.UniqueVolumeName, error) { asw.Lock() @@ -267,7 +281,7 @@ func (asw *actualStateOfWorld) AddVolumeNode( asw.attachedVolumes[volumeName] = volumeObj } - nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName] + _, nodeExists := volumeObj.nodesAttachedTo[nodeName] if !nodeExists { // Create object if it doesn't exist. volumeObj.nodesAttachedTo[nodeName] = nodeAttachedTo{ @@ -276,30 +290,13 @@ func (asw *actualStateOfWorld) AddVolumeNode( mountedByNodeSetCount: 0, detachRequestedTime: time.Time{}, } - } else if !nodeObj.detachRequestedTime.IsZero() { - // Reset detachRequestedTime values if object exists and time is non-zero - nodeObj.detachRequestedTime = time.Time{} - volumeObj.nodesAttachedTo[nodeName] = nodeObj - } - - nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName] - if !nodeToUpdateExists { - // Create object if it doesn't exist - nodeToUpdate = nodeToUpdateStatusFor{ - nodeName: nodeName, - statusUpdateNeeded: true, - volumesToReportAsAttached: make(map[api.UniqueVolumeName]api.UniqueVolumeName), - } - asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate - } - _, nodeToUpdateVolumeExists := - nodeToUpdate.volumesToReportAsAttached[volumeName] - if !nodeToUpdateVolumeExists { - nodeToUpdate.statusUpdateNeeded = true - nodeToUpdate.volumesToReportAsAttached[volumeName] = volumeName - asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate + } else { + glog.V(5).Infof("Volume %q is already added to attachedVolume list to the node %q", + volumeName, + nodeName) } + asw.addVolumeToReportAsAttached(volumeName, nodeName) return volumeName, nil } @@ -307,22 +304,10 @@ func (asw *actualStateOfWorld) SetVolumeMountedByNode( volumeName api.UniqueVolumeName, nodeName string, mounted bool) error { asw.Lock() defer asw.Unlock() - volumeObj, volumeExists := asw.attachedVolumes[volumeName] - if !volumeExists { - return fmt.Errorf( - "failed to SetVolumeMountedByNode(volumeName=%v, nodeName=%q, mounted=%v) volumeName does not exist", - volumeName, - nodeName, - mounted) - } - nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName] - if !nodeExists { - return fmt.Errorf( - "failed to SetVolumeMountedByNode(volumeName=%v, nodeName=%q, mounted=%v) nodeName does not exist", - volumeName, - nodeName, - mounted) + volumeObj, nodeObj, err := asw.getNodeAndVolume(volumeName, nodeName) + if err != nil { + return fmt.Errorf("Failed to SetVolumeMountedByNode with error: %v", err) } if mounted { @@ -337,37 +322,70 @@ func (asw *actualStateOfWorld) SetVolumeMountedByNode( nodeObj.mountedByNode = mounted volumeObj.nodesAttachedTo[nodeName] = nodeObj - + glog.V(4).Infof("SetVolumeMountedByNode volume %v to the node %q mounted %q", + volumeName, + nodeName, + mounted) return nil } -func (asw *actualStateOfWorld) MarkDesireToDetach( +func (asw *actualStateOfWorld) ResetDetachRequestTime( + volumeName api.UniqueVolumeName, nodeName string) { + asw.Lock() + defer asw.Unlock() + + volumeObj, nodeObj, err := asw.getNodeAndVolume(volumeName, nodeName) + if err != nil { + glog.Errorf("Failed to ResetDetachRequestTime with error: %v", err) + return + } + nodeObj.detachRequestedTime = time.Time{} + volumeObj.nodesAttachedTo[nodeName] = nodeObj +} + +func (asw *actualStateOfWorld) SetDetachRequestTime( volumeName api.UniqueVolumeName, nodeName string) (time.Duration, error) { asw.Lock() defer asw.Unlock() - volumeObj, volumeExists := asw.attachedVolumes[volumeName] - if !volumeExists { - return time.Millisecond * 0, fmt.Errorf( - "failed to MarkDesireToDetach(volumeName=%v, nodeName=%q) volumeName does not exist", - volumeName, - nodeName) + volumeObj, nodeObj, err := asw.getNodeAndVolume(volumeName, nodeName) + if err != nil { + return 0, fmt.Errorf("Failed to set detach request time with error: %v", err) } - - nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName] - if !nodeExists { - return time.Millisecond * 0, fmt.Errorf( - "failed to MarkDesireToDetach(volumeName=%v, nodeName=%q) nodeName does not exist", - volumeName, - nodeName) - } - + // If there is no previous detach request, set it to the current time if nodeObj.detachRequestedTime.IsZero() { nodeObj.detachRequestedTime = time.Now() volumeObj.nodesAttachedTo[nodeName] = nodeObj + glog.V(4).Infof("Set detach request time to current time for volume %v on node %q", + volumeName, + nodeName) + } + return time.Since(nodeObj.detachRequestedTime), nil +} + +// Get the volume and node object from actual state of world +// This is an internal function and caller should acquire and release the lock +func (asw *actualStateOfWorld) getNodeAndVolume( + volumeName api.UniqueVolumeName, nodeName string) (attachedVolume, nodeAttachedTo, error) { + + volumeObj, volumeExists := asw.attachedVolumes[volumeName] + if volumeExists { + nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName] + if nodeExists { + return volumeObj, nodeObj, nil + } } - // Remove volume from volumes to report as attached + return attachedVolume{}, nodeAttachedTo{}, fmt.Errorf("volume %v is no longer attached to the node %q", + volumeName, + nodeName) +} + +// Remove the volumeName from the node's volumesToReportAsAttached list +// This is an internal function and caller should acquire and release the lock +func (asw *actualStateOfWorld) removeVolumeFromReportAsAttached( + volumeName api.UniqueVolumeName, nodeName string) error { + nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName] if nodeToUpdateExists { _, nodeToUpdateVolumeExists := @@ -376,10 +394,43 @@ func (asw *actualStateOfWorld) MarkDesireToDetach( nodeToUpdate.statusUpdateNeeded = true delete(nodeToUpdate.volumesToReportAsAttached, volumeName) asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate + return nil } } + return fmt.Errorf("volume %q or node %q does not exist in volumesToReportAsAttached list", + volumeName, + nodeName) - return time.Since(volumeObj.nodesAttachedTo[nodeName].detachRequestedTime), nil +} + +// Add the volumeName to the node's volumesToReportAsAttached list +// This is an internal function and caller should acquire and release the lock +func (asw *actualStateOfWorld) addVolumeToReportAsAttached( + volumeName api.UniqueVolumeName, nodeName string) { + // In case the volume/node entry is no longer in attachedVolume list, skip the rest + if _, _, err := asw.getNodeAndVolume(volumeName, nodeName); err != nil { + glog.V(4).Infof("Volume %q is no longer attached to node %q", volumeName, nodeName) + return + } + nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName] + if !nodeToUpdateExists { + // Create object if it doesn't exist + nodeToUpdate = nodeToUpdateStatusFor{ + nodeName: nodeName, + statusUpdateNeeded: true, + volumesToReportAsAttached: make(map[api.UniqueVolumeName]api.UniqueVolumeName), + } + asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate + glog.V(4).Infof("Add new node %q to nodesToUpdateStatusFor", nodeName) + } + _, nodeToUpdateVolumeExists := + nodeToUpdate.volumesToReportAsAttached[volumeName] + if !nodeToUpdateVolumeExists { + nodeToUpdate.statusUpdateNeeded = true + nodeToUpdate.volumesToReportAsAttached[volumeName] = volumeName + asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate + glog.V(4).Infof("Report volume %q as attached to node %q", volumeName, nodeName) + } } func (asw *actualStateOfWorld) ResetNodeStatusUpdateNeeded( @@ -419,16 +470,7 @@ func (asw *actualStateOfWorld) DeleteVolumeNode( } // Remove volume from volumes to report as attached - nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName] - if nodeToUpdateExists { - _, nodeToUpdateVolumeExists := - nodeToUpdate.volumesToReportAsAttached[volumeName] - if nodeToUpdateVolumeExists { - nodeToUpdate.statusUpdateNeeded = true - delete(nodeToUpdate.volumesToReportAsAttached, volumeName) - asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate - } - } + asw.removeVolumeFromReportAsAttached(volumeName, nodeName) } func (asw *actualStateOfWorld) VolumeNodeExists( diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go index acd503b2290..8379fab83c0 100644 --- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go +++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go @@ -18,6 +18,7 @@ package cache import ( "testing" + "time" "k8s.io/kubernetes/pkg/api" controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" @@ -597,7 +598,7 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetAddVolumeNodeNotRes } // Populates data struct with one volume/node entry. -// Calls MarkDesireToDetach() once on volume/node entry. +// Calls RemoveVolumeFromReportAsAttached() once on volume/node entry. // Calls SetVolumeMountedByNode() twice, first setting mounted to true then false. // Verifies mountedByNode is false and detachRequestedTime is NOT zero. func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequestedTimePerserved(t *testing.T) { @@ -612,9 +613,13 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequest if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } - _, err := asw.MarkDesireToDetach(generatedVolumeName, nodeName) + _, err := asw.SetDetachRequestTime(generatedVolumeName, nodeName) if err != nil { - t.Fatalf("MarkDesireToDetach failed. Expected: Actual: <%v>", err) + t.Fatalf("SetDetachRequestTime failed. Expected: Actual: <%v>", err) + } + err = asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName) + if err != nil { + t.Fatalf("RemoveVolumeFromReportAsAttached failed. Expected: Actual: <%v>", err) } expectedDetachRequestedTime := asw.GetAttachedVolumes()[0].DetachRequestedTime @@ -643,7 +648,7 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequest // Populates data struct with one volume/node entry. // Verifies mountedByNode is true and detachRequestedTime is zero (default values). -func Test_MarkDesireToDetach_Positive_Set(t *testing.T) { +func Test_RemoveVolumeFromReportAsAttached_Positive_Set(t *testing.T) { // Arrange volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) asw := NewActualStateOfWorld(volumePluginMgr) @@ -668,9 +673,9 @@ func Test_MarkDesireToDetach_Positive_Set(t *testing.T) { } // Populates data struct with one volume/node entry. -// Calls MarkDesireToDetach() once on volume/node entry. +// Calls RemoveVolumeFromReportAsAttached() once on volume/node entry. // Verifies mountedByNode is true and detachRequestedTime is NOT zero. -func Test_MarkDesireToDetach_Positive_Marked(t *testing.T) { +func Test_RemoveVolumeFromReportAsAttached_Positive_Marked(t *testing.T) { // Arrange volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) asw := NewActualStateOfWorld(volumePluginMgr) @@ -684,9 +689,11 @@ func Test_MarkDesireToDetach_Positive_Marked(t *testing.T) { } // Act - _, markDesireToDetachErr := asw.MarkDesireToDetach(generatedVolumeName, nodeName) - - // Assert + _, err := asw.SetDetachRequestTime(generatedVolumeName, nodeName) + if err != nil { + t.Fatalf("SetDetachRequestTime failed. Expected: Actual: <%v>", err) + } + markDesireToDetachErr := asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName) if markDesireToDetachErr != nil { t.Fatalf("MarkDesireToDetach failed. Expected: Actual: <%v>", markDesireToDetachErr) } @@ -702,7 +709,7 @@ func Test_MarkDesireToDetach_Positive_Marked(t *testing.T) { // Populates data struct with one volume/node entry. // Calls MarkDesireToDetach() once on volume/node entry. -// Calls AddVolumeNode() to re-add the same volume/node entry. +// Calls ResetDetachRequestTime() to reset the detach request time value back to 0. // Verifies mountedByNode is true and detachRequestedTime is reset to zero. func Test_MarkDesireToDetach_Positive_MarkedAddVolumeNodeReset(t *testing.T) { // Arrange @@ -718,12 +725,17 @@ func Test_MarkDesireToDetach_Positive_MarkedAddVolumeNodeReset(t *testing.T) { } // Act - _, markDesireToDetachErr := asw.MarkDesireToDetach(generatedVolumeName, nodeName) - generatedVolumeName, addErr = asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + _, err := asw.SetDetachRequestTime(generatedVolumeName, nodeName) + if err != nil { + t.Fatalf("SetDetachRequestTime failed. Expected: Actual: <%v>", err) + } + markDesireToDetachErr := asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName) + // Reset detach request time to 0 + asw.ResetDetachRequestTime(generatedVolumeName, nodeName) // Assert if markDesireToDetachErr != nil { - t.Fatalf("MarkDesireToDetach failed. Expected: Actual: <%v>", markDesireToDetachErr) + t.Fatalf("RemoveVolumeFromReportAsAttached failed. Expected: Actual: <%v>", markDesireToDetachErr) } if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) @@ -740,9 +752,9 @@ func Test_MarkDesireToDetach_Positive_MarkedAddVolumeNodeReset(t *testing.T) { // Populates data struct with one volume/node entry. // Calls SetVolumeMountedByNode() twice, first setting mounted to true then false. -// Calls MarkDesireToDetach() once on volume/node entry. +// Calls RemoveVolumeFromReportAsAttached() once on volume/node entry. // Verifies mountedByNode is false and detachRequestedTime is NOT zero. -func Test_MarkDesireToDetach_Positive_UnsetWithInitialSetVolumeMountedByNodePreserved(t *testing.T) { +func Test_RemoveVolumeFromReportAsAttached_Positive_UnsetWithInitialSetVolumeMountedByNodePreserved(t *testing.T) { // Arrange volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) asw := NewActualStateOfWorld(volumePluginMgr) @@ -764,11 +776,13 @@ func Test_MarkDesireToDetach_Positive_UnsetWithInitialSetVolumeMountedByNodePres } // Act - _, markDesireToDetachErr := asw.MarkDesireToDetach(generatedVolumeName, nodeName) - - // Assert - if markDesireToDetachErr != nil { - t.Fatalf("MarkDesireToDetach failed. Expected: Actual: <%v>", markDesireToDetachErr) + _, err := asw.SetDetachRequestTime(generatedVolumeName, nodeName) + if err != nil { + t.Fatalf("SetDetachRequestTime failed. Expected: Actual: <%v>", err) + } + removeVolumeDetachErr := asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName) + if removeVolumeDetachErr != nil { + t.Fatalf("RemoveVolumeFromReportAsAttached failed. Expected: Actual: <%v>", removeVolumeDetachErr) } // Assert @@ -780,6 +794,161 @@ func Test_MarkDesireToDetach_Positive_UnsetWithInitialSetVolumeMountedByNodePres verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, false /* expectedMountedByNode */, true /* expectNonZeroDetachRequestedTime */) } +// Populates data struct with one volume/node entry. +// Calls RemoveVolumeFromReportAsAttached +// Verifyies there is no valume as reported as attached +func Test_RemoveVolumeFromReportAsAttached(t *testing.T) { + // Arrange + volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) + asw := NewActualStateOfWorld(volumePluginMgr) + volumeName := api.UniqueVolumeName("volume-name") + volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) + nodeName := "node-name" + devicePath := "fake/device/path" + generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + if addErr != nil { + t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) + } + + removeVolumeDetachErr := asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName) + if removeVolumeDetachErr != nil { + t.Fatalf("RemoveVolumeFromReportAsAttached failed. Expected: Actual: <%v>", removeVolumeDetachErr) + } + + reportAsAttachedVolumesMap := asw.GetVolumesToReportAttached() + volumes, exists := reportAsAttachedVolumesMap[nodeName] + if !exists { + t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: Actual: 0 { + t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes)) + } + +} + +// Populates data struct with one volume/node entry. +// Calls RemoveVolumeFromReportAsAttached +// Calls AddVolumeToReportAsAttached to add volume back as attached +// Verifyies there is one volume as reported as attached +func Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Positive(t *testing.T) { + // Arrange + volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) + asw := NewActualStateOfWorld(volumePluginMgr) + volumeName := api.UniqueVolumeName("volume-name") + volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) + nodeName := "node-name" + devicePath := "fake/device/path" + generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + if addErr != nil { + t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) + } + + removeVolumeDetachErr := asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName) + if removeVolumeDetachErr != nil { + t.Fatalf("RemoveVolumeFromReportAsAttached failed. Expected: Actual: <%v>", removeVolumeDetachErr) + } + + reportAsAttachedVolumesMap := asw.GetVolumesToReportAttached() + volumes, exists := reportAsAttachedVolumesMap[nodeName] + if !exists { + t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: Actual: 0 { + t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes)) + } + + asw.AddVolumeToReportAsAttached(generatedVolumeName, nodeName) + reportAsAttachedVolumesMap = asw.GetVolumesToReportAttached() + volumes, exists = reportAsAttachedVolumesMap[nodeName] + if !exists { + t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: Actual: Actual: <%v>", len(volumes)) + } +} + +// Populates data struct with one volume/node entry. +// Calls RemoveVolumeFromReportAsAttached +// Calls DeleteVolumeNode +// Calls AddVolumeToReportAsAttached +// Verifyies there is no volume as reported as attached +func Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Negative(t *testing.T) { + // Arrange + volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) + asw := NewActualStateOfWorld(volumePluginMgr) + volumeName := api.UniqueVolumeName("volume-name") + volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) + nodeName := "node-name" + devicePath := "fake/device/path" + generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + if addErr != nil { + t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) + } + + removeVolumeDetachErr := asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName) + if removeVolumeDetachErr != nil { + t.Fatalf("RemoveVolumeFromReportAsAttached failed. Expected: Actual: <%v>", removeVolumeDetachErr) + } + + reportAsAttachedVolumesMap := asw.GetVolumesToReportAttached() + volumes, exists := reportAsAttachedVolumesMap[nodeName] + if !exists { + t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: Actual: 0 { + t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes)) + } + + asw.DeleteVolumeNode(generatedVolumeName, nodeName) + asw.AddVolumeToReportAsAttached(generatedVolumeName, nodeName) + + reportAsAttachedVolumesMap = asw.GetVolumesToReportAttached() + volumes, exists = reportAsAttachedVolumesMap[nodeName] + if !exists { + t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: Actual: 0 { + t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes)) + } +} + +// Populates data struct with one volume/node entry. +// Calls SetDetachRequestTime twice and sleep maxWaitTime (1 second) in between +// The elapsed time returned from the first SetDetachRequestTime call should be smaller than maxWaitTime +// The elapsed time returned from the second SetDetachRequestTime call should be larger than maxWaitTime +func Test_SetDetachRequestTime_Positive(t *testing.T) { + // Arrange + volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) + asw := NewActualStateOfWorld(volumePluginMgr) + volumeName := api.UniqueVolumeName("volume-name") + volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) + nodeName := "node-name" + devicePath := "fake/device/path" + generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + if addErr != nil { + t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) + } + + maxWaitTime := 1 * time.Second + etime, err := asw.SetDetachRequestTime(generatedVolumeName, nodeName) + if err != nil { + t.Fatalf("SetDetachRequestTime failed. Expected: Actual: <%v>", err) + } + if etime >= maxWaitTime { + t.Logf("SetDetachRequestTim Expected: Actual ", etime, maxWaitTime) + } + // Sleep and call SetDetachRequestTime again + time.Sleep(maxWaitTime) + etime, err = asw.SetDetachRequestTime(generatedVolumeName, nodeName) + if err != nil { + t.Fatalf("SetDetachRequestTime failed. Expected: Actual: <%v>", err) + } + if etime < maxWaitTime { + t.Fatalf("SetDetachRequestTim Expected: Actual ", etime, maxWaitTime) + } +} + func verifyAttachedVolume( t *testing.T, attachedVolumes []AttachedVolume, diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go index ab39f5b8a1c..6e9816802c3 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go @@ -92,62 +92,64 @@ func (rc *reconciler) reconciliationLoopFunc() func() { for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() { if !rc.desiredStateOfWorld.VolumeExists( attachedVolume.VolumeName, attachedVolume.NodeName) { - // Volume exists in actual state of world but not desired - - // Mark desire to detach - timeElapsed, err := rc.actualStateOfWorld.MarkDesireToDetach(attachedVolume.VolumeName, attachedVolume.NodeName) + // Set the detach request time + elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName) if err != nil { - glog.Errorf("Unexpected error actualStateOfWorld.MarkDesireToDetach(): %v", err) + glog.Errorf("Cannot trigger detach because it fails to set detach request time with error %v", err) + continue } + // Check whether timeout has reached the maximum waiting time + timeout := elapsedTime > rc.maxWaitForUnmountDuration + // Check whether volume is still mounted. Skip detach if it is still mounted unless timeout + if attachedVolume.MountedByNode && !timeout { + glog.V(12).Infof("Cannot trigger detach for volume %q on node %q because volume is still mounted", + attachedVolume.VolumeName, + attachedVolume.NodeName) + continue + } + + // Before triggering volume detach, mark volume as detached and update the node status + // If it fails to update node status, skip detach volume + rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName) // Update Node Status to indicate volume is no longer safe to mount. err = rc.nodeStatusUpdater.UpdateNodeStatuses() if err != nil { // Skip detaching this volume if unable to update node status - glog.Infof("UpdateNodeStatuses failed with: %v", err) + glog.Errorf("UpdateNodeStatuses failed while attempting to report volume %q as attached to node %q with: %v ", + attachedVolume.VolumeName, + attachedVolume.NodeName, + err) continue } - if !attachedVolume.MountedByNode { - glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName) - err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, true /* verifySafeToDetach */, rc.actualStateOfWorld) - if err == nil { + // Trigger detach volume which requires verifing safe to detach step + // If timeout is true, skip verifySafeToDetach check + glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName) + verifySafeToDetach := !timeout + err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld) + if err == nil { + if !timeout { glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName) - } - if err != nil && - !nestedpendingoperations.IsAlreadyExists(err) && - !exponentialbackoff.IsExponentialBackoff(err) { - // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. - // Log all other errors. - glog.Errorf( - "operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v", + } else { + glog.Infof("Started DetachVolume for volume %q from node %q. This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", attachedVolume.VolumeName, - attachedVolume.VolumeSpec.Name(), attachedVolume.NodeName, - err) - } - } else { - // If volume is not safe to detach (is mounted) wait a max amount of time before detaching anyway. - if timeElapsed > rc.maxWaitForUnmountDuration { - glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q. Volume is not safe to detach, but maxWaitForUnmountDuration expired.", attachedVolume.VolumeName, attachedVolume.NodeName) - err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld) - if err == nil { - glog.Infof("Started DetachVolume for volume %q from node %q due to maxWaitForUnmountDuration expiry.", attachedVolume.VolumeName, attachedVolume.NodeName) - } - if err != nil && - !nestedpendingoperations.IsAlreadyExists(err) && - !exponentialbackoff.IsExponentialBackoff(err) { - // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. - // Log all other errors. - glog.Errorf( - "operationExecutor.DetachVolume failed to start (maxWaitForUnmountDuration expiry) for volume %q (spec.Name: %q) from node %q with err: %v", - attachedVolume.VolumeName, - attachedVolume.VolumeSpec.Name(), - attachedVolume.NodeName, - err) - } + rc.maxWaitForUnmountDuration) } } + if err != nil && + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. + // Log all other errors. + glog.Errorf( + "operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v", + attachedVolume.VolumeName, + attachedVolume.VolumeSpec.Name(), + attachedVolume.NodeName, + err) + } } } @@ -156,12 +158,8 @@ func (rc *reconciler) reconciliationLoopFunc() func() { if rc.actualStateOfWorld.VolumeNodeExists( volumeToAttach.VolumeName, volumeToAttach.NodeName) { // Volume/Node exists, touch it to reset detachRequestedTime - glog.V(12).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName) - _, err := rc.actualStateOfWorld.AddVolumeNode( - volumeToAttach.VolumeSpec, volumeToAttach.NodeName, "" /* devicePath */) - if err != nil { - glog.Errorf("Unexpected error on actualStateOfWorld.AddVolumeNode(): %v", err) - } + glog.V(5).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName) + rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName) } else { // Volume/Node doesn't exist, spawn a goroutine to attach it glog.V(5).Infof("Attempting to start AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index 7e8756a7794..5eaa4eb5003 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -296,6 +296,15 @@ func (asw *actualStateOfWorld) MarkVolumeAsMounted( volumeGidValue) } +func (asw *actualStateOfWorld) AddVolumeToReportAsAttached(volumeName api.UniqueVolumeName, nodeName string) { + // no operation for kubelet side +} + +func (asw *actualStateOfWorld) RemoveVolumeFromReportAsAttached(volumeName api.UniqueVolumeName, nodeName string) error { + // no operation for kubelet side + return nil +} + func (asw *actualStateOfWorld) MarkVolumeAsUnmounted( podName volumetypes.UniquePodName, volumeName api.UniqueVolumeName) error { return asw.DeletePodFromVolume(podName, volumeName) diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index bb57f99e79b..c09de911599 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -153,6 +153,14 @@ type ActualStateOfWorldAttacherUpdater interface { // Marks the specified volume as detached from the specified node MarkVolumeAsDetached(volumeName api.UniqueVolumeName, nodeName string) + + // Marks desire to detach the specified volume (remove the volume from the node's + // volumesToReportedAsAttached list) + RemoveVolumeFromReportAsAttached(volumeName api.UniqueVolumeName, nodeName string) error + + // Unmarks the desire to detach for the specified volume (add the volume back to + // the node's volumesToReportedAsAttached list) + AddVolumeToReportAsAttached(volumeName api.UniqueVolumeName, nodeName string) } // VolumeToAttach represents a volume that should be attached to a node. @@ -561,24 +569,23 @@ func (oe *operationExecutor) generateDetachVolumeFunc( } return func() error { + var err error if verifySafeToDetach { - safeToDetachErr := oe.verifyVolumeIsSafeToDetach(volumeToDetach) - if safeToDetachErr != nil { - // On failure, return error. Caller will log and retry. - return err - } + err = oe.verifyVolumeIsSafeToDetach(volumeToDetach) } - - // Execute detach - detachErr := volumeDetacher.Detach(volumeName, volumeToDetach.NodeName) - if detachErr != nil { - // On failure, return error. Caller will log and retry. + if err == nil { + err = volumeDetacher.Detach(volumeName, volumeToDetach.NodeName) + } + if err != nil { + // On failure, add volume back to ReportAsAttached list + actualStateOfWorld.AddVolumeToReportAsAttached( + volumeToDetach.VolumeName, volumeToDetach.NodeName) return fmt.Errorf( "DetachVolume.Detach failed for volume %q (spec.Name: %q) from node %q with: %v", volumeToDetach.VolumeName, volumeToDetach.VolumeSpec.Name(), volumeToDetach.NodeName, - detachErr) + err) } glog.Infof(