diff --git a/pkg/controller/persistentvolume/persistentvolume_controller.go b/pkg/controller/persistentvolume/persistentvolume_controller.go index 988cce6b27a..3cf1c4627d6 100644 --- a/pkg/controller/persistentvolume/persistentvolume_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_controller.go @@ -18,6 +18,7 @@ package persistentvolume import ( "fmt" + "sync" "time" "k8s.io/kubernetes/pkg/api" @@ -93,6 +94,19 @@ type PersistentVolumeController struct { kubeClient clientset.Interface eventRecorder record.EventRecorder cloud cloudprovider.Interface + + // PersistentVolumeController keeps track of long running operations and + // makes sure it won't start the same operation twice in parallel. + // Each operation is identified by unique operationName. + // Simple keymutex.KeyMutex is not enough, we need to know what operations + // are in progress (so we don't schedule a new one) and keymutex.KeyMutex + // does not provide such functionality. + + // runningOperationsMapLock guards access to runningOperations map + runningOperationsMapLock sync.Mutex + // runningOperations is map of running operations. The value does not + // matter, presence of a key is enough to consider an operation running. + runningOperations map[string]bool } // NewPersistentVolumeController creates a new PersistentVolumeController @@ -108,9 +122,10 @@ func NewPersistentVolumeController( recorder := broadcaster.NewRecorder(api.EventSource{Component: "persistentvolume-controller"}) controller := &PersistentVolumeController{ - kubeClient: kubeClient, - eventRecorder: recorder, - cloud: cloud, + kubeClient: kubeClient, + eventRecorder: recorder, + runningOperations: make(map[string]bool), + cloud: cloud, } volumeSource := &cache.ListWatch{ @@ -998,6 +1013,52 @@ func (ctrl *PersistentVolumeController) reclaimVolume(volume *api.PersistentVolu return nil } +// scheduleOperation starts given asynchronous operation on given volume. It +// makes sure the operation is already not running. +func (ctrl *PersistentVolumeController) scheduleOperation(operationName string, operation func(arg interface{}), arg interface{}) { + glog.V(4).Infof("scheduleOperation[%s]", operationName) + + isRunning := func() bool { + // In anonymous func() to get the locking right. + ctrl.runningOperationsMapLock.Lock() + defer ctrl.runningOperationsMapLock.Unlock() + + if ctrl.isOperationRunning(operationName) { + glog.V(4).Infof("operation %q is already running, skipping", operationName) + return true + } + ctrl.startRunningOperation(operationName) + return false + }() + + if isRunning { + return + } + + // Run the operation in separate goroutine + go func() { + glog.V(4).Infof("scheduleOperation[%s]: running the operation", operationName) + operation(arg) + + ctrl.runningOperationsMapLock.Lock() + defer ctrl.runningOperationsMapLock.Unlock() + ctrl.finishRunningOperation(operationName) + }() +} + +func (ctrl *PersistentVolumeController) isOperationRunning(operationName string) bool { + _, found := ctrl.runningOperations[operationName] + return found +} + +func (ctrl *PersistentVolumeController) finishRunningOperation(operationName string) { + delete(ctrl.runningOperations, operationName) +} + +func (ctrl *PersistentVolumeController) startRunningOperation(operationName string) { + ctrl.runningOperations[operationName] = true +} + // Stateless functions func hasAnnotation(obj api.ObjectMeta, ann string) bool {