Fix race condition in updating attached volume between master and node

This PR tries to fix issue #29324. This cause of this issue is a race
condition happens when marking volumes as attached for node status. This
PR tries to clean up the logic of when and where to mark volumes as
attached/detached. Basically the workflow as follows,
1. When volume is attached sucessfully, the volume and node info is
added into nodesToUpdateStatusFor to mark the volume as attached to the
node.
2. When detach request comes in, it will check whether it is safe to
detach now. If the check passes, remove the volume from volumesToReportAsAttached
to indicate the volume is no longer considered as attached now.
Afterwards, reconciler tries to update node status and trigger detach
operation. If any of these operation fails, the volume is added back to
the volumesToReportAsAttached list showing that it is still attached.

These steps should make sure that kubelet get the right (might be
outdated) information about which volume is attached or not. It also
garantees that if detach operation is pending, kubelet should not
trigger any mount operations.
This commit is contained in:
Jing Xu 2016-09-07 15:30:16 -07:00
parent b921c675a0
commit efaceb28cc
6 changed files with 384 additions and 158 deletions

View File

@ -517,6 +517,7 @@ func (adc *attachDetachController) getPVSpecFromCache(
// mounted.
func (adc *attachDetachController) processVolumesInUse(
nodeName string, volumesInUse []api.UniqueVolumeName) {
glog.V(4).Infof("processVolumesInUse for node %q", nodeName)
for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
mounted := false
for _, volumeInUse := range volumesInUse {

View File

@ -26,6 +26,8 @@ import (
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
@ -48,8 +50,6 @@ type ActualStateOfWorld interface {
// indicating the specified volume is attached to the specified node.
// A unique volume name is generated from the volumeSpec and returned on
// success.
// If the volume/node combo already exists, the detachRequestedTime is reset
// to zero.
// If volumeSpec is not an attachable volume plugin, an error is returned.
// If no volume with the name volumeName exists in the store, the volume is
// added.
@ -66,15 +66,6 @@ type ActualStateOfWorld interface {
// the specified volume, an error is returned.
SetVolumeMountedByNode(volumeName api.UniqueVolumeName, nodeName string, mounted bool) error
// MarkDesireToDetach returns the difference between the current time and
// the DetachRequestedTime for the given volume/node combo. If the
// DetachRequestedTime is zero, it is set to the current time.
// If no volume with the name volumeName exists in the store, an error is
// returned.
// If no node with the name nodeName exists in list of attached nodes for
// the specified volume, an error is returned.
MarkDesireToDetach(volumeName api.UniqueVolumeName, nodeName string) (time.Duration, error)
// ResetNodeStatusUpdateNeeded resets statusUpdateNeeded for the specified
// node to false indicating the AttachedVolume field of the Node's Status
// object has been updated.
@ -82,6 +73,15 @@ type ActualStateOfWorld interface {
// the specified volume, an error is returned.
ResetNodeStatusUpdateNeeded(nodeName string) error
// ResetDetachRequestTime resets the detachRequestTime to 0 which indicates there is no detach
// request any more for the volume
ResetDetachRequestTime(volumeName api.UniqueVolumeName, nodeName string)
// SetDetachRequestTime sets the detachRequestedTime to current time if this is no
// previous request (the previous detachRequestedTime is zero) and return the time elapsed
// since last request
SetDetachRequestTime(volumeName api.UniqueVolumeName, nodeName string) (time.Duration, error)
// DeleteVolumeNode removes the given volume and node from the underlying
// store indicating the specified volume is no longer attached to the
// specified node.
@ -126,9 +126,9 @@ type AttachedVolume struct {
// DetachRequestedTime is used to capture the desire to detach this volume.
// When the volume is newly created this value is set to time zero.
// It is set to current time, when MarkDesireToDetach(...) is called, if it
// It is set to current time, when SetDetachRequestTime(...) is called, if it
// was previously set to zero (other wise its value remains the same).
// It is reset to zero on AddVolumeNode(...) calls.
// It is reset to zero on ResetDetachRequestTime(...) calls.
DetachRequestedTime time.Time
}
@ -234,6 +234,20 @@ func (asw *actualStateOfWorld) MarkVolumeAsDetached(
asw.DeleteVolumeNode(volumeName, nodeName)
}
func (asw *actualStateOfWorld) RemoveVolumeFromReportAsAttached(
volumeName api.UniqueVolumeName, nodeName string) error {
asw.Lock()
defer asw.Unlock()
return asw.removeVolumeFromReportAsAttached(volumeName, nodeName)
}
func (asw *actualStateOfWorld) AddVolumeToReportAsAttached(
volumeName api.UniqueVolumeName, nodeName string) {
asw.Lock()
defer asw.Unlock()
asw.addVolumeToReportAsAttached(volumeName, nodeName)
}
func (asw *actualStateOfWorld) AddVolumeNode(
volumeSpec *volume.Spec, nodeName string, devicePath string) (api.UniqueVolumeName, error) {
asw.Lock()
@ -267,7 +281,7 @@ func (asw *actualStateOfWorld) AddVolumeNode(
asw.attachedVolumes[volumeName] = volumeObj
}
nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]
_, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if !nodeExists {
// Create object if it doesn't exist.
volumeObj.nodesAttachedTo[nodeName] = nodeAttachedTo{
@ -276,30 +290,13 @@ func (asw *actualStateOfWorld) AddVolumeNode(
mountedByNodeSetCount: 0,
detachRequestedTime: time.Time{},
}
} else if !nodeObj.detachRequestedTime.IsZero() {
// Reset detachRequestedTime values if object exists and time is non-zero
nodeObj.detachRequestedTime = time.Time{}
volumeObj.nodesAttachedTo[nodeName] = nodeObj
}
nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
if !nodeToUpdateExists {
// Create object if it doesn't exist
nodeToUpdate = nodeToUpdateStatusFor{
nodeName: nodeName,
statusUpdateNeeded: true,
volumesToReportAsAttached: make(map[api.UniqueVolumeName]api.UniqueVolumeName),
}
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
}
_, nodeToUpdateVolumeExists :=
nodeToUpdate.volumesToReportAsAttached[volumeName]
if !nodeToUpdateVolumeExists {
nodeToUpdate.statusUpdateNeeded = true
nodeToUpdate.volumesToReportAsAttached[volumeName] = volumeName
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
} else {
glog.V(5).Infof("Volume %q is already added to attachedVolume list to the node %q",
volumeName,
nodeName)
}
asw.addVolumeToReportAsAttached(volumeName, nodeName)
return volumeName, nil
}
@ -307,22 +304,10 @@ func (asw *actualStateOfWorld) SetVolumeMountedByNode(
volumeName api.UniqueVolumeName, nodeName string, mounted bool) error {
asw.Lock()
defer asw.Unlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return fmt.Errorf(
"failed to SetVolumeMountedByNode(volumeName=%v, nodeName=%q, mounted=%v) volumeName does not exist",
volumeName,
nodeName,
mounted)
}
nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if !nodeExists {
return fmt.Errorf(
"failed to SetVolumeMountedByNode(volumeName=%v, nodeName=%q, mounted=%v) nodeName does not exist",
volumeName,
nodeName,
mounted)
volumeObj, nodeObj, err := asw.getNodeAndVolume(volumeName, nodeName)
if err != nil {
return fmt.Errorf("Failed to SetVolumeMountedByNode with error: %v", err)
}
if mounted {
@ -337,37 +322,70 @@ func (asw *actualStateOfWorld) SetVolumeMountedByNode(
nodeObj.mountedByNode = mounted
volumeObj.nodesAttachedTo[nodeName] = nodeObj
glog.V(4).Infof("SetVolumeMountedByNode volume %v to the node %q mounted %q",
volumeName,
nodeName,
mounted)
return nil
}
func (asw *actualStateOfWorld) MarkDesireToDetach(
func (asw *actualStateOfWorld) ResetDetachRequestTime(
volumeName api.UniqueVolumeName, nodeName string) {
asw.Lock()
defer asw.Unlock()
volumeObj, nodeObj, err := asw.getNodeAndVolume(volumeName, nodeName)
if err != nil {
glog.Errorf("Failed to ResetDetachRequestTime with error: %v", err)
return
}
nodeObj.detachRequestedTime = time.Time{}
volumeObj.nodesAttachedTo[nodeName] = nodeObj
}
func (asw *actualStateOfWorld) SetDetachRequestTime(
volumeName api.UniqueVolumeName, nodeName string) (time.Duration, error) {
asw.Lock()
defer asw.Unlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return time.Millisecond * 0, fmt.Errorf(
"failed to MarkDesireToDetach(volumeName=%v, nodeName=%q) volumeName does not exist",
volumeName,
nodeName)
volumeObj, nodeObj, err := asw.getNodeAndVolume(volumeName, nodeName)
if err != nil {
return 0, fmt.Errorf("Failed to set detach request time with error: %v", err)
}
nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if !nodeExists {
return time.Millisecond * 0, fmt.Errorf(
"failed to MarkDesireToDetach(volumeName=%v, nodeName=%q) nodeName does not exist",
volumeName,
nodeName)
}
// If there is no previous detach request, set it to the current time
if nodeObj.detachRequestedTime.IsZero() {
nodeObj.detachRequestedTime = time.Now()
volumeObj.nodesAttachedTo[nodeName] = nodeObj
glog.V(4).Infof("Set detach request time to current time for volume %v on node %q",
volumeName,
nodeName)
}
return time.Since(nodeObj.detachRequestedTime), nil
}
// Get the volume and node object from actual state of world
// This is an internal function and caller should acquire and release the lock
func (asw *actualStateOfWorld) getNodeAndVolume(
volumeName api.UniqueVolumeName, nodeName string) (attachedVolume, nodeAttachedTo, error) {
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if volumeExists {
nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if nodeExists {
return volumeObj, nodeObj, nil
}
}
// Remove volume from volumes to report as attached
return attachedVolume{}, nodeAttachedTo{}, fmt.Errorf("volume %v is no longer attached to the node %q",
volumeName,
nodeName)
}
// Remove the volumeName from the node's volumesToReportAsAttached list
// This is an internal function and caller should acquire and release the lock
func (asw *actualStateOfWorld) removeVolumeFromReportAsAttached(
volumeName api.UniqueVolumeName, nodeName string) error {
nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
if nodeToUpdateExists {
_, nodeToUpdateVolumeExists :=
@ -376,10 +394,43 @@ func (asw *actualStateOfWorld) MarkDesireToDetach(
nodeToUpdate.statusUpdateNeeded = true
delete(nodeToUpdate.volumesToReportAsAttached, volumeName)
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
return nil
}
}
return fmt.Errorf("volume %q or node %q does not exist in volumesToReportAsAttached list",
volumeName,
nodeName)
return time.Since(volumeObj.nodesAttachedTo[nodeName].detachRequestedTime), nil
}
// Add the volumeName to the node's volumesToReportAsAttached list
// This is an internal function and caller should acquire and release the lock
func (asw *actualStateOfWorld) addVolumeToReportAsAttached(
volumeName api.UniqueVolumeName, nodeName string) {
// In case the volume/node entry is no longer in attachedVolume list, skip the rest
if _, _, err := asw.getNodeAndVolume(volumeName, nodeName); err != nil {
glog.V(4).Infof("Volume %q is no longer attached to node %q", volumeName, nodeName)
return
}
nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
if !nodeToUpdateExists {
// Create object if it doesn't exist
nodeToUpdate = nodeToUpdateStatusFor{
nodeName: nodeName,
statusUpdateNeeded: true,
volumesToReportAsAttached: make(map[api.UniqueVolumeName]api.UniqueVolumeName),
}
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
glog.V(4).Infof("Add new node %q to nodesToUpdateStatusFor", nodeName)
}
_, nodeToUpdateVolumeExists :=
nodeToUpdate.volumesToReportAsAttached[volumeName]
if !nodeToUpdateVolumeExists {
nodeToUpdate.statusUpdateNeeded = true
nodeToUpdate.volumesToReportAsAttached[volumeName] = volumeName
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
glog.V(4).Infof("Report volume %q as attached to node %q", volumeName, nodeName)
}
}
func (asw *actualStateOfWorld) ResetNodeStatusUpdateNeeded(
@ -419,16 +470,7 @@ func (asw *actualStateOfWorld) DeleteVolumeNode(
}
// Remove volume from volumes to report as attached
nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
if nodeToUpdateExists {
_, nodeToUpdateVolumeExists :=
nodeToUpdate.volumesToReportAsAttached[volumeName]
if nodeToUpdateVolumeExists {
nodeToUpdate.statusUpdateNeeded = true
delete(nodeToUpdate.volumesToReportAsAttached, volumeName)
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
}
}
asw.removeVolumeFromReportAsAttached(volumeName, nodeName)
}
func (asw *actualStateOfWorld) VolumeNodeExists(

View File

@ -18,6 +18,7 @@ package cache
import (
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
@ -597,7 +598,7 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetAddVolumeNodeNotRes
}
// Populates data struct with one volume/node entry.
// Calls MarkDesireToDetach() once on volume/node entry.
// Calls RemoveVolumeFromReportAsAttached() once on volume/node entry.
// Calls SetVolumeMountedByNode() twice, first setting mounted to true then false.
// Verifies mountedByNode is false and detachRequestedTime is NOT zero.
func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequestedTimePerserved(t *testing.T) {
@ -612,9 +613,13 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequest
if addErr != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr)
}
_, err := asw.MarkDesireToDetach(generatedVolumeName, nodeName)
_, err := asw.SetDetachRequestTime(generatedVolumeName, nodeName)
if err != nil {
t.Fatalf("MarkDesireToDetach failed. Expected: <no error> Actual: <%v>", err)
t.Fatalf("SetDetachRequestTime failed. Expected: <no error> Actual: <%v>", err)
}
err = asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName)
if err != nil {
t.Fatalf("RemoveVolumeFromReportAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
expectedDetachRequestedTime := asw.GetAttachedVolumes()[0].DetachRequestedTime
@ -643,7 +648,7 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequest
// Populates data struct with one volume/node entry.
// Verifies mountedByNode is true and detachRequestedTime is zero (default values).
func Test_MarkDesireToDetach_Positive_Set(t *testing.T) {
func Test_RemoveVolumeFromReportAsAttached_Positive_Set(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld(volumePluginMgr)
@ -668,9 +673,9 @@ func Test_MarkDesireToDetach_Positive_Set(t *testing.T) {
}
// Populates data struct with one volume/node entry.
// Calls MarkDesireToDetach() once on volume/node entry.
// Calls RemoveVolumeFromReportAsAttached() once on volume/node entry.
// Verifies mountedByNode is true and detachRequestedTime is NOT zero.
func Test_MarkDesireToDetach_Positive_Marked(t *testing.T) {
func Test_RemoveVolumeFromReportAsAttached_Positive_Marked(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld(volumePluginMgr)
@ -684,9 +689,11 @@ func Test_MarkDesireToDetach_Positive_Marked(t *testing.T) {
}
// Act
_, markDesireToDetachErr := asw.MarkDesireToDetach(generatedVolumeName, nodeName)
// Assert
_, err := asw.SetDetachRequestTime(generatedVolumeName, nodeName)
if err != nil {
t.Fatalf("SetDetachRequestTime failed. Expected: <no error> Actual: <%v>", err)
}
markDesireToDetachErr := asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName)
if markDesireToDetachErr != nil {
t.Fatalf("MarkDesireToDetach failed. Expected: <no error> Actual: <%v>", markDesireToDetachErr)
}
@ -702,7 +709,7 @@ func Test_MarkDesireToDetach_Positive_Marked(t *testing.T) {
// Populates data struct with one volume/node entry.
// Calls MarkDesireToDetach() once on volume/node entry.
// Calls AddVolumeNode() to re-add the same volume/node entry.
// Calls ResetDetachRequestTime() to reset the detach request time value back to 0.
// Verifies mountedByNode is true and detachRequestedTime is reset to zero.
func Test_MarkDesireToDetach_Positive_MarkedAddVolumeNodeReset(t *testing.T) {
// Arrange
@ -718,12 +725,17 @@ func Test_MarkDesireToDetach_Positive_MarkedAddVolumeNodeReset(t *testing.T) {
}
// Act
_, markDesireToDetachErr := asw.MarkDesireToDetach(generatedVolumeName, nodeName)
generatedVolumeName, addErr = asw.AddVolumeNode(volumeSpec, nodeName, devicePath)
_, err := asw.SetDetachRequestTime(generatedVolumeName, nodeName)
if err != nil {
t.Fatalf("SetDetachRequestTime failed. Expected: <no error> Actual: <%v>", err)
}
markDesireToDetachErr := asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName)
// Reset detach request time to 0
asw.ResetDetachRequestTime(generatedVolumeName, nodeName)
// Assert
if markDesireToDetachErr != nil {
t.Fatalf("MarkDesireToDetach failed. Expected: <no error> Actual: <%v>", markDesireToDetachErr)
t.Fatalf("RemoveVolumeFromReportAsAttached failed. Expected: <no error> Actual: <%v>", markDesireToDetachErr)
}
if addErr != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr)
@ -740,9 +752,9 @@ func Test_MarkDesireToDetach_Positive_MarkedAddVolumeNodeReset(t *testing.T) {
// Populates data struct with one volume/node entry.
// Calls SetVolumeMountedByNode() twice, first setting mounted to true then false.
// Calls MarkDesireToDetach() once on volume/node entry.
// Calls RemoveVolumeFromReportAsAttached() once on volume/node entry.
// Verifies mountedByNode is false and detachRequestedTime is NOT zero.
func Test_MarkDesireToDetach_Positive_UnsetWithInitialSetVolumeMountedByNodePreserved(t *testing.T) {
func Test_RemoveVolumeFromReportAsAttached_Positive_UnsetWithInitialSetVolumeMountedByNodePreserved(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld(volumePluginMgr)
@ -764,11 +776,13 @@ func Test_MarkDesireToDetach_Positive_UnsetWithInitialSetVolumeMountedByNodePres
}
// Act
_, markDesireToDetachErr := asw.MarkDesireToDetach(generatedVolumeName, nodeName)
// Assert
if markDesireToDetachErr != nil {
t.Fatalf("MarkDesireToDetach failed. Expected: <no error> Actual: <%v>", markDesireToDetachErr)
_, err := asw.SetDetachRequestTime(generatedVolumeName, nodeName)
if err != nil {
t.Fatalf("SetDetachRequestTime failed. Expected: <no error> Actual: <%v>", err)
}
removeVolumeDetachErr := asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName)
if removeVolumeDetachErr != nil {
t.Fatalf("RemoveVolumeFromReportAsAttached failed. Expected: <no error> Actual: <%v>", removeVolumeDetachErr)
}
// Assert
@ -780,6 +794,161 @@ func Test_MarkDesireToDetach_Positive_UnsetWithInitialSetVolumeMountedByNodePres
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, false /* expectedMountedByNode */, true /* expectNonZeroDetachRequestedTime */)
}
// Populates data struct with one volume/node entry.
// Calls RemoveVolumeFromReportAsAttached
// Verifyies there is no valume as reported as attached
func Test_RemoveVolumeFromReportAsAttached(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
devicePath := "fake/device/path"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath)
if addErr != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr)
}
removeVolumeDetachErr := asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName)
if removeVolumeDetachErr != nil {
t.Fatalf("RemoveVolumeFromReportAsAttached failed. Expected: <no error> Actual: <%v>", removeVolumeDetachErr)
}
reportAsAttachedVolumesMap := asw.GetVolumesToReportAttached()
volumes, exists := reportAsAttachedVolumesMap[nodeName]
if !exists {
t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: <node %q exist> Actual: <node does not exist in the reportedAsAttached map", nodeName)
}
if len(volumes) > 0 {
t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes))
}
}
// Populates data struct with one volume/node entry.
// Calls RemoveVolumeFromReportAsAttached
// Calls AddVolumeToReportAsAttached to add volume back as attached
// Verifyies there is one volume as reported as attached
func Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Positive(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
devicePath := "fake/device/path"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath)
if addErr != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr)
}
removeVolumeDetachErr := asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName)
if removeVolumeDetachErr != nil {
t.Fatalf("RemoveVolumeFromReportAsAttached failed. Expected: <no error> Actual: <%v>", removeVolumeDetachErr)
}
reportAsAttachedVolumesMap := asw.GetVolumesToReportAttached()
volumes, exists := reportAsAttachedVolumesMap[nodeName]
if !exists {
t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: <node %q exist> Actual: <node does not exist in the reportedAsAttached map", nodeName)
}
if len(volumes) > 0 {
t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes))
}
asw.AddVolumeToReportAsAttached(generatedVolumeName, nodeName)
reportAsAttachedVolumesMap = asw.GetVolumesToReportAttached()
volumes, exists = reportAsAttachedVolumesMap[nodeName]
if !exists {
t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: <node %q exist> Actual: <node does not exist in the reportedAsAttached map", nodeName)
}
if len(volumes) != 1 {
t.Fatalf("len(reportAsAttachedVolumes) Expected: <1> Actual: <%v>", len(volumes))
}
}
// Populates data struct with one volume/node entry.
// Calls RemoveVolumeFromReportAsAttached
// Calls DeleteVolumeNode
// Calls AddVolumeToReportAsAttached
// Verifyies there is no volume as reported as attached
func Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Negative(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
devicePath := "fake/device/path"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath)
if addErr != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr)
}
removeVolumeDetachErr := asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName)
if removeVolumeDetachErr != nil {
t.Fatalf("RemoveVolumeFromReportAsAttached failed. Expected: <no error> Actual: <%v>", removeVolumeDetachErr)
}
reportAsAttachedVolumesMap := asw.GetVolumesToReportAttached()
volumes, exists := reportAsAttachedVolumesMap[nodeName]
if !exists {
t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: <node %q exist> Actual: <node does not exist in the reportedAsAttached map", nodeName)
}
if len(volumes) > 0 {
t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes))
}
asw.DeleteVolumeNode(generatedVolumeName, nodeName)
asw.AddVolumeToReportAsAttached(generatedVolumeName, nodeName)
reportAsAttachedVolumesMap = asw.GetVolumesToReportAttached()
volumes, exists = reportAsAttachedVolumesMap[nodeName]
if !exists {
t.Fatalf("MarkDesireToDetach_UnmarkDesireToDetach failed. Expected: <node %q exist> Actual: <node does not exist in the reportedAsAttached map", nodeName)
}
if len(volumes) > 0 {
t.Fatalf("len(reportAsAttachedVolumes) Expected: <0> Actual: <%v>", len(volumes))
}
}
// Populates data struct with one volume/node entry.
// Calls SetDetachRequestTime twice and sleep maxWaitTime (1 second) in between
// The elapsed time returned from the first SetDetachRequestTime call should be smaller than maxWaitTime
// The elapsed time returned from the second SetDetachRequestTime call should be larger than maxWaitTime
func Test_SetDetachRequestTime_Positive(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld(volumePluginMgr)
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := "node-name"
devicePath := "fake/device/path"
generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath)
if addErr != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr)
}
maxWaitTime := 1 * time.Second
etime, err := asw.SetDetachRequestTime(generatedVolumeName, nodeName)
if err != nil {
t.Fatalf("SetDetachRequestTime failed. Expected: <no error> Actual: <%v>", err)
}
if etime >= maxWaitTime {
t.Logf("SetDetachRequestTim Expected: <elapsed time %v is smaller than maxWaitTime %v> Actual <elapsed time is larger than maxWaitTime>", etime, maxWaitTime)
}
// Sleep and call SetDetachRequestTime again
time.Sleep(maxWaitTime)
etime, err = asw.SetDetachRequestTime(generatedVolumeName, nodeName)
if err != nil {
t.Fatalf("SetDetachRequestTime failed. Expected: <no error> Actual: <%v>", err)
}
if etime < maxWaitTime {
t.Fatalf("SetDetachRequestTim Expected: <elapsed time %v is larger than maxWaitTime %v> Actual <elapsed time is smaller>", etime, maxWaitTime)
}
}
func verifyAttachedVolume(
t *testing.T,
attachedVolumes []AttachedVolume,

View File

@ -92,62 +92,64 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
if !rc.desiredStateOfWorld.VolumeExists(
attachedVolume.VolumeName, attachedVolume.NodeName) {
// Volume exists in actual state of world but not desired
// Mark desire to detach
timeElapsed, err := rc.actualStateOfWorld.MarkDesireToDetach(attachedVolume.VolumeName, attachedVolume.NodeName)
// Set the detach request time
elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil {
glog.Errorf("Unexpected error actualStateOfWorld.MarkDesireToDetach(): %v", err)
glog.Errorf("Cannot trigger detach because it fails to set detach request time with error %v", err)
continue
}
// Check whether timeout has reached the maximum waiting time
timeout := elapsedTime > rc.maxWaitForUnmountDuration
// Check whether volume is still mounted. Skip detach if it is still mounted unless timeout
if attachedVolume.MountedByNode && !timeout {
glog.V(12).Infof("Cannot trigger detach for volume %q on node %q because volume is still mounted",
attachedVolume.VolumeName,
attachedVolume.NodeName)
continue
}
// Before triggering volume detach, mark volume as detached and update the node status
// If it fails to update node status, skip detach volume
rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
// Update Node Status to indicate volume is no longer safe to mount.
err = rc.nodeStatusUpdater.UpdateNodeStatuses()
if err != nil {
// Skip detaching this volume if unable to update node status
glog.Infof("UpdateNodeStatuses failed with: %v", err)
glog.Errorf("UpdateNodeStatuses failed while attempting to report volume %q as attached to node %q with: %v ",
attachedVolume.VolumeName,
attachedVolume.NodeName,
err)
continue
}
if !attachedVolume.MountedByNode {
glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, true /* verifySafeToDetach */, rc.actualStateOfWorld)
if err == nil {
// Trigger detach volume which requires verifing safe to detach step
// If timeout is true, skip verifySafeToDetach check
glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
verifySafeToDetach := !timeout
err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
if err == nil {
if !timeout {
glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
}
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v",
} else {
glog.Infof("Started DetachVolume for volume %q from node %q. This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name(),
attachedVolume.NodeName,
err)
}
} else {
// If volume is not safe to detach (is mounted) wait a max amount of time before detaching anyway.
if timeElapsed > rc.maxWaitForUnmountDuration {
glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q. Volume is not safe to detach, but maxWaitForUnmountDuration expired.", attachedVolume.VolumeName, attachedVolume.NodeName)
err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
if err == nil {
glog.Infof("Started DetachVolume for volume %q from node %q due to maxWaitForUnmountDuration expiry.", attachedVolume.VolumeName, attachedVolume.NodeName)
}
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed to start (maxWaitForUnmountDuration expiry) for volume %q (spec.Name: %q) from node %q with err: %v",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name(),
attachedVolume.NodeName,
err)
}
rc.maxWaitForUnmountDuration)
}
}
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name(),
attachedVolume.NodeName,
err)
}
}
}
@ -156,12 +158,8 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
if rc.actualStateOfWorld.VolumeNodeExists(
volumeToAttach.VolumeName, volumeToAttach.NodeName) {
// Volume/Node exists, touch it to reset detachRequestedTime
glog.V(12).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName)
_, err := rc.actualStateOfWorld.AddVolumeNode(
volumeToAttach.VolumeSpec, volumeToAttach.NodeName, "" /* devicePath */)
if err != nil {
glog.Errorf("Unexpected error on actualStateOfWorld.AddVolumeNode(): %v", err)
}
glog.V(5).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName)
rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
} else {
// Volume/Node doesn't exist, spawn a goroutine to attach it
glog.V(5).Infof("Attempting to start AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)

View File

@ -296,6 +296,15 @@ func (asw *actualStateOfWorld) MarkVolumeAsMounted(
volumeGidValue)
}
func (asw *actualStateOfWorld) AddVolumeToReportAsAttached(volumeName api.UniqueVolumeName, nodeName string) {
// no operation for kubelet side
}
func (asw *actualStateOfWorld) RemoveVolumeFromReportAsAttached(volumeName api.UniqueVolumeName, nodeName string) error {
// no operation for kubelet side
return nil
}
func (asw *actualStateOfWorld) MarkVolumeAsUnmounted(
podName volumetypes.UniquePodName, volumeName api.UniqueVolumeName) error {
return asw.DeletePodFromVolume(podName, volumeName)

View File

@ -153,6 +153,14 @@ type ActualStateOfWorldAttacherUpdater interface {
// Marks the specified volume as detached from the specified node
MarkVolumeAsDetached(volumeName api.UniqueVolumeName, nodeName string)
// Marks desire to detach the specified volume (remove the volume from the node's
// volumesToReportedAsAttached list)
RemoveVolumeFromReportAsAttached(volumeName api.UniqueVolumeName, nodeName string) error
// Unmarks the desire to detach for the specified volume (add the volume back to
// the node's volumesToReportedAsAttached list)
AddVolumeToReportAsAttached(volumeName api.UniqueVolumeName, nodeName string)
}
// VolumeToAttach represents a volume that should be attached to a node.
@ -561,24 +569,23 @@ func (oe *operationExecutor) generateDetachVolumeFunc(
}
return func() error {
var err error
if verifySafeToDetach {
safeToDetachErr := oe.verifyVolumeIsSafeToDetach(volumeToDetach)
if safeToDetachErr != nil {
// On failure, return error. Caller will log and retry.
return err
}
err = oe.verifyVolumeIsSafeToDetach(volumeToDetach)
}
// Execute detach
detachErr := volumeDetacher.Detach(volumeName, volumeToDetach.NodeName)
if detachErr != nil {
// On failure, return error. Caller will log and retry.
if err == nil {
err = volumeDetacher.Detach(volumeName, volumeToDetach.NodeName)
}
if err != nil {
// On failure, add volume back to ReportAsAttached list
actualStateOfWorld.AddVolumeToReportAsAttached(
volumeToDetach.VolumeName, volumeToDetach.NodeName)
return fmt.Errorf(
"DetachVolume.Detach failed for volume %q (spec.Name: %q) from node %q with: %v",
volumeToDetach.VolumeName,
volumeToDetach.VolumeSpec.Name(),
volumeToDetach.NodeName,
detachErr)
err)
}
glog.Infof(