From 6fa527a460551d617e80d3fd8350f3cc1ad7770b Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Tue, 17 May 2016 14:55:02 +0200 Subject: [PATCH] Remove all three PersistentVolume controllers. We will add new ones gradually in smaller chunks. --- ...ersistentvolume_claim_binder_controller.go | 530 ------------- ...tentvolume_claim_binder_controller_test.go | 732 ------------------ ...persistentvolume_provisioner_controller.go | 536 ------------- ...stentvolume_provisioner_controller_test.go | 295 ------- .../persistentvolume_recycler_controller.go | 415 ---------- ...rsistentvolume_recycler_controller_test.go | 265 ------- 6 files changed, 2773 deletions(-) delete mode 100644 pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go delete mode 100644 pkg/controller/persistentvolume/persistentvolume_claim_binder_controller_test.go delete mode 100644 pkg/controller/persistentvolume/persistentvolume_provisioner_controller.go delete mode 100644 pkg/controller/persistentvolume/persistentvolume_provisioner_controller_test.go delete mode 100644 pkg/controller/persistentvolume/persistentvolume_recycler_controller.go delete mode 100644 pkg/controller/persistentvolume/persistentvolume_recycler_controller_test.go diff --git a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go deleted file mode 100644 index a0e105a09c4..00000000000 --- a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go +++ /dev/null @@ -1,530 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package persistentvolume - -import ( - "fmt" - "sync" - "time" - - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/client/cache" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - "k8s.io/kubernetes/pkg/controller/framework" - "k8s.io/kubernetes/pkg/conversion" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util/metrics" - "k8s.io/kubernetes/pkg/watch" - - "github.com/golang/glog" -) - -// PersistentVolumeClaimBinder is a controller that synchronizes PersistentVolumeClaims. -type PersistentVolumeClaimBinder struct { - volumeIndex *persistentVolumeOrderedIndex - volumeController *framework.Controller - claimController *framework.Controller - client binderClient - stopChannels map[string]chan struct{} - lock sync.RWMutex -} - -// NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder -func NewPersistentVolumeClaimBinder(kubeClient clientset.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder { - if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { - metrics.RegisterMetricAndTrackRateLimiterUsage("pv_claim_binder_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) - } - volumeIndex := NewPersistentVolumeOrderedIndex() - binderClient := NewBinderClient(kubeClient) - binder := &PersistentVolumeClaimBinder{ - volumeIndex: volumeIndex, - client: binderClient, - } - - _, volumeController := framework.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return kubeClient.Core().PersistentVolumes().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return kubeClient.Core().PersistentVolumes().Watch(options) - }, - }, - &api.PersistentVolume{}, - // TODO: Can we have much longer period here? - syncPeriod, - framework.ResourceEventHandlerFuncs{ - AddFunc: binder.addVolume, - UpdateFunc: binder.updateVolume, - DeleteFunc: binder.deleteVolume, - }, - ) - _, claimController := framework.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options) - }, - }, - &api.PersistentVolumeClaim{}, - // TODO: Can we have much longer period here? - syncPeriod, - framework.ResourceEventHandlerFuncs{ - AddFunc: binder.addClaim, - UpdateFunc: binder.updateClaim, - DeleteFunc: binder.deleteClaim, - }, - ) - - binder.claimController = claimController - binder.volumeController = volumeController - - return binder -} -func (binder *PersistentVolumeClaimBinder) addVolume(obj interface{}) { - binder.lock.Lock() - defer binder.lock.Unlock() - pv, ok := obj.(*api.PersistentVolume) - if !ok { - glog.Errorf("Expected PersistentVolume but handler received %+v", obj) - return - } - if err := syncVolume(binder.volumeIndex, binder.client, pv); err != nil { - glog.Errorf("PVClaimBinder could not add volume %s: %+v", pv.Name, err) - } -} - -func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface{}) { - binder.lock.Lock() - defer binder.lock.Unlock() - newVolume, ok := newObj.(*api.PersistentVolume) - if !ok { - glog.Errorf("Expected PersistentVolume but handler received %+v", newObj) - return - } - if err := binder.volumeIndex.Update(newVolume); err != nil { - glog.Errorf("Error updating volume %s in index: %v", newVolume.Name, err) - return - } - if err := syncVolume(binder.volumeIndex, binder.client, newVolume); err != nil { - glog.Errorf("PVClaimBinder could not update volume %s: %+v", newVolume.Name, err) - } -} - -func (binder *PersistentVolumeClaimBinder) deleteVolume(obj interface{}) { - binder.lock.Lock() - defer binder.lock.Unlock() - volume, ok := obj.(*api.PersistentVolume) - if !ok { - glog.Errorf("Expected PersistentVolume but handler received %+v", obj) - return - } - if err := binder.volumeIndex.Delete(volume); err != nil { - glog.Errorf("Error deleting volume %s from index: %v", volume.Name, err) - } -} - -func (binder *PersistentVolumeClaimBinder) addClaim(obj interface{}) { - binder.lock.Lock() - defer binder.lock.Unlock() - claim, ok := obj.(*api.PersistentVolumeClaim) - if !ok { - glog.Errorf("Expected PersistentVolumeClaim but handler received %+v", obj) - return - } - if err := syncClaim(binder.volumeIndex, binder.client, claim); err != nil { - glog.Errorf("PVClaimBinder could not add claim %s: %+v", claim.Name, err) - } -} - -func (binder *PersistentVolumeClaimBinder) updateClaim(oldObj, newObj interface{}) { - binder.lock.Lock() - defer binder.lock.Unlock() - newClaim, ok := newObj.(*api.PersistentVolumeClaim) - if !ok { - glog.Errorf("Expected PersistentVolumeClaim but handler received %+v", newObj) - return - } - if err := syncClaim(binder.volumeIndex, binder.client, newClaim); err != nil { - glog.Errorf("PVClaimBinder could not update claim %s: %+v", newClaim.Name, err) - } -} - -func (binder *PersistentVolumeClaimBinder) deleteClaim(obj interface{}) { - binder.lock.Lock() - defer binder.lock.Unlock() - var volume *api.PersistentVolume - if pvc, ok := obj.(*api.PersistentVolumeClaim); ok { - if pvObj, exists, _ := binder.volumeIndex.GetByKey(pvc.Spec.VolumeName); exists { - if pv, ok := pvObj.(*api.PersistentVolume); ok { - volume = pv - } - } - } - if unk, ok := obj.(cache.DeletedFinalStateUnknown); ok && unk.Obj != nil { - if pv, ok := unk.Obj.(*api.PersistentVolume); ok { - volume = pv - } - } - - // sync the volume when its claim is deleted. Explicitly sync'ing the volume here in response to - // claim deletion prevents the volume from waiting until the next sync period for its Release. - if volume != nil { - err := syncVolume(binder.volumeIndex, binder.client, volume) - if err != nil { - glog.Errorf("PVClaimBinder could not update volume %s from deleteClaim handler: %+v", volume.Name, err) - } - } -} - -func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) { - glog.V(5).Infof("Synchronizing PersistentVolume[%s], current phase: %s\n", volume.Name, volume.Status.Phase) - - // The PV may have been modified by parallel call to syncVolume, load - // the current version. - newPv, err := binderClient.GetPersistentVolume(volume.Name) - if err != nil { - return fmt.Errorf("Cannot reload volume %s: %v", volume.Name, err) - } - volume = newPv - - // volumes can be in one of the following states: - // - // VolumePending -- default value -- not bound to a claim and not yet processed through this controller. - // VolumeAvailable -- not bound to a claim, but processed at least once and found in this controller's volumeIndex. - // VolumeBound -- bound to a claim because volume.Spec.ClaimRef != nil. Claim status may not be correct. - // VolumeReleased -- volume.Spec.ClaimRef != nil but the claim has been deleted by the user. - // VolumeFailed -- volume.Spec.ClaimRef != nil and the volume failed processing in the recycler - currentPhase := volume.Status.Phase - nextPhase := currentPhase - - // Always store the newest volume state in local cache. - _, exists, err := volumeIndex.Get(volume) - if err != nil { - return err - } - if !exists { - volumeIndex.Add(volume) - } else { - volumeIndex.Update(volume) - } - - if isBeingProvisioned(volume) { - glog.V(4).Infof("Skipping PersistentVolume[%s], waiting for provisioning to finish", volume.Name) - return nil - } - - switch currentPhase { - case api.VolumePending: - - // 4 possible states: - // 1. ClaimRef != nil, Claim exists, Claim UID == ClaimRef UID: Prebound to claim. Make volume available for binding (it will match PVC). - // 2. ClaimRef != nil, Claim exists, Claim UID != ClaimRef UID: Recently recycled. Remove bind. Make volume available for new claim. - // 3. ClaimRef != nil, Claim !exists: Recently recycled. Remove bind. Make volume available for new claim. - // 4. ClaimRef == nil: Neither recycled nor prebound. Make volume available for binding. - nextPhase = api.VolumeAvailable - - if volume.Spec.ClaimRef != nil { - claim, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name) - switch { - case err != nil && !errors.IsNotFound(err): - return fmt.Errorf("Error getting PersistentVolumeClaim[%s/%s]: %v", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name, err) - case errors.IsNotFound(err) || (claim != nil && claim.UID != volume.Spec.ClaimRef.UID): - glog.V(5).Infof("PersistentVolume[%s] has a claim ref to a claim which does not exist", volume.Name) - if volume.Spec.PersistentVolumeReclaimPolicy == api.PersistentVolumeReclaimRecycle { - // Pending volumes that have a ClaimRef where the claim is missing were recently recycled. - // The Recycler set the phase to VolumePending to start the volume at the beginning of this lifecycle. - // removing ClaimRef unbinds the volume - clone, err := conversion.NewCloner().DeepCopy(volume) - if err != nil { - return fmt.Errorf("Error cloning pv: %v", err) - } - volumeClone, ok := clone.(*api.PersistentVolume) - if !ok { - return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone) - } - glog.V(5).Infof("PersistentVolume[%s] is recently recycled; remove claimRef.", volume.Name) - volumeClone.Spec.ClaimRef = nil - - if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil { - return fmt.Errorf("Unexpected error saving PersistentVolume: %+v", err) - } else { - volume = updatedVolume - volumeIndex.Update(volume) - } - } else { - // Pending volumes that has a ClaimRef and the claim is missing and is was not recycled. - // It must have been freshly provisioned and the claim was deleted during the provisioning. - // Mark the volume as Released, it will be deleted. - nextPhase = api.VolumeReleased - } - } - - // Dynamically provisioned claims remain Pending until its volume is completely provisioned. - // The provisioner updates the PV and triggers this update for the volume. Explicitly sync'ing - // the claim here prevents the need to wait until the next sync period when the claim would normally - // advance to Bound phase. Otherwise, the maximum wait time for the claim to be Bound is the default sync period. - if claim != nil && claim.Status.Phase == api.ClaimPending && keyExists(qosProvisioningKey, claim.Annotations) && isProvisioningComplete(volume) { - syncClaim(volumeIndex, binderClient, claim) - } - } - glog.V(5).Infof("PersistentVolume[%s] is available\n", volume.Name) - - // available volumes await a claim - case api.VolumeAvailable: - if volume.Spec.ClaimRef != nil { - _, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name) - if err == nil { - // change of phase will trigger an update event with the newly bound volume - glog.V(5).Infof("PersistentVolume[%s] is now bound\n", volume.Name) - nextPhase = api.VolumeBound - } else { - if errors.IsNotFound(err) { - nextPhase = api.VolumeReleased - } - } - } - - //bound volumes require verification of their bound claims - case api.VolumeBound: - if volume.Spec.ClaimRef == nil { - return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume) - } else { - claim, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name) - - // A volume is Released when its bound claim cannot be found in the API server. - // A claim by the same name can be found if deleted and recreated before this controller can release - // the volume from the original claim, so a UID check is necessary. - if err != nil { - if errors.IsNotFound(err) { - nextPhase = api.VolumeReleased - } else { - return err - } - } else if claim != nil && claim.UID != volume.Spec.ClaimRef.UID { - nextPhase = api.VolumeReleased - } - } - - // released volumes require recycling - case api.VolumeReleased: - if volume.Spec.ClaimRef == nil { - return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume) - } else { - // another process is watching for released volumes. - // PersistentVolumeReclaimPolicy is set per PersistentVolume - // Recycle - sets the PV to Pending and back under this controller's management - // Delete - delete events are handled by this controller's watch. PVs are removed from the index. - } - - // volumes are removed by processes external to this binder and must be removed from the cluster - case api.VolumeFailed: - if volume.Spec.ClaimRef == nil { - return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume) - } else { - glog.V(5).Infof("PersistentVolume[%s] previously failed recycling. Skipping.\n", volume.Name) - } - } - - if currentPhase != nextPhase { - volume.Status.Phase = nextPhase - - // a change in state will trigger another update through this controller. - // each pass through this controller evaluates current phase and decides whether or not to change to the next phase - glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", volume.Name, currentPhase, nextPhase) - volume, err := binderClient.UpdatePersistentVolumeStatus(volume) - if err != nil { - // Rollback to previous phase - volume.Status.Phase = currentPhase - } - volumeIndex.Update(volume) - } - - return nil -} - -func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) { - glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s] for binding", claim.Name) - - // The claim may have been modified by parallel call to syncClaim, load - // the current version. - newClaim, err := binderClient.GetPersistentVolumeClaim(claim.Namespace, claim.Name) - if err != nil { - return fmt.Errorf("Cannot reload claim %s/%s: %v", claim.Namespace, claim.Name, err) - } - claim = newClaim - - switch claim.Status.Phase { - case api.ClaimPending: - // claims w/ a storage-class annotation for provisioning with *only* match volumes with a ClaimRef of the claim. - volume, err := volumeIndex.findBestMatchForClaim(claim) - if err != nil { - return err - } - - if volume == nil { - glog.V(5).Infof("A volume match does not exist for persistent claim: %s", claim.Name) - return nil - } - - if isBeingProvisioned(volume) { - glog.V(5).Infof("PersistentVolume[%s] for PersistentVolumeClaim[%s/%s] is still being provisioned.", volume.Name, claim.Namespace, claim.Name) - return nil - } - - claimRef, err := api.GetReference(claim) - if err != nil { - return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) - } - - // Make a binding reference to the claim by persisting claimRef on the volume. - // The local cache must be updated with the new bind to prevent subsequent - // claims from binding to the volume. - if volume.Spec.ClaimRef == nil { - clone, err := conversion.NewCloner().DeepCopy(volume) - if err != nil { - return fmt.Errorf("Error cloning pv: %v", err) - } - volumeClone, ok := clone.(*api.PersistentVolume) - if !ok { - return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone) - } - volumeClone.Spec.ClaimRef = claimRef - if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil { - return fmt.Errorf("Unexpected error saving PersistentVolume.Status: %+v", err) - } else { - volume = updatedVolume - volumeIndex.Update(updatedVolume) - } - } - - // the bind is persisted on the volume above and will always match the claim in a search. - // claim would remain Pending if the update fails, so processing this state is idempotent. - // this only needs to be processed once. - if claim.Spec.VolumeName != volume.Name { - claim.Spec.VolumeName = volume.Name - claim, err = binderClient.UpdatePersistentVolumeClaim(claim) - if err != nil { - return fmt.Errorf("Error updating claim with VolumeName %s: %+v\n", volume.Name, err) - } - } - - claim.Status.Phase = api.ClaimBound - claim.Status.AccessModes = volume.Spec.AccessModes - claim.Status.Capacity = volume.Spec.Capacity - _, err = binderClient.UpdatePersistentVolumeClaimStatus(claim) - if err != nil { - return fmt.Errorf("Unexpected error saving claim status: %+v", err) - } - - case api.ClaimBound: - // no-op. Claim is bound, values from PV are set. PVCs are technically mutable in the API server - // and we don't want to handle those changes at this time. - - default: - return fmt.Errorf("Unknown state for PVC: %#v", claim) - - } - - glog.V(5).Infof("PersistentVolumeClaim[%s] is bound\n", claim.Name) - return nil -} - -func isBeingProvisioned(volume *api.PersistentVolume) bool { - value, found := volume.Annotations[pvProvisioningRequiredAnnotationKey] - if found && value != pvProvisioningCompletedAnnotationValue { - return true - } - return false -} - -// Run starts all of this binder's control loops -func (controller *PersistentVolumeClaimBinder) Run() { - glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n") - if controller.stopChannels == nil { - controller.stopChannels = make(map[string]chan struct{}) - } - - if _, exists := controller.stopChannels["volumes"]; !exists { - controller.stopChannels["volumes"] = make(chan struct{}) - go controller.volumeController.Run(controller.stopChannels["volumes"]) - } - - if _, exists := controller.stopChannels["claims"]; !exists { - controller.stopChannels["claims"] = make(chan struct{}) - go controller.claimController.Run(controller.stopChannels["claims"]) - } -} - -// Stop gracefully shuts down this binder -func (controller *PersistentVolumeClaimBinder) Stop() { - glog.V(5).Infof("Stopping PersistentVolumeClaimBinder\n") - for name, stopChan := range controller.stopChannels { - close(stopChan) - delete(controller.stopChannels, name) - } -} - -// binderClient abstracts access to PVs and PVCs -type binderClient interface { - GetPersistentVolume(name string) (*api.PersistentVolume, error) - UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) - DeletePersistentVolume(volume *api.PersistentVolume) error - UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) - GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) - UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) - UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) -} - -func NewBinderClient(c clientset.Interface) binderClient { - return &realBinderClient{c} -} - -type realBinderClient struct { - client clientset.Interface -} - -func (c *realBinderClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) { - return c.client.Core().PersistentVolumes().Get(name) -} - -func (c *realBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) { - return c.client.Core().PersistentVolumes().Update(volume) -} - -func (c *realBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error { - return c.client.Core().PersistentVolumes().Delete(volume.Name, nil) -} - -func (c *realBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { - return c.client.Core().PersistentVolumes().UpdateStatus(volume) -} - -func (c *realBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) { - return c.client.Core().PersistentVolumeClaims(namespace).Get(name) -} - -func (c *realBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { - return c.client.Core().PersistentVolumeClaims(claim.Namespace).Update(claim) -} - -func (c *realBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { - return c.client.Core().PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim) -} diff --git a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller_test.go b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller_test.go deleted file mode 100644 index f01908c7f7f..00000000000 --- a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller_test.go +++ /dev/null @@ -1,732 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package persistentvolume - -import ( - "fmt" - "os" - "reflect" - "testing" - "time" - - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/api/resource" - "k8s.io/kubernetes/pkg/api/testapi" - "k8s.io/kubernetes/pkg/apimachinery/registered" - "k8s.io/kubernetes/pkg/client/cache" - "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" - "k8s.io/kubernetes/pkg/client/testing/core" - "k8s.io/kubernetes/pkg/types" - utiltesting "k8s.io/kubernetes/pkg/util/testing" - "k8s.io/kubernetes/pkg/volume" - "k8s.io/kubernetes/pkg/volume/host_path" - volumetest "k8s.io/kubernetes/pkg/volume/testing" -) - -func TestRunStop(t *testing.T) { - clientset := fake.NewSimpleClientset() - binder := NewPersistentVolumeClaimBinder(clientset, 1*time.Second) - - if len(binder.stopChannels) != 0 { - t.Errorf("Non-running binder should not have any stopChannels. Got %v", len(binder.stopChannels)) - } - - binder.Run() - - if len(binder.stopChannels) != 2 { - t.Errorf("Running binder should have exactly 2 stopChannels. Got %v", len(binder.stopChannels)) - } - - binder.Stop() - - if len(binder.stopChannels) != 0 { - t.Errorf("Non-running binder should not have any stopChannels. Got %v", len(binder.stopChannels)) - } -} - -func TestClaimRace(t *testing.T) { - tmpDir, err := utiltesting.MkTmpdir("claimbinder-test") - if err != nil { - t.Fatalf("error creating temp dir: %v", err) - } - defer os.RemoveAll(tmpDir) - c1 := &api.PersistentVolumeClaim{ - ObjectMeta: api.ObjectMeta{ - Name: "c1", - }, - Spec: api.PersistentVolumeClaimSpec{ - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - Resources: api.ResourceRequirements{ - Requests: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"), - }, - }, - }, - Status: api.PersistentVolumeClaimStatus{ - Phase: api.ClaimPending, - }, - } - c1.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "") - - c2 := &api.PersistentVolumeClaim{ - ObjectMeta: api.ObjectMeta{ - Name: "c2", - }, - Spec: api.PersistentVolumeClaimSpec{ - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - Resources: api.ResourceRequirements{ - Requests: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"), - }, - }, - }, - Status: api.PersistentVolumeClaimStatus{ - Phase: api.ClaimPending, - }, - } - c2.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "") - - v := &api.PersistentVolume{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - }, - Spec: api.PersistentVolumeSpec{ - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"), - }, - PersistentVolumeSource: api.PersistentVolumeSource{ - HostPath: &api.HostPathVolumeSource{ - Path: fmt.Sprintf("%s/data01", tmpDir), - }, - }, - }, - Status: api.PersistentVolumeStatus{ - Phase: api.VolumePending, - }, - } - - volumeIndex := NewPersistentVolumeOrderedIndex() - mockClient := &mockBinderClient{} - mockClient.volume = v - - plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) - // adds the volume to the index, making the volume available - syncVolume(volumeIndex, mockClient, v) - if mockClient.volume.Status.Phase != api.VolumeAvailable { - t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase) - } - if _, exists, _ := volumeIndex.Get(v); !exists { - t.Errorf("Expected to find volume in index but it did not exist") - } - - // add the claim to fake API server - mockClient.UpdatePersistentVolumeClaim(c1) - // an initial sync for a claim matches the volume - err = syncClaim(volumeIndex, mockClient, c1) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - if c1.Status.Phase != api.ClaimBound { - t.Errorf("Expected phase %s but got %s", api.ClaimBound, c1.Status.Phase) - } - - // before the volume gets updated w/ claimRef, a 2nd claim can attempt to bind and find the same volume - // add the 2nd claim to fake API server - mockClient.UpdatePersistentVolumeClaim(c2) - err = syncClaim(volumeIndex, mockClient, c2) - if err != nil { - t.Errorf("unexpected error for unmatched claim: %v", err) - } - if c2.Status.Phase != api.ClaimPending { - t.Errorf("Expected phase %s but got %s", api.ClaimPending, c2.Status.Phase) - } -} - -func TestNewClaimWithSameNameAsOldClaim(t *testing.T) { - c1 := &api.PersistentVolumeClaim{ - ObjectMeta: api.ObjectMeta{ - Name: "c1", - Namespace: "foo", - UID: "12345", - }, - Spec: api.PersistentVolumeClaimSpec{ - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - Resources: api.ResourceRequirements{ - Requests: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"), - }, - }, - }, - Status: api.PersistentVolumeClaimStatus{ - Phase: api.ClaimBound, - }, - } - c1.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "") - - v := &api.PersistentVolume{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - }, - Spec: api.PersistentVolumeSpec{ - ClaimRef: &api.ObjectReference{ - Name: c1.Name, - Namespace: c1.Namespace, - UID: "45678", - }, - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"), - }, - PersistentVolumeSource: api.PersistentVolumeSource{ - HostPath: &api.HostPathVolumeSource{ - Path: "/tmp/data01", - }, - }, - }, - Status: api.PersistentVolumeStatus{ - Phase: api.VolumeBound, - }, - } - - volumeIndex := NewPersistentVolumeOrderedIndex() - mockClient := &mockBinderClient{ - claim: c1, - volume: v, - } - - plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)) - - syncVolume(volumeIndex, mockClient, v) - if mockClient.volume.Status.Phase != api.VolumeReleased { - t.Errorf("Expected phase %s but got %s", api.VolumeReleased, mockClient.volume.Status.Phase) - } - -} - -func TestClaimSyncAfterVolumeProvisioning(t *testing.T) { - tmpDir, err := utiltesting.MkTmpdir("claimbinder-test") - if err != nil { - t.Fatalf("error creating temp dir: %v", err) - } - defer os.RemoveAll(tmpDir) - - // Tests that binder.syncVolume will also syncClaim if the PV has completed - // provisioning but the claim is still Pending. We want to advance to Bound - // without having to wait until the binder's next sync period. - claim := &api.PersistentVolumeClaim{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Namespace: "bar", - Annotations: map[string]string{ - qosProvisioningKey: "foo", - }, - }, - Spec: api.PersistentVolumeClaimSpec{ - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - Resources: api.ResourceRequirements{ - Requests: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"), - }, - }, - }, - Status: api.PersistentVolumeClaimStatus{ - Phase: api.ClaimPending, - }, - } - claim.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "") - claimRef, _ := api.GetReference(claim) - - pv := &api.PersistentVolume{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Annotations: map[string]string{ - pvProvisioningRequiredAnnotationKey: pvProvisioningCompletedAnnotationValue, - }, - }, - Spec: api.PersistentVolumeSpec{ - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"), - }, - PersistentVolumeSource: api.PersistentVolumeSource{ - HostPath: &api.HostPathVolumeSource{ - Path: fmt.Sprintf("%s/data01", tmpDir), - }, - }, - ClaimRef: claimRef, - }, - Status: api.PersistentVolumeStatus{ - Phase: api.VolumePending, - }, - } - - volumeIndex := NewPersistentVolumeOrderedIndex() - mockClient := &mockBinderClient{ - claim: claim, - volume: pv, - } - - plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) - - // adds the volume to the index, making the volume available. - // pv also completed provisioning, so syncClaim should cause claim's phase to advance to Bound - syncVolume(volumeIndex, mockClient, pv) - if mockClient.volume.Status.Phase != api.VolumeAvailable { - t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase) - } - if mockClient.claim.Status.Phase != api.ClaimBound { - t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase) - } -} - -func TestExampleObjects(t *testing.T) { - scenarios := map[string]struct { - expected interface{} - }{ - "claims/claim-01.yaml": { - expected: &api.PersistentVolumeClaim{ - Spec: api.PersistentVolumeClaimSpec{ - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - Resources: api.ResourceRequirements{ - Requests: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"), - }, - }, - }, - }, - }, - "claims/claim-02.yaml": { - expected: &api.PersistentVolumeClaim{ - Spec: api.PersistentVolumeClaimSpec{ - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - Resources: api.ResourceRequirements{ - Requests: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("8Gi"), - }, - }, - }, - }, - }, - "volumes/local-01.yaml": { - expected: &api.PersistentVolume{ - Spec: api.PersistentVolumeSpec{ - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"), - }, - PersistentVolumeSource: api.PersistentVolumeSource{ - HostPath: &api.HostPathVolumeSource{ - Path: "/somepath/data01", - }, - }, - }, - }, - }, - "volumes/local-02.yaml": { - expected: &api.PersistentVolume{ - Spec: api.PersistentVolumeSpec{ - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("8Gi"), - }, - PersistentVolumeSource: api.PersistentVolumeSource{ - HostPath: &api.HostPathVolumeSource{ - Path: "/somepath/data02", - }, - }, - PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle, - }, - }, - }, - } - - for name, scenario := range scenarios { - codec := api.Codecs.UniversalDecoder() - o := core.NewObjects(api.Scheme, codec) - if err := core.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/"+name, o, codec); err != nil { - t.Fatal(err) - } - - clientset := &fake.Clientset{} - clientset.AddReactor("*", "*", core.ObjectReaction(o, registered.RESTMapper())) - - if reflect.TypeOf(scenario.expected) == reflect.TypeOf(&api.PersistentVolumeClaim{}) { - pvc, err := clientset.Core().PersistentVolumeClaims("ns").Get("doesntmatter") - if err != nil { - t.Fatalf("Error retrieving object: %v", err) - } - - expected := scenario.expected.(*api.PersistentVolumeClaim) - if pvc.Spec.AccessModes[0] != expected.Spec.AccessModes[0] { - t.Errorf("Unexpected mismatch. Got %v wanted %v", pvc.Spec.AccessModes[0], expected.Spec.AccessModes[0]) - } - - aQty := pvc.Spec.Resources.Requests[api.ResourceStorage] - bQty := expected.Spec.Resources.Requests[api.ResourceStorage] - aSize := aQty.Value() - bSize := bQty.Value() - - if aSize != bSize { - t.Errorf("Unexpected mismatch. Got %v wanted %v", aSize, bSize) - } - } - - if reflect.TypeOf(scenario.expected) == reflect.TypeOf(&api.PersistentVolume{}) { - pv, err := clientset.Core().PersistentVolumes().Get("doesntmatter") - if err != nil { - t.Fatalf("Error retrieving object: %v", err) - } - - expected := scenario.expected.(*api.PersistentVolume) - if pv.Spec.AccessModes[0] != expected.Spec.AccessModes[0] { - t.Errorf("Unexpected mismatch. Got %v wanted %v", pv.Spec.AccessModes[0], expected.Spec.AccessModes[0]) - } - - aQty := pv.Spec.Capacity[api.ResourceStorage] - bQty := expected.Spec.Capacity[api.ResourceStorage] - aSize := aQty.Value() - bSize := bQty.Value() - - if aSize != bSize { - t.Errorf("Unexpected mismatch. Got %v wanted %v", aSize, bSize) - } - - if pv.Spec.HostPath.Path != expected.Spec.HostPath.Path { - t.Errorf("Unexpected mismatch. Got %v wanted %v", pv.Spec.HostPath.Path, expected.Spec.HostPath.Path) - } - } - } -} - -func TestBindingWithExamples(t *testing.T) { - tmpDir, err := utiltesting.MkTmpdir("claimbinder-test") - if err != nil { - t.Fatalf("error creating temp dir: %v", err) - } - defer os.RemoveAll(tmpDir) - - codec := api.Codecs.UniversalDecoder() - o := core.NewObjects(api.Scheme, codec) - if err := core.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/claims/claim-01.yaml", o, codec); err != nil { - t.Fatal(err) - } - if err := core.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/volumes/local-01.yaml", o, codec); err != nil { - t.Fatal(err) - } - - clientset := &fake.Clientset{} - clientset.AddReactor("*", "*", core.ObjectReaction(o, registered.RESTMapper())) - - pv, err := clientset.Core().PersistentVolumes().Get("any") - if err != nil { - t.Errorf("Unexpected error getting PV from client: %v", err) - } - pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimRecycle - if err != nil { - t.Errorf("Unexpected error getting PV from client: %v", err) - } - pv.ObjectMeta.SelfLink = testapi.Default.SelfLink("pv", "") - - // the default value of the PV is Pending. if processed at least once, its status in etcd is Available. - // There was a bug where only Pending volumes were being indexed and made ready for claims. - // Test that !Pending gets correctly added - pv.Status.Phase = api.VolumeAvailable - - claim, error := clientset.Core().PersistentVolumeClaims("ns").Get("any") - if error != nil { - t.Errorf("Unexpected error getting PVC from client: %v", err) - } - claim.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "") - - volumeIndex := NewPersistentVolumeOrderedIndex() - mockClient := &mockBinderClient{ - volume: pv, - claim: claim, - } - - plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) - - recycler := &PersistentVolumeRecycler{ - kubeClient: clientset, - client: mockClient, - pluginMgr: plugMgr, - } - - // adds the volume to the index, making the volume available - syncVolume(volumeIndex, mockClient, pv) - if mockClient.volume.Status.Phase != api.VolumeAvailable { - t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase) - } - - // add the claim to fake API server - mockClient.UpdatePersistentVolumeClaim(claim) - // an initial sync for a claim will bind it to an unbound volume - syncClaim(volumeIndex, mockClient, claim) - - // bind expected on pv.Spec but status update hasn't happened yet - if mockClient.volume.Spec.ClaimRef == nil { - t.Errorf("Expected ClaimRef but got nil for pv.Status.ClaimRef\n") - } - if mockClient.volume.Status.Phase != api.VolumeAvailable { - t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase) - } - if mockClient.claim.Spec.VolumeName != pv.Name { - t.Errorf("Expected claim.Spec.VolumeName %s but got %s", mockClient.claim.Spec.VolumeName, pv.Name) - } - if mockClient.claim.Status.Phase != api.ClaimBound { - t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase) - } - - // state changes in pvc triggers sync that sets pv attributes to pvc.Status - syncClaim(volumeIndex, mockClient, claim) - if len(mockClient.claim.Status.AccessModes) == 0 { - t.Errorf("Expected %d access modes but got 0", len(pv.Spec.AccessModes)) - } - - // persisting the bind to pv.Spec.ClaimRef triggers a sync - syncVolume(volumeIndex, mockClient, mockClient.volume) - if mockClient.volume.Status.Phase != api.VolumeBound { - t.Errorf("Expected phase %s but got %s", api.VolumeBound, mockClient.volume.Status.Phase) - } - - // pretend the user deleted their claim. periodic resync picks it up. - mockClient.claim = nil - syncVolume(volumeIndex, mockClient, mockClient.volume) - - if mockClient.volume.Status.Phase != api.VolumeReleased { - t.Errorf("Expected phase %s but got %s", api.VolumeReleased, mockClient.volume.Status.Phase) - } - - // released volumes with a PersistentVolumeReclaimPolicy (recycle/delete) can have further processing - err = recycler.reclaimVolume(mockClient.volume) - if err != nil { - t.Errorf("Unexpected error reclaiming volume: %+v", err) - } - if mockClient.volume.Status.Phase != api.VolumePending { - t.Errorf("Expected phase %s but got %s", api.VolumePending, mockClient.volume.Status.Phase) - } - - // after the recycling changes the phase to Pending, the binder picks up again - // to remove any vestiges of binding and make the volume Available again - syncVolume(volumeIndex, mockClient, mockClient.volume) - - if mockClient.volume.Status.Phase != api.VolumeAvailable { - t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase) - } - if mockClient.volume.Spec.ClaimRef != nil { - t.Errorf("Expected nil ClaimRef: %+v", mockClient.volume.Spec.ClaimRef) - } -} - -func TestCasting(t *testing.T) { - clientset := fake.NewSimpleClientset() - binder := NewPersistentVolumeClaimBinder(clientset, 1*time.Second) - - pv := &api.PersistentVolume{} - unk := cache.DeletedFinalStateUnknown{} - pvc := &api.PersistentVolumeClaim{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Status: api.PersistentVolumeClaimStatus{Phase: api.ClaimBound}, - } - - // Inject mockClient into the binder. This prevents weird errors on stderr - // as the binder wants to load PV/PVC from API server. - mockClient := &mockBinderClient{ - volume: pv, - claim: pvc, - } - binder.client = mockClient - - // none of these should fail casting. - // the real test is not failing when passed DeletedFinalStateUnknown in the deleteHandler - binder.addVolume(pv) - binder.updateVolume(pv, pv) - binder.deleteVolume(pv) - binder.deleteVolume(unk) - binder.addClaim(pvc) - binder.updateClaim(pvc, pvc) -} - -func TestRecycledPersistentVolumeUID(t *testing.T) { - tmpDir, err := utiltesting.MkTmpdir("claimbinder-test") - if err != nil { - t.Fatalf("error creating temp dir: %v", err) - } - defer os.RemoveAll(tmpDir) - - codec := api.Codecs.UniversalDecoder() - o := core.NewObjects(api.Scheme, codec) - if err := core.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/claims/claim-01.yaml", o, codec); err != nil { - t.Fatal(err) - } - if err := core.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/volumes/local-01.yaml", o, codec); err != nil { - t.Fatal(err) - } - - clientset := &fake.Clientset{} - clientset.AddReactor("*", "*", core.ObjectReaction(o, registered.RESTMapper())) - - pv, err := clientset.Core().PersistentVolumes().Get("any") - if err != nil { - t.Errorf("Unexpected error getting PV from client: %v", err) - } - pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimRecycle - if err != nil { - t.Errorf("Unexpected error getting PV from client: %v", err) - } - pv.ObjectMeta.SelfLink = testapi.Default.SelfLink("pv", "") - - // the default value of the PV is Pending. if processed at least once, its status in etcd is Available. - // There was a bug where only Pending volumes were being indexed and made ready for claims. - // Test that !Pending gets correctly added - pv.Status.Phase = api.VolumeAvailable - - claim, error := clientset.Core().PersistentVolumeClaims("ns").Get("any") - if error != nil { - t.Errorf("Unexpected error getting PVC from client: %v", err) - } - claim.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "") - claim.ObjectMeta.UID = types.UID("uid1") - - volumeIndex := NewPersistentVolumeOrderedIndex() - mockClient := &mockBinderClient{ - volume: pv, - claim: claim, - } - - plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) - - recycler := &PersistentVolumeRecycler{ - kubeClient: clientset, - client: mockClient, - pluginMgr: plugMgr, - } - - // adds the volume to the index, making the volume available - syncVolume(volumeIndex, mockClient, pv) - if mockClient.volume.Status.Phase != api.VolumeAvailable { - t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase) - } - - // add the claim to fake API server - mockClient.UpdatePersistentVolumeClaim(claim) - // an initial sync for a claim will bind it to an unbound volume - syncClaim(volumeIndex, mockClient, claim) - - // pretend the user deleted their claim. periodic resync picks it up. - mockClient.claim = nil - syncVolume(volumeIndex, mockClient, mockClient.volume) - - if mockClient.volume.Status.Phase != api.VolumeReleased { - t.Errorf("Expected phase %s but got %s", api.VolumeReleased, mockClient.volume.Status.Phase) - } - - // released volumes with a PersistentVolumeReclaimPolicy (recycle/delete) can have further processing - err = recycler.reclaimVolume(mockClient.volume) - if err != nil { - t.Errorf("Unexpected error reclaiming volume: %+v", err) - } - if mockClient.volume.Status.Phase != api.VolumePending { - t.Errorf("Expected phase %s but got %s", api.VolumePending, mockClient.volume.Status.Phase) - } - - // after the recycling changes the phase to Pending, the binder picks up again - // to remove any vestiges of binding and make the volume Available again - // - // explicitly set the claim's UID to a different value to ensure that a new claim with the same - // name as what the PV was previously bound still yields an available volume - claim.ObjectMeta.UID = types.UID("uid2") - mockClient.claim = claim - syncVolume(volumeIndex, mockClient, mockClient.volume) - - if mockClient.volume.Status.Phase != api.VolumeAvailable { - t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase) - } - if mockClient.volume.Spec.ClaimRef != nil { - t.Errorf("Expected nil ClaimRef: %+v", mockClient.volume.Spec.ClaimRef) - } -} - -type mockBinderClient struct { - volume *api.PersistentVolume - claim *api.PersistentVolumeClaim -} - -func (c *mockBinderClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) { - return c.volume, nil -} - -func (c *mockBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) { - c.volume = volume - return c.volume, nil -} - -func (c *mockBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error { - c.volume = nil - return nil -} - -func (c *mockBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { - c.volume = volume - return c.volume, nil -} - -func (c *mockBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) { - if c.claim != nil { - return c.claim, nil - } else { - return nil, errors.NewNotFound(api.Resource("persistentvolumes"), name) - } -} - -func (c *mockBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { - c.claim = claim - return c.claim, nil -} - -func (c *mockBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { - c.claim = claim - return c.claim, nil -} - -func newMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { - return &mockRecycler{ - path: spec.PersistentVolume.Spec.HostPath.Path, - }, nil -} - -type mockRecycler struct { - path string - host volume.VolumeHost - volume.MetricsNil -} - -func (r *mockRecycler) GetPath() string { - return r.path -} - -func (r *mockRecycler) Recycle() error { - // return nil means recycle passed - return nil -} diff --git a/pkg/controller/persistentvolume/persistentvolume_provisioner_controller.go b/pkg/controller/persistentvolume/persistentvolume_provisioner_controller.go deleted file mode 100644 index fdb7804a3ea..00000000000 --- a/pkg/controller/persistentvolume/persistentvolume_provisioner_controller.go +++ /dev/null @@ -1,536 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package persistentvolume - -import ( - "fmt" - "sync" - "time" - - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/cache" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - "k8s.io/kubernetes/pkg/cloudprovider" - "k8s.io/kubernetes/pkg/controller/framework" - "k8s.io/kubernetes/pkg/conversion" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util/io" - "k8s.io/kubernetes/pkg/util/mount" - "k8s.io/kubernetes/pkg/volume" - "k8s.io/kubernetes/pkg/watch" - - "github.com/golang/glog" -) - -// PersistentVolumeProvisionerController reconciles the state of all PersistentVolumes and PersistentVolumeClaims. -type PersistentVolumeProvisionerController struct { - volumeController *framework.Controller - volumeStore cache.Store - claimController *framework.Controller - claimStore cache.Store - client controllerClient - cloud cloudprovider.Interface - provisioner volume.ProvisionableVolumePlugin - pluginMgr volume.VolumePluginMgr - stopChannels map[string]chan struct{} - mutex sync.RWMutex - clusterName string -} - -// constant name values for the controllers stopChannels map. -// the controller uses these for graceful shutdown -const volumesStopChannel = "volumes" -const claimsStopChannel = "claims" - -// NewPersistentVolumeProvisionerController creates a new PersistentVolumeProvisionerController -func NewPersistentVolumeProvisionerController(client controllerClient, syncPeriod time.Duration, clusterName string, plugins []volume.VolumePlugin, provisioner volume.ProvisionableVolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeProvisionerController, error) { - controller := &PersistentVolumeProvisionerController{ - client: client, - cloud: cloud, - provisioner: provisioner, - clusterName: clusterName, - } - - if err := controller.pluginMgr.InitPlugins(plugins, controller); err != nil { - return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolumeProvisionerController: %+v", err) - } - - glog.V(5).Infof("Initializing provisioner: %s", controller.provisioner.Name()) - controller.provisioner.Init(controller) - - controller.volumeStore, controller.volumeController = framework.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return client.ListPersistentVolumes(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return client.WatchPersistentVolumes(options) - }, - }, - &api.PersistentVolume{}, - syncPeriod, - framework.ResourceEventHandlerFuncs{ - AddFunc: controller.handleAddVolume, - UpdateFunc: controller.handleUpdateVolume, - // delete handler not needed in this controller. - // volume deletion is handled by the recycler controller - }, - ) - controller.claimStore, controller.claimController = framework.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return client.ListPersistentVolumeClaims(api.NamespaceAll, options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return client.WatchPersistentVolumeClaims(api.NamespaceAll, options) - }, - }, - &api.PersistentVolumeClaim{}, - syncPeriod, - framework.ResourceEventHandlerFuncs{ - AddFunc: controller.handleAddClaim, - UpdateFunc: controller.handleUpdateClaim, - // delete handler not needed. - // normal recycling applies when a claim is deleted. - // recycling is handled by the binding controller. - }, - ) - - return controller, nil -} - -func (controller *PersistentVolumeProvisionerController) handleAddVolume(obj interface{}) { - controller.mutex.Lock() - defer controller.mutex.Unlock() - cachedPv, _, _ := controller.volumeStore.Get(obj) - if pv, ok := cachedPv.(*api.PersistentVolume); ok { - err := controller.reconcileVolume(pv) - if err != nil { - glog.Errorf("Error reconciling volume %s: %+v", pv.Name, err) - } - } -} - -func (controller *PersistentVolumeProvisionerController) handleUpdateVolume(oldObj, newObj interface{}) { - // The flow for Update is the same as Add. - // A volume is only provisioned if not done so already. - controller.handleAddVolume(newObj) -} - -func (controller *PersistentVolumeProvisionerController) handleAddClaim(obj interface{}) { - controller.mutex.Lock() - defer controller.mutex.Unlock() - cachedPvc, exists, _ := controller.claimStore.Get(obj) - if !exists { - glog.Errorf("PersistentVolumeClaim does not exist in the local cache: %+v", obj) - return - } - if pvc, ok := cachedPvc.(*api.PersistentVolumeClaim); ok { - err := controller.reconcileClaim(pvc) - if err != nil { - glog.Errorf("Error encoutered reconciling claim %s: %+v", pvc.Name, err) - } - } -} - -func (controller *PersistentVolumeProvisionerController) handleUpdateClaim(oldObj, newObj interface{}) { - // The flow for Update is the same as Add. - // A volume is only provisioned for a claim if not done so already. - controller.handleAddClaim(newObj) -} - -func (controller *PersistentVolumeProvisionerController) reconcileClaim(claim *api.PersistentVolumeClaim) error { - glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s] for dynamic provisioning", claim.Name) - - // The claim may have been modified by parallel call to reconcileClaim, load - // the current version. - newClaim, err := controller.client.GetPersistentVolumeClaim(claim.Namespace, claim.Name) - if err != nil { - return fmt.Errorf("Cannot reload claim %s/%s: %v", claim.Namespace, claim.Name, err) - } - claim = newClaim - err = controller.claimStore.Update(claim) - if err != nil { - return fmt.Errorf("Cannot update claim %s/%s: %v", claim.Namespace, claim.Name, err) - } - - if controller.provisioner == nil { - return fmt.Errorf("No provisioner configured for controller") - } - - // no provisioning requested, return Pending. Claim may be pending indefinitely without a match. - if !keyExists(qosProvisioningKey, claim.Annotations) { - glog.V(5).Infof("PersistentVolumeClaim[%s] no provisioning required", claim.Name) - return nil - } - if len(claim.Spec.VolumeName) != 0 { - glog.V(5).Infof("PersistentVolumeClaim[%s] already bound. No provisioning required", claim.Name) - return nil - } - if isAnnotationMatch(pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue, claim.Annotations) { - glog.V(5).Infof("PersistentVolumeClaim[%s] is already provisioned.", claim.Name) - return nil - } - - glog.V(5).Infof("PersistentVolumeClaim[%s] provisioning", claim.Name) - provisioner, err := controller.newProvisioner(controller.provisioner, claim, nil) - if err != nil { - return fmt.Errorf("Unexpected error getting new provisioner for claim %s: %v\n", claim.Name, err) - } - newVolume, err := provisioner.NewPersistentVolumeTemplate() - if err != nil { - return fmt.Errorf("Unexpected error getting new volume template for claim %s: %v\n", claim.Name, err) - } - - claimRef, err := api.GetReference(claim) - if err != nil { - return fmt.Errorf("Unexpected error getting claim reference for %s: %v\n", claim.Name, err) - } - - storageClass, _ := claim.Annotations[qosProvisioningKey] - - // the creation of this volume is the bind to the claim. - // The claim will match the volume during the next sync period when the volume is in the local cache - newVolume.Spec.ClaimRef = claimRef - newVolume.Annotations[pvProvisioningRequiredAnnotationKey] = "true" - newVolume.Annotations[qosProvisioningKey] = storageClass - newVolume, err = controller.client.CreatePersistentVolume(newVolume) - glog.V(5).Infof("Unprovisioned PersistentVolume[%s] created for PVC[%s], which will be fulfilled in the storage provider", newVolume.Name, claim.Name) - if err != nil { - return fmt.Errorf("PersistentVolumeClaim[%s] failed provisioning: %+v", claim.Name, err) - } - - claim.Annotations[pvProvisioningRequiredAnnotationKey] = pvProvisioningCompletedAnnotationValue - _, err = controller.client.UpdatePersistentVolumeClaim(claim) - if err != nil { - glog.Errorf("error updating persistent volume claim: %v", err) - } - - return nil -} - -func (controller *PersistentVolumeProvisionerController) reconcileVolume(pv *api.PersistentVolume) error { - glog.V(5).Infof("PersistentVolume[%s] reconciling", pv.Name) - - // The PV may have been modified by parallel call to reconcileVolume, load - // the current version. - newPv, err := controller.client.GetPersistentVolume(pv.Name) - if err != nil { - return fmt.Errorf("Cannot reload volume %s: %v", pv.Name, err) - } - pv = newPv - - if pv.Spec.ClaimRef == nil { - glog.V(5).Infof("PersistentVolume[%s] is not bound to a claim. No provisioning required", pv.Name) - return nil - } - - // TODO: fix this leaky abstraction. Had to make our own store key because ClaimRef fails the default keyfunc (no Meta on object). - obj, exists, _ := controller.claimStore.GetByKey(fmt.Sprintf("%s/%s", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)) - if !exists { - return fmt.Errorf("PersistentVolumeClaim[%s/%s] not found in local cache", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name) - } - - claim, ok := obj.(*api.PersistentVolumeClaim) - if !ok { - return fmt.Errorf("PersistentVolumeClaim expected, but got %v", obj) - } - - // no provisioning required, volume is ready and Bound - if !keyExists(pvProvisioningRequiredAnnotationKey, pv.Annotations) { - glog.V(5).Infof("PersistentVolume[%s] does not require provisioning", pv.Name) - return nil - } - - // provisioning is completed, volume is ready. - if isProvisioningComplete(pv) { - glog.V(5).Infof("PersistentVolume[%s] is bound and provisioning is complete", pv.Name) - if pv.Spec.ClaimRef.Namespace != claim.Namespace || pv.Spec.ClaimRef.Name != claim.Name { - return fmt.Errorf("pre-bind mismatch - expected %s but found %s/%s", claimToClaimKey(claim), pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name) - } - return nil - } - - // provisioning is incomplete. Attempt to provision the volume. - glog.V(5).Infof("PersistentVolume[%s] provisioning in progress", pv.Name) - err = provisionVolume(pv, controller) - if err != nil { - return fmt.Errorf("Error provisioning PersistentVolume[%s]: %v", pv.Name, err) - } - - return nil -} - -// provisionVolume provisions a volume that has been created in the cluster but not yet fulfilled by -// the storage provider. -func provisionVolume(pv *api.PersistentVolume, controller *PersistentVolumeProvisionerController) error { - if isProvisioningComplete(pv) { - return fmt.Errorf("PersistentVolume[%s] is already provisioned", pv.Name) - } - - if _, exists := pv.Annotations[qosProvisioningKey]; !exists { - return fmt.Errorf("PersistentVolume[%s] does not contain a provisioning request. Provisioning not required.", pv.Name) - } - - if controller.provisioner == nil { - return fmt.Errorf("No provisioner found for volume: %s", pv.Name) - } - - // Find the claim in local cache - obj, exists, _ := controller.claimStore.GetByKey(fmt.Sprintf("%s/%s", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)) - if !exists { - return fmt.Errorf("Could not find PersistentVolumeClaim[%s/%s] in local cache", pv.Spec.ClaimRef.Name, pv.Name) - } - claim := obj.(*api.PersistentVolumeClaim) - - provisioner, _ := controller.newProvisioner(controller.provisioner, claim, pv) - err := provisioner.Provision(pv) - if err != nil { - glog.Errorf("Could not provision %s", pv.Name) - pv.Status.Phase = api.VolumeFailed - pv.Status.Message = err.Error() - if pv, apiErr := controller.client.UpdatePersistentVolumeStatus(pv); apiErr != nil { - return fmt.Errorf("PersistentVolume[%s] failed provisioning and also failed status update: %v - %v", pv.Name, err, apiErr) - } - return fmt.Errorf("PersistentVolume[%s] failed provisioning: %v", pv.Name, err) - } - - clone, err := conversion.NewCloner().DeepCopy(pv) - volumeClone, ok := clone.(*api.PersistentVolume) - if !ok { - return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone) - } - volumeClone.Annotations[pvProvisioningRequiredAnnotationKey] = pvProvisioningCompletedAnnotationValue - - pv, err = controller.client.UpdatePersistentVolume(volumeClone) - if err != nil { - // TODO: https://github.com/kubernetes/kubernetes/issues/14443 - // the volume was created in the infrastructure and likely has a PV name on it, - // but we failed to save the annotation that marks the volume as provisioned. - return fmt.Errorf("Error updating PersistentVolume[%s] with provisioning completed annotation. There is a potential for dupes and orphans.", volumeClone.Name) - } - return nil -} - -// Run starts all of this controller's control loops -func (controller *PersistentVolumeProvisionerController) Run() { - glog.V(5).Infof("Starting PersistentVolumeProvisionerController\n") - if controller.stopChannels == nil { - controller.stopChannels = make(map[string]chan struct{}) - } - - if _, exists := controller.stopChannels[volumesStopChannel]; !exists { - controller.stopChannels[volumesStopChannel] = make(chan struct{}) - go controller.volumeController.Run(controller.stopChannels[volumesStopChannel]) - } - - if _, exists := controller.stopChannels[claimsStopChannel]; !exists { - controller.stopChannels[claimsStopChannel] = make(chan struct{}) - go controller.claimController.Run(controller.stopChannels[claimsStopChannel]) - } -} - -// Stop gracefully shuts down this controller -func (controller *PersistentVolumeProvisionerController) Stop() { - glog.V(5).Infof("Stopping PersistentVolumeProvisionerController\n") - for name, stopChan := range controller.stopChannels { - close(stopChan) - delete(controller.stopChannels, name) - } -} - -func (controller *PersistentVolumeProvisionerController) newProvisioner(plugin volume.ProvisionableVolumePlugin, claim *api.PersistentVolumeClaim, pv *api.PersistentVolume) (volume.Provisioner, error) { - tags := make(map[string]string) - tags[cloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace - tags[cloudVolumeCreatedForClaimNameTag] = claim.Name - - // pv can be nil when the provisioner has not created the PV yet - if pv != nil { - tags[cloudVolumeCreatedForVolumeNameTag] = pv.Name - } - - volumeOptions := volume.VolumeOptions{ - Capacity: claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)], - AccessModes: claim.Spec.AccessModes, - PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete, - CloudTags: &tags, - ClusterName: controller.clusterName, - } - - if pv != nil { - volumeOptions.PVName = pv.Name - } - - provisioner, err := plugin.NewProvisioner(volumeOptions) - return provisioner, err -} - -// controllerClient abstracts access to PVs and PVCs. Easy to mock for testing and wrap for real client. -type controllerClient interface { - CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) - ListPersistentVolumes(options api.ListOptions) (*api.PersistentVolumeList, error) - WatchPersistentVolumes(options api.ListOptions) (watch.Interface, error) - GetPersistentVolume(name string) (*api.PersistentVolume, error) - UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) - DeletePersistentVolume(volume *api.PersistentVolume) error - UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) - - GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) - ListPersistentVolumeClaims(namespace string, options api.ListOptions) (*api.PersistentVolumeClaimList, error) - WatchPersistentVolumeClaims(namespace string, options api.ListOptions) (watch.Interface, error) - UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) - UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) - - // provided to give VolumeHost and plugins access to the kube client - GetKubeClient() clientset.Interface -} - -func NewControllerClient(c clientset.Interface) controllerClient { - return &realControllerClient{c} -} - -var _ controllerClient = &realControllerClient{} - -type realControllerClient struct { - client clientset.Interface -} - -func (c *realControllerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) { - return c.client.Core().PersistentVolumes().Get(name) -} - -func (c *realControllerClient) ListPersistentVolumes(options api.ListOptions) (*api.PersistentVolumeList, error) { - return c.client.Core().PersistentVolumes().List(options) -} - -func (c *realControllerClient) WatchPersistentVolumes(options api.ListOptions) (watch.Interface, error) { - return c.client.Core().PersistentVolumes().Watch(options) -} - -func (c *realControllerClient) CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) { - return c.client.Core().PersistentVolumes().Create(pv) -} - -func (c *realControllerClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) { - return c.client.Core().PersistentVolumes().Update(volume) -} - -func (c *realControllerClient) DeletePersistentVolume(volume *api.PersistentVolume) error { - return c.client.Core().PersistentVolumes().Delete(volume.Name, nil) -} - -func (c *realControllerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { - return c.client.Core().PersistentVolumes().UpdateStatus(volume) -} - -func (c *realControllerClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) { - return c.client.Core().PersistentVolumeClaims(namespace).Get(name) -} - -func (c *realControllerClient) ListPersistentVolumeClaims(namespace string, options api.ListOptions) (*api.PersistentVolumeClaimList, error) { - return c.client.Core().PersistentVolumeClaims(namespace).List(options) -} - -func (c *realControllerClient) WatchPersistentVolumeClaims(namespace string, options api.ListOptions) (watch.Interface, error) { - return c.client.Core().PersistentVolumeClaims(namespace).Watch(options) -} - -func (c *realControllerClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { - return c.client.Core().PersistentVolumeClaims(claim.Namespace).Update(claim) -} - -func (c *realControllerClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { - return c.client.Core().PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim) -} - -func (c *realControllerClient) GetKubeClient() clientset.Interface { - return c.client -} - -func keyExists(key string, haystack map[string]string) bool { - _, exists := haystack[key] - return exists -} - -func isProvisioningComplete(pv *api.PersistentVolume) bool { - return isAnnotationMatch(pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue, pv.Annotations) -} - -func isAnnotationMatch(key, needle string, haystack map[string]string) bool { - value, exists := haystack[key] - if !exists { - return false - } - return value == needle -} - -func isRecyclable(policy api.PersistentVolumeReclaimPolicy) bool { - return policy == api.PersistentVolumeReclaimDelete || policy == api.PersistentVolumeReclaimRecycle -} - -// VolumeHost implementation -// PersistentVolumeRecycler is host to the volume plugins, but does not actually mount any volumes. -// Because no mounting is performed, most of the VolumeHost methods are not implemented. -func (c *PersistentVolumeProvisionerController) GetPluginDir(podUID string) string { - return "" -} - -func (c *PersistentVolumeProvisionerController) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string { - return "" -} - -func (c *PersistentVolumeProvisionerController) GetPodPluginDir(podUID types.UID, pluginName string) string { - return "" -} - -func (c *PersistentVolumeProvisionerController) GetKubeClient() clientset.Interface { - return c.client.GetKubeClient() -} - -func (c *PersistentVolumeProvisionerController) NewWrapperMounter(volName string, spec volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Mounter, error) { - return nil, fmt.Errorf("NewWrapperMounter not supported by PVClaimBinder's VolumeHost implementation") -} - -func (c *PersistentVolumeProvisionerController) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) { - return nil, fmt.Errorf("NewWrapperUnmounter not supported by PVClaimBinder's VolumeHost implementation") -} - -func (c *PersistentVolumeProvisionerController) GetCloudProvider() cloudprovider.Interface { - return c.cloud -} - -func (c *PersistentVolumeProvisionerController) GetMounter() mount.Interface { - return nil -} - -func (c *PersistentVolumeProvisionerController) GetWriter() io.Writer { - return nil -} - -func (c *PersistentVolumeProvisionerController) GetHostName() string { - return "" -} - -const ( - // these pair of constants are used by the provisioner. - // The key is a kube namespaced key that denotes a volume requires provisioning. - // The value is set only when provisioning is completed. Any other value will tell the provisioner - // that provisioning has not yet occurred. - pvProvisioningRequiredAnnotationKey = "volume.experimental.kubernetes.io/provisioning-required" - pvProvisioningCompletedAnnotationValue = "volume.experimental.kubernetes.io/provisioning-completed" -) diff --git a/pkg/controller/persistentvolume/persistentvolume_provisioner_controller_test.go b/pkg/controller/persistentvolume/persistentvolume_provisioner_controller_test.go deleted file mode 100644 index c72e8e4473e..00000000000 --- a/pkg/controller/persistentvolume/persistentvolume_provisioner_controller_test.go +++ /dev/null @@ -1,295 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package persistentvolume - -import ( - "fmt" - "testing" - "time" - - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/api/resource" - "k8s.io/kubernetes/pkg/api/testapi" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - fake_cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" - "k8s.io/kubernetes/pkg/util" - volumetest "k8s.io/kubernetes/pkg/volume/testing" - "k8s.io/kubernetes/pkg/watch" -) - -func TestProvisionerRunStop(t *testing.T) { - controller, _, _ := makeTestController() - - if len(controller.stopChannels) != 0 { - t.Errorf("Non-running provisioner should not have any stopChannels. Got %v", len(controller.stopChannels)) - } - - controller.Run() - - if len(controller.stopChannels) != 2 { - t.Errorf("Running provisioner should have exactly 2 stopChannels. Got %v", len(controller.stopChannels)) - } - - controller.Stop() - - if len(controller.stopChannels) != 0 { - t.Errorf("Non-running provisioner should not have any stopChannels. Got %v", len(controller.stopChannels)) - } -} - -func makeTestVolume() *api.PersistentVolume { - return &api.PersistentVolume{ - ObjectMeta: api.ObjectMeta{ - Annotations: map[string]string{}, - Name: "pv01", - }, - Spec: api.PersistentVolumeSpec{ - PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete, - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"), - }, - PersistentVolumeSource: api.PersistentVolumeSource{ - HostPath: &api.HostPathVolumeSource{ - Path: "/somepath/data01", - }, - }, - }, - } -} - -func makeTestClaim() *api.PersistentVolumeClaim { - return &api.PersistentVolumeClaim{ - ObjectMeta: api.ObjectMeta{ - Annotations: map[string]string{}, - Name: "claim01", - Namespace: "ns", - SelfLink: testapi.Default.SelfLink("pvc", ""), - }, - Spec: api.PersistentVolumeClaimSpec{ - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - Resources: api.ResourceRequirements{ - Requests: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("8G"), - }, - }, - }, - } -} - -func makeTestController() (*PersistentVolumeProvisionerController, *mockControllerClient, *volumetest.FakeVolumePlugin) { - mockClient := &mockControllerClient{} - mockVolumePlugin := &volumetest.FakeVolumePlugin{} - controller, _ := NewPersistentVolumeProvisionerController(mockClient, 1*time.Second, "fake-kubernetes", nil, mockVolumePlugin, &fake_cloud.FakeCloud{}) - return controller, mockClient, mockVolumePlugin -} - -func TestReconcileClaim(t *testing.T) { - controller, mockClient, _ := makeTestController() - pvc := makeTestClaim() - - // watch would have added the claim to the store - controller.claimStore.Add(pvc) - // store it in fake API server - mockClient.UpdatePersistentVolumeClaim(pvc) - - err := controller.reconcileClaim(pvc) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - - // non-provisionable PVC should not have created a volume on reconciliation - if mockClient.volume != nil { - t.Error("Unexpected volume found in mock client. Expected nil") - } - - pvc.Annotations[qosProvisioningKey] = "foo" - // store it in fake API server - mockClient.UpdatePersistentVolumeClaim(pvc) - - err = controller.reconcileClaim(pvc) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - - // PVC requesting provisioning should have a PV created for it - if mockClient.volume == nil { - t.Error("Expected to find bound volume but got nil") - } - - if mockClient.volume.Spec.ClaimRef.Name != pvc.Name { - t.Errorf("Expected PV to be bound to %s but got %s", mockClient.volume.Spec.ClaimRef.Name, pvc.Name) - } - - // the PVC should have correct annotation - if mockClient.claim.Annotations[pvProvisioningRequiredAnnotationKey] != pvProvisioningCompletedAnnotationValue { - t.Errorf("Annotation %q not set", pvProvisioningRequiredAnnotationKey) - } - - // Run the syncClaim 2nd time to simulate periodic sweep running in parallel - // to the previous syncClaim. There is a lock in handleUpdateVolume(), so - // they will be called sequentially, but the second call will have old - // version of the claim. - oldPVName := mockClient.volume.Name - - // Make the "old" claim - pvc2 := makeTestClaim() - pvc2.Annotations[qosProvisioningKey] = "foo" - // Add a dummy annotation so we recognize the claim was updated (i.e. - // stored in mockClient) - pvc2.Annotations["test"] = "test" - - err = controller.reconcileClaim(pvc2) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - - // The 2nd PVC should be ignored, no new PV was created - if val, found := pvc2.Annotations[pvProvisioningRequiredAnnotationKey]; found { - t.Errorf("2nd PVC got unexpected annotation %q: %q", pvProvisioningRequiredAnnotationKey, val) - } - if mockClient.volume.Name != oldPVName { - t.Errorf("2nd PVC unexpectedly provisioned a new volume") - } - if _, found := mockClient.claim.Annotations["test"]; found { - t.Errorf("2nd PVC was unexpectedly updated") - } -} - -func checkTagValue(t *testing.T, tags map[string]string, tag string, expectedValue string) { - value, found := tags[tag] - if !found || value != expectedValue { - t.Errorf("Expected tag value %s = %s but value %s found", tag, expectedValue, value) - } -} - -func TestReconcileVolume(t *testing.T) { - - controller, mockClient, mockVolumePlugin := makeTestController() - pv := makeTestVolume() - pvc := makeTestClaim() - mockClient.volume = pv - - err := controller.reconcileVolume(pv) - if err != nil { - t.Errorf("Unexpected error %v", err) - } - - // watch adds claim to the store. - // we need to add it to our mock client to mimic normal Get call - controller.claimStore.Add(pvc) - mockClient.claim = pvc - - // pretend the claim and volume are bound, no provisioning required - claimRef, _ := api.GetReference(pvc) - pv.Spec.ClaimRef = claimRef - mockClient.volume = pv - err = controller.reconcileVolume(pv) - if err != nil { - t.Errorf("Unexpected error %v", err) - } - - pv.Annotations[pvProvisioningRequiredAnnotationKey] = "!pvProvisioningCompleted" - pv.Annotations[qosProvisioningKey] = "foo" - mockClient.volume = pv - err = controller.reconcileVolume(pv) - - if !isAnnotationMatch(pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue, mockClient.volume.Annotations) { - t.Errorf("Expected %s but got %s", pvProvisioningRequiredAnnotationKey, mockClient.volume.Annotations[pvProvisioningRequiredAnnotationKey]) - } - - // Check that the volume plugin was called with correct tags - tags := *mockVolumePlugin.LastProvisionerOptions.CloudTags - checkTagValue(t, tags, cloudVolumeCreatedForClaimNamespaceTag, pvc.Namespace) - checkTagValue(t, tags, cloudVolumeCreatedForClaimNameTag, pvc.Name) - checkTagValue(t, tags, cloudVolumeCreatedForVolumeNameTag, pv.Name) - -} - -var _ controllerClient = &mockControllerClient{} - -type mockControllerClient struct { - volume *api.PersistentVolume - claim *api.PersistentVolumeClaim -} - -func (c *mockControllerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) { - return c.volume, nil -} - -func (c *mockControllerClient) CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) { - if pv.GenerateName != "" && pv.Name == "" { - pv.Name = fmt.Sprintf(pv.GenerateName, util.NewUUID()) - } - c.volume = pv - return c.volume, nil -} - -func (c *mockControllerClient) ListPersistentVolumes(options api.ListOptions) (*api.PersistentVolumeList, error) { - return &api.PersistentVolumeList{ - Items: []api.PersistentVolume{*c.volume}, - }, nil -} - -func (c *mockControllerClient) WatchPersistentVolumes(options api.ListOptions) (watch.Interface, error) { - return watch.NewFake(), nil -} - -func (c *mockControllerClient) UpdatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) { - return c.CreatePersistentVolume(pv) -} - -func (c *mockControllerClient) DeletePersistentVolume(volume *api.PersistentVolume) error { - c.volume = nil - return nil -} - -func (c *mockControllerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { - return volume, nil -} - -func (c *mockControllerClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) { - if c.claim != nil { - return c.claim, nil - } else { - return nil, errors.NewNotFound(api.Resource("persistentvolumes"), name) - } -} - -func (c *mockControllerClient) ListPersistentVolumeClaims(namespace string, options api.ListOptions) (*api.PersistentVolumeClaimList, error) { - return &api.PersistentVolumeClaimList{ - Items: []api.PersistentVolumeClaim{*c.claim}, - }, nil -} - -func (c *mockControllerClient) WatchPersistentVolumeClaims(namespace string, options api.ListOptions) (watch.Interface, error) { - return watch.NewFake(), nil -} - -func (c *mockControllerClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { - c.claim = claim - return c.claim, nil -} - -func (c *mockControllerClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { - return claim, nil -} - -func (c *mockControllerClient) GetKubeClient() clientset.Interface { - return nil -} diff --git a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go deleted file mode 100644 index e73a5b9ebc7..00000000000 --- a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go +++ /dev/null @@ -1,415 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package persistentvolume - -import ( - "fmt" - "time" - - "github.com/golang/glog" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/cache" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - "k8s.io/kubernetes/pkg/cloudprovider" - "k8s.io/kubernetes/pkg/controller/framework" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/types" - ioutil "k8s.io/kubernetes/pkg/util/io" - "k8s.io/kubernetes/pkg/util/metrics" - "k8s.io/kubernetes/pkg/util/mount" - "k8s.io/kubernetes/pkg/volume" - "k8s.io/kubernetes/pkg/watch" -) - -var _ volume.VolumeHost = &PersistentVolumeRecycler{} - -// PersistentVolumeRecycler is a controller that watches for PersistentVolumes that are released from their claims. -// This controller will Recycle those volumes whose reclaim policy is set to PersistentVolumeReclaimRecycle and make them -// available again for a new claim. -type PersistentVolumeRecycler struct { - volumeController *framework.Controller - stopChannel chan struct{} - client recyclerClient - kubeClient clientset.Interface - pluginMgr volume.VolumePluginMgr - cloud cloudprovider.Interface - maximumRetry int - syncPeriod time.Duration - // Local cache of failed recycle / delete operations. Map volume.Name -> status of the volume. - // Only PVs in Released state have an entry here. - releasedVolumes map[string]releasedVolumeStatus -} - -// releasedVolumeStatus holds state of failed delete/recycle operation on a -// volume. The controller re-tries the operation several times and it stores -// retry count + timestamp of the last attempt here. -type releasedVolumeStatus struct { - // How many recycle/delete operations failed. - retryCount int - // Timestamp of the last attempt. - lastAttempt time.Time -} - -// NewPersistentVolumeRecycler creates a new PersistentVolumeRecycler -func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time.Duration, maximumRetry int, plugins []volume.VolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeRecycler, error) { - recyclerClient := NewRecyclerClient(kubeClient) - if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { - metrics.RegisterMetricAndTrackRateLimiterUsage("pv_recycler_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) - } - recycler := &PersistentVolumeRecycler{ - client: recyclerClient, - kubeClient: kubeClient, - cloud: cloud, - maximumRetry: maximumRetry, - syncPeriod: syncPeriod, - releasedVolumes: make(map[string]releasedVolumeStatus), - } - - if err := recycler.pluginMgr.InitPlugins(plugins, recycler); err != nil { - return nil, fmt.Errorf("Could not initialize volume plugins for PVClaimBinder: %+v", err) - } - - _, volumeController := framework.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return kubeClient.Core().PersistentVolumes().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return kubeClient.Core().PersistentVolumes().Watch(options) - }, - }, - &api.PersistentVolume{}, - syncPeriod, - framework.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - pv, ok := obj.(*api.PersistentVolume) - if !ok { - glog.Errorf("Error casting object to PersistentVolume: %v", obj) - return - } - recycler.reclaimVolume(pv) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - pv, ok := newObj.(*api.PersistentVolume) - if !ok { - glog.Errorf("Error casting object to PersistentVolume: %v", newObj) - return - } - recycler.reclaimVolume(pv) - }, - DeleteFunc: func(obj interface{}) { - pv, ok := obj.(*api.PersistentVolume) - if !ok { - glog.Errorf("Error casting object to PersistentVolume: %v", obj) - return - } - recycler.reclaimVolume(pv) - recycler.removeReleasedVolume(pv) - }, - }, - ) - - recycler.volumeController = volumeController - return recycler, nil -} - -// shouldRecycle checks a volume and returns nil, if the volume should be -// recycled right now. Otherwise it returns an error with reason why it should -// not be recycled. -func (recycler *PersistentVolumeRecycler) shouldRecycle(pv *api.PersistentVolume) error { - if pv.Spec.ClaimRef == nil { - return fmt.Errorf("Volume does not have a reference to claim") - } - if pv.Status.Phase != api.VolumeReleased { - return fmt.Errorf("The volume is not in 'Released' phase") - } - - // The volume is Released, should we retry recycling? - status, found := recycler.releasedVolumes[pv.Name] - if !found { - // We don't know anything about this volume. The controller has been - // restarted or the volume has been marked as Released by another - // controller. Recycle/delete this volume as if it was just Released. - glog.V(5).Infof("PersistentVolume[%s] not found in local cache, recycling", pv.Name) - return nil - } - - // Check the timestamp - expectedRetry := status.lastAttempt.Add(recycler.syncPeriod) - if time.Now().After(expectedRetry) { - glog.V(5).Infof("PersistentVolume[%s] retrying recycle after timeout", pv.Name) - return nil - } - // It's too early - glog.V(5).Infof("PersistentVolume[%s] skipping recycle, it's too early: now: %v, next retry: %v", pv.Name, time.Now(), expectedRetry) - return fmt.Errorf("Too early after previous failure") -} - -func (recycler *PersistentVolumeRecycler) reclaimVolume(pv *api.PersistentVolume) error { - glog.V(5).Infof("Recycler: checking PersistentVolume[%s]\n", pv.Name) - // Always load the latest version of the volume - newPV, err := recycler.client.GetPersistentVolume(pv.Name) - if err != nil { - return fmt.Errorf("Could not find PersistentVolume %s", pv.Name) - } - pv = newPV - - err = recycler.shouldRecycle(pv) - if err == nil { - glog.V(5).Infof("Reclaiming PersistentVolume[%s]\n", pv.Name) - - // both handleRecycle and handleDelete block until completion - // TODO: allow parallel recycling operations to increase throughput - switch pv.Spec.PersistentVolumeReclaimPolicy { - case api.PersistentVolumeReclaimRecycle: - err = recycler.handleRecycle(pv) - case api.PersistentVolumeReclaimDelete: - err = recycler.handleDelete(pv) - case api.PersistentVolumeReclaimRetain: - glog.V(5).Infof("Volume %s is set to retain after release. Skipping.\n", pv.Name) - default: - err = fmt.Errorf("No PersistentVolumeReclaimPolicy defined for spec: %+v", pv) - } - if err != nil { - errMsg := fmt.Sprintf("Could not recycle volume spec: %+v", err) - glog.Errorf(errMsg) - return fmt.Errorf(errMsg) - } - return nil - } - glog.V(3).Infof("PersistentVolume[%s] phase %s - skipping: %v", pv.Name, pv.Status.Phase, err) - return nil -} - -// handleReleaseFailure evaluates a failed Recycle/Delete operation, updates -// internal controller state with new nr. of attempts and timestamp of the last -// attempt. Based on the number of failures it returns the next state of the -// volume (Released / Failed). -func (recycler *PersistentVolumeRecycler) handleReleaseFailure(pv *api.PersistentVolume) api.PersistentVolumePhase { - status, found := recycler.releasedVolumes[pv.Name] - if !found { - // First failure, set retryCount to 0 (will be inceremented few lines below) - status = releasedVolumeStatus{} - } - status.retryCount += 1 - - if status.retryCount > recycler.maximumRetry { - // This was the last attempt. Remove any internal state and mark the - // volume as Failed. - glog.V(3).Infof("PersistentVolume[%s] failed %d times - marking Failed", pv.Name, status.retryCount) - recycler.removeReleasedVolume(pv) - return api.VolumeFailed - } - - status.lastAttempt = time.Now() - recycler.releasedVolumes[pv.Name] = status - return api.VolumeReleased -} - -func (recycler *PersistentVolumeRecycler) removeReleasedVolume(pv *api.PersistentVolume) { - delete(recycler.releasedVolumes, pv.Name) -} - -func (recycler *PersistentVolumeRecycler) handleRecycle(pv *api.PersistentVolume) error { - glog.V(5).Infof("Recycling PersistentVolume[%s]\n", pv.Name) - - currentPhase := pv.Status.Phase - nextPhase := currentPhase - - spec := volume.NewSpecFromPersistentVolume(pv, false) - plugin, err := recycler.pluginMgr.FindRecyclablePluginBySpec(spec) - if err != nil { - nextPhase = api.VolumeFailed - pv.Status.Message = fmt.Sprintf("%v", err) - } - - // an error above means a suitable plugin for this volume was not found. - // we don't need to attempt recycling when plugin is nil, but we do need to persist the next/failed phase - // of the volume so that subsequent syncs won't attempt recycling through this handler func. - if plugin != nil { - volRecycler, err := plugin.NewRecycler(spec) - if err != nil { - return fmt.Errorf("Could not obtain Recycler for spec: %#v error: %v", spec, err) - } - // blocks until completion - if err := volRecycler.Recycle(); err != nil { - glog.Errorf("PersistentVolume[%s] failed recycling: %+v", pv.Name, err) - pv.Status.Message = fmt.Sprintf("Recycling error: %s", err) - nextPhase = recycler.handleReleaseFailure(pv) - } else { - glog.V(5).Infof("PersistentVolume[%s] successfully recycled\n", pv.Name) - // The volume has been recycled. Remove any internal state to make - // any subsequent bind+recycle cycle working. - recycler.removeReleasedVolume(pv) - nextPhase = api.VolumePending - } - } - - if currentPhase != nextPhase { - glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", pv.Name, currentPhase, nextPhase) - pv.Status.Phase = nextPhase - _, err := recycler.client.UpdatePersistentVolumeStatus(pv) - if err != nil { - // Rollback to previous phase - pv.Status.Phase = currentPhase - } - } - - return nil -} - -func (recycler *PersistentVolumeRecycler) handleDelete(pv *api.PersistentVolume) error { - glog.V(5).Infof("Deleting PersistentVolume[%s]\n", pv.Name) - - currentPhase := pv.Status.Phase - nextPhase := currentPhase - - spec := volume.NewSpecFromPersistentVolume(pv, false) - plugin, err := recycler.pluginMgr.FindDeletablePluginBySpec(spec) - if err != nil { - nextPhase = api.VolumeFailed - pv.Status.Message = fmt.Sprintf("%v", err) - } - - // an error above means a suitable plugin for this volume was not found. - // we don't need to attempt deleting when plugin is nil, but we do need to persist the next/failed phase - // of the volume so that subsequent syncs won't attempt deletion through this handler func. - if plugin != nil { - deleter, err := plugin.NewDeleter(spec) - if err != nil { - return fmt.Errorf("Could not obtain Deleter for spec: %#v error: %v", spec, err) - } - // blocks until completion - err = deleter.Delete() - if err != nil { - glog.Errorf("PersistentVolume[%s] failed deletion: %+v", pv.Name, err) - pv.Status.Message = fmt.Sprintf("Deletion error: %s", err) - nextPhase = recycler.handleReleaseFailure(pv) - } else { - glog.V(5).Infof("PersistentVolume[%s] successfully deleted through plugin\n", pv.Name) - recycler.removeReleasedVolume(pv) - // after successful deletion through the plugin, we can also remove the PV from the cluster - if err := recycler.client.DeletePersistentVolume(pv); err != nil { - return fmt.Errorf("error deleting persistent volume: %+v", err) - } - } - } - - if currentPhase != nextPhase { - glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", pv.Name, currentPhase, nextPhase) - pv.Status.Phase = nextPhase - _, err := recycler.client.UpdatePersistentVolumeStatus(pv) - if err != nil { - // Rollback to previous phase - pv.Status.Phase = currentPhase - } - } - - return nil -} - -// Run starts this recycler's control loops -func (recycler *PersistentVolumeRecycler) Run() { - glog.V(5).Infof("Starting PersistentVolumeRecycler\n") - if recycler.stopChannel == nil { - recycler.stopChannel = make(chan struct{}) - go recycler.volumeController.Run(recycler.stopChannel) - } -} - -// Stop gracefully shuts down this binder -func (recycler *PersistentVolumeRecycler) Stop() { - glog.V(5).Infof("Stopping PersistentVolumeRecycler\n") - if recycler.stopChannel != nil { - close(recycler.stopChannel) - recycler.stopChannel = nil - } -} - -// recyclerClient abstracts access to PVs -type recyclerClient interface { - GetPersistentVolume(name string) (*api.PersistentVolume, error) - UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) - DeletePersistentVolume(volume *api.PersistentVolume) error - UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) -} - -func NewRecyclerClient(c clientset.Interface) recyclerClient { - return &realRecyclerClient{c} -} - -type realRecyclerClient struct { - client clientset.Interface -} - -func (c *realRecyclerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) { - return c.client.Core().PersistentVolumes().Get(name) -} - -func (c *realRecyclerClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) { - return c.client.Core().PersistentVolumes().Update(volume) -} - -func (c *realRecyclerClient) DeletePersistentVolume(volume *api.PersistentVolume) error { - return c.client.Core().PersistentVolumes().Delete(volume.Name, nil) -} - -func (c *realRecyclerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { - return c.client.Core().PersistentVolumes().UpdateStatus(volume) -} - -// PersistentVolumeRecycler is host to the volume plugins, but does not actually mount any volumes. -// Because no mounting is performed, most of the VolumeHost methods are not implemented. -func (f *PersistentVolumeRecycler) GetPluginDir(podUID string) string { - return "" -} - -func (f *PersistentVolumeRecycler) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string { - return "" -} - -func (f *PersistentVolumeRecycler) GetPodPluginDir(podUID types.UID, pluginName string) string { - return "" -} - -func (f *PersistentVolumeRecycler) GetKubeClient() clientset.Interface { - return f.kubeClient -} - -func (f *PersistentVolumeRecycler) NewWrapperMounter(volName string, spec volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Mounter, error) { - return nil, fmt.Errorf("NewWrapperMounter not supported by PVClaimBinder's VolumeHost implementation") -} - -func (f *PersistentVolumeRecycler) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) { - return nil, fmt.Errorf("NewWrapperUnmounter not supported by PVClaimBinder's VolumeHost implementation") -} - -func (f *PersistentVolumeRecycler) GetCloudProvider() cloudprovider.Interface { - return f.cloud -} - -func (f *PersistentVolumeRecycler) GetMounter() mount.Interface { - return nil -} - -func (f *PersistentVolumeRecycler) GetWriter() ioutil.Writer { - return nil -} - -func (f *PersistentVolumeRecycler) GetHostName() string { - return "" -} diff --git a/pkg/controller/persistentvolume/persistentvolume_recycler_controller_test.go b/pkg/controller/persistentvolume/persistentvolume_recycler_controller_test.go deleted file mode 100644 index 8312fd32210..00000000000 --- a/pkg/controller/persistentvolume/persistentvolume_recycler_controller_test.go +++ /dev/null @@ -1,265 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package persistentvolume - -import ( - "fmt" - "testing" - "time" - - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/resource" - "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" - "k8s.io/kubernetes/pkg/volume" - "k8s.io/kubernetes/pkg/volume/host_path" - volumetest "k8s.io/kubernetes/pkg/volume/testing" -) - -const ( - mySyncPeriod = 2 * time.Second - myMaximumRetry = 3 -) - -func TestFailedRecycling(t *testing.T) { - pv := preparePV() - - mockClient := &mockBinderClient{ - volume: pv, - } - - // no Init called for pluginMgr and no plugins are available. Volume should fail recycling. - plugMgr := volume.VolumePluginMgr{} - - recycler := &PersistentVolumeRecycler{ - kubeClient: fake.NewSimpleClientset(), - client: mockClient, - pluginMgr: plugMgr, - releasedVolumes: make(map[string]releasedVolumeStatus), - } - - err := recycler.reclaimVolume(pv) - if err != nil { - t.Errorf("Unexpected non-nil error: %v", err) - } - - if mockClient.volume.Status.Phase != api.VolumeFailed { - t.Errorf("Expected %s but got %s", api.VolumeFailed, mockClient.volume.Status.Phase) - } - - // Use a new volume for the next test - pv = preparePV() - mockClient.volume = pv - - pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimDelete - err = recycler.reclaimVolume(pv) - if err != nil { - t.Errorf("Unexpected non-nil error: %v", err) - } - - if mockClient.volume.Status.Phase != api.VolumeFailed { - t.Errorf("Expected %s but got %s", api.VolumeFailed, mockClient.volume.Status.Phase) - } -} - -func TestRecyclingRetry(t *testing.T) { - // Test that recycler controller retries to recycle a volume several times, which succeeds eventually - pv := preparePV() - - mockClient := &mockBinderClient{ - volume: pv, - } - - plugMgr := volume.VolumePluginMgr{} - // Use a fake NewRecycler function - plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newFailingMockRecycler, volume.VolumeConfig{}), volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)) - // Reset a global call counter - failedCallCount = 0 - - recycler := &PersistentVolumeRecycler{ - kubeClient: fake.NewSimpleClientset(), - client: mockClient, - pluginMgr: plugMgr, - syncPeriod: mySyncPeriod, - maximumRetry: myMaximumRetry, - releasedVolumes: make(map[string]releasedVolumeStatus), - } - - // All but the last attempt will fail - testRecycleFailures(t, recycler, mockClient, pv, myMaximumRetry-1) - - // The last attempt should succeed - err := recycler.reclaimVolume(pv) - if err != nil { - t.Errorf("Last step: Recycler failed: %v", err) - } - - if mockClient.volume.Status.Phase != api.VolumePending { - t.Errorf("Last step: The volume should be Pending, but is %s instead", mockClient.volume.Status.Phase) - } - // Check the cache, it should not have any entry - status, found := recycler.releasedVolumes[pv.Name] - if found { - t.Errorf("Last step: Expected PV to be removed from cache, got %v", status) - } -} - -func TestRecyclingRetryAlwaysFail(t *testing.T) { - // Test that recycler controller retries to recycle a volume several times, which always fails. - pv := preparePV() - - mockClient := &mockBinderClient{ - volume: pv, - } - - plugMgr := volume.VolumePluginMgr{} - // Use a fake NewRecycler function - plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newAlwaysFailingMockRecycler, volume.VolumeConfig{}), volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)) - // Reset a global call counter - failedCallCount = 0 - - recycler := &PersistentVolumeRecycler{ - kubeClient: fake.NewSimpleClientset(), - client: mockClient, - pluginMgr: plugMgr, - syncPeriod: mySyncPeriod, - maximumRetry: myMaximumRetry, - releasedVolumes: make(map[string]releasedVolumeStatus), - } - - // myMaximumRetry recycle attempts will fail - testRecycleFailures(t, recycler, mockClient, pv, myMaximumRetry) - - // The volume should be failed after myMaximumRetry attempts - err := recycler.reclaimVolume(pv) - if err != nil { - t.Errorf("Last step: Recycler failed: %v", err) - } - - if mockClient.volume.Status.Phase != api.VolumeFailed { - t.Errorf("Last step: The volume should be Failed, but is %s instead", mockClient.volume.Status.Phase) - } - // Check the cache, it should not have any entry - status, found := recycler.releasedVolumes[pv.Name] - if found { - t.Errorf("Last step: Expected PV to be removed from cache, got %v", status) - } -} - -func preparePV() *api.PersistentVolume { - return &api.PersistentVolume{ - Spec: api.PersistentVolumeSpec{ - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("8Gi"), - }, - PersistentVolumeSource: api.PersistentVolumeSource{ - HostPath: &api.HostPathVolumeSource{ - Path: "/tmp/data02", - }, - }, - PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle, - ClaimRef: &api.ObjectReference{ - Name: "foo", - Namespace: "bar", - }, - }, - Status: api.PersistentVolumeStatus{ - Phase: api.VolumeReleased, - }, - } -} - -// Test that `count` attempts to recycle a PV fails. -func testRecycleFailures(t *testing.T, recycler *PersistentVolumeRecycler, mockClient *mockBinderClient, pv *api.PersistentVolume, count int) { - for i := 1; i <= count; i++ { - err := recycler.reclaimVolume(pv) - if err != nil { - t.Errorf("STEP %d: Recycler faled: %v", i, err) - } - - // Check the status, it should be failed - if mockClient.volume.Status.Phase != api.VolumeReleased { - t.Errorf("STEP %d: The volume should be Released, but is %s instead", i, mockClient.volume.Status.Phase) - } - - // Check the failed volume cache - status, found := recycler.releasedVolumes[pv.Name] - if !found { - t.Errorf("STEP %d: cannot find released volume status", i) - } - if status.retryCount != i { - t.Errorf("STEP %d: Expected nr. of attempts to be %d, got %d", i, i, status.retryCount) - } - - // call reclaimVolume too early, it should not increment the retryCount - time.Sleep(mySyncPeriod / 2) - err = recycler.reclaimVolume(pv) - if err != nil { - t.Errorf("STEP %d: Recycler failed: %v", i, err) - } - - status, found = recycler.releasedVolumes[pv.Name] - if !found { - t.Errorf("STEP %d: cannot find released volume status", i) - } - if status.retryCount != i { - t.Errorf("STEP %d: Expected nr. of attempts to be %d, got %d", i, i, status.retryCount) - } - - // Call the next reclaimVolume() after full pvRecycleRetryPeriod - time.Sleep(mySyncPeriod / 2) - } -} - -func newFailingMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { - return &failingMockRecycler{ - path: spec.PersistentVolume.Spec.HostPath.Path, - errorCount: myMaximumRetry - 1, // fail two times and then successfully recycle the volume - }, nil -} - -func newAlwaysFailingMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { - return &failingMockRecycler{ - path: spec.PersistentVolume.Spec.HostPath.Path, - errorCount: 1000, // always fail - }, nil -} - -type failingMockRecycler struct { - path string - // How many times should the recycler fail before returning success. - errorCount int - volume.MetricsNil -} - -// Counter of failingMockRecycler.Recycle() calls. Global variable just for -// testing. It's too much code to create a custom volume plugin, which would -// hold this variable. -var failedCallCount = 0 - -func (r *failingMockRecycler) GetPath() string { - return r.path -} - -func (r *failingMockRecycler) Recycle() error { - failedCallCount += 1 - if failedCallCount <= r.errorCount { - return fmt.Errorf("Failing for %d. time", failedCallCount) - } - // return nil means recycle passed - return nil -}