recycler: Maintain a list of long-running operations.

We need to keep list of running recyclers, deleters and provisioners in
memory in order not to start a new recycling/deleting/provisioning twice
for the same volume/claim.

This will be eventually replaced by GoRoutineMap from PR #24838.
This commit is contained in:
Jan Safranek 2016-05-17 14:55:18 +02:00
parent 4e47f69cba
commit cf68370371

View File

@ -18,6 +18,7 @@ package persistentvolume
import (
"fmt"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
@ -93,6 +94,19 @@ type PersistentVolumeController struct {
kubeClient clientset.Interface
eventRecorder record.EventRecorder
cloud cloudprovider.Interface
// PersistentVolumeController keeps track of long running operations and
// makes sure it won't start the same operation twice in parallel.
// Each operation is identified by unique operationName.
// Simple keymutex.KeyMutex is not enough, we need to know what operations
// are in progress (so we don't schedule a new one) and keymutex.KeyMutex
// does not provide such functionality.
// runningOperationsMapLock guards access to runningOperations map
runningOperationsMapLock sync.Mutex
// runningOperations is map of running operations. The value does not
// matter, presence of a key is enough to consider an operation running.
runningOperations map[string]bool
}
// NewPersistentVolumeController creates a new PersistentVolumeController
@ -108,9 +122,10 @@ func NewPersistentVolumeController(
recorder := broadcaster.NewRecorder(api.EventSource{Component: "persistentvolume-controller"})
controller := &PersistentVolumeController{
kubeClient: kubeClient,
eventRecorder: recorder,
cloud: cloud,
kubeClient: kubeClient,
eventRecorder: recorder,
runningOperations: make(map[string]bool),
cloud: cloud,
}
volumeSource := &cache.ListWatch{
@ -998,6 +1013,52 @@ func (ctrl *PersistentVolumeController) reclaimVolume(volume *api.PersistentVolu
return nil
}
// scheduleOperation starts given asynchronous operation on given volume. It
// makes sure the operation is already not running.
func (ctrl *PersistentVolumeController) scheduleOperation(operationName string, operation func(arg interface{}), arg interface{}) {
glog.V(4).Infof("scheduleOperation[%s]", operationName)
isRunning := func() bool {
// In anonymous func() to get the locking right.
ctrl.runningOperationsMapLock.Lock()
defer ctrl.runningOperationsMapLock.Unlock()
if ctrl.isOperationRunning(operationName) {
glog.V(4).Infof("operation %q is already running, skipping", operationName)
return true
}
ctrl.startRunningOperation(operationName)
return false
}()
if isRunning {
return
}
// Run the operation in separate goroutine
go func() {
glog.V(4).Infof("scheduleOperation[%s]: running the operation", operationName)
operation(arg)
ctrl.runningOperationsMapLock.Lock()
defer ctrl.runningOperationsMapLock.Unlock()
ctrl.finishRunningOperation(operationName)
}()
}
func (ctrl *PersistentVolumeController) isOperationRunning(operationName string) bool {
_, found := ctrl.runningOperations[operationName]
return found
}
func (ctrl *PersistentVolumeController) finishRunningOperation(operationName string) {
delete(ctrl.runningOperations, operationName)
}
func (ctrl *PersistentVolumeController) startRunningOperation(operationName string) {
ctrl.runningOperations[operationName] = true
}
// Stateless functions
func hasAnnotation(obj api.ObjectMeta, ann string) bool {