Fix initialization of volume controller caches.

Fix PersistentVolumeController.initializeCaches() to pass pointers to volume
or claim to storeObjectUpdate() and add extra functions to enforce that the
right types are checked in the future.


Fixes #28076
This commit is contained in:
Jan Safranek 2016-06-27 13:08:02 +02:00
parent ecfd4aa131
commit 169076e7da
2 changed files with 43 additions and 22 deletions

View File

@ -506,7 +506,7 @@ func (ctrl *PersistentVolumeController) updateClaimPhase(claim *api.PersistentVo
glog.V(4).Infof("updating PersistentVolumeClaim[%s]: set phase %s failed: %v", claimToClaimKey(claim), phase, err)
return newClaim, err
}
_, err = storeObjectUpdate(ctrl.claims, newClaim, "claim")
_, err = ctrl.storeClaimUpdate(newClaim)
if err != nil {
glog.V(4).Infof("updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err)
return newClaim, err
@ -565,7 +565,7 @@ func (ctrl *PersistentVolumeController) updateVolumePhase(volume *api.Persistent
glog.V(4).Infof("updating PersistentVolume[%s]: set phase %s failed: %v", volume.Name, phase, err)
return newVol, err
}
_, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume")
_, err = ctrl.storeVolumeUpdate(newVol)
if err != nil {
glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
return newVol, err
@ -650,7 +650,7 @@ func (ctrl *PersistentVolumeController) bindVolumeToClaim(volume *api.Persistent
glog.V(4).Infof("updating PersistentVolume[%s]: binding to %q failed: %v", volume.Name, claimToClaimKey(claim), err)
return newVol, err
}
_, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume")
_, err = ctrl.storeVolumeUpdate(newVol)
if err != nil {
glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
return newVol, err
@ -712,7 +712,7 @@ func (ctrl *PersistentVolumeController) bindClaimToVolume(claim *api.PersistentV
glog.V(4).Infof("updating PersistentVolumeClaim[%s]: binding to %q failed: %v", claimToClaimKey(claim), volume.Name, err)
return newClaim, err
}
_, err = storeObjectUpdate(ctrl.claims, newClaim, "claim")
_, err = ctrl.storeClaimUpdate(newClaim)
if err != nil {
glog.V(4).Infof("updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err)
return newClaim, err
@ -806,7 +806,7 @@ func (ctrl *PersistentVolumeController) unbindVolume(volume *api.PersistentVolum
glog.V(4).Infof("updating PersistentVolume[%s]: rollback failed: %v", volume.Name, err)
return err
}
_, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume")
_, err = ctrl.storeVolumeUpdate(newVol)
if err != nil {
glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
return err
@ -1171,7 +1171,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa
// Save succeeded.
glog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))
_, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume")
_, err = ctrl.storeVolumeUpdate(newVol)
if err != nil {
// We will get an "volume added" event soon, this is not a big error
glog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, err)

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/goroutinemap"
vol "k8s.io/kubernetes/pkg/volume"
@ -148,7 +149,12 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour
// Ignore template volumes from kubernetes 1.2
deleted := ctrl.upgradeVolumeFrom1_2(&volume)
if !deleted {
storeObjectUpdate(ctrl.volumes.store, volume, "volume")
clone, err := conversion.NewCloner().DeepCopy(&volume)
if err != nil {
glog.Errorf("error cloning volume %q: %v", volume.Name, err)
}
volumeClone := clone.(*api.PersistentVolume)
ctrl.storeVolumeUpdate(volumeClone)
}
}
@ -163,11 +169,24 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour
return
}
for _, claim := range claimList.Items {
storeObjectUpdate(ctrl.claims, claim, "claim")
clone, err := conversion.NewCloner().DeepCopy(&claim)
if err != nil {
glog.Errorf("error cloning claim %q: %v", claimToClaimKey(&claim), err)
}
claimClone := clone.(*api.PersistentVolumeClaim)
ctrl.storeClaimUpdate(claimClone)
}
glog.V(4).Infof("controller initialized")
}
func (ctrl *PersistentVolumeController) storeVolumeUpdate(volume *api.PersistentVolume) (bool, error) {
return storeObjectUpdate(ctrl.volumes.store, volume, "volume")
}
func (ctrl *PersistentVolumeController) storeClaimUpdate(claim *api.PersistentVolumeClaim) (bool, error) {
return storeObjectUpdate(ctrl.claims, claim, "claim")
}
// addVolume is callback from framework.Controller watching PersistentVolume
// events.
func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
@ -184,7 +203,7 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
// Store the new volume version in the cache and do not process it if this
// is an old version.
new, err := storeObjectUpdate(ctrl.volumes.store, obj, "volume")
new, err := ctrl.storeVolumeUpdate(pv)
if err != nil {
glog.Errorf("%v", err)
}
@ -219,7 +238,7 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{})
// Store the new volume version in the cache and do not process it if this
// is an old version.
new, err := storeObjectUpdate(ctrl.volumes.store, newObj, "volume")
new, err := ctrl.storeVolumeUpdate(newVolume)
if err != nil {
glog.Errorf("%v", err)
}
@ -291,7 +310,13 @@ func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) {
func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
// Store the new claim version in the cache and do not process it if this is
// an old version.
new, err := storeObjectUpdate(ctrl.claims, obj, "claim")
claim, ok := obj.(*api.PersistentVolumeClaim)
if !ok {
glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj)
return
}
new, err := ctrl.storeClaimUpdate(claim)
if err != nil {
glog.Errorf("%v", err)
}
@ -299,11 +324,6 @@ func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
return
}
claim, ok := obj.(*api.PersistentVolumeClaim)
if !ok {
glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj)
return
}
if err := ctrl.syncClaim(claim); err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
@ -320,7 +340,13 @@ func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) {
// Store the new claim version in the cache and do not process it if this is
// an old version.
new, err := storeObjectUpdate(ctrl.claims, newObj, "claim")
newClaim, ok := newObj.(*api.PersistentVolumeClaim)
if !ok {
glog.Errorf("Expected PersistentVolumeClaim but updateClaim received %+v", newObj)
return
}
new, err := ctrl.storeClaimUpdate(newClaim)
if err != nil {
glog.Errorf("%v", err)
}
@ -328,11 +354,6 @@ func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{})
return
}
newClaim, ok := newObj.(*api.PersistentVolumeClaim)
if !ok {
glog.Errorf("Expected PersistentVolumeClaim but updateClaim received %+v", newObj)
return
}
if err := ctrl.syncClaim(newClaim); err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller