From 02792323606b80a846e8ea7a24733393acafe6e2 Mon Sep 17 00:00:00 2001
From: Jan Safranek
Date: Thu, 19 May 2016 13:31:19 +0200
Subject: [PATCH] volume controller: Add cache with the latest version of PVs
 and PVCs

When the controller binds a PV to a PVC, it saves both objects to etcd.
However, there is still an old version of these objects in the controller's
Informer cache. So, when a new PVC comes in, the PV is still seen as
available and may get bound to the new PVC. This will be blocked by etcd;
still, it creates unnecessary traffic that slows everything down.

Also, we save a bound PV/PVC in two transactions - we save PV/PVC.Spec first
and then .Status. The controller gets a "PV/PVC.Spec updated" event from etcd
and tries to fix the Status, because it looks outdated to the controller.
This write also fails - there is already a correct version in etcd.

We can't influence the Informer cache; it is read-only to the controller.

To prevent these useless writes to etcd, this patch introduces a second cache
in the controller, which holds the latest known version of PVs and PVCs. It
gets updated with events from etcd *and* after etcd confirms a successful
save of a PV/PVC modified by the controller.

The cache stores only *pointers* to PVs/PVCs, so in the ideal case it shares
the actual object data with the informer cache. They will diverge only when
the controller modifies something and the informer cache has not received
the update events yet.
---
 pkg/controller/persistentvolume/controller.go |  44 +++++++-
 .../persistentvolume/controller_base.go       | 105 +++++++++++++++++-
 .../persistentvolume/controller_test.go       |  69 ++++++++++++
 pkg/controller/persistentvolume/index.go      |   4 +
 pkg/controller/persistentvolume/index_test.go |   5 -
 5 files changed, 217 insertions(+), 10 deletions(-)

diff --git a/pkg/controller/persistentvolume/controller.go b/pkg/controller/persistentvolume/controller.go
index e61a30baa65..5ebad4da2e6 100644
--- a/pkg/controller/persistentvolume/controller.go
+++ b/pkg/controller/persistentvolume/controller.go
@@ -106,10 +106,8 @@ const createProvisionedPVInterval = 10 * time.Second
 // framework.Controllers that watch PerstentVolume and PersistentVolumeClaim
 // changes.
 type PersistentVolumeController struct {
-	volumes                persistentVolumeOrderedIndex
 	volumeController       *framework.Controller
 	volumeControllerStopCh chan struct{}
-	claims                 cache.Store
 	claimController        *framework.Controller
 	claimControllerStopCh  chan struct{}
 	kubeClient             clientset.Interface
@@ -119,6 +117,14 @@ type PersistentVolumeController struct {
 	provisioner vol.ProvisionableVolumePlugin
 	clusterName string
 
+	// Cache of the last known version of volumes and claims. This cache is
+	// thread safe as long as the volumes/claims there are not modified; they
+	// must be cloned before any modification. These caches get updated both by
+	// "xxx added/updated/deleted" events from etcd and by the controller when
+	// it saves a newer version to etcd.
+	volumes persistentVolumeOrderedIndex
+	claims  cache.Store
+
 	// PersistentVolumeController keeps track of long running operations and
 	// makes sure it won't start the same operation twice in parallel.
 	// Each operation is identified by unique operationName.
@@ -507,6 +513,11 @@ func (ctrl *PersistentVolumeController) updateClaimPhase(claim *api.PersistentVo
 		glog.V(4).Infof("updating PersistentVolumeClaim[%s]: set phase %s failed: %v", claimToClaimKey(claim), phase, err)
 		return newClaim, err
 	}
+	_, err = storeObjectUpdate(ctrl.claims, newClaim, "claim")
+	if err != nil {
+		glog.V(4).Infof("updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err)
+		return newClaim, err
+	}
 	glog.V(2).Infof("claim %q entered phase %q", claimToClaimKey(claim), phase)
 	return newClaim, nil
 }
@@ -559,6 +570,11 @@ func (ctrl *PersistentVolumeController) updateVolumePhase(volume *api.Persistent
 		glog.V(4).Infof("updating PersistentVolume[%s]: set phase %s failed: %v", volume.Name, phase, err)
 		return newVol, err
 	}
+	_, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume")
+	if err != nil {
+		glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
+		return newVol, err
+	}
 	glog.V(2).Infof("volume %q entered phase %q", volume.Name, phase)
 	return newVol, err
 }
@@ -639,6 +655,11 @@ func (ctrl *PersistentVolumeController) bindVolumeToClaim(volume *api.Persistent
 		glog.V(4).Infof("updating PersistentVolume[%s]: binding to %q failed: %v", volume.Name, claimToClaimKey(claim), err)
 		return newVol, err
 	}
+	_, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume")
+	if err != nil {
+		glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
+		return newVol, err
+	}
 	glog.V(4).Infof("updating PersistentVolume[%s]: bound to %q", newVol.Name, claimToClaimKey(claim))
 	return newVol, nil
 }
@@ -696,6 +717,11 @@ func (ctrl *PersistentVolumeController) bindClaimToVolume(claim *api.PersistentV
 		glog.V(4).Infof("updating PersistentVolumeClaim[%s]: binding to %q failed: %v", claimToClaimKey(claim), volume.Name, err)
 		return newClaim, err
 	}
+	_, err = storeObjectUpdate(ctrl.claims, newClaim, "claim")
+	if err != nil {
+		glog.V(4).Infof("updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err)
+		return newClaim, err
+	}
 	glog.V(4).Infof("updating PersistentVolumeClaim[%s]: bound to %q", claimToClaimKey(claim), volume.Name)
 	return newClaim, nil
 }
@@ -785,6 +811,11 @@ func (ctrl *PersistentVolumeController) unbindVolume(volume *api.PersistentVolum
 		glog.V(4).Infof("updating PersistentVolume[%s]: rollback failed: %v", volume.Name, err)
 		return err
 	}
+	_, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume")
+	if err != nil {
+		glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
+		return err
+	}
 	glog.V(4).Infof("updating PersistentVolume[%s]: rolled back", newVol.Name)
 
 	// Update the status
@@ -1124,9 +1155,16 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa
 	// Try to create the PV object several times
 	for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
 		glog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
-		if _, err = ctrl.kubeClient.Core().PersistentVolumes().Create(volume); err == nil {
+		var newVol *api.PersistentVolume
+		if newVol, err = ctrl.kubeClient.Core().PersistentVolumes().Create(volume); err == nil {
 			// Save succeeded.
glog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim)) + + _, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume") + if err != nil { + // We will get an "volume added" event soon, this is not a big error + glog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, err) + } break } // Save failed, try again after a while. diff --git a/pkg/controller/persistentvolume/controller_base.go b/pkg/controller/persistentvolume/controller_base.go index 6a217a590c5..a2c33d700ae 100644 --- a/pkg/controller/persistentvolume/controller_base.go +++ b/pkg/controller/persistentvolume/controller_base.go @@ -18,10 +18,12 @@ package persistentvolume import ( "fmt" + "strconv" "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" unversioned_core "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" @@ -58,6 +60,8 @@ func NewPersistentVolumeController( } controller := &PersistentVolumeController{ + volumes: newPersistentVolumeOrderedIndex(), + claims: cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc), kubeClient: kubeClient, eventRecorder: eventRecorder, runningOperations: make(map[string]bool), @@ -97,7 +101,7 @@ func NewPersistentVolumeController( } } - controller.volumes.store, controller.volumeController = framework.NewIndexerInformer( + _, controller.volumeController = framework.NewIndexerInformer( volumeSource, &api.PersistentVolume{}, syncPeriod, @@ -108,7 +112,7 @@ func NewPersistentVolumeController( }, cache.Indexers{"accessmodes": accessModesIndexFunc}, ) - controller.claims, controller.claimController = framework.NewInformer( + _, controller.claimController = framework.NewInformer( claimSource, &api.PersistentVolumeClaim{}, syncPeriod, @@ -124,6 +128,16 @@ func NewPersistentVolumeController( // addVolume is callback from framework.Controller watching PersistentVolume // events. func (ctrl *PersistentVolumeController) addVolume(obj interface{}) { + // Store the new volume version in the cache and do not process it if this + // is an old version. + new, err := storeObjectUpdate(ctrl.volumes.store, obj, "volume") + if err != nil { + glog.Errorf("%v", err) + } + if !new { + return + } + if !ctrl.isFullySynced() { return } @@ -147,6 +161,16 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) { // updateVolume is callback from framework.Controller watching PersistentVolume // events. func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) { + // Store the new volume version in the cache and do not process it if this + // is an old version. + new, err := storeObjectUpdate(ctrl.volumes.store, newObj, "volume") + if err != nil { + glog.Errorf("%v", err) + } + if !new { + return + } + if !ctrl.isFullySynced() { return } @@ -170,6 +194,8 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) // deleteVolume is callback from framework.Controller watching PersistentVolume // events. func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) { + _ = ctrl.volumes.store.Delete(obj) + if !ctrl.isFullySynced() { return } @@ -218,6 +244,16 @@ func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) { // addClaim is callback from framework.Controller watching PersistentVolumeClaim // events. 
 func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
+	// Store the new claim version in the cache and do not process it if this is
+	// an old version.
+	new, err := storeObjectUpdate(ctrl.claims, obj, "claim")
+	if err != nil {
+		glog.Errorf("%v", err)
+	}
+	if !new {
+		return
+	}
+
 	if !ctrl.isFullySynced() {
 		return
 	}
@@ -241,6 +277,16 @@ func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
 // updateClaim is callback from framework.Controller watching PersistentVolumeClaim
 // events.
 func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) {
+	// Store the new claim version in the cache and do not process it if this is
+	// an old version.
+	new, err := storeObjectUpdate(ctrl.claims, newObj, "claim")
+	if err != nil {
+		glog.Errorf("%v", err)
+	}
+	if !new {
+		return
+	}
+
 	if !ctrl.isFullySynced() {
 		return
 	}
@@ -264,6 +310,8 @@ func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{})
 // deleteClaim is callback from framework.Controller watching PersistentVolumeClaim
 // events.
 func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
+	_ = ctrl.claims.Delete(obj)
+
 	if !ctrl.isFullySynced() {
 		return
 	}
@@ -388,3 +436,56 @@ func isVolumeBoundToClaim(volume *api.PersistentVolume, claim *api.PersistentVol
 	}
 	return true
 }
+
+// storeObjectUpdate updates the given cache with a new object version from an
+// Informer callback (i.e. with events from etcd) or with an object modified by
+// the controller itself. Returns "true" if the cache was updated, "false" if
+// the object is an old version and should be ignored.
+func storeObjectUpdate(store cache.Store, obj interface{}, className string) (bool, error) {
+	objAccessor, err := meta.Accessor(obj)
+	if err != nil {
+		return false, fmt.Errorf("Error reading cache of %s: %v", className, err)
+	}
+	objName := objAccessor.GetNamespace() + "/" + objAccessor.GetName()
+
+	oldObj, found, err := store.Get(obj)
+	if err != nil {
+		return false, fmt.Errorf("Error finding %s %q in controller cache: %v", className, objName, err)
+	}
+
+	if !found {
+		// This is a new object
+		glog.V(4).Infof("storeObjectUpdate: adding %s %q, version %s", className, objName, objAccessor.GetResourceVersion())
+		if err = store.Add(obj); err != nil {
+			return false, fmt.Errorf("Error adding %s %q to controller cache: %v", className, objName, err)
+		}
+		return true, nil
+	}
+
+	oldObjAccessor, err := meta.Accessor(oldObj)
+	if err != nil {
+		return false, err
+	}
+
+	objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
+	if err != nil {
+		return false, fmt.Errorf("Error parsing ResourceVersion %q of %s %q: %s", objAccessor.GetResourceVersion(), className, objName, err)
+	}
+	oldObjResourceVersion, err := strconv.ParseInt(oldObjAccessor.GetResourceVersion(), 10, 64)
+	if err != nil {
+		return false, fmt.Errorf("Error parsing old ResourceVersion %q of %s %q: %s", oldObjAccessor.GetResourceVersion(), className, objName, err)
+	}
+
+	// Throw away only an older version; let the same version pass - we do want
+	// to get periodic sync events.
+	if oldObjResourceVersion > objResourceVersion {
+		glog.V(4).Infof("storeObjectUpdate: ignoring %s %q version %s", className, objName, objAccessor.GetResourceVersion())
+		return false, nil
+	}
+
+	glog.V(4).Infof("storeObjectUpdate updating %s %q with version %s", className, objName, objAccessor.GetResourceVersion())
+	if err = store.Update(obj); err != nil {
+		return false, fmt.Errorf("Error updating %s %q in controller cache: %v", className, objName, err)
+	}
+	return true, nil
+}
diff --git a/pkg/controller/persistentvolume/controller_test.go b/pkg/controller/persistentvolume/controller_test.go
index d3f92e6b6f7..0c816d590da 100644
--- a/pkg/controller/persistentvolume/controller_test.go
+++ b/pkg/controller/persistentvolume/controller_test.go
@@ -22,6 +22,7 @@ import (
 	"github.com/golang/glog"
 
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
 	"k8s.io/kubernetes/pkg/controller/framework"
 	"k8s.io/kubernetes/pkg/conversion"
@@ -159,3 +160,71 @@ func TestControllerSync(t *testing.T) {
 		evaluateTestResults(ctrl, reactor, test, t)
 	}
 }
+
+func storeVersion(t *testing.T, prefix string, c cache.Store, version string, expectedReturn bool) {
+	pv := newVolume("pvName", "1Gi", "", "", api.VolumeAvailable, api.PersistentVolumeReclaimDelete)
+	pv.ResourceVersion = version
+	ret, err := storeObjectUpdate(c, pv, "volume")
+	if err != nil {
+		t.Errorf("%s: expected storeObjectUpdate to succeed, got: %v", prefix, err)
+	}
+	if expectedReturn != ret {
+		t.Errorf("%s: expected storeObjectUpdate to return %v, got: %v", prefix, expectedReturn, ret)
+	}
+
+	// find the stored version
+
+	pvObj, found, err := c.GetByKey("pvName")
+	if err != nil {
+		t.Errorf("expected volume 'pvName' in the cache, got error instead: %v", err)
+	}
+	if !found {
+		t.Errorf("expected volume 'pvName' in the cache but it was not found")
+	}
+	pv, ok := pvObj.(*api.PersistentVolume)
+	if !ok {
+		t.Errorf("expected volume in the cache, got different object instead: %+v", pvObj)
+	}
+
+	if ret {
+		if pv.ResourceVersion != version {
+			t.Errorf("expected volume with version %s in the cache, got %s instead", version, pv.ResourceVersion)
+		}
+	} else {
+		if pv.ResourceVersion == version {
+			t.Errorf("expected volume with version other than %s in the cache, got %s instead", version, pv.ResourceVersion)
+		}
+	}
+}
+
+// TestControllerCache tests func storeObjectUpdate()
+func TestControllerCache(t *testing.T) {
+	// Cache under test
+	c := cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc)
+
+	// Store new PV
+	storeVersion(t, "Step1", c, "1", true)
+	// Store the same PV
+	storeVersion(t, "Step2", c, "1", true)
+	// Store newer PV
+	storeVersion(t, "Step3", c, "2", true)
+	// Store older PV - simulating old "PV updated" event or periodic sync with
+	// old data
+	storeVersion(t, "Step4", c, "1", false)
+	// Store newer PV - test integer parsing ("2" > "10" as string,
+	// while 2 < 10 as integers)
+	storeVersion(t, "Step5", c, "10", true)
+}
+
+func TestControllerCacheParsingError(t *testing.T) {
+	c := cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc)
+	// There must be something in the cache to compare with
+	storeVersion(t, "Step1", c, "1", true)
+
+	pv := newVolume("pvName", "1Gi", "", "", api.VolumeAvailable, api.PersistentVolumeReclaimDelete)
+	pv.ResourceVersion = "xxx"
+	_, err := storeObjectUpdate(c, pv, "volume")
+	if err == nil {
+		t.Errorf("Expected parsing error, got nil instead")
+	}
+}
diff --git a/pkg/controller/persistentvolume/index.go b/pkg/controller/persistentvolume/index.go
index 8da72389bdb..d0e927d0487 100644
--- a/pkg/controller/persistentvolume/index.go
+++ b/pkg/controller/persistentvolume/index.go
@@ -29,6 +29,10 @@ type persistentVolumeOrderedIndex struct {
 	store cache.Indexer
 }
 
+func newPersistentVolumeOrderedIndex() persistentVolumeOrderedIndex {
+	return persistentVolumeOrderedIndex{cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"accessmodes": accessModesIndexFunc})}
+}
+
 // accessModesIndexFunc is an indexing function that returns a persistent volume's AccessModes as a string
 func accessModesIndexFunc(obj interface{}) ([]string, error) {
 	if pv, ok := obj.(*api.PersistentVolume); ok {
diff --git a/pkg/controller/persistentvolume/index_test.go b/pkg/controller/persistentvolume/index_test.go
index 61134f13700..548b8dc324c 100644
--- a/pkg/controller/persistentvolume/index_test.go
+++ b/pkg/controller/persistentvolume/index_test.go
@@ -22,13 +22,8 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/resource"
 	"k8s.io/kubernetes/pkg/api/testapi"
-	"k8s.io/kubernetes/pkg/client/cache"
 )
 
-func newPersistentVolumeOrderedIndex() persistentVolumeOrderedIndex {
-	return persistentVolumeOrderedIndex{cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"accessmodes": accessModesIndexFunc})}
-}
-
 func TestMatchVolume(t *testing.T) {
 	volList := newPersistentVolumeOrderedIndex()
 	for _, pv := range createTestVolumes() {
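
The core of the new cache logic in this patch is the ResourceVersion comparison in storeObjectUpdate: versions are parsed as integers (so "10" sorts after "2"), older versions are dropped, and equal versions are let through so periodic resync events still get processed. The following standalone Go sketch is not part of the patch and uses a hypothetical helper name, shouldUpdateCache; it only illustrates that comparison rule in isolation.

package main

import (
	"fmt"
	"strconv"
)

// shouldUpdateCache reports whether an incoming object with ResourceVersion
// newRV should replace a cached object with ResourceVersion cachedRV.
// Versions are compared as integers, mirroring the strconv.ParseInt calls in
// storeObjectUpdate, so "10" is correctly treated as newer than "2".
func shouldUpdateCache(cachedRV, newRV string) (bool, error) {
	cached, err := strconv.ParseInt(cachedRV, 10, 64)
	if err != nil {
		return false, fmt.Errorf("error parsing cached ResourceVersion %q: %v", cachedRV, err)
	}
	incoming, err := strconv.ParseInt(newRV, 10, 64)
	if err != nil {
		return false, fmt.Errorf("error parsing new ResourceVersion %q: %v", newRV, err)
	}
	// Equal versions pass so that periodic sync events are not thrown away.
	return incoming >= cached, nil
}

func main() {
	fmt.Println(shouldUpdateCache("2", "10")) // true  - newer version, cache gets updated
	fmt.Println(shouldUpdateCache("2", "2"))  // true  - same version, resync still processed
	fmt.Println(shouldUpdateCache("2", "1"))  // false - stale event, ignored
}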