diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index f22eeee4d93..7f99a8f25bd 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -118,7 +118,10 @@ func NewCMServer() *CMServer { EnableDeploymentController: false, VolumeConfigFlags: VolumeConfigFlags{ // default values here - PersistentVolumeRecyclerTimeoutNFS: 300, + PersistentVolumeRecyclerMinimumTimeoutNFS: 300, + PersistentVolumeRecyclerIncrementTimeoutNFS: 30, + PersistentVolumeRecyclerMinimumTimeoutHostPath: 60, + PersistentVolumeRecyclerIncrementTimeoutHostPath: 30, }, } return &s @@ -129,7 +132,12 @@ func NewCMServer() *CMServer { // of volume.VolumeConfig which are then passed to the appropriate plugin. The ControllerManager binary is the only // part of the code which knows what plugins are supported and which CLI flags correspond to each plugin. type VolumeConfigFlags struct { - PersistentVolumeRecyclerTimeoutNFS int + PersistentVolumeRecyclerMinimumTimeoutNFS int + PersistentVolumeRecyclerPodTemplateFilePathNFS string + PersistentVolumeRecyclerIncrementTimeoutNFS int + PersistentVolumeRecyclerPodTemplateFilePathHostPath string + PersistentVolumeRecyclerMinimumTimeoutHostPath int + PersistentVolumeRecyclerIncrementTimeoutHostPath int } // AddFlags adds flags for a specific CMServer to the specified FlagSet @@ -147,8 +155,12 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource-quota-sync-period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system") fs.DurationVar(&s.NamespaceSyncPeriod, "namespace-sync-period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates") fs.DurationVar(&s.PVClaimBinderSyncPeriod, "pvclaimbinder-sync-period", s.PVClaimBinderSyncPeriod, "The period for syncing persistent volumes and persistent volume claims") - // TODO markt -- make this example a working config item with Recycler Config PR. - // fs.MyExample(&s.VolumeConfig.PersistentVolumeRecyclerTimeoutNFS, "pv-recycler-timeout-nfs", s.VolumeConfig.PersistentVolumeRecyclerTimeoutNFS, "The minimum timeout for an NFS PV recycling operation") + fs.StringVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathNFS, "pv-recycler-pod-template-filepath-nfs", s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathNFS, "The file path to a pod definition used as a template for NFS persistent volume recycling") + fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutNFS, "pv-recycler-minimum-timeout-nfs", s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutNFS, "The minimum ActiveDeadlineSeconds to use for an NFS Recycler pod") + fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutNFS, "pv-recycler-increment-timeout-nfs", s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutNFS, "the increment of time added per Gi to ActiveDeadlineSeconds for an NFS scrubber pod") + fs.StringVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "pv-recycler-pod-template-filepath-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "The file path to a pod definition used as a template for HostPath persistent volume recycling. This is for development and testing only and will not work in a multi-node cluster.") + fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "pv-recycler-minimum-timeout-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "The minimum ActiveDeadlineSeconds to use for a HostPath Recycler pod. This is for development and testing only and will not work in a multi-node cluster.") + fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "pv-recycler-timeout-increment-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "the increment of time added per Gi to ActiveDeadlineSeconds for a HostPath scrubber pod. This is for development and testing only and will not work in a multi-node cluster.") fs.DurationVar(&s.HorizontalPodAutoscalerSyncPeriod, "horizontal-pod-autoscaler-sync-period", s.HorizontalPodAutoscalerSyncPeriod, "The period for syncing the number of pods in horizontal pod autoscaler.") fs.DurationVar(&s.DeploymentControllerSyncPeriod, "deployment-controller-sync-period", s.DeploymentControllerSyncPeriod, "Period for syncing the deployments.") fs.DurationVar(&s.PodEvictionTimeout, "pod-eviction-timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.") @@ -271,6 +283,7 @@ func (s *CMServer) Run(_ []string) error { pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod) pvclaimBinder.Run() + pvRecycler, err := volumeclaimbinder.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.VolumeConfigFlags)) if err != nil { glog.Fatalf("Failed to start persistent volume recycler: %+v", err) diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go index 76f41e5df1f..9239eac1a37 100644 --- a/cmd/kube-controller-manager/app/plugins.go +++ b/cmd/kube-controller-manager/app/plugins.go @@ -25,9 +25,12 @@ import ( _ "k8s.io/kubernetes/pkg/cloudprovider/providers" // Volume plugins + "k8s.io/kubernetes/pkg/util/io" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/host_path" "k8s.io/kubernetes/pkg/volume/nfs" + + "github.com/golang/glog" ) // ProbeRecyclableVolumePlugins collects all persistent volume plugins into an easy to use list. @@ -41,15 +44,41 @@ func ProbeRecyclableVolumePlugins(flags VolumeConfigFlags) []volume.VolumePlugin // Each plugin can make use of VolumeConfig. The single arg to this func contains *all* enumerated // CLI flags meant to configure volume plugins. From that single config, create an instance of volume.VolumeConfig // for a specific plugin and pass that instance to the plugin's ProbeVolumePlugins(config) func. - hostPathConfig := volume.VolumeConfig{ - // transfer attributes from VolumeConfig to this instance of volume.VolumeConfig - } - nfsConfig := volume.VolumeConfig{ - // TODO transfer config.PersistentVolumeRecyclerTimeoutNFS and other flags to this instance of VolumeConfig - // Configuring recyclers will be done in a follow-up PR - } + // HostPath recycling is for testing and development purposes only! + hostPathConfig := volume.VolumeConfig{ + RecyclerMinimumTimeout: flags.PersistentVolumeRecyclerMinimumTimeoutHostPath, + RecyclerTimeoutIncrement: flags.PersistentVolumeRecyclerIncrementTimeoutHostPath, + RecyclerPodTemplate: volume.NewPersistentVolumeRecyclerPodTemplate(), + } + if err := attemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, &hostPathConfig); err != nil { + glog.Fatalf("Could not create hostpath recycler pod from file %s: %+v", err) + } allPlugins = append(allPlugins, host_path.ProbeVolumePlugins(hostPathConfig)...) + + nfsConfig := volume.VolumeConfig{ + RecyclerMinimumTimeout: flags.PersistentVolumeRecyclerMinimumTimeoutNFS, + RecyclerTimeoutIncrement: flags.PersistentVolumeRecyclerIncrementTimeoutNFS, + RecyclerPodTemplate: volume.NewPersistentVolumeRecyclerPodTemplate(), + } + if err := attemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathNFS, &nfsConfig); err != nil { + glog.Fatalf("Could not create NFS recycler pod from file %s: %+v", err) + } allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(nfsConfig)...) + return allPlugins } + +// attemptToLoadRecycler tries decoding a pod from a filepath for use as a recycler for a volume. +// If successful, this method will set the recycler on the config. +// If unsucessful, an error is returned. +func attemptToLoadRecycler(path string, config *volume.VolumeConfig) error { + if path != "" { + recyclerPod, err := io.LoadPodFromFile(path) + if err != nil { + return err + } + config.RecyclerPodTemplate = recyclerPod + } + return nil +} diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 59839911f7f..e260cc12561 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -208,6 +208,12 @@ proxy-logv proxy-port-range public-address-override pvclaimbinder-sync-period +pv-recycler-pod-template-filepath-nfs +pv-recycler-minimum-timeout-nfs +pv-recycler-increment-timeout-nfs +pv-recycler-pod-template-filepath-hostpath +pv-recycler-minimum-timeout-hostpath +pv-recycler-timeout-increment-hostpath read-only-port really-crash-for-testing reconcile-cooldown diff --git a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller_test.go b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller_test.go index 2b35f65678c..352b44aa5b5 100644 --- a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller_test.go +++ b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller_test.go @@ -199,7 +199,7 @@ func TestBindingWithExamples(t *testing.T) { } plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler), volume.NewFakeVolumeHost("/tmp/fake", nil, nil)) + plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volume.NewFakeVolumeHost("/tmp/fake", nil, nil)) recycler := &PersistentVolumeRecycler{ kubeClient: client, @@ -388,7 +388,7 @@ func (c *mockBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.Persiste return claim, nil } -func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) { +func newMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { return &mockRecycler{ path: spec.PersistentVolume.Spec.HostPath.Path, }, nil diff --git a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go index 6b8e8b8a501..fadbe023b0e 100644 --- a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go @@ -35,6 +35,8 @@ import ( "k8s.io/kubernetes/pkg/watch" ) +var _ volume.VolumeHost = &PersistentVolumeRecycler{} + // PersistentVolumeRecycler is a controller that watches for PersistentVolumes that are released from their claims. // This controller will Recycle those volumes whose reclaim policy is set to PersistentVolumeReclaimRecycle and make them // available again for a new claim. diff --git a/pkg/util/io/io.go b/pkg/util/io/io.go new file mode 100644 index 00000000000..8324a83b928 --- /dev/null +++ b/pkg/util/io/io.go @@ -0,0 +1,57 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io + +import ( + "fmt" + "io/ioutil" + "os" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/latest" +) + +// LoadPodFromFile will read, decode, and return a Pod from a file. +func LoadPodFromFile(filePath string) (*api.Pod, error) { + if filePath == "" { + return nil, fmt.Errorf("file path not specified") + } + podDef, err := ioutil.ReadFile(filePath) + if err != nil { + return nil, fmt.Errorf("failed to read file path %s: %+v", filePath, err) + } + if len(podDef) == 0 { + return nil, fmt.Errorf("file was empty: %s", filePath) + } + pod := &api.Pod{} + if err := latest.Codec.DecodeInto(podDef, pod); err != nil { + return nil, fmt.Errorf("failed decoding file: %v", err) + } + return pod, nil +} + +// SavePodToFile will encode and save a pod to a given path & permissions +func SavePodToFile(pod *api.Pod, filePath string, perm os.FileMode) error { + if filePath == "" { + return fmt.Errorf("file path not specified") + } + data, err := latest.Codec.Encode(pod) + if err != nil { + return fmt.Errorf("failed encoding pod: %v", err) + } + return ioutil.WriteFile(filePath, data, perm) +} diff --git a/pkg/util/io/io_test.go b/pkg/util/io/io_test.go new file mode 100644 index 00000000000..105bb5c5ee9 --- /dev/null +++ b/pkg/util/io/io_test.go @@ -0,0 +1,50 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io + +import ( + "fmt" + "github.com/pborman/uuid" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/latest" + "k8s.io/kubernetes/pkg/volume" + "os" + "testing" +) + +func TestSavePodToFile(t *testing.T) { + pod := volume.NewPersistentVolumeRecyclerPodTemplate() + + // sets all default values on a pod for equality comparison after decoding from file + encoded, err := latest.Codec.Encode(pod) + latest.Codec.DecodeInto(encoded, pod) + + path := fmt.Sprintf("/tmp/kube-io-test-%s", uuid.New()) + defer os.Remove(path) + + if err := SavePodToFile(pod, path, 777); err != nil { + t.Fatalf("failed to save pod to file: %v", err) + } + + podFromFile, err := LoadPodFromFile(path) + if err != nil { + t.Fatalf("failed to load pod from file: %v", err) + } + if !api.Semantic.DeepEqual(pod, podFromFile) { + t.Errorf("\nexpected %#v\ngot %#v\n", pod, podFromFile) + } +} diff --git a/pkg/volume/host_path/host_path.go b/pkg/volume/host_path/host_path.go index a9213b4b09b..02a83851848 100644 --- a/pkg/volume/host_path/host_path.go +++ b/pkg/volume/host_path/host_path.go @@ -21,26 +21,39 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" ) // This is the primary entrypoint for volume plugins. -// Tests covering recycling should not use this func but instead -// use their own array of plugins w/ a custom recyclerFunc as appropriate -func ProbeVolumePlugins(config volume.VolumeConfig) []volume.VolumePlugin { - return []volume.VolumePlugin{&hostPathPlugin{nil, newRecycler}} +// The volumeConfig arg provides the ability to configure volume behavior. It is implemented as a pointer to allow nils. +// The hostPathPlugin is used to store the volumeConfig and give it, when needed, to the func that creates HostPath Recyclers. +// Tests that exercise recycling should not use this func but instead use ProbeRecyclablePlugins() to override default behavior. +func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin { + return []volume.VolumePlugin{ + &hostPathPlugin{ + host: nil, + newRecyclerFunc: newRecycler, + config: volumeConfig, + }, + } } -func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)) []volume.VolumePlugin { - return []volume.VolumePlugin{&hostPathPlugin{nil, recyclerFunc}} +func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error), volumeConfig volume.VolumeConfig) []volume.VolumePlugin { + return []volume.VolumePlugin{ + &hostPathPlugin{ + host: nil, + newRecyclerFunc: recyclerFunc, + config: volumeConfig, + }, + } } type hostPathPlugin struct { host volume.VolumeHost // decouple creating recyclers by deferring to a function. Allows for easier testing. - newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) + newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) + config volume.VolumeConfig } var _ volume.VolumePlugin = &hostPathPlugin{} @@ -89,14 +102,20 @@ func (plugin *hostPathPlugin) NewCleaner(volName string, podUID types.UID, _ mou } func (plugin *hostPathPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) { - return plugin.newRecyclerFunc(spec, plugin.host) + return plugin.newRecyclerFunc(spec, plugin.host, plugin.config) } -func newRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) { +func newRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil { return nil, fmt.Errorf("spec.PersistentVolumeSource.HostPath is nil") } - return &hostPathRecycler{spec.Name(), spec.PersistentVolume.Spec.HostPath.Path, host}, nil + return &hostPathRecycler{ + name: spec.Name(), + path: spec.PersistentVolume.Spec.HostPath.Path, + host: host, + config: config, + timeout: volume.CalculateTimeoutForVolume(config.RecyclerMinimumTimeout, config.RecyclerTimeoutIncrement, spec.PersistentVolume), + }, nil } // HostPath volumes represent a bare host file or directory mount. @@ -153,60 +172,29 @@ func (c *hostPathCleaner) TearDownAt(dir string) error { // hostPathRecycler scrubs a hostPath volume by running "rm -rf" on the volume in a pod // This recycler only works on a single host cluster and is for testing purposes only. type hostPathRecycler struct { - name string - path string - host volume.VolumeHost + name string + path string + host volume.VolumeHost + config volume.VolumeConfig + timeout int64 } func (r *hostPathRecycler) GetPath() string { return r.path } -// Recycler provides methods to reclaim the volume resource. -// A HostPath is recycled by scheduling a pod to run "rm -rf" on the contents of the volume. This is meant for -// development and testing in a single node cluster only. +// Recycle recycles/scrubs clean a HostPath volume. // Recycle blocks until the pod has completed or any error occurs. -// The scrubber pod's is expected to succeed within 30 seconds when testing localhost. +// HostPath recycling only works in single node clusters and is meant for testing purposes only. func (r *hostPathRecycler) Recycle() error { - timeout := int64(30) - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - GenerateName: "pv-scrubber-" + util.ShortenString(r.name, 44) + "-", - Namespace: api.NamespaceDefault, - }, - Spec: api.PodSpec{ - ActiveDeadlineSeconds: &timeout, - RestartPolicy: api.RestartPolicyNever, - Volumes: []api.Volume{ - { - Name: "vol", - VolumeSource: api.VolumeSource{ - HostPath: &api.HostPathVolumeSource{Path: r.path}, - }, - }, - }, - Containers: []api.Container{ - { - Name: "scrubber", - Image: "gcr.io/google_containers/busybox", - // delete the contents of the volume, but not the directory itself - Command: []string{"/bin/sh"}, - // the scrubber: - // 1. validates the /scrub directory exists - // 2. creates a text file in the directory to be scrubbed - // 3. performs rm -rf on the directory - // 4. tests to see if the directory is empty - // the pod fails if the error code is returned - Args: []string{"-c", "test -e /scrub && echo $(date) > /scrub/trash.txt && rm -rf /scrub/* && test -z \"$(ls -A /scrub)\" || exit 1"}, - VolumeMounts: []api.VolumeMount{ - { - Name: "vol", - MountPath: "/scrub", - }, - }, - }, - }, + pod := r.config.RecyclerPodTemplate + // overrides + pod.Spec.ActiveDeadlineSeconds = &r.timeout + pod.GenerateName = "pv-recycler-hostpath-" + pod.Spec.Volumes[0].VolumeSource = api.VolumeSource{ + HostPath: &api.HostPathVolumeSource{ + Path: r.path, }, } - return volume.ScrubPodVolumeAndWatchUntilCompletion(pod, r.host.GetKubeClient()) + return volume.RecycleVolumeByWatchingPodUntilCompletion(pod, r.host.GetKubeClient()) } diff --git a/pkg/volume/host_path/host_path_test.go b/pkg/volume/host_path/host_path_test.go index 5c5be1ea649..6094eed4d5a 100644 --- a/pkg/volume/host_path/host_path_test.go +++ b/pkg/volume/host_path/host_path_test.go @@ -63,8 +63,8 @@ func TestGetAccessModes(t *testing.T) { func TestRecycler(t *testing.T) { plugMgr := volume.VolumePluginMgr{} - volumeHost := volume.NewFakeVolumeHost("/tmp/fake", nil, nil) - plugMgr.InitPlugins([]volume.VolumePlugin{&hostPathPlugin{nil, volume.NewFakeRecycler}}, volumeHost) + pluginHost := volume.NewFakeVolumeHost("/tmp/fake", nil, nil) + plugMgr.InitPlugins([]volume.VolumePlugin{&hostPathPlugin{nil, volume.NewFakeRecycler, volume.VolumeConfig{}}}, pluginHost) spec := &volume.Spec{PersistentVolume: &api.PersistentVolume{Spec: api.PersistentVolumeSpec{PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/foo"}}}}} plug, err := plugMgr.FindRecyclablePluginBySpec(spec) diff --git a/pkg/volume/nfs/nfs.go b/pkg/volume/nfs/nfs.go index c382a9761d4..dc4352e4f96 100644 --- a/pkg/volume/nfs/nfs.go +++ b/pkg/volume/nfs/nfs.go @@ -30,16 +30,24 @@ import ( ) // This is the primary entrypoint for volume plugins. -// Tests covering recycling should not use this func but instead -// use their own array of plugins w/ a custom recyclerFunc as appropriate -func ProbeVolumePlugins(config volume.VolumeConfig) []volume.VolumePlugin { - return []volume.VolumePlugin{&nfsPlugin{nil, newRecycler}} +// The volumeConfig arg provides the ability to configure recycler behavior. It is implemented as a pointer to allow nils. +// The nfsPlugin is used to store the volumeConfig and give it, when needed, to the func that creates NFS Recyclers. +// Tests that exercise recycling should not use this func but instead use ProbeRecyclablePlugins() to override default behavior. +func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin { + return []volume.VolumePlugin{ + &nfsPlugin{ + host: nil, + newRecyclerFunc: newRecycler, + config: volumeConfig, + }, + } } type nfsPlugin struct { host volume.VolumeHost // decouple creating recyclers by deferring to a function. Allows for easier testing. - newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) + newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) + config volume.VolumeConfig } var _ volume.VolumePlugin = &nfsPlugin{} @@ -112,7 +120,7 @@ func (plugin *nfsPlugin) newCleanerInternal(volName string, podUID types.UID, mo } func (plugin *nfsPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) { - return plugin.newRecyclerFunc(spec, plugin.host) + return plugin.newRecyclerFunc(spec, plugin.host, plugin.config) } // NFS volumes represent a bare host file or directory mount of an NFS export. @@ -122,7 +130,7 @@ type nfs struct { mounter mount.Interface plugin *nfsPlugin // decouple creating recyclers by deferring to a function. Allows for easier testing. - newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) + newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) } func (nfsVolume *nfs) GetPath() string { @@ -236,77 +244,46 @@ func (c *nfsCleaner) TearDownAt(dir string) error { return nil } -func newRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) { +func newRecycler(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) { if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.NFS == nil { return nil, fmt.Errorf("spec.PersistentVolumeSource.NFS is nil") } return &nfsRecycler{ - name: spec.Name(), - server: spec.PersistentVolume.Spec.NFS.Server, - path: spec.PersistentVolume.Spec.NFS.Path, - host: host, + name: spec.Name(), + server: spec.PersistentVolume.Spec.NFS.Server, + path: spec.PersistentVolume.Spec.NFS.Path, + host: host, + config: volumeConfig, + timeout: volume.CalculateTimeoutForVolume(volumeConfig.RecyclerMinimumTimeout, volumeConfig.RecyclerTimeoutIncrement, spec.PersistentVolume), }, nil } // nfsRecycler scrubs an NFS volume by running "rm -rf" on the volume in a pod. type nfsRecycler struct { - name string - server string - path string - host volume.VolumeHost + name string + server string + path string + host volume.VolumeHost + config volume.VolumeConfig + timeout int64 } func (r *nfsRecycler) GetPath() string { return r.path } -// Recycler provides methods to reclaim the volume resource. -// A NFS volume is recycled by scheduling a pod to run "rm -rf" on the contents of the volume. +// Recycle recycles/scrubs clean an NFS volume. // Recycle blocks until the pod has completed or any error occurs. -// The scrubber pod's is expected to succeed within 5 minutes else an error will be returned func (r *nfsRecycler) Recycle() error { - timeout := int64(300) // 5 minutes - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - GenerateName: "pv-scrubber-" + util.ShortenString(r.name, 44) + "-", - Namespace: api.NamespaceDefault, - }, - Spec: api.PodSpec{ - ActiveDeadlineSeconds: &timeout, - RestartPolicy: api.RestartPolicyNever, - Volumes: []api.Volume{ - { - Name: "vol", - VolumeSource: api.VolumeSource{ - NFS: &api.NFSVolumeSource{ - Server: r.server, - Path: r.path, - }, - }, - }, - }, - Containers: []api.Container{ - { - Name: "scrubber", - Image: "gcr.io/google_containers/busybox", - // delete the contents of the volume, but not the directory itself - Command: []string{"/bin/sh"}, - // the scrubber: - // 1. validates the /scrub directory exists - // 2. creates a text file to be scrubbed - // 3. performs rm -rf on the directory - // 4. tests to see if the directory is empty - // the pod fails if the error code is returned - Args: []string{"-c", "test -e /scrub && echo $(date) > /scrub/trash.txt && rm -rf /scrub/* && test -z \"$(ls -A /scrub)\" || exit 1"}, - VolumeMounts: []api.VolumeMount{ - { - Name: "vol", - MountPath: "/scrub", - }, - }, - }, - }, + pod := r.config.RecyclerPodTemplate + // overrides + pod.Spec.ActiveDeadlineSeconds = &r.timeout + pod.GenerateName = "pv-recycler-nfs-" + pod.Spec.Volumes[0].VolumeSource = api.VolumeSource{ + NFS: &api.NFSVolumeSource{ + Server: r.server, + Path: r.path, }, } - return volume.ScrubPodVolumeAndWatchUntilCompletion(pod, r.host.GetKubeClient()) + return volume.RecycleVolumeByWatchingPodUntilCompletion(pod, r.host.GetKubeClient()) } diff --git a/pkg/volume/nfs/nfs_test.go b/pkg/volume/nfs/nfs_test.go index 3cb92e7740a..186d291664f 100644 --- a/pkg/volume/nfs/nfs_test.go +++ b/pkg/volume/nfs/nfs_test.go @@ -64,7 +64,7 @@ func TestGetAccessModes(t *testing.T) { func TestRecycler(t *testing.T) { plugMgr := volume.VolumePluginMgr{} - plugMgr.InitPlugins([]volume.VolumePlugin{&nfsPlugin{nil, newMockRecycler}}, volume.NewFakeVolumeHost("/tmp/fake", nil, nil)) + plugMgr.InitPlugins([]volume.VolumePlugin{&nfsPlugin{nil, newMockRecycler, volume.VolumeConfig{}}}, volume.NewFakeVolumeHost("/tmp/fake", nil, nil)) spec := &volume.Spec{PersistentVolume: &api.PersistentVolume{Spec: api.PersistentVolumeSpec{PersistentVolumeSource: api.PersistentVolumeSource{NFS: &api.NFSVolumeSource{Path: "/foo"}}}}} plug, err := plugMgr.FindRecyclablePluginBySpec(spec) @@ -83,7 +83,7 @@ func TestRecycler(t *testing.T) { } } -func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) { +func newMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { return &mockRecycler{ path: spec.PersistentVolume.Spec.NFS.Path, }, nil diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index bacaa736f8c..8c8a42d97e6 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -169,8 +169,22 @@ func (spec *Spec) Name() string { // The binary should still use strong typing for this value when binding CLI values before they are passed as strings // in OtherAttributes. type VolumeConfig struct { - // thockin: do we want to wait on this until we have an actual use case? I can change the comments above to - // reflect our intention for one-off config. + // RecyclerPodTemplate is pod template that understands how to scrub clean a persistent volume after its release. + // The template is used by plugins which override specific properties of the pod in accordance with that plugin. + // See NewPersistentVolumeRecyclerPodTemplate for the properties that are expected to be overridden. + RecyclerPodTemplate *api.Pod + + // RecyclerMinimumTimeout is the minimum amount of time in seconds for the recycler pod's ActiveDeadlineSeconds attribute. + // Added to the minimum timeout is the increment per Gi of capacity. + RecyclerMinimumTimeout int + + // RecyclerTimeoutIncrement is the number of seconds added to the recycler pod's ActiveDeadlineSeconds for each + // Gi of capacity in the persistent volume. + // Example: 5Gi volume x 30s increment = 150s + 30s minimum = 180s ActiveDeadlineSeconds for recycler pod + RecyclerTimeoutIncrement int + + // OtherAttributes stores config as strings. These strings are opaque to the system and only understood by the binary + // hosting the plugin and the plugin itself. OtherAttributes map[string]string } @@ -301,3 +315,49 @@ func (pm *VolumePluginMgr) FindRecyclablePluginBySpec(spec *Spec) (RecyclableVol } return nil, fmt.Errorf("no recyclable volume plugin matched") } + +// NewPersistentVolumeRecyclerPodTemplate creates a template for a recycler pod. By default, a recycler pod simply runs +// "rm -rf" on a volume and tests for emptiness. Most attributes of the template will be correct for most +// plugin implementations. The following attributes can be overridden per plugin via configuration: +// +// 1. pod.Spec.Volumes[0].VolumeSource must be overridden. Recycler implementations without a valid VolumeSource will fail. +// 2. pod.GenerateName helps distinguish recycler pods by name. Recommended. Default is "pv-recycler-". +// 3. pod.Spec.ActiveDeadlineSeconds gives the recycler pod a maximum timeout before failing. Recommended. Default is 60 seconds. +// +// See HostPath and NFS for working recycler examples +func NewPersistentVolumeRecyclerPodTemplate() *api.Pod { + timeout := int64(60) + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + GenerateName: "pv-recycler-", + Namespace: api.NamespaceDefault, + }, + Spec: api.PodSpec{ + ActiveDeadlineSeconds: &timeout, + RestartPolicy: api.RestartPolicyNever, + Volumes: []api.Volume{ + { + Name: "vol", + // IMPORTANT! All plugins using this template MUST override pod.Spec.Volumes[0].VolumeSource + // Recycler implementations without a valid VolumeSource will fail. + VolumeSource: api.VolumeSource{}, + }, + }, + Containers: []api.Container{ + { + Name: "pv-recycler", + Image: "gcr.io/google_containers/busybox", + Command: []string{"/bin/sh"}, + Args: []string{"-c", "test -e /scrub && echo $(date) > /scrub/trash.txt && rm -rf /scrub/* /scrub/.* && test -z \"$(ls -A /scrub)\" || exit 1"}, + VolumeMounts: []api.VolumeMount{ + { + Name: "vol", + MountPath: "/scrub", + }, + }, + }, + }, + }, + } + return pod +} diff --git a/pkg/volume/testing.go b/pkg/volume/testing.go index b347f5461f3..31e34de7de0 100644 --- a/pkg/volume/testing.go +++ b/pkg/volume/testing.go @@ -176,7 +176,7 @@ func (fr *fakeRecycler) GetPath() string { return fr.path } -func NewFakeRecycler(spec *Spec, host VolumeHost) (Recycler, error) { +func NewFakeRecycler(spec *Spec, host VolumeHost, config VolumeConfig) (Recycler, error) { if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil { return nil, fmt.Errorf("fakeRecycler only supports spec.PersistentVolume.Spec.HostPath") } diff --git a/pkg/volume/util.go b/pkg/volume/util.go index 9dba580b898..c8379b046d7 100644 --- a/pkg/volume/util.go +++ b/pkg/volume/util.go @@ -29,30 +29,31 @@ import ( "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api/resource" ) -// ScrubPodVolumeAndWatchUntilCompletion is intended for use with volume Recyclers. This function will +// RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume Recyclers. This function will // save the given Pod to the API and watch it until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded, whichever comes first. -// An attempt to delete a scrubber pod is always attempted before returning. -// pod - the pod designed by a volume plugin to scrub the volume's contents +// An attempt to delete a recycler pod is always attempted before returning. +// pod - the pod designed by a volume plugin to recycle the volume // client - kube client for API operations. -func ScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, kubeClient client.Interface) error { - return internalScrubPodVolumeAndWatchUntilCompletion(pod, newScrubberClient(kubeClient)) +func RecycleVolumeByWatchingPodUntilCompletion(pod *api.Pod, kubeClient client.Interface) error { + return internalRecycleVolumeByWatchingPodUntilCompletion(pod, newRecyclerClient(kubeClient)) } -// same as above func comments, except 'scrubberClient' is a narrower pod API interface to ease testing -func internalScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, scrubberClient scrubberClient) error { - glog.V(5).Infof("Creating scrubber pod for volume %s\n", pod.Name) - pod, err := scrubberClient.CreatePod(pod) +// same as above func comments, except 'recyclerClient' is a narrower pod API interface to ease testing +func internalRecycleVolumeByWatchingPodUntilCompletion(pod *api.Pod, recyclerClient recyclerClient) error { + glog.V(5).Infof("Creating recycler pod for volume %s\n", pod.Name) + pod, err := recyclerClient.CreatePod(pod) if err != nil { - return fmt.Errorf("Unexpected error creating a pod to scrub volume %s: %+v\n", pod.Name, err) + return fmt.Errorf("Unexpected error creating recycler pod: %+v\n", err) } - defer scrubberClient.DeletePod(pod.Name, pod.Namespace) + defer recyclerClient.DeletePod(pod.Name, pod.Namespace) stopChannel := make(chan struct{}) defer close(stopChannel) - nextPod := scrubberClient.WatchPod(pod.Name, pod.Namespace, pod.ResourceVersion, stopChannel) + nextPod := recyclerClient.WatchPod(pod.Name, pod.Namespace, pod.ResourceVersion, stopChannel) for { watchedPod := nextPod() @@ -71,39 +72,39 @@ func internalScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, scrubberClient } } -// scrubberClient abstracts access to a Pod by providing a narrower interface. +// recyclerClient abstracts access to a Pod by providing a narrower interface. // this makes it easier to mock a client for testing -type scrubberClient interface { +type recyclerClient interface { CreatePod(pod *api.Pod) (*api.Pod, error) GetPod(name, namespace string) (*api.Pod, error) DeletePod(name, namespace string) error WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod } -func newScrubberClient(client client.Interface) scrubberClient { - return &realScrubberClient{client} +func newRecyclerClient(client client.Interface) recyclerClient { + return &realRecyclerClient{client} } -type realScrubberClient struct { +type realRecyclerClient struct { client client.Interface } -func (c *realScrubberClient) CreatePod(pod *api.Pod) (*api.Pod, error) { +func (c *realRecyclerClient) CreatePod(pod *api.Pod) (*api.Pod, error) { return c.client.Pods(pod.Namespace).Create(pod) } -func (c *realScrubberClient) GetPod(name, namespace string) (*api.Pod, error) { +func (c *realRecyclerClient) GetPod(name, namespace string) (*api.Pod, error) { return c.client.Pods(namespace).Get(name) } -func (c *realScrubberClient) DeletePod(name, namespace string) error { +func (c *realRecyclerClient) DeletePod(name, namespace string) error { return c.client.Pods(namespace).Delete(name, nil) } // WatchPod returns a ListWatch for watching a pod. The stopChannel is used // to close the reflector backing the watch. The caller is responsible for derring a close on the channel to // stop the reflector. -func (c *realScrubberClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod { +func (c *realRecyclerClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod { fieldSelector, _ := fields.ParseSelector("metadata.name=" + name) podLW := &cache.ListWatch{ @@ -122,3 +123,18 @@ func (c *realScrubberClient) WatchPod(name, namespace, resourceVersion string, s return obj.(*api.Pod) } } + +// CalculateTimeoutForVolume calculates time for a Recycler pod to complete a recycle operation. +// The calculation and return value is either the minimumTimeout or the timeoutIncrement per Gi of storage size, whichever is greater. +func CalculateTimeoutForVolume(minimumTimeout, timeoutIncrement int, pv *api.PersistentVolume) int64 { + giQty := resource.MustParse("1Gi") + pvQty := pv.Spec.Capacity[api.ResourceStorage] + giSize := giQty.Value() + pvSize := pvQty.Value() + timeout := (pvSize / giSize) * int64(timeoutIncrement) + if timeout < int64(minimumTimeout) { + return int64(minimumTimeout) + } else { + return timeout + } +} diff --git a/pkg/volume/util_test.go b/pkg/volume/util_test.go index 8fe7810fb38..aaef904837e 100644 --- a/pkg/volume/util_test.go +++ b/pkg/volume/util_test.go @@ -21,14 +21,15 @@ import ( "testing" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" "strings" ) -func TestScrubberSuccess(t *testing.T) { - client := &mockScrubberClient{} - scrubber := &api.Pod{ +func TestRecyclerSuccess(t *testing.T) { + client := &mockRecyclerClient{} + recycler := &api.Pod{ ObjectMeta: api.ObjectMeta{ - Name: "scrubber-test", + Name: "recycler-test", Namespace: api.NamespaceDefault, }, Status: api.PodStatus{ @@ -36,20 +37,20 @@ func TestScrubberSuccess(t *testing.T) { }, } - err := internalScrubPodVolumeAndWatchUntilCompletion(scrubber, client) + err := internalRecycleVolumeByWatchingPodUntilCompletion(recycler, client) if err != nil { - t.Errorf("Unexpected error watching scrubber pod: %+v", err) + t.Errorf("Unexpected error watching recycler pod: %+v", err) } if !client.deletedCalled { - t.Errorf("Expected deferred client.Delete to be called on scrubber pod") + t.Errorf("Expected deferred client.Delete to be called on recycler pod") } } -func TestScrubberFailure(t *testing.T) { - client := &mockScrubberClient{} - scrubber := &api.Pod{ +func TestRecyclerFailure(t *testing.T) { + client := &mockRecyclerClient{} + recycler := &api.Pod{ ObjectMeta: api.ObjectMeta{ - Name: "scrubber-test", + Name: "recycler-test", Namespace: api.NamespaceDefault, }, Status: api.PodStatus{ @@ -58,31 +59,31 @@ func TestScrubberFailure(t *testing.T) { }, } - err := internalScrubPodVolumeAndWatchUntilCompletion(scrubber, client) + err := internalRecycleVolumeByWatchingPodUntilCompletion(recycler, client) if err == nil { t.Fatalf("Expected pod failure but got nil error returned") } if err != nil { if !strings.Contains(err.Error(), "foo") { - t.Errorf("Expected pod.Status.Message %s but got %s", scrubber.Status.Message, err) + t.Errorf("Expected pod.Status.Message %s but got %s", recycler.Status.Message, err) } } if !client.deletedCalled { - t.Errorf("Expected deferred client.Delete to be called on scrubber pod") + t.Errorf("Expected deferred client.Delete to be called on recycler pod") } } -type mockScrubberClient struct { +type mockRecyclerClient struct { pod *api.Pod deletedCalled bool } -func (c *mockScrubberClient) CreatePod(pod *api.Pod) (*api.Pod, error) { +func (c *mockRecyclerClient) CreatePod(pod *api.Pod) (*api.Pod, error) { c.pod = pod return c.pod, nil } -func (c *mockScrubberClient) GetPod(name, namespace string) (*api.Pod, error) { +func (c *mockRecyclerClient) GetPod(name, namespace string) (*api.Pod, error) { if c.pod != nil { return c.pod, nil } else { @@ -90,13 +91,40 @@ func (c *mockScrubberClient) GetPod(name, namespace string) (*api.Pod, error) { } } -func (c *mockScrubberClient) DeletePod(name, namespace string) error { +func (c *mockRecyclerClient) DeletePod(name, namespace string) error { c.deletedCalled = true return nil } -func (c *mockScrubberClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod { +func (c *mockRecyclerClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod { return func() *api.Pod { return c.pod } } + +func TestCalculateTimeoutForVolume(t *testing.T) { + pv := &api.PersistentVolume{ + Spec: api.PersistentVolumeSpec{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("500M"), + }, + }, + } + + timeout := CalculateTimeoutForVolume(50, 30, pv) + if timeout != 50 { + t.Errorf("Expected 50 for timeout but got %v", timeout) + } + + pv.Spec.Capacity[api.ResourceStorage] = resource.MustParse("2Gi") + timeout = CalculateTimeoutForVolume(50, 30, pv) + if timeout != 60 { + t.Errorf("Expected 60 for timeout but got %v", timeout) + } + + pv.Spec.Capacity[api.ResourceStorage] = resource.MustParse("150Gi") + timeout = CalculateTimeoutForVolume(50, 30, pv) + if timeout != 4500 { + t.Errorf("Expected 4500 for timeout but got %v", timeout) + } +} diff --git a/test/integration/persistent_volumes_test.go b/test/integration/persistent_volumes_test.go index fdd1f406dd8..7f7fd98cf17 100644 --- a/test/integration/persistent_volumes_test.go +++ b/test/integration/persistent_volumes_test.go @@ -29,6 +29,8 @@ import ( "k8s.io/kubernetes/pkg/controller/persistentvolume" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/watch" ) func init() { @@ -205,3 +207,65 @@ func createTestVolumes() []*api.PersistentVolume { }, } } + +func TestPersistentVolumeRecycler(t *testing.T) { + _, s := runAMaster(t) + defer s.Close() + + deleteAllEtcdKeys() + client := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Default.Version()}) + + binder := volumeclaimbinder.NewPersistentVolumeClaimBinder(client, 1*time.Second) + binder.Run() + defer binder.Stop() + + recycler, _ := volumeclaimbinder.NewPersistentVolumeRecycler(client, 1*time.Second, []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", volume.NewFakeVolumeHost("/tmp/fake", nil, nil)}}) + recycler.Run() + defer recycler.Stop() + + // This PV will be claimed, released, and recycled. + pv := &api.PersistentVolume{ + ObjectMeta: api.ObjectMeta{Name: "fake-pv"}, + Spec: api.PersistentVolumeSpec{ + PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{Path: "foo"}}, + Capacity: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse("10G")}, + AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, + PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle, + }, + } + + pvc := &api.PersistentVolumeClaim{ + ObjectMeta: api.ObjectMeta{Name: "fake-pvc"}, + Spec: api.PersistentVolumeClaimSpec{ + Resources: api.ResourceRequirements{Requests: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse("5G")}}, + AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, + }, + } + + watch, _ := client.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), "0") + defer watch.Stop() + + _, _ = client.PersistentVolumes().Create(pv) + _, _ = client.PersistentVolumeClaims(api.NamespaceDefault).Create(pvc) + + // wait until the binder pairs the volume and claim + waitForPersistentVolumePhase(watch, api.VolumeBound) + + // deleting a claim releases the volume, after which it can be recycled + if err := client.PersistentVolumeClaims(api.NamespaceDefault).Delete(pvc.Name); err != nil { + t.Errorf("error deleting claim %s", pvc.Name) + } + + waitForPersistentVolumePhase(watch, api.VolumeReleased) + waitForPersistentVolumePhase(watch, api.VolumeAvailable) +} + +func waitForPersistentVolumePhase(w watch.Interface, phase api.PersistentVolumePhase) { + for { + event := <-w.ResultChan() + volume := event.Object.(*api.PersistentVolume) + if volume.Status.Phase == phase { + break + } + } +}