Remove all three PersistentVolume controllers.

We will add new ones gradually in smaller chunks.
Jan Safranek 2016-05-17 14:55:02 +02:00
parent dee24333ff
commit 6fa527a460
6 changed files with 0 additions and 2773 deletions


@@ -1,530 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// PersistentVolumeClaimBinder is a controller that synchronizes PersistentVolumeClaims.
type PersistentVolumeClaimBinder struct {
volumeIndex *persistentVolumeOrderedIndex
volumeController *framework.Controller
claimController *framework.Controller
client binderClient
stopChannels map[string]chan struct{}
lock sync.RWMutex
}
// NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder
func NewPersistentVolumeClaimBinder(kubeClient clientset.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder {
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("pv_claim_binder_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
volumeIndex := NewPersistentVolumeOrderedIndex()
binderClient := NewBinderClient(kubeClient)
binder := &PersistentVolumeClaimBinder{
volumeIndex: volumeIndex,
client: binderClient,
}
_, volumeController := framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return kubeClient.Core().PersistentVolumes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return kubeClient.Core().PersistentVolumes().Watch(options)
},
},
&api.PersistentVolume{},
// TODO: Can we have much longer period here?
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: binder.addVolume,
UpdateFunc: binder.updateVolume,
DeleteFunc: binder.deleteVolume,
},
)
_, claimController := framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
},
},
&api.PersistentVolumeClaim{},
// TODO: Can we have much longer period here?
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: binder.addClaim,
UpdateFunc: binder.updateClaim,
DeleteFunc: binder.deleteClaim,
},
)
binder.claimController = claimController
binder.volumeController = volumeController
return binder
}
func (binder *PersistentVolumeClaimBinder) addVolume(obj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
pv, ok := obj.(*api.PersistentVolume)
if !ok {
glog.Errorf("Expected PersistentVolume but handler received %+v", obj)
return
}
if err := syncVolume(binder.volumeIndex, binder.client, pv); err != nil {
glog.Errorf("PVClaimBinder could not add volume %s: %+v", pv.Name, err)
}
}
func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
newVolume, ok := newObj.(*api.PersistentVolume)
if !ok {
glog.Errorf("Expected PersistentVolume but handler received %+v", newObj)
return
}
if err := binder.volumeIndex.Update(newVolume); err != nil {
glog.Errorf("Error updating volume %s in index: %v", newVolume.Name, err)
return
}
if err := syncVolume(binder.volumeIndex, binder.client, newVolume); err != nil {
glog.Errorf("PVClaimBinder could not update volume %s: %+v", newVolume.Name, err)
}
}
func (binder *PersistentVolumeClaimBinder) deleteVolume(obj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
volume, ok := obj.(*api.PersistentVolume)
if !ok {
glog.Errorf("Expected PersistentVolume but handler received %+v", obj)
return
}
if err := binder.volumeIndex.Delete(volume); err != nil {
glog.Errorf("Error deleting volume %s from index: %v", volume.Name, err)
}
}
func (binder *PersistentVolumeClaimBinder) addClaim(obj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
claim, ok := obj.(*api.PersistentVolumeClaim)
if !ok {
glog.Errorf("Expected PersistentVolumeClaim but handler received %+v", obj)
return
}
if err := syncClaim(binder.volumeIndex, binder.client, claim); err != nil {
glog.Errorf("PVClaimBinder could not add claim %s: %+v", claim.Name, err)
}
}
func (binder *PersistentVolumeClaimBinder) updateClaim(oldObj, newObj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
newClaim, ok := newObj.(*api.PersistentVolumeClaim)
if !ok {
glog.Errorf("Expected PersistentVolumeClaim but handler received %+v", newObj)
return
}
if err := syncClaim(binder.volumeIndex, binder.client, newClaim); err != nil {
glog.Errorf("PVClaimBinder could not update claim %s: %+v", newClaim.Name, err)
}
}
func (binder *PersistentVolumeClaimBinder) deleteClaim(obj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
var volume *api.PersistentVolume
if pvc, ok := obj.(*api.PersistentVolumeClaim); ok {
if pvObj, exists, _ := binder.volumeIndex.GetByKey(pvc.Spec.VolumeName); exists {
if pv, ok := pvObj.(*api.PersistentVolume); ok {
volume = pv
}
}
}
if unk, ok := obj.(cache.DeletedFinalStateUnknown); ok && unk.Obj != nil {
if pv, ok := unk.Obj.(*api.PersistentVolume); ok {
volume = pv
}
}
// sync the volume when its claim is deleted. Explicitly sync'ing the volume here in response to
// claim deletion prevents the volume from waiting until the next sync period for its Release.
if volume != nil {
err := syncVolume(binder.volumeIndex, binder.client, volume)
if err != nil {
glog.Errorf("PVClaimBinder could not update volume %s from deleteClaim handler: %+v", volume.Name, err)
}
}
}
func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) {
glog.V(5).Infof("Synchronizing PersistentVolume[%s], current phase: %s\n", volume.Name, volume.Status.Phase)
// The PV may have been modified by parallel call to syncVolume, load
// the current version.
newPv, err := binderClient.GetPersistentVolume(volume.Name)
if err != nil {
return fmt.Errorf("Cannot reload volume %s: %v", volume.Name, err)
}
volume = newPv
// volumes can be in one of the following states:
//
// VolumePending -- default value -- not bound to a claim and not yet processed through this controller.
// VolumeAvailable -- not bound to a claim, but processed at least once and found in this controller's volumeIndex.
// VolumeBound -- bound to a claim because volume.Spec.ClaimRef != nil. Claim status may not be correct.
// VolumeReleased -- volume.Spec.ClaimRef != nil but the claim has been deleted by the user.
// VolumeFailed -- volume.Spec.ClaimRef != nil and the volume failed processing in the recycler
currentPhase := volume.Status.Phase
nextPhase := currentPhase
// Always store the newest volume state in local cache.
_, exists, err := volumeIndex.Get(volume)
if err != nil {
return err
}
if !exists {
volumeIndex.Add(volume)
} else {
volumeIndex.Update(volume)
}
if isBeingProvisioned(volume) {
glog.V(4).Infof("Skipping PersistentVolume[%s], waiting for provisioning to finish", volume.Name)
return nil
}
switch currentPhase {
case api.VolumePending:
// 4 possible states:
// 1. ClaimRef != nil, Claim exists, Claim UID == ClaimRef UID: Prebound to claim. Make volume available for binding (it will match PVC).
// 2. ClaimRef != nil, Claim exists, Claim UID != ClaimRef UID: Recently recycled. Remove bind. Make volume available for new claim.
// 3. ClaimRef != nil, Claim !exists: Recently recycled. Remove bind. Make volume available for new claim.
// 4. ClaimRef == nil: Neither recycled nor prebound. Make volume available for binding.
nextPhase = api.VolumeAvailable
if volume.Spec.ClaimRef != nil {
claim, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
switch {
case err != nil && !errors.IsNotFound(err):
return fmt.Errorf("Error getting PersistentVolumeClaim[%s/%s]: %v", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name, err)
case errors.IsNotFound(err) || (claim != nil && claim.UID != volume.Spec.ClaimRef.UID):
glog.V(5).Infof("PersistentVolume[%s] has a claim ref to a claim which does not exist", volume.Name)
if volume.Spec.PersistentVolumeReclaimPolicy == api.PersistentVolumeReclaimRecycle {
// Pending volumes that have a ClaimRef where the claim is missing were recently recycled.
// The Recycler set the phase to VolumePending to start the volume at the beginning of this lifecycle.
// removing ClaimRef unbinds the volume
clone, err := conversion.NewCloner().DeepCopy(volume)
if err != nil {
return fmt.Errorf("Error cloning pv: %v", err)
}
volumeClone, ok := clone.(*api.PersistentVolume)
if !ok {
return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone)
}
glog.V(5).Infof("PersistentVolume[%s] is recently recycled; remove claimRef.", volume.Name)
volumeClone.Spec.ClaimRef = nil
if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil {
return fmt.Errorf("Unexpected error saving PersistentVolume: %+v", err)
} else {
volume = updatedVolume
volumeIndex.Update(volume)
}
} else {
// Pending volumes that have a ClaimRef where the claim is missing and the volume was not recycled.
// It must have been freshly provisioned and the claim was deleted during the provisioning.
// Mark the volume as Released, it will be deleted.
nextPhase = api.VolumeReleased
}
}
// A dynamically provisioned claim remains Pending until its volume is completely provisioned.
// The provisioner updates the PV and triggers this update for the volume. Explicitly sync'ing
// the claim here prevents the need to wait until the next sync period when the claim would normally
// advance to Bound phase. Otherwise, the maximum wait time for the claim to be Bound is the default sync period.
if claim != nil && claim.Status.Phase == api.ClaimPending && keyExists(qosProvisioningKey, claim.Annotations) && isProvisioningComplete(volume) {
syncClaim(volumeIndex, binderClient, claim)
}
}
glog.V(5).Infof("PersistentVolume[%s] is available\n", volume.Name)
// available volumes await a claim
case api.VolumeAvailable:
if volume.Spec.ClaimRef != nil {
_, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
if err == nil {
// change of phase will trigger an update event with the newly bound volume
glog.V(5).Infof("PersistentVolume[%s] is now bound\n", volume.Name)
nextPhase = api.VolumeBound
} else {
if errors.IsNotFound(err) {
nextPhase = api.VolumeReleased
}
}
}
//bound volumes require verification of their bound claims
case api.VolumeBound:
if volume.Spec.ClaimRef == nil {
return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume)
} else {
claim, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
// A volume is Released when its bound claim cannot be found in the API server.
// A claim by the same name can be found if deleted and recreated before this controller can release
// the volume from the original claim, so a UID check is necessary.
if err != nil {
if errors.IsNotFound(err) {
nextPhase = api.VolumeReleased
} else {
return err
}
} else if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {
nextPhase = api.VolumeReleased
}
}
// released volumes require recycling
case api.VolumeReleased:
if volume.Spec.ClaimRef == nil {
return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume)
} else {
// another process is watching for released volumes.
// PersistentVolumeReclaimPolicy is set per PersistentVolume
// Recycle - sets the PV to Pending and back under this controller's management
// Delete - delete events are handled by this controller's watch. PVs are removed from the index.
}
// volumes are removed by processes external to this binder and must be removed from the cluster
case api.VolumeFailed:
if volume.Spec.ClaimRef == nil {
return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume)
} else {
glog.V(5).Infof("PersistentVolume[%s] previously failed recycling. Skipping.\n", volume.Name)
}
}
if currentPhase != nextPhase {
volume.Status.Phase = nextPhase
// a change in state will trigger another update through this controller.
// each pass through this controller evaluates current phase and decides whether or not to change to the next phase
glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", volume.Name, currentPhase, nextPhase)
volume, err := binderClient.UpdatePersistentVolumeStatus(volume)
if err != nil {
// Rollback to previous phase
volume.Status.Phase = currentPhase
}
volumeIndex.Update(volume)
}
return nil
}
func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) {
glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s] for binding", claim.Name)
// The claim may have been modified by parallel call to syncClaim, load
// the current version.
newClaim, err := binderClient.GetPersistentVolumeClaim(claim.Namespace, claim.Name)
if err != nil {
return fmt.Errorf("Cannot reload claim %s/%s: %v", claim.Namespace, claim.Name, err)
}
claim = newClaim
switch claim.Status.Phase {
case api.ClaimPending:
// claims w/ a storage-class annotation for provisioning will *only* match volumes with a ClaimRef to the claim.
volume, err := volumeIndex.findBestMatchForClaim(claim)
if err != nil {
return err
}
if volume == nil {
glog.V(5).Infof("A volume match does not exist for persistent claim: %s", claim.Name)
return nil
}
if isBeingProvisioned(volume) {
glog.V(5).Infof("PersistentVolume[%s] for PersistentVolumeClaim[%s/%s] is still being provisioned.", volume.Name, claim.Namespace, claim.Name)
return nil
}
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
}
// Make a binding reference to the claim by persisting claimRef on the volume.
// The local cache must be updated with the new bind to prevent subsequent
// claims from binding to the volume.
if volume.Spec.ClaimRef == nil {
clone, err := conversion.NewCloner().DeepCopy(volume)
if err != nil {
return fmt.Errorf("Error cloning pv: %v", err)
}
volumeClone, ok := clone.(*api.PersistentVolume)
if !ok {
return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone)
}
volumeClone.Spec.ClaimRef = claimRef
if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil {
return fmt.Errorf("Unexpected error saving PersistentVolume.Status: %+v", err)
} else {
volume = updatedVolume
volumeIndex.Update(updatedVolume)
}
}
// the bind is persisted on the volume above and will always match the claim in a search.
// claim would remain Pending if the update fails, so processing this state is idempotent.
// this only needs to be processed once.
if claim.Spec.VolumeName != volume.Name {
claim.Spec.VolumeName = volume.Name
claim, err = binderClient.UpdatePersistentVolumeClaim(claim)
if err != nil {
return fmt.Errorf("Error updating claim with VolumeName %s: %+v\n", volume.Name, err)
}
}
claim.Status.Phase = api.ClaimBound
claim.Status.AccessModes = volume.Spec.AccessModes
claim.Status.Capacity = volume.Spec.Capacity
_, err = binderClient.UpdatePersistentVolumeClaimStatus(claim)
if err != nil {
return fmt.Errorf("Unexpected error saving claim status: %+v", err)
}
case api.ClaimBound:
// no-op. Claim is bound, values from PV are set. PVCs are technically mutable in the API server
// and we don't want to handle those changes at this time.
default:
return fmt.Errorf("Unknown state for PVC: %#v", claim)
}
glog.V(5).Infof("PersistentVolumeClaim[%s] is bound\n", claim.Name)
return nil
}
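// isBeingProvisioned returns true when the volume carries a provisioning-required
// annotation that has not yet been marked completed.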
func isBeingProvisioned(volume *api.PersistentVolume) bool {
value, found := volume.Annotations[pvProvisioningRequiredAnnotationKey]
if found && value != pvProvisioningCompletedAnnotationValue {
return true
}
return false
}
// Run starts all of this binder's control loops
func (controller *PersistentVolumeClaimBinder) Run() {
glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n")
if controller.stopChannels == nil {
controller.stopChannels = make(map[string]chan struct{})
}
if _, exists := controller.stopChannels["volumes"]; !exists {
controller.stopChannels["volumes"] = make(chan struct{})
go controller.volumeController.Run(controller.stopChannels["volumes"])
}
if _, exists := controller.stopChannels["claims"]; !exists {
controller.stopChannels["claims"] = make(chan struct{})
go controller.claimController.Run(controller.stopChannels["claims"])
}
}
// Stop gracefully shuts down this binder
func (controller *PersistentVolumeClaimBinder) Stop() {
glog.V(5).Infof("Stopping PersistentVolumeClaimBinder\n")
for name, stopChan := range controller.stopChannels {
close(stopChan)
delete(controller.stopChannels, name)
}
}
// binderClient abstracts access to PVs and PVCs
type binderClient interface {
GetPersistentVolume(name string) (*api.PersistentVolume, error)
UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
DeletePersistentVolume(volume *api.PersistentVolume) error
UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error)
UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
}
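// NewBinderClient returns a binderClient backed by the given clientset.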
func NewBinderClient(c clientset.Interface) binderClient {
return &realBinderClient{c}
}
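// realBinderClient implements binderClient against the real API server via the clientset.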
type realBinderClient struct {
client clientset.Interface
}
func (c *realBinderClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.client.Core().PersistentVolumes().Get(name)
}
func (c *realBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.Core().PersistentVolumes().Update(volume)
}
func (c *realBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
return c.client.Core().PersistentVolumes().Delete(volume.Name, nil)
}
func (c *realBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.Core().PersistentVolumes().UpdateStatus(volume)
}
func (c *realBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
return c.client.Core().PersistentVolumeClaims(namespace).Get(name)
}
func (c *realBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.Core().PersistentVolumeClaims(claim.Namespace).Update(claim)
}
func (c *realBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.Core().PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim)
}


@@ -1,732 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"os"
"reflect"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/types"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/host_path"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)
func TestRunStop(t *testing.T) {
clientset := fake.NewSimpleClientset()
binder := NewPersistentVolumeClaimBinder(clientset, 1*time.Second)
if len(binder.stopChannels) != 0 {
t.Errorf("Non-running binder should not have any stopChannels. Got %v", len(binder.stopChannels))
}
binder.Run()
if len(binder.stopChannels) != 2 {
t.Errorf("Running binder should have exactly 2 stopChannels. Got %v", len(binder.stopChannels))
}
binder.Stop()
if len(binder.stopChannels) != 0 {
t.Errorf("Non-running binder should not have any stopChannels. Got %v", len(binder.stopChannels))
}
}
func TestClaimRace(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("claimbinder-test")
if err != nil {
t.Fatalf("error creating temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
c1 := &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "c1",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"),
},
},
},
Status: api.PersistentVolumeClaimStatus{
Phase: api.ClaimPending,
},
}
c1.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "")
c2 := &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "c2",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"),
},
},
},
Status: api.PersistentVolumeClaimStatus{
Phase: api.ClaimPending,
},
}
c2.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "")
v := &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
Spec: api.PersistentVolumeSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: fmt.Sprintf("%s/data01", tmpDir),
},
},
},
Status: api.PersistentVolumeStatus{
Phase: api.VolumePending,
},
}
volumeIndex := NewPersistentVolumeOrderedIndex()
mockClient := &mockBinderClient{}
mockClient.volume = v
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volumetest.NewFakeVolumeHost(tmpDir, nil, nil))
// adds the volume to the index, making the volume available
syncVolume(volumeIndex, mockClient, v)
if mockClient.volume.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase)
}
if _, exists, _ := volumeIndex.Get(v); !exists {
t.Errorf("Expected to find volume in index but it did not exist")
}
// add the claim to fake API server
mockClient.UpdatePersistentVolumeClaim(c1)
// an initial sync for a claim matches the volume
err = syncClaim(volumeIndex, mockClient, c1)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if c1.Status.Phase != api.ClaimBound {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, c1.Status.Phase)
}
// before the volume gets updated w/ claimRef, a 2nd claim can attempt to bind and find the same volume
// add the 2nd claim to fake API server
mockClient.UpdatePersistentVolumeClaim(c2)
err = syncClaim(volumeIndex, mockClient, c2)
if err != nil {
t.Errorf("unexpected error for unmatched claim: %v", err)
}
if c2.Status.Phase != api.ClaimPending {
t.Errorf("Expected phase %s but got %s", api.ClaimPending, c2.Status.Phase)
}
}
func TestNewClaimWithSameNameAsOldClaim(t *testing.T) {
c1 := &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "c1",
Namespace: "foo",
UID: "12345",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"),
},
},
},
Status: api.PersistentVolumeClaimStatus{
Phase: api.ClaimBound,
},
}
c1.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "")
v := &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
Spec: api.PersistentVolumeSpec{
ClaimRef: &api.ObjectReference{
Name: c1.Name,
Namespace: c1.Namespace,
UID: "45678",
},
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/tmp/data01",
},
},
},
Status: api.PersistentVolumeStatus{
Phase: api.VolumeBound,
},
}
volumeIndex := NewPersistentVolumeOrderedIndex()
mockClient := &mockBinderClient{
claim: c1,
volume: v,
}
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil))
syncVolume(volumeIndex, mockClient, v)
if mockClient.volume.Status.Phase != api.VolumeReleased {
t.Errorf("Expected phase %s but got %s", api.VolumeReleased, mockClient.volume.Status.Phase)
}
}
func TestClaimSyncAfterVolumeProvisioning(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("claimbinder-test")
if err != nil {
t.Fatalf("error creating temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
// Tests that binder.syncVolume will also syncClaim if the PV has completed
// provisioning but the claim is still Pending. We want to advance to Bound
// without having to wait until the binder's next sync period.
claim := &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: "bar",
Annotations: map[string]string{
qosProvisioningKey: "foo",
},
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"),
},
},
},
Status: api.PersistentVolumeClaimStatus{
Phase: api.ClaimPending,
},
}
claim.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "")
claimRef, _ := api.GetReference(claim)
pv := &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Annotations: map[string]string{
pvProvisioningRequiredAnnotationKey: pvProvisioningCompletedAnnotationValue,
},
},
Spec: api.PersistentVolumeSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: fmt.Sprintf("%s/data01", tmpDir),
},
},
ClaimRef: claimRef,
},
Status: api.PersistentVolumeStatus{
Phase: api.VolumePending,
},
}
volumeIndex := NewPersistentVolumeOrderedIndex()
mockClient := &mockBinderClient{
claim: claim,
volume: pv,
}
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volumetest.NewFakeVolumeHost(tmpDir, nil, nil))
// adds the volume to the index, making the volume available.
// pv also completed provisioning, so syncClaim should cause claim's phase to advance to Bound
syncVolume(volumeIndex, mockClient, pv)
if mockClient.volume.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase)
}
if mockClient.claim.Status.Phase != api.ClaimBound {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
}
func TestExampleObjects(t *testing.T) {
scenarios := map[string]struct {
expected interface{}
}{
"claims/claim-01.yaml": {
expected: &api.PersistentVolumeClaim{
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"),
},
},
},
},
},
"claims/claim-02.yaml": {
expected: &api.PersistentVolumeClaim{
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("8Gi"),
},
},
},
},
},
"volumes/local-01.yaml": {
expected: &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/somepath/data01",
},
},
},
},
},
"volumes/local-02.yaml": {
expected: &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("8Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/somepath/data02",
},
},
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle,
},
},
},
}
for name, scenario := range scenarios {
codec := api.Codecs.UniversalDecoder()
o := core.NewObjects(api.Scheme, codec)
if err := core.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/"+name, o, codec); err != nil {
t.Fatal(err)
}
clientset := &fake.Clientset{}
clientset.AddReactor("*", "*", core.ObjectReaction(o, registered.RESTMapper()))
if reflect.TypeOf(scenario.expected) == reflect.TypeOf(&api.PersistentVolumeClaim{}) {
pvc, err := clientset.Core().PersistentVolumeClaims("ns").Get("doesntmatter")
if err != nil {
t.Fatalf("Error retrieving object: %v", err)
}
expected := scenario.expected.(*api.PersistentVolumeClaim)
if pvc.Spec.AccessModes[0] != expected.Spec.AccessModes[0] {
t.Errorf("Unexpected mismatch. Got %v wanted %v", pvc.Spec.AccessModes[0], expected.Spec.AccessModes[0])
}
aQty := pvc.Spec.Resources.Requests[api.ResourceStorage]
bQty := expected.Spec.Resources.Requests[api.ResourceStorage]
aSize := aQty.Value()
bSize := bQty.Value()
if aSize != bSize {
t.Errorf("Unexpected mismatch. Got %v wanted %v", aSize, bSize)
}
}
if reflect.TypeOf(scenario.expected) == reflect.TypeOf(&api.PersistentVolume{}) {
pv, err := clientset.Core().PersistentVolumes().Get("doesntmatter")
if err != nil {
t.Fatalf("Error retrieving object: %v", err)
}
expected := scenario.expected.(*api.PersistentVolume)
if pv.Spec.AccessModes[0] != expected.Spec.AccessModes[0] {
t.Errorf("Unexpected mismatch. Got %v wanted %v", pv.Spec.AccessModes[0], expected.Spec.AccessModes[0])
}
aQty := pv.Spec.Capacity[api.ResourceStorage]
bQty := expected.Spec.Capacity[api.ResourceStorage]
aSize := aQty.Value()
bSize := bQty.Value()
if aSize != bSize {
t.Errorf("Unexpected mismatch. Got %v wanted %v", aSize, bSize)
}
if pv.Spec.HostPath.Path != expected.Spec.HostPath.Path {
t.Errorf("Unexpected mismatch. Got %v wanted %v", pv.Spec.HostPath.Path, expected.Spec.HostPath.Path)
}
}
}
}
func TestBindingWithExamples(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("claimbinder-test")
if err != nil {
t.Fatalf("error creating temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
codec := api.Codecs.UniversalDecoder()
o := core.NewObjects(api.Scheme, codec)
if err := core.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/claims/claim-01.yaml", o, codec); err != nil {
t.Fatal(err)
}
if err := core.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/volumes/local-01.yaml", o, codec); err != nil {
t.Fatal(err)
}
clientset := &fake.Clientset{}
clientset.AddReactor("*", "*", core.ObjectReaction(o, registered.RESTMapper()))
pv, err := clientset.Core().PersistentVolumes().Get("any")
if err != nil {
t.Errorf("Unexpected error getting PV from client: %v", err)
}
pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimRecycle
if err != nil {
t.Errorf("Unexpected error getting PV from client: %v", err)
}
pv.ObjectMeta.SelfLink = testapi.Default.SelfLink("pv", "")
// the default value of the PV is Pending. if processed at least once, its status in etcd is Available.
// There was a bug where only Pending volumes were being indexed and made ready for claims.
// Test that !Pending gets correctly added
pv.Status.Phase = api.VolumeAvailable
claim, err := clientset.Core().PersistentVolumeClaims("ns").Get("any")
if err != nil {
t.Errorf("Unexpected error getting PVC from client: %v", err)
}
claim.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "")
volumeIndex := NewPersistentVolumeOrderedIndex()
mockClient := &mockBinderClient{
volume: pv,
claim: claim,
}
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volumetest.NewFakeVolumeHost(tmpDir, nil, nil))
recycler := &PersistentVolumeRecycler{
kubeClient: clientset,
client: mockClient,
pluginMgr: plugMgr,
}
// adds the volume to the index, making the volume available
syncVolume(volumeIndex, mockClient, pv)
if mockClient.volume.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase)
}
// add the claim to fake API server
mockClient.UpdatePersistentVolumeClaim(claim)
// an initial sync for a claim will bind it to an unbound volume
syncClaim(volumeIndex, mockClient, claim)
// bind expected on pv.Spec but status update hasn't happened yet
if mockClient.volume.Spec.ClaimRef == nil {
t.Errorf("Expected ClaimRef but got nil for pv.Status.ClaimRef\n")
}
if mockClient.volume.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase)
}
if mockClient.claim.Spec.VolumeName != pv.Name {
t.Errorf("Expected claim.Spec.VolumeName %s but got %s", mockClient.claim.Spec.VolumeName, pv.Name)
}
if mockClient.claim.Status.Phase != api.ClaimBound {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
// state changes in pvc triggers sync that sets pv attributes to pvc.Status
syncClaim(volumeIndex, mockClient, claim)
if len(mockClient.claim.Status.AccessModes) == 0 {
t.Errorf("Expected %d access modes but got 0", len(pv.Spec.AccessModes))
}
// persisting the bind to pv.Spec.ClaimRef triggers a sync
syncVolume(volumeIndex, mockClient, mockClient.volume)
if mockClient.volume.Status.Phase != api.VolumeBound {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, mockClient.volume.Status.Phase)
}
// pretend the user deleted their claim. periodic resync picks it up.
mockClient.claim = nil
syncVolume(volumeIndex, mockClient, mockClient.volume)
if mockClient.volume.Status.Phase != api.VolumeReleased {
t.Errorf("Expected phase %s but got %s", api.VolumeReleased, mockClient.volume.Status.Phase)
}
// released volumes with a PersistentVolumeReclaimPolicy (recycle/delete) can have further processing
err = recycler.reclaimVolume(mockClient.volume)
if err != nil {
t.Errorf("Unexpected error reclaiming volume: %+v", err)
}
if mockClient.volume.Status.Phase != api.VolumePending {
t.Errorf("Expected phase %s but got %s", api.VolumePending, mockClient.volume.Status.Phase)
}
// after the recycling changes the phase to Pending, the binder picks up again
// to remove any vestiges of binding and make the volume Available again
syncVolume(volumeIndex, mockClient, mockClient.volume)
if mockClient.volume.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase)
}
if mockClient.volume.Spec.ClaimRef != nil {
t.Errorf("Expected nil ClaimRef: %+v", mockClient.volume.Spec.ClaimRef)
}
}
func TestCasting(t *testing.T) {
clientset := fake.NewSimpleClientset()
binder := NewPersistentVolumeClaimBinder(clientset, 1*time.Second)
pv := &api.PersistentVolume{}
unk := cache.DeletedFinalStateUnknown{}
pvc := &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PersistentVolumeClaimStatus{Phase: api.ClaimBound},
}
// Inject mockClient into the binder. This prevents weird errors on stderr
// as the binder wants to load PV/PVC from API server.
mockClient := &mockBinderClient{
volume: pv,
claim: pvc,
}
binder.client = mockClient
// none of these should fail casting.
// the real test is not failing when passed DeletedFinalStateUnknown in the deleteHandler
binder.addVolume(pv)
binder.updateVolume(pv, pv)
binder.deleteVolume(pv)
binder.deleteVolume(unk)
binder.addClaim(pvc)
binder.updateClaim(pvc, pvc)
}
func TestRecycledPersistentVolumeUID(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("claimbinder-test")
if err != nil {
t.Fatalf("error creating temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
codec := api.Codecs.UniversalDecoder()
o := core.NewObjects(api.Scheme, codec)
if err := core.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/claims/claim-01.yaml", o, codec); err != nil {
t.Fatal(err)
}
if err := core.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/volumes/local-01.yaml", o, codec); err != nil {
t.Fatal(err)
}
clientset := &fake.Clientset{}
clientset.AddReactor("*", "*", core.ObjectReaction(o, registered.RESTMapper()))
pv, err := clientset.Core().PersistentVolumes().Get("any")
if err != nil {
t.Errorf("Unexpected error getting PV from client: %v", err)
}
pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimRecycle
if err != nil {
t.Errorf("Unexpected error getting PV from client: %v", err)
}
pv.ObjectMeta.SelfLink = testapi.Default.SelfLink("pv", "")
// the default value of the PV is Pending. if processed at least once, its status in etcd is Available.
// There was a bug where only Pending volumes were being indexed and made ready for claims.
// Test that !Pending gets correctly added
pv.Status.Phase = api.VolumeAvailable
claim, err := clientset.Core().PersistentVolumeClaims("ns").Get("any")
if err != nil {
t.Errorf("Unexpected error getting PVC from client: %v", err)
}
claim.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "")
claim.ObjectMeta.UID = types.UID("uid1")
volumeIndex := NewPersistentVolumeOrderedIndex()
mockClient := &mockBinderClient{
volume: pv,
claim: claim,
}
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volumetest.NewFakeVolumeHost(tmpDir, nil, nil))
recycler := &PersistentVolumeRecycler{
kubeClient: clientset,
client: mockClient,
pluginMgr: plugMgr,
}
// adds the volume to the index, making the volume available
syncVolume(volumeIndex, mockClient, pv)
if mockClient.volume.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase)
}
// add the claim to fake API server
mockClient.UpdatePersistentVolumeClaim(claim)
// an initial sync for a claim will bind it to an unbound volume
syncClaim(volumeIndex, mockClient, claim)
// pretend the user deleted their claim. periodic resync picks it up.
mockClient.claim = nil
syncVolume(volumeIndex, mockClient, mockClient.volume)
if mockClient.volume.Status.Phase != api.VolumeReleased {
t.Errorf("Expected phase %s but got %s", api.VolumeReleased, mockClient.volume.Status.Phase)
}
// released volumes with a PersistentVolumeReclaimPolicy (recycle/delete) can have further processing
err = recycler.reclaimVolume(mockClient.volume)
if err != nil {
t.Errorf("Unexpected error reclaiming volume: %+v", err)
}
if mockClient.volume.Status.Phase != api.VolumePending {
t.Errorf("Expected phase %s but got %s", api.VolumePending, mockClient.volume.Status.Phase)
}
// after the recycling changes the phase to Pending, the binder picks up again
// to remove any vestiges of binding and make the volume Available again
//
// explicitly set the claim's UID to a different value to ensure that a new claim with the same
// name as what the PV was previously bound still yields an available volume
claim.ObjectMeta.UID = types.UID("uid2")
mockClient.claim = claim
syncVolume(volumeIndex, mockClient, mockClient.volume)
if mockClient.volume.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase)
}
if mockClient.volume.Spec.ClaimRef != nil {
t.Errorf("Expected nil ClaimRef: %+v", mockClient.volume.Spec.ClaimRef)
}
}
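// mockBinderClient is an in-memory binderClient test double that holds a single volume and claim.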
type mockBinderClient struct {
volume *api.PersistentVolume
claim *api.PersistentVolumeClaim
}
func (c *mockBinderClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.volume, nil
}
func (c *mockBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
c.volume = volume
return c.volume, nil
}
func (c *mockBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
c.volume = nil
return nil
}
func (c *mockBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
c.volume = volume
return c.volume, nil
}
func (c *mockBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
if c.claim != nil {
return c.claim, nil
} else {
return nil, errors.NewNotFound(api.Resource("persistentvolumeclaims"), name)
}
}
func (c *mockBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
c.claim = claim
return c.claim, nil
}
func (c *mockBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
c.claim = claim
return c.claim, nil
}
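// newMockRecycler is a recycler plugin factory for tests; it returns a mockRecycler for the given volume spec.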
func newMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
return &mockRecycler{
path: spec.PersistentVolume.Spec.HostPath.Path,
}, nil
}
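// mockRecycler is a volume.Recycler test double whose Recycle always reports success.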
type mockRecycler struct {
path string
host volume.VolumeHost
volume.MetricsNil
}
func (r *mockRecycler) GetPath() string {
return r.path
}
func (r *mockRecycler) Recycle() error {
// return nil means recycle passed
return nil
}


@@ -1,536 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// PersistentVolumeProvisionerController reconciles the state of all PersistentVolumes and PersistentVolumeClaims.
type PersistentVolumeProvisionerController struct {
volumeController *framework.Controller
volumeStore cache.Store
claimController *framework.Controller
claimStore cache.Store
client controllerClient
cloud cloudprovider.Interface
provisioner volume.ProvisionableVolumePlugin
pluginMgr volume.VolumePluginMgr
stopChannels map[string]chan struct{}
mutex sync.RWMutex
clusterName string
}
// constant name values for the controllers stopChannels map.
// the controller uses these for graceful shutdown
const volumesStopChannel = "volumes"
const claimsStopChannel = "claims"
// NewPersistentVolumeProvisionerController creates a new PersistentVolumeProvisionerController
func NewPersistentVolumeProvisionerController(client controllerClient, syncPeriod time.Duration, clusterName string, plugins []volume.VolumePlugin, provisioner volume.ProvisionableVolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeProvisionerController, error) {
controller := &PersistentVolumeProvisionerController{
client: client,
cloud: cloud,
provisioner: provisioner,
clusterName: clusterName,
}
if err := controller.pluginMgr.InitPlugins(plugins, controller); err != nil {
return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolumeProvisionerController: %+v", err)
}
glog.V(5).Infof("Initializing provisioner: %s", controller.provisioner.Name())
controller.provisioner.Init(controller)
controller.volumeStore, controller.volumeController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.ListPersistentVolumes(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.WatchPersistentVolumes(options)
},
},
&api.PersistentVolume{},
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: controller.handleAddVolume,
UpdateFunc: controller.handleUpdateVolume,
// delete handler not needed in this controller.
// volume deletion is handled by the recycler controller
},
)
controller.claimStore, controller.claimController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.ListPersistentVolumeClaims(api.NamespaceAll, options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.WatchPersistentVolumeClaims(api.NamespaceAll, options)
},
},
&api.PersistentVolumeClaim{},
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: controller.handleAddClaim,
UpdateFunc: controller.handleUpdateClaim,
// delete handler not needed.
// normal recycling applies when a claim is deleted.
// recycling is handled by the binding controller.
},
)
return controller, nil
}
func (controller *PersistentVolumeProvisionerController) handleAddVolume(obj interface{}) {
controller.mutex.Lock()
defer controller.mutex.Unlock()
cachedPv, _, _ := controller.volumeStore.Get(obj)
if pv, ok := cachedPv.(*api.PersistentVolume); ok {
err := controller.reconcileVolume(pv)
if err != nil {
glog.Errorf("Error reconciling volume %s: %+v", pv.Name, err)
}
}
}
func (controller *PersistentVolumeProvisionerController) handleUpdateVolume(oldObj, newObj interface{}) {
// The flow for Update is the same as Add.
// A volume is only provisioned if not done so already.
controller.handleAddVolume(newObj)
}
func (controller *PersistentVolumeProvisionerController) handleAddClaim(obj interface{}) {
controller.mutex.Lock()
defer controller.mutex.Unlock()
cachedPvc, exists, _ := controller.claimStore.Get(obj)
if !exists {
glog.Errorf("PersistentVolumeClaim does not exist in the local cache: %+v", obj)
return
}
if pvc, ok := cachedPvc.(*api.PersistentVolumeClaim); ok {
err := controller.reconcileClaim(pvc)
if err != nil {
glog.Errorf("Error encoutered reconciling claim %s: %+v", pvc.Name, err)
}
}
}
func (controller *PersistentVolumeProvisionerController) handleUpdateClaim(oldObj, newObj interface{}) {
// The flow for Update is the same as Add.
// A volume is only provisioned for a claim if not done so already.
controller.handleAddClaim(newObj)
}
func (controller *PersistentVolumeProvisionerController) reconcileClaim(claim *api.PersistentVolumeClaim) error {
glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s] for dynamic provisioning", claim.Name)
// The claim may have been modified by parallel call to reconcileClaim, load
// the current version.
newClaim, err := controller.client.GetPersistentVolumeClaim(claim.Namespace, claim.Name)
if err != nil {
return fmt.Errorf("Cannot reload claim %s/%s: %v", claim.Namespace, claim.Name, err)
}
claim = newClaim
err = controller.claimStore.Update(claim)
if err != nil {
return fmt.Errorf("Cannot update claim %s/%s: %v", claim.Namespace, claim.Name, err)
}
if controller.provisioner == nil {
return fmt.Errorf("No provisioner configured for controller")
}
// no provisioning requested, return Pending. Claim may be pending indefinitely without a match.
if !keyExists(qosProvisioningKey, claim.Annotations) {
glog.V(5).Infof("PersistentVolumeClaim[%s] no provisioning required", claim.Name)
return nil
}
if len(claim.Spec.VolumeName) != 0 {
glog.V(5).Infof("PersistentVolumeClaim[%s] already bound. No provisioning required", claim.Name)
return nil
}
if isAnnotationMatch(pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue, claim.Annotations) {
glog.V(5).Infof("PersistentVolumeClaim[%s] is already provisioned.", claim.Name)
return nil
}
glog.V(5).Infof("PersistentVolumeClaim[%s] provisioning", claim.Name)
provisioner, err := controller.newProvisioner(controller.provisioner, claim, nil)
if err != nil {
return fmt.Errorf("Unexpected error getting new provisioner for claim %s: %v\n", claim.Name, err)
}
newVolume, err := provisioner.NewPersistentVolumeTemplate()
if err != nil {
return fmt.Errorf("Unexpected error getting new volume template for claim %s: %v\n", claim.Name, err)
}
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference for %s: %v\n", claim.Name, err)
}
storageClass, _ := claim.Annotations[qosProvisioningKey]
// the creation of this volume is the bind to the claim.
// The claim will match the volume during the next sync period when the volume is in the local cache
newVolume.Spec.ClaimRef = claimRef
newVolume.Annotations[pvProvisioningRequiredAnnotationKey] = "true"
newVolume.Annotations[qosProvisioningKey] = storageClass
newVolume, err = controller.client.CreatePersistentVolume(newVolume)
if err != nil {
return fmt.Errorf("PersistentVolumeClaim[%s] failed provisioning: %+v", claim.Name, err)
}
glog.V(5).Infof("Unprovisioned PersistentVolume[%s] created for PVC[%s], which will be fulfilled in the storage provider", newVolume.Name, claim.Name)
claim.Annotations[pvProvisioningRequiredAnnotationKey] = pvProvisioningCompletedAnnotationValue
_, err = controller.client.UpdatePersistentVolumeClaim(claim)
if err != nil {
glog.Errorf("error updating persistent volume claim: %v", err)
}
return nil
}
func (controller *PersistentVolumeProvisionerController) reconcileVolume(pv *api.PersistentVolume) error {
glog.V(5).Infof("PersistentVolume[%s] reconciling", pv.Name)
// The PV may have been modified by parallel call to reconcileVolume, load
// the current version.
newPv, err := controller.client.GetPersistentVolume(pv.Name)
if err != nil {
return fmt.Errorf("Cannot reload volume %s: %v", pv.Name, err)
}
pv = newPv
if pv.Spec.ClaimRef == nil {
glog.V(5).Infof("PersistentVolume[%s] is not bound to a claim. No provisioning required", pv.Name)
return nil
}
// TODO: fix this leaky abstraction. Had to make our own store key because ClaimRef fails the default keyfunc (no Meta on object).
obj, exists, _ := controller.claimStore.GetByKey(fmt.Sprintf("%s/%s", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name))
if !exists {
return fmt.Errorf("PersistentVolumeClaim[%s/%s] not found in local cache", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)
}
claim, ok := obj.(*api.PersistentVolumeClaim)
if !ok {
return fmt.Errorf("PersistentVolumeClaim expected, but got %v", obj)
}
// no provisioning required, volume is ready and Bound
if !keyExists(pvProvisioningRequiredAnnotationKey, pv.Annotations) {
glog.V(5).Infof("PersistentVolume[%s] does not require provisioning", pv.Name)
return nil
}
// provisioning is completed, volume is ready.
if isProvisioningComplete(pv) {
glog.V(5).Infof("PersistentVolume[%s] is bound and provisioning is complete", pv.Name)
if pv.Spec.ClaimRef.Namespace != claim.Namespace || pv.Spec.ClaimRef.Name != claim.Name {
return fmt.Errorf("pre-bind mismatch - expected %s but found %s/%s", claimToClaimKey(claim), pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)
}
return nil
}
// provisioning is incomplete. Attempt to provision the volume.
glog.V(5).Infof("PersistentVolume[%s] provisioning in progress", pv.Name)
err = provisionVolume(pv, controller)
if err != nil {
return fmt.Errorf("Error provisioning PersistentVolume[%s]: %v", pv.Name, err)
}
return nil
}
// provisionVolume provisions a volume that has been created in the cluster but not yet fulfilled by
// the storage provider.
func provisionVolume(pv *api.PersistentVolume, controller *PersistentVolumeProvisionerController) error {
if isProvisioningComplete(pv) {
return fmt.Errorf("PersistentVolume[%s] is already provisioned", pv.Name)
}
if _, exists := pv.Annotations[qosProvisioningKey]; !exists {
return fmt.Errorf("PersistentVolume[%s] does not contain a provisioning request. Provisioning not required.", pv.Name)
}
if controller.provisioner == nil {
return fmt.Errorf("No provisioner found for volume: %s", pv.Name)
}
// Find the claim in local cache
obj, exists, _ := controller.claimStore.GetByKey(fmt.Sprintf("%s/%s", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name))
if !exists {
return fmt.Errorf("Could not find PersistentVolumeClaim[%s/%s] in local cache", pv.Spec.ClaimRef.Name, pv.Name)
}
claim := obj.(*api.PersistentVolumeClaim)
provisioner, _ := controller.newProvisioner(controller.provisioner, claim, pv)
err := provisioner.Provision(pv)
if err != nil {
glog.Errorf("Could not provision %s", pv.Name)
pv.Status.Phase = api.VolumeFailed
pv.Status.Message = err.Error()
if pv, apiErr := controller.client.UpdatePersistentVolumeStatus(pv); apiErr != nil {
return fmt.Errorf("PersistentVolume[%s] failed provisioning and also failed status update: %v - %v", pv.Name, err, apiErr)
}
return fmt.Errorf("PersistentVolume[%s] failed provisioning: %v", pv.Name, err)
}
clone, err := conversion.NewCloner().DeepCopy(pv)
if err != nil {
return fmt.Errorf("Error cloning pv: %v", err)
}
volumeClone, ok := clone.(*api.PersistentVolume)
if !ok {
return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone)
}
volumeClone.Annotations[pvProvisioningRequiredAnnotationKey] = pvProvisioningCompletedAnnotationValue
pv, err = controller.client.UpdatePersistentVolume(volumeClone)
if err != nil {
// TODO: https://github.com/kubernetes/kubernetes/issues/14443
// the volume was created in the infrastructure and likely has a PV name on it,
// but we failed to save the annotation that marks the volume as provisioned.
return fmt.Errorf("Error updating PersistentVolume[%s] with provisioning completed annotation. There is a potential for dupes and orphans.", volumeClone.Name)
}
return nil
}
// Run starts all of this controller's control loops
func (controller *PersistentVolumeProvisionerController) Run() {
glog.V(5).Infof("Starting PersistentVolumeProvisionerController\n")
if controller.stopChannels == nil {
controller.stopChannels = make(map[string]chan struct{})
}
if _, exists := controller.stopChannels[volumesStopChannel]; !exists {
controller.stopChannels[volumesStopChannel] = make(chan struct{})
go controller.volumeController.Run(controller.stopChannels[volumesStopChannel])
}
if _, exists := controller.stopChannels[claimsStopChannel]; !exists {
controller.stopChannels[claimsStopChannel] = make(chan struct{})
go controller.claimController.Run(controller.stopChannels[claimsStopChannel])
}
}
// Stop gracefully shuts down this controller
func (controller *PersistentVolumeProvisionerController) Stop() {
glog.V(5).Infof("Stopping PersistentVolumeProvisionerController\n")
for name, stopChan := range controller.stopChannels {
close(stopChan)
delete(controller.stopChannels, name)
}
}
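// newProvisioner builds a volume.Provisioner for the claim, tagging the cloud volume
// with the claim's namespace/name and, when already known, the PV name.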
func (controller *PersistentVolumeProvisionerController) newProvisioner(plugin volume.ProvisionableVolumePlugin, claim *api.PersistentVolumeClaim, pv *api.PersistentVolume) (volume.Provisioner, error) {
tags := make(map[string]string)
tags[cloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace
tags[cloudVolumeCreatedForClaimNameTag] = claim.Name
// pv can be nil when the provisioner has not created the PV yet
if pv != nil {
tags[cloudVolumeCreatedForVolumeNameTag] = pv.Name
}
volumeOptions := volume.VolumeOptions{
Capacity: claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)],
AccessModes: claim.Spec.AccessModes,
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete,
CloudTags: &tags,
ClusterName: controller.clusterName,
}
if pv != nil {
volumeOptions.PVName = pv.Name
}
provisioner, err := plugin.NewProvisioner(volumeOptions)
return provisioner, err
}
// controllerClient abstracts access to PVs and PVCs. It is easy to mock for testing and to wrap the real client.
type controllerClient interface {
CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error)
ListPersistentVolumes(options api.ListOptions) (*api.PersistentVolumeList, error)
WatchPersistentVolumes(options api.ListOptions) (watch.Interface, error)
GetPersistentVolume(name string) (*api.PersistentVolume, error)
UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
DeletePersistentVolume(volume *api.PersistentVolume) error
UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error)
ListPersistentVolumeClaims(namespace string, options api.ListOptions) (*api.PersistentVolumeClaimList, error)
WatchPersistentVolumeClaims(namespace string, options api.ListOptions) (watch.Interface, error)
UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
// provided to give VolumeHost and plugins access to the kube client
GetKubeClient() clientset.Interface
}
func NewControllerClient(c clientset.Interface) controllerClient {
return &realControllerClient{c}
}
var _ controllerClient = &realControllerClient{}
type realControllerClient struct {
client clientset.Interface
}
func (c *realControllerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.client.Core().PersistentVolumes().Get(name)
}
func (c *realControllerClient) ListPersistentVolumes(options api.ListOptions) (*api.PersistentVolumeList, error) {
return c.client.Core().PersistentVolumes().List(options)
}
func (c *realControllerClient) WatchPersistentVolumes(options api.ListOptions) (watch.Interface, error) {
return c.client.Core().PersistentVolumes().Watch(options)
}
func (c *realControllerClient) CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.Core().PersistentVolumes().Create(pv)
}
func (c *realControllerClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.Core().PersistentVolumes().Update(volume)
}
func (c *realControllerClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
return c.client.Core().PersistentVolumes().Delete(volume.Name, nil)
}
func (c *realControllerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.Core().PersistentVolumes().UpdateStatus(volume)
}
func (c *realControllerClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
return c.client.Core().PersistentVolumeClaims(namespace).Get(name)
}
func (c *realControllerClient) ListPersistentVolumeClaims(namespace string, options api.ListOptions) (*api.PersistentVolumeClaimList, error) {
return c.client.Core().PersistentVolumeClaims(namespace).List(options)
}
func (c *realControllerClient) WatchPersistentVolumeClaims(namespace string, options api.ListOptions) (watch.Interface, error) {
return c.client.Core().PersistentVolumeClaims(namespace).Watch(options)
}
func (c *realControllerClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.Core().PersistentVolumeClaims(claim.Namespace).Update(claim)
}
func (c *realControllerClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.Core().PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim)
}
func (c *realControllerClient) GetKubeClient() clientset.Interface {
return c.client
}
func keyExists(key string, haystack map[string]string) bool {
_, exists := haystack[key]
return exists
}
func isProvisioningComplete(pv *api.PersistentVolume) bool {
return isAnnotationMatch(pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue, pv.Annotations)
}
func isAnnotationMatch(key, needle string, haystack map[string]string) bool {
value, exists := haystack[key]
if !exists {
return false
}
return value == needle
}
func isRecyclable(policy api.PersistentVolumeReclaimPolicy) bool {
return policy == api.PersistentVolumeReclaimDelete || policy == api.PersistentVolumeReclaimRecycle
}
// VolumeHost implementation
// PersistentVolumeProvisionerController is host to the volume plugins, but does not actually mount any volumes.
// Because no mounting is performed, most of the VolumeHost methods are not implemented.
func (c *PersistentVolumeProvisionerController) GetPluginDir(podUID string) string {
return ""
}
func (c *PersistentVolumeProvisionerController) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string {
return ""
}
func (c *PersistentVolumeProvisionerController) GetPodPluginDir(podUID types.UID, pluginName string) string {
return ""
}
func (c *PersistentVolumeProvisionerController) GetKubeClient() clientset.Interface {
return c.client.GetKubeClient()
}
func (c *PersistentVolumeProvisionerController) NewWrapperMounter(volName string, spec volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
return nil, fmt.Errorf("NewWrapperMounter not supported by PVClaimBinder's VolumeHost implementation")
}
func (c *PersistentVolumeProvisionerController) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
return nil, fmt.Errorf("NewWrapperUnmounter not supported by PVClaimBinder's VolumeHost implementation")
}
func (c *PersistentVolumeProvisionerController) GetCloudProvider() cloudprovider.Interface {
return c.cloud
}
func (c *PersistentVolumeProvisionerController) GetMounter() mount.Interface {
return nil
}
func (c *PersistentVolumeProvisionerController) GetWriter() io.Writer {
return nil
}
func (c *PersistentVolumeProvisionerController) GetHostName() string {
return ""
}
const (
// This pair of constants is used by the provisioner.
// The key is a kube namespaced key that denotes a volume requires provisioning.
// The value is set only when provisioning is completed. Any other value will tell the provisioner
// that provisioning has not yet occurred.
pvProvisioningRequiredAnnotationKey = "volume.experimental.kubernetes.io/provisioning-required"
pvProvisioningCompletedAnnotationValue = "volume.experimental.kubernetes.io/provisioning-completed"
)
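
A minimal sketch of how this annotation pair is meant to be read; needsProvisioning is a hypothetical helper composed only from keyExists and isProvisioningComplete defined above, not part of the original controller:

// Illustrative only: a PV still needs provisioning when it carries the
// "provisioning required" key with any value other than the completed value.
func needsProvisioning(pv *api.PersistentVolume) bool {
	return keyExists(pvProvisioningRequiredAnnotationKey, pv.Annotations) &&
		!isProvisioningComplete(pv)
}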


@ -1,295 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/testapi"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
fake_cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/util"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/watch"
)
func TestProvisionerRunStop(t *testing.T) {
controller, _, _ := makeTestController()
if len(controller.stopChannels) != 0 {
t.Errorf("Non-running provisioner should not have any stopChannels. Got %v", len(controller.stopChannels))
}
controller.Run()
if len(controller.stopChannels) != 2 {
t.Errorf("Running provisioner should have exactly 2 stopChannels. Got %v", len(controller.stopChannels))
}
controller.Stop()
if len(controller.stopChannels) != 0 {
t.Errorf("Non-running provisioner should not have any stopChannels. Got %v", len(controller.stopChannels))
}
}
func makeTestVolume() *api.PersistentVolume {
return &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
Annotations: map[string]string{},
Name: "pv01",
},
Spec: api.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete,
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/somepath/data01",
},
},
},
}
}
func makeTestClaim() *api.PersistentVolumeClaim {
return &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Annotations: map[string]string{},
Name: "claim01",
Namespace: "ns",
SelfLink: testapi.Default.SelfLink("pvc", ""),
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("8G"),
},
},
},
}
}
func makeTestController() (*PersistentVolumeProvisionerController, *mockControllerClient, *volumetest.FakeVolumePlugin) {
mockClient := &mockControllerClient{}
mockVolumePlugin := &volumetest.FakeVolumePlugin{}
controller, _ := NewPersistentVolumeProvisionerController(mockClient, 1*time.Second, "fake-kubernetes", nil, mockVolumePlugin, &fake_cloud.FakeCloud{})
return controller, mockClient, mockVolumePlugin
}
func TestReconcileClaim(t *testing.T) {
controller, mockClient, _ := makeTestController()
pvc := makeTestClaim()
// watch would have added the claim to the store
controller.claimStore.Add(pvc)
// store it in fake API server
mockClient.UpdatePersistentVolumeClaim(pvc)
err := controller.reconcileClaim(pvc)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
// non-provisionable PVC should not have created a volume on reconciliation
if mockClient.volume != nil {
t.Error("Unexpected volume found in mock client. Expected nil")
}
pvc.Annotations[qosProvisioningKey] = "foo"
// store it in fake API server
mockClient.UpdatePersistentVolumeClaim(pvc)
err = controller.reconcileClaim(pvc)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
// PVC requesting provisioning should have a PV created for it
if mockClient.volume == nil {
t.Error("Expected to find bound volume but got nil")
}
if mockClient.volume.Spec.ClaimRef.Name != pvc.Name {
t.Errorf("Expected PV to be bound to %s but got %s", mockClient.volume.Spec.ClaimRef.Name, pvc.Name)
}
// the PVC should have correct annotation
if mockClient.claim.Annotations[pvProvisioningRequiredAnnotationKey] != pvProvisioningCompletedAnnotationValue {
t.Errorf("Annotation %q not set", pvProvisioningRequiredAnnotationKey)
}
// Run reconcileClaim a 2nd time to simulate a periodic sweep running in
// parallel to the previous call. There is a lock in handleUpdateVolume(), so
// they will be called sequentially, but the second call will have an old
// version of the claim.
oldPVName := mockClient.volume.Name
// Make the "old" claim
pvc2 := makeTestClaim()
pvc2.Annotations[qosProvisioningKey] = "foo"
// Add a dummy annotation so we recognize the claim was updated (i.e.
// stored in mockClient)
pvc2.Annotations["test"] = "test"
err = controller.reconcileClaim(pvc2)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
// The 2nd PVC should be ignored, no new PV was created
if val, found := pvc2.Annotations[pvProvisioningRequiredAnnotationKey]; found {
t.Errorf("2nd PVC got unexpected annotation %q: %q", pvProvisioningRequiredAnnotationKey, val)
}
if mockClient.volume.Name != oldPVName {
t.Errorf("2nd PVC unexpectedly provisioned a new volume")
}
if _, found := mockClient.claim.Annotations["test"]; found {
t.Errorf("2nd PVC was unexpectedly updated")
}
}
func checkTagValue(t *testing.T, tags map[string]string, tag string, expectedValue string) {
value, found := tags[tag]
if !found || value != expectedValue {
t.Errorf("Expected tag value %s = %s but value %s found", tag, expectedValue, value)
}
}
func TestReconcileVolume(t *testing.T) {
controller, mockClient, mockVolumePlugin := makeTestController()
pv := makeTestVolume()
pvc := makeTestClaim()
mockClient.volume = pv
err := controller.reconcileVolume(pv)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
// watch adds claim to the store.
// we need to add it to our mock client to mimic normal Get call
controller.claimStore.Add(pvc)
mockClient.claim = pvc
// pretend the claim and volume are bound, no provisioning required
claimRef, _ := api.GetReference(pvc)
pv.Spec.ClaimRef = claimRef
mockClient.volume = pv
err = controller.reconcileVolume(pv)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
pv.Annotations[pvProvisioningRequiredAnnotationKey] = "!pvProvisioningCompleted"
pv.Annotations[qosProvisioningKey] = "foo"
mockClient.volume = pv
err = controller.reconcileVolume(pv)
if !isAnnotationMatch(pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue, mockClient.volume.Annotations) {
t.Errorf("Expected %s but got %s", pvProvisioningRequiredAnnotationKey, mockClient.volume.Annotations[pvProvisioningRequiredAnnotationKey])
}
// Check that the volume plugin was called with correct tags
tags := *mockVolumePlugin.LastProvisionerOptions.CloudTags
checkTagValue(t, tags, cloudVolumeCreatedForClaimNamespaceTag, pvc.Namespace)
checkTagValue(t, tags, cloudVolumeCreatedForClaimNameTag, pvc.Name)
checkTagValue(t, tags, cloudVolumeCreatedForVolumeNameTag, pv.Name)
}
var _ controllerClient = &mockControllerClient{}
type mockControllerClient struct {
volume *api.PersistentVolume
claim *api.PersistentVolumeClaim
}
func (c *mockControllerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.volume, nil
}
func (c *mockControllerClient) CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) {
if pv.GenerateName != "" && pv.Name == "" {
pv.Name = fmt.Sprintf("%s%s", pv.GenerateName, util.NewUUID())
}
c.volume = pv
return c.volume, nil
}
func (c *mockControllerClient) ListPersistentVolumes(options api.ListOptions) (*api.PersistentVolumeList, error) {
return &api.PersistentVolumeList{
Items: []api.PersistentVolume{*c.volume},
}, nil
}
func (c *mockControllerClient) WatchPersistentVolumes(options api.ListOptions) (watch.Interface, error) {
return watch.NewFake(), nil
}
func (c *mockControllerClient) UpdatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.CreatePersistentVolume(pv)
}
func (c *mockControllerClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
c.volume = nil
return nil
}
func (c *mockControllerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return volume, nil
}
func (c *mockControllerClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
if c.claim != nil {
return c.claim, nil
} else {
return nil, errors.NewNotFound(api.Resource("persistentvolumes"), name)
}
}
func (c *mockControllerClient) ListPersistentVolumeClaims(namespace string, options api.ListOptions) (*api.PersistentVolumeClaimList, error) {
return &api.PersistentVolumeClaimList{
Items: []api.PersistentVolumeClaim{*c.claim},
}, nil
}
func (c *mockControllerClient) WatchPersistentVolumeClaims(namespace string, options api.ListOptions) (watch.Interface, error) {
return watch.NewFake(), nil
}
func (c *mockControllerClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
c.claim = claim
return c.claim, nil
}
func (c *mockControllerClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return claim, nil
}
func (c *mockControllerClient) GetKubeClient() clientset.Interface {
return nil
}
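
For reference, the tag assertions at the end of TestReconcileVolume correspond to the CloudTags map that newProvisioner builds for the fixtures above; a sketch with the fixture values (claim "claim01" in namespace "ns", volume "pv01"), written here only for illustration:

// Illustrative only: the cloud tags expected for the test fixtures above.
var expectedCloudTags = map[string]string{
	cloudVolumeCreatedForClaimNamespaceTag: "ns",
	cloudVolumeCreatedForClaimNameTag:      "claim01",
	cloudVolumeCreatedForVolumeNameTag:     "pv01",
}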


@ -1,415 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
ioutil "k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/watch"
)
var _ volume.VolumeHost = &PersistentVolumeRecycler{}
// PersistentVolumeRecycler is a controller that watches for PersistentVolumes that are released from their claims.
// This controller will Recycle those volumes whose reclaim policy is set to PersistentVolumeReclaimRecycle and make them
// available again for a new claim.
type PersistentVolumeRecycler struct {
volumeController *framework.Controller
stopChannel chan struct{}
client recyclerClient
kubeClient clientset.Interface
pluginMgr volume.VolumePluginMgr
cloud cloudprovider.Interface
maximumRetry int
syncPeriod time.Duration
// Local cache of failed recycle / delete operations. Map volume.Name -> status of the volume.
// Only PVs in Released state have an entry here.
releasedVolumes map[string]releasedVolumeStatus
}
// releasedVolumeStatus holds state of failed delete/recycle operation on a
// volume. The controller re-tries the operation several times and it stores
// retry count + timestamp of the last attempt here.
type releasedVolumeStatus struct {
// How many recycle/delete operations failed.
retryCount int
// Timestamp of the last attempt.
lastAttempt time.Time
}
// NewPersistentVolumeRecycler creates a new PersistentVolumeRecycler
func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time.Duration, maximumRetry int, plugins []volume.VolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeRecycler, error) {
recyclerClient := NewRecyclerClient(kubeClient)
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("pv_recycler_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
recycler := &PersistentVolumeRecycler{
client: recyclerClient,
kubeClient: kubeClient,
cloud: cloud,
maximumRetry: maximumRetry,
syncPeriod: syncPeriod,
releasedVolumes: make(map[string]releasedVolumeStatus),
}
if err := recycler.pluginMgr.InitPlugins(plugins, recycler); err != nil {
return nil, fmt.Errorf("Could not initialize volume plugins for PVClaimBinder: %+v", err)
}
_, volumeController := framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return kubeClient.Core().PersistentVolumes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return kubeClient.Core().PersistentVolumes().Watch(options)
},
},
&api.PersistentVolume{},
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pv, ok := obj.(*api.PersistentVolume)
if !ok {
glog.Errorf("Error casting object to PersistentVolume: %v", obj)
return
}
recycler.reclaimVolume(pv)
},
UpdateFunc: func(oldObj, newObj interface{}) {
pv, ok := newObj.(*api.PersistentVolume)
if !ok {
glog.Errorf("Error casting object to PersistentVolume: %v", newObj)
return
}
recycler.reclaimVolume(pv)
},
DeleteFunc: func(obj interface{}) {
pv, ok := obj.(*api.PersistentVolume)
if !ok {
glog.Errorf("Error casting object to PersistentVolume: %v", obj)
return
}
recycler.reclaimVolume(pv)
recycler.removeReleasedVolume(pv)
},
},
)
recycler.volumeController = volumeController
return recycler, nil
}
// shouldRecycle checks a volume and returns nil if the volume should be
// recycled right now. Otherwise it returns an error with the reason why it
// should not be recycled.
func (recycler *PersistentVolumeRecycler) shouldRecycle(pv *api.PersistentVolume) error {
if pv.Spec.ClaimRef == nil {
return fmt.Errorf("Volume does not have a reference to claim")
}
if pv.Status.Phase != api.VolumeReleased {
return fmt.Errorf("The volume is not in 'Released' phase")
}
// The volume is Released, should we retry recycling?
status, found := recycler.releasedVolumes[pv.Name]
if !found {
// We don't know anything about this volume. The controller has been
// restarted or the volume has been marked as Released by another
// controller. Recycle/delete this volume as if it was just Released.
glog.V(5).Infof("PersistentVolume[%s] not found in local cache, recycling", pv.Name)
return nil
}
// Check the timestamp
expectedRetry := status.lastAttempt.Add(recycler.syncPeriod)
if time.Now().After(expectedRetry) {
glog.V(5).Infof("PersistentVolume[%s] retrying recycle after timeout", pv.Name)
return nil
}
// It's too early
glog.V(5).Infof("PersistentVolume[%s] skipping recycle, it's too early: now: %v, next retry: %v", pv.Name, time.Now(), expectedRetry)
return fmt.Errorf("Too early after previous failure")
}
func (recycler *PersistentVolumeRecycler) reclaimVolume(pv *api.PersistentVolume) error {
glog.V(5).Infof("Recycler: checking PersistentVolume[%s]\n", pv.Name)
// Always load the latest version of the volume
newPV, err := recycler.client.GetPersistentVolume(pv.Name)
if err != nil {
return fmt.Errorf("Could not find PersistentVolume %s", pv.Name)
}
pv = newPV
err = recycler.shouldRecycle(pv)
if err == nil {
glog.V(5).Infof("Reclaiming PersistentVolume[%s]\n", pv.Name)
// both handleRecycle and handleDelete block until completion
// TODO: allow parallel recycling operations to increase throughput
switch pv.Spec.PersistentVolumeReclaimPolicy {
case api.PersistentVolumeReclaimRecycle:
err = recycler.handleRecycle(pv)
case api.PersistentVolumeReclaimDelete:
err = recycler.handleDelete(pv)
case api.PersistentVolumeReclaimRetain:
glog.V(5).Infof("Volume %s is set to retain after release. Skipping.\n", pv.Name)
default:
err = fmt.Errorf("No PersistentVolumeReclaimPolicy defined for spec: %+v", pv)
}
if err != nil {
errMsg := fmt.Sprintf("Could not recycle volume spec: %+v", err)
glog.Errorf(errMsg)
return fmt.Errorf(errMsg)
}
return nil
}
glog.V(3).Infof("PersistentVolume[%s] phase %s - skipping: %v", pv.Name, pv.Status.Phase, err)
return nil
}
// handleReleaseFailure evaluates a failed Recycle/Delete operation, updating
// the internal controller state with the new number of attempts and the
// timestamp of the last attempt. Based on the number of failures it returns
// the next state of the volume (Released / Failed).
func (recycler *PersistentVolumeRecycler) handleReleaseFailure(pv *api.PersistentVolume) api.PersistentVolumePhase {
status, found := recycler.releasedVolumes[pv.Name]
if !found {
// First failure, set retryCount to 0 (it will be incremented a few lines below)
status = releasedVolumeStatus{}
}
status.retryCount += 1
if status.retryCount > recycler.maximumRetry {
// This was the last attempt. Remove any internal state and mark the
// volume as Failed.
glog.V(3).Infof("PersistentVolume[%s] failed %d times - marking Failed", pv.Name, status.retryCount)
recycler.removeReleasedVolume(pv)
return api.VolumeFailed
}
status.lastAttempt = time.Now()
recycler.releasedVolumes[pv.Name] = status
return api.VolumeReleased
}
func (recycler *PersistentVolumeRecycler) removeReleasedVolume(pv *api.PersistentVolume) {
delete(recycler.releasedVolumes, pv.Name)
}
func (recycler *PersistentVolumeRecycler) handleRecycle(pv *api.PersistentVolume) error {
glog.V(5).Infof("Recycling PersistentVolume[%s]\n", pv.Name)
currentPhase := pv.Status.Phase
nextPhase := currentPhase
spec := volume.NewSpecFromPersistentVolume(pv, false)
plugin, err := recycler.pluginMgr.FindRecyclablePluginBySpec(spec)
if err != nil {
nextPhase = api.VolumeFailed
pv.Status.Message = fmt.Sprintf("%v", err)
}
// an error above means a suitable plugin for this volume was not found.
// we don't need to attempt recycling when plugin is nil, but we do need to persist the next/failed phase
// of the volume so that subsequent syncs won't attempt recycling through this handler func.
if plugin != nil {
volRecycler, err := plugin.NewRecycler(spec)
if err != nil {
return fmt.Errorf("Could not obtain Recycler for spec: %#v error: %v", spec, err)
}
// blocks until completion
if err := volRecycler.Recycle(); err != nil {
glog.Errorf("PersistentVolume[%s] failed recycling: %+v", pv.Name, err)
pv.Status.Message = fmt.Sprintf("Recycling error: %s", err)
nextPhase = recycler.handleReleaseFailure(pv)
} else {
glog.V(5).Infof("PersistentVolume[%s] successfully recycled\n", pv.Name)
// The volume has been recycled. Remove any internal state to make
// any subsequent bind+recycle cycle work.
recycler.removeReleasedVolume(pv)
nextPhase = api.VolumePending
}
}
if currentPhase != nextPhase {
glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", pv.Name, currentPhase, nextPhase)
pv.Status.Phase = nextPhase
_, err := recycler.client.UpdatePersistentVolumeStatus(pv)
if err != nil {
// Rollback to previous phase
pv.Status.Phase = currentPhase
}
}
return nil
}
func (recycler *PersistentVolumeRecycler) handleDelete(pv *api.PersistentVolume) error {
glog.V(5).Infof("Deleting PersistentVolume[%s]\n", pv.Name)
currentPhase := pv.Status.Phase
nextPhase := currentPhase
spec := volume.NewSpecFromPersistentVolume(pv, false)
plugin, err := recycler.pluginMgr.FindDeletablePluginBySpec(spec)
if err != nil {
nextPhase = api.VolumeFailed
pv.Status.Message = fmt.Sprintf("%v", err)
}
// an error above means a suitable plugin for this volume was not found.
// we don't need to attempt deleting when plugin is nil, but we do need to persist the next/failed phase
// of the volume so that subsequent syncs won't attempt deletion through this handler func.
if plugin != nil {
deleter, err := plugin.NewDeleter(spec)
if err != nil {
return fmt.Errorf("Could not obtain Deleter for spec: %#v error: %v", spec, err)
}
// blocks until completion
err = deleter.Delete()
if err != nil {
glog.Errorf("PersistentVolume[%s] failed deletion: %+v", pv.Name, err)
pv.Status.Message = fmt.Sprintf("Deletion error: %s", err)
nextPhase = recycler.handleReleaseFailure(pv)
} else {
glog.V(5).Infof("PersistentVolume[%s] successfully deleted through plugin\n", pv.Name)
recycler.removeReleasedVolume(pv)
// after successful deletion through the plugin, we can also remove the PV from the cluster
if err := recycler.client.DeletePersistentVolume(pv); err != nil {
return fmt.Errorf("error deleting persistent volume: %+v", err)
}
}
}
if currentPhase != nextPhase {
glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", pv.Name, currentPhase, nextPhase)
pv.Status.Phase = nextPhase
_, err := recycler.client.UpdatePersistentVolumeStatus(pv)
if err != nil {
// Rollback to previous phase
pv.Status.Phase = currentPhase
}
}
return nil
}
// Run starts this recycler's control loops
func (recycler *PersistentVolumeRecycler) Run() {
glog.V(5).Infof("Starting PersistentVolumeRecycler\n")
if recycler.stopChannel == nil {
recycler.stopChannel = make(chan struct{})
go recycler.volumeController.Run(recycler.stopChannel)
}
}
// Stop gracefully shuts down this recycler
func (recycler *PersistentVolumeRecycler) Stop() {
glog.V(5).Infof("Stopping PersistentVolumeRecycler\n")
if recycler.stopChannel != nil {
close(recycler.stopChannel)
recycler.stopChannel = nil
}
}
// recyclerClient abstracts access to PVs
type recyclerClient interface {
GetPersistentVolume(name string) (*api.PersistentVolume, error)
UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
DeletePersistentVolume(volume *api.PersistentVolume) error
UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
}
func NewRecyclerClient(c clientset.Interface) recyclerClient {
return &realRecyclerClient{c}
}
type realRecyclerClient struct {
client clientset.Interface
}
func (c *realRecyclerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.client.Core().PersistentVolumes().Get(name)
}
func (c *realRecyclerClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.Core().PersistentVolumes().Update(volume)
}
func (c *realRecyclerClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
return c.client.Core().PersistentVolumes().Delete(volume.Name, nil)
}
func (c *realRecyclerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.Core().PersistentVolumes().UpdateStatus(volume)
}
// PersistentVolumeRecycler is host to the volume plugins, but does not actually mount any volumes.
// Because no mounting is performed, most of the VolumeHost methods are not implemented.
func (f *PersistentVolumeRecycler) GetPluginDir(podUID string) string {
return ""
}
func (f *PersistentVolumeRecycler) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string {
return ""
}
func (f *PersistentVolumeRecycler) GetPodPluginDir(podUID types.UID, pluginName string) string {
return ""
}
func (f *PersistentVolumeRecycler) GetKubeClient() clientset.Interface {
return f.kubeClient
}
func (f *PersistentVolumeRecycler) NewWrapperMounter(volName string, spec volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
return nil, fmt.Errorf("NewWrapperMounter not supported by PVClaimBinder's VolumeHost implementation")
}
func (f *PersistentVolumeRecycler) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
return nil, fmt.Errorf("NewWrapperUnmounter not supported by PVClaimBinder's VolumeHost implementation")
}
func (f *PersistentVolumeRecycler) GetCloudProvider() cloudprovider.Interface {
return f.cloud
}
func (f *PersistentVolumeRecycler) GetMounter() mount.Interface {
return nil
}
func (f *PersistentVolumeRecycler) GetWriter() ioutil.Writer {
return nil
}
func (f *PersistentVolumeRecycler) GetHostName() string {
return ""
}
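
A hedged usage sketch for the recycler above; the sync period, retry count, and plugin list are placeholders, and only the constructor and Run/Stop signatures come from this file:

// Illustrative only: constructing and running the recycler.
func exampleRunRecycler(kubeClient clientset.Interface, plugins []volume.VolumePlugin, cloud cloudprovider.Interface) error {
	recycler, err := NewPersistentVolumeRecycler(kubeClient, 10*time.Second, 3 /* maximumRetry */, plugins, cloud)
	if err != nil {
		return err
	}
	recycler.Run()
	defer recycler.Stop()
	// The informer now watches PersistentVolumes and reclaims Released ones
	// until Stop() closes the stop channel.
	return nil
}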


@ -1,265 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/host_path"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)
const (
mySyncPeriod = 2 * time.Second
myMaximumRetry = 3
)
func TestFailedRecycling(t *testing.T) {
pv := preparePV()
mockClient := &mockBinderClient{
volume: pv,
}
// no Init called for pluginMgr and no plugins are available. Volume should fail recycling.
plugMgr := volume.VolumePluginMgr{}
recycler := &PersistentVolumeRecycler{
kubeClient: fake.NewSimpleClientset(),
client: mockClient,
pluginMgr: plugMgr,
releasedVolumes: make(map[string]releasedVolumeStatus),
}
err := recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("Unexpected non-nil error: %v", err)
}
if mockClient.volume.Status.Phase != api.VolumeFailed {
t.Errorf("Expected %s but got %s", api.VolumeFailed, mockClient.volume.Status.Phase)
}
// Use a new volume for the next test
pv = preparePV()
mockClient.volume = pv
pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimDelete
err = recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("Unexpected non-nil error: %v", err)
}
if mockClient.volume.Status.Phase != api.VolumeFailed {
t.Errorf("Expected %s but got %s", api.VolumeFailed, mockClient.volume.Status.Phase)
}
}
func TestRecyclingRetry(t *testing.T) {
// Test that the recycler controller retries recycling a volume several times and eventually succeeds
pv := preparePV()
mockClient := &mockBinderClient{
volume: pv,
}
plugMgr := volume.VolumePluginMgr{}
// Use a fake NewRecycler function
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newFailingMockRecycler, volume.VolumeConfig{}), volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil))
// Reset a global call counter
failedCallCount = 0
recycler := &PersistentVolumeRecycler{
kubeClient: fake.NewSimpleClientset(),
client: mockClient,
pluginMgr: plugMgr,
syncPeriod: mySyncPeriod,
maximumRetry: myMaximumRetry,
releasedVolumes: make(map[string]releasedVolumeStatus),
}
// All but the last attempt will fail
testRecycleFailures(t, recycler, mockClient, pv, myMaximumRetry-1)
// The last attempt should succeed
err := recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("Last step: Recycler failed: %v", err)
}
if mockClient.volume.Status.Phase != api.VolumePending {
t.Errorf("Last step: The volume should be Pending, but is %s instead", mockClient.volume.Status.Phase)
}
// Check the cache, it should not have any entry
status, found := recycler.releasedVolumes[pv.Name]
if found {
t.Errorf("Last step: Expected PV to be removed from cache, got %v", status)
}
}
func TestRecyclingRetryAlwaysFail(t *testing.T) {
// Test that the recycler controller retries recycling a volume several times and always fails.
pv := preparePV()
mockClient := &mockBinderClient{
volume: pv,
}
plugMgr := volume.VolumePluginMgr{}
// Use a fake NewRecycler function
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newAlwaysFailingMockRecycler, volume.VolumeConfig{}), volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil))
// Reset a global call counter
failedCallCount = 0
recycler := &PersistentVolumeRecycler{
kubeClient: fake.NewSimpleClientset(),
client: mockClient,
pluginMgr: plugMgr,
syncPeriod: mySyncPeriod,
maximumRetry: myMaximumRetry,
releasedVolumes: make(map[string]releasedVolumeStatus),
}
// myMaximumRetry recycle attempts will fail
testRecycleFailures(t, recycler, mockClient, pv, myMaximumRetry)
// The volume should be failed after myMaximumRetry attempts
err := recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("Last step: Recycler failed: %v", err)
}
if mockClient.volume.Status.Phase != api.VolumeFailed {
t.Errorf("Last step: The volume should be Failed, but is %s instead", mockClient.volume.Status.Phase)
}
// Check the cache, it should not have any entry
status, found := recycler.releasedVolumes[pv.Name]
if found {
t.Errorf("Last step: Expected PV to be removed from cache, got %v", status)
}
}
func preparePV() *api.PersistentVolume {
return &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("8Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/tmp/data02",
},
},
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle,
ClaimRef: &api.ObjectReference{
Name: "foo",
Namespace: "bar",
},
},
Status: api.PersistentVolumeStatus{
Phase: api.VolumeReleased,
},
}
}
// Test that `count` attempts to recycle a PV fail.
func testRecycleFailures(t *testing.T, recycler *PersistentVolumeRecycler, mockClient *mockBinderClient, pv *api.PersistentVolume, count int) {
for i := 1; i <= count; i++ {
err := recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("STEP %d: Recycler faled: %v", i, err)
}
// Check the status, it should be failed
if mockClient.volume.Status.Phase != api.VolumeReleased {
t.Errorf("STEP %d: The volume should be Released, but is %s instead", i, mockClient.volume.Status.Phase)
}
// Check the failed volume cache
status, found := recycler.releasedVolumes[pv.Name]
if !found {
t.Errorf("STEP %d: cannot find released volume status", i)
}
if status.retryCount != i {
t.Errorf("STEP %d: Expected nr. of attempts to be %d, got %d", i, i, status.retryCount)
}
// call reclaimVolume too early, it should not increment the retryCount
time.Sleep(mySyncPeriod / 2)
err = recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("STEP %d: Recycler failed: %v", i, err)
}
status, found = recycler.releasedVolumes[pv.Name]
if !found {
t.Errorf("STEP %d: cannot find released volume status", i)
}
if status.retryCount != i {
t.Errorf("STEP %d: Expected nr. of attempts to be %d, got %d", i, i, status.retryCount)
}
// Call the next reclaimVolume() only after the full sync period has elapsed
time.Sleep(mySyncPeriod / 2)
}
}
func newFailingMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
return &failingMockRecycler{
path: spec.PersistentVolume.Spec.HostPath.Path,
errorCount: myMaximumRetry - 1, // fail two times and then successfully recycle the volume
}, nil
}
func newAlwaysFailingMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
return &failingMockRecycler{
path: spec.PersistentVolume.Spec.HostPath.Path,
errorCount: 1000, // always fail
}, nil
}
type failingMockRecycler struct {
path string
// How many times should the recycler fail before returning success.
errorCount int
volume.MetricsNil
}
// Counter of failingMockRecycler.Recycle() calls. A global variable just for
// testing; it would be too much code to create a custom volume plugin to hold
// this variable.
var failedCallCount = 0
func (r *failingMockRecycler) GetPath() string {
return r.path
}
func (r *failingMockRecycler) Recycle() error {
failedCallCount += 1
if failedCallCount <= r.errorCount {
return fmt.Errorf("Failing for %d. time", failedCallCount)
}
// return nil means recycle passed
return nil
}
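
The retry pacing exercised by these tests comes from shouldRecycle() in the recycler above; extracted here as a hedged, stand-alone predicate purely for clarity (shouldRetryNow is not part of the original code):

// Illustrative only: a released volume is retried only once syncPeriod has
// elapsed since the last failed recycle/delete attempt.
func shouldRetryNow(status releasedVolumeStatus, syncPeriod time.Duration, now time.Time) bool {
	return now.After(status.lastAttempt.Add(syncPeriod))
}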