diff --git a/pkg/controller/persistentvolume/persistentvolume_controller.go b/pkg/controller/persistentvolume/persistentvolume_controller.go
index 3cf1c4627d6..366605044d2 100644
--- a/pkg/controller/persistentvolume/persistentvolume_controller.go
+++ b/pkg/controller/persistentvolume/persistentvolume_controller.go
@@ -94,6 +94,7 @@ type PersistentVolumeController struct {
 	kubeClient    clientset.Interface
 	eventRecorder record.EventRecorder
 	cloud         cloudprovider.Interface
+	recyclePluginMgr vol.VolumePluginMgr
 
 	// PersistentVolumeController keeps track of long running operations and
 	// makes sure it won't start the same operation twice in parallel.
@@ -127,6 +128,7 @@ func NewPersistentVolumeController(
 		runningOperations: make(map[string]bool),
 		cloud:             cloud,
 	}
+	controller.recyclePluginMgr.InitPlugins(recyclers, controller)
 
 	volumeSource := &cache.ListWatch{
 		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@@ -965,54 +967,208 @@ func (ctrl *PersistentVolumeController) unbindVolume(volume *api.PersistentVolum
 }
 
+// reclaimVolume implements volume.Spec.PersistentVolumeReclaimPolicy and
+// starts appropriate reclaim action.
 func (ctrl *PersistentVolumeController) reclaimVolume(volume *api.PersistentVolume) error {
-	if volume.Spec.PersistentVolumeReclaimPolicy == api.PersistentVolumeReclaimRetain {
-		glog.V(4).Infof("synchronizing PersistentVolume[%s]: policy is Retain, nothing to do", volume.Name)
-		return nil
-	}
-	/* else if volume.Spec.PersistentVolumeReclaimPolicy == api.PersistentVolumeReclaimRecycle {
-		plugin := findRecyclerPluginForPV(pv)
-		if plugin != nil {
-			// HOWTO RELEASE A PV
-			// maintain a map of running scrubber-pod-monitoring
-			// goroutines, guarded by mutex
-			//
-			// launch a goroutine that:
-			// 0. verify the PV object still needs to be recycled or return
-			// 1. launches a scrubber pod; the pod's name is deterministically created based on PV uid
-			// 2. if the pod is rejected for dup, adopt the existing pod
-			// 2.5. if the pod is rejected for any other reason, retry later
-			// 3. else (the create succeeds), ok
-			// 4. wait for pod completion
-			// 5. marks the PV API object as available
-			// 5.5. clear ClaimRef.UID
-			// 5.6. if boundByController, clear ClaimRef & boundByController annotation
-			// 6. deletes itself from the map when it's done
-		} else {
-			// make an event calling out that no recycler was configured
-			// mark the PV as failed
+	switch volume.Spec.PersistentVolumeReclaimPolicy {
+	case api.PersistentVolumeReclaimRetain:
+		glog.V(4).Infof("reclaimVolume[%s]: policy is Retain, nothing to do", volume.Name)
+
+	case api.PersistentVolumeReclaimRecycle:
+		glog.V(4).Infof("reclaimVolume[%s]: policy is Recycle", volume.Name)
+		ctrl.scheduleOperation("recycle-"+string(volume.UID), ctrl.recycleVolumeOperation, volume)
+
+	case api.PersistentVolumeReclaimDelete:
+		glog.V(4).Infof("reclaimVolume[%s]: policy is Delete", volume.Name)
+		ctrl.scheduleOperation("delete-"+string(volume.UID), ctrl.deleteVolumeOperation, volume)
+
+	default:
+		// Unknown PersistentVolumeReclaimPolicy
+		if volume.Status.Phase != api.VolumeFailed {
+			// Log the error only once, when we enter 'Failed' phase
+			glog.V(3).Infof("reclaimVolume[%s]: volume has unrecognized PersistentVolumeReclaimPolicy %q, marking as Failed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
+			ctrl.eventRecorder.Event(volume, api.EventTypeWarning, "VolumeUnknownReclaimPolicy", "Volume has unrecognized PersistentVolumeReclaimPolicy")
+		}
-	} else if pv.Spec.ReclaimPolicy == "Delete" {
-		plugin := findDeleterPluginForPV(pv)
-		if plugin != nil {
-			// maintain a map with the current deleter goroutines that are running
-			// if the key is already present in the map, return
-			//
-			// launch the goroutine that:
-			// 1. deletes the storage asset
-			// 2. deletes the PV API object
-			// 3. deletes itself from the map when it's done
-		} else {
-			// make an event calling out that no deleter was configured
-			// mark the PV as failed
-			// NB: external provisioners/deleters are currently not
-			// considered.
-	*/
+		if _, err := ctrl.updateVolumePhase(volume, api.VolumeFailed); err != nil {
+			return err
+		}
+	}
 	return nil
 }
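+
+// Note: "recycle-"+UID and "delete-"+UID above are the operation names that
+// scheduleOperation() tracks in the controller's runningOperations map;
+// scheduling an operation whose name is already running is a no-op, so a
+// single volume is never recycled or deleted by two goroutines at once.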
+
+// recycleVolumeOperation recycles a volume. This method runs in a standalone
+// goroutine and already holds all necessary locks.
+func (ctrl *PersistentVolumeController) recycleVolumeOperation(arg interface{}) {
+	volume, ok := arg.(*api.PersistentVolume)
+	if !ok {
+		glog.Errorf("Cannot convert recycleVolumeOperation argument to volume, got %+v", arg)
+		return
+	}
+
+	glog.V(4).Infof("recycleVolumeOperation [%s] started", volume.Name)
+
+	// This method may have been waiting for a volume lock for some time.
+	// A previous recycleVolumeOperation might just have saved an updated
+	// version, so read the current volume state now.
+	newVolume, err := ctrl.kubeClient.Core().PersistentVolumes().Get(volume.Name)
+	if err != nil {
+		glog.V(3).Infof("error reading persistent volume %q: %v", volume.Name, err)
+		return
+	}
+	needsReclaim, err := ctrl.isVolumeReleased(newVolume)
+	if err != nil {
+		glog.V(3).Infof("error reading claim for volume %q: %v", volume.Name, err)
+		return
+	}
+	if !needsReclaim {
+		glog.V(3).Infof("volume %q no longer needs recycling, skipping", volume.Name)
+		return
+	}
+
+	// Use the newest volume copy; this saves us from version conflicts when
+	// saving.
+	volume = newVolume
+
+	// Find a plugin.
+	spec := vol.NewSpecFromPersistentVolume(volume, false)
+	plugin, err := ctrl.recyclePluginMgr.FindRecyclablePluginBySpec(spec)
+	if err != nil {
+		// No recycler found. Emit an event and mark the volume Failed.
+		if volume.Status.Phase != api.VolumeFailed {
+			// Log the error only once, when we enter 'Failed' phase
+			glog.V(2).Infof("failed to find recycle plugin for volume %q: %v", volume.Name, err)
+			ctrl.eventRecorder.Event(volume, api.EventTypeWarning, "VolumeFailedRecycle", "No recycler plugin found for the volume!")
+		}
+
+		if _, err = ctrl.updateVolumePhase(volume, api.VolumeFailed); err != nil {
+			glog.V(4).Infof("recycleVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
+			// Save failed, retry on the next recycle attempt
+			return
+		}
+		// Despite the volume being Failed, the controller will retry recycling
+		// the volume in every syncVolume() call.
+		return
+	}
+
+	// Plugin found
+	recycler, err := plugin.NewRecycler(spec)
+	if err != nil {
+		// Cannot create recycler
+		if volume.Status.Phase != api.VolumeFailed {
+			// Log the error only once, when we enter 'Failed' phase
+			glog.V(2).Infof("failed to create recycler for volume %q: %v", volume.Name, err)
+			strerr := fmt.Sprintf("Failed to create recycler: %v", err)
+			ctrl.eventRecorder.Event(volume, api.EventTypeWarning, "VolumeFailedRecycle", strerr)
+		}
+
+		if _, err = ctrl.updateVolumePhase(volume, api.VolumeFailed); err != nil {
+			glog.V(4).Infof("recycleVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
+			// Save failed, retry on the next recycle attempt
+			return
+		}
+		// Despite the volume being Failed, the controller will retry recycling
+		// the volume in every syncVolume() call.
+		return
+	}
+
+	if err = recycler.Recycle(); err != nil {
+		// Recycler failed
+		if volume.Status.Phase != api.VolumeFailed {
+			// Log the error only once, when we enter 'Failed' phase
+			glog.V(2).Infof("recycler for volume %q failed: %v", volume.Name, err)
+			strerr := fmt.Sprintf("Recycler failed: %v", err)
+			ctrl.eventRecorder.Event(volume, api.EventTypeWarning, "VolumeFailedRecycle", strerr)
+		}
+
+		if _, err = ctrl.updateVolumePhase(volume, api.VolumeFailed); err != nil {
+			glog.V(4).Infof("recycleVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
+			// Save failed, retry on the next recycle attempt
+			return
+		}
+		// Despite the volume being Failed, the controller will retry recycling
+		// the volume in every syncVolume() call.
+		return
+	}
+
+	glog.V(2).Infof("volume %q recycled", volume.Name)
+	// Make the volume available again.
+	if err = ctrl.unbindVolume(volume); err != nil {
+		// Oops, could not save the volume and therefore the controller will
+		// recycle the volume again on the next update. We _could_ maintain a
+		// cache of "recently recycled volumes" and avoid unnecessary recycling;
+		// this is left as a future optimization.
+		glog.V(3).Infof("recycleVolumeOperation [%s]: failed to make recycled volume 'Available' (%v), we will recycle the volume again", volume.Name, err)
+		return
+	}
+}
+
+// deleteVolumeOperation deletes a volume. This method runs in a standalone
+// goroutine and already holds all necessary locks.
+func (ctrl *PersistentVolumeController) deleteVolumeOperation(arg interface{}) {
+	volume, ok := arg.(*api.PersistentVolume)
+	if !ok {
+		glog.Errorf("Cannot convert deleteVolumeOperation argument to volume, got %+v", arg)
+		return
+	}
+	glog.V(4).Infof("deleteVolumeOperation [%s] started", volume.Name)
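+	// TODO: the actual deletion is not implemented yet. The plan preserved in
+	// the comment below mirrors recycleVolumeOperation: find a deleter plugin
+	// for the volume, delete the storage asset, then delete the PV API object,
+	// reporting failures as events on the volume.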
+	/* else if pv.Spec.ReclaimPolicy == "Delete" {
+		plugin := findDeleterPluginForPV(pv)
+		if plugin != nil {
+			// maintain a map with the current deleter goroutines that are running
+			// if the key is already present in the map, return
+			//
+			// launch the goroutine that:
+			// 1. deletes the storage asset
+			// 2. deletes the PV API object
+			// 3. deletes itself from the map when it's done
+		} else {
+			// make an event calling out that no deleter was configured
+			// mark the PV as failed
+			// NB: external provisioners/deleters are currently not
+			// considered.
+		}
+	*/
+}
+
+// isVolumeReleased returns true if the given volume is released and can be
+// recycled or deleted, based on its reclaim policy, i.e. the volume is bound
+// to a claim and that claim either no longer exists or is bound to a
+// different volume.
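+// In short:
+//
+//	ClaimRef == nil                          -> not released (volume is unbound)
+//	ClaimRef.UID == ""                       -> not released (binding in progress)
+//	claim exists, UID matches ClaimRef.UID   -> not released (still bound)
+//	claim missing or its UID differs         -> released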
+func (ctrl *PersistentVolumeController) isVolumeReleased(volume *api.PersistentVolume) (bool, error) {
+	// A volume needs reclaim if it has a ClaimRef and the corresponding claim
+	// does not exist.
+	if volume.Spec.ClaimRef == nil {
+		glog.V(4).Infof("isVolumeReleased[%s]: ClaimRef is nil", volume.Name)
+		return false, nil
+	}
+	if volume.Spec.ClaimRef.UID == "" {
+		// This is a volume bound by the user and the controller has not
+		// finished binding it to the real claim yet.
+		glog.V(4).Infof("isVolumeReleased[%s]: ClaimRef is not bound", volume.Name)
+		return false, nil
+	}
+
+	var claim *api.PersistentVolumeClaim
+	claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
+	obj, found, err := ctrl.claims.GetByKey(claimName)
+	if err != nil {
+		return false, err
+	}
+	if found {
+		var ok bool
+		claim, ok = obj.(*api.PersistentVolumeClaim)
+		if !ok {
+			return false, fmt.Errorf("cannot convert object from claim cache to claim: %+v", obj)
+		}
+	}
+	// If the claim was not found, fall through with claim == nil.
+	if claim != nil && claim.UID == volume.Spec.ClaimRef.UID {
+		// The claim still exists and has the right UID.
+		glog.V(4).Infof("isVolumeReleased[%s]: ClaimRef is still valid, volume is not released", volume.Name)
+		return false, nil
+	}
+
+	glog.V(2).Infof("isVolumeReleased[%s]: volume is released", volume.Name)
+	return true, nil
+}
+
 // scheduleOperation starts the given asynchronous operation on the given
 // volume. It makes sure the same operation is not already running.
 func (ctrl *PersistentVolumeController) scheduleOperation(operationName string, operation func(arg interface{}), arg interface{}) {