Parallelize attach operations across different nodes for volumes that allow multi-attach

This commit is contained in:
Cheng Xing 2020-01-14 10:56:58 -08:00
parent fe9073b8c1
commit ef3d66b98b
9 changed files with 798 additions and 296 deletions

View File

@ -16,7 +16,7 @@ go_library(
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library", "//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
"//pkg/kubelet/events:go_default_library", "//pkg/kubelet/events:go_default_library",
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library", "//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
"//pkg/volume:go_default_library", "//pkg/volume/util:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library", "//pkg/volume/util/operationexecutor:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@ -24,7 +24,7 @@ import (
"strings" "strings"
"time" "time"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
@ -34,7 +34,7 @@ import (
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
kevents "k8s.io/kubernetes/pkg/kubelet/events" kevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor" "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
) )
@ -134,42 +134,6 @@ func (rc *reconciler) syncStates() {
rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld) rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld)
} }
// isMultiAttachForbidden checks if attaching this volume to multiple nodes is definitely not allowed/possible.
// In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns
// false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the
// attacher to fail fast in such cases.
// Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047
func (rc *reconciler) isMultiAttachForbidden(volumeSpec *volume.Spec) bool {
if volumeSpec.Volume != nil {
// Check for volume types which are known to fail slow or cause trouble when trying to multi-attach
if volumeSpec.Volume.AzureDisk != nil ||
volumeSpec.Volume.Cinder != nil {
return true
}
}
// Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to
// multi-attach. We trust in the individual volume implementations to not allow unsupported access modes
if volumeSpec.PersistentVolume != nil {
// Check for persistent volume types which do not fail when trying to multi-attach
if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 {
// No access mode specified so we don't know for sure. Let the attacher fail if needed
return false
}
// check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false
for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes {
if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany {
return false
}
}
return true
}
// we don't know if it's supported or not and let the attacher fail later in cases it's not supported
return false
}
func (rc *reconciler) reconcile() { func (rc *reconciler) reconcile() {
// Detaches are triggered before attaches so that volumes referenced by // Detaches are triggered before attaches so that volumes referenced by
// pods that are rescheduled to a different node are detached first. // pods that are rescheduled to a different node are detached first.
@ -178,14 +142,24 @@ func (rc *reconciler) reconcile() {
for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() { for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
if !rc.desiredStateOfWorld.VolumeExists( if !rc.desiredStateOfWorld.VolumeExists(
attachedVolume.VolumeName, attachedVolume.NodeName) { attachedVolume.VolumeName, attachedVolume.NodeName) {
// Don't even try to start an operation if there is already one running // Check whether there already exist an operation pending, and don't even
// try to start an operation if there is already one running.
// This check must be done before we do any other checks, as otherwise the other checks // This check must be done before we do any other checks, as otherwise the other checks
// may pass while at the same time the volume leaves the pending state, resulting in // may pass while at the same time the volume leaves the pending state, resulting in
// double detach attempts // double detach attempts
if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "") { // The operation key format is different depending on whether the volume
klog.V(10).Infof("Operation for volume %q is already running. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName) // allows multi attach across different nodes.
if util.IsMultiAttachAllowed(attachedVolume.VolumeSpec) {
if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, attachedVolume.NodeName) {
klog.V(10).Infof("Operation for volume %q is already running for node %q. Can't start detach", attachedVolume.VolumeName, attachedVolume.NodeName)
continue continue
} }
} else {
if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, "" /* nodeName */) {
klog.V(10).Infof("Operation for volume %q is already running in the cluster. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
continue
}
}
// Set the detach request time // Set the detach request time
elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName) elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
@ -260,15 +234,27 @@ func (rc *reconciler) attachDesiredVolumes() {
rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName) rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
continue continue
} }
// Don't even try to start an operation if there is already one running
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") { if util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) {
// Don't even try to start an operation if there is already one running for the given volume and node.
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName) {
if klog.V(10) {
klog.Infof("Operation for volume %q is already running for node %q. Can't start attach", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
continue
}
} else {
// Don't even try to start an operation if there is already one running for the given volume
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */) {
if klog.V(10) { if klog.V(10) {
klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
} }
continue continue
} }
if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) {
nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName) nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName)
if len(nodes) > 0 { if len(nodes) > 0 {
if !volumeToAttach.MultiAttachErrorReported { if !volumeToAttach.MultiAttachErrorReported {
@ -277,6 +263,7 @@ func (rc *reconciler) attachDesiredVolumes() {
} }
continue continue
} }
} }
// Volume/Node doesn't exist, spawn a goroutine to attach it // Volume/Node doesn't exist, spawn a goroutine to attach it

View File

@ -295,7 +295,7 @@ func (rc *reconciler) unmountDetachDevices() {
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() { for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting. // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) && if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) { !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
if attachedVolume.DeviceMayBeMounted() { if attachedVolume.DeviceMayBeMounted() {
// Volume is globally mounted to device, unmount it // Volume is globally mounted to device, unmount it
klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", "")) klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
@ -423,7 +423,7 @@ func (rc *reconciler) syncStates() {
continue continue
} }
// There is no pod that uses the volume. // There is no pod that uses the volume.
if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName) { if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
klog.Warning("Volume is in pending operation, skip cleaning up mounts") klog.Warning("Volume is in pending operation, skip cleaning up mounts")
} }
klog.V(2).Infof( klog.V(2).Infof(

View File

@ -14,6 +14,7 @@ go_library(
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library", "//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
"//pkg/volume/util/types:go_default_library", "//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",
], ],
@ -27,6 +28,7 @@ go_test(
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library", "//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
"//pkg/volume/util/types:go_default_library", "//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
], ],
) )

View File

@ -29,45 +29,83 @@ import (
"sync" "sync"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
k8sRuntime "k8s.io/apimachinery/pkg/util/runtime" k8sRuntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog" "k8s.io/klog"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/volume/util/types" volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
) )
const ( const (
// EmptyUniquePodName is a UniquePodName for empty string. // EmptyUniquePodName is a UniquePodName for empty string.
EmptyUniquePodName types.UniquePodName = types.UniquePodName("") EmptyUniquePodName volumetypes.UniquePodName = volumetypes.UniquePodName("")
// EmptyUniqueVolumeName is a UniqueVolumeName for empty string // EmptyUniqueVolumeName is a UniqueVolumeName for empty string
EmptyUniqueVolumeName v1.UniqueVolumeName = v1.UniqueVolumeName("") EmptyUniqueVolumeName v1.UniqueVolumeName = v1.UniqueVolumeName("")
// EmptyNodeName is a NodeName for empty string
EmptyNodeName types.NodeName = types.NodeName("")
) )
// NestedPendingOperations defines the supported set of operations. // NestedPendingOperations defines the supported set of operations.
type NestedPendingOperations interface { type NestedPendingOperations interface {
// Run adds the concatenation of volumeName and podName to the list of
// running operations and spawns a new go routine to execute operationFunc. // Run adds the concatenation of volumeName, podName, and nodeName to the list
// If an operation with the same volumeName, same or empty podName // of running operations and spawns a new go routine to run
// and same operationName exits, an AlreadyExists or ExponentialBackoff // generatedOperations.
// error is returned. If an operation with same volumeName and podName
// has ExponentialBackoff error but operationName is different, exponential // volumeName, podName, and nodeName collectively form the operation key.
// backoff is reset and operation is allowed to proceed. // The following forms of operation keys are supported (two keys are designed
// This enables multiple operations to execute in parallel for the same // to be "matched" if we want to serialize their operations):
// volumeName as long as they have different podName. // - volumeName empty, podName and nodeName could be anything
// Once the operation is complete, the go routine is terminated and the // This key does not match with any keys.
// concatenation of volumeName and podName is removed from the list of // - volumeName exists, podName empty, nodeName empty
// executing operations allowing a new operation to be started with the // This key matches all other keys with the same volumeName.
// volumeName without error. // - volumeName exists, podName exists, nodeName empty
Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, generatedOperations types.GeneratedOperations) error // This key matches with:
// - the same volumeName and podName
// - the same volumeName, but empty podName
// - volumeName exists, podName empty, nodeName exists
// This key matches with:
// - the same volumeName and nodeName
// - the same volumeName but empty nodeName
// If there is no operation with a matching key, the operation is allowed to
// proceed.
// If an operation with a matching key exists and the previous operation is
// running, an AlreadyExists error is returned.
// If an operation with a matching key exists and the previous operation
// failed:
// - If the previous operation has the same
// generatedOperations.operationName:
// - If the full exponential backoff period is satisfied, the operation is
// allowed to proceed.
// - Otherwise, an ExponentialBackoff error is returned.
// - Otherwise, exponential backoff is reset and operation is allowed to
// proceed.
// Once the operation is complete, the go routine is terminated. If the
// operation succeeded, its corresponding key is removed from the list of
// executing operations, allowing a new operation to be started with the key
// without error. If it failed, the key remains and the exponential
// backoff status is updated.
Run(
volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName,
nodeName types.NodeName,
generatedOperations volumetypes.GeneratedOperations) error
// Wait blocks until all operations are completed. This is typically // Wait blocks until all operations are completed. This is typically
// necessary during tests - the test should wait until all operations finish // necessary during tests - the test should wait until all operations finish
// and evaluate results after that. // and evaluate results after that.
Wait() Wait()
// IsOperationPending returns true if an operation for the given volumeName and podName is pending, // IsOperationPending returns true if an operation for the given volumeName
// otherwise it returns false // and one of podName or nodeName is pending, otherwise it returns false
IsOperationPending(volumeName v1.UniqueVolumeName, podName types.UniquePodName) bool IsOperationPending(
volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName,
nodeName types.NodeName) bool
} }
// NewNestedPendingOperations returns a new instance of NestedPendingOperations. // NewNestedPendingOperations returns a new instance of NestedPendingOperations.
@ -88,8 +126,7 @@ type nestedPendingOperations struct {
} }
type operation struct { type operation struct {
volumeName v1.UniqueVolumeName key operationKey
podName types.UniquePodName
operationName string operationName string
operationPending bool operationPending bool
expBackoff exponentialbackoff.ExponentialBackoff expBackoff exponentialbackoff.ExponentialBackoff
@ -97,22 +134,24 @@ type operation struct {
func (grm *nestedPendingOperations) Run( func (grm *nestedPendingOperations) Run(
volumeName v1.UniqueVolumeName, volumeName v1.UniqueVolumeName,
podName types.UniquePodName, podName volumetypes.UniquePodName,
generatedOperations types.GeneratedOperations) error { nodeName types.NodeName,
generatedOperations volumetypes.GeneratedOperations) error {
grm.lock.Lock() grm.lock.Lock()
defer grm.lock.Unlock() defer grm.lock.Unlock()
opExists, previousOpIndex := grm.isOperationExists(volumeName, podName)
opKey := operationKey{volumeName, podName, nodeName}
opExists, previousOpIndex := grm.isOperationExists(opKey)
if opExists { if opExists {
previousOp := grm.operations[previousOpIndex] previousOp := grm.operations[previousOpIndex]
// Operation already exists // Operation already exists
if previousOp.operationPending { if previousOp.operationPending {
// Operation is pending // Operation is pending
operationKey := getOperationKey(volumeName, podName) return NewAlreadyExistsError(opKey)
return NewAlreadyExistsError(operationKey)
} }
operationKey := getOperationKey(volumeName, podName) backOffErr := previousOp.expBackoff.SafeToRetry(fmt.Sprintf("%+v", opKey))
backOffErr := previousOp.expBackoff.SafeToRetry(operationKey)
if backOffErr != nil { if backOffErr != nil {
if previousOp.operationName == generatedOperations.OperationName { if previousOp.operationName == generatedOperations.OperationName {
return backOffErr return backOffErr
@ -124,15 +163,13 @@ func (grm *nestedPendingOperations) Run(
// Update existing operation to mark as pending. // Update existing operation to mark as pending.
grm.operations[previousOpIndex].operationPending = true grm.operations[previousOpIndex].operationPending = true
grm.operations[previousOpIndex].volumeName = volumeName grm.operations[previousOpIndex].key = opKey
grm.operations[previousOpIndex].podName = podName
} else { } else {
// Create a new operation // Create a new operation
grm.operations = append(grm.operations, grm.operations = append(grm.operations,
operation{ operation{
key: opKey,
operationPending: true, operationPending: true,
volumeName: volumeName,
podName: podName,
operationName: generatedOperations.OperationName, operationName: generatedOperations.OperationName,
expBackoff: exponentialbackoff.ExponentialBackoff{}, expBackoff: exponentialbackoff.ExponentialBackoff{},
}) })
@ -142,7 +179,7 @@ func (grm *nestedPendingOperations) Run(
// Handle unhandled panics (very unlikely) // Handle unhandled panics (very unlikely)
defer k8sRuntime.HandleCrash() defer k8sRuntime.HandleCrash()
// Handle completion of and error, if any, from operationFunc() // Handle completion of and error, if any, from operationFunc()
defer grm.operationComplete(volumeName, podName, &detailedErr) defer grm.operationComplete(opKey, &detailedErr)
return generatedOperations.Run() return generatedOperations.Run()
}() }()
@ -151,12 +188,14 @@ func (grm *nestedPendingOperations) Run(
func (grm *nestedPendingOperations) IsOperationPending( func (grm *nestedPendingOperations) IsOperationPending(
volumeName v1.UniqueVolumeName, volumeName v1.UniqueVolumeName,
podName types.UniquePodName) bool { podName volumetypes.UniquePodName,
nodeName types.NodeName) bool {
grm.lock.RLock() grm.lock.RLock()
defer grm.lock.RUnlock() defer grm.lock.RUnlock()
exist, previousOpIndex := grm.isOperationExists(volumeName, podName) opKey := operationKey{volumeName, podName, nodeName}
exist, previousOpIndex := grm.isOperationExists(opKey)
if exist && grm.operations[previousOpIndex].operationPending { if exist && grm.operations[previousOpIndex].operationPending {
return true return true
} }
@ -164,71 +203,69 @@ func (grm *nestedPendingOperations) IsOperationPending(
} }
// This is an internal function and caller should acquire and release the lock // This is an internal function and caller should acquire and release the lock
func (grm *nestedPendingOperations) isOperationExists( func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, int) {
volumeName v1.UniqueVolumeName,
podName types.UniquePodName) (bool, int) {
// If volumeName is empty, operation can be executed concurrently // If volumeName is empty, operation can be executed concurrently
if volumeName == EmptyUniqueVolumeName { if key.volumeName == EmptyUniqueVolumeName {
return false, -1 return false, -1
} }
for previousOpIndex, previousOp := range grm.operations { for previousOpIndex, previousOp := range grm.operations {
if previousOp.volumeName != volumeName { volumeNameMatch := previousOp.key.volumeName == key.volumeName
// No match, keep searching
continue
}
if previousOp.podName != EmptyUniquePodName && podNameMatch := previousOp.key.podName == EmptyUniquePodName ||
podName != EmptyUniquePodName && key.podName == EmptyUniquePodName ||
previousOp.podName != podName { previousOp.key.podName == key.podName
// No match, keep searching
continue
}
// Match nodeNameMatch := previousOp.key.nodeName == EmptyNodeName ||
key.nodeName == EmptyNodeName ||
previousOp.key.nodeName == key.nodeName
if volumeNameMatch && podNameMatch && nodeNameMatch {
return true, previousOpIndex return true, previousOpIndex
} }
}
return false, -1 return false, -1
} }
func (grm *nestedPendingOperations) getOperation( func (grm *nestedPendingOperations) getOperation(key operationKey) (uint, error) {
volumeName v1.UniqueVolumeName,
podName types.UniquePodName) (uint, error) {
// Assumes lock has been acquired by caller. // Assumes lock has been acquired by caller.
for i, op := range grm.operations { for i, op := range grm.operations {
if op.volumeName == volumeName && if op.key.volumeName == key.volumeName &&
op.podName == podName { op.key.podName == key.podName &&
op.key.nodeName == key.nodeName {
return uint(i), nil return uint(i), nil
} }
} }
logOperationKey := getOperationKey(volumeName, podName) return 0, fmt.Errorf("Operation %+v not found", key)
return 0, fmt.Errorf("Operation %q not found", logOperationKey)
} }
func (grm *nestedPendingOperations) deleteOperation( func (grm *nestedPendingOperations) deleteOperation(key operationKey) {
// Assumes lock has been acquired by caller. // Assumes lock has been acquired by caller.
volumeName v1.UniqueVolumeName,
podName types.UniquePodName) {
opIndex := -1 opIndex := -1
for i, op := range grm.operations { for i, op := range grm.operations {
if op.volumeName == volumeName && if op.key.volumeName == key.volumeName &&
op.podName == podName { op.key.podName == key.podName &&
op.key.nodeName == key.nodeName {
opIndex = i opIndex = i
break break
} }
} }
if opIndex < 0 {
return
}
// Delete index without preserving order // Delete index without preserving order
grm.operations[opIndex] = grm.operations[len(grm.operations)-1] grm.operations[opIndex] = grm.operations[len(grm.operations)-1]
grm.operations = grm.operations[:len(grm.operations)-1] grm.operations = grm.operations[:len(grm.operations)-1]
} }
func (grm *nestedPendingOperations) operationComplete( func (grm *nestedPendingOperations) operationComplete(key operationKey, err *error) {
volumeName v1.UniqueVolumeName, podName types.UniquePodName, err *error) {
// Defer operations are executed in Last-In is First-Out order. In this case // Defer operations are executed in Last-In is First-Out order. In this case
// the lock is acquired first when operationCompletes begins, and is // the lock is acquired first when operationCompletes begins, and is
// released when the method finishes, after the lock is released cond is // released when the method finishes, after the lock is released cond is
@ -239,24 +276,20 @@ func (grm *nestedPendingOperations) operationComplete(
if *err == nil || !grm.exponentialBackOffOnError { if *err == nil || !grm.exponentialBackOffOnError {
// Operation completed without error, or exponentialBackOffOnError disabled // Operation completed without error, or exponentialBackOffOnError disabled
grm.deleteOperation(volumeName, podName) grm.deleteOperation(key)
if *err != nil { if *err != nil {
// Log error // Log error
logOperationKey := getOperationKey(volumeName, podName) klog.Errorf("operation %+v failed with: %v", key, *err)
klog.Errorf("operation %s failed with: %v",
logOperationKey,
*err)
} }
return return
} }
// Operation completed with error and exponentialBackOffOnError Enabled // Operation completed with error and exponentialBackOffOnError Enabled
existingOpIndex, getOpErr := grm.getOperation(volumeName, podName) existingOpIndex, getOpErr := grm.getOperation(key)
if getOpErr != nil { if getOpErr != nil {
// Failed to find existing operation // Failed to find existing operation
logOperationKey := getOperationKey(volumeName, podName) klog.Errorf("Operation %+v completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.",
klog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.", key,
logOperationKey,
*err) *err)
return return
} }
@ -265,10 +298,8 @@ func (grm *nestedPendingOperations) operationComplete(
grm.operations[existingOpIndex].operationPending = false grm.operations[existingOpIndex].operationPending = false
// Log error // Log error
operationKey :=
getOperationKey(volumeName, podName)
klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff. klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff.
GenerateNoRetriesPermittedMsg(operationKey)) GenerateNoRetriesPermittedMsg(fmt.Sprintf("%+v", key)))
} }
func (grm *nestedPendingOperations) Wait() { func (grm *nestedPendingOperations) Wait() {
@ -280,21 +311,15 @@ func (grm *nestedPendingOperations) Wait() {
} }
} }
func getOperationKey( type operationKey struct {
volumeName v1.UniqueVolumeName, podName types.UniquePodName) string { volumeName v1.UniqueVolumeName
podNameStr := "" podName volumetypes.UniquePodName
if podName != EmptyUniquePodName { nodeName types.NodeName
podNameStr = fmt.Sprintf(" (%q)", podName)
}
return fmt.Sprintf("%q%s",
volumeName,
podNameStr)
} }
// NewAlreadyExistsError returns a new instance of AlreadyExists error. // NewAlreadyExistsError returns a new instance of AlreadyExists error.
func NewAlreadyExistsError(operationKey string) error { func NewAlreadyExistsError(key operationKey) error {
return alreadyExistsError{operationKey} return alreadyExistsError{key}
} }
// IsAlreadyExists returns true if an error returned from // IsAlreadyExists returns true if an error returned from
@ -313,13 +338,13 @@ func IsAlreadyExists(err error) bool {
// new operation can not be started because an operation with the same operation // new operation can not be started because an operation with the same operation
// name is already executing. // name is already executing.
type alreadyExistsError struct { type alreadyExistsError struct {
operationKey string operationKey operationKey
} }
var _ error = alreadyExistsError{} var _ error = alreadyExistsError{}
func (err alreadyExistsError) Error() string { func (err alreadyExistsError) Error() string {
return fmt.Sprintf( return fmt.Sprintf(
"Failed to create operation with name %q. An operation with that name is already executing.", "Failed to create operation with name %+v. An operation with that name is already executing.",
err.operationKey) err.operationKey)
} }

View File

@ -22,9 +22,10 @@ import (
"time" "time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/volume/util/types" volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
) )
const ( const (
@ -44,99 +45,95 @@ const (
initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond
) )
func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) { func Test_NestedPendingOperations_Positive_SingleOp(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
operation := func() (error, error) { return nil, nil }
// Act // Act
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
// Assert // Assert
if err != nil { if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
} }
} }
func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) { func Test_NestedPendingOperations_Positive_TwoOps(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volume1Name := v1.UniqueVolumeName("volume1-name") volume1Name := v1.UniqueVolumeName("volume1-name")
volume2Name := v1.UniqueVolumeName("volume2-name") volume2Name := v1.UniqueVolumeName("volume2-name")
operation := func() (error, error) { return nil, nil }
// Act // Act
err1 := grm.Run(volume1Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) err1 := grm.Run(volume1Name, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
err2 := grm.Run(volume2Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) err2 := grm.Run(volume2Name, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
// Assert // Assert
if err1 != nil { if err1 != nil {
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", volume1Name, err1) t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", volume1Name, err1)
} }
if err2 != nil { if err2 != nil {
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", volume2Name, err2) t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", volume2Name, err2)
} }
} }
func Test_NewGoRoutineMap_Positive_TwoSubOps(t *testing.T) { func Test_NestedPendingOperations_Positive_TwoSubOps(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
operation1PodName := types.UniquePodName("operation1-podname") operation1PodName := volumetypes.UniquePodName("operation1-podname")
operation2PodName := types.UniquePodName("operation2-podname") operation2PodName := volumetypes.UniquePodName("operation2-podname")
operation := func() (error, error) { return nil, nil }
// Act // Act
err1 := grm.Run(volumeName, operation1PodName, types.GeneratedOperations{OperationFunc: operation}) err1 := grm.Run(volumeName, operation1PodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
err2 := grm.Run(volumeName, operation2PodName, types.GeneratedOperations{OperationFunc: operation}) err2 := grm.Run(volumeName, operation2PodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
// Assert // Assert
if err1 != nil { if err1 != nil {
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation1PodName, err1) t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", operation1PodName, err1)
} }
if err2 != nil { if err2 != nil {
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation2PodName, err2) t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", operation2PodName, err2)
} }
} }
func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) { func Test_NestedPendingOperations_Positive_SingleOpWithExpBackoff(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
operation := func() (error, error) { return nil, nil }
// Act // Act
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
// Assert // Assert
if err != nil { if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
} }
} }
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateCallbackFunc(operation1DoneCh) operation1 := generateCallbackFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil { if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
} }
operation2 := generateNoopFunc() operation2 := noopFunc
<-operation1DoneCh // Force operation1 to complete <-operation1DoneCh // Force operation1 to complete
// Act // Act
err2 := retryWithExponentialBackOff( err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort), time.Duration(initialOperationWaitTimeShort),
func() (bool, error) { func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
if err != nil { if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
return false, nil return false, nil
} }
return true, nil return true, nil
@ -145,30 +142,30 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
// Assert // Assert
if err2 != nil { if err2 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
} }
} }
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) { func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateCallbackFunc(operation1DoneCh) operation1 := generateCallbackFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil { if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
} }
operation2 := generateNoopFunc() operation2 := noopFunc
<-operation1DoneCh // Force operation1 to complete <-operation1DoneCh // Force operation1 to complete
// Act // Act
err2 := retryWithExponentialBackOff( err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort), time.Duration(initialOperationWaitTimeShort),
func() (bool, error) { func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
if err != nil { if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
return false, nil return false, nil
} }
return true, nil return true, nil
@ -177,28 +174,28 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *
// Assert // Assert
if err2 != nil { if err2 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
} }
} }
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanics(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
operation1 := generatePanicFunc() operation1 := panicFunc
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil { if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
} }
operation2 := generateNoopFunc() operation2 := noopFunc
// Act // Act
err2 := retryWithExponentialBackOff( err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort), time.Duration(initialOperationWaitTimeShort),
func() (bool, error) { func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
if err != nil { if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
return false, nil return false, nil
} }
return true, nil return true, nil
@ -207,28 +204,28 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) {
// Assert // Assert
if err2 != nil { if err2 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
} }
} }
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) { func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
operation1 := generatePanicFunc() operation1 := panicFunc
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil { if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
} }
operation2 := generateNoopFunc() operation2 := noopFunc
// Act // Act
err2 := retryWithExponentialBackOff( err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff
func() (bool, error) { func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
if err != nil { if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
return false, nil return false, nil
} }
return true, nil return true, nil
@ -237,53 +234,54 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes
// Assert // Assert
if err2 != nil { if err2 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
} }
} }
func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh) operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil { if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
} }
operation2 := generateNoopFunc() operation2 := noopFunc
// Act // Act
err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert // Assert
if err2 == nil { if err2 == nil {
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
} }
if !IsAlreadyExists(err2) { if !IsAlreadyExists(err2) {
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
} }
} }
func Test_NewGoRoutineMap_Negative_SecondThirdOpWithDifferentNames(t *testing.T) { func Test_NestedPendingOperations_Negative_SecondThirdOpWithDifferentNames(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
op1Name := "mount_volume" op1Name := "mount_volume"
operation1 := generateErrorFunc() operation1 := errorFunc
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name}) err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name})
if err1 != nil { if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
} }
// Shorter than exponential backoff period, so as to trigger exponential backoff error on second // Shorter than exponential backoff period, so as to trigger exponential backoff error on second
// operation. // operation.
operation2 := generateErrorFunc() operation2 := errorFunc
err2 := retryWithExponentialBackOff( err2 := retryWithExponentialBackOff(
initialOperationWaitTimeShort, initialOperationWaitTimeShort,
func() (bool, error) { func() (bool, error) {
err := grm.Run(volumeName, err := grm.Run(volumeName,
"", /* operationSubName */ EmptyUniquePodName,
types.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name}) EmptyNodeName,
volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name})
if exponentialbackoff.IsExponentialBackoff(err) { if exponentialbackoff.IsExponentialBackoff(err) {
return true, nil return true, nil
@ -294,114 +292,114 @@ func Test_NewGoRoutineMap_Negative_SecondThirdOpWithDifferentNames(t *testing.T)
// Assert // Assert
if err2 != nil { if err2 != nil {
t.Fatalf("Expected NewGoRoutine to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name) t.Fatalf("Expected NestedPendingOperations to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name)
} }
operation3 := generateNoopFunc() operation3 := noopFunc
op3Name := "unmount_volume" op3Name := "unmount_volume"
// Act // Act
err3 := grm.Run(volumeName, "" /*pod name*/, types.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name}) err3 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name})
if err3 != nil { if err3 != nil {
t.Fatalf("NewGoRoutine failed. Expected <no error> Actual: <%v>", err3) t.Fatalf("NestedPendingOperations failed. Expected <no error> Actual: <%v>", err3)
} }
} }
func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) { func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
operationPodName := types.UniquePodName("operation-podname") operationPodName := volumetypes.UniquePodName("operation-podname")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh) operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1}) err1 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil { if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
} }
operation2 := generateNoopFunc() operation2 := noopFunc
// Act // Act
err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2}) err2 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert // Assert
if err2 == nil { if err2 == nil {
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
} }
if !IsAlreadyExists(err2) { if !IsAlreadyExists(err2) {
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
} }
} }
func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) { func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
operationPodName := types.UniquePodName("operation-podname") operationPodName := volumetypes.UniquePodName("operation-podname")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh) operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1}) err1 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil { if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
} }
operation2 := generateNoopFunc() operation2 := noopFunc
// Act // Act
err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2}) err2 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert // Assert
if err2 == nil { if err2 == nil {
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
} }
if !IsAlreadyExists(err2) { if !IsAlreadyExists(err2) {
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
} }
} }
func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) { func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh) operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil { if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
} }
operation2 := generateNoopFunc() operation2 := noopFunc
// Act // Act
err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert // Assert
if err2 == nil { if err2 == nil {
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
} }
if !IsAlreadyExists(err2) { if !IsAlreadyExists(err2) {
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
} }
} }
func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh) operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil { if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
} }
operation2 := generateNoopFunc() operation2 := noopFunc
operation3 := generateNoopFunc() operation3 := noopFunc
// Act // Act
err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert // Assert
if err2 == nil { if err2 == nil {
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
} }
if !IsAlreadyExists(err2) { if !IsAlreadyExists(err2) {
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
} }
// Act // Act
@ -409,9 +407,9 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
err3 := retryWithExponentialBackOff( err3 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort), time.Duration(initialOperationWaitTimeShort),
func() (bool, error) { func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3}) err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation3})
if err != nil { if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
return false, nil return false, nil
} }
return true, nil return true, nil
@ -420,32 +418,32 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
// Assert // Assert
if err3 != nil { if err3 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
} }
} }
func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) { func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh) operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil { if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
} }
operation2 := generateNoopFunc() operation2 := noopFunc
operation3 := generateNoopFunc() operation3 := noopFunc
// Act // Act
err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert // Assert
if err2 == nil { if err2 == nil {
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
} }
if !IsAlreadyExists(err2) { if !IsAlreadyExists(err2) {
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
} }
// Act // Act
@ -453,9 +451,9 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t
err3 := retryWithExponentialBackOff( err3 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort), time.Duration(initialOperationWaitTimeShort),
func() (bool, error) { func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3}) err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation3})
if err != nil { if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
return false, nil return false, nil
} }
return true, nil return true, nil
@ -464,11 +462,11 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t
// Assert // Assert
if err3 != nil { if err3 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
} }
} }
func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) { func Test_NestedPendingOperations_Positive_WaitEmpty(t *testing.T) {
// Test than Wait() on empty GoRoutineMap always succeeds without blocking // Test than Wait() on empty GoRoutineMap always succeeds without blocking
// Arrange // Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
@ -487,7 +485,7 @@ func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) {
} }
} }
func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) { func Test_NestedPendingOperations_Positive_WaitEmptyWithExpBackoff(t *testing.T) {
// Test than Wait() on empty GoRoutineMap always succeeds without blocking // Test than Wait() on empty GoRoutineMap always succeeds without blocking
// Arrange // Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
@ -506,16 +504,16 @@ func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) {
} }
} }
func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) { func Test_NestedPendingOperations_Positive_Wait(t *testing.T) {
// Test that Wait() really blocks until the last operation succeeds // Test that Wait() really blocks until the last operation succeeds
// Arrange // Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh) operation1 := generateWaitFunc(operation1DoneCh)
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err != nil { if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
} }
// Act // Act
@ -535,16 +533,16 @@ func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) {
} }
} }
func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) { func Test_NestedPendingOperations_Positive_WaitWithExpBackoff(t *testing.T) {
// Test that Wait() really blocks until the last operation succeeds // Test that Wait() really blocks until the last operation succeeds
// Arrange // Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name") volumeName := v1.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh) operation1 := generateWaitFunc(operation1DoneCh)
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err != nil { if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err) t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
} }
// Act // Act
@ -564,6 +562,268 @@ func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) {
} }
} }
/* Concurrent operations tests */
// "None" means volume, pod, and node names are all empty
// "Volume" means volume name is set, but pod name and node name are empty
// "Volume Pod" means volume and pod names are set, but the node name is empty
// "Volume Node" means volume and node names are set, but the pod name is empty
// The same volume, pod, and node names are used (where they are not empty).
// Covered cases:
// FIRST OP | SECOND OP | RESULT
// None | None | Positive
// None | Volume | Positive
// None | Volume Pod | Positive
// None | Volume Node | Positive
// Volume | None | Positive
// Volume | Volume | Negative (covered in Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes above)
// Volume | Volume Pod | Negative
// Volume | Volume Node | Negative
// Volume Pod | None | Positive
// Volume Pod | Volume | Negative
// Volume Pod | Volume Pod | Negative (covered in Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes above)
// Volume Node | None | Positive
// Volume Node | Volume | Negative
// Volume Node | Volume Node | Negative
// These cases are not covered because they will never occur within the same
// binary, so either result works.
// Volume Pod | Volume Node
// Volume Node | Volume Pod
func Test_NestedPendingOperations_SecondOpBeforeFirstCompletes(t *testing.T) {
const (
keyNone = iota
keyVolume
keyVolumePod
keyVolumeNode
)
type testCase struct {
testID int
keyTypes []int // only 2 elements are supported
expectPass bool
}
tests := []testCase{
{testID: 1, keyTypes: []int{keyNone, keyNone}, expectPass: true},
{testID: 2, keyTypes: []int{keyNone, keyVolume}, expectPass: true},
{testID: 3, keyTypes: []int{keyNone, keyVolumePod}, expectPass: true},
{testID: 4, keyTypes: []int{keyNone, keyVolumeNode}, expectPass: true},
{testID: 5, keyTypes: []int{keyVolume, keyNone}, expectPass: true},
{testID: 6, keyTypes: []int{keyVolume, keyVolumePod}, expectPass: false},
{testID: 7, keyTypes: []int{keyVolume, keyVolumeNode}, expectPass: false},
{testID: 8, keyTypes: []int{keyVolumePod, keyNone}, expectPass: true},
{testID: 9, keyTypes: []int{keyVolumePod, keyVolume}, expectPass: false},
{testID: 10, keyTypes: []int{keyVolumeNode, keyNone}, expectPass: true},
{testID: 11, keyTypes: []int{keyVolumeNode, keyVolume}, expectPass: false},
{testID: 12, keyTypes: []int{keyVolumeNode, keyVolumeNode}, expectPass: false},
}
for _, test := range tests {
var (
volumeNames []v1.UniqueVolumeName
podNames []volumetypes.UniquePodName
nodeNames []types.NodeName
)
for _, keyType := range test.keyTypes {
var (
v v1.UniqueVolumeName
p volumetypes.UniquePodName
n types.NodeName
)
switch keyType {
case keyNone:
v = EmptyUniqueVolumeName
p = EmptyUniquePodName
n = EmptyNodeName
case keyVolume:
v = v1.UniqueVolumeName("volume-name")
p = EmptyUniquePodName
n = EmptyNodeName
case keyVolumePod:
v = v1.UniqueVolumeName("volume-name")
p = volumetypes.UniquePodName("operation-podname")
n = EmptyNodeName
case keyVolumeNode:
v = v1.UniqueVolumeName("volume-name")
p = EmptyUniquePodName
n = types.NodeName("operation-nodename")
}
volumeNames = append(volumeNames, v)
podNames = append(podNames, p)
nodeNames = append(nodeNames, n)
}
t.Run(fmt.Sprintf("Test %d", test.testID), func(t *testing.T) {
if test.expectPass {
testConcurrentOperationsPositive(t,
volumeNames[0], podNames[0], nodeNames[0],
volumeNames[1], podNames[1], nodeNames[1],
)
} else {
testConcurrentOperationsNegative(t,
volumeNames[0], podNames[0], nodeNames[0],
volumeNames[1], podNames[1], nodeNames[1],
)
}
})
}
}
func Test_NestedPendingOperations_Positive_Issue_88355(t *testing.T) {
// This test reproduces the scenario that is likely to have caused
// kubernetes/kubernetes issue #88355.
// Please refer to the issue for more context:
// https://github.com/kubernetes/kubernetes/issues/88355
// Below, vx is a volume name, and nx is a node name.
// Operation sequence:
// opZ(v0) starts (operates on a different volume from all other operations)
// op1(v1, n1) starts
// op2(v1, n2) starts
// opZ(v0) ends with success
// op2(v1, n2) ends with an error (exponential backoff should be triggered)
// op1(v1, n1) ends with success
// op3(v1, n2) starts (continuously retried on exponential backoff error)
// op3(v1, n2) ends with success
// op4(v1, n2) starts
// op4(v1, n2) ends with success
const (
mainVolumeName = "main-volume"
opZVolumeName = "other-volume"
node1 = "node1"
node2 = "node2"
// delay after an operation is signaled to finish to ensure it actually
// finishes before running the next operation.
delay = 50 * time.Millisecond
// Replicates the default AttachDetachController reconcile period
reconcilerPeriod = 100 * time.Millisecond
)
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
opZContinueCh := make(chan interface{}, 0 /* bufferSize */)
op1ContinueCh := make(chan interface{}, 0 /* bufferSize */)
op2ContinueCh := make(chan interface{}, 0 /* bufferSize */)
operationZ := generateWaitFunc(opZContinueCh)
operation1 := generateWaitFunc(op1ContinueCh)
operation2 := generateWaitWithErrorFunc(op2ContinueCh)
operation3 := noopFunc
operation4 := noopFunc
errZ := grm.Run(opZVolumeName, "" /* podName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operationZ})
if errZ != nil {
t.Fatalf("NestedPendingOperations failed for operationZ. Expected: <no error> Actual: <%v>", errZ)
}
err1 := grm.Run(mainVolumeName, "" /* podName */, node1, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Fatalf("NestedPendingOperations failed for operation1. Expected: <no error> Actual: <%v>", err1)
}
err2 := grm.Run(mainVolumeName, "" /* podName */, node2, volumetypes.GeneratedOperations{OperationFunc: operation2})
if err2 != nil {
t.Fatalf("NestedPendingOperations failed for operation2. Expected: <no error> Actual: <%v>", err2)
}
opZContinueCh <- true
time.Sleep(delay)
op2ContinueCh <- true
time.Sleep(delay)
op1ContinueCh <- true
time.Sleep(delay)
for {
err3 := grm.Run(mainVolumeName, "" /* podName */, node2, volumetypes.GeneratedOperations{OperationFunc: operation3})
if err3 == nil {
break
} else if !exponentialbackoff.IsExponentialBackoff(err3) {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
}
time.Sleep(reconcilerPeriod)
}
time.Sleep(delay)
err4 := grm.Run(mainVolumeName, "" /* podName */, node2, volumetypes.GeneratedOperations{OperationFunc: operation4})
if err4 != nil {
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err4)
}
}
// testConcurrentOperationsPositive passes if the two operations keyed by the
// provided parameters are executed in parallel, and fails otherwise.
func testConcurrentOperationsPositive(
t *testing.T,
volumeName1 v1.UniqueVolumeName,
podName1 volumetypes.UniquePodName,
nodeName1 types.NodeName,
volumeName2 v1.UniqueVolumeName,
podName2 volumetypes.UniquePodName,
nodeName2 types.NodeName) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := noopFunc
// Act
err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 != nil {
t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
}
}
// testConcurrentOperationsNegative passes if the creation of the second
// operation returns an alreadyExists error, and fails otherwise.
func testConcurrentOperationsNegative(
t *testing.T,
volumeName1 v1.UniqueVolumeName,
podName1 volumetypes.UniquePodName,
nodeName1 types.NodeName,
volumeName2 v1.UniqueVolumeName,
podName2 volumetypes.UniquePodName,
nodeName2 types.NodeName) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
if err1 != nil {
t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := noopFunc
// Act
err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2})
// Assert
if err2 == nil {
t.Errorf("NestedPendingOperations did not fail. Expected an operation to already exist")
}
if !IsAlreadyExists(err2) {
t.Errorf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
}
}
/* END concurrent operations tests */
func generateCallbackFunc(done chan<- interface{}) func() (error, error) { func generateCallbackFunc(done chan<- interface{}) func() (error, error) {
return func() (error, error) { return func() (error, error) {
done <- true done <- true
@ -578,21 +838,22 @@ func generateWaitFunc(done <-chan interface{}) func() (error, error) {
} }
} }
func generatePanicFunc() func() (error, error) { func panicFunc() (error, error) {
return func() (error, error) {
panic("testing panic") panic("testing panic")
}
} }
func generateErrorFunc() func() (error, error) { func errorFunc() (error, error) {
return fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2")
}
func generateWaitWithErrorFunc(done <-chan interface{}) func() (error, error) {
return func() (error, error) { return func() (error, error) {
return fmt.Errorf("placholder1"), fmt.Errorf("placeholder2") <-done
return fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2")
} }
} }
func generateNoopFunc() func() (error, error) { func noopFunc() (error, error) { return nil, nil }
return func() (error, error) { return nil, nil }
}
func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error { func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
backoff := wait.Backoff{ backoff := wait.Backoff{

View File

@ -138,9 +138,9 @@ type OperationExecutor interface {
// back off on retries. // back off on retries.
VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
// IsOperationPending returns true if an operation for the given volumeName and podName is pending, // IsOperationPending returns true if an operation for the given volumeName
// otherwise it returns false // and one of podName or nodeName is pending, otherwise it returns false
IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName) bool
// ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume. // ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume.
ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
// ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin // ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin
@ -650,8 +650,11 @@ type operationExecutor struct {
operationGenerator OperationGenerator operationGenerator OperationGenerator
} }
func (oe *operationExecutor) IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool { func (oe *operationExecutor) IsOperationPending(
return oe.pendingOperations.IsOperationPending(volumeName, podName) volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName,
nodeName types.NodeName) bool {
return oe.pendingOperations.IsOperationPending(volumeName, podName, nodeName)
} }
func (oe *operationExecutor) AttachVolume( func (oe *operationExecutor) AttachVolume(
@ -660,8 +663,13 @@ func (oe *operationExecutor) AttachVolume(
generatedOperations := generatedOperations :=
oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld) oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld)
if util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) {
return oe.pendingOperations.Run( return oe.pendingOperations.Run(
volumeToAttach.VolumeName, "" /* podName */, generatedOperations) volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName, generatedOperations)
}
return oe.pendingOperations.Run(
volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations)
} }
func (oe *operationExecutor) DetachVolume( func (oe *operationExecutor) DetachVolume(
@ -674,8 +682,13 @@ func (oe *operationExecutor) DetachVolume(
return err return err
} }
if util.IsMultiAttachAllowed(volumeToDetach.VolumeSpec) {
return oe.pendingOperations.Run( return oe.pendingOperations.Run(
volumeToDetach.VolumeName, "" /* podName */, generatedOperations) volumeToDetach.VolumeName, "" /* podName */, volumeToDetach.NodeName, generatedOperations)
}
return oe.pendingOperations.Run(
volumeToDetach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations)
} }
func (oe *operationExecutor) VerifyVolumesAreAttached( func (oe *operationExecutor) VerifyVolumesAreAttached(
@ -761,7 +774,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttached(
// Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin // Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin
uniquePluginName := v1.UniqueVolumeName(pluginName) uniquePluginName := v1.UniqueVolumeName(pluginName)
err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, generatedOperations) err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, "" /* nodeName */, generatedOperations)
if err != nil { if err != nil {
klog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err) klog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err)
} }
@ -779,7 +792,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode(
} }
// Give an empty UniqueVolumeName so that this operation could be executed concurrently. // Give an empty UniqueVolumeName so that this operation could be executed concurrently.
return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, generatedOperations) return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, "" /* nodeName */, generatedOperations)
} }
func (oe *operationExecutor) MountVolume( func (oe *operationExecutor) MountVolume(
@ -820,7 +833,7 @@ func (oe *operationExecutor) MountVolume(
// TODO mount_device // TODO mount_device
return oe.pendingOperations.Run( return oe.pendingOperations.Run(
volumeToMount.VolumeName, podName, generatedOperations) volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations)
} }
func (oe *operationExecutor) UnmountVolume( func (oe *operationExecutor) UnmountVolume(
@ -851,7 +864,7 @@ func (oe *operationExecutor) UnmountVolume(
podName := volumetypes.UniquePodName(volumeToUnmount.PodUID) podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
return oe.pendingOperations.Run( return oe.pendingOperations.Run(
volumeToUnmount.VolumeName, podName, generatedOperations) volumeToUnmount.VolumeName, podName, "" /* nodeName */, generatedOperations)
} }
func (oe *operationExecutor) UnmountDevice( func (oe *operationExecutor) UnmountDevice(
@ -882,7 +895,7 @@ func (oe *operationExecutor) UnmountDevice(
podName := nestedpendingoperations.EmptyUniquePodName podName := nestedpendingoperations.EmptyUniquePodName
return oe.pendingOperations.Run( return oe.pendingOperations.Run(
deviceToDetach.VolumeName, podName, generatedOperations) deviceToDetach.VolumeName, podName, "" /* nodeName */, generatedOperations)
} }
func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
@ -890,7 +903,7 @@ func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actu
if err != nil { if err != nil {
return err return err
} }
return oe.pendingOperations.Run(volumeToMount.VolumeName, "", generatedOperations) return oe.pendingOperations.Run(volumeToMount.VolumeName, "", "" /* nodeName */, generatedOperations)
} }
func (oe *operationExecutor) VerifyControllerAttachedVolume( func (oe *operationExecutor) VerifyControllerAttachedVolume(
@ -904,7 +917,7 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume(
} }
return oe.pendingOperations.Run( return oe.pendingOperations.Run(
volumeToMount.VolumeName, "" /* podName */, generatedOperations) volumeToMount.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations)
} }
// ReconstructVolumeOperation return a func to create volumeSpec from mount path // ReconstructVolumeOperation return a func to create volumeSpec from mount path

View File

@ -17,6 +17,7 @@ limitations under the License.
package operationexecutor package operationexecutor
import ( import (
"fmt"
"strconv" "strconv"
"testing" "testing"
"time" "time"
@ -180,7 +181,7 @@ func TestOperationExecutor_UnmountDeviceConcurrently(t *testing.T) {
} }
} }
func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) { func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToSameNode(t *testing.T) {
// Arrange // Arrange
ch, quit, oe := setup() ch, quit, oe := setup()
volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
@ -191,6 +192,13 @@ func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) {
volumesToAttach[i] = VolumeToAttach{ volumesToAttach[i] = VolumeToAttach{
VolumeName: v1.UniqueVolumeName(pdName), VolumeName: v1.UniqueVolumeName(pdName),
NodeName: "node", NodeName: "node",
VolumeSpec: &volume.Spec{
PersistentVolume: &v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
},
},
},
} }
oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
} }
@ -201,7 +209,91 @@ func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) {
} }
} }
func TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) { func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToSameNode(t *testing.T) {
// Arrange
ch, quit, oe := setup()
volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
pdName := "pd-volume"
// Act
for i := range volumesToAttach {
volumesToAttach[i] = VolumeToAttach{
VolumeName: v1.UniqueVolumeName(pdName),
NodeName: "node",
VolumeSpec: &volume.Spec{
PersistentVolume: &v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
},
},
},
}
oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
}
// Assert
if !isOperationRunSerially(ch, quit) {
t.Fatalf("Attach volume operations should not start concurrently")
}
}
func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) {
// Arrange
ch, quit, oe := setup()
volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
pdName := "pd-volume"
// Act
for i := range volumesToAttach {
volumesToAttach[i] = VolumeToAttach{
VolumeName: v1.UniqueVolumeName(pdName),
NodeName: types.NodeName(fmt.Sprintf("node%d", i)),
VolumeSpec: &volume.Spec{
PersistentVolume: &v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
},
},
},
}
oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
}
// Assert
if !isOperationRunSerially(ch, quit) {
t.Fatalf("Attach volume operations should not start concurrently")
}
}
func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) {
// Arrange
ch, quit, oe := setup()
volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
pdName := "pd-volume"
// Act
for i := range volumesToAttach {
volumesToAttach[i] = VolumeToAttach{
VolumeName: v1.UniqueVolumeName(pdName),
NodeName: types.NodeName(fmt.Sprintf("node%d", i)),
VolumeSpec: &volume.Spec{
PersistentVolume: &v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
},
},
},
}
oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
}
// Assert
if !isOperationRunConcurrently(ch, quit, numVolumesToAttach) {
t.Fatalf("Attach volume operations should not execute serially")
}
}
func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromSameNode(t *testing.T) {
// Arrange // Arrange
ch, quit, oe := setup() ch, quit, oe := setup()
attachedVolumes := make([]AttachedVolume, numVolumesToDetach) attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
@ -212,6 +304,13 @@ func TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) {
attachedVolumes[i] = AttachedVolume{ attachedVolumes[i] = AttachedVolume{
VolumeName: v1.UniqueVolumeName(pdName), VolumeName: v1.UniqueVolumeName(pdName),
NodeName: "node", NodeName: "node",
VolumeSpec: &volume.Spec{
PersistentVolume: &v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
},
},
},
} }
oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
} }
@ -222,7 +321,63 @@ func TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) {
} }
} }
func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) { func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromSameNode(t *testing.T) {
// Arrange
ch, quit, oe := setup()
attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
pdName := "pd-volume"
// Act
for i := range attachedVolumes {
attachedVolumes[i] = AttachedVolume{
VolumeName: v1.UniqueVolumeName(pdName),
NodeName: "node",
VolumeSpec: &volume.Spec{
PersistentVolume: &v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
},
},
},
}
oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
}
// Assert
if !isOperationRunSerially(ch, quit) {
t.Fatalf("DetachVolume operations should not run concurrently")
}
}
func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromDifferentNodes(t *testing.T) {
// Arrange
ch, quit, oe := setup()
attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
pdName := "pd-volume"
// Act
for i := range attachedVolumes {
attachedVolumes[i] = AttachedVolume{
VolumeName: v1.UniqueVolumeName(pdName),
NodeName: types.NodeName(fmt.Sprintf("node%d", i)),
VolumeSpec: &volume.Spec{
PersistentVolume: &v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
},
},
},
}
oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
}
// Assert
if !isOperationRunConcurrently(ch, quit, numVolumesToDetach) {
t.Fatalf("Attach volume operations should not execute serially")
}
}
func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnSameNode(t *testing.T) {
// Arrange // Arrange
ch, quit, oe := setup() ch, quit, oe := setup()
@ -237,6 +392,24 @@ func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) {
} }
} }
func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnDifferentNodes(t *testing.T) {
// Arrange
ch, quit, oe := setup()
// Act
for i := 0; i < numVolumesToVerifyAttached; i++ {
oe.VerifyVolumesAreAttachedPerNode(
nil, /* attachedVolumes */
types.NodeName(fmt.Sprintf("node-name-%d", i)),
nil /* actualStateOfWorldAttacherUpdater */)
}
// Assert
if !isOperationRunConcurrently(ch, quit, numVolumesToVerifyAttached) {
t.Fatalf("VerifyVolumesAreAttached operation is not being run concurrently")
}
}
func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) { func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) {
// Arrange // Arrange
ch, quit, oe := setup() ch, quit, oe := setup()

View File

@ -645,3 +645,44 @@ func WriteVolumeCache(deviceMountPath string, exec utilexec.Interface) error {
// For linux runtime, it skips because unmount will automatically flush disk data // For linux runtime, it skips because unmount will automatically flush disk data
return nil return nil
} }
// IsMultiAttachAllowed checks if attaching this volume to multiple nodes is definitely not allowed/possible.
// In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns
// false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the
// attacher to fail fast in such cases.
// Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047
func IsMultiAttachAllowed(volumeSpec *volume.Spec) bool {
if volumeSpec == nil {
// we don't know if it's supported or not and let the attacher fail later in cases it's not supported
return true
}
if volumeSpec.Volume != nil {
// Check for volume types which are known to fail slow or cause trouble when trying to multi-attach
if volumeSpec.Volume.AzureDisk != nil ||
volumeSpec.Volume.Cinder != nil {
return false
}
}
// Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to
// multi-attach. We trust in the individual volume implementations to not allow unsupported access modes
if volumeSpec.PersistentVolume != nil {
// Check for persistent volume types which do not fail when trying to multi-attach
if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 {
// No access mode specified so we don't know for sure. Let the attacher fail if needed
return true
}
// check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false
for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes {
if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany {
return true
}
}
return false
}
// we don't know if it's supported or not and let the attacher fail later in cases it's not supported
return true
}