From 0ee9160f8833249e596912b405cc5d6696594af2 Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Thu, 19 May 2016 12:58:25 +0200 Subject: [PATCH] volume recycler: Don't start a new recycler pod if one already exists. Recycling is a long duration process and when the recycler controller is restarted in the meantime, it should not start a new recycler pod if there is one already running. This means that the recycler pod must have deterministic name based on name of the recycled PV, we then get name conflicts when creating the pod. Two things need to be changed: - recycler controller and recycler plugins must pass the PV.Name to place, where the pod is created. - create recycler pod with deterministic name and check "already exists" error. When at it, remove useless 'resourceVersion' argument and make log messages starting with lowercase. --- pkg/controller/persistentvolume/controller.go | 2 +- .../persistentvolume/framework_test.go | 5 +- pkg/volume/host_path/host_path.go | 15 +++-- pkg/volume/host_path/host_path_test.go | 2 +- pkg/volume/nfs/nfs.go | 12 ++-- pkg/volume/nfs/nfs_test.go | 4 +- pkg/volume/plugins.go | 5 +- pkg/volume/testing/testing.go | 4 +- pkg/volume/util.go | 53 ++++++++++----- pkg/volume/util_test.go | 66 +++++++++++++++++-- 10 files changed, 125 insertions(+), 43 deletions(-) diff --git a/pkg/controller/persistentvolume/controller.go b/pkg/controller/persistentvolume/controller.go index e61a30baa65..eb499fc89b2 100644 --- a/pkg/controller/persistentvolume/controller.go +++ b/pkg/controller/persistentvolume/controller.go @@ -865,7 +865,7 @@ func (ctrl *PersistentVolumeController) recycleVolumeOperation(arg interface{}) } // Plugin found - recycler, err := plugin.NewRecycler(spec) + recycler, err := plugin.NewRecycler(volume.Name, spec) if err != nil { // Cannot create recycler strerr := fmt.Sprintf("Failed to create recycler: %v", err) diff --git a/pkg/controller/persistentvolume/framework_test.go b/pkg/controller/persistentvolume/framework_test.go index aea5dc4e8bd..6096799dc93 100644 --- a/pkg/controller/persistentvolume/framework_test.go +++ b/pkg/controller/persistentvolume/framework_test.go @@ -882,6 +882,9 @@ type mockVolumePlugin struct { } var _ vol.VolumePlugin = &mockVolumePlugin{} +var _ vol.RecyclableVolumePlugin = &mockVolumePlugin{} +var _ vol.DeletableVolumePlugin = &mockVolumePlugin{} +var _ vol.ProvisionableVolumePlugin = &mockVolumePlugin{} func (plugin *mockVolumePlugin) Init(host vol.VolumeHost) error { return nil @@ -981,7 +984,7 @@ func (plugin *mockVolumePlugin) GetMetrics() (*vol.Metrics, error) { // Recycler interfaces -func (plugin *mockVolumePlugin) NewRecycler(spec *vol.Spec) (vol.Recycler, error) { +func (plugin *mockVolumePlugin) NewRecycler(pvName string, spec *vol.Spec) (vol.Recycler, error) { if len(plugin.recycleCalls) > 0 { // mockVolumePlugin directly implements Recycler interface glog.V(4).Infof("mock plugin NewRecycler called, returning mock recycler") diff --git a/pkg/volume/host_path/host_path.go b/pkg/volume/host_path/host_path.go index ff2d9c667ff..8a6cae457dc 100644 --- a/pkg/volume/host_path/host_path.go +++ b/pkg/volume/host_path/host_path.go @@ -43,7 +43,7 @@ func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin } } -func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error), volumeConfig volume.VolumeConfig) []volume.VolumePlugin { +func ProbeRecyclableVolumePlugins(recyclerFunc func(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error), volumeConfig volume.VolumeConfig) []volume.VolumePlugin { return []volume.VolumePlugin{ &hostPathPlugin{ host: nil, @@ -57,7 +57,7 @@ func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volu type hostPathPlugin struct { host volume.VolumeHost // decouple creating Recyclers/Deleters/Provisioners by deferring to a function. Allows for easier testing. - newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) + newRecyclerFunc func(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) newDeleterFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Deleter, error) newProvisionerFunc func(options volume.VolumeOptions, host volume.VolumeHost) (volume.Provisioner, error) config volume.VolumeConfig @@ -115,8 +115,8 @@ func (plugin *hostPathPlugin) NewUnmounter(volName string, podUID types.UID) (vo }}, nil } -func (plugin *hostPathPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) { - return plugin.newRecyclerFunc(spec, plugin.host, plugin.config) +func (plugin *hostPathPlugin) NewRecycler(pvName string, spec *volume.Spec) (volume.Recycler, error) { + return plugin.newRecyclerFunc(pvName, spec, plugin.host, plugin.config) } func (plugin *hostPathPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) { @@ -130,7 +130,7 @@ func (plugin *hostPathPlugin) NewProvisioner(options volume.VolumeOptions) (volu return plugin.newProvisionerFunc(options, plugin.host) } -func newRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { +func newRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil { return nil, fmt.Errorf("spec.PersistentVolumeSource.HostPath is nil") } @@ -141,6 +141,7 @@ func newRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.Volume host: host, config: config, timeout: volume.CalculateTimeoutForVolume(config.RecyclerMinimumTimeout, config.RecyclerTimeoutIncrement, spec.PersistentVolume), + pvName: pvName, }, nil } @@ -221,6 +222,7 @@ type hostPathRecycler struct { config volume.VolumeConfig timeout int64 volume.MetricsNil + pvName string } func (r *hostPathRecycler) GetPath() string { @@ -234,13 +236,12 @@ func (r *hostPathRecycler) Recycle() error { pod := r.config.RecyclerPodTemplate // overrides pod.Spec.ActiveDeadlineSeconds = &r.timeout - pod.GenerateName = "pv-recycler-hostpath-" pod.Spec.Volumes[0].VolumeSource = api.VolumeSource{ HostPath: &api.HostPathVolumeSource{ Path: r.path, }, } - return volume.RecycleVolumeByWatchingPodUntilCompletion(pod, r.host.GetKubeClient()) + return volume.RecycleVolumeByWatchingPodUntilCompletion(r.pvName, pod, r.host.GetKubeClient()) } // hostPathProvisioner implements a Provisioner for the HostPath plugin diff --git a/pkg/volume/host_path/host_path_test.go b/pkg/volume/host_path/host_path_test.go index 6a6e9e2f3bb..1f4e8922756 100644 --- a/pkg/volume/host_path/host_path_test.go +++ b/pkg/volume/host_path/host_path_test.go @@ -77,7 +77,7 @@ func TestRecycler(t *testing.T) { if err != nil { t.Errorf("Can't find the plugin by name") } - recycler, err := plug.NewRecycler(spec) + recycler, err := plug.NewRecycler("pv-name", spec) if err != nil { t.Errorf("Failed to make a new Recyler: %v", err) } diff --git a/pkg/volume/nfs/nfs.go b/pkg/volume/nfs/nfs.go index 09a6af5017e..6b787c6bc39 100644 --- a/pkg/volume/nfs/nfs.go +++ b/pkg/volume/nfs/nfs.go @@ -46,7 +46,7 @@ func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin type nfsPlugin struct { host volume.VolumeHost // decouple creating recyclers by deferring to a function. Allows for easier testing. - newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) + newRecyclerFunc func(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) config volume.VolumeConfig } @@ -120,8 +120,8 @@ func (plugin *nfsPlugin) newUnmounterInternal(volName string, podUID types.UID, }}, nil } -func (plugin *nfsPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) { - return plugin.newRecyclerFunc(spec, plugin.host, plugin.config) +func (plugin *nfsPlugin) NewRecycler(pvName string, spec *volume.Spec) (volume.Recycler, error) { + return plugin.newRecyclerFunc(pvName, spec, plugin.host, plugin.config) } // NFS volumes represent a bare host file or directory mount of an NFS export. @@ -250,7 +250,7 @@ func (c *nfsUnmounter) TearDownAt(dir string) error { return nil } -func newRecycler(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) { +func newRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) { if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.NFS == nil { return nil, fmt.Errorf("spec.PersistentVolumeSource.NFS is nil") } @@ -261,6 +261,7 @@ func newRecycler(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume. host: host, config: volumeConfig, timeout: volume.CalculateTimeoutForVolume(volumeConfig.RecyclerMinimumTimeout, volumeConfig.RecyclerTimeoutIncrement, spec.PersistentVolume), + pvName: pvName, }, nil } @@ -273,6 +274,7 @@ type nfsRecycler struct { config volume.VolumeConfig timeout int64 volume.MetricsNil + pvName string } func (r *nfsRecycler) GetPath() string { @@ -292,5 +294,5 @@ func (r *nfsRecycler) Recycle() error { Path: r.path, }, } - return volume.RecycleVolumeByWatchingPodUntilCompletion(pod, r.host.GetKubeClient()) + return volume.RecycleVolumeByWatchingPodUntilCompletion(r.pvName, pod, r.host.GetKubeClient()) } diff --git a/pkg/volume/nfs/nfs_test.go b/pkg/volume/nfs/nfs_test.go index 4a045148c2a..29b6fdf2338 100644 --- a/pkg/volume/nfs/nfs_test.go +++ b/pkg/volume/nfs/nfs_test.go @@ -91,7 +91,7 @@ func TestRecycler(t *testing.T) { if err != nil { t.Errorf("Can't find the plugin by name") } - recycler, err := plug.NewRecycler(spec) + recycler, err := plug.NewRecycler("pv-name", spec) if err != nil { t.Errorf("Failed to make a new Recyler: %v", err) } @@ -103,7 +103,7 @@ func TestRecycler(t *testing.T) { } } -func newMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { +func newMockRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { return &mockRecycler{ path: spec.PersistentVolume.Spec.NFS.Path, }, nil diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index c9195f6c28b..31a69e1b173 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -101,7 +101,7 @@ type RecyclableVolumePlugin interface { VolumePlugin // NewRecycler creates a new volume.Recycler which knows how to reclaim this resource // after the volume's release from a PersistentVolumeClaim - NewRecycler(spec *Spec) (Recycler, error) + NewRecycler(pvName string, spec *Spec) (Recycler, error) } // DeletableVolumePlugin is an extended interface of VolumePlugin and is used by persistent volumes that want @@ -238,6 +238,9 @@ type VolumeConfig struct { // Example: 5Gi volume x 30s increment = 150s + 30s minimum = 180s ActiveDeadlineSeconds for recycler pod RecyclerTimeoutIncrement int + // PVName is name of the PersistentVolume instance that is being recycled. It is used to generate unique recycler pod name. + PVName string + // OtherAttributes stores config as strings. These strings are opaque to the system and only understood by the binary // hosting the plugin and the plugin itself. OtherAttributes map[string]string diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index f1a420cf28b..63e96c3f485 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -199,7 +199,7 @@ func (plugin *FakeVolumePlugin) NewDetacher() (Detacher, error) { return plugin.getFakeVolume(&plugin.Detachers), nil } -func (plugin *FakeVolumePlugin) NewRecycler(spec *Spec) (Recycler, error) { +func (plugin *FakeVolumePlugin) NewRecycler(pvName string, spec *Spec) (Recycler, error) { return &fakeRecycler{"/attributesTransferredFromSpec", MetricsNil{}}, nil } @@ -312,7 +312,7 @@ func (fr *fakeRecycler) GetPath() string { return fr.path } -func NewFakeRecycler(spec *Spec, host VolumeHost, config VolumeConfig) (Recycler, error) { +func NewFakeRecycler(pvName string, spec *Spec, host VolumeHost, config VolumeConfig) (Recycler, error) { if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil { return nil, fmt.Errorf("fakeRecycler only supports spec.PersistentVolume.Spec.HostPath") } diff --git a/pkg/volume/util.go b/pkg/volume/util.go index 7dc454d3cd1..c4a532fa16e 100644 --- a/pkg/volume/util.go +++ b/pkg/volume/util.go @@ -28,31 +28,52 @@ import ( "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" ) -// RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume Recyclers. This function will -// save the given Pod to the API and watch it until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded, whichever comes first. -// An attempt to delete a recycler pod is always attempted before returning. -// pod - the pod designed by a volume plugin to recycle the volume +// RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume +// Recyclers. This function will save the given Pod to the API and watch it +// until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded, +// whichever comes first. An attempt to delete a recycler pod is always +// attempted before returning. +// +// In case there is a pod with the same namespace+name already running, this +// function assumes it's an older instance of the recycler pod and watches this +// old pod instead of starting a new one. +// +// pod - the pod designed by a volume plugin to recycle the volume. pod.Name +// will be overwritten with unique name based on PV.Name. // client - kube client for API operations. -func RecycleVolumeByWatchingPodUntilCompletion(pod *api.Pod, kubeClient clientset.Interface) error { - return internalRecycleVolumeByWatchingPodUntilCompletion(pod, newRecyclerClient(kubeClient)) +func RecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *api.Pod, kubeClient clientset.Interface) error { + return internalRecycleVolumeByWatchingPodUntilCompletion(pvName, pod, newRecyclerClient(kubeClient)) } // same as above func comments, except 'recyclerClient' is a narrower pod API interface to ease testing -func internalRecycleVolumeByWatchingPodUntilCompletion(pod *api.Pod, recyclerClient recyclerClient) error { - glog.V(5).Infof("Creating recycler pod for volume %s\n", pod.Name) - pod, err := recyclerClient.CreatePod(pod) - if err != nil { - return fmt.Errorf("Unexpected error creating recycler pod: %+v\n", err) - } +func internalRecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *api.Pod, recyclerClient recyclerClient) error { + glog.V(5).Infof("creating recycler pod for volume %s\n", pod.Name) + // Generate unique name for the recycler pod - we need to get "already + // exists" error when a previous controller has already started recycling + // the volume. Here we assume that pv.Name is already unique. + pod.Name = "recycler-for-" + pvName + pod.GenerateName = "" + + // Start the pod + _, err := recyclerClient.CreatePod(pod) + if err != nil { + if errors.IsAlreadyExists(err) { + glog.V(5).Infof("old recycler pod %q found for volume", pod.Name) + } else { + return fmt.Errorf("Unexpected error creating recycler pod: %+v\n", err) + } + } defer recyclerClient.DeletePod(pod.Name, pod.Namespace) + // Now only the old pod or the new pod run. Watch it until it finishes. stopChannel := make(chan struct{}) defer close(stopChannel) - nextPod := recyclerClient.WatchPod(pod.Name, pod.Namespace, pod.ResourceVersion, stopChannel) + nextPod := recyclerClient.WatchPod(pod.Name, pod.Namespace, stopChannel) for { watchedPod := nextPod() @@ -65,7 +86,7 @@ func internalRecycleVolumeByWatchingPodUntilCompletion(pod *api.Pod, recyclerCli if watchedPod.Status.Message != "" { return fmt.Errorf(watchedPod.Status.Message) } else { - return fmt.Errorf("Pod failed, pod.Status.Message unknown.") + return fmt.Errorf("pod failed, pod.Status.Message unknown.") } } } @@ -77,7 +98,7 @@ type recyclerClient interface { CreatePod(pod *api.Pod) (*api.Pod, error) GetPod(name, namespace string) (*api.Pod, error) DeletePod(name, namespace string) error - WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod + WatchPod(name, namespace string, stopChannel chan struct{}) func() *api.Pod } func newRecyclerClient(client clientset.Interface) recyclerClient { @@ -103,7 +124,7 @@ func (c *realRecyclerClient) DeletePod(name, namespace string) error { // WatchPod returns a ListWatch for watching a pod. The stopChannel is used // to close the reflector backing the watch. The caller is responsible for derring a close on the channel to // stop the reflector. -func (c *realRecyclerClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod { +func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) func() *api.Pod { fieldSelector, _ := fields.ParseSelector("metadata.name=" + name) podLW := &cache.ListWatch{ diff --git a/pkg/volume/util_test.go b/pkg/volume/util_test.go index faccd021ad2..aab1be192e8 100644 --- a/pkg/volume/util_test.go +++ b/pkg/volume/util_test.go @@ -22,6 +22,7 @@ import ( "testing" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" ) @@ -29,7 +30,6 @@ func TestRecyclerSuccess(t *testing.T) { client := &mockRecyclerClient{} recycler := &api.Pod{ ObjectMeta: api.ObjectMeta{ - Name: "recycler-test", Namespace: api.NamespaceDefault, }, Status: api.PodStatus{ @@ -37,7 +37,7 @@ func TestRecyclerSuccess(t *testing.T) { }, } - err := internalRecycleVolumeByWatchingPodUntilCompletion(recycler, client) + err := internalRecycleVolumeByWatchingPodUntilCompletion("pv-name", recycler, client) if err != nil { t.Errorf("Unexpected error watching recycler pod: %+v", err) } @@ -50,7 +50,6 @@ func TestRecyclerFailure(t *testing.T) { client := &mockRecyclerClient{} recycler := &api.Pod{ ObjectMeta: api.ObjectMeta{ - Name: "recycler-test", Namespace: api.NamespaceDefault, }, Status: api.PodStatus{ @@ -59,7 +58,7 @@ func TestRecyclerFailure(t *testing.T) { }, } - err := internalRecycleVolumeByWatchingPodUntilCompletion(recycler, client) + err := internalRecycleVolumeByWatchingPodUntilCompletion("pv-name", recycler, client) if err == nil { t.Fatalf("Expected pod failure but got nil error returned") } @@ -73,14 +72,67 @@ func TestRecyclerFailure(t *testing.T) { } } +func TestRecyclerAlreadyExists(t *testing.T) { + // Test that internalRecycleVolumeByWatchingPodUntilCompletion does not + // start a new recycler when an old one is already running. + + // Old recycler is running and fails with "foo" error message + oldRecycler := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "recycler-test", + Namespace: api.NamespaceDefault, + }, + Status: api.PodStatus{ + Phase: api.PodFailed, + Message: "foo", + }, + } + + // New recycler _would_ succeed if it was run + newRecycler := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "recycler-test", + Namespace: api.NamespaceDefault, + }, + Status: api.PodStatus{ + Phase: api.PodSucceeded, + Message: "bar", + }, + } + + client := &mockRecyclerClient{ + pod: oldRecycler, + } + + err := internalRecycleVolumeByWatchingPodUntilCompletion("pv-name", newRecycler, client) + if err == nil { + t.Fatalf("Expected pod failure but got nil error returned") + } + + // Check the recycler failed with "foo" error message, i.e. it was the + // old recycler that finished and not the new one. + if err != nil { + if !strings.Contains(err.Error(), "foo") { + t.Errorf("Expected pod.Status.Message %s but got %s", oldRecycler.Status.Message, err) + } + } + if !client.deletedCalled { + t.Errorf("Expected deferred client.Delete to be called on recycler pod") + } +} + type mockRecyclerClient struct { pod *api.Pod deletedCalled bool } func (c *mockRecyclerClient) CreatePod(pod *api.Pod) (*api.Pod, error) { - c.pod = pod - return c.pod, nil + if c.pod == nil { + c.pod = pod + return c.pod, nil + } + // Simulate "already exists" error + return nil, errors.NewAlreadyExists(api.Resource("pods"), pod.Name) } func (c *mockRecyclerClient) GetPod(name, namespace string) (*api.Pod, error) { @@ -96,7 +148,7 @@ func (c *mockRecyclerClient) DeletePod(name, namespace string) error { return nil } -func (c *mockRecyclerClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod { +func (c *mockRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) func() *api.Pod { return func() *api.Pod { return c.pod }