Merge pull request #28939 from saad-ali/fixIssue28616ParallelMount

Automatic merge from submit-queue

Allow mounts to run in parallel for non-attachable volumes

This PR:
* Fixes https://github.com/kubernetes/kubernetes/issues/28616
  * Enables mount volume operations to run in parallel for non-attachable volume plugins.
  * Enables unmount volume operations to run in parallel for all volume plugins.
* Renames `GoRoutineMap` to `GoroutineMap`, resolving a long-standing request from @thockin: `"Goroutine" is a noun`
k8s-merge-robot 2016-07-20 14:56:58 -07:00 committed by GitHub
commit 4379619a63
10 changed files with 1131 additions and 156 deletions
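
Before the per-file diffs, a minimal sketch of the behavioral change described above (not part of the PR; it uses only the types and functions introduced in the diffs below): pending operations are now keyed by (volumeName, podName), so mounts of the same volume for different pods can run in parallel, while an operation with an empty podName still serializes against everything pending on that volume.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
	"k8s.io/kubernetes/pkg/volume/util/types"
)

func main() {
	ops := nestedpendingoperations.NewNestedPendingOperations(true /* exponentialBackOffOnError */)
	volume := api.UniqueVolumeName("volume-name")
	blocked := make(chan struct{})
	mount := func() error { <-blocked; return nil } // stand-in for a long-running MountVolume

	// Different pods, same volume: both starts succeed, so the two mounts run in parallel.
	fmt.Println(ops.Run(volume, types.UniquePodName("pod-a"), mount)) // <nil>
	fmt.Println(ops.Run(volume, types.UniquePodName("pod-b"), mount)) // <nil>

	// An empty podName matches any pending operation on the volume, so
	// attach/detach-style calls remain serialized per volume.
	err := ops.Run(volume, "" /* podName */, mount)
	fmt.Println(nestedpendingoperations.IsAlreadyExists(err)) // true

	close(blocked) // let both mounts finish
	ops.Wait()
}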

View File

@ -25,8 +25,9 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
@ -114,9 +115,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
}
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v",
@ -134,9 +135,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
glog.Infof("Started DetachVolume for volume %q from node %q due to maxWaitForUnmountDuration expiry.", attachedVolume.VolumeName, attachedVolume.NodeName)
}
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed to start (maxWaitForUnmountDuration expiry) for volume %q (spec.Name: %q) from node %q with err: %v",
@ -169,9 +170,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.AttachVolume failed to start for volume %q (spec.Name: %q) to node %q with err: %v",

View File

@ -25,9 +25,10 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
@ -122,9 +123,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.UnmountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
@ -163,9 +164,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
rc.hostName,
rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.VerifyControllerAttachedVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
@ -198,9 +199,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
volumeToMount.Pod.UID)
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.AttachVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
@ -236,9 +237,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
volumeToMount.VolumeToMount,
rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.MountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
@ -271,9 +272,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
err := rc.operationExecutor.UnmountDevice(
attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.UnmountDevice failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
@ -302,9 +303,9 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
err := rc.operationExecutor.DetachVolume(
attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists && goroutinemap.IsExponentialBackoff errors, they are expected.
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",

View File

@ -0,0 +1,120 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package exponentialbackoff contains logic for implementing exponential
// backoff for GoRoutineMap and NestedPendingOperations.
package exponentialbackoff
import (
"fmt"
"time"
)
const (
// initialDurationBeforeRetry is the amount of time after an error occurs
// that GoroutineMap will refuse to allow another operation to start with
// the same target (if exponentialBackOffOnError is enabled). Each
// successive error results in a wait 2x the previous.
initialDurationBeforeRetry time.Duration = 500 * time.Millisecond
// maxDurationBeforeRetry is the maximum amount of time that
// durationBeforeRetry will grow to due to exponential backoff.
maxDurationBeforeRetry time.Duration = 2 * time.Minute
)
// ExponentialBackoff contains the last occurrence of an error and the duration
// that retries are not permitted.
type ExponentialBackoff struct {
lastError error
lastErrorTime time.Time
durationBeforeRetry time.Duration
}
// SafeToRetry returns an error if the durationBeforeRetry period for the given
// lastErrorTime has not yet expired. Otherwise it returns nil.
func (expBackoff *ExponentialBackoff) SafeToRetry(operationName string) error {
if time.Since(expBackoff.lastErrorTime) <= expBackoff.durationBeforeRetry {
return NewExponentialBackoffError(operationName, *expBackoff)
}
return nil
}
func (expBackoff *ExponentialBackoff) Update(err *error) {
if expBackoff.durationBeforeRetry == 0 {
expBackoff.durationBeforeRetry = initialDurationBeforeRetry
} else {
expBackoff.durationBeforeRetry = 2 * expBackoff.durationBeforeRetry
if expBackoff.durationBeforeRetry > maxDurationBeforeRetry {
expBackoff.durationBeforeRetry = maxDurationBeforeRetry
}
}
expBackoff.lastError = *err
expBackoff.lastErrorTime = time.Now()
}
func (expBackoff *ExponentialBackoff) GenerateNoRetriesPermittedMsg(
operationName string) string {
return fmt.Sprintf("Operation for %q failed. No retries permitted until %v (durationBeforeRetry %v). Error: %v",
operationName,
expBackoff.lastErrorTime.Add(expBackoff.durationBeforeRetry),
expBackoff.durationBeforeRetry,
expBackoff.lastError)
}
// NewExponentialBackoffError returns a new instance of ExponentialBackoff error.
func NewExponentialBackoffError(
operationName string, expBackoff ExponentialBackoff) error {
return exponentialBackoffError{
operationName: operationName,
expBackoff: expBackoff,
}
}
// IsExponentialBackoff returns true if an error returned from GoroutineMap
// indicates that a new operation can not be started because
// exponentialBackOffOnError is enabled and a previous operation with the same
// operation name failed within the durationBeforeRetry period.
func IsExponentialBackoff(err error) bool {
switch err.(type) {
case exponentialBackoffError:
return true
default:
return false
}
}
// exponentialBackoffError is the error returned from GoroutineMap when
// a new operation can not be started because exponentialBackOffOnError is
// enabled and a previous operation with the same operation name failed within
// the durationBeforeRetry period.
type exponentialBackoffError struct {
operationName string
expBackoff ExponentialBackoff
}
var _ error = exponentialBackoffError{}
func (err exponentialBackoffError) Error() string {
return fmt.Sprintf(
"Failed to create operation with name %q. An operation with that name failed at %v. No retries permitted until %v (%v). Last error: %q.",
err.operationName,
err.expBackoff.lastErrorTime,
err.expBackoff.lastErrorTime.Add(err.expBackoff.durationBeforeRetry),
err.expBackoff.durationBeforeRetry,
err.expBackoff.lastError)
}
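
The contract of the new type above: call Update with each failure (which doubles durationBeforeRetry up to the two-minute cap) and call SafeToRetry before the next attempt. A minimal sketch, assuming only the package shown in this file:

package main

import (
	"errors"
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
)

func main() {
	var backoff exponentialbackoff.ExponentialBackoff

	failure := errors.New("mount failed")
	backoff.Update(&failure) // first failure: retries refused for 500ms

	// Within durationBeforeRetry, SafeToRetry returns an exponentialBackoffError.
	fmt.Println(exponentialbackoff.IsExponentialBackoff(backoff.SafeToRetry("volume-name"))) // true

	// Once the window has elapsed, the operation may be retried.
	time.Sleep(600 * time.Millisecond)
	fmt.Println(backoff.SafeToRetry("volume-name")) // <nil>
}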

View File

@ -23,18 +23,18 @@ package goroutinemap
import (
"fmt"
"runtime"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
k8sRuntime "k8s.io/kubernetes/pkg/util/runtime"
)
const (
// initialDurationBeforeRetry is the amount of time after an error occurs
// that GoRoutineMap will refuse to allow another operation to start with
// the same operationName (if exponentialBackOffOnError is enabled). Each
// the same operation name (if exponentialBackOffOnError is enabled). Each
// successive error results in a wait 2x the previous.
initialDurationBeforeRetry time.Duration = 500 * time.Millisecond
@ -45,12 +45,13 @@ const (
// GoRoutineMap defines the supported set of operations.
type GoRoutineMap interface {
// Run adds operationName to the list of running operations and spawns a new
// go routine to execute the operation. If an operation with the same name
// already exists, an error is returned. Once the operation is complete, the
// go routine is terminated and the operationName is removed from the list
// of executing operations allowing a new operation to be started with the
// same name without error.
// Run adds the operation name to the list of running operations and spawns a
// new go routine to execute the operation.
// If an operation with the same operation name already exists, an
// AlreadyExists or ExponentialBackoff error is returned.
// Once the operation is complete, the go routine is terminated and the
// operation name is removed from the list of executing operations allowing
// a new operation to be started with the same operation name without error.
Run(operationName string, operationFunc func() error) error
// Wait blocks until all operations are completed. This is typically
@ -64,8 +65,10 @@ func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {
g := &goRoutineMap{
operations: make(map[string]operation),
exponentialBackOffOnError: exponentialBackOffOnError,
lock: &sync.Mutex{},
}
g.cond = sync.NewCond(g)
g.cond = sync.NewCond(g.lock)
return g
}
@ -73,36 +76,35 @@ type goRoutineMap struct {
operations map[string]operation
exponentialBackOffOnError bool
cond *sync.Cond
sync.Mutex
lock *sync.Mutex
}
type operation struct {
operationPending bool
lastError error
lastErrorTime time.Time
durationBeforeRetry time.Duration
operationPending bool
expBackoff exponentialbackoff.ExponentialBackoff
}
func (grm *goRoutineMap) Run(operationName string, operationFunc func() error) error {
grm.Lock()
defer grm.Unlock()
func (grm *goRoutineMap) Run(
operationName string,
operationFunc func() error) error {
grm.lock.Lock()
defer grm.lock.Unlock()
existingOp, exists := grm.operations[operationName]
if exists {
// Operation with name exists
if existingOp.operationPending {
return newAlreadyExistsError(operationName)
return NewAlreadyExistsError(operationName)
}
if time.Since(existingOp.lastErrorTime) <= existingOp.durationBeforeRetry {
return newExponentialBackoffError(operationName, existingOp)
if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil {
return err
}
}
grm.operations[operationName] = operation{
operationPending: true,
lastError: existingOp.lastError,
lastErrorTime: existingOp.lastErrorTime,
durationBeforeRetry: existingOp.durationBeforeRetry,
operationPending: true,
expBackoff: existingOp.expBackoff,
}
go func() (err error) {
// Handle unhandled panics (very unlikely)
@ -110,17 +112,22 @@ func (grm *goRoutineMap) Run(operationName string, operationFunc func() error) e
// Handle completion of and error, if any, from operationFunc()
defer grm.operationComplete(operationName, &err)
// Handle panic, if any, from operationFunc()
defer recoverFromPanic(operationName, &err)
defer k8sRuntime.RecoverFromPanic(&err)
return operationFunc()
}()
return nil
}
func (grm *goRoutineMap) operationComplete(operationName string, err *error) {
func (grm *goRoutineMap) operationComplete(
operationName string, err *error) {
// Defer operations are executed in last-in, first-out order. In this case
// the lock is acquired first when operationComplete begins, and is
// released when the method finishes; after the lock is released, cond is
// signaled to wake a waiting goroutine.
defer grm.cond.Signal()
grm.Lock()
defer grm.Unlock()
grm.lock.Lock()
defer grm.lock.Unlock()
if *err == nil || !grm.exponentialBackOffOnError {
// Operation completed without error, or exponentialBackOffOnError disabled
@ -134,75 +141,33 @@ func (grm *goRoutineMap) operationComplete(operationName string, err *error) {
} else {
// Operation completed with error and exponentialBackOffOnError Enabled
existingOp := grm.operations[operationName]
if existingOp.durationBeforeRetry == 0 {
existingOp.durationBeforeRetry = initialDurationBeforeRetry
} else {
existingOp.durationBeforeRetry = 2 * existingOp.durationBeforeRetry
if existingOp.durationBeforeRetry > maxDurationBeforeRetry {
existingOp.durationBeforeRetry = maxDurationBeforeRetry
}
}
existingOp.lastError = *err
existingOp.lastErrorTime = time.Now()
existingOp.expBackoff.Update(err)
existingOp.operationPending = false
grm.operations[operationName] = existingOp
// Log error
glog.Errorf("Operation for %q failed. No retries permitted until %v (durationBeforeRetry %v). error: %v",
operationName,
existingOp.lastErrorTime.Add(existingOp.durationBeforeRetry),
existingOp.durationBeforeRetry,
*err)
glog.Errorf("%v",
existingOp.expBackoff.GenerateNoRetriesPermittedMsg(operationName))
}
}
func (grm *goRoutineMap) Wait() {
grm.Lock()
defer grm.Unlock()
grm.lock.Lock()
defer grm.lock.Unlock()
for len(grm.operations) > 0 {
grm.cond.Wait()
}
}
func recoverFromPanic(operationName string, err *error) {
if r := recover(); r != nil {
callers := ""
for i := 0; true; i++ {
_, file, line, ok := runtime.Caller(i)
if !ok {
break
}
callers = callers + fmt.Sprintf("%v:%v\n", file, line)
}
*err = fmt.Errorf(
"operation for %q recovered from panic %q. (err=%v) Call stack:\n%v",
operationName,
r,
*err,
callers)
}
}
// alreadyExistsError is the error returned when NewGoRoutine() detects that
// an operation with the given name is already running.
type alreadyExistsError struct {
operationName string
}
var _ error = alreadyExistsError{}
func (err alreadyExistsError) Error() string {
return fmt.Sprintf("Failed to create operation with name %q. An operation with that name is already executing.", err.operationName)
}
func newAlreadyExistsError(operationName string) error {
// NewAlreadyExistsError returns a new instance of AlreadyExists error.
func NewAlreadyExistsError(operationName string) error {
return alreadyExistsError{operationName}
}
// IsAlreadyExists returns true if an error returned from NewGoRoutine indicates
// that operation with the same name already exists.
// IsAlreadyExists returns true if an error returned from GoRoutineMap indicates
// a new operation can not be started because an operation with the same
// operation name is already executing.
func IsAlreadyExists(err error) bool {
switch err.(type) {
case alreadyExistsError:
@ -212,42 +177,17 @@ func IsAlreadyExists(err error) bool {
}
}
// exponentialBackoffError is the error returned when NewGoRoutine() detects
// that the previous operation for the given name failed less than
// durationBeforeRetry ago.
type exponentialBackoffError struct {
// alreadyExistsError is the error returned by GoRoutineMap when a new operation
// can not be started because an operation with the same operation name is
// already executing.
type alreadyExistsError struct {
operationName string
failedOp operation
}
var _ error = exponentialBackoffError{}
var _ error = alreadyExistsError{}
func (err exponentialBackoffError) Error() string {
func (err alreadyExistsError) Error() string {
return fmt.Sprintf(
"Failed to create operation with name %q. An operation with that name failed at %v. No retries permitted until %v (%v). Last error: %q.",
err.operationName,
err.failedOp.lastErrorTime,
err.failedOp.lastErrorTime.Add(err.failedOp.durationBeforeRetry),
err.failedOp.durationBeforeRetry,
err.failedOp.lastError)
}
func newExponentialBackoffError(
operationName string, failedOp operation) error {
return exponentialBackoffError{
operationName: operationName,
failedOp: failedOp,
}
}
// IsExponentialBackoff returns true if an error returned from NewGoRoutine()
// indicates that the previous operation for the given name failed less than
// durationBeforeRetry ago.
func IsExponentialBackoff(err error) bool {
switch err.(type) {
case exponentialBackoffError:
return true
default:
return false
}
"Failed to create operation with name %q. An operation with that name is already executing.",
err.operationName)
}
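
For contrast with the nested variant added later in this diff, GoRoutineMap keys purely on the operation name, so a second operation for the same volume is refused even if it serves a different pod -- the limitation that issue #28616 asked to relax. A minimal sketch using only the interface above:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/goroutinemap"
)

func main() {
	grm := goroutinemap.NewGoRoutineMap(false /* exponentialBackOffOnError */)
	blocked := make(chan struct{})
	op := func() error { <-blocked; return nil }

	err1 := grm.Run("volume-name", op) // starts
	err2 := grm.Run("volume-name", op) // refused: same key is still pending
	fmt.Println(err1, goroutinemap.IsAlreadyExists(err2)) // <nil> true

	close(blocked)
	grm.Wait()
}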

View File

@ -56,6 +56,27 @@ func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) {
}
}
func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) {
// Arrange
grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
operation1Name := "operation1-name"
operation2Name := "operation2-name"
operation := func() error { return nil }
// Act
err1 := grm.Run(operation1Name, operation)
err2 := grm.Run(operation2Name, operation)
// Assert
if err1 != nil {
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation1Name, err1)
}
if err2 != nil {
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation2Name, err2)
}
}
func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) {
// Arrange
grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)

View File

@ -18,8 +18,9 @@ package runtime
import (
"fmt"
"github.com/golang/glog"
"runtime"
"github.com/golang/glog"
)
var (
@ -59,6 +60,11 @@ func HandleCrash(additionalHandlers ...func(interface{})) {
// logPanic logs the caller tree when a panic occurs.
func logPanic(r interface{}) {
callers := getCallers(r)
glog.Errorf("Observed a panic: %#v (%v)\n%v", r, r, callers)
}
func getCallers(r interface{}) string {
callers := ""
for i := 0; true; i++ {
_, file, line, ok := runtime.Caller(i)
@ -67,7 +73,8 @@ func logPanic(r interface{}) {
}
callers = callers + fmt.Sprintf("%v:%v\n", file, line)
}
glog.Errorf("Observed a panic: %#v (%v)\n%v", r, r, callers)
return callers
}
// ErrorHandlers is a list of functions which will be invoked when an unreturnable
@ -104,3 +111,18 @@ func GetCaller() string {
}
return f.Name()
}
// RecoverFromPanic replaces the specified error with an error containing the
// original error, and the call tree when a panic occurs. This enables error
// handlers to handle errors and panics the same way.
func RecoverFromPanic(err *error) {
if r := recover(); r != nil {
callers := getCallers(r)
*err = fmt.Errorf(
"recovered from panic %q. (err=%v) Call stack:\n%v",
r,
*err,
callers)
}
}
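
RecoverFromPanic is meant to be deferred inside a function with a named error return, which is exactly how both operation maps in this PR use it. A standalone sketch (the wrapper name is illustrative):

package main

import (
	"fmt"

	k8sRuntime "k8s.io/kubernetes/pkg/util/runtime"
)

// runGuarded surfaces a panic in fn as an ordinary error on the named return
// value, so a caller such as operationComplete sees a plain error.
func runGuarded(fn func() error) (err error) {
	defer k8sRuntime.RecoverFromPanic(&err)
	return fn()
}

func main() {
	err := runGuarded(func() error { panic("testing panic") })
	fmt.Println(err != nil) // true: the panic was converted into an error carrying a call stack
}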

View File

@ -0,0 +1,2 @@
assignees:
- saad-ali

View File

@ -0,0 +1,287 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package nestedpendingoperations is a modified implementation of
pkg/util/goroutinemap. It implements a data structure for managing go routines
by volume/pod name. It prevents the creation of new go routines if an existing
go routine for the volume already exists. It also allows multiple operations to
execute in parallel for the same volume as long as they are operating on
different pods.
*/
package nestedpendingoperations
import (
"fmt"
"sync"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
k8sRuntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/volume/util/types"
)
const (
// emptyUniquePodName is a UniquePodName for empty string.
emptyUniquePodName types.UniquePodName = types.UniquePodName("")
)
// NestedPendingOperations defines the supported set of operations.
type NestedPendingOperations interface {
// Run adds the concatenation of volumeName and podName to the list of
// running operations and spawns a new go routine to execute operationFunc.
// If an operation with the same volumeName and same or empty podName
// exists, an AlreadyExists or ExponentialBackoff error is returned.
// This enables multiple operations to execute in parallel for the same
// volumeName as long as they have different podName.
// Once the operation is complete, the go routine is terminated and the
// concatenation of volumeName and podName is removed from the list of
// executing operations, allowing a new operation to be started with the
// same volumeName without error.
Run(volumeName api.UniqueVolumeName, podName types.UniquePodName, operationFunc func() error) error
// Wait blocks until all operations are completed. This is typically
// necessary during tests - the test should wait until all operations finish
// and evaluate results after that.
Wait()
}
// NewNestedPendingOperations returns a new instance of NestedPendingOperations.
func NewNestedPendingOperations(exponentialBackOffOnError bool) NestedPendingOperations {
g := &nestedPendingOperations{
operations: []operation{},
exponentialBackOffOnError: exponentialBackOffOnError,
lock: &sync.Mutex{},
}
g.cond = sync.NewCond(g.lock)
return g
}
type nestedPendingOperations struct {
operations []operation
exponentialBackOffOnError bool
cond *sync.Cond
lock *sync.Mutex
}
type operation struct {
volumeName api.UniqueVolumeName
podName types.UniquePodName
operationPending bool
expBackoff exponentialbackoff.ExponentialBackoff
}
func (grm *nestedPendingOperations) Run(
volumeName api.UniqueVolumeName,
podName types.UniquePodName,
operationFunc func() error) error {
grm.lock.Lock()
defer grm.lock.Unlock()
var previousOp operation
opExists := false
previousOpIndex := -1
for previousOpIndex, previousOp = range grm.operations {
if previousOp.volumeName != volumeName {
// No match, keep searching
continue
}
if previousOp.podName != emptyUniquePodName &&
podName != emptyUniquePodName &&
previousOp.podName != podName {
// No match, keep searching
continue
}
// Match
opExists = true
break
}
if opExists {
// Operation already exists
if previousOp.operationPending {
// Operation is pending
operationName := getOperationName(volumeName, podName)
return NewAlreadyExistsError(operationName)
}
operationName := getOperationName(volumeName, podName)
if err := previousOp.expBackoff.SafeToRetry(operationName); err != nil {
return err
}
// Update existing operation to mark as pending.
grm.operations[previousOpIndex].operationPending = true
grm.operations[previousOpIndex].volumeName = volumeName
grm.operations[previousOpIndex].podName = podName
} else {
// Create a new operation
grm.operations = append(grm.operations,
operation{
operationPending: true,
volumeName: volumeName,
podName: podName,
expBackoff: exponentialbackoff.ExponentialBackoff{},
})
}
go func() (err error) {
// Handle unhandled panics (very unlikely)
defer k8sRuntime.HandleCrash()
// Handle completion of and error, if any, from operationFunc()
defer grm.operationComplete(volumeName, podName, &err)
// Handle panic, if any, from operationFunc()
defer k8sRuntime.RecoverFromPanic(&err)
return operationFunc()
}()
return nil
}
func (grm *nestedPendingOperations) getOperation(
volumeName api.UniqueVolumeName,
podName types.UniquePodName) (uint, error) {
// Assumes lock has been acquired by caller.
for i, op := range grm.operations {
if op.volumeName == volumeName &&
op.podName == podName {
return uint(i), nil
}
}
logOperationName := getOperationName(volumeName, podName)
return 0, fmt.Errorf("Operation %q not found.", logOperationName)
}
func (grm *nestedPendingOperations) deleteOperation(
// Assumes lock has been acquired by caller.
volumeName api.UniqueVolumeName,
podName types.UniquePodName) {
opIndex := -1
for i, op := range grm.operations {
if op.volumeName == volumeName &&
op.podName == podName {
opIndex = i
break
}
}
// Delete index without preserving order
grm.operations[opIndex] = grm.operations[len(grm.operations)-1]
grm.operations = grm.operations[:len(grm.operations)-1]
}
func (grm *nestedPendingOperations) operationComplete(
volumeName api.UniqueVolumeName, podName types.UniquePodName, err *error) {
// Defer operations are executed in last-in, first-out order. In this case
// the lock is acquired first when operationComplete begins, and is
// released when the method finishes; after the lock is released, cond is
// signaled to wake a waiting goroutine.
defer grm.cond.Signal()
grm.lock.Lock()
defer grm.lock.Unlock()
if *err == nil || !grm.exponentialBackOffOnError {
// Operation completed without error, or exponentialBackOffOnError disabled
grm.deleteOperation(volumeName, podName)
if *err != nil {
// Log error
logOperationName := getOperationName(volumeName, podName)
glog.Errorf("operation %s failed with: %v",
logOperationName,
*err)
}
return
}
// Operation completed with error and exponentialBackOffOnError Enabled
existingOpIndex, getOpErr := grm.getOperation(volumeName, podName)
if getOpErr != nil {
// Failed to find existing operation
logOperationName := getOperationName(volumeName, podName)
glog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.",
logOperationName,
*err)
return
}
grm.operations[existingOpIndex].expBackoff.Update(err)
grm.operations[existingOpIndex].operationPending = false
// Log error
operationName :=
getOperationName(volumeName, podName)
glog.Errorf("%v", grm.operations[existingOpIndex].expBackoff.
GenerateNoRetriesPermittedMsg(operationName))
}
func (grm *nestedPendingOperations) Wait() {
grm.lock.Lock()
defer grm.lock.Unlock()
for len(grm.operations) > 0 {
grm.cond.Wait()
}
}
func getOperationName(
volumeName api.UniqueVolumeName, podName types.UniquePodName) string {
podNameStr := ""
if podName != emptyUniquePodName {
podNameStr = fmt.Sprintf(" (%q)", podName)
}
return fmt.Sprintf("%q%s",
volumeName,
podNameStr)
}
// NewAlreadyExistsError returns a new instance of AlreadyExists error.
func NewAlreadyExistsError(operationName string) error {
return alreadyExistsError{operationName}
}
// IsAlreadyExists returns true if an error returned from
// NestedPendingOperations indicates a new operation can not be started because
// an operation with the same operation name is already executing.
func IsAlreadyExists(err error) bool {
switch err.(type) {
case alreadyExistsError:
return true
default:
return false
}
}
// alreadyExistsError is the error returned by NestedPendingOperations when a
// new operation can not be started because an operation with the same operation
// name is already executing.
type alreadyExistsError struct {
operationName string
}
var _ error = alreadyExistsError{}
func (err alreadyExistsError) Error() string {
return fmt.Sprintf(
"Failed to create operation with name %q. An operation with that name is already executing.",
err.operationName)
}
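
One subtlety of the matching loop in Run above: an empty podName on either side matches any pending operation for the volume. A pending volume-level operation (for example UnmountDevice, which passes an empty podName in the executor diff below) therefore blocks pod-scoped operations on that volume, and vice versa. A short sketch under the same assumptions as the earlier example:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
	"k8s.io/kubernetes/pkg/volume/util/types"
)

func main() {
	ops := nestedpendingoperations.NewNestedPendingOperations(false /* exponentialBackOffOnError */)
	volume := api.UniqueVolumeName("volume-name")
	blocked := make(chan struct{})

	// A pending volume-level operation (empty podName)...
	_ = ops.Run(volume, "" /* podName */, func() error { <-blocked; return nil })

	// ...refuses a pod-scoped operation on the same volume.
	err := ops.Run(volume, types.UniquePodName("pod-a"), func() error { return nil })
	fmt.Println(nestedpendingoperations.IsAlreadyExists(err)) // true

	close(blocked)
	ops.Wait()
}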

View File

@ -0,0 +1,569 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nestedpendingoperations
import (
"fmt"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume/util/types"
)
const (
// testTimeout is how long the tests wait for goroutines to finish. This
// _should_ be just a "context switch" and take several ms; however, Clayton
// says, "We have had flakes due to tests that assumed that 15s is long enough
// to sleep."
testTimeout time.Duration = 1 * time.Minute
// initialOperationWaitTimeShort is the initial amount of time the test will
// wait for an operation to complete (each successive failure results in
// exponential backoff).
initialOperationWaitTimeShort time.Duration = 20 * time.Millisecond
// initialOperationWaitTimeLong is the initial amount of time the test will
// wait for an operation to complete (each successive failure results in
// exponential backoff).
initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond
)
func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := api.UniqueVolumeName("volume-name")
operation := func() error { return nil }
// Act
err := grm.Run(volumeName, "" /* operationSubName */, operation)
// Assert
if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
}
}
func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volume1Name := api.UniqueVolumeName("volume1-name")
volume2Name := api.UniqueVolumeName("volume2-name")
operation := func() error { return nil }
// Act
err1 := grm.Run(volume1Name, "" /* operationSubName */, operation)
err2 := grm.Run(volume2Name, "" /* operationSubName */, operation)
// Assert
if err1 != nil {
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", volume1Name, err1)
}
if err2 != nil {
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", volume2Name, err2)
}
}
func Test_NewGoRoutineMap_Positive_TwoSubOps(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := api.UniqueVolumeName("volume-name")
operation1PodName := types.UniquePodName("operation1-podname")
operation2PodName := types.UniquePodName("operation2-podname")
operation := func() error { return nil }
// Act
err1 := grm.Run(volumeName, operation1PodName, operation)
err2 := grm.Run(volumeName, operation2PodName, operation)
// Assert
if err1 != nil {
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation1PodName, err1)
}
if err2 != nil {
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation2PodName, err2)
}
}
func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := api.UniqueVolumeName("volume-name")
operation := func() error { return nil }
// Act
err := grm.Run(volumeName, "" /* operationSubName */, operation)
// Assert
if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
}
}
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := api.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateCallbackFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
<-operation1DoneCh // Force operation1 to complete
// Act
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2)
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
}
return true, nil
},
)
// Assert
if err2 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
}
}
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := api.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateCallbackFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
<-operation1DoneCh // Force operation1 to complete
// Act
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2)
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
}
return true, nil
},
)
// Assert
if err2 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
}
}
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := api.UniqueVolumeName("volume-name")
operation1 := generatePanicFunc()
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2)
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
}
return true, nil
},
)
// Assert
if err2 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
}
}
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := api.UniqueVolumeName("volume-name")
operation1 := generatePanicFunc()
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate backoff
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation2)
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
}
return true, nil
},
)
// Assert
if err2 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
}
}
func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := api.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2)
// Assert
if err2 == nil {
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
}
if !IsAlreadyExists(err2) {
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
}
}
func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := api.UniqueVolumeName("volume-name")
operationPodName := types.UniquePodName("operation-podname")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, operationPodName, operation1)
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, operationPodName, operation2)
// Assert
if err2 == nil {
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
}
if !IsAlreadyExists(err2) {
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
}
}
func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := api.UniqueVolumeName("volume-name")
operationPodName := types.UniquePodName("operation-podname")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, operationPodName, operation1)
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, operationPodName, operation2)
// Assert
if err2 == nil {
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
}
if !IsAlreadyExists(err2) {
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
}
}
func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := api.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2)
// Assert
if err2 == nil {
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
}
if !IsAlreadyExists(err2) {
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
}
}
func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := api.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
operation3 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2)
// Assert
if err2 == nil {
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
}
if !IsAlreadyExists(err2) {
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
}
// Act
operation1DoneCh <- true // Force operation1 to complete
err3 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation3)
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
}
return true, nil
},
)
// Assert
if err3 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3)
}
}
func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := api.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err1 := grm.Run(volumeName, "" /* operationSubName */, operation1)
if err1 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
}
operation2 := generateNoopFunc()
operation3 := generateNoopFunc()
// Act
err2 := grm.Run(volumeName, "" /* operationSubName */, operation2)
// Assert
if err2 == nil {
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
}
if !IsAlreadyExists(err2) {
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
}
// Act
operation1DoneCh <- true // Force operation1 to complete
err3 := retryWithExponentialBackOff(
time.Duration(initialOperationWaitTimeShort),
func() (bool, error) {
err := grm.Run(volumeName, "" /* operationSubName */, operation3)
if err != nil {
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
return false, nil
}
return true, nil
},
)
// Assert
if err3 != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3)
}
}
func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) {
// Test that Wait() on empty GoRoutineMap always succeeds without blocking
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
// Act
waitDoneCh := make(chan interface{}, 1)
go func() {
grm.Wait()
waitDoneCh <- true
}()
// Assert
err := waitChannelWithTimeout(waitDoneCh, testTimeout)
if err != nil {
t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
}
}
func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) {
// Test that Wait() on empty GoRoutineMap always succeeds without blocking
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
// Act
waitDoneCh := make(chan interface{}, 1)
go func() {
grm.Wait()
waitDoneCh <- true
}()
// Assert
err := waitChannelWithTimeout(waitDoneCh, testTimeout)
if err != nil {
t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
}
}
func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) {
// Test that Wait() really blocks until the last operation succeeds
// Arrange
grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */)
volumeName := api.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err := grm.Run(volumeName, "" /* operationSubName */, operation1)
if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
}
// Act
waitDoneCh := make(chan interface{}, 1)
go func() {
grm.Wait()
waitDoneCh <- true
}()
// Finish the operation
operation1DoneCh <- true
// Assert
err = waitChannelWithTimeout(waitDoneCh, testTimeout)
if err != nil {
t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
}
}
func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) {
// Test that Wait() really blocks until the last operation succeeds
// Arrange
grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
volumeName := api.UniqueVolumeName("volume-name")
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateWaitFunc(operation1DoneCh)
err := grm.Run(volumeName, "" /* operationSubName */, operation1)
if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
}
// Act
waitDoneCh := make(chan interface{}, 1)
go func() {
grm.Wait()
waitDoneCh <- true
}()
// Finish the operation
operation1DoneCh <- true
// Assert
err = waitChannelWithTimeout(waitDoneCh, testTimeout)
if err != nil {
t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
}
}
func generateCallbackFunc(done chan<- interface{}) func() error {
return func() error {
done <- true
return nil
}
}
func generateWaitFunc(done <-chan interface{}) func() error {
return func() error {
<-done
return nil
}
}
func generatePanicFunc() func() error {
return func() error {
panic("testing panic")
}
}
func generateNoopFunc() func() error {
return func() error { return nil }
}
func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
backoff := wait.Backoff{
Duration: initialDuration,
Factor: 3,
Jitter: 0,
Steps: 4,
}
return wait.ExponentialBackoff(backoff, fn)
}
func waitChannelWithTimeout(ch <-chan interface{}, timeout time.Duration) error {
timer := time.NewTimer(timeout)
select {
case <-ch:
// Success!
return nil
case <-timer.C:
return fmt.Errorf("timeout after %v", timeout)
}
}

View File

@ -15,8 +15,8 @@ limitations under the License.
*/
// Package operationexecutor implements interfaces that enable execution of
// attach, detach, mount, and unmount operations with a goroutinemap so that
// more than one operation is never triggered on the same volume.
// attach, detach, mount, and unmount operations with a nestedpendingoperations
// so that more than one operation is never triggered on the same volume.
package operationexecutor
import (
@ -27,14 +27,15 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// OperationExecutor defines a set of operations for attaching, detaching,
// mounting, or unmounting a volume that are executed with a goroutinemap which
// mounting, or unmounting a volume that are executed with a NestedPendingOperations, which
// prevents more than one operation from being triggered on the same volume.
//
// These operations should be idempotent (for example, AttachVolume should
@ -106,7 +107,7 @@ func NewOperationExecutor(
return &operationExecutor{
kubeClient: kubeClient,
volumePluginMgr: volumePluginMgr,
pendingOperations: goroutinemap.NewGoRoutineMap(
pendingOperations: nestedpendingoperations.NewNestedPendingOperations(
true /* exponentialBackOffOnError */),
}
}
@ -328,7 +329,7 @@ type operationExecutor struct {
// pendingOperations keeps track of pending attach and detach operations so
// multiple operations are not started on the same volume
pendingOperations goroutinemap.GoRoutineMap
pendingOperations nestedpendingoperations.NestedPendingOperations
}
func (oe *operationExecutor) AttachVolume(
@ -341,7 +342,7 @@ func (oe *operationExecutor) AttachVolume(
}
return oe.pendingOperations.Run(
string(volumeToAttach.VolumeName), attachFunc)
volumeToAttach.VolumeName, "" /* podName */, attachFunc)
}
func (oe *operationExecutor) DetachVolume(
@ -355,7 +356,7 @@ func (oe *operationExecutor) DetachVolume(
}
return oe.pendingOperations.Run(
string(volumeToDetach.VolumeName), detachFunc)
volumeToDetach.VolumeName, "" /* podName */, detachFunc)
}
func (oe *operationExecutor) MountVolume(
@ -368,8 +369,15 @@ func (oe *operationExecutor) MountVolume(
return err
}
podName := volumetypes.UniquePodName("")
if !volumeToMount.PluginIsAttachable {
// Non-attachable volume plugins can execute mount for multiple pods
// referencing the same volume in parallel
podName = volumehelper.GetUniquePodName(volumeToMount.Pod)
}
return oe.pendingOperations.Run(
string(volumeToMount.VolumeName), mountFunc)
volumeToMount.VolumeName, podName, mountFunc)
}
func (oe *operationExecutor) UnmountVolume(
@ -381,8 +389,12 @@ func (oe *operationExecutor) UnmountVolume(
return err
}
// All volume plugins can execute unmount for multiple pods referencing the
// same volume in parallel
podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
return oe.pendingOperations.Run(
string(volumeToUnmount.VolumeName), unmountFunc)
volumeToUnmount.VolumeName, podName, unmountFunc)
}
func (oe *operationExecutor) UnmountDevice(
@ -396,7 +408,7 @@ func (oe *operationExecutor) UnmountDevice(
}
return oe.pendingOperations.Run(
string(deviceToDetach.VolumeName), unmountDeviceFunc)
deviceToDetach.VolumeName, "" /* podName */, unmountDeviceFunc)
}
func (oe *operationExecutor) VerifyControllerAttachedVolume(
@ -410,7 +422,7 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume(
}
return oe.pendingOperations.Run(
string(volumeToMount.VolumeName), verifyControllerAttachedVolumeFunc)
volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc)
}
func (oe *operationExecutor) generateAttachVolumeFunc(
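
To restate the executor-side policy added above: MountVolume keeps the old per-volume serialization for attachable plugins by passing an empty podName, uses a per-pod key for non-attachable plugins, and UnmountVolume is always keyed per pod. A compact, illustrative restatement (the helper and package names are hypothetical, not part of the PR):

package sketch // hypothetical package, for illustration only

import (
	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)

// mountKeyPodName mirrors the decision in operationExecutor.MountVolume above:
// attachable plugins serialize per volume (empty podName), non-attachable
// plugins are keyed per (volume, pod) so their mounts may run in parallel.
func mountKeyPodName(pluginIsAttachable bool, podName volumetypes.UniquePodName) volumetypes.UniquePodName {
	if pluginIsAttachable {
		return "" // one pending operation per volume
	}
	return podName // one pending operation per (volume, pod)
}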