mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-14 14:23:37 +00:00
Fixes a race in the Attach/Detach Controller reconciler between reading the ActualStateOfWorld and checking operation-pending state; fixes the reconciler_test mock detach to account for a volume being attached to multiple nodes
This commit is contained in:
parent
a9e9cabbea
commit
a61743b125
@ -98,7 +98,8 @@ type ActualStateOfWorld interface {
|
|||||||
|
|
||||||
// IsVolumeAttachedToNode returns true if the specified volume/node combo exists
|
// IsVolumeAttachedToNode returns true if the specified volume/node combo exists
|
||||||
// in the underlying store indicating the specified volume is attached to
|
// in the underlying store indicating the specified volume is attached to
|
||||||
// the specified node.
|
// the specified node, and false if either the combo does not exist, or the
|
||||||
|
// attached state is marked as uncertain.
|
||||||
IsVolumeAttachedToNode(volumeName v1.UniqueVolumeName, nodeName types.NodeName) bool
|
IsVolumeAttachedToNode(volumeName v1.UniqueVolumeName, nodeName types.NodeName) bool
|
||||||
|
|
||||||
// GetAttachedVolumes generates and returns a list of volumes/node pairs
|
// GetAttachedVolumes generates and returns a list of volumes/node pairs
|
||||||
|
@ -142,6 +142,7 @@ func (rc *reconciler) reconcile() {
|
|||||||
for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
|
for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
|
||||||
if !rc.desiredStateOfWorld.VolumeExists(
|
if !rc.desiredStateOfWorld.VolumeExists(
|
||||||
attachedVolume.VolumeName, attachedVolume.NodeName) {
|
attachedVolume.VolumeName, attachedVolume.NodeName) {
|
||||||
|
|
||||||
// Check whether there already exists a pending operation, and don't even
|
// Check whether there already exists a pending operation, and don't even
|
||||||
// try to start an operation if there is already one running.
|
// try to start an operation if there is already one running.
|
||||||
// This check must be done before we do any other checks, as otherwise the other checks
|
// This check must be done before we do any other checks, as otherwise the other checks
|
||||||
@ -161,6 +162,28 @@ func (rc *reconciler) reconcile() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Because the detach operation updates the ActualStateOfWorld before
|
||||||
|
// marking itself complete, it's possible for the volume to be removed
|
||||||
|
// from the ActualStateOfWorld between the GetAttachedVolumes() check
|
||||||
|
// and the IsOperationPending() check above.
|
||||||
|
// Check the ActualStateOfWorld again to avoid issuing an unnecessary
|
||||||
|
// detach.
|
||||||
|
// See https://github.com/kubernetes/kubernetes/issues/93902
|
||||||
|
attachedVolumesForNode := rc.actualStateOfWorld.GetAttachedVolumesForNode(attachedVolume.NodeName)
|
||||||
|
stillAttached := false
|
||||||
|
for _, volForNode := range attachedVolumesForNode {
|
||||||
|
if volForNode.VolumeName == attachedVolume.VolumeName {
|
||||||
|
stillAttached = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !stillAttached {
|
||||||
|
if klog.V(5).Enabled() {
|
||||||
|
klog.Infof(attachedVolume.GenerateMsgDetailed("Volume detached--skipping", ""))
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// Set the detach request time
|
// Set the detach request time
|
||||||
elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
|
elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -226,6 +249,29 @@ func (rc *reconciler) reconcile() {
|
|||||||
func (rc *reconciler) attachDesiredVolumes() {
|
func (rc *reconciler) attachDesiredVolumes() {
|
||||||
// Ensure volumes that should be attached are attached.
|
// Ensure volumes that should be attached are attached.
|
||||||
for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
|
for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
|
||||||
|
if util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) {
|
||||||
|
// Don't even try to start an operation if there is already one running for the given volume and node.
|
||||||
|
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName) {
|
||||||
|
if klog.V(10).Enabled() {
|
||||||
|
klog.Infof("Operation for volume %q is already running for node %q. Can't start attach", volumeToAttach.VolumeName, volumeToAttach.NodeName)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Don't even try to start an operation if there is already one running for the given volume
|
||||||
|
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */) {
|
||||||
|
if klog.V(10).Enabled() {
|
||||||
|
klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Because the attach operation updates the ActualStateOfWorld before
|
||||||
|
// marking itself complete, IsOperationPending() must be checked before
|
||||||
|
// IsVolumeAttachedToNode() to guarantee the ActualStateOfWorld is
|
||||||
|
// up-to-date when it's read.
|
||||||
|
// See https://github.com/kubernetes/kubernetes/issues/93902
|
||||||
if rc.actualStateOfWorld.IsVolumeAttachedToNode(volumeToAttach.VolumeName, volumeToAttach.NodeName) {
|
if rc.actualStateOfWorld.IsVolumeAttachedToNode(volumeToAttach.VolumeName, volumeToAttach.NodeName) {
|
||||||
// Volume/Node exists, touch it to reset detachRequestedTime
|
// Volume/Node exists, touch it to reset detachRequestedTime
|
||||||
if klog.V(5).Enabled() {
|
if klog.V(5).Enabled() {
|
||||||
@ -235,26 +281,7 @@ func (rc *reconciler) attachDesiredVolumes() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) {
|
if !util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) {
|
||||||
|
|
||||||
// Don't even try to start an operation if there is already one running for the given volume and node.
|
|
||||||
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName) {
|
|
||||||
if klog.V(10).Enabled() {
|
|
||||||
klog.Infof("Operation for volume %q is already running for node %q. Can't start attach", volumeToAttach.VolumeName, volumeToAttach.NodeName)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
|
|
||||||
// Don't even try to start an operation if there is already one running for the given volume
|
|
||||||
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */) {
|
|
||||||
if klog.V(10).Enabled() {
|
|
||||||
klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName)
|
nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName)
|
||||||
if len(nodes) > 0 {
|
if len(nodes) > 0 {
|
||||||
if !volumeToAttach.MultiAttachErrorReported {
|
if !volumeToAttach.MultiAttachErrorReported {
|
||||||
@ -263,7 +290,6 @@ func (rc *reconciler) attachDesiredVolumes() {
|
|||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Volume/Node doesn't exist, spawn a goroutine to attach it
|
// Volume/Node doesn't exist, spawn a goroutine to attach it
|
||||||
|
@ -347,7 +347,7 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Creates a volume with accessMode ReadWriteMany
|
// Creates a volume with accessMode ReadWriteMany
|
||||||
// Populates desiredStateOfWorld cache with two ode/volume/pod tuples pointing to the created volume
|
// Populates desiredStateOfWorld cache with two node/volume/pod tuples pointing to the created volume
|
||||||
// Calls Run()
|
// Calls Run()
|
||||||
// Verifies there are two attach calls and no detach calls.
|
// Verifies there are two attach calls and no detach calls.
|
||||||
// Deletes the first node/volume/pod tuple from desiredStateOfWorld cache without first marking the node/volume as unmounted.
|
// Deletes the first node/volume/pod tuple from desiredStateOfWorld cache without first marking the node/volume as unmounted.
|
||||||
@ -536,7 +536,7 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing.
|
|||||||
// Creates a volume with accessMode ReadWriteOnce
|
// Creates a volume with accessMode ReadWriteOnce
|
||||||
// First create a pod which will try to attach the volume to a node named "uncertain-node". The attach call for this node will
|
// First create a pod which will try to attach the volume to a node named "uncertain-node". The attach call for this node will
|
||||||
// fail for timeout, but the volume will be actually attached to the node after the call.
|
// fail for timeout, but the volume will be actually attached to the node after the call.
|
||||||
// Secondly, delete the this pod.
|
// Secondly, delete this pod.
|
||||||
// Lastly, create a pod scheduled to a normal node which will trigger attach volume to the node. The attach should return successfully.
|
// Lastly, create a pod scheduled to a normal node which will trigger attach volume to the node. The attach should return successfully.
|
||||||
func Test_Run_OneVolumeAttachAndDetachUncertainNodesWithReadWriteOnce(t *testing.T) {
|
func Test_Run_OneVolumeAttachAndDetachUncertainNodesWithReadWriteOnce(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
|
@ -26,6 +26,7 @@ go_library(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/mount-utils"
|
"k8s.io/mount-utils"
|
||||||
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/utils/exec"
|
"k8s.io/utils/exec"
|
||||||
testingexec "k8s.io/utils/exec/testing"
|
testingexec "k8s.io/utils/exec/testing"
|
||||||
utilstrings "k8s.io/utils/strings"
|
utilstrings "k8s.io/utils/strings"
|
||||||
@ -426,7 +427,7 @@ func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume {
|
|||||||
WaitForAttachHook: plugin.WaitForAttachHook,
|
WaitForAttachHook: plugin.WaitForAttachHook,
|
||||||
UnmountDeviceHook: plugin.UnmountDeviceHook,
|
UnmountDeviceHook: plugin.UnmountDeviceHook,
|
||||||
}
|
}
|
||||||
volume.VolumesAttached = make(map[string]types.NodeName)
|
volume.VolumesAttached = make(map[string]sets.String)
|
||||||
volume.DeviceMountState = make(map[string]string)
|
volume.DeviceMountState = make(map[string]string)
|
||||||
volume.VolumeMountState = make(map[string]string)
|
volume.VolumeMountState = make(map[string]string)
|
||||||
*list = append(*list, volume)
|
*list = append(*list, volume)
|
||||||
@ -835,7 +836,7 @@ type FakeVolume struct {
|
|||||||
VolName string
|
VolName string
|
||||||
Plugin *FakeVolumePlugin
|
Plugin *FakeVolumePlugin
|
||||||
MetricsNil
|
MetricsNil
|
||||||
VolumesAttached map[string]types.NodeName
|
VolumesAttached map[string]sets.String
|
||||||
DeviceMountState map[string]string
|
DeviceMountState map[string]string
|
||||||
VolumeMountState map[string]string
|
VolumeMountState map[string]string
|
||||||
|
|
||||||
@ -1154,11 +1155,12 @@ func (fv *FakeVolume) Attach(spec *Spec, nodeName types.NodeName) (string, error
|
|||||||
fv.Lock()
|
fv.Lock()
|
||||||
defer fv.Unlock()
|
defer fv.Unlock()
|
||||||
fv.AttachCallCount++
|
fv.AttachCallCount++
|
||||||
|
|
||||||
volumeName, err := getUniqueVolumeName(spec)
|
volumeName, err := getUniqueVolumeName(spec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
volumeNode, exist := fv.VolumesAttached[volumeName]
|
volumeNodes, exist := fv.VolumesAttached[volumeName]
|
||||||
if exist {
|
if exist {
|
||||||
if nodeName == UncertainAttachNode {
|
if nodeName == UncertainAttachNode {
|
||||||
return "/dev/vdb-test", nil
|
return "/dev/vdb-test", nil
|
||||||
@ -1168,13 +1170,14 @@ func (fv *FakeVolume) Attach(spec *Spec, nodeName types.NodeName) (string, error
|
|||||||
if nodeName == TimeoutAttachNode {
|
if nodeName == TimeoutAttachNode {
|
||||||
return "", fmt.Errorf("Timed out to attach volume %q to node %q", volumeName, nodeName)
|
return "", fmt.Errorf("Timed out to attach volume %q to node %q", volumeName, nodeName)
|
||||||
}
|
}
|
||||||
if volumeNode == nodeName || volumeNode == MultiAttachNode || nodeName == MultiAttachNode {
|
if volumeNodes.Has(string(nodeName)) || volumeNodes.Has(MultiAttachNode) || nodeName == MultiAttachNode {
|
||||||
|
volumeNodes.Insert(string(nodeName))
|
||||||
return "/dev/vdb-test", nil
|
return "/dev/vdb-test", nil
|
||||||
}
|
}
|
||||||
return "", fmt.Errorf("volume %q trying to attach to node %q is already attached to node %q", volumeName, nodeName, volumeNode)
|
return "", fmt.Errorf("volume %q trying to attach to node %q is already attached to node %q", volumeName, nodeName, volumeNodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
fv.VolumesAttached[volumeName] = nodeName
|
fv.VolumesAttached[volumeName] = sets.NewString(string(nodeName))
|
||||||
if nodeName == UncertainAttachNode || nodeName == TimeoutAttachNode {
|
if nodeName == UncertainAttachNode || nodeName == TimeoutAttachNode {
|
||||||
return "", fmt.Errorf("Timed out to attach volume %q to node %q", volumeName, nodeName)
|
return "", fmt.Errorf("Timed out to attach volume %q to node %q", volumeName, nodeName)
|
||||||
}
|
}
|
||||||
@ -1272,10 +1275,18 @@ func (fv *FakeVolume) Detach(volumeName string, nodeName types.NodeName) error {
|
|||||||
fv.Lock()
|
fv.Lock()
|
||||||
defer fv.Unlock()
|
defer fv.Unlock()
|
||||||
fv.DetachCallCount++
|
fv.DetachCallCount++
|
||||||
if _, exist := fv.VolumesAttached[volumeName]; !exist {
|
|
||||||
return fmt.Errorf("Trying to detach volume %q that is not attached to the node %q", volumeName, nodeName)
|
node := string(nodeName)
|
||||||
|
volumeNodes, exist := fv.VolumesAttached[volumeName]
|
||||||
|
if !exist || !volumeNodes.Has(node) {
|
||||||
|
return fmt.Errorf("Trying to detach volume %q that is not attached to the node %q", volumeName, node)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
volumeNodes.Delete(node)
|
||||||
|
if volumeNodes.Len() == 0 {
|
||||||
delete(fv.VolumesAttached, volumeName)
|
delete(fv.VolumesAttached, volumeName)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user