diff --git a/pkg/util/util.go b/pkg/util/util.go index 639ac8f3cf3..7ba9c78f3dd 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -508,3 +508,11 @@ func GetClient(req *http.Request) string { } return "unknown" } + +func ShortenString(str string, n int) string { + if len(str) <= n { + return str + } else { + return str[:n] + } +} diff --git a/pkg/volume/host_path/host_path.go b/pkg/volume/host_path/host_path.go index 214cd28e4f4..a0749b01560 100644 --- a/pkg/volume/host_path/host_path.go +++ b/pkg/volume/host_path/host_path.go @@ -18,23 +18,35 @@ package host_path import ( "fmt" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount" "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" ) // This is the primary entrypoint for volume plugins. +// Tests covering recycling should not use this func but instead +// use their own array of plugins w/ a custom recyclerFunc as appropriate func ProbeVolumePlugins() []volume.VolumePlugin { - return []volume.VolumePlugin{&hostPathPlugin{nil}} + return []volume.VolumePlugin{&hostPathPlugin{nil, newRecycler}} +} + +func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)) []volume.VolumePlugin { + return []volume.VolumePlugin{&hostPathPlugin{nil, recyclerFunc}} } type hostPathPlugin struct { host volume.VolumeHost + // decouple creating recyclers by deferring to a function. Allows for easier testing. 
+ newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) } var _ volume.VolumePlugin = &hostPathPlugin{} +var _ volume.PersistentVolumePlugin = &hostPathPlugin{} +var _ volume.RecyclableVolumePlugin = &hostPathPlugin{} const ( hostPathPluginName = "kubernetes.io/host-path" @@ -70,6 +82,18 @@ func (plugin *hostPathPlugin) NewCleaner(volName string, podUID types.UID, _ mou return &hostPath{""}, nil } +func (plugin *hostPathPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) { + return plugin.newRecyclerFunc(spec, plugin.host) +} + +func newRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) { + if spec.VolumeSource.HostPath != nil { + return &hostPathRecycler{spec.Name, spec.VolumeSource.HostPath.Path, host}, nil + } else { + return &hostPathRecycler{spec.Name, spec.PersistentVolumeSource.HostPath.Path, host}, nil + } +} + // HostPath volumes represent a bare host file or directory mount. // The direct at the specified path will be directly exposed to the container. type hostPath struct { @@ -99,3 +123,64 @@ func (hp *hostPath) TearDown() error { func (hp *hostPath) TearDownAt(dir string) error { return fmt.Errorf("TearDownAt() does not make sense for host paths") } + +// hostPathRecycler scrubs a hostPath volume by running "rm -rf" on the volume in a pod +// This recycler only works on a single host cluster and is for testing purposes only. +type hostPathRecycler struct { + name string + path string + host volume.VolumeHost +} + +func (r *hostPathRecycler) GetPath() string { + return r.path +} + +// Recycler provides methods to reclaim the volume resource. +// A HostPath is recycled by scheduling a pod to run "rm -rf" on the contents of the volume. This is meant for +// development and testing in a single node cluster only. +// Recycle blocks until the pod has completed or any error occurs. +// The scrubber pod's is expected to succeed within 30 seconds when testing localhost. 
+func (r *hostPathRecycler) Recycle() error {
+	timeout := int64((30 * time.Second).Seconds()) // ActiveDeadlineSeconds expects seconds, not nanoseconds
+	pod := &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			GenerateName: "pv-scrubber-" + util.ShortenString(r.name, 44) + "-",
+			Namespace:    api.NamespaceDefault,
+		},
+		Spec: api.PodSpec{
+			ActiveDeadlineSeconds: &timeout,
+			RestartPolicy:         api.RestartPolicyNever,
+			Volumes: []api.Volume{
+				{
+					Name: "vol",
+					VolumeSource: api.VolumeSource{
+						HostPath: &api.HostPathVolumeSource{Path: r.path},
+					},
+				},
+			},
+			Containers: []api.Container{
+				{
+					Name:  "scrubber",
+					Image: "busybox",
+					// delete the contents of the volume, but not the directory itself
+					Command: []string{"/bin/sh"},
+					// the scrubber:
+					//		1. validates the /scrub directory exists
+					//		2. creates a text file in the directory to be scrubbed
+					//		3. performs rm -rf on the directory
+					//		4. tests to see if the directory is empty
+					// the pod fails if the error code is returned
+					Args: []string{"-c", "test -e /scrub && echo $(date) > /scrub/trash.txt && rm -rf /scrub/* && test -z \"$(ls -A /scrub)\" || exit 1"},
+					VolumeMounts: []api.VolumeMount{
+						{
+							Name:      "vol",
+							MountPath: "/scrub",
+						},
+					},
+				},
+			},
+		},
+	}
+	return volume.ScrubPodVolumeAndWatchUntilCompletion(pod, r.host.GetKubeClient())
+}
diff --git a/pkg/volume/host_path/host_path_test.go b/pkg/volume/host_path/host_path_test.go
index 84ba54b1286..0dfeeec89c4 100644
--- a/pkg/volume/host_path/host_path_test.go
+++ b/pkg/volume/host_path/host_path_test.go
@@ -59,6 +59,47 @@ func TestGetAccessModes(t *testing.T) {
 	}
 }
 
+func TestRecycler(t *testing.T) {
+	plugMgr := volume.VolumePluginMgr{}
+	plugMgr.InitPlugins([]volume.VolumePlugin{&hostPathPlugin{nil, newMockRecycler}}, volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
+
+	spec := &volume.Spec{PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/foo"}}}
+	plug, err := plugMgr.FindRecyclablePluginBySpec(spec)
+	if err != nil {
+		t.Errorf("Can't find the plugin by name")
+	}
+	recycler, err := plug.NewRecycler(spec)
+	if err != nil {
+		t.Errorf("Failed to make a new Recycler: %v", err)
+	}
+	if recycler.GetPath() != spec.PersistentVolumeSource.HostPath.Path {
+		t.Errorf("Expected %s but got %s", spec.PersistentVolumeSource.HostPath.Path, recycler.GetPath())
+	}
+	if err := recycler.Recycle(); err != nil {
+		t.Errorf("Mock Recycler expected to return nil but got %s", err)
+	}
+}
+
+func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
+	return &mockRecycler{
+		path: spec.PersistentVolumeSource.HostPath.Path,
+	}, nil
+}
+
+type mockRecycler struct {
+	path string
+	host volume.VolumeHost
+}
+
+func (r *mockRecycler) GetPath() string {
+	return r.path
+}
+
+func (r *mockRecycler) Recycle() error {
+	// return nil means recycle passed
+	return nil
+}
+
 func TestPlugin(t *testing.T) {
 	plugMgr := volume.VolumePluginMgr{}
 	plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeVolumeHost("fake", nil, nil))
diff --git a/pkg/volume/nfs/nfs.go b/pkg/volume/nfs/nfs.go
index a3183112aab..74f0f3bb821 100644
--- a/pkg/volume/nfs/nfs.go
+++ b/pkg/volume/nfs/nfs.go
@@ -19,25 +19,32 @@ package nfs
 import (
 	"fmt"
 	"os"
+	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
 )
 
 // This is the primary entrypoint for volume plugins.
+// Tests covering recycling should not use this func but instead
+// use their own array of plugins w/ a custom recyclerFunc as appropriate
 func ProbeVolumePlugins() []volume.VolumePlugin {
-	return []volume.VolumePlugin{&nfsPlugin{nil}}
+	return []volume.VolumePlugin{&nfsPlugin{nil, newRecycler}}
 }
 
 type nfsPlugin struct {
 	host volume.VolumeHost
+	// decouple creating recyclers by deferring to a function. Allows for easier testing.
+	newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)
 }
 
 var _ volume.VolumePlugin = &nfsPlugin{}
+var _ volume.PersistentVolumePlugin = &nfsPlugin{}
+var _ volume.RecyclableVolumePlugin = &nfsPlugin{}
 
 const (
 	nfsPluginName = "kubernetes.io/nfs"
@@ -103,6 +111,28 @@ func (plugin *nfsPlugin) newCleanerInternal(volName string, podUID types.UID, mo
 	}, nil
 }
 
+func (plugin *nfsPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) {
+	return plugin.newRecyclerFunc(spec, plugin.host)
+}
+
+func newRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
+	if spec.VolumeSource.NFS != nil {
+		return &nfsRecycler{
+			name:   spec.Name,
+			server: spec.VolumeSource.NFS.Server,
+			path:   spec.VolumeSource.NFS.Path,
+			host:   host,
+		}, nil
+	} else {
+		return &nfsRecycler{
+			name:   spec.Name,
+			server: spec.PersistentVolumeSource.NFS.Server,
+			path:   spec.PersistentVolumeSource.NFS.Path,
+			host:   host,
+		}, nil
+	}
+}
+
 // NFS volumes represent a bare host file or directory mount of an NFS export.
 type nfs struct {
 	volName string
@@ -112,6 +142,8 @@ type nfs struct {
 	readOnly bool
 	mounter  mount.Interface
 	plugin   *nfsPlugin
+	// decouple creating recyclers by deferring to a function. Allows for easier testing.
+	newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)
 }
 
 // SetUp attaches the disk and bind mounts to the volume path.
@@ -199,3 +231,66 @@ func (nfsVolume *nfs) TearDownAt(dir string) error {
 
 	return nil
 }
+
+// nfsRecycler scrubs an NFS volume by running "rm -rf" on the volume in a pod.
+type nfsRecycler struct {
+	name   string
+	server string
+	path   string
+	host   volume.VolumeHost
+}
+
+func (r *nfsRecycler) GetPath() string {
+	return r.path
+}
+
+// Recycler provides methods to reclaim the volume resource.
+// A NFS volume is recycled by scheduling a pod to run "rm -rf" on the contents of the volume.
+// Recycle blocks until the pod has completed or any error occurs.
+// The scrubber pod is expected to succeed within 5 minutes, else an error will be returned
+func (r *nfsRecycler) Recycle() error {
+	timeout := int64((300 * time.Second).Seconds()) // 5 minutes; ActiveDeadlineSeconds expects seconds, not nanoseconds
+	pod := &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			GenerateName: "pv-scrubber-" + util.ShortenString(r.name, 44) + "-",
+			Namespace:    api.NamespaceDefault,
+		},
+		Spec: api.PodSpec{
+			ActiveDeadlineSeconds: &timeout,
+			RestartPolicy:         api.RestartPolicyNever,
+			Volumes: []api.Volume{
+				{
+					Name: "vol",
+					VolumeSource: api.VolumeSource{
+						NFS: &api.NFSVolumeSource{
+							Server: r.server,
+							Path:   r.path,
+						},
+					},
+				},
+			},
+			Containers: []api.Container{
+				{
+					Name:  "scrubber",
+					Image: "busybox",
+					// delete the contents of the volume, but not the directory itself
+					Command: []string{"/bin/sh"},
+					// the scrubber:
+					//		1. validates the /scrub directory exists
+					//		2. creates a text file to be scrubbed
+					//		3. performs rm -rf on the directory
+					//		4. tests to see if the directory is empty
+					// the pod fails if the error code is returned
+					Args: []string{"-c", "test -e /scrub && echo $(date) > /scrub/trash.txt && rm -rf /scrub/* && test -z \"$(ls -A /scrub)\" || exit 1"},
+					VolumeMounts: []api.VolumeMount{
+						{
+							Name:      "vol",
+							MountPath: "/scrub",
+						},
+					},
+				},
+			},
+		},
+	}
+	return volume.ScrubPodVolumeAndWatchUntilCompletion(pod, r.host.GetKubeClient())
+}
diff --git a/pkg/volume/nfs/nfs_test.go b/pkg/volume/nfs/nfs_test.go
index 948397eb3c6..2b7520def83 100644
--- a/pkg/volume/nfs/nfs_test.go
+++ b/pkg/volume/nfs/nfs_test.go
@@ -60,6 +60,47 @@ func TestGetAccessModes(t *testing.T) {
 	}
 }
 
+func TestRecycler(t *testing.T) {
+	plugMgr := volume.VolumePluginMgr{}
+	plugMgr.InitPlugins([]volume.VolumePlugin{&nfsPlugin{newRecyclerFunc: newMockRecycler}}, volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
+
+	spec := &volume.Spec{PersistentVolumeSource: api.PersistentVolumeSource{NFS: &api.NFSVolumeSource{Path: "/foo"}}}
+	plug, err := plugMgr.FindRecyclablePluginBySpec(spec)
+	if err != nil {
+		t.Errorf("Can't find the plugin by name")
+	}
+	recycler, err := plug.NewRecycler(spec)
+	if err != nil {
+		t.Errorf("Failed to make a new Recycler: %v", err)
+	}
+	if recycler.GetPath() != spec.PersistentVolumeSource.NFS.Path {
+		t.Errorf("Expected %s but got %s", spec.PersistentVolumeSource.NFS.Path, recycler.GetPath())
+	}
+	if err := recycler.Recycle(); err != nil {
+		t.Errorf("Mock Recycler expected to return nil but got %s", err)
+	}
+}
+
+func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
+	return &mockRecycler{
+		path: spec.PersistentVolumeSource.NFS.Path,
+	}, nil
+}
+
+type mockRecycler struct {
+	path string
+	host volume.VolumeHost
+}
+
+func (r *mockRecycler) GetPath() string {
+	return r.path
+}
+
+func (r *mockRecycler) Recycle() error {
+	// return nil means recycle passed
+	return nil
+}
+
 func contains(modes []api.PersistentVolumeAccessMode, mode api.PersistentVolumeAccessMode) bool {
 	for _, m := range modes {
 		if m == mode {
diff --git a/pkg/volume/util.go b/pkg/volume/util.go
index 3efa3823937..d9a1c87ecf0 100644
--- a/pkg/volume/util.go
+++ b/pkg/volume/util.go
@@ -17,7 +17,18 @@ limitations under the License.
 package volume
 
 import (
+	"fmt"
+	"time"
+
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
+
+	"github.com/golang/glog"
 )
 
 func GetAccessModesAsString(modes []api.PersistentVolumeAccessMode) string {
@@ -51,3 +62,89 @@ func contains(modes []api.PersistentVolumeAccessMode, mode api.PersistentVolumeA
 	}
 	return false
 }
+
+// ScrubPodVolumeAndWatchUntilCompletion is intended for use with volume Recyclers. This function will
+// save the given Pod to the API and watch it until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded, whichever comes first.
+// Deletion of the scrubber pod is always attempted before returning.
+// pod - the pod designed by a volume plugin to scrub the volume's contents
+// client - kube client for API operations.
+func ScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, kubeClient client.Interface) error {
+	return internalScrubPodVolumeAndWatchUntilCompletion(pod, newScrubberClient(kubeClient))
+}
+
+// same as above func comments, except 'scrubberClient' is a narrower pod API interface to ease testing
+func internalScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, scrubberClient scrubberClient) error {
+	glog.V(5).Infof("Creating scrubber pod for volume %s\n", pod.Name)
+	pod, err := scrubberClient.CreatePod(pod)
+	if err != nil {
+		return fmt.Errorf("Unexpected error creating a pod to scrub volume %s: %+v\n", pod.Name, err)
+	}
+
+	defer scrubberClient.DeletePod(pod.Name, pod.Namespace)
+
+	nextPod := scrubberClient.WatchPod(pod.Name, pod.Namespace, pod.ResourceVersion)
+	for {
+		watchedPod := nextPod()
+		if watchedPod.Status.Phase == api.PodSucceeded {
+			// volume.Recycle() returns nil on success, else error
+			return nil
+		}
+		if watchedPod.Status.Phase == api.PodFailed {
+			// volume.Recycle() returns nil on success, else error
+			if watchedPod.Status.Message != "" {
+				return fmt.Errorf("%s", watchedPod.Status.Message)
+			} else {
+				return fmt.Errorf("Pod failed, pod.Status.Message unknown.")
+			}
+		}
+	}
+}
+
+// scrubberClient abstracts access to a Pod by providing a narrower interface.
+// this makes it easier to mock a client for testing +type scrubberClient interface { + CreatePod(pod *api.Pod) (*api.Pod, error) + GetPod(name, namespace string) (*api.Pod, error) + DeletePod(name, namespace string) error + WatchPod(name, namespace, resourceVersion string) func() *api.Pod +} + +func newScrubberClient(client client.Interface) scrubberClient { + return &realScrubberClient{client} +} + +type realScrubberClient struct { + client client.Interface +} + +func (c *realScrubberClient) CreatePod(pod *api.Pod) (*api.Pod, error) { + return c.client.Pods(pod.Namespace).Create(pod) +} + +func (c *realScrubberClient) GetPod(name, namespace string) (*api.Pod, error) { + return c.client.Pods(namespace).Get(name) +} + +func (c *realScrubberClient) DeletePod(name, namespace string) error { + return c.client.Pods(namespace).Delete(name, nil) +} + +func (c *realScrubberClient) WatchPod(name, namespace, resourceVersion string) func() *api.Pod { + fieldSelector, _ := fields.ParseSelector("metadata.name=" + name) + + podLW := &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return c.client.Pods(namespace).List(labels.Everything(), fieldSelector) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return c.client.Pods(namespace).Watch(labels.Everything(), fieldSelector, resourceVersion) + }, + } + queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) + cache.NewReflector(podLW, &api.Pod{}, queue, 1*time.Minute).Run() + + return func() *api.Pod { + obj := queue.Pop() + return obj.(*api.Pod) + } +} diff --git a/pkg/volume/util_test.go b/pkg/volume/util_test.go new file mode 100644 index 00000000000..b921d013f51 --- /dev/null +++ b/pkg/volume/util_test.go @@ -0,0 +1,102 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package volume + +import ( + "fmt" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "strings" +) + +func TestScrubberSuccess(t *testing.T) { + client := &mockScrubberClient{} + scrubber := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "scrubber-test", + Namespace: api.NamespaceDefault, + }, + Status: api.PodStatus{ + Phase: api.PodSucceeded, + }, + } + + err := internalScrubPodVolumeAndWatchUntilCompletion(scrubber, client) + if err != nil { + t.Errorf("Unexpected error watching scrubber pod: %+v", err) + } + if !client.deletedCalled { + t.Errorf("Expected deferred client.Delete to be called on scrubber pod") + } +} + +func TestScrubberFailure(t *testing.T) { + client := &mockScrubberClient{} + scrubber := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "scrubber-test", + Namespace: api.NamespaceDefault, + }, + Status: api.PodStatus{ + Phase: api.PodFailed, + Message: "foo", + }, + } + + err := internalScrubPodVolumeAndWatchUntilCompletion(scrubber, client) + if err == nil { + t.Fatalf("Expected pod failure but got nil error returned") + } + if err != nil { + if !strings.Contains(err.Error(), "foo") { + t.Errorf("Expected pod.Status.Message %s but got %s", scrubber.Status.Message, err) + } + } + if !client.deletedCalled { + t.Errorf("Expected deferred client.Delete to be called on scrubber pod") + } +} + +type mockScrubberClient struct { + pod *api.Pod + deletedCalled bool +} + +func (c *mockScrubberClient) CreatePod(pod *api.Pod) (*api.Pod, error) { + c.pod = pod + return c.pod, nil +} + +func (c *mockScrubberClient) GetPod(name, namespace 
string) (*api.Pod, error) { + if c.pod != nil { + return c.pod, nil + } else { + return nil, fmt.Errorf("pod does not exist") + } +} + +func (c *mockScrubberClient) DeletePod(name, namespace string) error { + c.deletedCalled = true + return nil +} + +func (c *mockScrubberClient) WatchPod(name, namespace, resourceVersion string) func() *api.Pod { + return func() *api.Pod { + return c.pod + } +}