From deec5f26cd068dfe7a5b5fee7ff27801181070f6 Mon Sep 17 00:00:00 2001 From: markturansky Date: Fri, 29 May 2015 16:34:32 -0400 Subject: [PATCH] Recycler controller --- .../app/controllermanager.go | 5 + cmd/kube-controller-manager/app/plugins.go | 19 ++ .../persistent_volume_claim_binder.go | 60 ++++- .../persistent_volume_claim_binder_test.go | 63 +++++ .../persistent_volume_recycler.go | 231 ++++++++++++++++++ test/e2e/persistent_volumes.go | 200 +++++++++++++++ 6 files changed, 571 insertions(+), 7 deletions(-) create mode 100644 pkg/volumeclaimbinder/persistent_volume_recycler.go create mode 100644 test/e2e/persistent_volumes.go diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 79ca36491b3..8d68d457aab 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -234,6 +234,11 @@ func (s *CMServer) Run(_ []string) error { pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod) pvclaimBinder.Run() + pvRecycler, err := volumeclaimbinder.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins()) + if err != nil { + glog.Fatalf("Failed to start persistent volume recycler: %+v", err) + } + pvRecycler.Run() if len(s.ServiceAccountKeyFile) > 0 { privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile) diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go index 7f8cc61f5f6..cd6b1649b83 100644 --- a/cmd/kube-controller-manager/app/plugins.go +++ b/cmd/kube-controller-manager/app/plugins.go @@ -20,6 +20,8 @@ import ( // This file exists to force the desired plugin implementations to be linked. // This should probably be part of some configuration fed into the build for a // given binary target. + + //Cloud providers _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/gce" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/mesos" @@ -27,4 +29,21 @@ import ( _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/ovirt" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/rackspace" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/vagrant" + + // Volume plugins + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume/host_path" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume/nfs" ) + +// ProbeRecyclableVolumePlugins collects all persistent volume plugins into an easy to use list. +func ProbeRecyclableVolumePlugins() []volume.VolumePlugin { + allPlugins := []volume.VolumePlugin{} + + // The list of plugins to probe is decided by the kubelet binary, not + // by dynamic linking or other "magic". Plugins will be analyzed and + // initialized later. + allPlugins = append(allPlugins, host_path.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, nfs.ProbeVolumePlugins()...) + return allPlugins +} diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go index ecfd736d229..6e89292eedf 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go @@ -99,7 +99,10 @@ func (binder *PersistentVolumeClaimBinder) addVolume(obj interface{}) { binder.lock.Lock() defer binder.lock.Unlock() volume := obj.(*api.PersistentVolume) - syncVolume(binder.volumeIndex, binder.client, volume) + err := syncVolume(binder.volumeIndex, binder.client, volume) + if err != nil { + glog.Errorf("PVClaimBinder could not add volume %s: %+v", volume.Name, err) + } } func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface{}) { @@ -107,7 +110,10 @@ func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface defer binder.lock.Unlock() newVolume := newObj.(*api.PersistentVolume) binder.volumeIndex.Update(newVolume) - syncVolume(binder.volumeIndex, binder.client, newVolume) + err := syncVolume(binder.volumeIndex, binder.client, newVolume) + if err != nil { + glog.Errorf("PVClaimBinder could not update volume %s: %+v", newVolume.Name, err) + } } func (binder *PersistentVolumeClaimBinder) deleteVolume(obj interface{}) { @@ -121,18 +127,24 @@ func (binder *PersistentVolumeClaimBinder) addClaim(obj interface{}) { binder.lock.Lock() defer binder.lock.Unlock() claim := obj.(*api.PersistentVolumeClaim) - syncClaim(binder.volumeIndex, binder.client, claim) + err := syncClaim(binder.volumeIndex, binder.client, claim) + if err != nil { + glog.Errorf("PVClaimBinder could not add claim %s: %+v", claim.Name, err) + } } func (binder *PersistentVolumeClaimBinder) updateClaim(oldObj, newObj interface{}) { binder.lock.Lock() defer binder.lock.Unlock() newClaim := newObj.(*api.PersistentVolumeClaim) - syncClaim(binder.volumeIndex, binder.client, newClaim) + err := syncClaim(binder.volumeIndex, binder.client, newClaim) + if err != nil { + glog.Errorf("PVClaimBinder could not update claim %s: %+v", newClaim.Name, err) + } } func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) { - glog.V(5).Infof("Synchronizing PersistentVolume[%s]\n", volume.Name) + glog.V(5).Infof("Synchronizing PersistentVolume[%s], current phase: %s\n", volume.Name, volume.Status.Phase) // volumes can be in one of the following states: // @@ -140,12 +152,28 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl // VolumeAvailable -- not bound to a claim, but processed at least once and found in this controller's volumeIndex. // VolumeBound -- bound to a claim because volume.Spec.ClaimRef != nil. Claim status may not be correct. // VolumeReleased -- volume.Spec.ClaimRef != nil but the claim has been deleted by the user. + // VolumeFailed -- volume.Spec.ClaimRef != nil and the volume failed processing in the recycler currentPhase := volume.Status.Phase nextPhase := currentPhase switch currentPhase { // pending volumes are available only after indexing in order to be matched to claims. case api.VolumePending: + if volume.Spec.ClaimRef != nil { + // Pending volumes that have a ClaimRef were recently recycled. The Recycler set the phase to VolumePending + // to start the volume again at the beginning of this lifecycle. + // ClaimRef is the last bind between persistent volume and claim. + // The claim has already been deleted by the user at this point + oldClaimRef := volume.Spec.ClaimRef + volume.Spec.ClaimRef = nil + _, err = binderClient.UpdatePersistentVolume(volume) + if err != nil { + // rollback on error, keep the ClaimRef until we can successfully update the volume + volume.Spec.ClaimRef = oldClaimRef + return fmt.Errorf("Unexpected error saving PersistentVolume: %+v", err) + } + } + _, exists, err := volumeIndex.Get(volume) if err != nil { return err @@ -170,10 +198,11 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl } } } + //bound volumes require verification of their bound claims case api.VolumeBound: if volume.Spec.ClaimRef == nil { - return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume) + return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume) } else { _, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name) if err != nil { @@ -184,12 +213,22 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl } } } + // released volumes require recycling case api.VolumeReleased: + if volume.Spec.ClaimRef == nil { + return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume) + } else { + // another process is watching for released volumes. + // PersistentVolumeReclaimPolicy is set per PersistentVolume + } + + // volumes are removed by processes external to this binder and must be removed from the cluster + case api.VolumeFailed: if volume.Spec.ClaimRef == nil { return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume) } else { - // TODO: implement Recycle method on plugins + glog.V(5).Infof("PersistentVolume[%s] previously failed recycling. Skipping.\n", volume.Name) } } @@ -198,6 +237,7 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl // a change in state will trigger another update through this controller. // each pass through this controller evaluates current phase and decides whether or not to change to the next phase + glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", volume.Name, currentPhase, nextPhase) volume, err := binderClient.UpdatePersistentVolumeStatus(volume) if err != nil { // Rollback to previous phase @@ -254,6 +294,7 @@ func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCli } if volume.Spec.ClaimRef == nil { + glog.V(5).Infof("Rebuilding bind on pv.Spec.ClaimRef\n") claimRef, err := api.GetReference(claim) if err != nil { return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) @@ -318,6 +359,7 @@ func (controller *PersistentVolumeClaimBinder) Stop() { type binderClient interface { GetPersistentVolume(name string) (*api.PersistentVolume, error) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) + DeletePersistentVolume(volume *api.PersistentVolume) error UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) @@ -340,6 +382,10 @@ func (c *realBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) return c.client.PersistentVolumes().Update(volume) } +func (c *realBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error { + return c.client.PersistentVolumes().Delete(volume.Name) +} + func (c *realBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { return c.client.PersistentVolumes().UpdateStatus(volume) } diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go index cd21f0c942c..4a2f3229fa3 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go @@ -26,6 +26,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume/host_path" ) func TestRunStop(t *testing.T) { @@ -105,6 +107,7 @@ func TestExampleObjects(t *testing.T) { Path: "/tmp/data02", }, }, + PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle, }, }, }, @@ -179,6 +182,7 @@ func TestBindingWithExamples(t *testing.T) { client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, latest.RESTMapper)} pv, err := client.PersistentVolumes().Get("any") + pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimRecycle if err != nil { t.Error("Unexpected error getting PV from client: %v", err) } @@ -194,6 +198,15 @@ func TestBindingWithExamples(t *testing.T) { claim: claim, } + plugMgr := volume.VolumePluginMgr{} + plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler), volume.NewFakeVolumeHost("/tmp/fake", nil, nil)) + + recycler := &PersistentVolumeRecycler{ + kubeClient: client, + client: mockClient, + pluginMgr: plugMgr, + } + // adds the volume to the index, making the volume available syncVolume(volumeIndex, mockClient, pv) if pv.Status.Phase != api.VolumeAvailable { @@ -232,6 +245,31 @@ func TestBindingWithExamples(t *testing.T) { if pv.Status.Phase != api.VolumeReleased { t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase) } + if pv.Spec.ClaimRef == nil { + t.Errorf("Expected non-nil ClaimRef: %+v", pv.Spec) + } + + mockClient.volume = pv + + // released volumes with a PersistentVolumeReclaimPolicy (recycle/delete) can have further processing + err = recycler.reclaimVolume(pv) + if err != nil { + t.Errorf("Unexpected error reclaiming volume: %+v", err) + } + if pv.Status.Phase != api.VolumePending { + t.Errorf("Expected phase %s but got %s", api.VolumePending, pv.Status.Phase) + } + + // after the recycling changes the phase to Pending, the binder picks up again + // to remove any vestiges of binding and make the volume Available again + syncVolume(volumeIndex, mockClient, pv) + + if pv.Status.Phase != api.VolumeAvailable { + t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, pv.Status.Phase) + } + if pv.Spec.ClaimRef != nil { + t.Errorf("Expected nil ClaimRef: %+v", pv.Spec) + } } type mockBinderClient struct { @@ -247,6 +285,11 @@ func (c *mockBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) return volume, nil } +func (c *mockBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error { + c.volume = nil + return nil +} + func (c *mockBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { return volume, nil } @@ -266,3 +309,23 @@ func (c *mockBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolu func (c *mockBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { return claim, nil } + +func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) { + return &mockRecycler{ + path: spec.PersistentVolumeSource.HostPath.Path, + }, nil +} + +type mockRecycler struct { + path string + host volume.VolumeHost +} + +func (r *mockRecycler) GetPath() string { + return r.path +} + +func (r *mockRecycler) Recycle() error { + // return nil means recycle passed + return nil +} diff --git a/pkg/volumeclaimbinder/persistent_volume_recycler.go b/pkg/volumeclaimbinder/persistent_volume_recycler.go new file mode 100644 index 00000000000..c1b6bff0f22 --- /dev/null +++ b/pkg/volumeclaimbinder/persistent_volume_recycler.go @@ -0,0 +1,231 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package volumeclaimbinder + +import ( + "fmt" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount" + "github.com/golang/glog" +) + +// PersistentVolumeRecycler is a controller that watches for PersistentVolumes that are released from their claims. +// This controller will Recycle those volumes whose reclaim policy is set to PersistentVolumeReclaimRecycle and make them +// available again for a new claim. +type PersistentVolumeRecycler struct { + volumeController *framework.Controller + stopChannel chan struct{} + client recyclerClient + kubeClient client.Interface + pluginMgr volume.VolumePluginMgr +} + +// PersistentVolumeRecycler creates a new PersistentVolumeRecycler +func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin) (*PersistentVolumeRecycler, error) { + recyclerClient := NewRecyclerClient(kubeClient) + recycler := &PersistentVolumeRecycler{ + client: recyclerClient, + kubeClient: kubeClient, + } + + if err := recycler.pluginMgr.InitPlugins(plugins, recycler); err != nil { + return nil, fmt.Errorf("Could not initialize volume plugins for PVClaimBinder: %+v", err) + } + + _, volumeController := framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), resourceVersion) + }, + }, + &api.PersistentVolume{}, + syncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pv := obj.(*api.PersistentVolume) + recycler.reclaimVolume(pv) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + pv := newObj.(*api.PersistentVolume) + recycler.reclaimVolume(pv) + }, + }, + ) + + recycler.volumeController = volumeController + return recycler, nil +} + +func (recycler *PersistentVolumeRecycler) reclaimVolume(pv *api.PersistentVolume) error { + if pv.Status.Phase == api.VolumeReleased && pv.Spec.ClaimRef != nil { + glog.V(5).Infof("Reclaiming PersistentVolume[%s]\n", pv.Name) + + latest, err := recycler.client.GetPersistentVolume(pv.Name) + if err != nil { + return fmt.Errorf("Could not find PersistentVolume %s", pv.Name) + } + if latest.Status.Phase != api.VolumeReleased { + return fmt.Errorf("PersistentVolume[%s] phase is %s, expected %s. Skipping.", pv.Name, latest.Status.Phase, api.VolumeReleased) + } + + // handleRecycle blocks until completion + // TODO: allow parallel recycling operations to increase throughput + // TODO implement handleDelete in a separate PR w/ cloud volumes + switch pv.Spec.PersistentVolumeReclaimPolicy { + case api.PersistentVolumeReclaimRecycle: + err = recycler.handleRecycle(pv) + case api.PersistentVolumeReclaimRetain: + glog.V(5).Infof("Volume %s is set to retain after release. Skipping.\n", pv.Name) + default: + err = fmt.Errorf("No PersistentVolumeReclaimPolicy defined for spec: %+v", pv) + } + if err != nil { + errMsg := fmt.Sprintf("Could not recycle volume spec: %+v", err) + glog.Errorf(errMsg) + return fmt.Errorf(errMsg) + } + } + return nil +} + +func (recycler *PersistentVolumeRecycler) handleRecycle(pv *api.PersistentVolume) error { + glog.V(5).Infof("Recycling PersistentVolume[%s]\n", pv.Name) + + currentPhase := pv.Status.Phase + nextPhase := currentPhase + + spec := volume.NewSpecFromPersistentVolume(pv) + plugin, err := recycler.pluginMgr.FindRecyclablePluginBySpec(spec) + if err != nil { + return fmt.Errorf("Could not find recyclable volume plugin for spec: %+v", err) + } + volRecycler, err := plugin.NewRecycler(spec) + if err != nil { + return fmt.Errorf("Could not obtain Recycler for spec: %+v", err) + } + // blocks until completion + err = volRecycler.Recycle() + if err != nil { + glog.Errorf("PersistentVolume[%s] failed recycling: %+v", err) + pv.Status.Message = fmt.Sprintf("Recycling error: %s", err) + nextPhase = api.VolumeFailed + } else { + glog.V(5).Infof("PersistentVolume[%s] successfully recycled\n", pv.Name) + nextPhase = api.VolumePending + if err != nil { + glog.Errorf("Error updating pv.Status: %+v", err) + } + } + + if currentPhase != nextPhase { + glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", pv.Name, currentPhase, nextPhase) + pv.Status.Phase = nextPhase + _, err := recycler.client.UpdatePersistentVolumeStatus(pv) + if err != nil { + // Rollback to previous phase + pv.Status.Phase = currentPhase + } + } + + return nil +} + +// Run starts this recycler's control loops +func (recycler *PersistentVolumeRecycler) Run() { + glog.V(5).Infof("Starting PersistentVolumeRecycler\n") + if recycler.stopChannel == nil { + recycler.stopChannel = make(chan struct{}) + go recycler.volumeController.Run(recycler.stopChannel) + } +} + +// Stop gracefully shuts down this binder +func (recycler *PersistentVolumeRecycler) Stop() { + glog.V(5).Infof("Stopping PersistentVolumeRecycler\n") + if recycler.stopChannel != nil { + close(recycler.stopChannel) + recycler.stopChannel = nil + } +} + +// recyclerClient abstracts access to PVs +type recyclerClient interface { + GetPersistentVolume(name string) (*api.PersistentVolume, error) + UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) + UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) +} + +func NewRecyclerClient(c client.Interface) recyclerClient { + return &realRecyclerClient{c} +} + +type realRecyclerClient struct { + client client.Interface +} + +func (c *realRecyclerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) { + return c.client.PersistentVolumes().Get(name) +} + +func (c *realRecyclerClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) { + return c.client.PersistentVolumes().Update(volume) +} + +func (c *realRecyclerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { + return c.client.PersistentVolumes().UpdateStatus(volume) +} + +// PersistentVolumeRecycler is host to the volume plugins, but does not actually mount any volumes. +// Because no mounting is performed, most of the VolumeHost methods are not implemented. +func (f *PersistentVolumeRecycler) GetPluginDir(podUID string) string { + return "" +} + +func (f *PersistentVolumeRecycler) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string { + return "" +} + +func (f *PersistentVolumeRecycler) GetPodPluginDir(podUID types.UID, pluginName string) string { + return "" +} + +func (f *PersistentVolumeRecycler) GetKubeClient() client.Interface { + return f.kubeClient +} + +func (f *PersistentVolumeRecycler) NewWrapperBuilder(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions, mounter mount.Interface) (volume.Builder, error) { + return nil, fmt.Errorf("NewWrapperBuilder not supported by PVClaimBinder's VolumeHost implementation") +} + +func (f *PersistentVolumeRecycler) NewWrapperCleaner(spec *volume.Spec, podUID types.UID, mounter mount.Interface) (volume.Cleaner, error) { + return nil, fmt.Errorf("NewWrapperCleaner not supported by PVClaimBinder's VolumeHost implementation") +} diff --git a/test/e2e/persistent_volumes.go b/test/e2e/persistent_volumes.go new file mode 100644 index 00000000000..f3bc0e4558c --- /dev/null +++ b/test/e2e/persistent_volumes.go @@ -0,0 +1,200 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "time" +) + +// Marked with [Skipped] to skip the test by default (see driver.go), +// the test needs privileged containers, which are disabled by default. +// Run the test with "go run hack/e2e.go ... --ginkgo.focus=PersistentVolume" +var _ = Describe("[Skipped] persistentVolumes", func() { + // f := NewFramework("pv") + + var c *client.Client + var ns string + + BeforeEach(func() { + var err error + c, err = loadClient() + Expect(err).NotTo(HaveOccurred()) + ns_, err := createTestingNS("pv", c) + ns = ns_.Name + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + By(fmt.Sprintf("Destroying namespace for this suite %v", ns)) + if err := c.Namespaces().Delete(ns); err != nil { + Failf("Couldn't delete ns %s", err) + } + }) + + It("PersistentVolume", func() { + config := VolumeTestConfig{ + namespace: ns, + prefix: "nfs", + serverImage: "gcr.io/google_containers/volume-nfs", + serverPorts: []int{2049}, + } + + defer func() { + volumeTestCleanup(c, config) + }() + + pod := startVolumeServer(c, config) + serverIP := pod.Status.PodIP + Logf("NFS server IP address: %v", serverIP) + + pv := makePersistentVolume(serverIP) + pvc := makePersistentVolumeClaim(ns) + + Logf("Creating PersistentVolume using NFS") + pv, err := c.PersistentVolumes().Create(pv) + Expect(err).NotTo(HaveOccurred()) + + Logf("Creating PersistentVolumeClaim") + pvc, err = c.PersistentVolumeClaims(ns).Create(pvc) + Expect(err).NotTo(HaveOccurred()) + + // allow the binder a chance to catch up + time.Sleep(20 * time.Second) + + pv, err = c.PersistentVolumes().Get(pv.Name) + Expect(err).NotTo(HaveOccurred()) + if pv.Spec.ClaimRef == nil { + Failf("Expected PersistentVolume to be bound, but got nil ClaimRef: %+v", pv) + } + + Logf("Deleting PersistentVolumeClaim to trigger PV Recycling") + err = c.PersistentVolumeClaims(ns).Delete(pvc.Name) + Expect(err).NotTo(HaveOccurred()) + + // allow the recycler a chance to catch up + time.Sleep(120 * time.Second) + + pv, err = c.PersistentVolumes().Get(pv.Name) + Expect(err).NotTo(HaveOccurred()) + if pv.Spec.ClaimRef != nil { + Failf("Expected PersistentVolume to be unbound, but found non-nil ClaimRef: %+v", pv) + } + + // Now check that index.html from the NFS server was really removed + checkpod := makeCheckPod(ns, serverIP) + testContainerOutputInNamespace("the volume was scrubbed", c, checkpod, []string{"index.html does not exist"}, ns) + + }) +}) + +func makePersistentVolume(serverIP string) *api.PersistentVolume { + return &api.PersistentVolume{ + ObjectMeta: api.ObjectMeta{ + Name: "nfs-" + string(util.NewUUID()), + }, + Spec: api.PersistentVolumeSpec{ + PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("2Gi"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + NFS: &api.NFSVolumeSource{ + Server: serverIP, + Path: "/", + ReadOnly: false, + }, + }, + AccessModes: []api.PersistentVolumeAccessMode{ + api.ReadWriteOnce, + api.ReadOnlyMany, + api.ReadWriteMany, + }, + }, + } +} + +func makePersistentVolumeClaim(ns string) *api.PersistentVolumeClaim { + return &api.PersistentVolumeClaim{ + ObjectMeta: api.ObjectMeta{ + Name: "pvc-" + string(util.NewUUID()), + Namespace: ns, + }, + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.PersistentVolumeAccessMode{ + api.ReadWriteOnce, + api.ReadOnlyMany, + api.ReadWriteMany, + }, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("1Gi"), + }, + }, + }, + } +} + +func makeCheckPod(ns string, nfsserver string) *api.Pod { + // Prepare pod that mounts the NFS volume again and + // checks that /mnt/index.html was scrubbed there + return &api.Pod{ + TypeMeta: api.TypeMeta{ + Kind: "Pod", + APIVersion: "v1beta3", + }, + ObjectMeta: api.ObjectMeta{ + Name: "checker-" + string(util.NewUUID()), + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "checker-" + string(util.NewUUID()), + Image: "busybox", + Command: []string{"/bin/sh"}, + Args: []string{"-c", "test -e /mnt/index.html || echo 'index.html does not exist'"}, + VolumeMounts: []api.VolumeMount{ + { + Name: "nfs-volume", + MountPath: "/mnt", + }, + }, + }, + }, + Volumes: []api.Volume{ + { + Name: "nfs-volume", + VolumeSource: api.VolumeSource{ + NFS: &api.NFSVolumeSource{ + Server: nfsserver, + Path: "/", + }, + }, + }, + }, + }, + } + +}