diff --git a/pkg/volume/util/nestedpendingoperations/BUILD b/pkg/volume/util/nestedpendingoperations/BUILD index 945d33dfdf2..5e940356c8d 100644 --- a/pkg/volume/util/nestedpendingoperations/BUILD +++ b/pkg/volume/util/nestedpendingoperations/BUILD @@ -24,6 +24,7 @@ go_test( srcs = ["nestedpendingoperations_test.go"], embed = [":go_default_library"], deps = [ + "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go index d9d277e0b21..ee9d11978a6 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go @@ -47,8 +47,11 @@ const ( type NestedPendingOperations interface { // Run adds the concatenation of volumeName and podName to the list of // running operations and spawns a new go routine to execute operationFunc. - // If an operation with the same volumeName and same or empty podName - // exists, an AlreadyExists or ExponentialBackoff error is returned. + // If an operation with the same volumeName, same or empty podName + // and same operationName exits, an AlreadyExists or ExponentialBackoff + // error is returned. If an operation with same volumeName and podName + // has ExponentialBackoff error but operationName is different, exponential + // backoff is reset and operation is allowed to proceed. // This enables multiple operations to execute in parallel for the same // volumeName as long as they have different podName. // Once the operation is complete, the go routine is terminated and the @@ -87,6 +90,7 @@ type nestedPendingOperations struct { type operation struct { volumeName v1.UniqueVolumeName podName types.UniquePodName + operationName string operationPending bool expBackoff exponentialbackoff.ExponentialBackoff } @@ -103,13 +107,19 @@ func (grm *nestedPendingOperations) Run( // Operation already exists if previousOp.operationPending { // Operation is pending - operationName := getOperationName(volumeName, podName) - return NewAlreadyExistsError(operationName) + operationKey := getOperationKey(volumeName, podName) + return NewAlreadyExistsError(operationKey) } - operationName := getOperationName(volumeName, podName) - if err := previousOp.expBackoff.SafeToRetry(operationName); err != nil { - return err + operationKey := getOperationKey(volumeName, podName) + backOffErr := previousOp.expBackoff.SafeToRetry(operationKey) + if backOffErr != nil { + if previousOp.operationName == generatedOperations.OperationName { + return backOffErr + } + // previous operation and new operation are different. reset op. name and exp. backoff + grm.operations[previousOpIndex].operationName = generatedOperations.OperationName + grm.operations[previousOpIndex].expBackoff = exponentialbackoff.ExponentialBackoff{} } // Update existing operation to mark as pending. @@ -123,6 +133,7 @@ func (grm *nestedPendingOperations) Run( operationPending: true, volumeName: volumeName, podName: podName, + operationName: generatedOperations.OperationName, expBackoff: exponentialbackoff.ExponentialBackoff{}, }) } @@ -201,8 +212,8 @@ func (grm *nestedPendingOperations) getOperation( } } - logOperationName := getOperationName(volumeName, podName) - return 0, fmt.Errorf("Operation %q not found", logOperationName) + logOperationKey := getOperationKey(volumeName, podName) + return 0, fmt.Errorf("Operation %q not found", logOperationKey) } func (grm *nestedPendingOperations) deleteOperation( @@ -239,9 +250,9 @@ func (grm *nestedPendingOperations) operationComplete( grm.deleteOperation(volumeName, podName) if *err != nil { // Log error - logOperationName := getOperationName(volumeName, podName) + logOperationKey := getOperationKey(volumeName, podName) klog.Errorf("operation %s failed with: %v", - logOperationName, + logOperationKey, *err) } return @@ -251,9 +262,9 @@ func (grm *nestedPendingOperations) operationComplete( existingOpIndex, getOpErr := grm.getOperation(volumeName, podName) if getOpErr != nil { // Failed to find existing operation - logOperationName := getOperationName(volumeName, podName) + logOperationKey := getOperationKey(volumeName, podName) klog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.", - logOperationName, + logOperationKey, *err) return } @@ -262,10 +273,10 @@ func (grm *nestedPendingOperations) operationComplete( grm.operations[existingOpIndex].operationPending = false // Log error - operationName := - getOperationName(volumeName, podName) + operationKey := + getOperationKey(volumeName, podName) klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff. - GenerateNoRetriesPermittedMsg(operationName)) + GenerateNoRetriesPermittedMsg(operationKey)) } func (grm *nestedPendingOperations) Wait() { @@ -277,7 +288,7 @@ func (grm *nestedPendingOperations) Wait() { } } -func getOperationName( +func getOperationKey( volumeName v1.UniqueVolumeName, podName types.UniquePodName) string { podNameStr := "" if podName != EmptyUniquePodName { @@ -290,8 +301,8 @@ func getOperationName( } // NewAlreadyExistsError returns a new instance of AlreadyExists error. -func NewAlreadyExistsError(operationName string) error { - return alreadyExistsError{operationName} +func NewAlreadyExistsError(operationKey string) error { + return alreadyExistsError{operationKey} } // IsAlreadyExists returns true if an error returned from @@ -310,7 +321,7 @@ func IsAlreadyExists(err error) bool { // new operation can not be started because an operation with the same operation // name is already executing. type alreadyExistsError struct { - operationName string + operationKey string } var _ error = alreadyExistsError{} @@ -318,5 +329,5 @@ var _ error = alreadyExistsError{} func (err alreadyExistsError) Error() string { return fmt.Sprintf( "Failed to create operation with name %q. An operation with that name is already executing.", - err.operationName) + err.operationKey) } diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go index 5865f96c21a..fe52c897798 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go @@ -23,6 +23,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" "k8s.io/kubernetes/pkg/volume/util/types" ) @@ -264,6 +265,47 @@ func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { } } +func Test_NewGoRoutineMap_Negative_SecondThirdOpWithDifferentNames(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) + volumeName := v1.UniqueVolumeName("volume-name") + op1Name := "mount_volume" + operation1 := generateErrorFunc() + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name}) + if err1 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + } + // Shorter than exponential backoff period, so as to trigger exponential backoff error on second + // operation. + operation2 := generateErrorFunc() + err2 := retryWithExponentialBackOff( + initialOperationWaitTimeShort, + func() (bool, error) { + err := grm.Run(volumeName, + "", /* operationSubName */ + types.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name}) + + if exponentialbackoff.IsExponentialBackoff(err) { + return true, nil + } + return false, nil + }, + ) + + // Assert + if err2 != nil { + t.Fatalf("Expected NewGoRoutine to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name) + } + + operation3 := generateNoopFunc() + op3Name := "unmount_volume" + // Act + err3 := grm.Run(volumeName, "" /*pod name*/, types.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name}) + if err3 != nil { + t.Fatalf("NewGoRoutine failed. Expected Actual: <%v>", err3) + } +} + func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) @@ -542,6 +584,12 @@ func generatePanicFunc() func() (error, error) { } } +func generateErrorFunc() func() (error, error) { + return func() (error, error) { + return fmt.Errorf("placholder1"), fmt.Errorf("placeholder2") + } +} + func generateNoopFunc() func() (error, error) { return func() (error, error) { return nil, nil } } diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 91e4825329d..05da765fc04 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -707,6 +707,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( } return volumetypes.GeneratedOperations{ + OperationName: "volume_mount", OperationFunc: mountVolumeFunc, EventRecorderFunc: eventRecorderFunc, CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePluginName, volumeToMount.VolumeSpec), "volume_mount"), @@ -834,6 +835,7 @@ func (og *operationGenerator) GenerateUnmountVolumeFunc( } return volumetypes.GeneratedOperations{ + OperationName: "volume_unmount", OperationFunc: unmountVolumeFunc, CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToUnmount.VolumeSpec), "volume_unmount"), EventRecorderFunc: nil, // nil because we do not want to generate event on error @@ -925,6 +927,7 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc( } return volumetypes.GeneratedOperations{ + OperationName: "unmount_device", OperationFunc: unmountDeviceFunc, CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(deviceMountableVolumePlugin.GetPluginName(), deviceToDetach.VolumeSpec), "unmount_device"), EventRecorderFunc: nil, // nil because we do not want to generate event on error diff --git a/pkg/volume/util/types/types.go b/pkg/volume/util/types/types.go index 9815545ff60..b5f1009bee7 100644 --- a/pkg/volume/util/types/types.go +++ b/pkg/volume/util/types/types.go @@ -28,6 +28,8 @@ type UniquePVCName types.UID // GeneratedOperations contains the operation that is created as well as // supporting functions required for the operation executor type GeneratedOperations struct { + // Name of operation - could be used for resetting shared exponential backoff + OperationName string OperationFunc func() (eventErr error, detailedErr error) EventRecorderFunc func(*error) CompleteFunc func(*error)