Allow different operation names

This commit is contained in:
Hemant Kumar 2019-03-08 15:37:25 -05:00
parent 583ff363fa
commit 97ec61561a
5 changed files with 86 additions and 21 deletions

View File

@ -24,6 +24,7 @@ go_test(
srcs = ["nestedpendingoperations_test.go"], srcs = ["nestedpendingoperations_test.go"],
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
"//pkg/volume/util/types:go_default_library", "//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@ -47,8 +47,11 @@ const (
type NestedPendingOperations interface { type NestedPendingOperations interface {
// Run adds the concatenation of volumeName and podName to the list of // Run adds the concatenation of volumeName and podName to the list of
// running operations and spawns a new go routine to execute operationFunc. // running operations and spawns a new go routine to execute operationFunc.
// If an operation with the same volumeName and same or empty podName // If an operation with the same volumeName, same or empty podName
// exists, an AlreadyExists or ExponentialBackoff error is returned. // and same operationName exits, an AlreadyExists or ExponentialBackoff
// error is returned. If an operation with same volumeName and podName
// has ExponentialBackoff error but operationName is different, exponential
// backoff is reset and operation is allowed to proceed.
// This enables multiple operations to execute in parallel for the same // This enables multiple operations to execute in parallel for the same
// volumeName as long as they have different podName. // volumeName as long as they have different podName.
// Once the operation is complete, the go routine is terminated and the // Once the operation is complete, the go routine is terminated and the
@ -87,6 +90,7 @@ type nestedPendingOperations struct {
type operation struct { type operation struct {
volumeName v1.UniqueVolumeName volumeName v1.UniqueVolumeName
podName types.UniquePodName podName types.UniquePodName
operationName string
operationPending bool operationPending bool
expBackoff exponentialbackoff.ExponentialBackoff expBackoff exponentialbackoff.ExponentialBackoff
} }
@ -103,13 +107,19 @@ func (grm *nestedPendingOperations) Run(
// Operation already exists // Operation already exists
if previousOp.operationPending { if previousOp.operationPending {
// Operation is pending // Operation is pending
operationName := getOperationName(volumeName, podName) operationKey := getOperationKey(volumeName, podName)
return NewAlreadyExistsError(operationName) return NewAlreadyExistsError(operationKey)
} }
operationName := getOperationName(volumeName, podName) operationKey := getOperationKey(volumeName, podName)
if err := previousOp.expBackoff.SafeToRetry(operationName); err != nil { backOffErr := previousOp.expBackoff.SafeToRetry(operationKey)
return err if backOffErr != nil {
if previousOp.operationName == generatedOperations.OperationName {
return backOffErr
}
// previous operation and new operation are different. reset op. name and exp. backoff
grm.operations[previousOpIndex].operationName = generatedOperations.OperationName
grm.operations[previousOpIndex].expBackoff = exponentialbackoff.ExponentialBackoff{}
} }
// Update existing operation to mark as pending. // Update existing operation to mark as pending.
@ -123,6 +133,7 @@ func (grm *nestedPendingOperations) Run(
operationPending: true, operationPending: true,
volumeName: volumeName, volumeName: volumeName,
podName: podName, podName: podName,
operationName: generatedOperations.OperationName,
expBackoff: exponentialbackoff.ExponentialBackoff{}, expBackoff: exponentialbackoff.ExponentialBackoff{},
}) })
} }
@ -201,8 +212,8 @@ func (grm *nestedPendingOperations) getOperation(
} }
} }
logOperationName := getOperationName(volumeName, podName) logOperationKey := getOperationKey(volumeName, podName)
return 0, fmt.Errorf("Operation %q not found", logOperationName) return 0, fmt.Errorf("Operation %q not found", logOperationKey)
} }
func (grm *nestedPendingOperations) deleteOperation( func (grm *nestedPendingOperations) deleteOperation(
@ -239,9 +250,9 @@ func (grm *nestedPendingOperations) operationComplete(
grm.deleteOperation(volumeName, podName) grm.deleteOperation(volumeName, podName)
if *err != nil { if *err != nil {
// Log error // Log error
logOperationName := getOperationName(volumeName, podName) logOperationKey := getOperationKey(volumeName, podName)
klog.Errorf("operation %s failed with: %v", klog.Errorf("operation %s failed with: %v",
logOperationName, logOperationKey,
*err) *err)
} }
return return
@ -251,9 +262,9 @@ func (grm *nestedPendingOperations) operationComplete(
existingOpIndex, getOpErr := grm.getOperation(volumeName, podName) existingOpIndex, getOpErr := grm.getOperation(volumeName, podName)
if getOpErr != nil { if getOpErr != nil {
// Failed to find existing operation // Failed to find existing operation
logOperationName := getOperationName(volumeName, podName) logOperationKey := getOperationKey(volumeName, podName)
klog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.", klog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.",
logOperationName, logOperationKey,
*err) *err)
return return
} }
@ -262,10 +273,10 @@ func (grm *nestedPendingOperations) operationComplete(
grm.operations[existingOpIndex].operationPending = false grm.operations[existingOpIndex].operationPending = false
// Log error // Log error
operationName := operationKey :=
getOperationName(volumeName, podName) getOperationKey(volumeName, podName)
klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff. klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff.
GenerateNoRetriesPermittedMsg(operationName)) GenerateNoRetriesPermittedMsg(operationKey))
} }
func (grm *nestedPendingOperations) Wait() { func (grm *nestedPendingOperations) Wait() {
@ -277,7 +288,7 @@ func (grm *nestedPendingOperations) Wait() {
} }
} }
func getOperationName( func getOperationKey(
volumeName v1.UniqueVolumeName, podName types.UniquePodName) string { volumeName v1.UniqueVolumeName, podName types.UniquePodName) string {
podNameStr := "" podNameStr := ""
if podName != EmptyUniquePodName { if podName != EmptyUniquePodName {
@ -290,8 +301,8 @@ func getOperationName(
} }
// NewAlreadyExistsError returns a new instance of AlreadyExists error. // NewAlreadyExistsError returns a new instance of AlreadyExists error.
func NewAlreadyExistsError(operationName string) error { func NewAlreadyExistsError(operationKey string) error {
return alreadyExistsError{operationName} return alreadyExistsError{operationKey}
} }
// IsAlreadyExists returns true if an error returned from // IsAlreadyExists returns true if an error returned from
@ -310,7 +321,7 @@ func IsAlreadyExists(err error) bool {
// new operation can not be started because an operation with the same operation // new operation can not be started because an operation with the same operation
// name is already executing. // name is already executing.
type alreadyExistsError struct { type alreadyExistsError struct {
operationName string operationKey string
} }
var _ error = alreadyExistsError{} var _ error = alreadyExistsError{}
@ -318,5 +329,5 @@ var _ error = alreadyExistsError{}
func (err alreadyExistsError) Error() string { func (err alreadyExistsError) Error() string {
return fmt.Sprintf( return fmt.Sprintf(
"Failed to create operation with name %q. An operation with that name is already executing.", "Failed to create operation with name %q. An operation with that name is already executing.",
err.operationName) err.operationKey)
} }

View File

@ -23,6 +23,7 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/volume/util/types" "k8s.io/kubernetes/pkg/volume/util/types"
) )
@ -264,6 +265,47 @@ func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
} }
} }
func Test_NewGoRoutineMap_Negative_SecondThirdOpWithDifferentNames(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := v1.UniqueVolumeName("volume-name")
op1Name := "mount_volume"
operation1 := generateErrorFunc()
err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name})
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
// Shorter than exponential backoff period, so as to trigger exponential backoff error on second
// operation.
operation2 := generateErrorFunc()
err2 := retryWithExponentialBackOff(
initialOperationWaitTimeShort,
func() (bool, error) {
err := grm.Run(volumeName,
"", /* operationSubName */
types.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name})
if exponentialbackoff.IsExponentialBackoff(err) {
return true, nil
}
return false, nil
},
)
// Assert
if err2 != nil {
t.Fatalf("Expected NewGoRoutine to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name)
}
operation3 := generateNoopFunc()
op3Name := "unmount_volume"
// Act
err3 := grm.Run(volumeName, "" /*pod name*/, types.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name})
if err3 != nil {
t.Fatalf("NewGoRoutine failed. Expected <no error> Actual: <%v>", err3)
}
}
func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) { func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) {
// Arrange // Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
@ -542,6 +584,12 @@ func generatePanicFunc() func() (error, error) {
} }
} }
func generateErrorFunc() func() (error, error) {
return func() (error, error) {
return fmt.Errorf("placholder1"), fmt.Errorf("placeholder2")
}
}
func generateNoopFunc() func() (error, error) { func generateNoopFunc() func() (error, error) {
return func() (error, error) { return nil, nil } return func() (error, error) { return nil, nil }
} }

View File

@ -707,6 +707,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
} }
return volumetypes.GeneratedOperations{ return volumetypes.GeneratedOperations{
OperationName: "volume_mount",
OperationFunc: mountVolumeFunc, OperationFunc: mountVolumeFunc,
EventRecorderFunc: eventRecorderFunc, EventRecorderFunc: eventRecorderFunc,
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePluginName, volumeToMount.VolumeSpec), "volume_mount"), CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePluginName, volumeToMount.VolumeSpec), "volume_mount"),
@ -834,6 +835,7 @@ func (og *operationGenerator) GenerateUnmountVolumeFunc(
} }
return volumetypes.GeneratedOperations{ return volumetypes.GeneratedOperations{
OperationName: "volume_unmount",
OperationFunc: unmountVolumeFunc, OperationFunc: unmountVolumeFunc,
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToUnmount.VolumeSpec), "volume_unmount"), CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToUnmount.VolumeSpec), "volume_unmount"),
EventRecorderFunc: nil, // nil because we do not want to generate event on error EventRecorderFunc: nil, // nil because we do not want to generate event on error
@ -925,6 +927,7 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc(
} }
return volumetypes.GeneratedOperations{ return volumetypes.GeneratedOperations{
OperationName: "unmount_device",
OperationFunc: unmountDeviceFunc, OperationFunc: unmountDeviceFunc,
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(deviceMountableVolumePlugin.GetPluginName(), deviceToDetach.VolumeSpec), "unmount_device"), CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(deviceMountableVolumePlugin.GetPluginName(), deviceToDetach.VolumeSpec), "unmount_device"),
EventRecorderFunc: nil, // nil because we do not want to generate event on error EventRecorderFunc: nil, // nil because we do not want to generate event on error

View File

@ -28,6 +28,8 @@ type UniquePVCName types.UID
// GeneratedOperations contains the operation that is created as well as // GeneratedOperations contains the operation that is created as well as
// supporting functions required for the operation executor // supporting functions required for the operation executor
type GeneratedOperations struct { type GeneratedOperations struct {
// Name of operation - could be used for resetting shared exponential backoff
OperationName string
OperationFunc func() (eventErr error, detailedErr error) OperationFunc func() (eventErr error, detailedErr error)
EventRecorderFunc func(*error) EventRecorderFunc func(*error)
CompleteFunc func(*error) CompleteFunc func(*error)