Merge pull request #94599 from verult/adc-op-asw-race

Fixes Attach Detach Controller reconciler race reading ActualStateOfWorld and operation pending states
This commit is contained in:
Kubernetes Prow Robot 2020-12-08 16:28:53 -08:00 committed by GitHub
commit ce7ac8442e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 178 additions and 110 deletions

View File

@ -461,7 +461,12 @@ func (adc *attachDetachController) populateDesiredStateOfWorld() error {
err) err)
continue continue
} }
if adc.actualStateOfWorld.IsVolumeAttachedToNode(volumeName, nodeName) { attachState := adc.actualStateOfWorld.GetAttachState(volumeName, nodeName)
if attachState == cache.AttachStateAttached {
klog.V(10).Infof("Volume %q is attached to node %q. Marking as attached in ActualStateOfWorld",
volumeName,
nodeName,
)
devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName) devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName)
if err != nil { if err != nil {
klog.Errorf("Failed to find device path: %v", err) klog.Errorf("Failed to find device path: %v", err)

View File

@ -116,8 +116,8 @@ func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) {
for _, node := range nodes { for _, node := range nodes {
nodeName := types.NodeName(node.Name) nodeName := types.NodeName(node.Name)
for _, attachedVolume := range node.Status.VolumesAttached { for _, attachedVolume := range node.Status.VolumesAttached {
found := adc.actualStateOfWorld.IsVolumeAttachedToNode(attachedVolume.Name, nodeName) attachedState := adc.actualStateOfWorld.GetAttachState(attachedVolume.Name, nodeName)
if !found { if attachedState != cache.AttachStateAttached {
t.Fatalf("Run failed with error. Node %s, volume %s not found", nodeName, attachedVolume.Name) t.Fatalf("Run failed with error. Node %s, volume %s not found", nodeName, attachedVolume.Name)
} }
} }

View File

@ -96,10 +96,13 @@ type ActualStateOfWorld interface {
// nodes, the volume is also deleted. // nodes, the volume is also deleted.
DeleteVolumeNode(volumeName v1.UniqueVolumeName, nodeName types.NodeName) DeleteVolumeNode(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
// IsVolumeAttachedToNode returns true if the specified volume/node combo exists // GetAttachState returns the attach state for the given volume-node
// in the underlying store indicating the specified volume is attached to // combination.
// the specified node. // Returns AttachStateAttached if the specified volume/node combo exists in
IsVolumeAttachedToNode(volumeName v1.UniqueVolumeName, nodeName types.NodeName) bool // the underlying store indicating the specified volume is attached to the
// specified node, AttachStateDetached if the combo does not exist, or
// AttachStateUncertain if the attached state is marked as uncertain.
GetAttachState(volumeName v1.UniqueVolumeName, nodeName types.NodeName) AttachState
// GetAttachedVolumes generates and returns a list of volumes/node pairs // GetAttachedVolumes generates and returns a list of volumes/node pairs
// reflecting which volumes might attached to which nodes based on the // reflecting which volumes might attached to which nodes based on the
@ -153,6 +156,31 @@ type AttachedVolume struct {
DetachRequestedTime time.Time DetachRequestedTime time.Time
} }
// AttachState represents the attach state of a volume to a node known to the
// Actual State of World.
// This type is used as external representation of attach state (specifically
// as the return type of GetAttachState only); the state is represented
// differently in the internal cache implementation.
type AttachState int
const (
// AttachStateAttached represents the state in which the volume is attached to
// the node.
AttachStateAttached AttachState = iota
// AttachStateUncertain represents the state in which the Actual State of World
// does not know whether the volume is attached to the node.
AttachStateUncertain
// AttachStateDetached represents the state in which the volume is not
// attached to the node.
AttachStateDetached
)
func (s AttachState) String() string {
return []string{"Attached", "Uncertain", "Detached"}[s]
}
// NewActualStateOfWorld returns a new instance of ActualStateOfWorld. // NewActualStateOfWorld returns a new instance of ActualStateOfWorld.
func NewActualStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) ActualStateOfWorld { func NewActualStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) ActualStateOfWorld {
return &actualStateOfWorld{ return &actualStateOfWorld{
@ -530,19 +558,22 @@ func (asw *actualStateOfWorld) DeleteVolumeNode(
asw.removeVolumeFromReportAsAttached(volumeName, nodeName) asw.removeVolumeFromReportAsAttached(volumeName, nodeName)
} }
func (asw *actualStateOfWorld) IsVolumeAttachedToNode( func (asw *actualStateOfWorld) GetAttachState(
volumeName v1.UniqueVolumeName, nodeName types.NodeName) bool { volumeName v1.UniqueVolumeName, nodeName types.NodeName) AttachState {
asw.RLock() asw.RLock()
defer asw.RUnlock() defer asw.RUnlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName] volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if volumeExists { if volumeExists {
if node, nodeExists := volumeObj.nodesAttachedTo[nodeName]; nodeExists { if node, nodeExists := volumeObj.nodesAttachedTo[nodeName]; nodeExists {
return node.attachedConfirmed if node.attachedConfirmed {
return AttachStateAttached
}
return AttachStateUncertain
} }
} }
return false return AttachStateDetached
} }
func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume { func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume {

View File

@ -47,9 +47,9 @@ func Test_AddVolumeNode_Positive_NewVolumeNewNode(t *testing.T) {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", err) t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", err)
} }
volumeNodeComboExists := asw.IsVolumeAttachedToNode(generatedVolumeName, nodeName) volumeNodeComboState := asw.GetAttachState(generatedVolumeName, nodeName)
if !volumeNodeComboExists { if volumeNodeComboState != AttachStateAttached {
t.Fatalf("%q/%q volume/node combo does not exist, it should.", generatedVolumeName, nodeName) t.Fatalf("%q/%q volume/node combo is marked %q; expected 'Attached'.", generatedVolumeName, nodeName, volumeNodeComboState)
} }
attachedVolumes := asw.GetAttachedVolumes() attachedVolumes := asw.GetAttachedVolumes()
@ -82,9 +82,9 @@ func Test_AddVolumeNode_Positive_NewVolumeNewNodeWithFalseAttached(t *testing.T)
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", err) t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", err)
} }
volumeNodeComboExists := asw.IsVolumeAttachedToNode(generatedVolumeName, nodeName) volumeNodeComboState := asw.GetAttachState(generatedVolumeName, nodeName)
if volumeNodeComboExists { if volumeNodeComboState != AttachStateUncertain {
t.Fatalf("%q/%q volume/node combo does exist, it should not.", generatedVolumeName, nodeName) t.Fatalf("%q/%q volume/node combo is marked %q, expected 'Uncertain'.", generatedVolumeName, nodeName, volumeNodeComboState)
} }
allVolumes := asw.GetAttachedVolumes() allVolumes := asw.GetAttachedVolumes()
@ -131,9 +131,9 @@ func Test_AddVolumeNode_Positive_NewVolumeNewNodeWithFalseAttached(t *testing.T)
generatedVolumeName2) generatedVolumeName2)
} }
volumeNodeComboExists = asw.IsVolumeAttachedToNode(generatedVolumeName, nodeName) volumeNodeComboState = asw.GetAttachState(generatedVolumeName, nodeName)
if !volumeNodeComboExists { if volumeNodeComboState != AttachStateAttached {
t.Fatalf("%q/%q combo does not exist, it should.", generatedVolumeName, nodeName) t.Fatalf("%q/%q volume/node combo is marked %q; expected 'Attached'.", generatedVolumeName, nodeName, volumeNodeComboState)
} }
attachedVolumes := asw.GetAttachedVolumes() attachedVolumes := asw.GetAttachedVolumes()
@ -182,9 +182,9 @@ func Test_AddVolumeNode_Positive_NewVolumeTwoNodesWithFalseAttached(t *testing.T
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", err) t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", err)
} }
volumeNodeComboExists := asw.IsVolumeAttachedToNode(generatedVolumeName, node1Name) volumeNodeComboState := asw.GetAttachState(generatedVolumeName, node1Name)
if volumeNodeComboExists { if volumeNodeComboState != AttachStateUncertain {
t.Fatalf("%q/%q volume/node combo does exist, it should not.", generatedVolumeName, node1Name) t.Fatalf("%q/%q volume/node combo is marked %q, expected 'Uncertain'.", generatedVolumeName, node1Name, volumeNodeComboState)
} }
generatedVolumeName2, add2Err := asw.AddVolumeNode(volumeName, volumeSpec, node2Name, devicePath, true) generatedVolumeName2, add2Err := asw.AddVolumeNode(volumeName, volumeSpec, node2Name, devicePath, true)
@ -201,9 +201,9 @@ func Test_AddVolumeNode_Positive_NewVolumeTwoNodesWithFalseAttached(t *testing.T
generatedVolumeName2) generatedVolumeName2)
} }
volumeNodeComboExists = asw.IsVolumeAttachedToNode(generatedVolumeName, node2Name) volumeNodeComboState = asw.GetAttachState(generatedVolumeName, node2Name)
if !volumeNodeComboExists { if volumeNodeComboState != AttachStateAttached {
t.Fatalf("%q/%q combo does not exist, it should.", generatedVolumeName, node2Name) t.Fatalf("%q/%q volume/node combo is marked %q; expected 'Attached'.", generatedVolumeName, node2Name, volumeNodeComboState)
} }
attachedVolumes := asw.GetAttachedVolumes() attachedVolumes := asw.GetAttachedVolumes()
@ -268,14 +268,14 @@ func Test_AddVolumeNode_Positive_ExistingVolumeNewNode(t *testing.T) {
generatedVolumeName2) generatedVolumeName2)
} }
volumeNode1ComboExists := asw.IsVolumeAttachedToNode(generatedVolumeName1, node1Name) volumeNode1ComboState := asw.GetAttachState(generatedVolumeName1, node1Name)
if !volumeNode1ComboExists { if volumeNode1ComboState != AttachStateAttached {
t.Fatalf("%q/%q volume/node combo does not exist, it should.", generatedVolumeName1, node1Name) t.Fatalf("%q/%q volume/node combo is marked %q; expected 'Attached'.", generatedVolumeName1, node1Name, volumeNode1ComboState)
} }
volumeNode2ComboExists := asw.IsVolumeAttachedToNode(generatedVolumeName1, node2Name) volumeNode2ComboState := asw.GetAttachState(generatedVolumeName1, node2Name)
if !volumeNode2ComboExists { if volumeNode2ComboState != AttachStateAttached {
t.Fatalf("%q/%q volume/node combo does not exist, it should.", generatedVolumeName1, node2Name) t.Fatalf("%q/%q volume/node combo is marked %q; expected 'Attached'.", generatedVolumeName1, node2Name, volumeNode2ComboState)
} }
attachedVolumes := asw.GetAttachedVolumes() attachedVolumes := asw.GetAttachedVolumes()
@ -317,9 +317,9 @@ func Test_AddVolumeNode_Positive_ExistingVolumeExistingNode(t *testing.T) {
generatedVolumeName2) generatedVolumeName2)
} }
volumeNodeComboExists := asw.IsVolumeAttachedToNode(generatedVolumeName1, nodeName) volumeNodeComboState := asw.GetAttachState(generatedVolumeName1, nodeName)
if !volumeNodeComboExists { if volumeNodeComboState != AttachStateAttached {
t.Fatalf("%q/%q volume/node combo does not exist, it should.", generatedVolumeName1, nodeName) t.Fatalf("%q/%q volume/node combo is marked %q; expected 'Attached'.", generatedVolumeName1, nodeName, volumeNodeComboState)
} }
attachedVolumes := asw.GetAttachedVolumes() attachedVolumes := asw.GetAttachedVolumes()
@ -350,9 +350,9 @@ func Test_DeleteVolumeNode_Positive_VolumeExistsNodeExists(t *testing.T) {
asw.DeleteVolumeNode(generatedVolumeName, nodeName) asw.DeleteVolumeNode(generatedVolumeName, nodeName)
// Assert // Assert
volumeNodeComboExists := asw.IsVolumeAttachedToNode(generatedVolumeName, nodeName) volumeNodeComboState := asw.GetAttachState(generatedVolumeName, nodeName)
if volumeNodeComboExists { if volumeNodeComboState != AttachStateDetached {
t.Fatalf("%q/%q volume/node combo exists, it should not.", generatedVolumeName, nodeName) t.Fatalf("%q/%q volume/node combo is marked %q, expected 'Detached'.", generatedVolumeName, nodeName, volumeNodeComboState)
} }
attachedVolumes := asw.GetAttachedVolumes() attachedVolumes := asw.GetAttachedVolumes()
@ -374,9 +374,9 @@ func Test_DeleteVolumeNode_Positive_VolumeDoesntExistNodeDoesntExist(t *testing.
asw.DeleteVolumeNode(volumeName, nodeName) asw.DeleteVolumeNode(volumeName, nodeName)
// Assert // Assert
volumeNodeComboExists := asw.IsVolumeAttachedToNode(volumeName, nodeName) volumeNodeComboState := asw.GetAttachState(volumeName, nodeName)
if volumeNodeComboExists { if volumeNodeComboState != AttachStateDetached {
t.Fatalf("%q/%q volume/node combo exists, it should not.", volumeName, nodeName) t.Fatalf("%q/%q volume/node combo is marked %q, expected 'Detached'.", volumeName, nodeName, volumeNodeComboState)
} }
attachedVolumes := asw.GetAttachedVolumes() attachedVolumes := asw.GetAttachedVolumes()
@ -417,14 +417,14 @@ func Test_DeleteVolumeNode_Positive_TwoNodesOneDeleted(t *testing.T) {
asw.DeleteVolumeNode(generatedVolumeName1, node1Name) asw.DeleteVolumeNode(generatedVolumeName1, node1Name)
// Assert // Assert
volumeNodeComboExists := asw.IsVolumeAttachedToNode(generatedVolumeName1, node1Name) volumeNodeComboState := asw.GetAttachState(generatedVolumeName1, node1Name)
if volumeNodeComboExists { if volumeNodeComboState != AttachStateDetached {
t.Fatalf("%q/%q volume/node combo exists, it should not.", generatedVolumeName1, node1Name) t.Fatalf("%q/%q volume/node combo is marked %q, expected 'Detached'.", generatedVolumeName1, node1Name, volumeNodeComboState)
} }
volumeNodeComboExists = asw.IsVolumeAttachedToNode(generatedVolumeName1, node2Name) volumeNodeComboState = asw.GetAttachState(generatedVolumeName1, node2Name)
if !volumeNodeComboExists { if volumeNodeComboState != AttachStateAttached {
t.Fatalf("%q/%q volume/node combo does not exist, it should.", generatedVolumeName1, node2Name) t.Fatalf("%q/%q volume/node combo is marked %q; expected 'Attached'.", generatedVolumeName1, node2Name, volumeNodeComboState)
} }
attachedVolumes := asw.GetAttachedVolumes() attachedVolumes := asw.GetAttachedVolumes()
@ -436,7 +436,7 @@ func Test_DeleteVolumeNode_Positive_TwoNodesOneDeleted(t *testing.T) {
} }
// Populates data struct with one volume/node entry. // Populates data struct with one volume/node entry.
// Calls IsVolumeAttachedToNode() to verify entry. // Calls GetAttachState() to verify entry.
// Verifies the populated volume/node entry exists. // Verifies the populated volume/node entry exists.
func Test_VolumeNodeExists_Positive_VolumeExistsNodeExists(t *testing.T) { func Test_VolumeNodeExists_Positive_VolumeExistsNodeExists(t *testing.T) {
// Arrange // Arrange
@ -452,11 +452,11 @@ func Test_VolumeNodeExists_Positive_VolumeExistsNodeExists(t *testing.T) {
} }
// Act // Act
volumeNodeComboExists := asw.IsVolumeAttachedToNode(generatedVolumeName, nodeName) volumeNodeComboState := asw.GetAttachState(generatedVolumeName, nodeName)
// Assert // Assert
if !volumeNodeComboExists { if volumeNodeComboState != AttachStateAttached {
t.Fatalf("%q/%q volume/node combo does not exist, it should.", generatedVolumeName, nodeName) t.Fatalf("%q/%q volume/node combo is marked %q; expected 'Attached'.", generatedVolumeName, nodeName, volumeNodeComboState)
} }
attachedVolumes := asw.GetAttachedVolumes() attachedVolumes := asw.GetAttachedVolumes()
@ -468,7 +468,7 @@ func Test_VolumeNodeExists_Positive_VolumeExistsNodeExists(t *testing.T) {
} }
// Populates data struct with one volume1/node1 entry. // Populates data struct with one volume1/node1 entry.
// Calls IsVolumeAttachedToNode() with volume1/node2. // Calls GetAttachState() with volume1/node2.
// Verifies requested entry does not exist, but populated entry does. // Verifies requested entry does not exist, but populated entry does.
func Test_VolumeNodeExists_Positive_VolumeExistsNodeDoesntExist(t *testing.T) { func Test_VolumeNodeExists_Positive_VolumeExistsNodeDoesntExist(t *testing.T) {
// Arrange // Arrange
@ -485,11 +485,11 @@ func Test_VolumeNodeExists_Positive_VolumeExistsNodeDoesntExist(t *testing.T) {
} }
// Act // Act
volumeNodeComboExists := asw.IsVolumeAttachedToNode(generatedVolumeName, node2Name) volumeNodeComboState := asw.GetAttachState(generatedVolumeName, node2Name)
// Assert // Assert
if volumeNodeComboExists { if volumeNodeComboState != AttachStateDetached {
t.Fatalf("%q/%q volume/node combo exists, it should not.", generatedVolumeName, node2Name) t.Fatalf("%q/%q volume/node combo is marked %q, expected 'Detached'.", generatedVolumeName, node2Name, volumeNodeComboState)
} }
attachedVolumes := asw.GetAttachedVolumes() attachedVolumes := asw.GetAttachedVolumes()
@ -500,7 +500,7 @@ func Test_VolumeNodeExists_Positive_VolumeExistsNodeDoesntExist(t *testing.T) {
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), node1Name, devicePath, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */) verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), node1Name, devicePath, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
} }
// Calls IsVolumeAttachedToNode() on empty data struct. // Calls GetAttachState() on empty data struct.
// Verifies requested entry does not exist. // Verifies requested entry does not exist.
func Test_VolumeNodeExists_Positive_VolumeAndNodeDontExist(t *testing.T) { func Test_VolumeNodeExists_Positive_VolumeAndNodeDontExist(t *testing.T) {
// Arrange // Arrange
@ -510,11 +510,11 @@ func Test_VolumeNodeExists_Positive_VolumeAndNodeDontExist(t *testing.T) {
nodeName := types.NodeName("node-name") nodeName := types.NodeName("node-name")
// Act // Act
volumeNodeComboExists := asw.IsVolumeAttachedToNode(volumeName, nodeName) volumeNodeComboState := asw.GetAttachState(volumeName, nodeName)
// Assert // Assert
if volumeNodeComboExists { if volumeNodeComboState != AttachStateDetached {
t.Fatalf("%q/%q volume/node combo exists, it should not.", volumeName, nodeName) t.Fatalf("%q/%q volume/node combo is marked %q, expected 'Detached'.", volumeName, nodeName, volumeNodeComboState)
} }
attachedVolumes := asw.GetAttachedVolumes() attachedVolumes := asw.GetAttachedVolumes()

View File

@ -142,6 +142,7 @@ func (rc *reconciler) reconcile() {
for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() { for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
if !rc.desiredStateOfWorld.VolumeExists( if !rc.desiredStateOfWorld.VolumeExists(
attachedVolume.VolumeName, attachedVolume.NodeName) { attachedVolume.VolumeName, attachedVolume.NodeName) {
// Check whether there already exist an operation pending, and don't even // Check whether there already exist an operation pending, and don't even
// try to start an operation if there is already one running. // try to start an operation if there is already one running.
// This check must be done before we do any other checks, as otherwise the other checks // This check must be done before we do any other checks, as otherwise the other checks
@ -161,6 +162,21 @@ func (rc *reconciler) reconcile() {
} }
} }
// Because the detach operation updates the ActualStateOfWorld before
// marking itself complete, it's possible for the volume to be removed
// from the ActualStateOfWorld between the GetAttachedVolumes() check
// and the IsOperationPending() check above.
// Check the ActualStateOfWorld again to avoid issuing an unnecessary
// detach.
// See https://github.com/kubernetes/kubernetes/issues/93902
attachState := rc.actualStateOfWorld.GetAttachState(attachedVolume.VolumeName, attachedVolume.NodeName)
if attachState == cache.AttachStateDetached {
if klog.V(5).Enabled() {
klog.Infof(attachedVolume.GenerateMsgDetailed("Volume detached--skipping", ""))
}
continue
}
// Set the detach request time // Set the detach request time
elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName) elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil { if err != nil {
@ -226,7 +242,31 @@ func (rc *reconciler) reconcile() {
func (rc *reconciler) attachDesiredVolumes() { func (rc *reconciler) attachDesiredVolumes() {
// Ensure volumes that should be attached are attached. // Ensure volumes that should be attached are attached.
for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() { for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
if rc.actualStateOfWorld.IsVolumeAttachedToNode(volumeToAttach.VolumeName, volumeToAttach.NodeName) { if util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) {
// Don't even try to start an operation if there is already one running for the given volume and node.
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName) {
if klog.V(10).Enabled() {
klog.Infof("Operation for volume %q is already running for node %q. Can't start attach", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
continue
}
} else {
// Don't even try to start an operation if there is already one running for the given volume
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */) {
if klog.V(10).Enabled() {
klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
continue
}
}
// Because the attach operation updates the ActualStateOfWorld before
// marking itself complete, IsOperationPending() must be checked before
// GetAttachState() to guarantee the ActualStateOfWorld is
// up-to-date when it's read.
// See https://github.com/kubernetes/kubernetes/issues/93902
attachState := rc.actualStateOfWorld.GetAttachState(volumeToAttach.VolumeName, volumeToAttach.NodeName)
if attachState == cache.AttachStateAttached {
// Volume/Node exists, touch it to reset detachRequestedTime // Volume/Node exists, touch it to reset detachRequestedTime
if klog.V(5).Enabled() { if klog.V(5).Enabled() {
klog.Infof(volumeToAttach.GenerateMsgDetailed("Volume attached--touching", "")) klog.Infof(volumeToAttach.GenerateMsgDetailed("Volume attached--touching", ""))
@ -235,26 +275,7 @@ func (rc *reconciler) attachDesiredVolumes() {
continue continue
} }
if util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) { if !util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) {
// Don't even try to start an operation if there is already one running for the given volume and node.
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName) {
if klog.V(10).Enabled() {
klog.Infof("Operation for volume %q is already running for node %q. Can't start attach", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
continue
}
} else {
// Don't even try to start an operation if there is already one running for the given volume
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */) {
if klog.V(10).Enabled() {
klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
continue
}
nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName) nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName)
if len(nodes) > 0 { if len(nodes) > 0 {
if !volumeToAttach.MultiAttachErrorReported { if !volumeToAttach.MultiAttachErrorReported {
@ -263,7 +284,6 @@ func (rc *reconciler) attachDesiredVolumes() {
} }
continue continue
} }
} }
// Volume/Node doesn't exist, spawn a goroutine to attach it // Volume/Node doesn't exist, spawn a goroutine to attach it

View File

@ -347,7 +347,7 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
} }
// Creates a volume with accessMode ReadWriteMany // Creates a volume with accessMode ReadWriteMany
// Populates desiredStateOfWorld cache with two ode/volume/pod tuples pointing to the created volume // Populates desiredStateOfWorld cache with two node/volume/pod tuples pointing to the created volume
// Calls Run() // Calls Run()
// Verifies there are two attach calls and no detach calls. // Verifies there are two attach calls and no detach calls.
// Deletes the first node/volume/pod tuple from desiredStateOfWorld cache without first marking the node/volume as unmounted. // Deletes the first node/volume/pod tuple from desiredStateOfWorld cache without first marking the node/volume as unmounted.
@ -536,7 +536,7 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing.
// Creates a volume with accessMode ReadWriteOnce // Creates a volume with accessMode ReadWriteOnce
// First create a pod which will try to attach the volume to the a node named "uncertain-node". The attach call for this node will // First create a pod which will try to attach the volume to the a node named "uncertain-node". The attach call for this node will
// fail for timeout, but the volume will be actually attached to the node after the call. // fail for timeout, but the volume will be actually attached to the node after the call.
// Secondly, delete the this pod. // Secondly, delete this pod.
// Lastly, create a pod scheduled to a normal node which will trigger attach volume to the node. The attach should return successfully. // Lastly, create a pod scheduled to a normal node which will trigger attach volume to the node. The attach should return successfully.
func Test_Run_OneVolumeAttachAndDetachUncertainNodesWithReadWriteOnce(t *testing.T) { func Test_Run_OneVolumeAttachAndDetachUncertainNodesWithReadWriteOnce(t *testing.T) {
// Arrange // Arrange
@ -577,9 +577,9 @@ func Test_Run_OneVolumeAttachAndDetachUncertainNodesWithReadWriteOnce(t *testing
} }
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
// Volume is added to asw. Because attach operation fails, volume should not reported as attached to the node. // Volume is added to asw. Because attach operation fails, volume should not be reported as attached to the node.
waitForVolumeAddedToNode(t, generatedVolumeName, nodeName1, asw) waitForVolumeAddedToNode(t, generatedVolumeName, nodeName1, asw)
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, true, asw) verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, true, asw) verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, true, asw)
// When volume is added to the node, it is set to mounted by default. Then the status will be updated by checking node status VolumeInUse. // When volume is added to the node, it is set to mounted by default. Then the status will be updated by checking node status VolumeInUse.
@ -596,7 +596,7 @@ func Test_Run_OneVolumeAttachAndDetachUncertainNodesWithReadWriteOnce(t *testing
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr) t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
} }
waitForVolumeAttachedToNode(t, generatedVolumeName, nodeName2, asw) waitForVolumeAttachedToNode(t, generatedVolumeName, nodeName2, asw)
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName2, true, asw) verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName2, cache.AttachStateAttached, asw)
} }
@ -643,9 +643,9 @@ func Test_Run_OneVolumeAttachAndDetachTimeoutNodesWithReadWriteOnce(t *testing.T
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr) t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
} }
// Volume is added to asw. Because attach operation fails, volume should not reported as attached to the node. // Volume is added to asw. Because attach operation fails, volume should not be reported as attached to the node.
waitForVolumeAddedToNode(t, generatedVolumeName, nodeName1, asw) waitForVolumeAddedToNode(t, generatedVolumeName, nodeName1, asw)
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, false, asw) verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateUncertain, asw)
verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, false, asw) verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, false, asw)
// When volume is added to the node, it is set to mounted by default. Then the status will be updated by checking node status VolumeInUse. // When volume is added to the node, it is set to mounted by default. Then the status will be updated by checking node status VolumeInUse.
@ -662,7 +662,7 @@ func Test_Run_OneVolumeAttachAndDetachTimeoutNodesWithReadWriteOnce(t *testing.T
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr) t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
} }
waitForVolumeAttachedToNode(t, generatedVolumeName, nodeName2, asw) waitForVolumeAttachedToNode(t, generatedVolumeName, nodeName2, asw)
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName2, true, asw) verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName2, cache.AttachStateAttached, asw)
} }
@ -1048,7 +1048,8 @@ func waitForVolumeAttachedToNode(
err := retryWithExponentialBackOff( err := retryWithExponentialBackOff(
time.Duration(500*time.Millisecond), time.Duration(500*time.Millisecond),
func() (bool, error) { func() (bool, error) {
if asw.IsVolumeAttachedToNode(volumeName, nodeName) { attachState := asw.GetAttachState(volumeName, nodeName)
if attachState == cache.AttachStateAttached {
return true, nil return true, nil
} }
t.Logf( t.Logf(
@ -1060,7 +1061,8 @@ func waitForVolumeAttachedToNode(
}, },
) )
if err != nil && !asw.IsVolumeAttachedToNode(volumeName, nodeName) { attachState := asw.GetAttachState(volumeName, nodeName)
if err != nil && attachState != cache.AttachStateAttached {
t.Fatalf( t.Fatalf(
"Volume <%v> is not attached to node <%v>.", "Volume <%v> is not attached to node <%v>.",
volumeName, volumeName,
@ -1141,19 +1143,17 @@ func verifyVolumeAttachedToNode(
t *testing.T, t *testing.T,
volumeName v1.UniqueVolumeName, volumeName v1.UniqueVolumeName,
nodeName k8stypes.NodeName, nodeName k8stypes.NodeName,
isAttached bool, expectedAttachState cache.AttachState,
asw cache.ActualStateOfWorld, asw cache.ActualStateOfWorld,
) { ) {
result := asw.IsVolumeAttachedToNode(volumeName, nodeName) attachState := asw.GetAttachState(volumeName, nodeName)
if result == isAttached { if attachState != expectedAttachState {
return t.Fatalf("Check volume <%v> is attached to node <%v>, got %v, expected %v",
volumeName,
nodeName,
attachState,
expectedAttachState)
} }
t.Fatalf("Check volume <%v> is attached to node <%v>, got %v, expected %v",
volumeName,
nodeName,
result,
isAttached)
} }
func verifyVolumeReportedAsAttachedToNode( func verifyVolumeReportedAsAttachedToNode(

View File

@ -26,6 +26,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library",

View File

@ -27,6 +27,7 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/mount-utils" "k8s.io/mount-utils"
"k8s.io/utils/exec" "k8s.io/utils/exec"
testingexec "k8s.io/utils/exec/testing" testingexec "k8s.io/utils/exec/testing"
@ -427,7 +428,7 @@ func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume {
WaitForAttachHook: plugin.WaitForAttachHook, WaitForAttachHook: plugin.WaitForAttachHook,
UnmountDeviceHook: plugin.UnmountDeviceHook, UnmountDeviceHook: plugin.UnmountDeviceHook,
} }
volume.VolumesAttached = make(map[string]types.NodeName) volume.VolumesAttached = make(map[string]sets.String)
volume.DeviceMountState = make(map[string]string) volume.DeviceMountState = make(map[string]string)
volume.VolumeMountState = make(map[string]string) volume.VolumeMountState = make(map[string]string)
*list = append(*list, volume) *list = append(*list, volume)
@ -836,7 +837,7 @@ type FakeVolume struct {
VolName string VolName string
Plugin *FakeVolumePlugin Plugin *FakeVolumePlugin
MetricsNil MetricsNil
VolumesAttached map[string]types.NodeName VolumesAttached map[string]sets.String
DeviceMountState map[string]string DeviceMountState map[string]string
VolumeMountState map[string]string VolumeMountState map[string]string
@ -1155,11 +1156,12 @@ func (fv *FakeVolume) Attach(spec *Spec, nodeName types.NodeName) (string, error
fv.Lock() fv.Lock()
defer fv.Unlock() defer fv.Unlock()
fv.AttachCallCount++ fv.AttachCallCount++
volumeName, err := getUniqueVolumeName(spec) volumeName, err := getUniqueVolumeName(spec)
if err != nil { if err != nil {
return "", err return "", err
} }
volumeNode, exist := fv.VolumesAttached[volumeName] volumeNodes, exist := fv.VolumesAttached[volumeName]
if exist { if exist {
if nodeName == UncertainAttachNode { if nodeName == UncertainAttachNode {
return "/dev/vdb-test", nil return "/dev/vdb-test", nil
@ -1169,13 +1171,14 @@ func (fv *FakeVolume) Attach(spec *Spec, nodeName types.NodeName) (string, error
if nodeName == TimeoutAttachNode { if nodeName == TimeoutAttachNode {
return "", fmt.Errorf("Timed out to attach volume %q to node %q", volumeName, nodeName) return "", fmt.Errorf("Timed out to attach volume %q to node %q", volumeName, nodeName)
} }
if volumeNode == nodeName || volumeNode == MultiAttachNode || nodeName == MultiAttachNode { if volumeNodes.Has(string(nodeName)) || volumeNodes.Has(MultiAttachNode) || nodeName == MultiAttachNode {
volumeNodes.Insert(string(nodeName))
return "/dev/vdb-test", nil return "/dev/vdb-test", nil
} }
return "", fmt.Errorf("volume %q trying to attach to node %q is already attached to node %q", volumeName, nodeName, volumeNode) return "", fmt.Errorf("volume %q trying to attach to node %q is already attached to node %q", volumeName, nodeName, volumeNodes)
} }
fv.VolumesAttached[volumeName] = nodeName fv.VolumesAttached[volumeName] = sets.NewString(string(nodeName))
if nodeName == UncertainAttachNode || nodeName == TimeoutAttachNode { if nodeName == UncertainAttachNode || nodeName == TimeoutAttachNode {
return "", fmt.Errorf("Timed out to attach volume %q to node %q", volumeName, nodeName) return "", fmt.Errorf("Timed out to attach volume %q to node %q", volumeName, nodeName)
} }
@ -1273,10 +1276,18 @@ func (fv *FakeVolume) Detach(volumeName string, nodeName types.NodeName) error {
fv.Lock() fv.Lock()
defer fv.Unlock() defer fv.Unlock()
fv.DetachCallCount++ fv.DetachCallCount++
if _, exist := fv.VolumesAttached[volumeName]; !exist {
return fmt.Errorf("Trying to detach volume %q that is not attached to the node %q", volumeName, nodeName) node := string(nodeName)
volumeNodes, exist := fv.VolumesAttached[volumeName]
if !exist || !volumeNodes.Has(node) {
return fmt.Errorf("Trying to detach volume %q that is not attached to the node %q", volumeName, node)
} }
delete(fv.VolumesAttached, volumeName)
volumeNodes.Delete(node)
if volumeNodes.Len() == 0 {
delete(fv.VolumesAttached, volumeName)
}
return nil return nil
} }