recycler: implement recycler

Also update the old unit test to pass. New unit tests will be added in
subsequent commit.
This commit is contained in:
Jan Safranek 2016-05-17 14:55:20 +02:00
parent 56cae2dc20
commit 1feb346830

View File

@ -94,6 +94,7 @@ type PersistentVolumeController struct {
kubeClient clientset.Interface
eventRecorder record.EventRecorder
cloud cloudprovider.Interface
recyclePluginMgr vol.VolumePluginMgr
// PersistentVolumeController keeps track of long running operations and
// makes sure it won't start the same operation twice in parallel.
@ -127,6 +128,7 @@ func NewPersistentVolumeController(
runningOperations: make(map[string]bool),
cloud: cloud,
}
controller.recyclePluginMgr.InitPlugins(recyclers, controller)
volumeSource := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@ -965,54 +967,208 @@ func (ctrl *PersistentVolumeController) unbindVolume(volume *api.PersistentVolum
}
// reclaimVolume implements volume.Spec.PersistentVolumeReclaimPolicy and
// starts appropriate reclaim action.
func (ctrl *PersistentVolumeController) reclaimVolume(volume *api.PersistentVolume) error {
if volume.Spec.PersistentVolumeReclaimPolicy == api.PersistentVolumeReclaimRetain {
glog.V(4).Infof("synchronizing PersistentVolume[%s]: policy is Retain, nothing to do", volume.Name)
return nil
}
/* else if volume.Spec.PersistentVolumeReclaimPolicy == api.PersistentVolumeReclaimRecycle {
plugin := findRecyclerPluginForPV(pv)
if plugin != nil {
// HOWTO RELEASE A PV
// maintain a map of running scrubber-pod-monitoring
// goroutines, guarded by mutex
//
// launch a goroutine that:
// 0. verify the PV object still needs to be recycled or return
// 1. launches a scrubber pod; the pod's name is deterministically created based on PV uid
// 2. if the pod is rejected for dup, adopt the existing pod
// 2.5. if the pod is rejected for any other reason, retry later
// 3. else (the create succeeds), ok
// 4. wait for pod completion
// 5. marks the PV API object as available
// 5.5. clear ClaimRef.UID
// 5.6. if boundByController, clear ClaimRef & boundByController annotation
// 6. deletes itself from the map when it's done
} else {
// make an event calling out that no recycler was configured
// mark the PV as failed
switch volume.Spec.PersistentVolumeReclaimPolicy {
case api.PersistentVolumeReclaimRetain:
glog.V(4).Infof("reclaimVolume[%s]: policy is Retain, nothing to do", volume.Name)
case api.PersistentVolumeReclaimRecycle:
glog.V(4).Infof("reclaimVolume[%s]: policy is Recycle", volume.Name)
ctrl.scheduleOperation("recycle-"+string(volume.UID), ctrl.recycleVolumeOperation, volume)
case api.PersistentVolumeReclaimDelete:
glog.V(4).Infof("reclaimVolume[%s]: policy is Delete", volume.Name)
ctrl.scheduleOperation("delete-"+string(volume.UID), ctrl.deleteVolumeOperation, volume)
default:
// Unknown PersistentVolumeReclaimPolicy
if volume.Status.Phase != api.VolumeFailed {
// Log the error only once, when we enter 'Failed' phase
glog.V(3).Infof("reclaimVolume[%s]: volume has unrecognized PersistentVolumeReclaimPolicy %q, marking as Failed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
ctrl.eventRecorder.Event(volume, api.EventTypeWarning, "VolumeUnknownReclaimPolicy", "Volume has unrecognized PersistentVolumeReclaimPolicy")
}
} else if pv.Spec.ReclaimPolicy == "Delete" {
plugin := findDeleterPluginForPV(pv)
if plugin != nil {
// maintain a map with the current deleter goroutines that are running
// if the key is already present in the map, return
//
// launch the goroutine that:
// 1. deletes the storage asset
// 2. deletes the PV API object
// 3. deletes itself from the map when it's done
} else {
// make an event calling out that no deleter was configured
// mark the PV as failed
// NB: external provisioners/deleters are currently not
// considered.
}
*/
if _, err := ctrl.updateVolumePhase(volume, api.VolumeFailed); err != nil {
return err
}
}
return nil
}
// doRerecycleVolumeOperationcycleVolume recycles a volume. This method is
// running in standalone goroutine and already has all necessary locks.
func (ctrl *PersistentVolumeController) recycleVolumeOperation(arg interface{}) {
volume, ok := arg.(*api.PersistentVolume)
if !ok {
glog.Errorf("Cannot convert recycleVolumeOperation argument to volume, got %+v", arg)
return
}
glog.V(4).Infof("recycleVolumeOperation [%s] started", volume.Name)
// This method may have been waiting for a volume lock for some time.
// Previous recycleVolumeOperation might just have saved an updated version,
// so read current volume state now.
newVolume, err := ctrl.kubeClient.Core().PersistentVolumes().Get(volume.Name)
if err != nil {
glog.V(3).Infof("error reading peristent volume %q: %v", volume.Name, err)
return
}
needsReclaim, err := ctrl.isVolumeReleased(newVolume)
if err != nil {
glog.V(3).Infof("error reading claim for volume %q: %v", volume.Name, err)
return
}
if !needsReclaim {
glog.V(3).Infof("volume %q no longer needs recycling, skipping", volume.Name)
return
}
// Use the newest volume copy, this will save us from version conflicts on
// saving.
volume = newVolume
// Find a plugin.
spec := vol.NewSpecFromPersistentVolume(volume, false)
plugin, err := ctrl.recyclePluginMgr.FindRecyclablePluginBySpec(spec)
if err != nil {
// No recycler found. Emit an event and mark the volume Failed.
if volume.Status.Phase != api.VolumeFailed {
// Log the error only once, when we enter 'Failed' phase
glog.V(2).Infof("failed to find recycle plugin for volume %q: %v", volume.Name, err)
ctrl.eventRecorder.Event(volume, api.EventTypeWarning, "VolumeFailedRecycle", "No recycler plugin found for the volume!")
}
if _, err = ctrl.updateVolumePhase(volume, api.VolumeFailed); err != nil {
glog.V(4).Infof("recycleVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
// Save failed, retry on the next deletion attempt
return
}
// Despite the volume being Failed, the controller will retry recycling
// the volume in every syncVolume() call.
return
}
// Plugin found
recycler, err := plugin.NewRecycler(spec)
if err != nil {
// Cannot create recycler
if volume.Status.Phase != api.VolumeFailed {
// Log the error only once, when we enter 'Failed' phase
glog.V(2).Infof("failed to create recycler for volume %q: %v", volume.Name, err)
strerr := fmt.Sprintf("Failed to create recycler: %v", err)
ctrl.eventRecorder.Event(volume, api.EventTypeWarning, "VolumeFailedRecycle", strerr)
}
if _, err = ctrl.updateVolumePhase(volume, api.VolumeFailed); err != nil {
glog.V(4).Infof("recycleVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
// Save failed, retry on the next deletion attempt
return
}
// Despite the volume being Failed, the controller will retry recycling
// the volume in every syncVolume() call.
return
}
if err = recycler.Recycle(); err != nil {
// Recycler failed
if volume.Status.Phase != api.VolumeFailed {
// Log the error only once, when we enter 'Failed' phase
glog.V(2).Infof("recycler for volume %q failed: %v", volume.Name, err)
strerr := fmt.Sprintf("Recycler failed: %s", err)
ctrl.eventRecorder.Event(volume, api.EventTypeWarning, "VolumeFailedRecycle", strerr)
}
if _, err = ctrl.updateVolumePhase(volume, api.VolumeFailed); err != nil {
glog.V(4).Infof("recycleVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
// Save failed, retry on the next deletion attempt
return
}
// Despite the volume being Failed, the controller will retry recycling
// the volume in every syncVolume() call.
return
}
glog.V(2).Infof("volume %q recycled", volume.Name)
// Make the volume available again
if err = ctrl.unbindVolume(volume); err != nil {
// Oops, could not save the volume and therefore the controller will
// recycle the volume again on next update. We _could_ maintain a cache
// of "recently recycled volumes" and avoid unnecessary recycling, this
// is left out as future optimization.
glog.V(3).Infof("recycleVolumeOperation [%s]: failed to make recycled volume 'Available' (%v), we will recycle the volume again", volume.Name, err)
return
}
return
}
// deleteVolumeOperation deletes a volume. This method is running in standalone
// goroutine and already has all necessary locks.
func (ctrl *PersistentVolumeController) deleteVolumeOperation(arg interface{}) {
volume := arg.(*api.PersistentVolume)
glog.V(4).Infof("deleteVolumeOperation [%s] started", volume.Name)
/*else if pv.Spec.ReclaimPolicy == "Delete" {
plugin := findDeleterPluginForPV(pv)
if plugin != nil {
// maintain a map with the current deleter goroutines that are running
// if the key is already present in the map, return
//
// launch the goroutine that:
// 1. deletes the storage asset
// 2. deletes the PV API object
// 3. deletes itself from the map when it's done
} else {
// make an event calling out that no deleter was configured
// mark the PV as failed
// NB: external provisioners/deleters are currently not
// considered.
}
*/
}
// isVolumeReleased returns true if given volume is released and can be recycled
// or deleted, based on its retain policy. I.e. the volume is bound to a claim
// and the claim does not exist or exists and is bound to different volume.
func (ctrl *PersistentVolumeController) isVolumeReleased(volume *api.PersistentVolume) (bool, error) {
// A volume needs reclaim if it has ClaimRef and appropriate claim does not
// exist.
if volume.Spec.ClaimRef == nil {
glog.V(4).Infof("isVolumeReleased[%s]: ClaimRef is nil", volume.Name)
return false, nil
}
if volume.Spec.ClaimRef.UID == "" {
// This is a volume bound by user and the controller has not finished
// binding to the real claim yet.
glog.V(4).Infof("isVolumeReleased[%s]: ClaimRef is not bound", volume.Name)
return false, nil
}
var claim *api.PersistentVolumeClaim
claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
obj, found, err := ctrl.claims.GetByKey(claimName)
if err != nil {
return false, err
}
if !found {
// Fall through with claim = nil
} else {
var ok bool
claim, ok = obj.(*api.PersistentVolumeClaim)
if !ok {
return false, fmt.Errorf("Cannot convert object from claim cache to claim!?: %+v", obj)
}
}
if claim != nil && claim.UID == volume.Spec.ClaimRef.UID {
// the claim still exists and has the right UID
glog.V(4).Infof("isVolumeReleased[%s]: ClaimRef is still valid, volume is not released", volume.Name)
return false, nil
}
glog.V(2).Infof("isVolumeReleased[%s]: volume is released", volume.Name)
return true, nil
}
// scheduleOperation starts given asynchronous operation on given volume. It
// makes sure the operation is already not running.
func (ctrl *PersistentVolumeController) scheduleOperation(operationName string, operation func(arg interface{}), arg interface{}) {