Rework PV controller to use util/goroutinemap

This commit is contained in:
Jan Safranek 2016-06-09 13:49:04 +02:00
parent c80f650b70
commit 6081bd61f0
3 changed files with 28 additions and 64 deletions

View File

@ -18,7 +18,6 @@ package persistentvolume
import (
"fmt"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
@ -28,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/util/goroutinemap"
vol "k8s.io/kubernetes/pkg/volume"
"github.com/golang/glog"
@ -127,22 +127,12 @@ type PersistentVolumeController struct {
volumes persistentVolumeOrderedIndex
claims cache.Store
// PersistentVolumeController keeps track of long running operations and
// makes sure it won't start the same operation twice in parallel.
// Each operation is identified by unique operationName.
// Simple keymutex.KeyMutex is not enough, we need to know what operations
// are in progress (so we don't schedule a new one) and keymutex.KeyMutex
// does not provide such functionality.
// runningOperationsMapLock guards access to runningOperations map
runningOperationsMapLock sync.Mutex
// runningOperations is map of running operations. The value does not
// matter, presence of a key is enough to consider an operation running.
runningOperations map[string]bool
// Map of scheduled/running operations.
runningOperations goroutinemap.GoRoutineMap
// For testing only: hook to call before an asynchronous operation starts.
// Not used when set to nil.
preOperationHook func(operationName string, operationArgument interface{})
preOperationHook func(operationName string)
createProvisionedPVRetryCount int
createProvisionedPVInterval time.Duration
@ -836,12 +826,18 @@ func (ctrl *PersistentVolumeController) reclaimVolume(volume *api.PersistentVolu
case api.PersistentVolumeReclaimRecycle:
glog.V(4).Infof("reclaimVolume[%s]: policy is Recycle", volume.Name)
opName := fmt.Sprintf("recycle-%s[%s]", volume.Name, string(volume.UID))
ctrl.scheduleOperation(opName, ctrl.recycleVolumeOperation, volume)
ctrl.scheduleOperation(opName, func() error {
ctrl.recycleVolumeOperation(volume)
return nil
})
case api.PersistentVolumeReclaimDelete:
glog.V(4).Infof("reclaimVolume[%s]: policy is Delete", volume.Name)
opName := fmt.Sprintf("delete-%s[%s]", volume.Name, string(volume.UID))
ctrl.scheduleOperation(opName, ctrl.deleteVolumeOperation, volume)
ctrl.scheduleOperation(opName, func() error {
ctrl.deleteVolumeOperation(volume)
return nil
})
default:
// Unknown PersistentVolumeReclaimPolicy
@ -1068,7 +1064,10 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *api.PersistentVol
func (ctrl *PersistentVolumeController) provisionClaim(claim *api.PersistentVolumeClaim) error {
glog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))
opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
ctrl.scheduleOperation(opName, ctrl.provisionClaimOperation, claim)
ctrl.scheduleOperation(opName, func() error {
ctrl.provisionClaimOperation(claim)
return nil
})
return nil
}
@ -1217,51 +1216,20 @@ func (ctrl *PersistentVolumeController) getProvisionedVolumeNameForClaim(claim *
// scheduleOperation starts given asynchronous operation on given volume. It
// makes sure the operation is already not running.
func (ctrl *PersistentVolumeController) scheduleOperation(operationName string, operation func(arg interface{}), arg interface{}) {
func (ctrl *PersistentVolumeController) scheduleOperation(operationName string, operation func() error) {
glog.V(4).Infof("scheduleOperation[%s]", operationName)
// Poke test code that an operation is just about to get started.
if ctrl.preOperationHook != nil {
ctrl.preOperationHook(operationName, arg)
ctrl.preOperationHook(operationName)
}
isRunning := func() bool {
// In anonymous func() to get the locking right.
ctrl.runningOperationsMapLock.Lock()
defer ctrl.runningOperationsMapLock.Unlock()
if ctrl.isOperationRunning(operationName) {
err := ctrl.runningOperations.Run(operationName, operation)
if err != nil {
if goroutinemap.IsAlreadyExists(err) {
glog.V(4).Infof("operation %q is already running, skipping", operationName)
return true
} else {
glog.Errorf("error scheduling operaion %q: %v", operationName, err)
}
ctrl.startRunningOperation(operationName)
return false
}()
if isRunning {
return
}
// Run the operation in a separate goroutine
go func() {
glog.V(4).Infof("scheduleOperation[%s]: running the operation", operationName)
operation(arg)
ctrl.runningOperationsMapLock.Lock()
defer ctrl.runningOperationsMapLock.Unlock()
ctrl.finishRunningOperation(operationName)
}()
}
func (ctrl *PersistentVolumeController) isOperationRunning(operationName string) bool {
_, found := ctrl.runningOperations[operationName]
return found
}
func (ctrl *PersistentVolumeController) finishRunningOperation(operationName string) {
delete(ctrl.runningOperations, operationName)
}
func (ctrl *PersistentVolumeController) startRunningOperation(operationName string) {
ctrl.runningOperations[operationName] = true
}

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/goroutinemap"
vol "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/watch"
@ -64,7 +65,7 @@ func NewPersistentVolumeController(
claims: cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc),
kubeClient: kubeClient,
eventRecorder: eventRecorder,
runningOperations: make(map[string]bool),
runningOperations: goroutinemap.NewGoRoutineMap(),
cloud: cloud,
provisioner: provisioner,
clusterName: clusterName,

View File

@ -454,22 +454,17 @@ func (r *volumeReactor) getChangeCount() int {
return r.changedSinceLastSync
}
func (r *volumeReactor) getOperationCount() int {
r.ctrl.runningOperationsMapLock.Lock()
defer r.ctrl.runningOperationsMapLock.Unlock()
return len(r.ctrl.runningOperations)
}
// waitTest waits until all tests, controllers and other goroutines do their
// job and no new actions are registered for 10 milliseconds.
func (r *volumeReactor) waitTest() {
r.ctrl.runningOperations.Wait()
// Check every 10ms if the controller does something and stop if it's
// idle.
oldChanges := -1
for {
time.Sleep(10 * time.Millisecond)
changes := r.getChangeCount()
if changes == oldChanges && r.getOperationCount() == 0 {
if changes == oldChanges {
// No changes for last 10ms -> controller must be idle.
break
}
@ -774,7 +769,7 @@ func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(c
return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
// Inject a hook before async operation starts
ctrl.preOperationHook = func(operationName string, arg interface{}) {
ctrl.preOperationHook = func(operationName string) {
// Inside the hook, run the function to inject
glog.V(4).Infof("reactor: scheduleOperation reached, injecting call")
injectBeforeOperation(ctrl, reactor)