From 37957e2a0dc3669c6084366ea3fb3877548a5345 Mon Sep 17 00:00:00 2001 From: Cheng Xing Date: Tue, 14 Jan 2020 10:56:58 -0800 Subject: [PATCH 1/2] Refactor operation keys for NestedPendingOperations --- .../nestedpendingoperations.go | 111 ++++++++---------- 1 file changed, 47 insertions(+), 64 deletions(-) diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go index 193e362cd0b..cb03bebfeb7 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go @@ -88,8 +88,7 @@ type nestedPendingOperations struct { } type operation struct { - volumeName v1.UniqueVolumeName - podName types.UniquePodName + key operationKey operationName string operationPending bool expBackoff exponentialbackoff.ExponentialBackoff @@ -101,18 +100,19 @@ func (grm *nestedPendingOperations) Run( generatedOperations types.GeneratedOperations) error { grm.lock.Lock() defer grm.lock.Unlock() - opExists, previousOpIndex := grm.isOperationExists(volumeName, podName) + + opKey := operationKey{volumeName, podName} + + opExists, previousOpIndex := grm.isOperationExists(opKey) if opExists { previousOp := grm.operations[previousOpIndex] // Operation already exists if previousOp.operationPending { // Operation is pending - operationKey := getOperationKey(volumeName, podName) - return NewAlreadyExistsError(operationKey) + return NewAlreadyExistsError(opKey) } - operationKey := getOperationKey(volumeName, podName) - backOffErr := previousOp.expBackoff.SafeToRetry(operationKey) + backOffErr := previousOp.expBackoff.SafeToRetry(opKey.String()) if backOffErr != nil { if previousOp.operationName == generatedOperations.OperationName { return backOffErr @@ -124,15 +124,13 @@ func (grm *nestedPendingOperations) Run( // Update existing operation to mark as pending. 
grm.operations[previousOpIndex].operationPending = true - grm.operations[previousOpIndex].volumeName = volumeName - grm.operations[previousOpIndex].podName = podName + grm.operations[previousOpIndex].key = opKey } else { // Create a new operation grm.operations = append(grm.operations, operation{ + key: opKey, operationPending: true, - volumeName: volumeName, - podName: podName, operationName: generatedOperations.OperationName, expBackoff: exponentialbackoff.ExponentialBackoff{}, }) @@ -142,7 +140,7 @@ func (grm *nestedPendingOperations) Run( // Handle unhandled panics (very unlikely) defer k8sRuntime.HandleCrash() // Handle completion of and error, if any, from operationFunc() - defer grm.operationComplete(volumeName, podName, &detailedErr) + defer grm.operationComplete(opKey, &detailedErr) return generatedOperations.Run() }() @@ -156,7 +154,8 @@ func (grm *nestedPendingOperations) IsOperationPending( grm.lock.RLock() defer grm.lock.RUnlock() - exist, previousOpIndex := grm.isOperationExists(volumeName, podName) + opKey := operationKey{volumeName, podName} + exist, previousOpIndex := grm.isOperationExists(opKey) if exist && grm.operations[previousOpIndex].operationPending { return true } @@ -164,59 +163,49 @@ func (grm *nestedPendingOperations) IsOperationPending( } // This is an internal function and caller should acquire and release the lock -func (grm *nestedPendingOperations) isOperationExists( - volumeName v1.UniqueVolumeName, - podName types.UniquePodName) (bool, int) { +func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, int) { // If volumeName is empty, operation can be executed concurrently - if volumeName == EmptyUniqueVolumeName { + if key.volumeName == EmptyUniqueVolumeName { return false, -1 } for previousOpIndex, previousOp := range grm.operations { - if previousOp.volumeName != volumeName { - // No match, keep searching - continue - } + volumeNameMatch := previousOp.key.volumeName == key.volumeName - if previousOp.podName 
!= EmptyUniquePodName && - podName != EmptyUniquePodName && - previousOp.podName != podName { - // No match, keep searching - continue - } + podNameMatch := previousOp.key.podName == EmptyUniquePodName || + key.podName == EmptyUniquePodName || + previousOp.key.podName == key.podName - // Match - return true, previousOpIndex + + if volumeNameMatch && podNameMatch { + return true, previousOpIndex + } } + return false, -1 } -func (grm *nestedPendingOperations) getOperation( - volumeName v1.UniqueVolumeName, - podName types.UniquePodName) (uint, error) { +func (grm *nestedPendingOperations) getOperation(key operationKey) (uint, error) { // Assumes lock has been acquired by caller. for i, op := range grm.operations { - if op.volumeName == volumeName && - op.podName == podName { + if op.key.volumeName == key.volumeName && + op.key.podName == key.podName { return uint(i), nil } } - logOperationKey := getOperationKey(volumeName, podName) - return 0, fmt.Errorf("Operation %q not found", logOperationKey) + return 0, fmt.Errorf("Operation %q not found", key) } -func (grm *nestedPendingOperations) deleteOperation( +func (grm *nestedPendingOperations) deleteOperation(key operationKey) { // Assumes lock has been acquired by caller. - volumeName v1.UniqueVolumeName, - podName types.UniquePodName) { opIndex := -1 for i, op := range grm.operations { - if op.volumeName == volumeName && - op.podName == podName { + if op.key.volumeName == key.volumeName && + op.key.podName == key.podName { opIndex = i break } @@ -227,8 +216,7 @@ func (grm *nestedPendingOperations) deleteOperation( grm.operations = grm.operations[:len(grm.operations)-1] } -func (grm *nestedPendingOperations) operationComplete( - volumeName v1.UniqueVolumeName, podName types.UniquePodName, err *error) { +func (grm *nestedPendingOperations) operationComplete(key operationKey, err *error) { // Defer operations are executed in Last-In is First-Out order. 
In this case // the lock is acquired first when operationCompletes begins, and is // released when the method finishes, after the lock is released cond is @@ -239,24 +227,20 @@ func (grm *nestedPendingOperations) operationComplete( if *err == nil || !grm.exponentialBackOffOnError { // Operation completed without error, or exponentialBackOffOnError disabled - grm.deleteOperation(volumeName, podName) + grm.deleteOperation(key) if *err != nil { // Log error - logOperationKey := getOperationKey(volumeName, podName) - klog.Errorf("operation %s failed with: %v", - logOperationKey, - *err) + klog.Errorf("operation %s failed with: %v", key, *err) } return } // Operation completed with error and exponentialBackOffOnError Enabled - existingOpIndex, getOpErr := grm.getOperation(volumeName, podName) + existingOpIndex, getOpErr := grm.getOperation(key) if getOpErr != nil { // Failed to find existing operation - logOperationKey := getOperationKey(volumeName, podName) klog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.", - logOperationKey, + key, *err) return } @@ -265,10 +249,8 @@ func (grm *nestedPendingOperations) operationComplete( grm.operations[existingOpIndex].operationPending = false // Log error - operationKey := - getOperationKey(volumeName, podName) klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff. 
- GenerateNoRetriesPermittedMsg(operationKey)) + GenerateNoRetriesPermittedMsg(key.String())) } func (grm *nestedPendingOperations) Wait() { @@ -280,21 +262,22 @@ func (grm *nestedPendingOperations) Wait() { } } -func getOperationKey( - volumeName v1.UniqueVolumeName, podName types.UniquePodName) string { - podNameStr := "" - if podName != EmptyUniquePodName { - podNameStr = fmt.Sprintf(" (%q)", podName) - } +type operationKey struct { + volumeName v1.UniqueVolumeName + podName types.UniquePodName +} + +func (key operationKey) String() string { + podNameStr := fmt.Sprintf(" (%q)", key.podName) return fmt.Sprintf("%q%s", - volumeName, + key.volumeName, podNameStr) } // NewAlreadyExistsError returns a new instance of AlreadyExists error. -func NewAlreadyExistsError(operationKey string) error { - return alreadyExistsError{operationKey} +func NewAlreadyExistsError(key operationKey) error { + return alreadyExistsError{key} } // IsAlreadyExists returns true if an error returned from @@ -313,7 +296,7 @@ func IsAlreadyExists(err error) bool { // new operation can not be started because an operation with the same operation // name is already executing. 
type alreadyExistsError struct { - operationKey string + operationKey operationKey } var _ error = alreadyExistsError{} From c6a03fa5be5a3a8eaf1d3953918390230d8115ef Mon Sep 17 00:00:00 2001 From: Cheng Xing Date: Tue, 14 Jan 2020 11:00:31 -0800 Subject: [PATCH 2/2] Parallelize attach operations across different nodes for volumes that allow multi-attach --- .../volume/attachdetach/reconciler/BUILD | 2 +- .../attachdetach/reconciler/reconciler.go | 82 ++-- .../volumemanager/reconciler/reconciler.go | 4 +- pkg/volume/util/nestedpendingoperations/BUILD | 2 + .../nestedpendingoperations.go | 91 ++-- .../nestedpendingoperations_test.go | 415 +++++++++++++----- .../operationexecutor/operation_executor.go | 41 +- .../operation_executor_test.go | 207 ++++++++- pkg/volume/util/util.go | 41 ++ 9 files changed, 685 insertions(+), 200 deletions(-) diff --git a/pkg/controller/volume/attachdetach/reconciler/BUILD b/pkg/controller/volume/attachdetach/reconciler/BUILD index cb047fc9366..b3a94b397e3 100644 --- a/pkg/controller/volume/attachdetach/reconciler/BUILD +++ b/pkg/controller/volume/attachdetach/reconciler/BUILD @@ -16,7 +16,7 @@ go_library( "//pkg/controller/volume/attachdetach/statusupdater:go_default_library", "//pkg/kubelet/events:go_default_library", "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", - "//pkg/volume:go_default_library", + "//pkg/volume/util:go_default_library", "//pkg/volume/util/operationexecutor:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go index a2ec9be2a93..06561e6fa0b 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go @@ -24,7 +24,7 @@ import ( "strings" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" 
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" @@ -34,7 +34,7 @@ import ( "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" kevents "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" - "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" ) @@ -134,42 +134,6 @@ func (rc *reconciler) syncStates() { rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld) } -// isMultiAttachForbidden checks if attaching this volume to multiple nodes is definitely not allowed/possible. -// In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns -// false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the -// attacher to fail fast in such cases. -// Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047 -func (rc *reconciler) isMultiAttachForbidden(volumeSpec *volume.Spec) bool { - if volumeSpec.Volume != nil { - // Check for volume types which are known to fail slow or cause trouble when trying to multi-attach - if volumeSpec.Volume.AzureDisk != nil || - volumeSpec.Volume.Cinder != nil { - return true - } - } - - // Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to - // multi-attach. We trust in the individual volume implementations to not allow unsupported access modes - if volumeSpec.PersistentVolume != nil { - // Check for persistent volume types which do not fail when trying to multi-attach - if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 { - // No access mode specified so we don't know for sure. 
Let the attacher fail if needed - return false - } - - // check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false - for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes { - if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany { - return false - } - } - return true - } - - // we don't know if it's supported or not and let the attacher fail later in cases it's not supported - return false -} - func (rc *reconciler) reconcile() { // Detaches are triggered before attaches so that volumes referenced by // pods that are rescheduled to a different node are detached first. @@ -182,9 +146,16 @@ func (rc *reconciler) reconcile() { // This check must be done before we do any other checks, as otherwise the other checks // may pass while at the same time the volume leaves the pending state, resulting in // double detach attempts - if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "") { - klog.V(10).Infof("Operation for volume %q is already running. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName) - continue + if util.IsMultiAttachForbidden(attachedVolume.VolumeSpec) { + if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, "" /* nodeName */) { + klog.V(10).Infof("Operation for volume %q is already running in the cluster. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName) + continue + } + } else { + if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, attachedVolume.NodeName) { + klog.V(10).Infof("Operation for volume %q is already running for node %q. 
Can't start detach", attachedVolume.VolumeName, attachedVolume.NodeName) + continue + } } // Set the detach request time @@ -260,15 +231,17 @@ func (rc *reconciler) attachDesiredVolumes() { rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName) continue } - // Don't even try to start an operation if there is already one running - if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") { - if klog.V(10) { - klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) - } - continue - } - if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) { + if util.IsMultiAttachForbidden(volumeToAttach.VolumeSpec) { + + // Don't even try to start an operation if there is already one running for the given volume + if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */) { + if klog.V(10) { + klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) + } + continue + } + nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName) if len(nodes) > 0 { if !volumeToAttach.MultiAttachErrorReported { @@ -277,6 +250,17 @@ func (rc *reconciler) attachDesiredVolumes() { } continue } + + } else { + + // Don't even try to start an operation if there is already one running for the given volume and node. + if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName) { + if klog.V(10) { + klog.Infof("Operation for volume %q is already running for node %q. 
Can't start attach", volumeToAttach.VolumeName, volumeToAttach.NodeName) + } + continue + } + } // Volume/Node doesn't exist, spawn a goroutine to attach it diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index adb2e2038cd..46b1e88c38a 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -294,7 +294,7 @@ func (rc *reconciler) unmountDetachDevices() { for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() { // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting. if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) && - !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) { + !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) { if attachedVolume.DeviceMayBeMounted() { // Volume is globally mounted to device, unmount it klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", "")) @@ -422,7 +422,7 @@ func (rc *reconciler) syncStates() { continue } // There is no pod that uses the volume. 
- if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName) { + if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) { klog.Warning("Volume is in pending operation, skip cleaning up mounts") } klog.V(2).Infof( diff --git a/pkg/volume/util/nestedpendingoperations/BUILD b/pkg/volume/util/nestedpendingoperations/BUILD index 5e940356c8d..5e287a0aba9 100644 --- a/pkg/volume/util/nestedpendingoperations/BUILD +++ b/pkg/volume/util/nestedpendingoperations/BUILD @@ -14,6 +14,7 @@ go_library( "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], @@ -27,6 +28,7 @@ go_test( "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", ], ) diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go index cb03bebfeb7..e0813b4d04c 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go @@ -29,45 +29,73 @@ import ( "sync" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" k8sRuntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/klog" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" - "k8s.io/kubernetes/pkg/volume/util/types" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) 
const ( // EmptyUniquePodName is a UniquePodName for empty string. - EmptyUniquePodName types.UniquePodName = types.UniquePodName("") + EmptyUniquePodName volumetypes.UniquePodName = volumetypes.UniquePodName("") // EmptyUniqueVolumeName is a UniqueVolumeName for empty string EmptyUniqueVolumeName v1.UniqueVolumeName = v1.UniqueVolumeName("") + + // EmptyNodeName is a NodeName for empty string + EmptyNodeName types.NodeName = types.NodeName("") ) // NestedPendingOperations defines the supported set of operations. type NestedPendingOperations interface { - // Run adds the concatenation of volumeName and podName to the list of - // running operations and spawns a new go routine to execute operationFunc. - // If an operation with the same volumeName, same or empty podName - // and same operationName exits, an AlreadyExists or ExponentialBackoff - // error is returned. If an operation with same volumeName and podName - // has ExponentialBackoff error but operationName is different, exponential - // backoff is reset and operation is allowed to proceed. - // This enables multiple operations to execute in parallel for the same - // volumeName as long as they have different podName. + + // Run adds the concatenation of volumeName and one of podName or nodeName to + // the list of running operations and spawns a new go routine to execute + // OperationFunc inside generatedOperations. + + // volumeName, podName, and nodeName collectively form the operation key. + // The following forms of operation keys are supported: + // - volumeName empty, podName empty, nodeName empty + // This key does not have any conflicting keys. + // - volumeName exists, podName empty, nodeName empty + // This key conflicts with all other keys with the same volumeName. 
+ // - volumeName exists, podName exists, nodeName empty + // This key conflicts with: + // - the same volumeName and podName + // - the same volumeName, but no podName + // - volumeName exists, podName empty, nodeName exists + // This key conflicts with: + // - the same volumeName and nodeName + // - the same volumeName but no nodeName + + // If an operation with the same operationName and a conflicting key exists, + // an AlreadyExists or ExponentialBackoff error is returned. + // If an operation with a conflicting key has ExponentialBackoff error but + // operationName is different, exponential backoff is reset and operation is + // allowed to proceed. + // Once the operation is complete, the go routine is terminated and the - // concatenation of volumeName and podName is removed from the list of - // executing operations allowing a new operation to be started with the - // volumeName without error. - Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, generatedOperations types.GeneratedOperations) error + // concatenation of volumeName and (podName or nodeName) is removed from the + // list of executing operations allowing a new operation to be started with + // the volumeName without error. + Run( + volumeName v1.UniqueVolumeName, + podName volumetypes.UniquePodName, + nodeName types.NodeName, + generatedOperations volumetypes.GeneratedOperations) error // Wait blocks until all operations are completed. This is typically // necessary during tests - the test should wait until all operations finish // and evaluate results after that. 
Wait() - // IsOperationPending returns true if an operation for the given volumeName and podName is pending, - // otherwise it returns false - IsOperationPending(volumeName v1.UniqueVolumeName, podName types.UniquePodName) bool + // IsOperationPending returns true if an operation for the given volumeName + // and one of podName or nodeName is pending, otherwise it returns false + IsOperationPending( + volumeName v1.UniqueVolumeName, + podName volumetypes.UniquePodName, + nodeName types.NodeName) bool } // NewNestedPendingOperations returns a new instance of NestedPendingOperations. @@ -96,12 +124,13 @@ type operation struct { func (grm *nestedPendingOperations) Run( volumeName v1.UniqueVolumeName, - podName types.UniquePodName, - generatedOperations types.GeneratedOperations) error { + podName volumetypes.UniquePodName, + nodeName types.NodeName, + generatedOperations volumetypes.GeneratedOperations) error { grm.lock.Lock() defer grm.lock.Unlock() - opKey := operationKey{volumeName, podName} + opKey := operationKey{volumeName, podName, nodeName} opExists, previousOpIndex := grm.isOperationExists(opKey) if opExists { @@ -149,12 +178,13 @@ func (grm *nestedPendingOperations) Run( func (grm *nestedPendingOperations) IsOperationPending( volumeName v1.UniqueVolumeName, - podName types.UniquePodName) bool { + podName volumetypes.UniquePodName, + nodeName types.NodeName) bool { grm.lock.RLock() defer grm.lock.RUnlock() - opKey := operationKey{volumeName, podName} + opKey := operationKey{volumeName, podName, nodeName} exist, previousOpIndex := grm.isOperationExists(opKey) if exist && grm.operations[previousOpIndex].operationPending { return true @@ -177,8 +207,11 @@ func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, i key.podName == EmptyUniquePodName || previousOp.key.podName == key.podName + nodeNameMatch := previousOp.key.nodeName == EmptyNodeName || + key.nodeName == EmptyNodeName || + previousOp.key.nodeName == key.nodeName - if 
volumeNameMatch && podNameMatch { + if volumeNameMatch && podNameMatch && nodeNameMatch { return true, previousOpIndex } } @@ -264,15 +297,15 @@ func (grm *nestedPendingOperations) Wait() { type operationKey struct { volumeName v1.UniqueVolumeName - podName types.UniquePodName + podName volumetypes.UniquePodName + nodeName types.NodeName } func (key operationKey) String() string { - podNameStr := fmt.Sprintf(" (%q)", key.podName) - - return fmt.Sprintf("%q%s", + return fmt.Sprintf("{volumeName=%q, podName=%q, nodeName=%q}", key.volumeName, - podNameStr) + key.podName, + key.nodeName) } // NewAlreadyExistsError returns a new instance of AlreadyExists error. diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go index fe52c897798..d9301d64f4a 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go @@ -22,9 +22,10 @@ import ( "time" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" - "k8s.io/kubernetes/pkg/volume/util/types" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) const ( @@ -44,22 +45,22 @@ const ( initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond ) -func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) { +func Test_NestedPendingOperations_Positive_SingleOp(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation := func() (error, error) { return nil, nil } // Act - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) + err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) // Assert if err 
!= nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err) } } -func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) { +func Test_NestedPendingOperations_Positive_TwoOps(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volume1Name := v1.UniqueVolumeName("volume1-name") @@ -67,65 +68,65 @@ func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) { operation := func() (error, error) { return nil, nil } // Act - err1 := grm.Run(volume1Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) - err2 := grm.Run(volume2Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) + err1 := grm.Run(volume1Name, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) + err2 := grm.Run(volume2Name, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) // Assert if err1 != nil { - t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", volume1Name, err1) + t.Fatalf("NestedPendingOperations %q failed. Expected: Actual: <%v>", volume1Name, err1) } if err2 != nil { - t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", volume2Name, err2) + t.Fatalf("NestedPendingOperations %q failed. 
Expected: Actual: <%v>", volume2Name, err2) } } -func Test_NewGoRoutineMap_Positive_TwoSubOps(t *testing.T) { +func Test_NestedPendingOperations_Positive_TwoSubOps(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operation1PodName := types.UniquePodName("operation1-podname") - operation2PodName := types.UniquePodName("operation2-podname") + operation1PodName := volumetypes.UniquePodName("operation1-podname") + operation2PodName := volumetypes.UniquePodName("operation2-podname") operation := func() (error, error) { return nil, nil } // Act - err1 := grm.Run(volumeName, operation1PodName, types.GeneratedOperations{OperationFunc: operation}) - err2 := grm.Run(volumeName, operation2PodName, types.GeneratedOperations{OperationFunc: operation}) + err1 := grm.Run(volumeName, operation1PodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) + err2 := grm.Run(volumeName, operation2PodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) // Assert if err1 != nil { - t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", operation1PodName, err1) + t.Fatalf("NestedPendingOperations %q failed. Expected: Actual: <%v>", operation1PodName, err1) } if err2 != nil { - t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", operation2PodName, err2) + t.Fatalf("NestedPendingOperations %q failed. 
Expected: Actual: <%v>", operation2PodName, err2) } } -func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) { +func Test_NestedPendingOperations_Positive_SingleOpWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation := func() (error, error) { return nil, nil } // Act - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) + err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) // Assert if err != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err) } } -func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { +func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletes(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateCallbackFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() <-operation1DoneCh // Force operation1 to complete @@ -134,9 +135,9 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) if err != nil { - t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) return false, nil } return true, nil @@ -145,19 +146,19 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { // Assert if err2 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) } } -func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) { +func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateCallbackFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() <-operation1DoneCh // Force operation1 to complete @@ -166,9 +167,9 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t * err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) if err != nil { - t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) return false, nil } return true, nil @@ -177,18 +178,18 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t * // Assert if err2 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) } } -func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { +func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanics(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1 := generatePanicFunc() - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() @@ -196,9 +197,9 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) if err != nil { - t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) return false, nil } return true, nil @@ -207,18 +208,18 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { // Assert if err2 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) } } -func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) { +func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1 := generatePanicFunc() - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() @@ -226,9 +227,9 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) if err != nil { - t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) return false, nil } return true, nil @@ -237,43 +238,43 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes // Assert if err2 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) } } -func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { +func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NestedPendingOperations did not fail. Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) } } -func Test_NewGoRoutineMap_Negative_SecondThirdOpWithDifferentNames(t *testing.T) { +func Test_NestedPendingOperations_Negative_SecondThirdOpWithDifferentNames(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") op1Name := "mount_volume" operation1 := generateErrorFunc() - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name}) + err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) } // Shorter than exponential backoff period, so as to trigger exponential backoff error on second // operation. 
@@ -283,7 +284,8 @@ func Test_NewGoRoutineMap_Negative_SecondThirdOpWithDifferentNames(t *testing.T) func() (bool, error) { err := grm.Run(volumeName, "", /* operationSubName */ - types.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name}) + "", /* nodeName */ + volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name}) if exponentialbackoff.IsExponentialBackoff(err) { return true, nil @@ -294,114 +296,114 @@ func Test_NewGoRoutineMap_Negative_SecondThirdOpWithDifferentNames(t *testing.T) // Assert if err2 != nil { - t.Fatalf("Expected NewGoRoutine to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name) + t.Fatalf("Expected NestedPendingOperations to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name) } operation3 := generateNoopFunc() op3Name := "unmount_volume" // Act - err3 := grm.Run(volumeName, "" /*pod name*/, types.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name}) + err3 := grm.Run(volumeName, "" /*pod name*/, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name}) if err3 != nil { - t.Fatalf("NewGoRoutine failed. Expected Actual: <%v>", err3) + t.Fatalf("NestedPendingOperations failed. 
Expected Actual: <%v>", err3) } } -func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) { +func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operationPodName := types.UniquePodName("operation-podname") + operationPodName := volumetypes.UniquePodName("operation-podname") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NestedPendingOperations did not fail. 
Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) } } -func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) { +func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operationPodName := types.UniquePodName("operation-podname") + operationPodName := volumetypes.UniquePodName("operation-podname") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NestedPendingOperations did not fail. 
Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) } } -func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) { +func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NestedPendingOperations did not fail. 
Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) } } -func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { +func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() operation3 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NestedPendingOperations did not fail. 
Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) } // Act @@ -409,9 +411,9 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { err3 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3}) + err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3}) if err != nil { - t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) return false, nil } return true, nil @@ -420,32 +422,32 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { // Assert if err3 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err3) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err3) } } -func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) { +func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() operation3 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NestedPendingOperations did not fail. Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) } // Act @@ -453,9 +455,9 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t err3 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3}) + err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3}) if err != nil { - t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) return false, nil } return true, nil @@ -464,11 +466,11 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t // Assert if err3 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err3) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err3) } } -func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) { +func Test_NestedPendingOperations_Positive_WaitEmpty(t *testing.T) { // Test than Wait() on empty GoRoutineMap always succeeds without blocking // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) @@ -487,7 +489,7 @@ func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) { } } -func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) { +func Test_NestedPendingOperations_Positive_WaitEmptyWithExpBackoff(t *testing.T) { // Test than Wait() on empty GoRoutineMap always succeeds without blocking // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) @@ -506,16 +508,16 @@ func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) { } } -func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) { +func Test_NestedPendingOperations_Positive_Wait(t *testing.T) { // Test that Wait() really blocks until the last operation succeeds // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err) } // Act @@ -535,16 +537,16 @@ func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) { } } -func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) { +func Test_NestedPendingOperations_Positive_WaitWithExpBackoff(t *testing.T) { // Test that Wait() really blocks until the last operation succeeds // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err) } // Act @@ -564,6 +566,215 @@ func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) { } } +/* Concurrent operations tests */ + +// Covered cases: +// FIRST OP | SECOND OP | RESULT +// None | None | Positive +// None | Volume | Positive +// None | Volume Pod | Positive +// None | Volume Node | Positive +// Volume | None | Positive +// Volume | Volume | Negative (covered in Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes above) +// Volume | Volume Pod | Negative +// Volume | Volume Node | Negative +// Volume Pod | None | Positive +// Volume Pod | Volume | Negative +// Volume Pod | Volume Pod | Negative (covered in Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes above) +// Volume Node | None | Positive +// Volume Node | Volume | Negative +// Volume Node | Volume Node | Negative + +// These cases are not covered because they will never occur within the same +// binary, so either result works. 
+// Volume Pod | Volume Node +// Volume Node | Volume Pod + +func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondNone(t *testing.T) { + testConcurrentOperationsPositive(t, + "", /* volumeName1 */ + "", /* podName1 */ + "", /* nodeName1 */ + "", /* volumeName1 */ + "", /* podName2 */ + "" /* nodeName2 */) +} + +func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondVolume(t *testing.T) { + testConcurrentOperationsPositive(t, + "", /* volumeName1 */ + "", /* podName1 */ + "", /* nodeName1 */ + v1.UniqueVolumeName("volume-name"), + "", /* podName2 */ + "" /* nodeName2 */) +} + +func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondVolumePod(t *testing.T) { + testConcurrentOperationsPositive(t, + "", /* volumeName1 */ + "", /* podName1 */ + "", /* nodeName1 */ + v1.UniqueVolumeName("volume-name"), + volumetypes.UniquePodName("operation-podname"), + "" /* nodeName2 */) +} + +func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondVolumeNode(t *testing.T) { + testConcurrentOperationsPositive(t, + "", /* volumeName1 */ + "", /* podName1 */ + "", /* nodeName1 */ + v1.UniqueVolumeName("volume-name"), + "", /* podName2 */ + types.NodeName("operation-nodename")) +} + +func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstVolume_SecondNone(t *testing.T) { + testConcurrentOperationsPositive(t, + v1.UniqueVolumeName("volume-name"), + "", /* podName1 */ + "", /* nodeName1 */ + "", /* volumeName1 */ + "", /* podName2 */ + "" /* nodeName2 */) +} + +func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolume_SecondVolumePod(t *testing.T) { + testConcurrentOperationsNegative(t, + v1.UniqueVolumeName("volume-name"), + "", /* podName1 */ + "", /* nodeName1 */ + v1.UniqueVolumeName("volume-name"), + volumetypes.UniquePodName("operation-podname"), + "" /* nodeName2 */) +} + +func 
Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolume_SecondVolumeNode(t *testing.T) { + testConcurrentOperationsNegative(t, + v1.UniqueVolumeName("volume-name"), + "", /* podName1 */ + "", /* nodeName1 */ + v1.UniqueVolumeName("volume-name"), + "", /* podName2 */ + types.NodeName("operation-nodename")) +} + +func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstVolumePod_SecondNone(t *testing.T) { + testConcurrentOperationsPositive(t, + v1.UniqueVolumeName("volume-name"), + volumetypes.UniquePodName("operation-podname"), + "", /* nodeName1 */ + "", /* volumeName1 */ + "", /* podName2 */ + "" /* nodeName2 */) +} + +func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolumePod_SecondVolume(t *testing.T) { + testConcurrentOperationsNegative(t, + v1.UniqueVolumeName("volume-name"), + volumetypes.UniquePodName("operation-podname"), + "", /* nodeName1 */ + v1.UniqueVolumeName("volume-name"), + "", /* podName2 */ + "" /* nodeName2 */) +} + +func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstVolumeNode_SecondNone(t *testing.T) { + testConcurrentOperationsPositive(t, + v1.UniqueVolumeName("volume-name"), + "", /* podName1 */ + types.NodeName("operation-nodename"), + "", /* volumeName1 */ + "", /* podName2 */ + "" /* nodeName2 */) +} + +func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolumeNode_SecondVolume(t *testing.T) { + testConcurrentOperationsNegative(t, + v1.UniqueVolumeName("volume-name"), + "", /* podName1 */ + types.NodeName("operation-nodename"), + v1.UniqueVolumeName("volume-name"), + "", /* podName2 */ + "" /* nodeName2 */) +} + +func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolumeNode_SecondVolumeNode(t *testing.T) { + testConcurrentOperationsNegative(t, + v1.UniqueVolumeName("volume-name"), + "", /* podName1 */ + types.NodeName("operation-nodename"), + v1.UniqueVolumeName("volume-name"), + "", /* 
podName2 */ + types.NodeName("operation-nodename")) +} + +// testConcurrentOperationsPositive passes if the two operations keyed by the +// provided parameters are executed in parallel, and fails otherwise. +func testConcurrentOperationsPositive( + t *testing.T, + volumeName1 v1.UniqueVolumeName, + podName1 volumetypes.UniquePodName, + nodeName1 types.NodeName, + volumeName2 v1.UniqueVolumeName, + podName2 volumetypes.UniquePodName, + nodeName2 types.NodeName) { + + // Arrange + grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateWaitFunc(operation1DoneCh) + err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + if err1 != nil { + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + + // Act + err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2}) + + // Assert + if err2 != nil { + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) + } +} + +// testConcurrentOperationsNegative passes if the creation of the second +// operation returns an alreadyExists error, and fails otherwise. +func testConcurrentOperationsNegative( + t *testing.T, + volumeName1 v1.UniqueVolumeName, + podName1 volumetypes.UniquePodName, + nodeName1 types.NodeName, + volumeName2 v1.UniqueVolumeName, + podName2 volumetypes.UniquePodName, + nodeName2 types.NodeName) { + + // Arrange + grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateWaitFunc(operation1DoneCh) + err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + if err1 != nil { + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + + // Act + err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2}) + + // Assert + if err2 == nil { + t.Fatalf("NestedPendingOperations did not fail. Expected an operation to already exist") + } + if !IsAlreadyExists(err2) { + t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) + } +} + +/* END concurrent operations tests */ + func generateCallbackFunc(done chan<- interface{}) func() (error, error) { return func() (error, error) { done <- true diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 463770f1730..27caa231820 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -138,9 +138,9 @@ type OperationExecutor interface { // back off on retries. VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error - // IsOperationPending returns true if an operation for the given volumeName and podName is pending, - // otherwise it returns false - IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool + // IsOperationPending returns true if an operation for the given volumeName + // and one of podName or nodeName is pending, otherwise it returns false + IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName) bool // ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume. 
ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error // ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin @@ -650,8 +650,11 @@ type operationExecutor struct { operationGenerator OperationGenerator } -func (oe *operationExecutor) IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool { - return oe.pendingOperations.IsOperationPending(volumeName, podName) +func (oe *operationExecutor) IsOperationPending( + volumeName v1.UniqueVolumeName, + podName volumetypes.UniquePodName, + nodeName types.NodeName) bool { + return oe.pendingOperations.IsOperationPending(volumeName, podName, nodeName) } func (oe *operationExecutor) AttachVolume( @@ -660,8 +663,13 @@ func (oe *operationExecutor) AttachVolume( generatedOperations := oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld) + if util.IsMultiAttachForbidden(volumeToAttach.VolumeSpec) { + return oe.pendingOperations.Run( + volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations) + } + return oe.pendingOperations.Run( - volumeToAttach.VolumeName, "" /* podName */, generatedOperations) + volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName, generatedOperations) } func (oe *operationExecutor) DetachVolume( @@ -674,8 +682,13 @@ func (oe *operationExecutor) DetachVolume( return err } + if util.IsMultiAttachForbidden(volumeToDetach.VolumeSpec) { + return oe.pendingOperations.Run( + volumeToDetach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations) + } + return oe.pendingOperations.Run( - volumeToDetach.VolumeName, "" /* podName */, generatedOperations) + volumeToDetach.VolumeName, "" /* podName */, volumeToDetach.NodeName, generatedOperations) } func (oe *operationExecutor) VerifyVolumesAreAttached( @@ -761,7 +774,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttached( // Ugly hack to ensure - we don't do parallel 
bulk polling of same volume plugin uniquePluginName := v1.UniqueVolumeName(pluginName) - err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, generatedOperations) + err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, "" /* nodeName */, generatedOperations) if err != nil { klog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err) } @@ -779,7 +792,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode( } // Give an empty UniqueVolumeName so that this operation could be executed concurrently. - return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, generatedOperations) + return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, "" /* nodeName */, generatedOperations) } func (oe *operationExecutor) MountVolume( @@ -820,7 +833,7 @@ func (oe *operationExecutor) MountVolume( // TODO mount_device return oe.pendingOperations.Run( - volumeToMount.VolumeName, podName, generatedOperations) + volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations) } func (oe *operationExecutor) UnmountVolume( @@ -851,7 +864,7 @@ func (oe *operationExecutor) UnmountVolume( podName := volumetypes.UniquePodName(volumeToUnmount.PodUID) return oe.pendingOperations.Run( - volumeToUnmount.VolumeName, podName, generatedOperations) + volumeToUnmount.VolumeName, podName, "" /* nodeName */, generatedOperations) } func (oe *operationExecutor) UnmountDevice( @@ -882,7 +895,7 @@ func (oe *operationExecutor) UnmountDevice( podName := nestedpendingoperations.EmptyUniquePodName return oe.pendingOperations.Run( - deviceToDetach.VolumeName, podName, generatedOperations) + deviceToDetach.VolumeName, podName, "" /* nodeName */, generatedOperations) } func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { @@ -890,7 +903,7 @@ func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, 
actu if err != nil { return err } - return oe.pendingOperations.Run(volumeToMount.VolumeName, "", generatedOperations) + return oe.pendingOperations.Run(volumeToMount.VolumeName, "", "" /* nodeName */, generatedOperations) } func (oe *operationExecutor) VerifyControllerAttachedVolume( @@ -904,7 +917,7 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume( } return oe.pendingOperations.Run( - volumeToMount.VolumeName, "" /* podName */, generatedOperations) + volumeToMount.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations) } // ReconstructVolumeOperation return a func to create volumeSpec from mount path diff --git a/pkg/volume/util/operationexecutor/operation_executor_test.go b/pkg/volume/util/operationexecutor/operation_executor_test.go index adc199a004f..da4983d5125 100644 --- a/pkg/volume/util/operationexecutor/operation_executor_test.go +++ b/pkg/volume/util/operationexecutor/operation_executor_test.go @@ -17,6 +17,7 @@ limitations under the License. package operationexecutor import ( + "fmt" "strconv" "testing" "time" @@ -180,7 +181,7 @@ func TestOperationExecutor_UnmountDeviceConcurrently(t *testing.T) { } } -func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) { +func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToSameNode(t *testing.T) { // Arrange ch, quit, oe := setup() volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) @@ -191,6 +192,13 @@ func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) { volumesToAttach[i] = VolumeToAttach{ VolumeName: v1.UniqueVolumeName(pdName), NodeName: "node", + VolumeSpec: &volume.Spec{ + PersistentVolume: &v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + }, + }, + }, } oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) } @@ -201,7 +209,91 @@ func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) { } } -func 
TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) { +func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToSameNode(t *testing.T) { + // Arrange + ch, quit, oe := setup() + volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) + pdName := "pd-volume" + + // Act + for i := range volumesToAttach { + volumesToAttach[i] = VolumeToAttach{ + VolumeName: v1.UniqueVolumeName(pdName), + NodeName: "node", + VolumeSpec: &volume.Spec{ + PersistentVolume: &v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, + }, + }, + }, + } + oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) + } + + // Assert + if !isOperationRunSerially(ch, quit) { + t.Fatalf("Attach volume operations should not start concurrently") + } +} + +func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) { + // Arrange + ch, quit, oe := setup() + volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) + pdName := "pd-volume" + + // Act + for i := range volumesToAttach { + volumesToAttach[i] = VolumeToAttach{ + VolumeName: v1.UniqueVolumeName(pdName), + NodeName: types.NodeName(fmt.Sprintf("node%d", i)), + VolumeSpec: &volume.Spec{ + PersistentVolume: &v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + }, + }, + }, + } + oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) + } + + // Assert + if !isOperationRunSerially(ch, quit) { + t.Fatalf("Attach volume operations should not start concurrently") + } +} + +func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) { + // Arrange + ch, quit, oe := setup() + volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) + pdName := "pd-volume" + + // Act + for i := range volumesToAttach { + volumesToAttach[i] = VolumeToAttach{ + VolumeName: v1.UniqueVolumeName(pdName), + 
NodeName: types.NodeName(fmt.Sprintf("node%d", i)), + VolumeSpec: &volume.Spec{ + PersistentVolume: &v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, + }, + }, + }, + } + oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) + } + + // Assert + if !isOperationRunConcurrently(ch, quit, numVolumesToAttach) { + t.Fatalf("Attach volume operations should not execute serially") + } +} + +func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromSameNode(t *testing.T) { // Arrange ch, quit, oe := setup() attachedVolumes := make([]AttachedVolume, numVolumesToDetach) @@ -212,6 +304,13 @@ func TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) { attachedVolumes[i] = AttachedVolume{ VolumeName: v1.UniqueVolumeName(pdName), NodeName: "node", + VolumeSpec: &volume.Spec{ + PersistentVolume: &v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + }, + }, + }, } oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) } @@ -222,7 +321,91 @@ func TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) { } } -func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) { +func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromSameNode(t *testing.T) { + // Arrange + ch, quit, oe := setup() + attachedVolumes := make([]AttachedVolume, numVolumesToDetach) + pdName := "pd-volume" + + // Act + for i := range attachedVolumes { + attachedVolumes[i] = AttachedVolume{ + VolumeName: v1.UniqueVolumeName(pdName), + NodeName: "node", + VolumeSpec: &volume.Spec{ + PersistentVolume: &v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, + }, + }, + }, + } + oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) + } + 
+ // Assert + if !isOperationRunSerially(ch, quit) { + t.Fatalf("DetachVolume operations should not run concurrently") + } +} + +func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromDifferentNodes(t *testing.T) { + // Arrange + ch, quit, oe := setup() + attachedVolumes := make([]AttachedVolume, numVolumesToDetach) + pdName := "pd-volume" + + // Act + for i := range attachedVolumes { + attachedVolumes[i] = AttachedVolume{ + VolumeName: v1.UniqueVolumeName(pdName), + NodeName: types.NodeName(fmt.Sprintf("node%d", i)), + VolumeSpec: &volume.Spec{ + PersistentVolume: &v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + }, + }, + }, + } + oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) + } + + // Assert + if !isOperationRunSerially(ch, quit) { + t.Fatalf("DetachVolume operations should not run concurrently") + } +} + +func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromDifferentNodes(t *testing.T) { + // Arrange + ch, quit, oe := setup() + attachedVolumes := make([]AttachedVolume, numVolumesToDetach) + pdName := "pd-volume" + + // Act + for i := range attachedVolumes { + attachedVolumes[i] = AttachedVolume{ + VolumeName: v1.UniqueVolumeName(pdName), + NodeName: types.NodeName(fmt.Sprintf("node%d", i)), + VolumeSpec: &volume.Spec{ + PersistentVolume: &v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, + }, + }, + }, + } + oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) + } + + // Assert + if !isOperationRunConcurrently(ch, quit, numVolumesToDetach) { + t.Fatalf("Detach volume operations should not execute serially") + } +} + +func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnSameNode(t *testing.T) { // Arrange ch, quit, oe := setup() @@ -237,6 +420,24 @@ func 
TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) { } } +func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnDifferentNodes(t *testing.T) { + // Arrange + ch, quit, oe := setup() + + // Act + for i := 0; i < numVolumesToVerifyAttached; i++ { + oe.VerifyVolumesAreAttachedPerNode( + nil, /* attachedVolumes */ + types.NodeName(fmt.Sprintf("node-name-%d", i)), + nil /* actualStateOfWorldAttacherUpdater */) + } + + // Assert + if !isOperationRunConcurrently(ch, quit, numVolumesToVerifyAttached) { + t.Fatalf("VerifyVolumesAreAttached operation is not being run concurrently") + } +} + func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) { // Arrange ch, quit, oe := setup() diff --git a/pkg/volume/util/util.go b/pkg/volume/util/util.go index e3baaa7c3a6..94419755942 100644 --- a/pkg/volume/util/util.go +++ b/pkg/volume/util/util.go @@ -644,3 +644,44 @@ func WriteVolumeCache(deviceMountPath string, exec utilexec.Interface) error { // For linux runtime, it skips because unmount will automatically flush disk data return nil } + +// IsMultiAttachForbidden checks if attaching this volume to multiple nodes is definitely not allowed/possible. +// In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns +// false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the +// attacher to fail fast in such cases. 
+// Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047 +func IsMultiAttachForbidden(volumeSpec *volume.Spec) bool { + if volumeSpec == nil { + // we don't know if it's supported or not and let the attacher fail later in cases it's not supported + return false + } + + if volumeSpec.Volume != nil { + // Check for volume types which are known to fail slow or cause trouble when trying to multi-attach + if volumeSpec.Volume.AzureDisk != nil || + volumeSpec.Volume.Cinder != nil { + return true + } + } + + // Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to + // multi-attach. We trust in the individual volume implementations to not allow unsupported access modes + if volumeSpec.PersistentVolume != nil { + // Check for persistent volume types which do not fail when trying to multi-attach + if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 { + // No access mode specified so we don't know for sure. Let the attacher fail if needed + return false + } + + // check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false + for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes { + if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany { + return false + } + } + return true + } + + // we don't know if it's supported or not and let the attacher fail later in cases it's not supported + return false +}