mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Refactor operation keys for NestedPendingOperations
This commit is contained in:
parent
3fca0a6e41
commit
37957e2a0d
@ -88,8 +88,7 @@ type nestedPendingOperations struct {
|
||||
}
|
||||
|
||||
type operation struct {
|
||||
volumeName v1.UniqueVolumeName
|
||||
podName types.UniquePodName
|
||||
key operationKey
|
||||
operationName string
|
||||
operationPending bool
|
||||
expBackoff exponentialbackoff.ExponentialBackoff
|
||||
@ -101,18 +100,19 @@ func (grm *nestedPendingOperations) Run(
|
||||
generatedOperations types.GeneratedOperations) error {
|
||||
grm.lock.Lock()
|
||||
defer grm.lock.Unlock()
|
||||
opExists, previousOpIndex := grm.isOperationExists(volumeName, podName)
|
||||
|
||||
opKey := operationKey{volumeName, podName}
|
||||
|
||||
opExists, previousOpIndex := grm.isOperationExists(opKey)
|
||||
if opExists {
|
||||
previousOp := grm.operations[previousOpIndex]
|
||||
// Operation already exists
|
||||
if previousOp.operationPending {
|
||||
// Operation is pending
|
||||
operationKey := getOperationKey(volumeName, podName)
|
||||
return NewAlreadyExistsError(operationKey)
|
||||
return NewAlreadyExistsError(opKey)
|
||||
}
|
||||
|
||||
operationKey := getOperationKey(volumeName, podName)
|
||||
backOffErr := previousOp.expBackoff.SafeToRetry(operationKey)
|
||||
backOffErr := previousOp.expBackoff.SafeToRetry(opKey.String())
|
||||
if backOffErr != nil {
|
||||
if previousOp.operationName == generatedOperations.OperationName {
|
||||
return backOffErr
|
||||
@ -124,15 +124,13 @@ func (grm *nestedPendingOperations) Run(
|
||||
|
||||
// Update existing operation to mark as pending.
|
||||
grm.operations[previousOpIndex].operationPending = true
|
||||
grm.operations[previousOpIndex].volumeName = volumeName
|
||||
grm.operations[previousOpIndex].podName = podName
|
||||
grm.operations[previousOpIndex].key = opKey
|
||||
} else {
|
||||
// Create a new operation
|
||||
grm.operations = append(grm.operations,
|
||||
operation{
|
||||
key: opKey,
|
||||
operationPending: true,
|
||||
volumeName: volumeName,
|
||||
podName: podName,
|
||||
operationName: generatedOperations.OperationName,
|
||||
expBackoff: exponentialbackoff.ExponentialBackoff{},
|
||||
})
|
||||
@ -142,7 +140,7 @@ func (grm *nestedPendingOperations) Run(
|
||||
// Handle unhandled panics (very unlikely)
|
||||
defer k8sRuntime.HandleCrash()
|
||||
// Handle completion of and error, if any, from operationFunc()
|
||||
defer grm.operationComplete(volumeName, podName, &detailedErr)
|
||||
defer grm.operationComplete(opKey, &detailedErr)
|
||||
return generatedOperations.Run()
|
||||
}()
|
||||
|
||||
@ -156,7 +154,8 @@ func (grm *nestedPendingOperations) IsOperationPending(
|
||||
grm.lock.RLock()
|
||||
defer grm.lock.RUnlock()
|
||||
|
||||
exist, previousOpIndex := grm.isOperationExists(volumeName, podName)
|
||||
opKey := operationKey{volumeName, podName}
|
||||
exist, previousOpIndex := grm.isOperationExists(opKey)
|
||||
if exist && grm.operations[previousOpIndex].operationPending {
|
||||
return true
|
||||
}
|
||||
@ -164,59 +163,49 @@ func (grm *nestedPendingOperations) IsOperationPending(
|
||||
}
|
||||
|
||||
// This is an internal function and caller should acquire and release the lock
|
||||
func (grm *nestedPendingOperations) isOperationExists(
|
||||
volumeName v1.UniqueVolumeName,
|
||||
podName types.UniquePodName) (bool, int) {
|
||||
func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, int) {
|
||||
|
||||
// If volumeName is empty, operation can be executed concurrently
|
||||
if volumeName == EmptyUniqueVolumeName {
|
||||
if key.volumeName == EmptyUniqueVolumeName {
|
||||
return false, -1
|
||||
}
|
||||
|
||||
for previousOpIndex, previousOp := range grm.operations {
|
||||
if previousOp.volumeName != volumeName {
|
||||
// No match, keep searching
|
||||
continue
|
||||
}
|
||||
volumeNameMatch := previousOp.key.volumeName == key.volumeName
|
||||
|
||||
if previousOp.podName != EmptyUniquePodName &&
|
||||
podName != EmptyUniquePodName &&
|
||||
previousOp.podName != podName {
|
||||
// No match, keep searching
|
||||
continue
|
||||
}
|
||||
podNameMatch := previousOp.key.podName == EmptyUniquePodName ||
|
||||
key.podName == EmptyUniquePodName ||
|
||||
previousOp.key.podName == key.podName
|
||||
|
||||
// Match
|
||||
return true, previousOpIndex
|
||||
|
||||
if volumeNameMatch && podNameMatch {
|
||||
return true, previousOpIndex
|
||||
}
|
||||
}
|
||||
|
||||
return false, -1
|
||||
}
|
||||
|
||||
func (grm *nestedPendingOperations) getOperation(
|
||||
volumeName v1.UniqueVolumeName,
|
||||
podName types.UniquePodName) (uint, error) {
|
||||
func (grm *nestedPendingOperations) getOperation(key operationKey) (uint, error) {
|
||||
// Assumes lock has been acquired by caller.
|
||||
|
||||
for i, op := range grm.operations {
|
||||
if op.volumeName == volumeName &&
|
||||
op.podName == podName {
|
||||
if op.key.volumeName == key.volumeName &&
|
||||
op.key.podName == key.podName {
|
||||
return uint(i), nil
|
||||
}
|
||||
}
|
||||
|
||||
logOperationKey := getOperationKey(volumeName, podName)
|
||||
return 0, fmt.Errorf("Operation %q not found", logOperationKey)
|
||||
return 0, fmt.Errorf("Operation %q not found", key)
|
||||
}
|
||||
|
||||
func (grm *nestedPendingOperations) deleteOperation(
|
||||
func (grm *nestedPendingOperations) deleteOperation(key operationKey) {
|
||||
// Assumes lock has been acquired by caller.
|
||||
volumeName v1.UniqueVolumeName,
|
||||
podName types.UniquePodName) {
|
||||
|
||||
opIndex := -1
|
||||
for i, op := range grm.operations {
|
||||
if op.volumeName == volumeName &&
|
||||
op.podName == podName {
|
||||
if op.key.volumeName == key.volumeName &&
|
||||
op.key.podName == key.podName {
|
||||
opIndex = i
|
||||
break
|
||||
}
|
||||
@ -227,8 +216,7 @@ func (grm *nestedPendingOperations) deleteOperation(
|
||||
grm.operations = grm.operations[:len(grm.operations)-1]
|
||||
}
|
||||
|
||||
func (grm *nestedPendingOperations) operationComplete(
|
||||
volumeName v1.UniqueVolumeName, podName types.UniquePodName, err *error) {
|
||||
func (grm *nestedPendingOperations) operationComplete(key operationKey, err *error) {
|
||||
// Defer operations are executed in Last-In is First-Out order. In this case
|
||||
// the lock is acquired first when operationCompletes begins, and is
|
||||
// released when the method finishes, after the lock is released cond is
|
||||
@ -239,24 +227,20 @@ func (grm *nestedPendingOperations) operationComplete(
|
||||
|
||||
if *err == nil || !grm.exponentialBackOffOnError {
|
||||
// Operation completed without error, or exponentialBackOffOnError disabled
|
||||
grm.deleteOperation(volumeName, podName)
|
||||
grm.deleteOperation(key)
|
||||
if *err != nil {
|
||||
// Log error
|
||||
logOperationKey := getOperationKey(volumeName, podName)
|
||||
klog.Errorf("operation %s failed with: %v",
|
||||
logOperationKey,
|
||||
*err)
|
||||
klog.Errorf("operation %s failed with: %v", key, *err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Operation completed with error and exponentialBackOffOnError Enabled
|
||||
existingOpIndex, getOpErr := grm.getOperation(volumeName, podName)
|
||||
existingOpIndex, getOpErr := grm.getOperation(key)
|
||||
if getOpErr != nil {
|
||||
// Failed to find existing operation
|
||||
logOperationKey := getOperationKey(volumeName, podName)
|
||||
klog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.",
|
||||
logOperationKey,
|
||||
key,
|
||||
*err)
|
||||
return
|
||||
}
|
||||
@ -265,10 +249,8 @@ func (grm *nestedPendingOperations) operationComplete(
|
||||
grm.operations[existingOpIndex].operationPending = false
|
||||
|
||||
// Log error
|
||||
operationKey :=
|
||||
getOperationKey(volumeName, podName)
|
||||
klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff.
|
||||
GenerateNoRetriesPermittedMsg(operationKey))
|
||||
GenerateNoRetriesPermittedMsg(key.String()))
|
||||
}
|
||||
|
||||
func (grm *nestedPendingOperations) Wait() {
|
||||
@ -280,21 +262,22 @@ func (grm *nestedPendingOperations) Wait() {
|
||||
}
|
||||
}
|
||||
|
||||
func getOperationKey(
|
||||
volumeName v1.UniqueVolumeName, podName types.UniquePodName) string {
|
||||
podNameStr := ""
|
||||
if podName != EmptyUniquePodName {
|
||||
podNameStr = fmt.Sprintf(" (%q)", podName)
|
||||
}
|
||||
type operationKey struct {
|
||||
volumeName v1.UniqueVolumeName
|
||||
podName types.UniquePodName
|
||||
}
|
||||
|
||||
func (key operationKey) String() string {
|
||||
podNameStr := fmt.Sprintf(" (%q)", key.podName)
|
||||
|
||||
return fmt.Sprintf("%q%s",
|
||||
volumeName,
|
||||
key.volumeName,
|
||||
podNameStr)
|
||||
}
|
||||
|
||||
// NewAlreadyExistsError returns a new instance of AlreadyExists error.
|
||||
func NewAlreadyExistsError(operationKey string) error {
|
||||
return alreadyExistsError{operationKey}
|
||||
func NewAlreadyExistsError(key operationKey) error {
|
||||
return alreadyExistsError{key}
|
||||
}
|
||||
|
||||
// IsAlreadyExists returns true if an error returned from
|
||||
@ -313,7 +296,7 @@ func IsAlreadyExists(err error) bool {
|
||||
// new operation can not be started because an operation with the same operation
|
||||
// name is already executing.
|
||||
type alreadyExistsError struct {
|
||||
operationKey string
|
||||
operationKey operationKey
|
||||
}
|
||||
|
||||
var _ error = alreadyExistsError{}
|
||||
|
Loading…
Reference in New Issue
Block a user