From cd1059e3c4bd90395e739d9568a448c5cc5972ca Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 29 Jan 2020 14:58:32 -0500 Subject: [PATCH] Revert "Merge pull request #87258 from verult/slow-rxm-attach" This reverts commit 15c3f1b119ff4b4073c86df05c819a9672b80d68, reversing changes made to 52d7614a8ca5b8aebc45333b6dc8fbf86a5e7ddf. --- .../volume/attachdetach/reconciler/BUILD | 2 +- .../attachdetach/reconciler/reconciler.go | 80 ++-- .../volumemanager/reconciler/reconciler.go | 4 +- pkg/volume/util/nestedpendingoperations/BUILD | 2 - .../nestedpendingoperations.go | 192 ++++---- .../nestedpendingoperations_test.go | 415 +++++------------- .../operationexecutor/operation_executor.go | 41 +- .../operation_executor_test.go | 207 +-------- pkg/volume/util/util.go | 41 -- 9 files changed, 258 insertions(+), 726 deletions(-) diff --git a/pkg/controller/volume/attachdetach/reconciler/BUILD b/pkg/controller/volume/attachdetach/reconciler/BUILD index b3a94b397e3..cb047fc9366 100644 --- a/pkg/controller/volume/attachdetach/reconciler/BUILD +++ b/pkg/controller/volume/attachdetach/reconciler/BUILD @@ -16,7 +16,7 @@ go_library( "//pkg/controller/volume/attachdetach/statusupdater:go_default_library", "//pkg/kubelet/events:go_default_library", "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", - "//pkg/volume/util:go_default_library", + "//pkg/volume:go_default_library", "//pkg/volume/util/operationexecutor:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go index 06561e6fa0b..a2ec9be2a93 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go @@ -24,7 +24,7 @@ import ( "strings" "time" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" @@ -34,7 +34,7 @@ import ( "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" kevents "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" - "k8s.io/kubernetes/pkg/volume/util" + "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" ) @@ -134,6 +134,42 @@ func (rc *reconciler) syncStates() { rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld) } +// isMultiAttachForbidden checks if attaching this volume to multiple nodes is definitely not allowed/possible. +// In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns +// false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the +// attacher to fail fast in such cases. +// Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047 +func (rc *reconciler) isMultiAttachForbidden(volumeSpec *volume.Spec) bool { + if volumeSpec.Volume != nil { + // Check for volume types which are known to fail slow or cause trouble when trying to multi-attach + if volumeSpec.Volume.AzureDisk != nil || + volumeSpec.Volume.Cinder != nil { + return true + } + } + + // Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to + // multi-attach. We trust in the individual volume implementations to not allow unsupported access modes + if volumeSpec.PersistentVolume != nil { + // Check for persistent volume types which do not fail when trying to multi-attach + if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 { + // No access mode specified so we don't know for sure. Let the attacher fail if needed + return false + } + + // check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false + for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes { + if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany { + return false + } + } + return true + } + + // we don't know if it's supported or not and let the attacher fail later in cases it's not supported + return false +} + func (rc *reconciler) reconcile() { // Detaches are triggered before attaches so that volumes referenced by // pods that are rescheduled to a different node are detached first. @@ -146,16 +182,9 @@ func (rc *reconciler) reconcile() { // This check must be done before we do any other checks, as otherwise the other checks // may pass while at the same time the volume leaves the pending state, resulting in // double detach attempts - if util.IsMultiAttachForbidden(attachedVolume.VolumeSpec) { - if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, "" /* nodeName */) { - klog.V(10).Infof("Operation for volume %q is already running in the cluster. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName) - continue - } - } else { - if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, attachedVolume.NodeName) { - klog.V(10).Infof("Operation for volume %q is already running for node %q. Can't start detach", attachedVolume.VolumeName, attachedVolume.NodeName) - continue - } + if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "") { + klog.V(10).Infof("Operation for volume %q is already running. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName) + continue } // Set the detach request time @@ -231,17 +260,15 @@ func (rc *reconciler) attachDesiredVolumes() { rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName) continue } - - if util.IsMultiAttachForbidden(volumeToAttach.VolumeSpec) { - - // Don't even try to start an operation if there is already one running for the given volume - if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */) { - if klog.V(10) { - klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) - } - continue + // Don't even try to start an operation if there is already one running + if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") { + if klog.V(10) { + klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) } + continue + } + if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) { nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName) if len(nodes) > 0 { if !volumeToAttach.MultiAttachErrorReported { @@ -250,17 +277,6 @@ func (rc *reconciler) attachDesiredVolumes() { } continue } - - } else { - - // Don't even try to start an operation if there is already one running for the given volume and node. - if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName) { - if klog.V(10) { - klog.Infof("Operation for volume %q is already running for node %q. Can't start attach", volumeToAttach.VolumeName, volumeToAttach.NodeName) - } - continue - } - } // Volume/Node doesn't exist, spawn a goroutine to attach it diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 46b1e88c38a..adb2e2038cd 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -294,7 +294,7 @@ func (rc *reconciler) unmountDetachDevices() { for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() { // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting. if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) && - !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) { + !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) { if attachedVolume.DeviceMayBeMounted() { // Volume is globally mounted to device, unmount it klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", "")) @@ -422,7 +422,7 @@ func (rc *reconciler) syncStates() { continue } // There is no pod that uses the volume. - if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) { + if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName) { klog.Warning("Volume is in pending operation, skip cleaning up mounts") } klog.V(2).Infof( diff --git a/pkg/volume/util/nestedpendingoperations/BUILD b/pkg/volume/util/nestedpendingoperations/BUILD index 5e287a0aba9..5e940356c8d 100644 --- a/pkg/volume/util/nestedpendingoperations/BUILD +++ b/pkg/volume/util/nestedpendingoperations/BUILD @@ -14,7 +14,6 @@ go_library( "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], @@ -28,7 +27,6 @@ go_test( "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", ], ) diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go index e0813b4d04c..193e362cd0b 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go @@ -29,73 +29,45 @@ import ( "sync" "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" k8sRuntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/klog" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" - volumetypes "k8s.io/kubernetes/pkg/volume/util/types" + "k8s.io/kubernetes/pkg/volume/util/types" ) const ( // EmptyUniquePodName is a UniquePodName for empty string. - EmptyUniquePodName volumetypes.UniquePodName = volumetypes.UniquePodName("") + EmptyUniquePodName types.UniquePodName = types.UniquePodName("") // EmptyUniqueVolumeName is a UniqueVolumeName for empty string EmptyUniqueVolumeName v1.UniqueVolumeName = v1.UniqueVolumeName("") - - // EmptyNodeName is a NodeName for empty string - EmptyNodeName types.NodeName = types.NodeName("") ) // NestedPendingOperations defines the supported set of operations. type NestedPendingOperations interface { - - // Run adds the concatenation of volumeName and one of podName or nodeName to - // the list of running operations and spawns a new go routine to execute - // OperationFunc inside generatedOperations. - - // volumeName, podName, and nodeName collectively form the operation key. - // The following forms of operation keys are supported: - // - volumeName empty, podName empty, nodeName empty - // This key does not have any conflicting keys. - // - volumeName exists, podName empty, nodeName empty - // This key conflicts with all other keys with the same volumeName. - // - volumeName exists, podName exists, nodeName empty - // This key conflicts with: - // - the same volumeName and podName - // - the same volumeName, but no podName - // - volumeName exists, podName empty, nodeName exists - // This key conflicts with: - // - the same volumeName and nodeName - // - the same volumeName but no nodeName - - // If an operation with the same operationName and a conflicting key exists, - // an AlreadyExists or ExponentialBackoff error is returned. - // If an operation with a conflicting key has ExponentialBackoff error but - // operationName is different, exponential backoff is reset and operation is - // allowed to proceed. - + // Run adds the concatenation of volumeName and podName to the list of + // running operations and spawns a new go routine to execute operationFunc. + // If an operation with the same volumeName, same or empty podName + // and same operationName exits, an AlreadyExists or ExponentialBackoff + // error is returned. If an operation with same volumeName and podName + // has ExponentialBackoff error but operationName is different, exponential + // backoff is reset and operation is allowed to proceed. + // This enables multiple operations to execute in parallel for the same + // volumeName as long as they have different podName. // Once the operation is complete, the go routine is terminated and the - // concatenation of volumeName and (podName or nodeName) is removed from the - // list of executing operations allowing a new operation to be started with - // the volumeName without error. - Run( - volumeName v1.UniqueVolumeName, - podName volumetypes.UniquePodName, - nodeName types.NodeName, - generatedOperations volumetypes.GeneratedOperations) error + // concatenation of volumeName and podName is removed from the list of + // executing operations allowing a new operation to be started with the + // volumeName without error. + Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, generatedOperations types.GeneratedOperations) error // Wait blocks until all operations are completed. This is typically // necessary during tests - the test should wait until all operations finish // and evaluate results after that. Wait() - // IsOperationPending returns true if an operation for the given volumeName - // and one of podName or nodeName is pending, otherwise it returns false - IsOperationPending( - volumeName v1.UniqueVolumeName, - podName volumetypes.UniquePodName, - nodeName types.NodeName) bool + // IsOperationPending returns true if an operation for the given volumeName and podName is pending, + // otherwise it returns false + IsOperationPending(volumeName v1.UniqueVolumeName, podName types.UniquePodName) bool } // NewNestedPendingOperations returns a new instance of NestedPendingOperations. @@ -116,7 +88,8 @@ type nestedPendingOperations struct { } type operation struct { - key operationKey + volumeName v1.UniqueVolumeName + podName types.UniquePodName operationName string operationPending bool expBackoff exponentialbackoff.ExponentialBackoff @@ -124,24 +97,22 @@ type operation struct { func (grm *nestedPendingOperations) Run( volumeName v1.UniqueVolumeName, - podName volumetypes.UniquePodName, - nodeName types.NodeName, - generatedOperations volumetypes.GeneratedOperations) error { + podName types.UniquePodName, + generatedOperations types.GeneratedOperations) error { grm.lock.Lock() defer grm.lock.Unlock() - - opKey := operationKey{volumeName, podName, nodeName} - - opExists, previousOpIndex := grm.isOperationExists(opKey) + opExists, previousOpIndex := grm.isOperationExists(volumeName, podName) if opExists { previousOp := grm.operations[previousOpIndex] // Operation already exists if previousOp.operationPending { // Operation is pending - return NewAlreadyExistsError(opKey) + operationKey := getOperationKey(volumeName, podName) + return NewAlreadyExistsError(operationKey) } - backOffErr := previousOp.expBackoff.SafeToRetry(opKey.String()) + operationKey := getOperationKey(volumeName, podName) + backOffErr := previousOp.expBackoff.SafeToRetry(operationKey) if backOffErr != nil { if previousOp.operationName == generatedOperations.OperationName { return backOffErr @@ -153,13 +124,15 @@ func (grm *nestedPendingOperations) Run( // Update existing operation to mark as pending. grm.operations[previousOpIndex].operationPending = true - grm.operations[previousOpIndex].key = opKey + grm.operations[previousOpIndex].volumeName = volumeName + grm.operations[previousOpIndex].podName = podName } else { // Create a new operation grm.operations = append(grm.operations, operation{ - key: opKey, operationPending: true, + volumeName: volumeName, + podName: podName, operationName: generatedOperations.OperationName, expBackoff: exponentialbackoff.ExponentialBackoff{}, }) @@ -169,7 +142,7 @@ func (grm *nestedPendingOperations) Run( // Handle unhandled panics (very unlikely) defer k8sRuntime.HandleCrash() // Handle completion of and error, if any, from operationFunc() - defer grm.operationComplete(opKey, &detailedErr) + defer grm.operationComplete(volumeName, podName, &detailedErr) return generatedOperations.Run() }() @@ -178,14 +151,12 @@ func (grm *nestedPendingOperations) Run( func (grm *nestedPendingOperations) IsOperationPending( volumeName v1.UniqueVolumeName, - podName volumetypes.UniquePodName, - nodeName types.NodeName) bool { + podName types.UniquePodName) bool { grm.lock.RLock() defer grm.lock.RUnlock() - opKey := operationKey{volumeName, podName, nodeName} - exist, previousOpIndex := grm.isOperationExists(opKey) + exist, previousOpIndex := grm.isOperationExists(volumeName, podName) if exist && grm.operations[previousOpIndex].operationPending { return true } @@ -193,52 +164,59 @@ func (grm *nestedPendingOperations) IsOperationPending( } // This is an internal function and caller should acquire and release the lock -func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, int) { +func (grm *nestedPendingOperations) isOperationExists( + volumeName v1.UniqueVolumeName, + podName types.UniquePodName) (bool, int) { // If volumeName is empty, operation can be executed concurrently - if key.volumeName == EmptyUniqueVolumeName { + if volumeName == EmptyUniqueVolumeName { return false, -1 } for previousOpIndex, previousOp := range grm.operations { - volumeNameMatch := previousOp.key.volumeName == key.volumeName - - podNameMatch := previousOp.key.podName == EmptyUniquePodName || - key.podName == EmptyUniquePodName || - previousOp.key.podName == key.podName - - nodeNameMatch := previousOp.key.nodeName == EmptyNodeName || - key.nodeName == EmptyNodeName || - previousOp.key.nodeName == key.nodeName - - if volumeNameMatch && podNameMatch && nodeNameMatch { - return true, previousOpIndex + if previousOp.volumeName != volumeName { + // No match, keep searching + continue } - } + if previousOp.podName != EmptyUniquePodName && + podName != EmptyUniquePodName && + previousOp.podName != podName { + // No match, keep searching + continue + } + + // Match + return true, previousOpIndex + } return false, -1 } -func (grm *nestedPendingOperations) getOperation(key operationKey) (uint, error) { +func (grm *nestedPendingOperations) getOperation( + volumeName v1.UniqueVolumeName, + podName types.UniquePodName) (uint, error) { // Assumes lock has been acquired by caller. for i, op := range grm.operations { - if op.key.volumeName == key.volumeName && - op.key.podName == key.podName { + if op.volumeName == volumeName && + op.podName == podName { return uint(i), nil } } - return 0, fmt.Errorf("Operation %q not found", key) + logOperationKey := getOperationKey(volumeName, podName) + return 0, fmt.Errorf("Operation %q not found", logOperationKey) } -func (grm *nestedPendingOperations) deleteOperation(key operationKey) { +func (grm *nestedPendingOperations) deleteOperation( // Assumes lock has been acquired by caller. + volumeName v1.UniqueVolumeName, + podName types.UniquePodName) { opIndex := -1 for i, op := range grm.operations { - if op.key.volumeName == key.volumeName && - op.key.podName == key.podName { + if op.volumeName == volumeName && + op.podName == podName { opIndex = i break } @@ -249,7 +227,8 @@ func (grm *nestedPendingOperations) deleteOperation(key operationKey) { grm.operations = grm.operations[:len(grm.operations)-1] } -func (grm *nestedPendingOperations) operationComplete(key operationKey, err *error) { +func (grm *nestedPendingOperations) operationComplete( + volumeName v1.UniqueVolumeName, podName types.UniquePodName, err *error) { // Defer operations are executed in Last-In is First-Out order. In this case // the lock is acquired first when operationCompletes begins, and is // released when the method finishes, after the lock is released cond is @@ -260,20 +239,24 @@ func (grm *nestedPendingOperations) operationComplete(key operationKey, err *err if *err == nil || !grm.exponentialBackOffOnError { // Operation completed without error, or exponentialBackOffOnError disabled - grm.deleteOperation(key) + grm.deleteOperation(volumeName, podName) if *err != nil { // Log error - klog.Errorf("operation %s failed with: %v", key, *err) + logOperationKey := getOperationKey(volumeName, podName) + klog.Errorf("operation %s failed with: %v", + logOperationKey, + *err) } return } // Operation completed with error and exponentialBackOffOnError Enabled - existingOpIndex, getOpErr := grm.getOperation(key) + existingOpIndex, getOpErr := grm.getOperation(volumeName, podName) if getOpErr != nil { // Failed to find existing operation + logOperationKey := getOperationKey(volumeName, podName) klog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.", - key, + logOperationKey, *err) return } @@ -282,8 +265,10 @@ func (grm *nestedPendingOperations) operationComplete(key operationKey, err *err grm.operations[existingOpIndex].operationPending = false // Log error + operationKey := + getOperationKey(volumeName, podName) klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff. - GenerateNoRetriesPermittedMsg(key.String())) + GenerateNoRetriesPermittedMsg(operationKey)) } func (grm *nestedPendingOperations) Wait() { @@ -295,22 +280,21 @@ func (grm *nestedPendingOperations) Wait() { } } -type operationKey struct { - volumeName v1.UniqueVolumeName - podName volumetypes.UniquePodName - nodeName types.NodeName -} +func getOperationKey( + volumeName v1.UniqueVolumeName, podName types.UniquePodName) string { + podNameStr := "" + if podName != EmptyUniquePodName { + podNameStr = fmt.Sprintf(" (%q)", podName) + } -func (key operationKey) String() string { - return fmt.Sprintf("{volumeName=%q, podName=%q, nodeName=%q}", - key.volumeName, - key.podName, - key.nodeName) + return fmt.Sprintf("%q%s", + volumeName, + podNameStr) } // NewAlreadyExistsError returns a new instance of AlreadyExists error. -func NewAlreadyExistsError(key operationKey) error { - return alreadyExistsError{key} +func NewAlreadyExistsError(operationKey string) error { + return alreadyExistsError{operationKey} } // IsAlreadyExists returns true if an error returned from @@ -329,7 +313,7 @@ func IsAlreadyExists(err error) bool { // new operation can not be started because an operation with the same operation // name is already executing. type alreadyExistsError struct { - operationKey operationKey + operationKey string } var _ error = alreadyExistsError{} diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go index d9301d64f4a..fe52c897798 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go @@ -22,10 +22,9 @@ import ( "time" "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" - volumetypes "k8s.io/kubernetes/pkg/volume/util/types" + "k8s.io/kubernetes/pkg/volume/util/types" ) const ( @@ -45,22 +44,22 @@ const ( initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond ) -func Test_NestedPendingOperations_Positive_SingleOp(t *testing.T) { +func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation := func() (error, error) { return nil, nil } // Act - err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) // Assert if err != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) } } -func Test_NestedPendingOperations_Positive_TwoOps(t *testing.T) { +func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volume1Name := v1.UniqueVolumeName("volume1-name") @@ -68,65 +67,65 @@ func Test_NestedPendingOperations_Positive_TwoOps(t *testing.T) { operation := func() (error, error) { return nil, nil } // Act - err1 := grm.Run(volume1Name, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) - err2 := grm.Run(volume2Name, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) + err1 := grm.Run(volume1Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) + err2 := grm.Run(volume2Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) // Assert if err1 != nil { - t.Fatalf("NestedPendingOperations %q failed. Expected: Actual: <%v>", volume1Name, err1) + t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", volume1Name, err1) } if err2 != nil { - t.Fatalf("NestedPendingOperations %q failed. Expected: Actual: <%v>", volume2Name, err2) + t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", volume2Name, err2) } } -func Test_NestedPendingOperations_Positive_TwoSubOps(t *testing.T) { +func Test_NewGoRoutineMap_Positive_TwoSubOps(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operation1PodName := volumetypes.UniquePodName("operation1-podname") - operation2PodName := volumetypes.UniquePodName("operation2-podname") + operation1PodName := types.UniquePodName("operation1-podname") + operation2PodName := types.UniquePodName("operation2-podname") operation := func() (error, error) { return nil, nil } // Act - err1 := grm.Run(volumeName, operation1PodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) - err2 := grm.Run(volumeName, operation2PodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) + err1 := grm.Run(volumeName, operation1PodName, types.GeneratedOperations{OperationFunc: operation}) + err2 := grm.Run(volumeName, operation2PodName, types.GeneratedOperations{OperationFunc: operation}) // Assert if err1 != nil { - t.Fatalf("NestedPendingOperations %q failed. Expected: Actual: <%v>", operation1PodName, err1) + t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", operation1PodName, err1) } if err2 != nil { - t.Fatalf("NestedPendingOperations %q failed. Expected: Actual: <%v>", operation2PodName, err2) + t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", operation2PodName, err2) } } -func Test_NestedPendingOperations_Positive_SingleOpWithExpBackoff(t *testing.T) { +func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation := func() (error, error) { return nil, nil } // Act - err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) // Assert if err != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) } } -func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletes(t *testing.T) { +func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateCallbackFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() <-operation1DoneCh // Force operation1 to complete @@ -135,9 +134,9 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletes(t *testin err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) if err != nil { - t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) + t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil } return true, nil @@ -146,19 +145,19 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletes(t *testin // Assert if err2 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) } } -func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) { +func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateCallbackFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() <-operation1DoneCh // Force operation1 to complete @@ -167,9 +166,9 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletesWithExpBac err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) if err != nil { - t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) + t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil } return true, nil @@ -178,18 +177,18 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletesWithExpBac // Assert if err2 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) } } -func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanics(t *testing.T) { +func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1 := generatePanicFunc() - err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() @@ -197,9 +196,9 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanics(t *testing.T err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) if err != nil { - t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) + t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil } return true, nil @@ -208,18 +207,18 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanics(t *testing.T // Assert if err2 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) } } -func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) { +func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1 := generatePanicFunc() - err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() @@ -227,9 +226,9 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanicsWithExpBackof err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) if err != nil { - t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) + t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil } return true, nil @@ -238,43 +237,43 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanicsWithExpBackof // Assert if err2 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) } } -func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { +func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NestedPendingOperations did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) } } -func Test_NestedPendingOperations_Negative_SecondThirdOpWithDifferentNames(t *testing.T) { +func Test_NewGoRoutineMap_Negative_SecondThirdOpWithDifferentNames(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") op1Name := "mount_volume" operation1 := generateErrorFunc() - err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name}) if err1 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } // Shorter than exponential backoff period, so as to trigger exponential backoff error on second // operation. @@ -284,8 +283,7 @@ func Test_NestedPendingOperations_Negative_SecondThirdOpWithDifferentNames(t *te func() (bool, error) { err := grm.Run(volumeName, "", /* operationSubName */ - "", /* nodeName */ - volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name}) + types.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name}) if exponentialbackoff.IsExponentialBackoff(err) { return true, nil @@ -296,114 +294,114 @@ func Test_NestedPendingOperations_Negative_SecondThirdOpWithDifferentNames(t *te // Assert if err2 != nil { - t.Fatalf("Expected NestedPendingOperations to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name) + t.Fatalf("Expected NewGoRoutine to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name) } operation3 := generateNoopFunc() op3Name := "unmount_volume" // Act - err3 := grm.Run(volumeName, "" /*pod name*/, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name}) + err3 := grm.Run(volumeName, "" /*pod name*/, types.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name}) if err3 != nil { - t.Fatalf("NestedPendingOperations failed. Expected Actual: <%v>", err3) + t.Fatalf("NewGoRoutine failed. Expected Actual: <%v>", err3) } } -func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) { +func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operationPodName := volumetypes.UniquePodName("operation-podname") + operationPodName := types.UniquePodName("operation-podname") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NestedPendingOperations did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) } } -func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) { +func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operationPodName := volumetypes.UniquePodName("operation-podname") + operationPodName := types.UniquePodName("operation-podname") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NestedPendingOperations did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) } } -func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) { +func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NestedPendingOperations did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) } } -func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { +func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() operation3 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NestedPendingOperations did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) } // Act @@ -411,9 +409,9 @@ func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletes(t *testing err3 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3}) if err != nil { - t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) + t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil } return true, nil @@ -422,32 +420,32 @@ func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletes(t *testing // Assert if err3 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err3) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err3) } } -func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) { +func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() operation3 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NestedPendingOperations did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) } // Act @@ -455,9 +453,9 @@ func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletesWithExpBack err3 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3}) if err != nil { - t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) + t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil } return true, nil @@ -466,11 +464,11 @@ func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletesWithExpBack // Assert if err3 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err3) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err3) } } -func Test_NestedPendingOperations_Positive_WaitEmpty(t *testing.T) { +func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) { // Test than Wait() on empty GoRoutineMap always succeeds without blocking // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) @@ -489,7 +487,7 @@ func Test_NestedPendingOperations_Positive_WaitEmpty(t *testing.T) { } } -func Test_NestedPendingOperations_Positive_WaitEmptyWithExpBackoff(t *testing.T) { +func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) { // Test than Wait() on empty GoRoutineMap always succeeds without blocking // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) @@ -508,16 +506,16 @@ func Test_NestedPendingOperations_Positive_WaitEmptyWithExpBackoff(t *testing.T) } } -func Test_NestedPendingOperations_Positive_Wait(t *testing.T) { +func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) { // Test that Wait() really blocks until the last operation succeeds // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) } // Act @@ -537,16 +535,16 @@ func Test_NestedPendingOperations_Positive_Wait(t *testing.T) { } } -func Test_NestedPendingOperations_Positive_WaitWithExpBackoff(t *testing.T) { +func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) { // Test that Wait() really blocks until the last operation succeeds // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err) + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) } // Act @@ -566,215 +564,6 @@ func Test_NestedPendingOperations_Positive_WaitWithExpBackoff(t *testing.T) { } } -/* Concurrent operations tests */ - -// Covered cases: -// FIRST OP | SECOND OP | RESULT -// None | None | Positive -// None | Volume | Positive -// None | Volume Pod | Positive -// None | Volume Node | Positive -// Volume | None | Positive -// Volume | Volume | Negative (covered in Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes above) -// Volume | Volume Pod | Negative -// Volume | Volume Node | Negative -// Volume Pod | None | Positive -// Volume Pod | Volume | Negative -// Volume Pod | Volume Pod | Negative (covered in Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes above) -// Volume Node | None | Positive -// Volume Node | Volume | Negative -// Volume Node | Volume Node | Negative - -// These cases are not covered because they will never occur within the same -// binary, so either result works. -// Volume Pod | Volume Node -// Volume Node | Volume Pod - -func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondNone(t *testing.T) { - testConcurrentOperationsPositive(t, - "", /* volumeName1 */ - "", /* podName1 */ - "", /* nodeName1 */ - "", /* volumeName1 */ - "", /* podName2 */ - "" /* nodeName2 */) -} - -func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondVolume(t *testing.T) { - testConcurrentOperationsPositive(t, - "", /* volumeName1 */ - "", /* podName1 */ - "", /* nodeName1 */ - v1.UniqueVolumeName("volume-name"), - "", /* podName2 */ - "" /* nodeName2 */) -} - -func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondVolumePod(t *testing.T) { - testConcurrentOperationsPositive(t, - "", /* volumeName1 */ - "", /* podName1 */ - "", /* nodeName1 */ - v1.UniqueVolumeName("volume-name"), - volumetypes.UniquePodName("operation-podname"), - "" /* nodeName2 */) -} - -func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondVolumeNode(t *testing.T) { - testConcurrentOperationsPositive(t, - "", /* volumeName1 */ - "", /* podName1 */ - "", /* nodeName1 */ - v1.UniqueVolumeName("volume-name"), - "", /* podName2 */ - types.NodeName("operation-nodename")) -} - -func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstVolume_SecondNone(t *testing.T) { - testConcurrentOperationsPositive(t, - v1.UniqueVolumeName("volume-name"), - "", /* podName1 */ - "", /* nodeName1 */ - "", /* volumeName1 */ - "", /* podName2 */ - "" /* nodeName2 */) -} - -func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolume_SecondVolumePod(t *testing.T) { - testConcurrentOperationsNegative(t, - v1.UniqueVolumeName("volume-name"), - "", /* podName1 */ - "", /* nodeName1 */ - v1.UniqueVolumeName("volume-name"), - volumetypes.UniquePodName("operation-podname"), - "" /* nodeName2 */) -} - -func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolume_SecondVolumeNode(t *testing.T) { - testConcurrentOperationsNegative(t, - v1.UniqueVolumeName("volume-name"), - "", /* podName1 */ - "", /* nodeName1 */ - v1.UniqueVolumeName("volume-name"), - "", /* podName2 */ - types.NodeName("operation-nodename")) -} - -func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstVolumePod_SecondNone(t *testing.T) { - testConcurrentOperationsPositive(t, - v1.UniqueVolumeName("volume-name"), - volumetypes.UniquePodName("operation-podname"), - "", /* nodeName1 */ - "", /* volumeName1 */ - "", /* podName2 */ - "" /* nodeName2 */) -} - -func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolumePod_SecondVolume(t *testing.T) { - testConcurrentOperationsNegative(t, - v1.UniqueVolumeName("volume-name"), - volumetypes.UniquePodName("operation-podname"), - "", /* nodeName1 */ - v1.UniqueVolumeName("volume-name"), - "", /* podName2 */ - "" /* nodeName2 */) -} - -func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstVolumeNode_SecondNone(t *testing.T) { - testConcurrentOperationsPositive(t, - v1.UniqueVolumeName("volume-name"), - "", /* podName1 */ - types.NodeName("operation-nodename"), - "", /* volumeName1 */ - "", /* podName2 */ - "" /* nodeName2 */) -} - -func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolumeNode_SecondVolume(t *testing.T) { - testConcurrentOperationsNegative(t, - v1.UniqueVolumeName("volume-name"), - "", /* podName1 */ - types.NodeName("operation-nodename"), - v1.UniqueVolumeName("volume-name"), - "", /* podName2 */ - "" /* nodeName2 */) -} - -func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolumeNode_SecondVolumeNode(t *testing.T) { - testConcurrentOperationsNegative(t, - v1.UniqueVolumeName("volume-name"), - "", /* podName1 */ - types.NodeName("operation-nodename"), - v1.UniqueVolumeName("volume-name"), - "", /* podName2 */ - types.NodeName("operation-nodename")) -} - -// testConcurrentOperationsPositive passes if the two operations keyed by the -// provided parameters are executed in parallel, and fails otherwise. -func testConcurrentOperationsPositive( - t *testing.T, - volumeName1 v1.UniqueVolumeName, - podName1 volumetypes.UniquePodName, - nodeName1 types.NodeName, - volumeName2 v1.UniqueVolumeName, - podName2 volumetypes.UniquePodName, - nodeName2 types.NodeName) { - - // Arrange - grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) - operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) - operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) - if err1 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) - } - operation2 := generateNoopFunc() - - // Act - err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2}) - - // Assert - if err2 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) - } -} - -// testConcurrentOperationsNegative passes if the creation of the second -// operation returns an alreadyExists error, and fails otherwise. -func testConcurrentOperationsNegative( - t *testing.T, - volumeName1 v1.UniqueVolumeName, - podName1 volumetypes.UniquePodName, - nodeName1 types.NodeName, - volumeName2 v1.UniqueVolumeName, - podName2 volumetypes.UniquePodName, - nodeName2 types.NodeName) { - - // Arrange - grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) - operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) - operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) - if err1 != nil { - t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) - } - operation2 := generateNoopFunc() - - // Act - err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2}) - - // Assert - if err2 == nil { - t.Fatalf("NestedPendingOperations did not fail. Expected an operation to already exist") - } - if !IsAlreadyExists(err2) { - t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) - } -} - -/* END concurrent operations tests */ - func generateCallbackFunc(done chan<- interface{}) func() (error, error) { return func() (error, error) { done <- true diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 27caa231820..463770f1730 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -138,9 +138,9 @@ type OperationExecutor interface { // back off on retries. VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error - // IsOperationPending returns true if an operation for the given volumeName - // and one of podName or nodeName is pending, otherwise it returns false - IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName) bool + // IsOperationPending returns true if an operation for the given volumeName and podName is pending, + // otherwise it returns false + IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool // ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume. ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error // ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin @@ -650,11 +650,8 @@ type operationExecutor struct { operationGenerator OperationGenerator } -func (oe *operationExecutor) IsOperationPending( - volumeName v1.UniqueVolumeName, - podName volumetypes.UniquePodName, - nodeName types.NodeName) bool { - return oe.pendingOperations.IsOperationPending(volumeName, podName, nodeName) +func (oe *operationExecutor) IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool { + return oe.pendingOperations.IsOperationPending(volumeName, podName) } func (oe *operationExecutor) AttachVolume( @@ -663,13 +660,8 @@ func (oe *operationExecutor) AttachVolume( generatedOperations := oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld) - if util.IsMultiAttachForbidden(volumeToAttach.VolumeSpec) { - return oe.pendingOperations.Run( - volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations) - } - return oe.pendingOperations.Run( - volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName, generatedOperations) + volumeToAttach.VolumeName, "" /* podName */, generatedOperations) } func (oe *operationExecutor) DetachVolume( @@ -682,13 +674,8 @@ func (oe *operationExecutor) DetachVolume( return err } - if util.IsMultiAttachForbidden(volumeToDetach.VolumeSpec) { - return oe.pendingOperations.Run( - volumeToDetach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations) - } - return oe.pendingOperations.Run( - volumeToDetach.VolumeName, "" /* podName */, volumeToDetach.NodeName, generatedOperations) + volumeToDetach.VolumeName, "" /* podName */, generatedOperations) } func (oe *operationExecutor) VerifyVolumesAreAttached( @@ -774,7 +761,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttached( // Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin uniquePluginName := v1.UniqueVolumeName(pluginName) - err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, "" /* nodeName */, generatedOperations) + err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, generatedOperations) if err != nil { klog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err) } @@ -792,7 +779,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode( } // Give an empty UniqueVolumeName so that this operation could be executed concurrently. - return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, "" /* nodeName */, generatedOperations) + return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, generatedOperations) } func (oe *operationExecutor) MountVolume( @@ -833,7 +820,7 @@ func (oe *operationExecutor) MountVolume( // TODO mount_device return oe.pendingOperations.Run( - volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations) + volumeToMount.VolumeName, podName, generatedOperations) } func (oe *operationExecutor) UnmountVolume( @@ -864,7 +851,7 @@ func (oe *operationExecutor) UnmountVolume( podName := volumetypes.UniquePodName(volumeToUnmount.PodUID) return oe.pendingOperations.Run( - volumeToUnmount.VolumeName, podName, "" /* nodeName */, generatedOperations) + volumeToUnmount.VolumeName, podName, generatedOperations) } func (oe *operationExecutor) UnmountDevice( @@ -895,7 +882,7 @@ func (oe *operationExecutor) UnmountDevice( podName := nestedpendingoperations.EmptyUniquePodName return oe.pendingOperations.Run( - deviceToDetach.VolumeName, podName, "" /* nodeName */, generatedOperations) + deviceToDetach.VolumeName, podName, generatedOperations) } func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { @@ -903,7 +890,7 @@ func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actu if err != nil { return err } - return oe.pendingOperations.Run(volumeToMount.VolumeName, "", "" /* nodeName */, generatedOperations) + return oe.pendingOperations.Run(volumeToMount.VolumeName, "", generatedOperations) } func (oe *operationExecutor) VerifyControllerAttachedVolume( @@ -917,7 +904,7 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume( } return oe.pendingOperations.Run( - volumeToMount.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations) + volumeToMount.VolumeName, "" /* podName */, generatedOperations) } // ReconstructVolumeOperation return a func to create volumeSpec from mount path diff --git a/pkg/volume/util/operationexecutor/operation_executor_test.go b/pkg/volume/util/operationexecutor/operation_executor_test.go index da4983d5125..adc199a004f 100644 --- a/pkg/volume/util/operationexecutor/operation_executor_test.go +++ b/pkg/volume/util/operationexecutor/operation_executor_test.go @@ -17,7 +17,6 @@ limitations under the License. package operationexecutor import ( - "fmt" "strconv" "testing" "time" @@ -181,7 +180,7 @@ func TestOperationExecutor_UnmountDeviceConcurrently(t *testing.T) { } } -func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToSameNode(t *testing.T) { +func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) { // Arrange ch, quit, oe := setup() volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) @@ -192,13 +191,6 @@ func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToSameNode(t *testi volumesToAttach[i] = VolumeToAttach{ VolumeName: v1.UniqueVolumeName(pdName), NodeName: "node", - VolumeSpec: &volume.Spec{ - PersistentVolume: &v1.PersistentVolume{ - Spec: v1.PersistentVolumeSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, - }, - }, - }, } oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) } @@ -209,91 +201,7 @@ func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToSameNode(t *testi } } -func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToSameNode(t *testing.T) { - // Arrange - ch, quit, oe := setup() - volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) - pdName := "pd-volume" - - // Act - for i := range volumesToAttach { - volumesToAttach[i] = VolumeToAttach{ - VolumeName: v1.UniqueVolumeName(pdName), - NodeName: "node", - VolumeSpec: &volume.Spec{ - PersistentVolume: &v1.PersistentVolume{ - Spec: v1.PersistentVolumeSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, - }, - }, - }, - } - oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) - } - - // Assert - if !isOperationRunSerially(ch, quit) { - t.Fatalf("Attach volume operations should not start concurrently") - } -} - -func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) { - // Arrange - ch, quit, oe := setup() - volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) - pdName := "pd-volume" - - // Act - for i := range volumesToAttach { - volumesToAttach[i] = VolumeToAttach{ - VolumeName: v1.UniqueVolumeName(pdName), - NodeName: types.NodeName(fmt.Sprintf("node%d", i)), - VolumeSpec: &volume.Spec{ - PersistentVolume: &v1.PersistentVolume{ - Spec: v1.PersistentVolumeSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, - }, - }, - }, - } - oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) - } - - // Assert - if !isOperationRunSerially(ch, quit) { - t.Fatalf("Attach volume operations should not start concurrently") - } -} - -func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) { - // Arrange - ch, quit, oe := setup() - volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) - pdName := "pd-volume" - - // Act - for i := range volumesToAttach { - volumesToAttach[i] = VolumeToAttach{ - VolumeName: v1.UniqueVolumeName(pdName), - NodeName: types.NodeName(fmt.Sprintf("node%d", i)), - VolumeSpec: &volume.Spec{ - PersistentVolume: &v1.PersistentVolume{ - Spec: v1.PersistentVolumeSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, - }, - }, - }, - } - oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) - } - - // Assert - if !isOperationRunConcurrently(ch, quit, numVolumesToAttach) { - t.Fatalf("Attach volume operations should not execute serially") - } -} - -func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromSameNode(t *testing.T) { +func TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) { // Arrange ch, quit, oe := setup() attachedVolumes := make([]AttachedVolume, numVolumesToDetach) @@ -304,13 +212,6 @@ func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromSameNode(t *tes attachedVolumes[i] = AttachedVolume{ VolumeName: v1.UniqueVolumeName(pdName), NodeName: "node", - VolumeSpec: &volume.Spec{ - PersistentVolume: &v1.PersistentVolume{ - Spec: v1.PersistentVolumeSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, - }, - }, - }, } oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) } @@ -321,91 +222,7 @@ func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromSameNode(t *tes } } -func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromSameNode(t *testing.T) { - // Arrange - ch, quit, oe := setup() - attachedVolumes := make([]AttachedVolume, numVolumesToDetach) - pdName := "pd-volume" - - // Act - for i := range attachedVolumes { - attachedVolumes[i] = AttachedVolume{ - VolumeName: v1.UniqueVolumeName(pdName), - NodeName: "node", - VolumeSpec: &volume.Spec{ - PersistentVolume: &v1.PersistentVolume{ - Spec: v1.PersistentVolumeSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, - }, - }, - }, - } - oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) - } - - // Assert - if !isOperationRunSerially(ch, quit) { - t.Fatalf("DetachVolume operations should not run concurrently") - } -} - -func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromDifferentNodes(t *testing.T) { - // Arrange - ch, quit, oe := setup() - attachedVolumes := make([]AttachedVolume, numVolumesToDetach) - pdName := "pd-volume" - - // Act - for i := range attachedVolumes { - attachedVolumes[i] = AttachedVolume{ - VolumeName: v1.UniqueVolumeName(pdName), - NodeName: types.NodeName(fmt.Sprintf("node%d", i)), - VolumeSpec: &volume.Spec{ - PersistentVolume: &v1.PersistentVolume{ - Spec: v1.PersistentVolumeSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, - }, - }, - }, - } - oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) - } - - // Assert - if !isOperationRunSerially(ch, quit) { - t.Fatalf("DetachVolume operations should not run concurrently") - } -} - -func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromDifferentNodes(t *testing.T) { - // Arrange - ch, quit, oe := setup() - attachedVolumes := make([]AttachedVolume, numVolumesToDetach) - pdName := "pd-volume" - - // Act - for i := range attachedVolumes { - attachedVolumes[i] = AttachedVolume{ - VolumeName: v1.UniqueVolumeName(pdName), - NodeName: types.NodeName(fmt.Sprintf("node%d", i)), - VolumeSpec: &volume.Spec{ - PersistentVolume: &v1.PersistentVolume{ - Spec: v1.PersistentVolumeSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, - }, - }, - }, - } - oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) - } - - // Assert - if !isOperationRunConcurrently(ch, quit, numVolumesToDetach) { - t.Fatalf("Attach volume operations should not execute serially") - } -} - -func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnSameNode(t *testing.T) { +func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) { // Arrange ch, quit, oe := setup() @@ -420,24 +237,6 @@ func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnSameNode(t *tes } } -func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnDifferentNodes(t *testing.T) { - // Arrange - ch, quit, oe := setup() - - // Act - for i := 0; i < numVolumesToVerifyAttached; i++ { - oe.VerifyVolumesAreAttachedPerNode( - nil, /* attachedVolumes */ - types.NodeName(fmt.Sprintf("node-name-%d", i)), - nil /* actualStateOfWorldAttacherUpdater */) - } - - // Assert - if !isOperationRunConcurrently(ch, quit, numVolumesToVerifyAttached) { - t.Fatalf("VerifyVolumesAreAttached operation is not being run concurrently") - } -} - func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) { // Arrange ch, quit, oe := setup() diff --git a/pkg/volume/util/util.go b/pkg/volume/util/util.go index 94419755942..e3baaa7c3a6 100644 --- a/pkg/volume/util/util.go +++ b/pkg/volume/util/util.go @@ -644,44 +644,3 @@ func WriteVolumeCache(deviceMountPath string, exec utilexec.Interface) error { // For linux runtime, it skips because unmount will automatically flush disk data return nil } - -// IsMultiAttachForbidden checks if attaching this volume to multiple nodes is definitely not allowed/possible. -// In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns -// false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the -// attacher to fail fast in such cases. -// Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047 -func IsMultiAttachForbidden(volumeSpec *volume.Spec) bool { - if volumeSpec == nil { - // we don't know if it's supported or not and let the attacher fail later in cases it's not supported - return false - } - - if volumeSpec.Volume != nil { - // Check for volume types which are known to fail slow or cause trouble when trying to multi-attach - if volumeSpec.Volume.AzureDisk != nil || - volumeSpec.Volume.Cinder != nil { - return true - } - } - - // Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to - // multi-attach. We trust in the individual volume implementations to not allow unsupported access modes - if volumeSpec.PersistentVolume != nil { - // Check for persistent volume types which do not fail when trying to multi-attach - if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 { - // No access mode specified so we don't know for sure. Let the attacher fail if needed - return false - } - - // check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false - for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes { - if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany { - return false - } - } - return true - } - - // we don't know if it's supported or not and let the attacher fail later in cases it's not supported - return false -}