diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go index 020b2d84160..ab39f5b8a1c 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go @@ -25,8 +25,9 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" - "k8s.io/kubernetes/pkg/util/goroutinemap" + "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" ) @@ -114,9 +115,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() { glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName) } if err != nil && - !goroutinemap.IsAlreadyExists(err) && - !goroutinemap.IsExponentialBackoff(err) { - // Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected. + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. glog.Errorf( "operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v", @@ -134,9 +135,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() { glog.Infof("Started DetachVolume for volume %q from node %q due to maxWaitForUnmountDuration expiry.", attachedVolume.VolumeName, attachedVolume.NodeName) } if err != nil && - !goroutinemap.IsAlreadyExists(err) && - !goroutinemap.IsExponentialBackoff(err) { - // Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected. + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. glog.Errorf( "operationExecutor.DetachVolume failed to start (maxWaitForUnmountDuration expiry) for volume %q (spec.Name: %q) from node %q with err: %v", @@ -169,9 +170,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() { glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) } if err != nil && - !goroutinemap.IsAlreadyExists(err) && - !goroutinemap.IsExponentialBackoff(err) { - // Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected. + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. glog.Errorf( "operationExecutor.AttachVolume failed to start for volume %q (spec.Name: %q) to node %q with err: %v", diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 3a480de00f3..38e11e231aa 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -25,9 +25,10 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" - "k8s.io/kubernetes/pkg/util/goroutinemap" + "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" ) @@ -122,9 +123,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() { err := rc.operationExecutor.UnmountVolume( mountedVolume.MountedVolume, rc.actualStateOfWorld) if err != nil && - !goroutinemap.IsAlreadyExists(err) && - !goroutinemap.IsExponentialBackoff(err) { - // Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected. + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. glog.Errorf( "operationExecutor.UnmountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", @@ -163,9 +164,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() { rc.hostName, rc.actualStateOfWorld) if err != nil && - !goroutinemap.IsAlreadyExists(err) && - !goroutinemap.IsExponentialBackoff(err) { - // Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected. + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. glog.Errorf( "operationExecutor.VerifyControllerAttachedVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", @@ -198,9 +199,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() { volumeToMount.Pod.UID) err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld) if err != nil && - !goroutinemap.IsAlreadyExists(err) && - !goroutinemap.IsExponentialBackoff(err) { - // Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected. + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. glog.Errorf( "operationExecutor.AttachVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", @@ -236,9 +237,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() { volumeToMount.VolumeToMount, rc.actualStateOfWorld) if err != nil && - !goroutinemap.IsAlreadyExists(err) && - !goroutinemap.IsExponentialBackoff(err) { - // Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected. + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. glog.Errorf( "operationExecutor.MountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v", @@ -271,9 +272,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() { err := rc.operationExecutor.UnmountDevice( attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter) if err != nil && - !goroutinemap.IsAlreadyExists(err) && - !goroutinemap.IsExponentialBackoff(err) { - // Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected. + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. glog.Errorf( "operationExecutor.UnmountDevice failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v", @@ -302,9 +303,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() { err := rc.operationExecutor.DetachVolume( attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld) if err != nil && - !goroutinemap.IsAlreadyExists(err) && - !goroutinemap.IsExponentialBackoff(err) { - // Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected. + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. glog.Errorf( "operationExecutor.DetachVolume failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v", diff --git a/pkg/util/goroutinemap/exponentialbackoff/exponential_backoff.go b/pkg/util/goroutinemap/exponentialbackoff/exponential_backoff.go new file mode 100644 index 00000000000..c00fe46823b --- /dev/null +++ b/pkg/util/goroutinemap/exponentialbackoff/exponential_backoff.go @@ -0,0 +1,120 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package exponentialbackoff contains logic for implementing exponential +// backoff for GoRoutineMap and NestedPendingOperations. +package exponentialbackoff + +import ( + "fmt" + "time" +) + +const ( + // initialDurationBeforeRetry is the amount of time after an error occurs + // that GoroutineMap will refuse to allow another operation to start with + // the same target (if exponentialBackOffOnError is enabled). Each + // successive error results in a wait 2x times the previous. + initialDurationBeforeRetry time.Duration = 500 * time.Millisecond + + // maxDurationBeforeRetry is the maximum amount of time that + // durationBeforeRetry will grow to due to exponential backoff. + maxDurationBeforeRetry time.Duration = 2 * time.Minute +) + +// ExponentialBackoff contains the last occurrence of an error and the duration +// that retries are not permitted. +type ExponentialBackoff struct { + lastError error + lastErrorTime time.Time + durationBeforeRetry time.Duration +} + +// SafeToRetry returns an error if the durationBeforeRetry period for the given +// lastErrorTime has not yet expired. Otherwise it returns nil. +func (expBackoff *ExponentialBackoff) SafeToRetry(operationName string) error { + if time.Since(expBackoff.lastErrorTime) <= expBackoff.durationBeforeRetry { + return NewExponentialBackoffError(operationName, *expBackoff) + } + + return nil +} + +func (expBackoff *ExponentialBackoff) Update(err *error) { + if expBackoff.durationBeforeRetry == 0 { + expBackoff.durationBeforeRetry = initialDurationBeforeRetry + } else { + expBackoff.durationBeforeRetry = 2 * expBackoff.durationBeforeRetry + if expBackoff.durationBeforeRetry > maxDurationBeforeRetry { + expBackoff.durationBeforeRetry = maxDurationBeforeRetry + } + } + + expBackoff.lastError = *err + expBackoff.lastErrorTime = time.Now() +} + +func (expBackoff *ExponentialBackoff) GenerateNoRetriesPermittedMsg( + operationName string) string { + return fmt.Sprintf("Operation for %q failed. No retries permitted until %v (durationBeforeRetry %v). Error: %v", + operationName, + expBackoff.lastErrorTime.Add(expBackoff.durationBeforeRetry), + expBackoff.durationBeforeRetry, + expBackoff.lastError) +} + +// NewExponentialBackoffError returns a new instance of ExponentialBackoff error. +func NewExponentialBackoffError( + operationName string, expBackoff ExponentialBackoff) error { + return exponentialBackoffError{ + operationName: operationName, + expBackoff: expBackoff, + } +} + +// IsExponentialBackoff returns true if an error returned from GoroutineMap +// indicates that a new operation can not be started because +// exponentialBackOffOnError is enabled and a previous operation with the same +// operation failed within the durationBeforeRetry period. +func IsExponentialBackoff(err error) bool { + switch err.(type) { + case exponentialBackoffError: + return true + default: + return false + } +} + +// exponentialBackoffError is the error returned returned from GoroutineMap when +// a new operation can not be started because exponentialBackOffOnError is +// enabled and a previous operation with the same operation failed within the +// durationBeforeRetry period. +type exponentialBackoffError struct { + operationName string + expBackoff ExponentialBackoff +} + +var _ error = exponentialBackoffError{} + +func (err exponentialBackoffError) Error() string { + return fmt.Sprintf( + "Failed to create operation with name %q. An operation with that name failed at %v. No retries permitted until %v (%v). Last error: %q.", + err.operationName, + err.expBackoff.lastErrorTime, + err.expBackoff.lastErrorTime.Add(err.expBackoff.durationBeforeRetry), + err.expBackoff.durationBeforeRetry, + err.expBackoff.lastError) +} diff --git a/pkg/util/goroutinemap/goroutinemap.go b/pkg/util/goroutinemap/goroutinemap.go index fcb3608c673..202634f15b9 100644 --- a/pkg/util/goroutinemap/goroutinemap.go +++ b/pkg/util/goroutinemap/goroutinemap.go @@ -23,18 +23,18 @@ package goroutinemap import ( "fmt" - "runtime" "sync" "time" "github.com/golang/glog" + "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" k8sRuntime "k8s.io/kubernetes/pkg/util/runtime" ) const ( // initialDurationBeforeRetry is the amount of time after an error occurs // that GoRoutineMap will refuse to allow another operation to start with - // the same operationName (if exponentialBackOffOnError is enabled). Each + // the same operation name (if exponentialBackOffOnError is enabled). Each // successive error results in a wait 2x times the previous. initialDurationBeforeRetry time.Duration = 500 * time.Millisecond @@ -45,12 +45,13 @@ const ( // GoRoutineMap defines the supported set of operations. type GoRoutineMap interface { - // Run adds operationName to the list of running operations and spawns a new - // go routine to execute the operation. If an operation with the same name - // already exists, an error is returned. Once the operation is complete, the - // go routine is terminated and the operationName is removed from the list - // of executing operations allowing a new operation to be started with the - // same name without error. + // Run adds operation name to the list of running operations and spawns a + // new go routine to execute the operation. + // If an operation with the same operation name already exists, an + // AlreadyExists or ExponentialBackoff error is returned. + // Once the operation is complete, the go routine is terminated and the + // operation name is removed from the list of executing operations allowing + // a new operation to be started with the same operation name without error. Run(operationName string, operationFunc func() error) error // Wait blocks until all operations are completed. This is typically @@ -64,8 +65,10 @@ func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap { g := &goRoutineMap{ operations: make(map[string]operation), exponentialBackOffOnError: exponentialBackOffOnError, + lock: &sync.Mutex{}, } - g.cond = sync.NewCond(g) + + g.cond = sync.NewCond(g.lock) return g } @@ -73,36 +76,35 @@ type goRoutineMap struct { operations map[string]operation exponentialBackOffOnError bool cond *sync.Cond - sync.Mutex + lock *sync.Mutex } type operation struct { - operationPending bool - lastError error - lastErrorTime time.Time - durationBeforeRetry time.Duration + operationPending bool + expBackoff exponentialbackoff.ExponentialBackoff } -func (grm *goRoutineMap) Run(operationName string, operationFunc func() error) error { - grm.Lock() - defer grm.Unlock() +func (grm *goRoutineMap) Run( + operationName string, + operationFunc func() error) error { + grm.lock.Lock() + defer grm.lock.Unlock() + existingOp, exists := grm.operations[operationName] if exists { // Operation with name exists if existingOp.operationPending { - return newAlreadyExistsError(operationName) + return NewAlreadyExistsError(operationName) } - if time.Since(existingOp.lastErrorTime) <= existingOp.durationBeforeRetry { - return newExponentialBackoffError(operationName, existingOp) + if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil { + return err } } grm.operations[operationName] = operation{ - operationPending: true, - lastError: existingOp.lastError, - lastErrorTime: existingOp.lastErrorTime, - durationBeforeRetry: existingOp.durationBeforeRetry, + operationPending: true, + expBackoff: existingOp.expBackoff, } go func() (err error) { // Handle unhandled panics (very unlikely) @@ -110,17 +112,22 @@ func (grm *goRoutineMap) Run(operationName string, operationFunc func() error) e // Handle completion of and error, if any, from operationFunc() defer grm.operationComplete(operationName, &err) // Handle panic, if any, from operationFunc() - defer recoverFromPanic(operationName, &err) + defer k8sRuntime.RecoverFromPanic(&err) return operationFunc() }() return nil } -func (grm *goRoutineMap) operationComplete(operationName string, err *error) { +func (grm *goRoutineMap) operationComplete( + operationName string, err *error) { + // Defer operations are executed in Last-In is First-Out order. In this case + // the lock is acquired first when operationCompletes begins, and is + // released when the method finishes, after the lock is released cond is + // signaled to wake waiting goroutine. defer grm.cond.Signal() - grm.Lock() - defer grm.Unlock() + grm.lock.Lock() + defer grm.lock.Unlock() if *err == nil || !grm.exponentialBackOffOnError { // Operation completed without error, or exponentialBackOffOnError disabled @@ -134,75 +141,33 @@ func (grm *goRoutineMap) operationComplete(operationName string, err *error) { } else { // Operation completed with error and exponentialBackOffOnError Enabled existingOp := grm.operations[operationName] - if existingOp.durationBeforeRetry == 0 { - existingOp.durationBeforeRetry = initialDurationBeforeRetry - } else { - existingOp.durationBeforeRetry = 2 * existingOp.durationBeforeRetry - if existingOp.durationBeforeRetry > maxDurationBeforeRetry { - existingOp.durationBeforeRetry = maxDurationBeforeRetry - } - } - existingOp.lastError = *err - existingOp.lastErrorTime = time.Now() + existingOp.expBackoff.Update(err) existingOp.operationPending = false - grm.operations[operationName] = existingOp // Log error - glog.Errorf("Operation for %q failed. No retries permitted until %v (durationBeforeRetry %v). error: %v", - operationName, - existingOp.lastErrorTime.Add(existingOp.durationBeforeRetry), - existingOp.durationBeforeRetry, - *err) + glog.Errorf("%v", + existingOp.expBackoff.GenerateNoRetriesPermittedMsg(operationName)) } } func (grm *goRoutineMap) Wait() { - grm.Lock() - defer grm.Unlock() + grm.lock.Lock() + defer grm.lock.Unlock() for len(grm.operations) > 0 { grm.cond.Wait() } } -func recoverFromPanic(operationName string, err *error) { - if r := recover(); r != nil { - callers := "" - for i := 0; true; i++ { - _, file, line, ok := runtime.Caller(i) - if !ok { - break - } - callers = callers + fmt.Sprintf("%v:%v\n", file, line) - } - *err = fmt.Errorf( - "operation for %q recovered from panic %q. (err=%v) Call stack:\n%v", - operationName, - r, - *err, - callers) - } -} - -// alreadyExistsError is the error returned when NewGoRoutine() detects that -// an operation with the given name is already running. -type alreadyExistsError struct { - operationName string -} - -var _ error = alreadyExistsError{} - -func (err alreadyExistsError) Error() string { - return fmt.Sprintf("Failed to create operation with name %q. An operation with that name is already executing.", err.operationName) -} - -func newAlreadyExistsError(operationName string) error { +// NewAlreadyExistsError returns a new instance of AlreadyExists error. +func NewAlreadyExistsError(operationName string) error { return alreadyExistsError{operationName} } -// IsAlreadyExists returns true if an error returned from NewGoRoutine indicates -// that operation with the same name already exists. +// IsAlreadyExists returns true if an error returned from GoRoutineMap indicates +// a new operation can not be started because an operation with the same +// operation name is already executing. func IsAlreadyExists(err error) bool { switch err.(type) { case alreadyExistsError: @@ -212,42 +177,17 @@ func IsAlreadyExists(err error) bool { } } -// exponentialBackoffError is the error returned when NewGoRoutine() detects -// that the previous operation for given name failed less then -// durationBeforeRetry. -type exponentialBackoffError struct { +// alreadyExistsError is the error returned by GoRoutineMap when a new operation +// can not be started because an operation with the same operation name is +// already executing. +type alreadyExistsError struct { operationName string - failedOp operation } -var _ error = exponentialBackoffError{} +var _ error = alreadyExistsError{} -func (err exponentialBackoffError) Error() string { +func (err alreadyExistsError) Error() string { return fmt.Sprintf( - "Failed to create operation with name %q. An operation with that name failed at %v. No retries permitted until %v (%v). Last error: %q.", - err.operationName, - err.failedOp.lastErrorTime, - err.failedOp.lastErrorTime.Add(err.failedOp.durationBeforeRetry), - err.failedOp.durationBeforeRetry, - err.failedOp.lastError) -} - -func newExponentialBackoffError( - operationName string, failedOp operation) error { - return exponentialBackoffError{ - operationName: operationName, - failedOp: failedOp, - } -} - -// IsExponentialBackoff returns true if an error returned from NewGoRoutine() -// indicates that the previous operation for given name failed less then -// durationBeforeRetry. -func IsExponentialBackoff(err error) bool { - switch err.(type) { - case exponentialBackoffError: - return true - default: - return false - } + "Failed to create operation with name %q. An operation with that name is already executing.", + err.operationName) } diff --git a/pkg/util/goroutinemap/goroutinemap_test.go b/pkg/util/goroutinemap/goroutinemap_test.go index f4a19570690..9e34a088195 100644 --- a/pkg/util/goroutinemap/goroutinemap_test.go +++ b/pkg/util/goroutinemap/goroutinemap_test.go @@ -56,6 +56,27 @@ func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) { } } +func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) { + // Arrange + grm := NewGoRoutineMap(false /* exponentialBackOffOnError */) + operation1Name := "operation1-name" + operation2Name := "operation2-name" + operation := func() error { return nil } + + // Act + err1 := grm.Run(operation1Name, operation) + err2 := grm.Run(operation2Name, operation) + + // Assert + if err1 != nil { + t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", operation1Name, err1) + } + + if err2 != nil { + t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", operation2Name, err2) + } +} + func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) { // Arrange grm := NewGoRoutineMap(true /* exponentialBackOffOnError */) diff --git a/pkg/util/runtime/runtime.go b/pkg/util/runtime/runtime.go index 491593c5e96..976de49d582 100644 --- a/pkg/util/runtime/runtime.go +++ b/pkg/util/runtime/runtime.go @@ -18,8 +18,9 @@ package runtime import ( "fmt" - "github.com/golang/glog" "runtime" + + "github.com/golang/glog" ) var ( @@ -59,6 +60,11 @@ func HandleCrash(additionalHandlers ...func(interface{})) { // logPanic logs the caller tree when a panic occurs. func logPanic(r interface{}) { + callers := getCallers(r) + glog.Errorf("Observed a panic: %#v (%v)\n%v", r, r, callers) +} + +func getCallers(r interface{}) string { callers := "" for i := 0; true; i++ { _, file, line, ok := runtime.Caller(i) @@ -67,7 +73,8 @@ func logPanic(r interface{}) { } callers = callers + fmt.Sprintf("%v:%v\n", file, line) } - glog.Errorf("Observed a panic: %#v (%v)\n%v", r, r, callers) + + return callers } // ErrorHandlers is a list of functions which will be invoked when an unreturnable @@ -104,3 +111,18 @@ func GetCaller() string { } return f.Name() } + +// RecoverFromPanic replaces the specified error with an error containing the +// original error, and the call tree when a panic occurs. This enables error +// handlers to handle errors and panics the same way. +func RecoverFromPanic(err *error) { + if r := recover(); r != nil { + callers := getCallers(r) + + *err = fmt.Errorf( + "recovered from panic %q. (err=%v) Call stack:\n%v", + r, + *err, + callers) + } +} diff --git a/pkg/volume/util/nestedpendingoperations/OWNERS b/pkg/volume/util/nestedpendingoperations/OWNERS new file mode 100644 index 00000000000..73ab6a21c98 --- /dev/null +++ b/pkg/volume/util/nestedpendingoperations/OWNERS @@ -0,0 +1,2 @@ +assignees: + - saad-ali diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go new file mode 100644 index 00000000000..4aaddf299e8 --- /dev/null +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go @@ -0,0 +1,287 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Package nestedpendingoperations is a modified implementation of +pkg/util/goroutinemap. It implements a data structure for managing go routines +by volume/pod name. It prevents the creation of new go routines if an existing +go routine for the volume already exists. It also allows multiple operations to +execute in parallel for the same volume as long as they are operating on +different pods. +*/ +package nestedpendingoperations + +import ( + "fmt" + "sync" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" + k8sRuntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/volume/util/types" +) + +const ( + // emptyUniquePodName is a UniquePodName for empty string. + emptyUniquePodName types.UniquePodName = types.UniquePodName("") +) + +// NestedPendingOperations defines the supported set of operations. +type NestedPendingOperations interface { + // Run adds the concatenation of volumeName and podName to the list of + // running operations and spawns a new go routine to execute operationFunc. + // If an operation with the same volumeName and same or empty podName + // exists, an AlreadyExists or ExponentialBackoff error is returned. + // This enables multiple operations to execute in parallel for the same + // volumeName as long as they have different podName. + // Once the operation is complete, the go routine is terminated and the + // concatenation of volumeName and podName is removed from the list of + // executing operations allowing a new operation to be started with the + // volumeName without error. + Run(volumeName api.UniqueVolumeName, podName types.UniquePodName, operationFunc func() error) error + + // Wait blocks until all operations are completed. This is typically + // necessary during tests - the test should wait until all operations finish + // and evaluate results after that. + Wait() +} + +// NewNestedPendingOperations returns a new instance of NestedPendingOperations. +func NewNestedPendingOperations(exponentialBackOffOnError bool) NestedPendingOperations { + g := &nestedPendingOperations{ + operations: []operation{}, + exponentialBackOffOnError: exponentialBackOffOnError, + lock: &sync.Mutex{}, + } + g.cond = sync.NewCond(g.lock) + return g +} + +type nestedPendingOperations struct { + operations []operation + exponentialBackOffOnError bool + cond *sync.Cond + lock *sync.Mutex +} + +type operation struct { + volumeName api.UniqueVolumeName + podName types.UniquePodName + operationPending bool + expBackoff exponentialbackoff.ExponentialBackoff +} + +func (grm *nestedPendingOperations) Run( + volumeName api.UniqueVolumeName, + podName types.UniquePodName, + operationFunc func() error) error { + grm.lock.Lock() + defer grm.lock.Unlock() + + var previousOp operation + opExists := false + previousOpIndex := -1 + for previousOpIndex, previousOp = range grm.operations { + if previousOp.volumeName != volumeName { + // No match, keep searching + continue + } + + if previousOp.podName != emptyUniquePodName && + podName != emptyUniquePodName && + previousOp.podName != podName { + // No match, keep searching + continue + } + + // Match + opExists = true + break + } + + if opExists { + // Operation already exists + if previousOp.operationPending { + // Operation is pending + operationName := getOperationName(volumeName, podName) + return NewAlreadyExistsError(operationName) + } + + operationName := getOperationName(volumeName, podName) + if err := previousOp.expBackoff.SafeToRetry(operationName); err != nil { + return err + } + + // Update existing operation to mark as pending. + grm.operations[previousOpIndex].operationPending = true + grm.operations[previousOpIndex].volumeName = volumeName + grm.operations[previousOpIndex].podName = podName + } else { + // Create a new operation + grm.operations = append(grm.operations, + operation{ + operationPending: true, + volumeName: volumeName, + podName: podName, + expBackoff: exponentialbackoff.ExponentialBackoff{}, + }) + } + + go func() (err error) { + // Handle unhandled panics (very unlikely) + defer k8sRuntime.HandleCrash() + // Handle completion of and error, if any, from operationFunc() + defer grm.operationComplete(volumeName, podName, &err) + // Handle panic, if any, from operationFunc() + defer k8sRuntime.RecoverFromPanic(&err) + return operationFunc() + }() + + return nil +} + +func (grm *nestedPendingOperations) getOperation( + volumeName api.UniqueVolumeName, + podName types.UniquePodName) (uint, error) { + // Assumes lock has been acquired by caller. + + for i, op := range grm.operations { + if op.volumeName == volumeName && + op.podName == podName { + return uint(i), nil + } + } + + logOperationName := getOperationName(volumeName, podName) + return 0, fmt.Errorf("Operation %q not found.", logOperationName) +} + +func (grm *nestedPendingOperations) deleteOperation( + // Assumes lock has been acquired by caller. + volumeName api.UniqueVolumeName, + podName types.UniquePodName) { + + opIndex := -1 + for i, op := range grm.operations { + if op.volumeName == volumeName && + op.podName == podName { + opIndex = i + break + } + } + + // Delete index without preserving order + grm.operations[opIndex] = grm.operations[len(grm.operations)-1] + grm.operations = grm.operations[:len(grm.operations)-1] +} + +func (grm *nestedPendingOperations) operationComplete( + volumeName api.UniqueVolumeName, podName types.UniquePodName, err *error) { + // Defer operations are executed in Last-In is First-Out order. In this case + // the lock is acquired first when operationCompletes begins, and is + // released when the method finishes, after the lock is released cond is + // signaled to wake waiting goroutine. + defer grm.cond.Signal() + grm.lock.Lock() + defer grm.lock.Unlock() + + if *err == nil || !grm.exponentialBackOffOnError { + // Operation completed without error, or exponentialBackOffOnError disabled + grm.deleteOperation(volumeName, podName) + if *err != nil { + // Log error + logOperationName := getOperationName(volumeName, podName) + glog.Errorf("operation %s failed with: %v", + logOperationName, + *err) + } + return + } + + // Operation completed with error and exponentialBackOffOnError Enabled + existingOpIndex, getOpErr := grm.getOperation(volumeName, podName) + if getOpErr != nil { + // Failed to find existing operation + logOperationName := getOperationName(volumeName, podName) + glog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.", + logOperationName, + *err) + return + } + + grm.operations[existingOpIndex].expBackoff.Update(err) + grm.operations[existingOpIndex].operationPending = false + + // Log error + operationName := + getOperationName(volumeName, podName) + glog.Errorf("%v", grm.operations[existingOpIndex].expBackoff. + GenerateNoRetriesPermittedMsg(operationName)) +} + +func (grm *nestedPendingOperations) Wait() { + grm.lock.Lock() + defer grm.lock.Unlock() + + for len(grm.operations) > 0 { + grm.cond.Wait() + } +} + +func getOperationName( + volumeName api.UniqueVolumeName, podName types.UniquePodName) string { + podNameStr := "" + if podName != emptyUniquePodName { + podNameStr = fmt.Sprintf(" (%q)", podName) + } + + return fmt.Sprintf("%q%s", + volumeName, + podNameStr) +} + +// NewAlreadyExistsError returns a new instance of AlreadyExists error. +func NewAlreadyExistsError(operationName string) error { + return alreadyExistsError{operationName} +} + +// IsAlreadyExists returns true if an error returned from +// NestedPendingOperations indicates a new operation can not be started because +// an operation with the same operation name is already executing. +func IsAlreadyExists(err error) bool { + switch err.(type) { + case alreadyExistsError: + return true + default: + return false + } +} + +// alreadyExistsError is the error returned by NestedPendingOperations when a +// new operation can not be started because an operation with the same operation +// name is already executing. +type alreadyExistsError struct { + operationName string +} + +var _ error = alreadyExistsError{} + +func (err alreadyExistsError) Error() string { + return fmt.Sprintf( + "Failed to create operation with name %q. An operation with that name is already executing.", + err.operationName) +} diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go new file mode 100644 index 00000000000..f37ec39ce21 --- /dev/null +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go @@ -0,0 +1,569 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nestedpendingoperations + +import ( + "fmt" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/volume/util/types" +) + +const ( + // testTimeout is a timeout of goroutines to finish. This _should_ be just a + // "context switch" and it should take several ms, however, Clayton says "We + // have had flakes due to tests that assumed that 15s is long enough to sleep") + testTimeout time.Duration = 1 * time.Minute + + // initialOperationWaitTimeShort is the initial amount of time the test will + // wait for an operation to complete (each successive failure results in + // exponential backoff). + initialOperationWaitTimeShort time.Duration = 20 * time.Millisecond + + // initialOperationWaitTimeLong is the initial amount of time the test will + // wait for an operation to complete (each successive failure results in + // exponential backoff). + initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond +) + +func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) + volumeName := api.UniqueVolumeName("volume-name") + operation := func() error { return nil } + + // Act + err := grm.Run(volumeName, "" /* operationSubName */, operation) + + // Assert + if err != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) + } +} + +func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) + volume1Name := api.UniqueVolumeName("volume1-name") + volume2Name := api.UniqueVolumeName("volume2-name") + operation := func() error { return nil } + + // Act + err1 := grm.Run(volume1Name, "" /* operationSubName */, operation) + err2 := grm.Run(volume2Name, "" /* operationSubName */, operation) + + // Assert + if err1 != nil { + t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", volume1Name, err1) + } + + if err2 != nil { + t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", volume2Name, err2) + } +} + +func Test_NewGoRoutineMap_Positive_TwoSubOps(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) + volumeName := api.UniqueVolumeName("volume-name") + operation1PodName := types.UniquePodName("operation1-podname") + operation2PodName := types.UniquePodName("operation2-podname") + operation := func() error { return nil } + + // Act + err1 := grm.Run(volumeName, operation1PodName, operation) + err2 := grm.Run(volumeName, operation2PodName, operation) + + // Assert + if err1 != nil { + t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", operation1PodName, err1) + } + + if err2 != nil { + t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", operation2PodName, err2) + } +} + +func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) + volumeName := api.UniqueVolumeName("volume-name") + operation := func() error { return nil } + + // Act + err := grm.Run(volumeName, "" /* operationSubName */, operation) + + // Assert + if err != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) + } +} + +func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) + volumeName := api.UniqueVolumeName("volume-name") + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateCallbackFunc(operation1DoneCh) + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + if err1 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + <-operation1DoneCh // Force operation1 to complete + + // Act + err2 := retryWithExponentialBackOff( + time.Duration(initialOperationWaitTimeShort), + func() (bool, error) { + err := grm.Run(volumeName, "" /* operationSubName */, operation2) + if err != nil { + t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + return false, nil + } + return true, nil + }, + ) + + // Assert + if err2 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) + } +} + +func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) + volumeName := api.UniqueVolumeName("volume-name") + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateCallbackFunc(operation1DoneCh) + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + if err1 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + <-operation1DoneCh // Force operation1 to complete + + // Act + err2 := retryWithExponentialBackOff( + time.Duration(initialOperationWaitTimeShort), + func() (bool, error) { + err := grm.Run(volumeName, "" /* operationSubName */, operation2) + if err != nil { + t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + return false, nil + } + return true, nil + }, + ) + + // Assert + if err2 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) + } +} + +func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) + volumeName := api.UniqueVolumeName("volume-name") + operation1 := generatePanicFunc() + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + if err1 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + + // Act + err2 := retryWithExponentialBackOff( + time.Duration(initialOperationWaitTimeShort), + func() (bool, error) { + err := grm.Run(volumeName, "" /* operationSubName */, operation2) + if err != nil { + t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + return false, nil + } + return true, nil + }, + ) + + // Assert + if err2 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) + } +} + +func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) + volumeName := api.UniqueVolumeName("volume-name") + operation1 := generatePanicFunc() + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + if err1 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + + // Act + err2 := retryWithExponentialBackOff( + time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff + func() (bool, error) { + err := grm.Run(volumeName, "" /* operationSubName */, operation2) + if err != nil { + t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + return false, nil + } + return true, nil + }, + ) + + // Assert + if err2 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) + } +} + +func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) + volumeName := api.UniqueVolumeName("volume-name") + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateWaitFunc(operation1DoneCh) + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + if err1 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + + // Act + err2 := grm.Run(volumeName, "" /* operationSubName */, operation2) + + // Assert + if err2 == nil { + t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + } + if !IsAlreadyExists(err2) { + t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + } +} + +func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) + volumeName := api.UniqueVolumeName("volume-name") + operationPodName := types.UniquePodName("operation-podname") + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateWaitFunc(operation1DoneCh) + err1 := grm.Run(volumeName, operationPodName, operation1) + if err1 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + + // Act + err2 := grm.Run(volumeName, operationPodName, operation2) + + // Assert + if err2 == nil { + t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + } + if !IsAlreadyExists(err2) { + t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + } +} + +func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) + volumeName := api.UniqueVolumeName("volume-name") + operationPodName := types.UniquePodName("operation-podname") + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateWaitFunc(operation1DoneCh) + err1 := grm.Run(volumeName, operationPodName, operation1) + if err1 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + + // Act + err2 := grm.Run(volumeName, operationPodName, operation2) + + // Assert + if err2 == nil { + t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + } + if !IsAlreadyExists(err2) { + t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + } +} + +func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) + volumeName := api.UniqueVolumeName("volume-name") + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateWaitFunc(operation1DoneCh) + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + if err1 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + + // Act + err2 := grm.Run(volumeName, "" /* operationSubName */, operation2) + + // Assert + if err2 == nil { + t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + } + if !IsAlreadyExists(err2) { + t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + } +} + +func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) + volumeName := api.UniqueVolumeName("volume-name") + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateWaitFunc(operation1DoneCh) + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + if err1 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + operation3 := generateNoopFunc() + + // Act + err2 := grm.Run(volumeName, "" /* operationSubName */, operation2) + + // Assert + if err2 == nil { + t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + } + if !IsAlreadyExists(err2) { + t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + } + + // Act + operation1DoneCh <- true // Force operation1 to complete + err3 := retryWithExponentialBackOff( + time.Duration(initialOperationWaitTimeShort), + func() (bool, error) { + err := grm.Run(volumeName, "" /* operationSubName */, operation3) + if err != nil { + t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + return false, nil + } + return true, nil + }, + ) + + // Assert + if err3 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err3) + } +} + +func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) + volumeName := api.UniqueVolumeName("volume-name") + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateWaitFunc(operation1DoneCh) + err1 := grm.Run(volumeName, "" /* operationSubName */, operation1) + if err1 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + } + operation2 := generateNoopFunc() + operation3 := generateNoopFunc() + + // Act + err2 := grm.Run(volumeName, "" /* operationSubName */, operation2) + + // Assert + if err2 == nil { + t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + } + if !IsAlreadyExists(err2) { + t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + } + + // Act + operation1DoneCh <- true // Force operation1 to complete + err3 := retryWithExponentialBackOff( + time.Duration(initialOperationWaitTimeShort), + func() (bool, error) { + err := grm.Run(volumeName, "" /* operationSubName */, operation3) + if err != nil { + t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + return false, nil + } + return true, nil + }, + ) + + // Assert + if err3 != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err3) + } +} + +func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) { + // Test than Wait() on empty GoRoutineMap always succeeds without blocking + // Arrange + grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) + + // Act + waitDoneCh := make(chan interface{}, 1) + go func() { + grm.Wait() + waitDoneCh <- true + }() + + // Assert + err := waitChannelWithTimeout(waitDoneCh, testTimeout) + if err != nil { + t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err) + } +} + +func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) { + // Test than Wait() on empty GoRoutineMap always succeeds without blocking + // Arrange + grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) + + // Act + waitDoneCh := make(chan interface{}, 1) + go func() { + grm.Wait() + waitDoneCh <- true + }() + + // Assert + err := waitChannelWithTimeout(waitDoneCh, testTimeout) + if err != nil { + t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err) + } +} + +func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) { + // Test that Wait() really blocks until the last operation succeeds + // Arrange + grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) + volumeName := api.UniqueVolumeName("volume-name") + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateWaitFunc(operation1DoneCh) + err := grm.Run(volumeName, "" /* operationSubName */, operation1) + if err != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) + } + + // Act + waitDoneCh := make(chan interface{}, 1) + go func() { + grm.Wait() + waitDoneCh <- true + }() + + // Finish the operation + operation1DoneCh <- true + + // Assert + err = waitChannelWithTimeout(waitDoneCh, testTimeout) + if err != nil { + t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err) + } +} + +func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) { + // Test that Wait() really blocks until the last operation succeeds + // Arrange + grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) + volumeName := api.UniqueVolumeName("volume-name") + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateWaitFunc(operation1DoneCh) + err := grm.Run(volumeName, "" /* operationSubName */, operation1) + if err != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) + } + + // Act + waitDoneCh := make(chan interface{}, 1) + go func() { + grm.Wait() + waitDoneCh <- true + }() + + // Finish the operation + operation1DoneCh <- true + + // Assert + err = waitChannelWithTimeout(waitDoneCh, testTimeout) + if err != nil { + t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err) + } +} + +func generateCallbackFunc(done chan<- interface{}) func() error { + return func() error { + done <- true + return nil + } +} + +func generateWaitFunc(done <-chan interface{}) func() error { + return func() error { + <-done + return nil + } +} + +func generatePanicFunc() func() error { + return func() error { + panic("testing panic") + } +} + +func generateNoopFunc() func() error { + return func() error { return nil } +} + +func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error { + backoff := wait.Backoff{ + Duration: initialDuration, + Factor: 3, + Jitter: 0, + Steps: 4, + } + return wait.ExponentialBackoff(backoff, fn) +} + +func waitChannelWithTimeout(ch <-chan interface{}, timeout time.Duration) error { + timer := time.NewTimer(timeout) + + select { + case <-ch: + // Success! + return nil + case <-timer.C: + return fmt.Errorf("timeout after %v", timeout) + } +} diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index c969f9256a2..5475c2b7b68 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -15,8 +15,8 @@ limitations under the License. */ // Package operationexecutor implements interfaces that enable execution of -// attach, detach, mount, and unmount operations with a goroutinemap so that -// more than one operation is never triggered on the same volume. +// attach, detach, mount, and unmount operations with a nestedpendingoperations +// so that more than one operation is never triggered on the same volume. package operationexecutor import ( @@ -27,14 +27,15 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util/goroutinemap" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" + "k8s.io/kubernetes/pkg/volume/util/volumehelper" ) // OperationExecutor defines a set of operations for attaching, detaching, -// mounting, or unmounting a volume that are executed with a goroutinemap which +// mounting, or unmounting a volume that are executed with a NewNestedPendingOperations which // prevents more than one operation from being triggered on the same volume. // // These operations should be idempotent (for example, AttachVolume should @@ -106,7 +107,7 @@ func NewOperationExecutor( return &operationExecutor{ kubeClient: kubeClient, volumePluginMgr: volumePluginMgr, - pendingOperations: goroutinemap.NewGoRoutineMap( + pendingOperations: nestedpendingoperations.NewNestedPendingOperations( true /* exponentialBackOffOnError */), } } @@ -328,7 +329,7 @@ type operationExecutor struct { // pendingOperations keeps track of pending attach and detach operations so // multiple operations are not started on the same volume - pendingOperations goroutinemap.GoRoutineMap + pendingOperations nestedpendingoperations.NestedPendingOperations } func (oe *operationExecutor) AttachVolume( @@ -341,7 +342,7 @@ func (oe *operationExecutor) AttachVolume( } return oe.pendingOperations.Run( - string(volumeToAttach.VolumeName), attachFunc) + volumeToAttach.VolumeName, "" /* podName */, attachFunc) } func (oe *operationExecutor) DetachVolume( @@ -355,7 +356,7 @@ func (oe *operationExecutor) DetachVolume( } return oe.pendingOperations.Run( - string(volumeToDetach.VolumeName), detachFunc) + volumeToDetach.VolumeName, "" /* podName */, detachFunc) } func (oe *operationExecutor) MountVolume( @@ -368,8 +369,15 @@ func (oe *operationExecutor) MountVolume( return err } + podName := volumetypes.UniquePodName("") + if !volumeToMount.PluginIsAttachable { + // Non-attachable volume plugins can execute mount for multiple pods + // referencing the same volume in parallel + podName = volumehelper.GetUniquePodName(volumeToMount.Pod) + } + return oe.pendingOperations.Run( - string(volumeToMount.VolumeName), mountFunc) + volumeToMount.VolumeName, podName, mountFunc) } func (oe *operationExecutor) UnmountVolume( @@ -381,8 +389,12 @@ func (oe *operationExecutor) UnmountVolume( return err } + // All volume plugins can execute mount for multiple pods referencing the + // same volume in parallel + podName := volumetypes.UniquePodName(volumeToUnmount.PodUID) + return oe.pendingOperations.Run( - string(volumeToUnmount.VolumeName), unmountFunc) + volumeToUnmount.VolumeName, podName, unmountFunc) } func (oe *operationExecutor) UnmountDevice( @@ -396,7 +408,7 @@ func (oe *operationExecutor) UnmountDevice( } return oe.pendingOperations.Run( - string(deviceToDetach.VolumeName), unmountDeviceFunc) + deviceToDetach.VolumeName, "" /* podName */, unmountDeviceFunc) } func (oe *operationExecutor) VerifyControllerAttachedVolume( @@ -410,7 +422,7 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume( } return oe.pendingOperations.Run( - string(volumeToMount.VolumeName), verifyControllerAttachedVolumeFunc) + volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc) } func (oe *operationExecutor) generateAttachVolumeFunc(