Add work queues to PV controller

PV controller should not use Controller.Requeue, as as it is not available in
shared informers. We need to implement our own work queues instead where we
can enqueue volumes/claims as we want.
This commit is contained in:
Jan Safranek
2017-01-02 15:17:24 +01:00
parent dbb8bf5274
commit 0fd5f2028d
6 changed files with 242 additions and 219 deletions

View File

@@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/workqueue"
vol "k8s.io/kubernetes/pkg/volume"
"github.com/golang/glog"
@@ -146,8 +147,10 @@ const createProvisionedPVInterval = 10 * time.Second
// changes.
type PersistentVolumeController struct {
volumeController *cache.Controller
volumeInformer cache.Indexer
volumeSource cache.ListerWatcher
claimController *cache.Controller
claimInformer cache.Store
claimSource cache.ListerWatcher
classReflector *cache.Reflector
classSource cache.ListerWatcher
@@ -163,10 +166,34 @@ type PersistentVolumeController struct {
// must be cloned before any modification. These caches get updated both by
// "xxx added/updated/deleted" events from etcd and by the controller when
// it saves newer version to etcd.
// Why local cache: binding a volume to a claim generates 4 events, roughly
// in this order (depends on goroutine ordering):
// - volume.Spec update
// - volume.Status update
// - claim.Spec update
// - claim.Status update
// With these caches, the controller can check that it has already saved
// volume.Status and claim.Spec+Status and does not need to do anything
// when e.g. volume.Spec update event arrives before all the other events.
// Without this cache, it would see the old version of volume.Status and
// claim in the informers (it has not been updated from API server events
// yet) and it would try to fix these objects to be bound together.
// Any write to API server would fail with version conflict - these objects
// have been already written.
volumes persistentVolumeOrderedIndex
claims cache.Store
classes cache.Store
// Work queues of claims and volumes to process. Every queue should have
// exactly one worker thread, especially syncClaim() is not reentrant.
// Two syncClaims could bind two different claims to the same volume or one
// claim to two volumes. The controller would recover from this (due to
// version errors in API server and other checks in this controller),
// however overall speed of multi-worker controller would be lower than if
// it runs single thread only.
claimQueue *workqueue.Type
volumeQueue *workqueue.Type
// Map of scheduled/running operations.
runningOperations goroutinemap.GoRoutineMap
@@ -463,19 +490,11 @@ func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume)
// In both cases, the volume is Bound and the claim is Pending.
// Next syncClaim will fix it. To speed it up, we enqueue the claim
// into the controller, which results in syncClaim to be called
// shortly (and in the right goroutine).
// shortly (and in the right worker goroutine).
// This speeds up binding of provisioned volumes - provisioner saves
// only the new PV and it expects that next syncClaim will bind the
// claim to it.
clone, err := api.Scheme.DeepCopy(claim)
if err != nil {
return fmt.Errorf("error cloning claim %q: %v", claimToClaimKey(claim), err)
}
glog.V(5).Infof("requeueing claim %q for faster syncClaim", claimToClaimKey(claim))
err = ctrl.claimController.Requeue(clone)
if err != nil {
return fmt.Errorf("error enqueing claim %q for faster sync: %v", claimToClaimKey(claim), err)
}
ctrl.claimQueue.Add(claimToClaimKey(claim))
return nil
} else if claim.Spec.VolumeName == volume.Name {
// Volume is bound to a claim properly, update status if necessary