diff --git a/pkg/controller/persistentvolume/controller.go b/pkg/controller/persistentvolume/controller.go index f552f773ecd..6b7d31dfebd 100644 --- a/pkg/controller/persistentvolume/controller.go +++ b/pkg/controller/persistentvolume/controller.go @@ -106,10 +106,8 @@ const createProvisionedPVInterval = 10 * time.Second // framework.Controllers that watch PerstentVolume and PersistentVolumeClaim // changes. type PersistentVolumeController struct { - volumes persistentVolumeOrderedIndex volumeController *framework.Controller volumeControllerStopCh chan struct{} - claims cache.Store claimController *framework.Controller claimControllerStopCh chan struct{} kubeClient clientset.Interface @@ -119,6 +117,14 @@ type PersistentVolumeController struct { provisioner vol.ProvisionableVolumePlugin clusterName string + // Cache of the last known version of volumes and claims. This cache is + // thread safe as long as the volumes/claims there are not modified, they + // must be cloned before any modification. These caches get updated both by + // "xxx added/updated/deleted" events from etcd and by the controller when + // it saves newer version to etcd. + volumes persistentVolumeOrderedIndex + claims cache.Store + // PersistentVolumeController keeps track of long running operations and // makes sure it won't start the same operation twice in parallel. // Each operation is identified by unique operationName. @@ -507,6 +513,11 @@ func (ctrl *PersistentVolumeController) updateClaimPhase(claim *api.PersistentVo glog.V(4).Infof("updating PersistentVolumeClaim[%s]: set phase %s failed: %v", claimToClaimKey(claim), phase, err) return newClaim, err } + _, err = storeObjectUpdate(ctrl.claims, newClaim, "claim") + if err != nil { + glog.V(4).Infof("updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err) + return newClaim, err + } glog.V(2).Infof("claim %q entered phase %q", claimToClaimKey(claim), phase) return newClaim, nil } @@ -559,6 +570,11 @@ func (ctrl *PersistentVolumeController) updateVolumePhase(volume *api.Persistent glog.V(4).Infof("updating PersistentVolume[%s]: set phase %s failed: %v", volume.Name, phase, err) return newVol, err } + _, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume") + if err != nil { + glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err) + return newVol, err + } glog.V(2).Infof("volume %q entered phase %q", volume.Name, phase) return newVol, err } @@ -639,6 +655,11 @@ func (ctrl *PersistentVolumeController) bindVolumeToClaim(volume *api.Persistent glog.V(4).Infof("updating PersistentVolume[%s]: binding to %q failed: %v", volume.Name, claimToClaimKey(claim), err) return newVol, err } + _, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume") + if err != nil { + glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err) + return newVol, err + } glog.V(4).Infof("updating PersistentVolume[%s]: bound to %q", newVol.Name, claimToClaimKey(claim)) return newVol, nil } @@ -696,6 +717,11 @@ func (ctrl *PersistentVolumeController) bindClaimToVolume(claim *api.PersistentV glog.V(4).Infof("updating PersistentVolumeClaim[%s]: binding to %q failed: %v", claimToClaimKey(claim), volume.Name, err) return newClaim, err } + _, err = storeObjectUpdate(ctrl.claims, newClaim, "claim") + if err != nil { + glog.V(4).Infof("updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err) + return newClaim, err + } glog.V(4).Infof("updating PersistentVolumeClaim[%s]: bound to %q", claimToClaimKey(claim), volume.Name) return newClaim, nil } @@ -785,6 +811,11 @@ func (ctrl *PersistentVolumeController) unbindVolume(volume *api.PersistentVolum glog.V(4).Infof("updating PersistentVolume[%s]: rollback failed: %v", volume.Name, err) return err } + _, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume") + if err != nil { + glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err) + return err + } glog.V(4).Infof("updating PersistentVolume[%s]: rolled back", newVol.Name) // Update the status @@ -1127,9 +1158,16 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa // Try to create the PV object several times for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ { glog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name) - if _, err = ctrl.kubeClient.Core().PersistentVolumes().Create(volume); err == nil { + var newVol *api.PersistentVolume + if newVol, err = ctrl.kubeClient.Core().PersistentVolumes().Create(volume); err == nil { // Save succeeded. glog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim)) + + _, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume") + if err != nil { + // We will get an "volume added" event soon, this is not a big error + glog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, err) + } break } // Save failed, try again after a while. diff --git a/pkg/controller/persistentvolume/controller_base.go b/pkg/controller/persistentvolume/controller_base.go index 55c7417f00c..fcd1fda6624 100644 --- a/pkg/controller/persistentvolume/controller_base.go +++ b/pkg/controller/persistentvolume/controller_base.go @@ -18,10 +18,12 @@ package persistentvolume import ( "fmt" + "strconv" "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" unversioned_core "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" @@ -58,6 +60,8 @@ func NewPersistentVolumeController( } controller := &PersistentVolumeController{ + volumes: newPersistentVolumeOrderedIndex(), + claims: cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc), kubeClient: kubeClient, eventRecorder: eventRecorder, runningOperations: make(map[string]bool), @@ -97,7 +101,7 @@ func NewPersistentVolumeController( } } - controller.volumes.store, controller.volumeController = framework.NewIndexerInformer( + _, controller.volumeController = framework.NewIndexerInformer( volumeSource, &api.PersistentVolume{}, syncPeriod, @@ -108,7 +112,7 @@ func NewPersistentVolumeController( }, cache.Indexers{"accessmodes": accessModesIndexFunc}, ) - controller.claims, controller.claimController = framework.NewInformer( + _, controller.claimController = framework.NewInformer( claimSource, &api.PersistentVolumeClaim{}, syncPeriod, @@ -124,6 +128,16 @@ func NewPersistentVolumeController( // addVolume is callback from framework.Controller watching PersistentVolume // events. func (ctrl *PersistentVolumeController) addVolume(obj interface{}) { + // Store the new volume version in the cache and do not process it if this + // is an old version. + new, err := storeObjectUpdate(ctrl.volumes.store, obj, "volume") + if err != nil { + glog.Errorf("%v", err) + } + if !new { + return + } + if !ctrl.isFullySynced() { return } @@ -147,6 +161,16 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) { // updateVolume is callback from framework.Controller watching PersistentVolume // events. func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) { + // Store the new volume version in the cache and do not process it if this + // is an old version. + new, err := storeObjectUpdate(ctrl.volumes.store, newObj, "volume") + if err != nil { + glog.Errorf("%v", err) + } + if !new { + return + } + if !ctrl.isFullySynced() { return } @@ -170,6 +194,8 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) // deleteVolume is callback from framework.Controller watching PersistentVolume // events. func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) { + _ = ctrl.volumes.store.Delete(obj) + if !ctrl.isFullySynced() { return } @@ -218,6 +244,16 @@ func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) { // addClaim is callback from framework.Controller watching PersistentVolumeClaim // events. func (ctrl *PersistentVolumeController) addClaim(obj interface{}) { + // Store the new claim version in the cache and do not process it if this is + // an old version. + new, err := storeObjectUpdate(ctrl.claims, obj, "claim") + if err != nil { + glog.Errorf("%v", err) + } + if !new { + return + } + if !ctrl.isFullySynced() { return } @@ -241,6 +277,16 @@ func (ctrl *PersistentVolumeController) addClaim(obj interface{}) { // updateClaim is callback from framework.Controller watching PersistentVolumeClaim // events. func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) { + // Store the new claim version in the cache and do not process it if this is + // an old version. + new, err := storeObjectUpdate(ctrl.claims, newObj, "claim") + if err != nil { + glog.Errorf("%v", err) + } + if !new { + return + } + if !ctrl.isFullySynced() { return } @@ -264,6 +310,8 @@ func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) // deleteClaim is callback from framework.Controller watching PersistentVolumeClaim // events. func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) { + _ = ctrl.claims.Delete(obj) + if !ctrl.isFullySynced() { return } @@ -388,3 +436,56 @@ func isVolumeBoundToClaim(volume *api.PersistentVolume, claim *api.PersistentVol } return true } + +// storeObjectUpdate updates given cache with a new object version from Informer +// callback (i.e. with events from etcd) or with an object modified by the +// controller itself. Returns "true", if the cache was updated, false if the +// object is an old version and should be ignored. +func storeObjectUpdate(store cache.Store, obj interface{}, className string) (bool, error) { + objAccessor, err := meta.Accessor(obj) + if err != nil { + return false, fmt.Errorf("Error reading cache of %s: %v", className, err) + } + objName := objAccessor.GetNamespace() + "/" + objAccessor.GetName() + + oldObj, found, err := store.Get(obj) + if err != nil { + return false, fmt.Errorf("Error finding %s %q in controller cache: %v", className, objName, err) + } + + if !found { + // This is a new object + glog.V(4).Infof("storeObjectUpdate: adding %s %q, version %s", className, objName, objAccessor.GetResourceVersion()) + if err = store.Add(obj); err != nil { + return false, fmt.Errorf("Error adding %s %q to controller cache: %v", className, objName, err) + } + return true, nil + } + + oldObjAccessor, err := meta.Accessor(oldObj) + if err != nil { + return false, err + } + + objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64) + if err != nil { + return false, fmt.Errorf("Error parsing ResourceVersion %q of %s %q: %s", objAccessor.GetResourceVersion(), className, objName, err) + } + oldObjResourceVersion, err := strconv.ParseInt(oldObjAccessor.GetResourceVersion(), 10, 64) + if err != nil { + return false, fmt.Errorf("Error parsing old ResourceVersion %q of %s %q: %s", oldObjAccessor.GetResourceVersion(), className, objName, err) + } + + // Throw away only older version, let the same version pass - we do want to + // get periodic sync events. + if oldObjResourceVersion > objResourceVersion { + glog.V(4).Infof("storeObjectUpdate: ignoring %s %q version %s", className, objName, objAccessor.GetResourceVersion()) + return false, nil + } + + glog.V(4).Infof("storeObjectUpdate updating %s %q with version %s", className, objName, objAccessor.GetResourceVersion()) + if err = store.Update(obj); err != nil { + return false, fmt.Errorf("Error updating %s %q in controller cache: %v", className, objName, err) + } + return true, nil +} diff --git a/pkg/controller/persistentvolume/controller_test.go b/pkg/controller/persistentvolume/controller_test.go index 5b65b1176f5..0dd4f0c466c 100644 --- a/pkg/controller/persistentvolume/controller_test.go +++ b/pkg/controller/persistentvolume/controller_test.go @@ -22,6 +22,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/conversion" @@ -164,3 +165,71 @@ func TestControllerSync(t *testing.T) { evaluateTestResults(ctrl, reactor, test, t) } } + +func storeVersion(t *testing.T, prefix string, c cache.Store, version string, expectedReturn bool) { + pv := newVolume("pvName", "1Gi", "", "", api.VolumeAvailable, api.PersistentVolumeReclaimDelete) + pv.ResourceVersion = version + ret, err := storeObjectUpdate(c, pv, "volume") + if err != nil { + t.Errorf("%s: expected storeObjectUpdate to succeed, got: %v", prefix, err) + } + if expectedReturn != ret { + t.Errorf("%s: expected storeObjectUpdate to return %v, got: %v", prefix, expectedReturn, ret) + } + + // find the stored version + + pvObj, found, err := c.GetByKey("pvName") + if err != nil { + t.Errorf("expected volume 'pvName' in the cache, got error instead: %v", err) + } + if !found { + t.Errorf("expected volume 'pvName' in the cache but it was not found") + } + pv, ok := pvObj.(*api.PersistentVolume) + if !ok { + t.Errorf("expected volume in the cache, got different object instead: %+v", pvObj) + } + + if ret { + if pv.ResourceVersion != version { + t.Errorf("expected volume with version %s in the cache, got %s instead", version, pv.ResourceVersion) + } + } else { + if pv.ResourceVersion == version { + t.Errorf("expected volume with version other than %s in the cache, got %s instead", version, pv.ResourceVersion) + } + } +} + +// TestControllerCache tests func storeObjectUpdate() +func TestControllerCache(t *testing.T) { + // Cache under test + c := cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc) + + // Store new PV + storeVersion(t, "Step1", c, "1", true) + // Store the same PV + storeVersion(t, "Step2", c, "1", true) + // Store newer PV + storeVersion(t, "Step3", c, "2", true) + // Store older PV - simulating old "PV updated" event or periodic sync with + // old data + storeVersion(t, "Step4", c, "1", false) + // Store newer PV - test integer parsing ("2" > "10" as string, + // while 2 < 10 as integers) + storeVersion(t, "Step5", c, "10", true) +} + +func TestControllerCacheParsingError(t *testing.T) { + c := cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc) + // There must be something in the cache to compare with + storeVersion(t, "Step1", c, "1", true) + + pv := newVolume("pvName", "1Gi", "", "", api.VolumeAvailable, api.PersistentVolumeReclaimDelete) + pv.ResourceVersion = "xxx" + _, err := storeObjectUpdate(c, pv, "volume") + if err == nil { + t.Errorf("Expected parsing error, got nil instead") + } +} diff --git a/pkg/controller/persistentvolume/index.go b/pkg/controller/persistentvolume/index.go index 39c85d939ed..d083ea9008b 100644 --- a/pkg/controller/persistentvolume/index.go +++ b/pkg/controller/persistentvolume/index.go @@ -29,6 +29,10 @@ type persistentVolumeOrderedIndex struct { store cache.Indexer } +func newPersistentVolumeOrderedIndex() persistentVolumeOrderedIndex { + return persistentVolumeOrderedIndex{cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"accessmodes": accessModesIndexFunc})} +} + // accessModesIndexFunc is an indexing function that returns a persistent volume's AccessModes as a string func accessModesIndexFunc(obj interface{}) ([]string, error) { if pv, ok := obj.(*api.PersistentVolume); ok { diff --git a/pkg/controller/persistentvolume/index_test.go b/pkg/controller/persistentvolume/index_test.go index c80984649cd..47a66a8bc05 100644 --- a/pkg/controller/persistentvolume/index_test.go +++ b/pkg/controller/persistentvolume/index_test.go @@ -23,13 +23,8 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/testapi" - "k8s.io/kubernetes/pkg/client/cache" ) -func newPersistentVolumeOrderedIndex() persistentVolumeOrderedIndex { - return persistentVolumeOrderedIndex{cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"accessmodes": accessModesIndexFunc})} -} - func TestMatchVolume(t *testing.T) { volList := newPersistentVolumeOrderedIndex() for _, pv := range createTestVolumes() {