From 6081bd61f05263f6adccfe52c83776eb1a6f32cc Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Thu, 9 Jun 2016 13:49:04 +0200 Subject: [PATCH] Rework PV controller to use util/goroutinemap --- pkg/controller/persistentvolume/controller.go | 78 ++++++------------- .../persistentvolume/controller_base.go | 3 +- .../persistentvolume/framework_test.go | 11 +-- 3 files changed, 28 insertions(+), 64 deletions(-) diff --git a/pkg/controller/persistentvolume/controller.go b/pkg/controller/persistentvolume/controller.go index 92d92af63ff..bef158fb281 100644 --- a/pkg/controller/persistentvolume/controller.go +++ b/pkg/controller/persistentvolume/controller.go @@ -18,7 +18,6 @@ package persistentvolume import ( "fmt" - "sync" "time" "k8s.io/kubernetes/pkg/api" @@ -28,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/conversion" + "k8s.io/kubernetes/pkg/util/goroutinemap" vol "k8s.io/kubernetes/pkg/volume" "github.com/golang/glog" @@ -127,22 +127,12 @@ type PersistentVolumeController struct { volumes persistentVolumeOrderedIndex claims cache.Store - // PersistentVolumeController keeps track of long running operations and - // makes sure it won't start the same operation twice in parallel. - // Each operation is identified by unique operationName. - // Simple keymutex.KeyMutex is not enough, we need to know what operations - // are in progress (so we don't schedule a new one) and keymutex.KeyMutex - // does not provide such functionality. - - // runningOperationsMapLock guards access to runningOperations map - runningOperationsMapLock sync.Mutex - // runningOperations is map of running operations. The value does not - // matter, presence of a key is enough to consider an operation running. - runningOperations map[string]bool + // Map of scheduled/running operations. + runningOperations goroutinemap.GoRoutineMap // For testing only: hook to call before an asynchronous operation starts. // Not used when set to nil. - preOperationHook func(operationName string, operationArgument interface{}) + preOperationHook func(operationName string) createProvisionedPVRetryCount int createProvisionedPVInterval time.Duration @@ -836,12 +826,18 @@ func (ctrl *PersistentVolumeController) reclaimVolume(volume *api.PersistentVolu case api.PersistentVolumeReclaimRecycle: glog.V(4).Infof("reclaimVolume[%s]: policy is Recycle", volume.Name) opName := fmt.Sprintf("recycle-%s[%s]", volume.Name, string(volume.UID)) - ctrl.scheduleOperation(opName, ctrl.recycleVolumeOperation, volume) + ctrl.scheduleOperation(opName, func() error { + ctrl.recycleVolumeOperation(volume) + return nil + }) case api.PersistentVolumeReclaimDelete: glog.V(4).Infof("reclaimVolume[%s]: policy is Delete", volume.Name) opName := fmt.Sprintf("delete-%s[%s]", volume.Name, string(volume.UID)) - ctrl.scheduleOperation(opName, ctrl.deleteVolumeOperation, volume) + ctrl.scheduleOperation(opName, func() error { + ctrl.deleteVolumeOperation(volume) + return nil + }) default: // Unknown PersistentVolumeReclaimPolicy @@ -1068,7 +1064,10 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *api.PersistentVol func (ctrl *PersistentVolumeController) provisionClaim(claim *api.PersistentVolumeClaim) error { glog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim)) opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID)) - ctrl.scheduleOperation(opName, ctrl.provisionClaimOperation, claim) + ctrl.scheduleOperation(opName, func() error { + ctrl.provisionClaimOperation(claim) + return nil + }) return nil } @@ -1217,51 +1216,20 @@ func (ctrl *PersistentVolumeController) getProvisionedVolumeNameForClaim(claim * // scheduleOperation starts given asynchronous operation on given volume. It // makes sure the operation is already not running. -func (ctrl *PersistentVolumeController) scheduleOperation(operationName string, operation func(arg interface{}), arg interface{}) { +func (ctrl *PersistentVolumeController) scheduleOperation(operationName string, operation func() error) { glog.V(4).Infof("scheduleOperation[%s]", operationName) // Poke test code that an operation is just about to get started. if ctrl.preOperationHook != nil { - ctrl.preOperationHook(operationName, arg) + ctrl.preOperationHook(operationName) } - isRunning := func() bool { - // In anonymous func() to get the locking right. - ctrl.runningOperationsMapLock.Lock() - defer ctrl.runningOperationsMapLock.Unlock() - - if ctrl.isOperationRunning(operationName) { + err := ctrl.runningOperations.Run(operationName, operation) + if err != nil { + if goroutinemap.IsAlreadyExists(err) { glog.V(4).Infof("operation %q is already running, skipping", operationName) - return true + } else { + glog.Errorf("error scheduling operaion %q: %v", operationName, err) } - ctrl.startRunningOperation(operationName) - return false - }() - - if isRunning { - return } - - // Run the operation in a separate goroutine - go func() { - glog.V(4).Infof("scheduleOperation[%s]: running the operation", operationName) - operation(arg) - - ctrl.runningOperationsMapLock.Lock() - defer ctrl.runningOperationsMapLock.Unlock() - ctrl.finishRunningOperation(operationName) - }() -} - -func (ctrl *PersistentVolumeController) isOperationRunning(operationName string) bool { - _, found := ctrl.runningOperations[operationName] - return found -} - -func (ctrl *PersistentVolumeController) finishRunningOperation(operationName string) { - delete(ctrl.runningOperations, operationName) -} - -func (ctrl *PersistentVolumeController) startRunningOperation(operationName string) { - ctrl.runningOperations[operationName] = true } diff --git a/pkg/controller/persistentvolume/controller_base.go b/pkg/controller/persistentvolume/controller_base.go index 89c0de3fd9c..2457884c761 100644 --- a/pkg/controller/persistentvolume/controller_base.go +++ b/pkg/controller/persistentvolume/controller_base.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/goroutinemap" vol "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/watch" @@ -64,7 +65,7 @@ func NewPersistentVolumeController( claims: cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc), kubeClient: kubeClient, eventRecorder: eventRecorder, - runningOperations: make(map[string]bool), + runningOperations: goroutinemap.NewGoRoutineMap(), cloud: cloud, provisioner: provisioner, clusterName: clusterName, diff --git a/pkg/controller/persistentvolume/framework_test.go b/pkg/controller/persistentvolume/framework_test.go index 821a108b1b2..2d0e8167b46 100644 --- a/pkg/controller/persistentvolume/framework_test.go +++ b/pkg/controller/persistentvolume/framework_test.go @@ -454,22 +454,17 @@ func (r *volumeReactor) getChangeCount() int { return r.changedSinceLastSync } -func (r *volumeReactor) getOperationCount() int { - r.ctrl.runningOperationsMapLock.Lock() - defer r.ctrl.runningOperationsMapLock.Unlock() - return len(r.ctrl.runningOperations) -} - // waitTest waits until all tests, controllers and other goroutines do their // job and no new actions are registered for 10 milliseconds. func (r *volumeReactor) waitTest() { + r.ctrl.runningOperations.Wait() // Check every 10ms if the controller does something and stop if it's // idle. oldChanges := -1 for { time.Sleep(10 * time.Millisecond) changes := r.getChangeCount() - if changes == oldChanges && r.getOperationCount() == 0 { + if changes == oldChanges { // No changes for last 10ms -> controller must be idle. break } @@ -774,7 +769,7 @@ func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(c return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error { // Inject a hook before async operation starts - ctrl.preOperationHook = func(operationName string, arg interface{}) { + ctrl.preOperationHook = func(operationName string) { // Inside the hook, run the function to inject glog.V(4).Infof("reactor: scheduleOperation reached, injecting call") injectBeforeOperation(ctrl, reactor)