From 169076e7da8bf1edcb03a4435f5e5032f61726b0 Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Mon, 27 Jun 2016 13:08:02 +0200 Subject: [PATCH] Fix initialization of volume controller caches. Fix PersistentVolumeController.initializeCaches() to pass pointers to volume or claim to storeObjectUpdate() and add extra functions to enforce that the right types are checked in the future. Fixes #28076 --- pkg/controller/persistentvolume/controller.go | 12 ++--- .../persistentvolume/controller_base.go | 53 +++++++++++++------ 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/pkg/controller/persistentvolume/controller.go b/pkg/controller/persistentvolume/controller.go index 69e1ba54d96..87dc695c275 100644 --- a/pkg/controller/persistentvolume/controller.go +++ b/pkg/controller/persistentvolume/controller.go @@ -506,7 +506,7 @@ func (ctrl *PersistentVolumeController) updateClaimPhase(claim *api.PersistentVo glog.V(4).Infof("updating PersistentVolumeClaim[%s]: set phase %s failed: %v", claimToClaimKey(claim), phase, err) return newClaim, err } - _, err = storeObjectUpdate(ctrl.claims, newClaim, "claim") + _, err = ctrl.storeClaimUpdate(newClaim) if err != nil { glog.V(4).Infof("updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err) return newClaim, err @@ -565,7 +565,7 @@ func (ctrl *PersistentVolumeController) updateVolumePhase(volume *api.Persistent glog.V(4).Infof("updating PersistentVolume[%s]: set phase %s failed: %v", volume.Name, phase, err) return newVol, err } - _, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume") + _, err = ctrl.storeVolumeUpdate(newVol) if err != nil { glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err) return newVol, err @@ -650,7 +650,7 @@ func (ctrl *PersistentVolumeController) bindVolumeToClaim(volume *api.Persistent glog.V(4).Infof("updating PersistentVolume[%s]: binding to %q failed: %v", volume.Name, claimToClaimKey(claim), err) return newVol, err } - _, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume") + _, err = ctrl.storeVolumeUpdate(newVol) if err != nil { glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err) return newVol, err @@ -712,7 +712,7 @@ func (ctrl *PersistentVolumeController) bindClaimToVolume(claim *api.PersistentV glog.V(4).Infof("updating PersistentVolumeClaim[%s]: binding to %q failed: %v", claimToClaimKey(claim), volume.Name, err) return newClaim, err } - _, err = storeObjectUpdate(ctrl.claims, newClaim, "claim") + _, err = ctrl.storeClaimUpdate(newClaim) if err != nil { glog.V(4).Infof("updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err) return newClaim, err @@ -806,7 +806,7 @@ func (ctrl *PersistentVolumeController) unbindVolume(volume *api.PersistentVolum glog.V(4).Infof("updating PersistentVolume[%s]: rollback failed: %v", volume.Name, err) return err } - _, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume") + _, err = ctrl.storeVolumeUpdate(newVol) if err != nil { glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err) return err @@ -1171,7 +1171,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa // Save succeeded. glog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim)) - _, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume") + _, err = ctrl.storeVolumeUpdate(newVol) if err != nil { // We will get an "volume added" event soon, this is not a big error glog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, err) diff --git a/pkg/controller/persistentvolume/controller_base.go b/pkg/controller/persistentvolume/controller_base.go index 9cf03a81a21..73915973e2d 100644 --- a/pkg/controller/persistentvolume/controller_base.go +++ b/pkg/controller/persistentvolume/controller_base.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/goroutinemap" vol "k8s.io/kubernetes/pkg/volume" @@ -148,7 +149,12 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour // Ignore template volumes from kubernetes 1.2 deleted := ctrl.upgradeVolumeFrom1_2(&volume) if !deleted { - storeObjectUpdate(ctrl.volumes.store, volume, "volume") + clone, err := conversion.NewCloner().DeepCopy(&volume) + if err != nil { + glog.Errorf("error cloning volume %q: %v", volume.Name, err) + } + volumeClone := clone.(*api.PersistentVolume) + ctrl.storeVolumeUpdate(volumeClone) } } @@ -163,11 +169,24 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour return } for _, claim := range claimList.Items { - storeObjectUpdate(ctrl.claims, claim, "claim") + clone, err := conversion.NewCloner().DeepCopy(&claim) + if err != nil { + glog.Errorf("error cloning claim %q: %v", claimToClaimKey(&claim), err) + } + claimClone := clone.(*api.PersistentVolumeClaim) + ctrl.storeClaimUpdate(claimClone) } glog.V(4).Infof("controller initialized") } +func (ctrl *PersistentVolumeController) storeVolumeUpdate(volume *api.PersistentVolume) (bool, error) { + return storeObjectUpdate(ctrl.volumes.store, volume, "volume") +} + +func (ctrl *PersistentVolumeController) storeClaimUpdate(claim *api.PersistentVolumeClaim) (bool, error) { + return storeObjectUpdate(ctrl.claims, claim, "claim") +} + // addVolume is callback from framework.Controller watching PersistentVolume // events. func (ctrl *PersistentVolumeController) addVolume(obj interface{}) { @@ -184,7 +203,7 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) { // Store the new volume version in the cache and do not process it if this // is an old version. - new, err := storeObjectUpdate(ctrl.volumes.store, obj, "volume") + new, err := ctrl.storeVolumeUpdate(pv) if err != nil { glog.Errorf("%v", err) } @@ -219,7 +238,7 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) // Store the new volume version in the cache and do not process it if this // is an old version. - new, err := storeObjectUpdate(ctrl.volumes.store, newObj, "volume") + new, err := ctrl.storeVolumeUpdate(newVolume) if err != nil { glog.Errorf("%v", err) } @@ -291,7 +310,13 @@ func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) { func (ctrl *PersistentVolumeController) addClaim(obj interface{}) { // Store the new claim version in the cache and do not process it if this is // an old version. - new, err := storeObjectUpdate(ctrl.claims, obj, "claim") + claim, ok := obj.(*api.PersistentVolumeClaim) + if !ok { + glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj) + return + } + + new, err := ctrl.storeClaimUpdate(claim) if err != nil { glog.Errorf("%v", err) } @@ -299,11 +324,6 @@ func (ctrl *PersistentVolumeController) addClaim(obj interface{}) { return } - claim, ok := obj.(*api.PersistentVolumeClaim) - if !ok { - glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj) - return - } if err := ctrl.syncClaim(claim); err != nil { if errors.IsConflict(err) { // Version conflict error happens quite often and the controller @@ -320,7 +340,13 @@ func (ctrl *PersistentVolumeController) addClaim(obj interface{}) { func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) { // Store the new claim version in the cache and do not process it if this is // an old version. - new, err := storeObjectUpdate(ctrl.claims, newObj, "claim") + newClaim, ok := newObj.(*api.PersistentVolumeClaim) + if !ok { + glog.Errorf("Expected PersistentVolumeClaim but updateClaim received %+v", newObj) + return + } + + new, err := ctrl.storeClaimUpdate(newClaim) if err != nil { glog.Errorf("%v", err) } @@ -328,11 +354,6 @@ func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) return } - newClaim, ok := newObj.(*api.PersistentVolumeClaim) - if !ok { - glog.Errorf("Expected PersistentVolumeClaim but updateClaim received %+v", newObj) - return - } if err := ctrl.syncClaim(newClaim); err != nil { if errors.IsConflict(err) { // Version conflict error happens quite often and the controller