mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Merge pull request #27127 from jsafrane/refactor-binder-operations
Automatic merge from submit-queue Rework PV controller to use util/goroutinemap @kubernetes/sig-storage
This commit is contained in:
commit
f97bca37a5
@ -18,7 +18,6 @@ package persistentvolume
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
@ -28,6 +27,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||||
"k8s.io/kubernetes/pkg/controller/framework"
|
"k8s.io/kubernetes/pkg/controller/framework"
|
||||||
"k8s.io/kubernetes/pkg/conversion"
|
"k8s.io/kubernetes/pkg/conversion"
|
||||||
|
"k8s.io/kubernetes/pkg/util/goroutinemap"
|
||||||
vol "k8s.io/kubernetes/pkg/volume"
|
vol "k8s.io/kubernetes/pkg/volume"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
@ -127,22 +127,12 @@ type PersistentVolumeController struct {
|
|||||||
volumes persistentVolumeOrderedIndex
|
volumes persistentVolumeOrderedIndex
|
||||||
claims cache.Store
|
claims cache.Store
|
||||||
|
|
||||||
// PersistentVolumeController keeps track of long running operations and
|
// Map of scheduled/running operations.
|
||||||
// makes sure it won't start the same operation twice in parallel.
|
runningOperations goroutinemap.GoRoutineMap
|
||||||
// Each operation is identified by unique operationName.
|
|
||||||
// Simple keymutex.KeyMutex is not enough, we need to know what operations
|
|
||||||
// are in progress (so we don't schedule a new one) and keymutex.KeyMutex
|
|
||||||
// does not provide such functionality.
|
|
||||||
|
|
||||||
// runningOperationsMapLock guards access to runningOperations map
|
|
||||||
runningOperationsMapLock sync.Mutex
|
|
||||||
// runningOperations is map of running operations. The value does not
|
|
||||||
// matter, presence of a key is enough to consider an operation running.
|
|
||||||
runningOperations map[string]bool
|
|
||||||
|
|
||||||
// For testing only: hook to call before an asynchronous operation starts.
|
// For testing only: hook to call before an asynchronous operation starts.
|
||||||
// Not used when set to nil.
|
// Not used when set to nil.
|
||||||
preOperationHook func(operationName string, operationArgument interface{})
|
preOperationHook func(operationName string)
|
||||||
|
|
||||||
createProvisionedPVRetryCount int
|
createProvisionedPVRetryCount int
|
||||||
createProvisionedPVInterval time.Duration
|
createProvisionedPVInterval time.Duration
|
||||||
@ -836,12 +826,18 @@ func (ctrl *PersistentVolumeController) reclaimVolume(volume *api.PersistentVolu
|
|||||||
case api.PersistentVolumeReclaimRecycle:
|
case api.PersistentVolumeReclaimRecycle:
|
||||||
glog.V(4).Infof("reclaimVolume[%s]: policy is Recycle", volume.Name)
|
glog.V(4).Infof("reclaimVolume[%s]: policy is Recycle", volume.Name)
|
||||||
opName := fmt.Sprintf("recycle-%s[%s]", volume.Name, string(volume.UID))
|
opName := fmt.Sprintf("recycle-%s[%s]", volume.Name, string(volume.UID))
|
||||||
ctrl.scheduleOperation(opName, ctrl.recycleVolumeOperation, volume)
|
ctrl.scheduleOperation(opName, func() error {
|
||||||
|
ctrl.recycleVolumeOperation(volume)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
case api.PersistentVolumeReclaimDelete:
|
case api.PersistentVolumeReclaimDelete:
|
||||||
glog.V(4).Infof("reclaimVolume[%s]: policy is Delete", volume.Name)
|
glog.V(4).Infof("reclaimVolume[%s]: policy is Delete", volume.Name)
|
||||||
opName := fmt.Sprintf("delete-%s[%s]", volume.Name, string(volume.UID))
|
opName := fmt.Sprintf("delete-%s[%s]", volume.Name, string(volume.UID))
|
||||||
ctrl.scheduleOperation(opName, ctrl.deleteVolumeOperation, volume)
|
ctrl.scheduleOperation(opName, func() error {
|
||||||
|
ctrl.deleteVolumeOperation(volume)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
default:
|
default:
|
||||||
// Unknown PersistentVolumeReclaimPolicy
|
// Unknown PersistentVolumeReclaimPolicy
|
||||||
@ -1068,7 +1064,10 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *api.PersistentVol
|
|||||||
func (ctrl *PersistentVolumeController) provisionClaim(claim *api.PersistentVolumeClaim) error {
|
func (ctrl *PersistentVolumeController) provisionClaim(claim *api.PersistentVolumeClaim) error {
|
||||||
glog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))
|
glog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))
|
||||||
opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
|
opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
|
||||||
ctrl.scheduleOperation(opName, ctrl.provisionClaimOperation, claim)
|
ctrl.scheduleOperation(opName, func() error {
|
||||||
|
ctrl.provisionClaimOperation(claim)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1217,51 +1216,20 @@ func (ctrl *PersistentVolumeController) getProvisionedVolumeNameForClaim(claim *
|
|||||||
|
|
||||||
// scheduleOperation starts given asynchronous operation on given volume. It
|
// scheduleOperation starts given asynchronous operation on given volume. It
|
||||||
// makes sure the operation is already not running.
|
// makes sure the operation is already not running.
|
||||||
func (ctrl *PersistentVolumeController) scheduleOperation(operationName string, operation func(arg interface{}), arg interface{}) {
|
func (ctrl *PersistentVolumeController) scheduleOperation(operationName string, operation func() error) {
|
||||||
glog.V(4).Infof("scheduleOperation[%s]", operationName)
|
glog.V(4).Infof("scheduleOperation[%s]", operationName)
|
||||||
|
|
||||||
// Poke test code that an operation is just about to get started.
|
// Poke test code that an operation is just about to get started.
|
||||||
if ctrl.preOperationHook != nil {
|
if ctrl.preOperationHook != nil {
|
||||||
ctrl.preOperationHook(operationName, arg)
|
ctrl.preOperationHook(operationName)
|
||||||
}
|
}
|
||||||
|
|
||||||
isRunning := func() bool {
|
err := ctrl.runningOperations.Run(operationName, operation)
|
||||||
// In anonymous func() to get the locking right.
|
if err != nil {
|
||||||
ctrl.runningOperationsMapLock.Lock()
|
if goroutinemap.IsAlreadyExists(err) {
|
||||||
defer ctrl.runningOperationsMapLock.Unlock()
|
|
||||||
|
|
||||||
if ctrl.isOperationRunning(operationName) {
|
|
||||||
glog.V(4).Infof("operation %q is already running, skipping", operationName)
|
glog.V(4).Infof("operation %q is already running, skipping", operationName)
|
||||||
return true
|
} else {
|
||||||
|
glog.Errorf("error scheduling operaion %q: %v", operationName, err)
|
||||||
}
|
}
|
||||||
ctrl.startRunningOperation(operationName)
|
|
||||||
return false
|
|
||||||
}()
|
|
||||||
|
|
||||||
if isRunning {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run the operation in a separate goroutine
|
|
||||||
go func() {
|
|
||||||
glog.V(4).Infof("scheduleOperation[%s]: running the operation", operationName)
|
|
||||||
operation(arg)
|
|
||||||
|
|
||||||
ctrl.runningOperationsMapLock.Lock()
|
|
||||||
defer ctrl.runningOperationsMapLock.Unlock()
|
|
||||||
ctrl.finishRunningOperation(operationName)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ctrl *PersistentVolumeController) isOperationRunning(operationName string) bool {
|
|
||||||
_, found := ctrl.runningOperations[operationName]
|
|
||||||
return found
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ctrl *PersistentVolumeController) finishRunningOperation(operationName string) {
|
|
||||||
delete(ctrl.runningOperations, operationName)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ctrl *PersistentVolumeController) startRunningOperation(operationName string) {
|
|
||||||
ctrl.runningOperations[operationName] = true
|
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||||
"k8s.io/kubernetes/pkg/controller/framework"
|
"k8s.io/kubernetes/pkg/controller/framework"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
|
"k8s.io/kubernetes/pkg/util/goroutinemap"
|
||||||
vol "k8s.io/kubernetes/pkg/volume"
|
vol "k8s.io/kubernetes/pkg/volume"
|
||||||
"k8s.io/kubernetes/pkg/watch"
|
"k8s.io/kubernetes/pkg/watch"
|
||||||
|
|
||||||
@ -64,7 +65,7 @@ func NewPersistentVolumeController(
|
|||||||
claims: cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc),
|
claims: cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc),
|
||||||
kubeClient: kubeClient,
|
kubeClient: kubeClient,
|
||||||
eventRecorder: eventRecorder,
|
eventRecorder: eventRecorder,
|
||||||
runningOperations: make(map[string]bool),
|
runningOperations: goroutinemap.NewGoRoutineMap(),
|
||||||
cloud: cloud,
|
cloud: cloud,
|
||||||
provisioner: provisioner,
|
provisioner: provisioner,
|
||||||
clusterName: clusterName,
|
clusterName: clusterName,
|
||||||
|
@ -454,22 +454,17 @@ func (r *volumeReactor) getChangeCount() int {
|
|||||||
return r.changedSinceLastSync
|
return r.changedSinceLastSync
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *volumeReactor) getOperationCount() int {
|
|
||||||
r.ctrl.runningOperationsMapLock.Lock()
|
|
||||||
defer r.ctrl.runningOperationsMapLock.Unlock()
|
|
||||||
return len(r.ctrl.runningOperations)
|
|
||||||
}
|
|
||||||
|
|
||||||
// waitTest waits until all tests, controllers and other goroutines do their
|
// waitTest waits until all tests, controllers and other goroutines do their
|
||||||
// job and no new actions are registered for 10 milliseconds.
|
// job and no new actions are registered for 10 milliseconds.
|
||||||
func (r *volumeReactor) waitTest() {
|
func (r *volumeReactor) waitTest() {
|
||||||
|
r.ctrl.runningOperations.Wait()
|
||||||
// Check every 10ms if the controller does something and stop if it's
|
// Check every 10ms if the controller does something and stop if it's
|
||||||
// idle.
|
// idle.
|
||||||
oldChanges := -1
|
oldChanges := -1
|
||||||
for {
|
for {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
changes := r.getChangeCount()
|
changes := r.getChangeCount()
|
||||||
if changes == oldChanges && r.getOperationCount() == 0 {
|
if changes == oldChanges {
|
||||||
// No changes for last 10ms -> controller must be idle.
|
// No changes for last 10ms -> controller must be idle.
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -774,7 +769,7 @@ func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(c
|
|||||||
|
|
||||||
return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
|
return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
|
||||||
// Inject a hook before async operation starts
|
// Inject a hook before async operation starts
|
||||||
ctrl.preOperationHook = func(operationName string, arg interface{}) {
|
ctrl.preOperationHook = func(operationName string) {
|
||||||
// Inside the hook, run the function to inject
|
// Inside the hook, run the function to inject
|
||||||
glog.V(4).Infof("reactor: scheduleOperation reached, injecting call")
|
glog.V(4).Infof("reactor: scheduleOperation reached, injecting call")
|
||||||
injectBeforeOperation(ctrl, reactor)
|
injectBeforeOperation(ctrl, reactor)
|
||||||
|
Loading…
Reference in New Issue
Block a user