mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #87664 from liggitt/revert-parallel-volume
Revert "Merge pull request #87258 from verult/slow-rxm-attach"
This commit is contained in:
commit
7164152844
@ -16,7 +16,7 @@ go_library(
|
|||||||
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
|
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
|
||||||
"//pkg/kubelet/events:go_default_library",
|
"//pkg/kubelet/events:go_default_library",
|
||||||
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
|
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
|
||||||
"//pkg/volume/util:go_default_library",
|
"//pkg/volume:go_default_library",
|
||||||
"//pkg/volume/util/operationexecutor:go_default_library",
|
"//pkg/volume/util/operationexecutor:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
|
@ -24,7 +24,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
@ -34,7 +34,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
|
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
|
||||||
kevents "k8s.io/kubernetes/pkg/kubelet/events"
|
kevents "k8s.io/kubernetes/pkg/kubelet/events"
|
||||||
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
|
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
|
||||||
"k8s.io/kubernetes/pkg/volume/util"
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
|
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -134,6 +134,42 @@ func (rc *reconciler) syncStates() {
|
|||||||
rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld)
|
rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isMultiAttachForbidden checks if attaching this volume to multiple nodes is definitely not allowed/possible.
|
||||||
|
// In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns
|
||||||
|
// false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the
|
||||||
|
// attacher to fail fast in such cases.
|
||||||
|
// Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047
|
||||||
|
func (rc *reconciler) isMultiAttachForbidden(volumeSpec *volume.Spec) bool {
|
||||||
|
if volumeSpec.Volume != nil {
|
||||||
|
// Check for volume types which are known to fail slow or cause trouble when trying to multi-attach
|
||||||
|
if volumeSpec.Volume.AzureDisk != nil ||
|
||||||
|
volumeSpec.Volume.Cinder != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to
|
||||||
|
// multi-attach. We trust in the individual volume implementations to not allow unsupported access modes
|
||||||
|
if volumeSpec.PersistentVolume != nil {
|
||||||
|
// Check for persistent volume types which do not fail when trying to multi-attach
|
||||||
|
if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 {
|
||||||
|
// No access mode specified so we don't know for sure. Let the attacher fail if needed
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false
|
||||||
|
for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes {
|
||||||
|
if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// we don't know if it's supported or not and let the attacher fail later in cases it's not supported
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (rc *reconciler) reconcile() {
|
func (rc *reconciler) reconcile() {
|
||||||
// Detaches are triggered before attaches so that volumes referenced by
|
// Detaches are triggered before attaches so that volumes referenced by
|
||||||
// pods that are rescheduled to a different node are detached first.
|
// pods that are rescheduled to a different node are detached first.
|
||||||
@ -146,16 +182,9 @@ func (rc *reconciler) reconcile() {
|
|||||||
// This check must be done before we do any other checks, as otherwise the other checks
|
// This check must be done before we do any other checks, as otherwise the other checks
|
||||||
// may pass while at the same time the volume leaves the pending state, resulting in
|
// may pass while at the same time the volume leaves the pending state, resulting in
|
||||||
// double detach attempts
|
// double detach attempts
|
||||||
if util.IsMultiAttachForbidden(attachedVolume.VolumeSpec) {
|
if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "") {
|
||||||
if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, "" /* nodeName */) {
|
klog.V(10).Infof("Operation for volume %q is already running. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
|
||||||
klog.V(10).Infof("Operation for volume %q is already running in the cluster. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
|
continue
|
||||||
continue
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, attachedVolume.NodeName) {
|
|
||||||
klog.V(10).Infof("Operation for volume %q is already running for node %q. Can't start detach", attachedVolume.VolumeName, attachedVolume.NodeName)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the detach request time
|
// Set the detach request time
|
||||||
@ -231,17 +260,15 @@ func (rc *reconciler) attachDesiredVolumes() {
|
|||||||
rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
|
rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// Don't even try to start an operation if there is already one running
|
||||||
if util.IsMultiAttachForbidden(volumeToAttach.VolumeSpec) {
|
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") {
|
||||||
|
if klog.V(10) {
|
||||||
// Don't even try to start an operation if there is already one running for the given volume
|
klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
|
||||||
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */) {
|
|
||||||
if klog.V(10) {
|
|
||||||
klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) {
|
||||||
nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName)
|
nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName)
|
||||||
if len(nodes) > 0 {
|
if len(nodes) > 0 {
|
||||||
if !volumeToAttach.MultiAttachErrorReported {
|
if !volumeToAttach.MultiAttachErrorReported {
|
||||||
@ -250,17 +277,6 @@ func (rc *reconciler) attachDesiredVolumes() {
|
|||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
|
||||||
|
|
||||||
// Don't even try to start an operation if there is already one running for the given volume and node.
|
|
||||||
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName) {
|
|
||||||
if klog.V(10) {
|
|
||||||
klog.Infof("Operation for volume %q is already running for node %q. Can't start attach", volumeToAttach.VolumeName, volumeToAttach.NodeName)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Volume/Node doesn't exist, spawn a goroutine to attach it
|
// Volume/Node doesn't exist, spawn a goroutine to attach it
|
||||||
|
@ -294,7 +294,7 @@ func (rc *reconciler) unmountDetachDevices() {
|
|||||||
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
|
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
|
||||||
// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
|
// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
|
||||||
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
|
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
|
||||||
!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
|
!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {
|
||||||
if attachedVolume.DeviceMayBeMounted() {
|
if attachedVolume.DeviceMayBeMounted() {
|
||||||
// Volume is globally mounted to device, unmount it
|
// Volume is globally mounted to device, unmount it
|
||||||
klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
|
klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
|
||||||
@ -422,7 +422,7 @@ func (rc *reconciler) syncStates() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// There is no pod that uses the volume.
|
// There is no pod that uses the volume.
|
||||||
if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
|
if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName) {
|
||||||
klog.Warning("Volume is in pending operation, skip cleaning up mounts")
|
klog.Warning("Volume is in pending operation, skip cleaning up mounts")
|
||||||
}
|
}
|
||||||
klog.V(2).Infof(
|
klog.V(2).Infof(
|
||||||
|
@ -14,7 +14,6 @@ go_library(
|
|||||||
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
|
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
|
||||||
"//pkg/volume/util/types:go_default_library",
|
"//pkg/volume/util/types:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//vendor/k8s.io/klog:go_default_library",
|
"//vendor/k8s.io/klog:go_default_library",
|
||||||
],
|
],
|
||||||
@ -28,7 +27,6 @@ go_test(
|
|||||||
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
|
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
|
||||||
"//pkg/volume/util/types:go_default_library",
|
"//pkg/volume/util/types:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -29,73 +29,45 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
|
||||||
k8sRuntime "k8s.io/apimachinery/pkg/util/runtime"
|
k8sRuntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
|
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
|
||||||
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
|
"k8s.io/kubernetes/pkg/volume/util/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// EmptyUniquePodName is a UniquePodName for empty string.
|
// EmptyUniquePodName is a UniquePodName for empty string.
|
||||||
EmptyUniquePodName volumetypes.UniquePodName = volumetypes.UniquePodName("")
|
EmptyUniquePodName types.UniquePodName = types.UniquePodName("")
|
||||||
|
|
||||||
// EmptyUniqueVolumeName is a UniqueVolumeName for empty string
|
// EmptyUniqueVolumeName is a UniqueVolumeName for empty string
|
||||||
EmptyUniqueVolumeName v1.UniqueVolumeName = v1.UniqueVolumeName("")
|
EmptyUniqueVolumeName v1.UniqueVolumeName = v1.UniqueVolumeName("")
|
||||||
|
|
||||||
// EmptyNodeName is a NodeName for empty string
|
|
||||||
EmptyNodeName types.NodeName = types.NodeName("")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// NestedPendingOperations defines the supported set of operations.
|
// NestedPendingOperations defines the supported set of operations.
|
||||||
type NestedPendingOperations interface {
|
type NestedPendingOperations interface {
|
||||||
|
// Run adds the concatenation of volumeName and podName to the list of
|
||||||
// Run adds the concatenation of volumeName and one of podName or nodeName to
|
// running operations and spawns a new go routine to execute operationFunc.
|
||||||
// the list of running operations and spawns a new go routine to execute
|
// If an operation with the same volumeName, same or empty podName
|
||||||
// OperationFunc inside generatedOperations.
|
// and same operationName exits, an AlreadyExists or ExponentialBackoff
|
||||||
|
// error is returned. If an operation with same volumeName and podName
|
||||||
// volumeName, podName, and nodeName collectively form the operation key.
|
// has ExponentialBackoff error but operationName is different, exponential
|
||||||
// The following forms of operation keys are supported:
|
// backoff is reset and operation is allowed to proceed.
|
||||||
// - volumeName empty, podName empty, nodeName empty
|
// This enables multiple operations to execute in parallel for the same
|
||||||
// This key does not have any conflicting keys.
|
// volumeName as long as they have different podName.
|
||||||
// - volumeName exists, podName empty, nodeName empty
|
|
||||||
// This key conflicts with all other keys with the same volumeName.
|
|
||||||
// - volumeName exists, podName exists, nodeName empty
|
|
||||||
// This key conflicts with:
|
|
||||||
// - the same volumeName and podName
|
|
||||||
// - the same volumeName, but no podName
|
|
||||||
// - volumeName exists, podName empty, nodeName exists
|
|
||||||
// This key conflicts with:
|
|
||||||
// - the same volumeName and nodeName
|
|
||||||
// - the same volumeName but no nodeName
|
|
||||||
|
|
||||||
// If an operation with the same operationName and a conflicting key exists,
|
|
||||||
// an AlreadyExists or ExponentialBackoff error is returned.
|
|
||||||
// If an operation with a conflicting key has ExponentialBackoff error but
|
|
||||||
// operationName is different, exponential backoff is reset and operation is
|
|
||||||
// allowed to proceed.
|
|
||||||
|
|
||||||
// Once the operation is complete, the go routine is terminated and the
|
// Once the operation is complete, the go routine is terminated and the
|
||||||
// concatenation of volumeName and (podName or nodeName) is removed from the
|
// concatenation of volumeName and podName is removed from the list of
|
||||||
// list of executing operations allowing a new operation to be started with
|
// executing operations allowing a new operation to be started with the
|
||||||
// the volumeName without error.
|
// volumeName without error.
|
||||||
Run(
|
Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, generatedOperations types.GeneratedOperations) error
|
||||||
volumeName v1.UniqueVolumeName,
|
|
||||||
podName volumetypes.UniquePodName,
|
|
||||||
nodeName types.NodeName,
|
|
||||||
generatedOperations volumetypes.GeneratedOperations) error
|
|
||||||
|
|
||||||
// Wait blocks until all operations are completed. This is typically
|
// Wait blocks until all operations are completed. This is typically
|
||||||
// necessary during tests - the test should wait until all operations finish
|
// necessary during tests - the test should wait until all operations finish
|
||||||
// and evaluate results after that.
|
// and evaluate results after that.
|
||||||
Wait()
|
Wait()
|
||||||
|
|
||||||
// IsOperationPending returns true if an operation for the given volumeName
|
// IsOperationPending returns true if an operation for the given volumeName and podName is pending,
|
||||||
// and one of podName or nodeName is pending, otherwise it returns false
|
// otherwise it returns false
|
||||||
IsOperationPending(
|
IsOperationPending(volumeName v1.UniqueVolumeName, podName types.UniquePodName) bool
|
||||||
volumeName v1.UniqueVolumeName,
|
|
||||||
podName volumetypes.UniquePodName,
|
|
||||||
nodeName types.NodeName) bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNestedPendingOperations returns a new instance of NestedPendingOperations.
|
// NewNestedPendingOperations returns a new instance of NestedPendingOperations.
|
||||||
@ -116,7 +88,8 @@ type nestedPendingOperations struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type operation struct {
|
type operation struct {
|
||||||
key operationKey
|
volumeName v1.UniqueVolumeName
|
||||||
|
podName types.UniquePodName
|
||||||
operationName string
|
operationName string
|
||||||
operationPending bool
|
operationPending bool
|
||||||
expBackoff exponentialbackoff.ExponentialBackoff
|
expBackoff exponentialbackoff.ExponentialBackoff
|
||||||
@ -124,24 +97,22 @@ type operation struct {
|
|||||||
|
|
||||||
func (grm *nestedPendingOperations) Run(
|
func (grm *nestedPendingOperations) Run(
|
||||||
volumeName v1.UniqueVolumeName,
|
volumeName v1.UniqueVolumeName,
|
||||||
podName volumetypes.UniquePodName,
|
podName types.UniquePodName,
|
||||||
nodeName types.NodeName,
|
generatedOperations types.GeneratedOperations) error {
|
||||||
generatedOperations volumetypes.GeneratedOperations) error {
|
|
||||||
grm.lock.Lock()
|
grm.lock.Lock()
|
||||||
defer grm.lock.Unlock()
|
defer grm.lock.Unlock()
|
||||||
|
opExists, previousOpIndex := grm.isOperationExists(volumeName, podName)
|
||||||
opKey := operationKey{volumeName, podName, nodeName}
|
|
||||||
|
|
||||||
opExists, previousOpIndex := grm.isOperationExists(opKey)
|
|
||||||
if opExists {
|
if opExists {
|
||||||
previousOp := grm.operations[previousOpIndex]
|
previousOp := grm.operations[previousOpIndex]
|
||||||
// Operation already exists
|
// Operation already exists
|
||||||
if previousOp.operationPending {
|
if previousOp.operationPending {
|
||||||
// Operation is pending
|
// Operation is pending
|
||||||
return NewAlreadyExistsError(opKey)
|
operationKey := getOperationKey(volumeName, podName)
|
||||||
|
return NewAlreadyExistsError(operationKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
backOffErr := previousOp.expBackoff.SafeToRetry(opKey.String())
|
operationKey := getOperationKey(volumeName, podName)
|
||||||
|
backOffErr := previousOp.expBackoff.SafeToRetry(operationKey)
|
||||||
if backOffErr != nil {
|
if backOffErr != nil {
|
||||||
if previousOp.operationName == generatedOperations.OperationName {
|
if previousOp.operationName == generatedOperations.OperationName {
|
||||||
return backOffErr
|
return backOffErr
|
||||||
@ -153,13 +124,15 @@ func (grm *nestedPendingOperations) Run(
|
|||||||
|
|
||||||
// Update existing operation to mark as pending.
|
// Update existing operation to mark as pending.
|
||||||
grm.operations[previousOpIndex].operationPending = true
|
grm.operations[previousOpIndex].operationPending = true
|
||||||
grm.operations[previousOpIndex].key = opKey
|
grm.operations[previousOpIndex].volumeName = volumeName
|
||||||
|
grm.operations[previousOpIndex].podName = podName
|
||||||
} else {
|
} else {
|
||||||
// Create a new operation
|
// Create a new operation
|
||||||
grm.operations = append(grm.operations,
|
grm.operations = append(grm.operations,
|
||||||
operation{
|
operation{
|
||||||
key: opKey,
|
|
||||||
operationPending: true,
|
operationPending: true,
|
||||||
|
volumeName: volumeName,
|
||||||
|
podName: podName,
|
||||||
operationName: generatedOperations.OperationName,
|
operationName: generatedOperations.OperationName,
|
||||||
expBackoff: exponentialbackoff.ExponentialBackoff{},
|
expBackoff: exponentialbackoff.ExponentialBackoff{},
|
||||||
})
|
})
|
||||||
@ -169,7 +142,7 @@ func (grm *nestedPendingOperations) Run(
|
|||||||
// Handle unhandled panics (very unlikely)
|
// Handle unhandled panics (very unlikely)
|
||||||
defer k8sRuntime.HandleCrash()
|
defer k8sRuntime.HandleCrash()
|
||||||
// Handle completion of and error, if any, from operationFunc()
|
// Handle completion of and error, if any, from operationFunc()
|
||||||
defer grm.operationComplete(opKey, &detailedErr)
|
defer grm.operationComplete(volumeName, podName, &detailedErr)
|
||||||
return generatedOperations.Run()
|
return generatedOperations.Run()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -178,14 +151,12 @@ func (grm *nestedPendingOperations) Run(
|
|||||||
|
|
||||||
func (grm *nestedPendingOperations) IsOperationPending(
|
func (grm *nestedPendingOperations) IsOperationPending(
|
||||||
volumeName v1.UniqueVolumeName,
|
volumeName v1.UniqueVolumeName,
|
||||||
podName volumetypes.UniquePodName,
|
podName types.UniquePodName) bool {
|
||||||
nodeName types.NodeName) bool {
|
|
||||||
|
|
||||||
grm.lock.RLock()
|
grm.lock.RLock()
|
||||||
defer grm.lock.RUnlock()
|
defer grm.lock.RUnlock()
|
||||||
|
|
||||||
opKey := operationKey{volumeName, podName, nodeName}
|
exist, previousOpIndex := grm.isOperationExists(volumeName, podName)
|
||||||
exist, previousOpIndex := grm.isOperationExists(opKey)
|
|
||||||
if exist && grm.operations[previousOpIndex].operationPending {
|
if exist && grm.operations[previousOpIndex].operationPending {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@ -193,52 +164,59 @@ func (grm *nestedPendingOperations) IsOperationPending(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// This is an internal function and caller should acquire and release the lock
|
// This is an internal function and caller should acquire and release the lock
|
||||||
func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, int) {
|
func (grm *nestedPendingOperations) isOperationExists(
|
||||||
|
volumeName v1.UniqueVolumeName,
|
||||||
|
podName types.UniquePodName) (bool, int) {
|
||||||
|
|
||||||
// If volumeName is empty, operation can be executed concurrently
|
// If volumeName is empty, operation can be executed concurrently
|
||||||
if key.volumeName == EmptyUniqueVolumeName {
|
if volumeName == EmptyUniqueVolumeName {
|
||||||
return false, -1
|
return false, -1
|
||||||
}
|
}
|
||||||
|
|
||||||
for previousOpIndex, previousOp := range grm.operations {
|
for previousOpIndex, previousOp := range grm.operations {
|
||||||
volumeNameMatch := previousOp.key.volumeName == key.volumeName
|
if previousOp.volumeName != volumeName {
|
||||||
|
// No match, keep searching
|
||||||
podNameMatch := previousOp.key.podName == EmptyUniquePodName ||
|
continue
|
||||||
key.podName == EmptyUniquePodName ||
|
|
||||||
previousOp.key.podName == key.podName
|
|
||||||
|
|
||||||
nodeNameMatch := previousOp.key.nodeName == EmptyNodeName ||
|
|
||||||
key.nodeName == EmptyNodeName ||
|
|
||||||
previousOp.key.nodeName == key.nodeName
|
|
||||||
|
|
||||||
if volumeNameMatch && podNameMatch && nodeNameMatch {
|
|
||||||
return true, previousOpIndex
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
if previousOp.podName != EmptyUniquePodName &&
|
||||||
|
podName != EmptyUniquePodName &&
|
||||||
|
previousOp.podName != podName {
|
||||||
|
// No match, keep searching
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Match
|
||||||
|
return true, previousOpIndex
|
||||||
|
}
|
||||||
return false, -1
|
return false, -1
|
||||||
}
|
}
|
||||||
|
|
||||||
func (grm *nestedPendingOperations) getOperation(key operationKey) (uint, error) {
|
func (grm *nestedPendingOperations) getOperation(
|
||||||
|
volumeName v1.UniqueVolumeName,
|
||||||
|
podName types.UniquePodName) (uint, error) {
|
||||||
// Assumes lock has been acquired by caller.
|
// Assumes lock has been acquired by caller.
|
||||||
|
|
||||||
for i, op := range grm.operations {
|
for i, op := range grm.operations {
|
||||||
if op.key.volumeName == key.volumeName &&
|
if op.volumeName == volumeName &&
|
||||||
op.key.podName == key.podName {
|
op.podName == podName {
|
||||||
return uint(i), nil
|
return uint(i), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0, fmt.Errorf("Operation %q not found", key)
|
logOperationKey := getOperationKey(volumeName, podName)
|
||||||
|
return 0, fmt.Errorf("Operation %q not found", logOperationKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (grm *nestedPendingOperations) deleteOperation(key operationKey) {
|
func (grm *nestedPendingOperations) deleteOperation(
|
||||||
// Assumes lock has been acquired by caller.
|
// Assumes lock has been acquired by caller.
|
||||||
|
volumeName v1.UniqueVolumeName,
|
||||||
|
podName types.UniquePodName) {
|
||||||
|
|
||||||
opIndex := -1
|
opIndex := -1
|
||||||
for i, op := range grm.operations {
|
for i, op := range grm.operations {
|
||||||
if op.key.volumeName == key.volumeName &&
|
if op.volumeName == volumeName &&
|
||||||
op.key.podName == key.podName {
|
op.podName == podName {
|
||||||
opIndex = i
|
opIndex = i
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -249,7 +227,8 @@ func (grm *nestedPendingOperations) deleteOperation(key operationKey) {
|
|||||||
grm.operations = grm.operations[:len(grm.operations)-1]
|
grm.operations = grm.operations[:len(grm.operations)-1]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (grm *nestedPendingOperations) operationComplete(key operationKey, err *error) {
|
func (grm *nestedPendingOperations) operationComplete(
|
||||||
|
volumeName v1.UniqueVolumeName, podName types.UniquePodName, err *error) {
|
||||||
// Defer operations are executed in Last-In is First-Out order. In this case
|
// Defer operations are executed in Last-In is First-Out order. In this case
|
||||||
// the lock is acquired first when operationCompletes begins, and is
|
// the lock is acquired first when operationCompletes begins, and is
|
||||||
// released when the method finishes, after the lock is released cond is
|
// released when the method finishes, after the lock is released cond is
|
||||||
@ -260,20 +239,24 @@ func (grm *nestedPendingOperations) operationComplete(key operationKey, err *err
|
|||||||
|
|
||||||
if *err == nil || !grm.exponentialBackOffOnError {
|
if *err == nil || !grm.exponentialBackOffOnError {
|
||||||
// Operation completed without error, or exponentialBackOffOnError disabled
|
// Operation completed without error, or exponentialBackOffOnError disabled
|
||||||
grm.deleteOperation(key)
|
grm.deleteOperation(volumeName, podName)
|
||||||
if *err != nil {
|
if *err != nil {
|
||||||
// Log error
|
// Log error
|
||||||
klog.Errorf("operation %s failed with: %v", key, *err)
|
logOperationKey := getOperationKey(volumeName, podName)
|
||||||
|
klog.Errorf("operation %s failed with: %v",
|
||||||
|
logOperationKey,
|
||||||
|
*err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Operation completed with error and exponentialBackOffOnError Enabled
|
// Operation completed with error and exponentialBackOffOnError Enabled
|
||||||
existingOpIndex, getOpErr := grm.getOperation(key)
|
existingOpIndex, getOpErr := grm.getOperation(volumeName, podName)
|
||||||
if getOpErr != nil {
|
if getOpErr != nil {
|
||||||
// Failed to find existing operation
|
// Failed to find existing operation
|
||||||
|
logOperationKey := getOperationKey(volumeName, podName)
|
||||||
klog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.",
|
klog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.",
|
||||||
key,
|
logOperationKey,
|
||||||
*err)
|
*err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -282,8 +265,10 @@ func (grm *nestedPendingOperations) operationComplete(key operationKey, err *err
|
|||||||
grm.operations[existingOpIndex].operationPending = false
|
grm.operations[existingOpIndex].operationPending = false
|
||||||
|
|
||||||
// Log error
|
// Log error
|
||||||
|
operationKey :=
|
||||||
|
getOperationKey(volumeName, podName)
|
||||||
klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff.
|
klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff.
|
||||||
GenerateNoRetriesPermittedMsg(key.String()))
|
GenerateNoRetriesPermittedMsg(operationKey))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (grm *nestedPendingOperations) Wait() {
|
func (grm *nestedPendingOperations) Wait() {
|
||||||
@ -295,22 +280,21 @@ func (grm *nestedPendingOperations) Wait() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type operationKey struct {
|
func getOperationKey(
|
||||||
volumeName v1.UniqueVolumeName
|
volumeName v1.UniqueVolumeName, podName types.UniquePodName) string {
|
||||||
podName volumetypes.UniquePodName
|
podNameStr := ""
|
||||||
nodeName types.NodeName
|
if podName != EmptyUniquePodName {
|
||||||
}
|
podNameStr = fmt.Sprintf(" (%q)", podName)
|
||||||
|
}
|
||||||
|
|
||||||
func (key operationKey) String() string {
|
return fmt.Sprintf("%q%s",
|
||||||
return fmt.Sprintf("{volumeName=%q, podName=%q, nodeName=%q}",
|
volumeName,
|
||||||
key.volumeName,
|
podNameStr)
|
||||||
key.podName,
|
|
||||||
key.nodeName)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAlreadyExistsError returns a new instance of AlreadyExists error.
|
// NewAlreadyExistsError returns a new instance of AlreadyExists error.
|
||||||
func NewAlreadyExistsError(key operationKey) error {
|
func NewAlreadyExistsError(operationKey string) error {
|
||||||
return alreadyExistsError{key}
|
return alreadyExistsError{operationKey}
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsAlreadyExists returns true if an error returned from
|
// IsAlreadyExists returns true if an error returned from
|
||||||
@ -329,7 +313,7 @@ func IsAlreadyExists(err error) bool {
|
|||||||
// new operation can not be started because an operation with the same operation
|
// new operation can not be started because an operation with the same operation
|
||||||
// name is already executing.
|
// name is already executing.
|
||||||
type alreadyExistsError struct {
|
type alreadyExistsError struct {
|
||||||
operationKey operationKey
|
operationKey string
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ error = alreadyExistsError{}
|
var _ error = alreadyExistsError{}
|
||||||
|
@ -22,10 +22,9 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
|
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
|
||||||
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
|
"k8s.io/kubernetes/pkg/volume/util/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -45,22 +44,22 @@ const (
|
|||||||
initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond
|
initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond
|
||||||
)
|
)
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_SingleOp(t *testing.T) {
|
func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
operation := func() (error, error) { return nil, nil }
|
operation := func() (error, error) { return nil, nil }
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation})
|
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation})
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_TwoOps(t *testing.T) {
|
func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
||||||
volume1Name := v1.UniqueVolumeName("volume1-name")
|
volume1Name := v1.UniqueVolumeName("volume1-name")
|
||||||
@ -68,65 +67,65 @@ func Test_NestedPendingOperations_Positive_TwoOps(t *testing.T) {
|
|||||||
operation := func() (error, error) { return nil, nil }
|
operation := func() (error, error) { return nil, nil }
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
err1 := grm.Run(volume1Name, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation})
|
err1 := grm.Run(volume1Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation})
|
||||||
err2 := grm.Run(volume2Name, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation})
|
err2 := grm.Run(volume2Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation})
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", volume1Name, err1)
|
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", volume1Name, err1)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", volume2Name, err2)
|
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", volume2Name, err2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_TwoSubOps(t *testing.T) {
|
func Test_NewGoRoutineMap_Positive_TwoSubOps(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
operation1PodName := volumetypes.UniquePodName("operation1-podname")
|
operation1PodName := types.UniquePodName("operation1-podname")
|
||||||
operation2PodName := volumetypes.UniquePodName("operation2-podname")
|
operation2PodName := types.UniquePodName("operation2-podname")
|
||||||
operation := func() (error, error) { return nil, nil }
|
operation := func() (error, error) { return nil, nil }
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
err1 := grm.Run(volumeName, operation1PodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation})
|
err1 := grm.Run(volumeName, operation1PodName, types.GeneratedOperations{OperationFunc: operation})
|
||||||
err2 := grm.Run(volumeName, operation2PodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation})
|
err2 := grm.Run(volumeName, operation2PodName, types.GeneratedOperations{OperationFunc: operation})
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", operation1PodName, err1)
|
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation1PodName, err1)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", operation2PodName, err2)
|
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation2PodName, err2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_SingleOpWithExpBackoff(t *testing.T) {
|
func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
operation := func() (error, error) { return nil, nil }
|
operation := func() (error, error) { return nil, nil }
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation})
|
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation})
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
|
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
||||||
operation1 := generateCallbackFunc(operation1DoneCh)
|
operation1 := generateCallbackFunc(operation1DoneCh)
|
||||||
err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
|
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
||||||
}
|
}
|
||||||
operation2 := generateNoopFunc()
|
operation2 := generateNoopFunc()
|
||||||
<-operation1DoneCh // Force operation1 to complete
|
<-operation1DoneCh // Force operation1 to complete
|
||||||
@ -135,9 +134,9 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletes(t *testin
|
|||||||
err2 := retryWithExponentialBackOff(
|
err2 := retryWithExponentialBackOff(
|
||||||
time.Duration(initialOperationWaitTimeShort),
|
time.Duration(initialOperationWaitTimeShort),
|
||||||
func() (bool, error) {
|
func() (bool, error) {
|
||||||
err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2})
|
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
|
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -146,19 +145,19 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletes(t *testin
|
|||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
|
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
||||||
operation1 := generateCallbackFunc(operation1DoneCh)
|
operation1 := generateCallbackFunc(operation1DoneCh)
|
||||||
err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
|
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
||||||
}
|
}
|
||||||
operation2 := generateNoopFunc()
|
operation2 := generateNoopFunc()
|
||||||
<-operation1DoneCh // Force operation1 to complete
|
<-operation1DoneCh // Force operation1 to complete
|
||||||
@ -167,9 +166,9 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletesWithExpBac
|
|||||||
err2 := retryWithExponentialBackOff(
|
err2 := retryWithExponentialBackOff(
|
||||||
time.Duration(initialOperationWaitTimeShort),
|
time.Duration(initialOperationWaitTimeShort),
|
||||||
func() (bool, error) {
|
func() (bool, error) {
|
||||||
err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2})
|
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
|
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -178,18 +177,18 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletesWithExpBac
|
|||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanics(t *testing.T) {
|
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
operation1 := generatePanicFunc()
|
operation1 := generatePanicFunc()
|
||||||
err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
|
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
||||||
}
|
}
|
||||||
operation2 := generateNoopFunc()
|
operation2 := generateNoopFunc()
|
||||||
|
|
||||||
@ -197,9 +196,9 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanics(t *testing.T
|
|||||||
err2 := retryWithExponentialBackOff(
|
err2 := retryWithExponentialBackOff(
|
||||||
time.Duration(initialOperationWaitTimeShort),
|
time.Duration(initialOperationWaitTimeShort),
|
||||||
func() (bool, error) {
|
func() (bool, error) {
|
||||||
err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2})
|
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
|
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -208,18 +207,18 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanics(t *testing.T
|
|||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) {
|
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
operation1 := generatePanicFunc()
|
operation1 := generatePanicFunc()
|
||||||
err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
|
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
||||||
}
|
}
|
||||||
operation2 := generateNoopFunc()
|
operation2 := generateNoopFunc()
|
||||||
|
|
||||||
@ -227,9 +226,9 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanicsWithExpBackof
|
|||||||
err2 := retryWithExponentialBackOff(
|
err2 := retryWithExponentialBackOff(
|
||||||
time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff
|
time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff
|
||||||
func() (bool, error) {
|
func() (bool, error) {
|
||||||
err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2})
|
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
|
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -238,43 +237,43 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanicsWithExpBackof
|
|||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
|
func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
||||||
operation1 := generateWaitFunc(operation1DoneCh)
|
operation1 := generateWaitFunc(operation1DoneCh)
|
||||||
err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
|
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
||||||
}
|
}
|
||||||
operation2 := generateNoopFunc()
|
operation2 := generateNoopFunc()
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2})
|
err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err2 == nil {
|
if err2 == nil {
|
||||||
t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
|
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
|
||||||
}
|
}
|
||||||
if !IsAlreadyExists(err2) {
|
if !IsAlreadyExists(err2) {
|
||||||
t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
|
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Negative_SecondThirdOpWithDifferentNames(t *testing.T) {
|
func Test_NewGoRoutineMap_Negative_SecondThirdOpWithDifferentNames(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
op1Name := "mount_volume"
|
op1Name := "mount_volume"
|
||||||
operation1 := generateErrorFunc()
|
operation1 := generateErrorFunc()
|
||||||
err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name})
|
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name})
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
||||||
}
|
}
|
||||||
// Shorter than exponential backoff period, so as to trigger exponential backoff error on second
|
// Shorter than exponential backoff period, so as to trigger exponential backoff error on second
|
||||||
// operation.
|
// operation.
|
||||||
@ -284,8 +283,7 @@ func Test_NestedPendingOperations_Negative_SecondThirdOpWithDifferentNames(t *te
|
|||||||
func() (bool, error) {
|
func() (bool, error) {
|
||||||
err := grm.Run(volumeName,
|
err := grm.Run(volumeName,
|
||||||
"", /* operationSubName */
|
"", /* operationSubName */
|
||||||
"", /* nodeName */
|
types.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name})
|
||||||
volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name})
|
|
||||||
|
|
||||||
if exponentialbackoff.IsExponentialBackoff(err) {
|
if exponentialbackoff.IsExponentialBackoff(err) {
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -296,114 +294,114 @@ func Test_NestedPendingOperations_Negative_SecondThirdOpWithDifferentNames(t *te
|
|||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
t.Fatalf("Expected NestedPendingOperations to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name)
|
t.Fatalf("Expected NewGoRoutine to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
operation3 := generateNoopFunc()
|
operation3 := generateNoopFunc()
|
||||||
op3Name := "unmount_volume"
|
op3Name := "unmount_volume"
|
||||||
// Act
|
// Act
|
||||||
err3 := grm.Run(volumeName, "" /*pod name*/, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name})
|
err3 := grm.Run(volumeName, "" /*pod name*/, types.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name})
|
||||||
if err3 != nil {
|
if err3 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected <no error> Actual: <%v>", err3)
|
t.Fatalf("NewGoRoutine failed. Expected <no error> Actual: <%v>", err3)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) {
|
func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
operationPodName := volumetypes.UniquePodName("operation-podname")
|
operationPodName := types.UniquePodName("operation-podname")
|
||||||
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
||||||
operation1 := generateWaitFunc(operation1DoneCh)
|
operation1 := generateWaitFunc(operation1DoneCh)
|
||||||
err1 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
|
err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1})
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
||||||
}
|
}
|
||||||
operation2 := generateNoopFunc()
|
operation2 := generateNoopFunc()
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
err2 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2})
|
err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2})
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err2 == nil {
|
if err2 == nil {
|
||||||
t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
|
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
|
||||||
}
|
}
|
||||||
if !IsAlreadyExists(err2) {
|
if !IsAlreadyExists(err2) {
|
||||||
t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
|
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) {
|
func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
operationPodName := volumetypes.UniquePodName("operation-podname")
|
operationPodName := types.UniquePodName("operation-podname")
|
||||||
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
||||||
operation1 := generateWaitFunc(operation1DoneCh)
|
operation1 := generateWaitFunc(operation1DoneCh)
|
||||||
err1 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
|
err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1})
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
||||||
}
|
}
|
||||||
operation2 := generateNoopFunc()
|
operation2 := generateNoopFunc()
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
err2 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2})
|
err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2})
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err2 == nil {
|
if err2 == nil {
|
||||||
t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
|
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
|
||||||
}
|
}
|
||||||
if !IsAlreadyExists(err2) {
|
if !IsAlreadyExists(err2) {
|
||||||
t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
|
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) {
|
func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
||||||
operation1 := generateWaitFunc(operation1DoneCh)
|
operation1 := generateWaitFunc(operation1DoneCh)
|
||||||
err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
|
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
||||||
}
|
}
|
||||||
operation2 := generateNoopFunc()
|
operation2 := generateNoopFunc()
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2})
|
err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err2 == nil {
|
if err2 == nil {
|
||||||
t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
|
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
|
||||||
}
|
}
|
||||||
if !IsAlreadyExists(err2) {
|
if !IsAlreadyExists(err2) {
|
||||||
t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
|
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
|
func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
||||||
operation1 := generateWaitFunc(operation1DoneCh)
|
operation1 := generateWaitFunc(operation1DoneCh)
|
||||||
err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
|
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
||||||
}
|
}
|
||||||
operation2 := generateNoopFunc()
|
operation2 := generateNoopFunc()
|
||||||
operation3 := generateNoopFunc()
|
operation3 := generateNoopFunc()
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2})
|
err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err2 == nil {
|
if err2 == nil {
|
||||||
t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
|
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
|
||||||
}
|
}
|
||||||
if !IsAlreadyExists(err2) {
|
if !IsAlreadyExists(err2) {
|
||||||
t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
|
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
@ -411,9 +409,9 @@ func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletes(t *testing
|
|||||||
err3 := retryWithExponentialBackOff(
|
err3 := retryWithExponentialBackOff(
|
||||||
time.Duration(initialOperationWaitTimeShort),
|
time.Duration(initialOperationWaitTimeShort),
|
||||||
func() (bool, error) {
|
func() (bool, error) {
|
||||||
err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3})
|
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
|
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -422,32 +420,32 @@ func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletes(t *testing
|
|||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err3 != nil {
|
if err3 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
|
func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
||||||
operation1 := generateWaitFunc(operation1DoneCh)
|
operation1 := generateWaitFunc(operation1DoneCh)
|
||||||
err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
|
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
||||||
}
|
}
|
||||||
operation2 := generateNoopFunc()
|
operation2 := generateNoopFunc()
|
||||||
operation3 := generateNoopFunc()
|
operation3 := generateNoopFunc()
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2})
|
err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2})
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err2 == nil {
|
if err2 == nil {
|
||||||
t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
|
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
|
||||||
}
|
}
|
||||||
if !IsAlreadyExists(err2) {
|
if !IsAlreadyExists(err2) {
|
||||||
t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
|
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
@ -455,9 +453,9 @@ func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletesWithExpBack
|
|||||||
err3 := retryWithExponentialBackOff(
|
err3 := retryWithExponentialBackOff(
|
||||||
time.Duration(initialOperationWaitTimeShort),
|
time.Duration(initialOperationWaitTimeShort),
|
||||||
func() (bool, error) {
|
func() (bool, error) {
|
||||||
err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3})
|
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
|
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -466,11 +464,11 @@ func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletesWithExpBack
|
|||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
if err3 != nil {
|
if err3 != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_WaitEmpty(t *testing.T) {
|
func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) {
|
||||||
// Test than Wait() on empty GoRoutineMap always succeeds without blocking
|
// Test than Wait() on empty GoRoutineMap always succeeds without blocking
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
||||||
@ -489,7 +487,7 @@ func Test_NestedPendingOperations_Positive_WaitEmpty(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_WaitEmptyWithExpBackoff(t *testing.T) {
|
func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) {
|
||||||
// Test than Wait() on empty GoRoutineMap always succeeds without blocking
|
// Test than Wait() on empty GoRoutineMap always succeeds without blocking
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
||||||
@ -508,16 +506,16 @@ func Test_NestedPendingOperations_Positive_WaitEmptyWithExpBackoff(t *testing.T)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_Wait(t *testing.T) {
|
func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) {
|
||||||
// Test that Wait() really blocks until the last operation succeeds
|
// Test that Wait() really blocks until the last operation succeeds
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
||||||
operation1 := generateWaitFunc(operation1DoneCh)
|
operation1 := generateWaitFunc(operation1DoneCh)
|
||||||
err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
|
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
@ -537,16 +535,16 @@ func Test_NestedPendingOperations_Positive_Wait(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_WaitWithExpBackoff(t *testing.T) {
|
func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) {
|
||||||
// Test that Wait() really blocks until the last operation succeeds
|
// Test that Wait() really blocks until the last operation succeeds
|
||||||
// Arrange
|
// Arrange
|
||||||
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
|
||||||
volumeName := v1.UniqueVolumeName("volume-name")
|
volumeName := v1.UniqueVolumeName("volume-name")
|
||||||
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
||||||
operation1 := generateWaitFunc(operation1DoneCh)
|
operation1 := generateWaitFunc(operation1DoneCh)
|
||||||
err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
|
err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
@ -566,215 +564,6 @@ func Test_NestedPendingOperations_Positive_WaitWithExpBackoff(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Concurrent operations tests */
|
|
||||||
|
|
||||||
// Covered cases:
|
|
||||||
// FIRST OP | SECOND OP | RESULT
|
|
||||||
// None | None | Positive
|
|
||||||
// None | Volume | Positive
|
|
||||||
// None | Volume Pod | Positive
|
|
||||||
// None | Volume Node | Positive
|
|
||||||
// Volume | None | Positive
|
|
||||||
// Volume | Volume | Negative (covered in Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes above)
|
|
||||||
// Volume | Volume Pod | Negative
|
|
||||||
// Volume | Volume Node | Negative
|
|
||||||
// Volume Pod | None | Positive
|
|
||||||
// Volume Pod | Volume | Negative
|
|
||||||
// Volume Pod | Volume Pod | Negative (covered in Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes above)
|
|
||||||
// Volume Node | None | Positive
|
|
||||||
// Volume Node | Volume | Negative
|
|
||||||
// Volume Node | Volume Node | Negative
|
|
||||||
|
|
||||||
// These cases are not covered because they will never occur within the same
|
|
||||||
// binary, so either result works.
|
|
||||||
// Volume Pod | Volume Node
|
|
||||||
// Volume Node | Volume Pod
|
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondNone(t *testing.T) {
|
|
||||||
testConcurrentOperationsPositive(t,
|
|
||||||
"", /* volumeName1 */
|
|
||||||
"", /* podName1 */
|
|
||||||
"", /* nodeName1 */
|
|
||||||
"", /* volumeName1 */
|
|
||||||
"", /* podName2 */
|
|
||||||
"" /* nodeName2 */)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondVolume(t *testing.T) {
|
|
||||||
testConcurrentOperationsPositive(t,
|
|
||||||
"", /* volumeName1 */
|
|
||||||
"", /* podName1 */
|
|
||||||
"", /* nodeName1 */
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
"", /* podName2 */
|
|
||||||
"" /* nodeName2 */)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondVolumePod(t *testing.T) {
|
|
||||||
testConcurrentOperationsPositive(t,
|
|
||||||
"", /* volumeName1 */
|
|
||||||
"", /* podName1 */
|
|
||||||
"", /* nodeName1 */
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
volumetypes.UniquePodName("operation-podname"),
|
|
||||||
"" /* nodeName2 */)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondVolumeNode(t *testing.T) {
|
|
||||||
testConcurrentOperationsPositive(t,
|
|
||||||
"", /* volumeName1 */
|
|
||||||
"", /* podName1 */
|
|
||||||
"", /* nodeName1 */
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
"", /* podName2 */
|
|
||||||
types.NodeName("operation-nodename"))
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstVolume_SecondNone(t *testing.T) {
|
|
||||||
testConcurrentOperationsPositive(t,
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
"", /* podName1 */
|
|
||||||
"", /* nodeName1 */
|
|
||||||
"", /* volumeName1 */
|
|
||||||
"", /* podName2 */
|
|
||||||
"" /* nodeName2 */)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolume_SecondVolumePod(t *testing.T) {
|
|
||||||
testConcurrentOperationsNegative(t,
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
"", /* podName1 */
|
|
||||||
"", /* nodeName1 */
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
volumetypes.UniquePodName("operation-podname"),
|
|
||||||
"" /* nodeName2 */)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolume_SecondVolumeNode(t *testing.T) {
|
|
||||||
testConcurrentOperationsNegative(t,
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
"", /* podName1 */
|
|
||||||
"", /* nodeName1 */
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
"", /* podName2 */
|
|
||||||
types.NodeName("operation-nodename"))
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstVolumePod_SecondNone(t *testing.T) {
|
|
||||||
testConcurrentOperationsPositive(t,
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
volumetypes.UniquePodName("operation-podname"),
|
|
||||||
"", /* nodeName1 */
|
|
||||||
"", /* volumeName1 */
|
|
||||||
"", /* podName2 */
|
|
||||||
"" /* nodeName2 */)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolumePod_SecondVolume(t *testing.T) {
|
|
||||||
testConcurrentOperationsNegative(t,
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
volumetypes.UniquePodName("operation-podname"),
|
|
||||||
"", /* nodeName1 */
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
"", /* podName2 */
|
|
||||||
"" /* nodeName2 */)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstVolumeNode_SecondNone(t *testing.T) {
|
|
||||||
testConcurrentOperationsPositive(t,
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
"", /* podName1 */
|
|
||||||
types.NodeName("operation-nodename"),
|
|
||||||
"", /* volumeName1 */
|
|
||||||
"", /* podName2 */
|
|
||||||
"" /* nodeName2 */)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolumeNode_SecondVolume(t *testing.T) {
|
|
||||||
testConcurrentOperationsNegative(t,
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
"", /* podName1 */
|
|
||||||
types.NodeName("operation-nodename"),
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
"", /* podName2 */
|
|
||||||
"" /* nodeName2 */)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolumeNode_SecondVolumeNode(t *testing.T) {
|
|
||||||
testConcurrentOperationsNegative(t,
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
"", /* podName1 */
|
|
||||||
types.NodeName("operation-nodename"),
|
|
||||||
v1.UniqueVolumeName("volume-name"),
|
|
||||||
"", /* podName2 */
|
|
||||||
types.NodeName("operation-nodename"))
|
|
||||||
}
|
|
||||||
|
|
||||||
// testConcurrentOperationsPositive passes if the two operations keyed by the
|
|
||||||
// provided parameters are executed in parallel, and fails otherwise.
|
|
||||||
func testConcurrentOperationsPositive(
|
|
||||||
t *testing.T,
|
|
||||||
volumeName1 v1.UniqueVolumeName,
|
|
||||||
podName1 volumetypes.UniquePodName,
|
|
||||||
nodeName1 types.NodeName,
|
|
||||||
volumeName2 v1.UniqueVolumeName,
|
|
||||||
podName2 volumetypes.UniquePodName,
|
|
||||||
nodeName2 types.NodeName) {
|
|
||||||
|
|
||||||
// Arrange
|
|
||||||
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
|
||||||
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
|
||||||
operation1 := generateWaitFunc(operation1DoneCh)
|
|
||||||
err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
|
|
||||||
if err1 != nil {
|
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
|
|
||||||
}
|
|
||||||
operation2 := generateNoopFunc()
|
|
||||||
|
|
||||||
// Act
|
|
||||||
err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2})
|
|
||||||
|
|
||||||
// Assert
|
|
||||||
if err2 != nil {
|
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// testConcurrentOperationsNegative passes if the creation of the second
|
|
||||||
// operation returns an alreadyExists error, and fails otherwise.
|
|
||||||
func testConcurrentOperationsNegative(
|
|
||||||
t *testing.T,
|
|
||||||
volumeName1 v1.UniqueVolumeName,
|
|
||||||
podName1 volumetypes.UniquePodName,
|
|
||||||
nodeName1 types.NodeName,
|
|
||||||
volumeName2 v1.UniqueVolumeName,
|
|
||||||
podName2 volumetypes.UniquePodName,
|
|
||||||
nodeName2 types.NodeName) {
|
|
||||||
|
|
||||||
// Arrange
|
|
||||||
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
|
|
||||||
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
|
||||||
operation1 := generateWaitFunc(operation1DoneCh)
|
|
||||||
err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1})
|
|
||||||
if err1 != nil {
|
|
||||||
t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
|
|
||||||
}
|
|
||||||
operation2 := generateNoopFunc()
|
|
||||||
|
|
||||||
// Act
|
|
||||||
err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2})
|
|
||||||
|
|
||||||
// Assert
|
|
||||||
if err2 == nil {
|
|
||||||
t.Fatalf("NestedPendingOperations did not fail. Expected an operation to already exist")
|
|
||||||
}
|
|
||||||
if !IsAlreadyExists(err2) {
|
|
||||||
t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* END concurrent operations tests */
|
|
||||||
|
|
||||||
func generateCallbackFunc(done chan<- interface{}) func() (error, error) {
|
func generateCallbackFunc(done chan<- interface{}) func() (error, error) {
|
||||||
return func() (error, error) {
|
return func() (error, error) {
|
||||||
done <- true
|
done <- true
|
||||||
|
@ -138,9 +138,9 @@ type OperationExecutor interface {
|
|||||||
// back off on retries.
|
// back off on retries.
|
||||||
VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
|
VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
|
||||||
|
|
||||||
// IsOperationPending returns true if an operation for the given volumeName
|
// IsOperationPending returns true if an operation for the given volumeName and podName is pending,
|
||||||
// and one of podName or nodeName is pending, otherwise it returns false
|
// otherwise it returns false
|
||||||
IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName) bool
|
IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool
|
||||||
// ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume.
|
// ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume.
|
||||||
ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
|
ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
|
||||||
// ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin
|
// ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin
|
||||||
@ -650,11 +650,8 @@ type operationExecutor struct {
|
|||||||
operationGenerator OperationGenerator
|
operationGenerator OperationGenerator
|
||||||
}
|
}
|
||||||
|
|
||||||
func (oe *operationExecutor) IsOperationPending(
|
func (oe *operationExecutor) IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool {
|
||||||
volumeName v1.UniqueVolumeName,
|
return oe.pendingOperations.IsOperationPending(volumeName, podName)
|
||||||
podName volumetypes.UniquePodName,
|
|
||||||
nodeName types.NodeName) bool {
|
|
||||||
return oe.pendingOperations.IsOperationPending(volumeName, podName, nodeName)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (oe *operationExecutor) AttachVolume(
|
func (oe *operationExecutor) AttachVolume(
|
||||||
@ -663,13 +660,8 @@ func (oe *operationExecutor) AttachVolume(
|
|||||||
generatedOperations :=
|
generatedOperations :=
|
||||||
oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld)
|
oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld)
|
||||||
|
|
||||||
if util.IsMultiAttachForbidden(volumeToAttach.VolumeSpec) {
|
|
||||||
return oe.pendingOperations.Run(
|
|
||||||
volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations)
|
|
||||||
}
|
|
||||||
|
|
||||||
return oe.pendingOperations.Run(
|
return oe.pendingOperations.Run(
|
||||||
volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName, generatedOperations)
|
volumeToAttach.VolumeName, "" /* podName */, generatedOperations)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (oe *operationExecutor) DetachVolume(
|
func (oe *operationExecutor) DetachVolume(
|
||||||
@ -682,13 +674,8 @@ func (oe *operationExecutor) DetachVolume(
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if util.IsMultiAttachForbidden(volumeToDetach.VolumeSpec) {
|
|
||||||
return oe.pendingOperations.Run(
|
|
||||||
volumeToDetach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations)
|
|
||||||
}
|
|
||||||
|
|
||||||
return oe.pendingOperations.Run(
|
return oe.pendingOperations.Run(
|
||||||
volumeToDetach.VolumeName, "" /* podName */, volumeToDetach.NodeName, generatedOperations)
|
volumeToDetach.VolumeName, "" /* podName */, generatedOperations)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (oe *operationExecutor) VerifyVolumesAreAttached(
|
func (oe *operationExecutor) VerifyVolumesAreAttached(
|
||||||
@ -774,7 +761,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttached(
|
|||||||
|
|
||||||
// Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin
|
// Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin
|
||||||
uniquePluginName := v1.UniqueVolumeName(pluginName)
|
uniquePluginName := v1.UniqueVolumeName(pluginName)
|
||||||
err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, "" /* nodeName */, generatedOperations)
|
err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, generatedOperations)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err)
|
klog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err)
|
||||||
}
|
}
|
||||||
@ -792,7 +779,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Give an empty UniqueVolumeName so that this operation could be executed concurrently.
|
// Give an empty UniqueVolumeName so that this operation could be executed concurrently.
|
||||||
return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, "" /* nodeName */, generatedOperations)
|
return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, generatedOperations)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (oe *operationExecutor) MountVolume(
|
func (oe *operationExecutor) MountVolume(
|
||||||
@ -833,7 +820,7 @@ func (oe *operationExecutor) MountVolume(
|
|||||||
|
|
||||||
// TODO mount_device
|
// TODO mount_device
|
||||||
return oe.pendingOperations.Run(
|
return oe.pendingOperations.Run(
|
||||||
volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations)
|
volumeToMount.VolumeName, podName, generatedOperations)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (oe *operationExecutor) UnmountVolume(
|
func (oe *operationExecutor) UnmountVolume(
|
||||||
@ -864,7 +851,7 @@ func (oe *operationExecutor) UnmountVolume(
|
|||||||
podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
|
podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
|
||||||
|
|
||||||
return oe.pendingOperations.Run(
|
return oe.pendingOperations.Run(
|
||||||
volumeToUnmount.VolumeName, podName, "" /* nodeName */, generatedOperations)
|
volumeToUnmount.VolumeName, podName, generatedOperations)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (oe *operationExecutor) UnmountDevice(
|
func (oe *operationExecutor) UnmountDevice(
|
||||||
@ -895,7 +882,7 @@ func (oe *operationExecutor) UnmountDevice(
|
|||||||
podName := nestedpendingoperations.EmptyUniquePodName
|
podName := nestedpendingoperations.EmptyUniquePodName
|
||||||
|
|
||||||
return oe.pendingOperations.Run(
|
return oe.pendingOperations.Run(
|
||||||
deviceToDetach.VolumeName, podName, "" /* nodeName */, generatedOperations)
|
deviceToDetach.VolumeName, podName, generatedOperations)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
|
func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
|
||||||
@ -903,7 +890,7 @@ func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actu
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return oe.pendingOperations.Run(volumeToMount.VolumeName, "", "" /* nodeName */, generatedOperations)
|
return oe.pendingOperations.Run(volumeToMount.VolumeName, "", generatedOperations)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (oe *operationExecutor) VerifyControllerAttachedVolume(
|
func (oe *operationExecutor) VerifyControllerAttachedVolume(
|
||||||
@ -917,7 +904,7 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume(
|
|||||||
}
|
}
|
||||||
|
|
||||||
return oe.pendingOperations.Run(
|
return oe.pendingOperations.Run(
|
||||||
volumeToMount.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations)
|
volumeToMount.VolumeName, "" /* podName */, generatedOperations)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReconstructVolumeOperation return a func to create volumeSpec from mount path
|
// ReconstructVolumeOperation return a func to create volumeSpec from mount path
|
||||||
|
@ -17,7 +17,6 @@ limitations under the License.
|
|||||||
package operationexecutor
|
package operationexecutor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -181,7 +180,7 @@ func TestOperationExecutor_UnmountDeviceConcurrently(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToSameNode(t *testing.T) {
|
func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
ch, quit, oe := setup()
|
ch, quit, oe := setup()
|
||||||
volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
|
volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
|
||||||
@ -192,13 +191,6 @@ func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToSameNode(t *testi
|
|||||||
volumesToAttach[i] = VolumeToAttach{
|
volumesToAttach[i] = VolumeToAttach{
|
||||||
VolumeName: v1.UniqueVolumeName(pdName),
|
VolumeName: v1.UniqueVolumeName(pdName),
|
||||||
NodeName: "node",
|
NodeName: "node",
|
||||||
VolumeSpec: &volume.Spec{
|
|
||||||
PersistentVolume: &v1.PersistentVolume{
|
|
||||||
Spec: v1.PersistentVolumeSpec{
|
|
||||||
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
|
oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
|
||||||
}
|
}
|
||||||
@ -209,91 +201,7 @@ func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToSameNode(t *testi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToSameNode(t *testing.T) {
|
func TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) {
|
||||||
// Arrange
|
|
||||||
ch, quit, oe := setup()
|
|
||||||
volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
|
|
||||||
pdName := "pd-volume"
|
|
||||||
|
|
||||||
// Act
|
|
||||||
for i := range volumesToAttach {
|
|
||||||
volumesToAttach[i] = VolumeToAttach{
|
|
||||||
VolumeName: v1.UniqueVolumeName(pdName),
|
|
||||||
NodeName: "node",
|
|
||||||
VolumeSpec: &volume.Spec{
|
|
||||||
PersistentVolume: &v1.PersistentVolume{
|
|
||||||
Spec: v1.PersistentVolumeSpec{
|
|
||||||
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Assert
|
|
||||||
if !isOperationRunSerially(ch, quit) {
|
|
||||||
t.Fatalf("Attach volume operations should not start concurrently")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) {
|
|
||||||
// Arrange
|
|
||||||
ch, quit, oe := setup()
|
|
||||||
volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
|
|
||||||
pdName := "pd-volume"
|
|
||||||
|
|
||||||
// Act
|
|
||||||
for i := range volumesToAttach {
|
|
||||||
volumesToAttach[i] = VolumeToAttach{
|
|
||||||
VolumeName: v1.UniqueVolumeName(pdName),
|
|
||||||
NodeName: types.NodeName(fmt.Sprintf("node%d", i)),
|
|
||||||
VolumeSpec: &volume.Spec{
|
|
||||||
PersistentVolume: &v1.PersistentVolume{
|
|
||||||
Spec: v1.PersistentVolumeSpec{
|
|
||||||
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Assert
|
|
||||||
if !isOperationRunSerially(ch, quit) {
|
|
||||||
t.Fatalf("Attach volume operations should not start concurrently")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) {
|
|
||||||
// Arrange
|
|
||||||
ch, quit, oe := setup()
|
|
||||||
volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
|
|
||||||
pdName := "pd-volume"
|
|
||||||
|
|
||||||
// Act
|
|
||||||
for i := range volumesToAttach {
|
|
||||||
volumesToAttach[i] = VolumeToAttach{
|
|
||||||
VolumeName: v1.UniqueVolumeName(pdName),
|
|
||||||
NodeName: types.NodeName(fmt.Sprintf("node%d", i)),
|
|
||||||
VolumeSpec: &volume.Spec{
|
|
||||||
PersistentVolume: &v1.PersistentVolume{
|
|
||||||
Spec: v1.PersistentVolumeSpec{
|
|
||||||
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Assert
|
|
||||||
if !isOperationRunConcurrently(ch, quit, numVolumesToAttach) {
|
|
||||||
t.Fatalf("Attach volume operations should not execute serially")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromSameNode(t *testing.T) {
|
|
||||||
// Arrange
|
// Arrange
|
||||||
ch, quit, oe := setup()
|
ch, quit, oe := setup()
|
||||||
attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
|
attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
|
||||||
@ -304,13 +212,6 @@ func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromSameNode(t *tes
|
|||||||
attachedVolumes[i] = AttachedVolume{
|
attachedVolumes[i] = AttachedVolume{
|
||||||
VolumeName: v1.UniqueVolumeName(pdName),
|
VolumeName: v1.UniqueVolumeName(pdName),
|
||||||
NodeName: "node",
|
NodeName: "node",
|
||||||
VolumeSpec: &volume.Spec{
|
|
||||||
PersistentVolume: &v1.PersistentVolume{
|
|
||||||
Spec: v1.PersistentVolumeSpec{
|
|
||||||
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
|
oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
|
||||||
}
|
}
|
||||||
@ -321,91 +222,7 @@ func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromSameNode(t *tes
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromSameNode(t *testing.T) {
|
func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) {
|
||||||
// Arrange
|
|
||||||
ch, quit, oe := setup()
|
|
||||||
attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
|
|
||||||
pdName := "pd-volume"
|
|
||||||
|
|
||||||
// Act
|
|
||||||
for i := range attachedVolumes {
|
|
||||||
attachedVolumes[i] = AttachedVolume{
|
|
||||||
VolumeName: v1.UniqueVolumeName(pdName),
|
|
||||||
NodeName: "node",
|
|
||||||
VolumeSpec: &volume.Spec{
|
|
||||||
PersistentVolume: &v1.PersistentVolume{
|
|
||||||
Spec: v1.PersistentVolumeSpec{
|
|
||||||
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Assert
|
|
||||||
if !isOperationRunSerially(ch, quit) {
|
|
||||||
t.Fatalf("DetachVolume operations should not run concurrently")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromDifferentNodes(t *testing.T) {
|
|
||||||
// Arrange
|
|
||||||
ch, quit, oe := setup()
|
|
||||||
attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
|
|
||||||
pdName := "pd-volume"
|
|
||||||
|
|
||||||
// Act
|
|
||||||
for i := range attachedVolumes {
|
|
||||||
attachedVolumes[i] = AttachedVolume{
|
|
||||||
VolumeName: v1.UniqueVolumeName(pdName),
|
|
||||||
NodeName: types.NodeName(fmt.Sprintf("node%d", i)),
|
|
||||||
VolumeSpec: &volume.Spec{
|
|
||||||
PersistentVolume: &v1.PersistentVolume{
|
|
||||||
Spec: v1.PersistentVolumeSpec{
|
|
||||||
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Assert
|
|
||||||
if !isOperationRunSerially(ch, quit) {
|
|
||||||
t.Fatalf("DetachVolume operations should not run concurrently")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromDifferentNodes(t *testing.T) {
|
|
||||||
// Arrange
|
|
||||||
ch, quit, oe := setup()
|
|
||||||
attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
|
|
||||||
pdName := "pd-volume"
|
|
||||||
|
|
||||||
// Act
|
|
||||||
for i := range attachedVolumes {
|
|
||||||
attachedVolumes[i] = AttachedVolume{
|
|
||||||
VolumeName: v1.UniqueVolumeName(pdName),
|
|
||||||
NodeName: types.NodeName(fmt.Sprintf("node%d", i)),
|
|
||||||
VolumeSpec: &volume.Spec{
|
|
||||||
PersistentVolume: &v1.PersistentVolume{
|
|
||||||
Spec: v1.PersistentVolumeSpec{
|
|
||||||
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Assert
|
|
||||||
if !isOperationRunConcurrently(ch, quit, numVolumesToDetach) {
|
|
||||||
t.Fatalf("Attach volume operations should not execute serially")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnSameNode(t *testing.T) {
|
|
||||||
// Arrange
|
// Arrange
|
||||||
ch, quit, oe := setup()
|
ch, quit, oe := setup()
|
||||||
|
|
||||||
@ -420,24 +237,6 @@ func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnSameNode(t *tes
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnDifferentNodes(t *testing.T) {
|
|
||||||
// Arrange
|
|
||||||
ch, quit, oe := setup()
|
|
||||||
|
|
||||||
// Act
|
|
||||||
for i := 0; i < numVolumesToVerifyAttached; i++ {
|
|
||||||
oe.VerifyVolumesAreAttachedPerNode(
|
|
||||||
nil, /* attachedVolumes */
|
|
||||||
types.NodeName(fmt.Sprintf("node-name-%d", i)),
|
|
||||||
nil /* actualStateOfWorldAttacherUpdater */)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Assert
|
|
||||||
if !isOperationRunConcurrently(ch, quit, numVolumesToVerifyAttached) {
|
|
||||||
t.Fatalf("VerifyVolumesAreAttached operation is not being run concurrently")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) {
|
func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
ch, quit, oe := setup()
|
ch, quit, oe := setup()
|
||||||
|
@ -644,44 +644,3 @@ func WriteVolumeCache(deviceMountPath string, exec utilexec.Interface) error {
|
|||||||
// For linux runtime, it skips because unmount will automatically flush disk data
|
// For linux runtime, it skips because unmount will automatically flush disk data
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsMultiAttachForbidden checks if attaching this volume to multiple nodes is definitely not allowed/possible.
|
|
||||||
// In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns
|
|
||||||
// false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the
|
|
||||||
// attacher to fail fast in such cases.
|
|
||||||
// Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047
|
|
||||||
func IsMultiAttachForbidden(volumeSpec *volume.Spec) bool {
|
|
||||||
if volumeSpec == nil {
|
|
||||||
// we don't know if it's supported or not and let the attacher fail later in cases it's not supported
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if volumeSpec.Volume != nil {
|
|
||||||
// Check for volume types which are known to fail slow or cause trouble when trying to multi-attach
|
|
||||||
if volumeSpec.Volume.AzureDisk != nil ||
|
|
||||||
volumeSpec.Volume.Cinder != nil {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to
|
|
||||||
// multi-attach. We trust in the individual volume implementations to not allow unsupported access modes
|
|
||||||
if volumeSpec.PersistentVolume != nil {
|
|
||||||
// Check for persistent volume types which do not fail when trying to multi-attach
|
|
||||||
if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 {
|
|
||||||
// No access mode specified so we don't know for sure. Let the attacher fail if needed
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false
|
|
||||||
for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes {
|
|
||||||
if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// we don't know if it's supported or not and let the attacher fail later in cases it's not supported
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user