From 1725c23eb2355dc2124547d4a108ce6563602808 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Sat, 7 Mar 2015 13:38:50 -0800 Subject: [PATCH] Implement volume plugin wrappers Convert git_repo and secret into wrappers around empty_dir. --- pkg/kubelet/volume/empty_dir/empty_dir.go | 84 +++++++++++-------- .../volume/empty_dir/empty_dir_test.go | 2 +- pkg/kubelet/volume/gce_pd/gce_pd.go | 42 ++++++---- pkg/kubelet/volume/gce_pd/gce_pd_test.go | 6 +- pkg/kubelet/volume/git_repo/git_repo.go | 59 ++++++++----- pkg/kubelet/volume/git_repo/git_repo_test.go | 3 +- pkg/kubelet/volume/host_path/host_path.go | 12 +++ .../volume/host_path/host_path_test.go | 4 +- pkg/kubelet/volume/plugins.go | 11 +++ pkg/kubelet/volume/secret/secret.go | 54 +++++++----- pkg/kubelet/volume/secret/secret_test.go | 6 +- pkg/kubelet/volume/testing.go | 59 +++++++++---- pkg/kubelet/volume/volume.go | 16 +++- pkg/kubelet/volumes.go | 66 ++++++++++----- 14 files changed, 286 insertions(+), 138 deletions(-) diff --git a/pkg/kubelet/volume/empty_dir/empty_dir.go b/pkg/kubelet/volume/empty_dir/empty_dir.go index d40eef169b4..fcfe2aab045 100644 --- a/pkg/kubelet/volume/empty_dir/empty_dir.go +++ b/pkg/kubelet/volume/empty_dir/empty_dir.go @@ -29,11 +29,21 @@ import ( // This is the primary entrypoint for volume plugins. func ProbeVolumePlugins() []volume.Plugin { - return []volume.Plugin{&emptyDirPlugin{nil, false}, &emptyDirPlugin{nil, true}} + return ProbeVolumePluginsWithMounter(mount.New()) +} + +// ProbePluginsWithMounter is a convenience for testing other plugins which wrap this one. +//FIXME: alternative: pass mount.Interface to all ProbeVolumePlugins() functions? Opinions? +func ProbeVolumePluginsWithMounter(mounter mount.Interface) []volume.Plugin { + return []volume.Plugin{ + &emptyDirPlugin{nil, mounter, false}, + &emptyDirPlugin{nil, mounter, true}, + } } type emptyDirPlugin struct { host volume.Host + mounter mount.Interface legacyMode bool // if set, plugin answers to the legacy name } @@ -72,7 +82,7 @@ func (plugin *emptyDirPlugin) CanSupport(spec *api.Volume) bool { func (plugin *emptyDirPlugin) NewBuilder(spec *api.Volume, podRef *api.ObjectReference) (volume.Builder, error) { // Inject real implementations here, test through the internal function. - return plugin.newBuilderInternal(spec, podRef, mount.New(), &realMediumer{}) + return plugin.newBuilderInternal(spec, podRef, plugin.mounter, &realMediumer{}) } func (plugin *emptyDirPlugin) newBuilderInternal(spec *api.Volume, podRef *api.ObjectReference, mounter mount.Interface, mediumer mediumer) (volume.Builder, error) { @@ -88,8 +98,8 @@ func (plugin *emptyDirPlugin) newBuilderInternal(spec *api.Volume, podRef *api.O podUID: podRef.UID, volName: spec.Name, medium: medium, - mediumer: mediumer, mounter: mounter, + mediumer: mediumer, plugin: plugin, legacyMode: false, }, nil @@ -97,7 +107,7 @@ func (plugin *emptyDirPlugin) newBuilderInternal(spec *api.Volume, podRef *api.O func (plugin *emptyDirPlugin) NewCleaner(volName string, podUID types.UID) (volume.Cleaner, error) { // Inject real implementations here, test through the internal function. - return plugin.newCleanerInternal(volName, podUID, mount.New(), &realMediumer{}) + return plugin.newCleanerInternal(volName, podUID, plugin.mounter, &realMediumer{}) } func (plugin *emptyDirPlugin) newCleanerInternal(volName string, podUID types.UID, mounter mount.Interface, mediumer mediumer) (volume.Cleaner, error) { @@ -114,17 +124,6 @@ func (plugin *emptyDirPlugin) newCleanerInternal(volName string, podUID types.UI plugin: plugin, legacyMode: legacy, } - // Figure out the medium. - if medium, err := mediumer.GetMedium(ed.GetPath()); err != nil { - return nil, err - } else { - switch medium { - case mediumMemory: - ed.medium = api.StorageTypeMemory - default: - // assume StorageTypeDefault - } - } return ed, nil } @@ -154,37 +153,42 @@ type emptyDir struct { // SetUp creates new directory. func (ed *emptyDir) SetUp() error { + return ed.SetUpAt(ed.GetPath()) +} + +// SetUpAt creates new directory. +func (ed *emptyDir) SetUpAt(dir string) error { if ed.legacyMode { return fmt.Errorf("legacy mode: can not create new instances") } switch ed.medium { case api.StorageTypeDefault: - return ed.setupDefault() + return ed.setupDefault(dir) case api.StorageTypeMemory: - return ed.setupTmpfs() + return ed.setupTmpfs(dir) default: return fmt.Errorf("unknown storage medium %q", ed.medium) } } -func (ed *emptyDir) setupDefault() error { - return os.MkdirAll(ed.GetPath(), 0750) +func (ed *emptyDir) setupDefault(dir string) error { + return os.MkdirAll(dir, 0750) } -func (ed *emptyDir) setupTmpfs() error { +func (ed *emptyDir) setupTmpfs(dir string) error { if ed.mounter == nil { return fmt.Errorf("memory storage requested, but mounter is nil") } - if err := os.MkdirAll(ed.GetPath(), 0750); err != nil { + if err := os.MkdirAll(dir, 0750); err != nil { return err } // Make SetUp idempotent. - if medium, err := ed.mediumer.GetMedium(ed.GetPath()); err != nil { + if medium, err := ed.mediumer.GetMedium(dir); err != nil { return err } else if medium == mediumMemory { return nil // current state is what we expect } - return ed.mounter.Mount("tmpfs", ed.GetPath(), "tmpfs", 0, "") + return ed.mounter.Mount("tmpfs", dir, "tmpfs", 0, "") } func (ed *emptyDir) GetPath() string { @@ -197,18 +201,28 @@ func (ed *emptyDir) GetPath() string { // TearDown simply discards everything in the directory. func (ed *emptyDir) TearDown() error { - switch ed.medium { - case api.StorageTypeDefault: - return ed.teardownDefault() - case api.StorageTypeMemory: - return ed.teardownTmpfs() - default: - return fmt.Errorf("unknown storage medium %q", ed.medium) + return ed.TearDownAt(ed.GetPath()) +} + +// TearDownAt simply discards everything in the directory. +func (ed *emptyDir) TearDownAt(dir string) error { + // Figure out the medium. + if medium, err := ed.mediumer.GetMedium(dir); err != nil { + return err + } else { + switch medium { + case mediumMemory: + ed.medium = api.StorageTypeMemory + return ed.teardownTmpfs(dir) + default: + // assume StorageTypeDefault + return ed.teardownDefault(dir) + } } } -func (ed *emptyDir) teardownDefault() error { - tmpDir, err := volume.RenameDirectory(ed.GetPath(), ed.volName+".deleting~") +func (ed *emptyDir) teardownDefault(dir string) error { + tmpDir, err := volume.RenameDirectory(dir, ed.volName+".deleting~") if err != nil { return err } @@ -219,14 +233,14 @@ func (ed *emptyDir) teardownDefault() error { return nil } -func (ed *emptyDir) teardownTmpfs() error { +func (ed *emptyDir) teardownTmpfs(dir string) error { if ed.mounter == nil { return fmt.Errorf("memory storage requested, but mounter is nil") } - if err := ed.mounter.Unmount(ed.GetPath(), 0); err != nil { + if err := ed.mounter.Unmount(dir, 0); err != nil { return err } - if err := os.RemoveAll(ed.GetPath()); err != nil { + if err := os.RemoveAll(dir); err != nil { return err } return nil diff --git a/pkg/kubelet/volume/empty_dir/empty_dir_test.go b/pkg/kubelet/volume/empty_dir/empty_dir_test.go index 81206c6ba0f..5c79d7592b9 100644 --- a/pkg/kubelet/volume/empty_dir/empty_dir_test.go +++ b/pkg/kubelet/volume/empty_dir/empty_dir_test.go @@ -33,7 +33,7 @@ const basePath = "/tmp/fake" // Construct an instance of a plugin, by name. func makePluginUnderTest(t *testing.T, plugName string) volume.Plugin { plugMgr := volume.PluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), &volume.FakeHost{basePath, nil}) + plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeHost(basePath, nil, nil)) plug, err := plugMgr.FindPluginByName(plugName) if err != nil { diff --git a/pkg/kubelet/volume/gce_pd/gce_pd.go b/pkg/kubelet/volume/gce_pd/gce_pd.go index dc89c12f328..54bb1cbf72e 100644 --- a/pkg/kubelet/volume/gce_pd/gce_pd.go +++ b/pkg/kubelet/volume/gce_pd/gce_pd.go @@ -165,13 +165,18 @@ func detachDiskLogError(pd *gcePersistentDisk) { // SetUp attaches the disk and bind mounts to the volume path. func (pd *gcePersistentDisk) SetUp() error { + return pd.SetUpAt(pd.GetPath()) +} + +// SetUpAt attaches the disk and bind mounts to the volume path. +func (pd *gcePersistentDisk) SetUpAt(dir string) error { if pd.legacyMode { return fmt.Errorf("legacy mode: can not create new instances") } // TODO: handle failed mounts here. - mountpoint, err := mount.IsMountPoint(pd.GetPath()) - glog.V(4).Infof("PersistentDisk set up: %s %v %v", pd.GetPath(), mountpoint, err) + mountpoint, err := mount.IsMountPoint(dir) + glog.V(4).Infof("PersistentDisk set up: %s %v %v", dir, mountpoint, err) if err != nil && !os.IsNotExist(err) { return err } @@ -189,38 +194,37 @@ func (pd *gcePersistentDisk) SetUp() error { flags = mount.FlagReadOnly } - volPath := pd.GetPath() - if err := os.MkdirAll(volPath, 0750); err != nil { + if err := os.MkdirAll(dir, 0750); err != nil { // TODO: we should really eject the attach/detach out into its own control loop. detachDiskLogError(pd) return err } // Perform a bind mount to the full path to allow duplicate mounts of the same PD. - err = pd.mounter.Mount(globalPDPath, pd.GetPath(), "", mount.FlagBind|flags, "") + err = pd.mounter.Mount(globalPDPath, dir, "", mount.FlagBind|flags, "") if err != nil { - mountpoint, mntErr := mount.IsMountPoint(pd.GetPath()) + mountpoint, mntErr := mount.IsMountPoint(dir) if mntErr != nil { glog.Errorf("isMountpoint check failed: %v", mntErr) return err } if mountpoint { - if mntErr = pd.mounter.Unmount(pd.GetPath(), 0); mntErr != nil { + if mntErr = pd.mounter.Unmount(dir, 0); mntErr != nil { glog.Errorf("Failed to unmount: %v", mntErr) return err } - mountpoint, mntErr := mount.IsMountPoint(pd.GetPath()) + mountpoint, mntErr := mount.IsMountPoint(dir) if mntErr != nil { glog.Errorf("isMountpoint check failed: %v", mntErr) return err } if mountpoint { // This is very odd, we don't expect it. We'll try again next sync loop. - glog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", pd.GetPath()) + glog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", dir) return err } } - os.Remove(pd.GetPath()) + os.Remove(dir) // TODO: we should really eject the attach/detach out into its own control loop. detachDiskLogError(pd) return err @@ -244,20 +248,26 @@ func (pd *gcePersistentDisk) GetPath() string { // Unmounts the bind mount, and detaches the disk only if the PD // resource was the last reference to that disk on the kubelet. func (pd *gcePersistentDisk) TearDown() error { - mountpoint, err := mount.IsMountPoint(pd.GetPath()) + return pd.TearDownAt(pd.GetPath()) +} + +// Unmounts the bind mount, and detaches the disk only if the PD +// resource was the last reference to that disk on the kubelet. +func (pd *gcePersistentDisk) TearDownAt(dir string) error { + mountpoint, err := mount.IsMountPoint(dir) if err != nil { return err } if !mountpoint { - return os.Remove(pd.GetPath()) + return os.Remove(dir) } - refs, err := mount.GetMountRefs(pd.mounter, pd.GetPath()) + refs, err := mount.GetMountRefs(pd.mounter, dir) if err != nil { return err } // Unmount the bind-mount inside this pod - if err := pd.mounter.Unmount(pd.GetPath(), 0); err != nil { + if err := pd.mounter.Unmount(dir, 0); err != nil { return err } // If len(refs) is 1, then all bind mounts have been removed, and the @@ -269,13 +279,13 @@ func (pd *gcePersistentDisk) TearDown() error { return err } } - mountpoint, mntErr := mount.IsMountPoint(pd.GetPath()) + mountpoint, mntErr := mount.IsMountPoint(dir) if mntErr != nil { glog.Errorf("isMountpoint check failed: %v", mntErr) return err } if !mountpoint { - if err := os.Remove(pd.GetPath()); err != nil { + if err := os.Remove(dir); err != nil { return err } } diff --git a/pkg/kubelet/volume/gce_pd/gce_pd_test.go b/pkg/kubelet/volume/gce_pd/gce_pd_test.go index 5a90cb3d8cd..3c902413c7a 100644 --- a/pkg/kubelet/volume/gce_pd/gce_pd_test.go +++ b/pkg/kubelet/volume/gce_pd/gce_pd_test.go @@ -28,7 +28,7 @@ import ( func TestCanSupport(t *testing.T) { plugMgr := volume.PluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), &volume.FakeHost{"/tmp/fake", nil}) + plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeHost("/tmp/fake", nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/gce-pd") if err != nil { @@ -66,7 +66,7 @@ func (fake *fakePDManager) DetachDisk(pd *gcePersistentDisk) error { func TestPlugin(t *testing.T) { plugMgr := volume.PluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), &volume.FakeHost{"/tmp/fake", nil}) + plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeHost("/tmp/fake", nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/gce-pd") if err != nil { @@ -132,7 +132,7 @@ func TestPlugin(t *testing.T) { func TestPluginLegacy(t *testing.T) { plugMgr := volume.PluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), &volume.FakeHost{"/tmp/fake", nil}) + plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeHost("/tmp/fake", nil, nil)) plug, err := plugMgr.FindPluginByName("gce-pd") if err != nil { diff --git a/pkg/kubelet/volume/git_repo/git_repo.go b/pkg/kubelet/volume/git_repo/git_repo.go index 65546251ed9..5842fdeda38 100644 --- a/pkg/kubelet/volume/git_repo/git_repo.go +++ b/pkg/kubelet/volume/git_repo/git_repo.go @@ -75,7 +75,7 @@ func (plugin *gitRepoPlugin) NewBuilder(spec *api.Volume, podRef *api.ObjectRefe return nil, fmt.Errorf("legacy mode: can not create new instances") } return &gitRepo{ - podUID: podRef.UID, + podRef: *podRef, volName: spec.Name, source: spec.GitRepo.Repository, revision: spec.GitRepo.Revision, @@ -91,7 +91,7 @@ func (plugin *gitRepoPlugin) NewCleaner(volName string, podUID types.UID) (volum legacy = true } return &gitRepo{ - podUID: podUID, + podRef: api.ObjectReference{UID: podUID}, volName: volName, plugin: plugin, legacyMode: legacy, @@ -102,7 +102,7 @@ func (plugin *gitRepoPlugin) NewCleaner(volName string, podUID types.UID) (volum // These do not persist beyond the lifetime of a pod. type gitRepo struct { volName string - podUID types.UID + podRef api.ObjectReference source string revision string exec exec.Interface @@ -112,6 +112,17 @@ type gitRepo struct { // SetUp creates new directory and clones a git repo. func (gr *gitRepo) SetUp() error { + return gr.SetUpAt(gr.GetPath()) +} + +// This is the spec for the volume that this plugin wraps. +var wrappedVolumeSpec = &api.Volume{ + Name: "not-used", + VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{}}, +} + +// SetUpAt creates new directory and clones a git repo. +func (gr *gitRepo) SetUpAt(dir string) error { if gr.isReady() { return nil } @@ -119,16 +130,20 @@ func (gr *gitRepo) SetUp() error { return fmt.Errorf("legacy mode: can not create new instances") } - volPath := gr.GetPath() - if err := os.MkdirAll(volPath, 0750); err != nil { + // Wrap EmptyDir, let it do the setup. + wrapped, err := gr.plugin.host.NewWrapperBuilder(wrappedVolumeSpec, &gr.podRef) + if err != nil { + return err + } + if err := wrapped.SetUpAt(dir); err != nil { return err } - if output, err := gr.execCommand("git", []string{"clone", gr.source}, gr.GetPath()); err != nil { + if output, err := gr.execCommand("git", []string{"clone", gr.source}, dir); err != nil { return fmt.Errorf("failed to exec 'git clone %s': %s: %v", gr.source, output, err) } - files, err := ioutil.ReadDir(gr.GetPath()) + files, err := ioutil.ReadDir(dir) if err != nil { return err } @@ -141,11 +156,11 @@ func (gr *gitRepo) SetUp() error { return nil } - dir := path.Join(gr.GetPath(), files[0].Name()) - if output, err := gr.execCommand("git", []string{"checkout", gr.revision}, dir); err != nil { + subdir := path.Join(dir, files[0].Name()) + if output, err := gr.execCommand("git", []string{"checkout", gr.revision}, subdir); err != nil { return fmt.Errorf("failed to exec 'git checkout %s': %s: %v", gr.revision, output, err) } - if output, err := gr.execCommand("git", []string{"reset", "--hard"}, dir); err != nil { + if output, err := gr.execCommand("git", []string{"reset", "--hard"}, subdir); err != nil { return fmt.Errorf("failed to exec 'git reset --hard': %s: %v", output, err) } @@ -154,7 +169,7 @@ func (gr *gitRepo) SetUp() error { } func (gr *gitRepo) getMetaDir() string { - return path.Join(gr.plugin.host.GetPodPluginDir(gr.podUID, volume.EscapePluginName(gitRepoPluginName)), gr.volName) + return path.Join(gr.plugin.host.GetPodPluginDir(gr.podRef.UID, volume.EscapePluginName(gitRepoPluginName)), gr.volName) } func (gr *gitRepo) isReady() bool { @@ -197,18 +212,20 @@ func (gr *gitRepo) GetPath() string { if gr.legacyMode { name = gitRepoPluginLegacyName } - return gr.plugin.host.GetPodVolumeDir(gr.podUID, volume.EscapePluginName(name), gr.volName) + return gr.plugin.host.GetPodVolumeDir(gr.podRef.UID, volume.EscapePluginName(name), gr.volName) } // TearDown simply deletes everything in the directory. func (gr *gitRepo) TearDown() error { - tmpDir, err := volume.RenameDirectory(gr.GetPath(), gr.volName+".deleting~") - if err != nil { - return err - } - err = os.RemoveAll(tmpDir) - if err != nil { - return err - } - return nil + return gr.TearDownAt(gr.GetPath()) +} + +// TearDownAt simply deletes everything in the directory. +func (gr *gitRepo) TearDownAt(dir string) error { + // Wrap EmptyDir, let it do the teardown. + wrapped, err := gr.plugin.host.NewWrapperCleaner(wrappedVolumeSpec, gr.podRef.UID) + if err != nil { + return err + } + return wrapped.TearDownAt(dir) } diff --git a/pkg/kubelet/volume/git_repo/git_repo_test.go b/pkg/kubelet/volume/git_repo/git_repo_test.go index 76f44e71d83..6e309697724 100644 --- a/pkg/kubelet/volume/git_repo/git_repo_test.go +++ b/pkg/kubelet/volume/git_repo/git_repo_test.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/empty_dir" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec" ) @@ -35,7 +36,7 @@ func newTestHost(t *testing.T) volume.Host { if err != nil { t.Fatalf("can't make a temp rootdir: %v", err) } - return &volume.FakeHost{tempDir, nil} + return volume.NewFakeHost(tempDir, nil, empty_dir.ProbeVolumePlugins()) } func TestCanSupport(t *testing.T) { diff --git a/pkg/kubelet/volume/host_path/host_path.go b/pkg/kubelet/volume/host_path/host_path.go index 314c8d42c8f..3f397ea5318 100644 --- a/pkg/kubelet/volume/host_path/host_path.go +++ b/pkg/kubelet/volume/host_path/host_path.go @@ -17,6 +17,8 @@ limitations under the License. package host_path import ( + "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" @@ -71,6 +73,11 @@ func (hp *hostPath) SetUp() error { return nil } +// SetUpAt does not make sense for host paths - probably programmer error. +func (hp *hostPath) SetUpAt(dir string) error { + return fmt.Errorf("SetUpAt() does not make sense for host paths") +} + func (hp *hostPath) GetPath() string { return hp.path } @@ -79,3 +86,8 @@ func (hp *hostPath) GetPath() string { func (hp *hostPath) TearDown() error { return nil } + +// TearDownAt does not make sense for host paths - probably programmer error. +func (hp *hostPath) TearDownAt(dir string) error { + return fmt.Errorf("TearDownAt() does not make sense for host paths") +} diff --git a/pkg/kubelet/volume/host_path/host_path_test.go b/pkg/kubelet/volume/host_path/host_path_test.go index 2fd6681b572..b7cf0cb6c2c 100644 --- a/pkg/kubelet/volume/host_path/host_path_test.go +++ b/pkg/kubelet/volume/host_path/host_path_test.go @@ -26,7 +26,7 @@ import ( func TestCanSupport(t *testing.T) { plugMgr := volume.PluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), &volume.FakeHost{"fake", nil}) + plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeHost("fake", nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/host-path") if err != nil { @@ -45,7 +45,7 @@ func TestCanSupport(t *testing.T) { func TestPlugin(t *testing.T) { plugMgr := volume.PluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), &volume.FakeHost{"fake", nil}) + plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeHost("fake", nil, nil)) plug, err := plugMgr.FindPluginByName("kubernetes.io/host-path") if err != nil { diff --git a/pkg/kubelet/volume/plugins.go b/pkg/kubelet/volume/plugins.go index dbd2baf765f..4614360771f 100644 --- a/pkg/kubelet/volume/plugins.go +++ b/pkg/kubelet/volume/plugins.go @@ -80,6 +80,17 @@ type Host interface { // GetKubeClient returns a client interface GetKubeClient() client.Interface + + // NewWrapperBuilder finds an appropriate plugin with which to handle + // the provided spec. This is used to implement volume plugins which + // "wrap" other plugins. For example, the "secret" volume is + // implemented in terms of the "emptyDir" volume. + NewWrapperBuilder(spec *api.Volume, podRef *api.ObjectReference) (Builder, error) + + // NewWrapperCleaner finds an appropriate plugin with which to handle + // the provided spec. See comments on NewWrapperBuilder for more + // context. + NewWrapperCleaner(spec *api.Volume, podUID types.UID) (Cleaner, error) } // PluginMgr tracks registered plugins. diff --git a/pkg/kubelet/volume/secret/secret.go b/pkg/kubelet/volume/secret/secret.go index 6a8bfc7ee75..6f2ac2bb1b2 100644 --- a/pkg/kubelet/volume/secret/secret.go +++ b/pkg/kubelet/volume/secret/secret.go @@ -19,7 +19,6 @@ package secret import ( "fmt" "io/ioutil" - "os" "path" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -63,7 +62,7 @@ func (plugin *secretPlugin) NewBuilder(spec *api.Volume, podRef *api.ObjectRefer } func (plugin *secretPlugin) newBuilderInternal(spec *api.Volume, podRef *api.ObjectReference) (volume.Builder, error) { - return &secretVolume{spec.Name, podRef, plugin, &spec.Secret.Target}, nil + return &secretVolume{spec.Name, *podRef, plugin, spec.Secret.Target}, nil } func (plugin *secretPlugin) NewCleaner(volName string, podUID types.UID) (volume.Cleaner, error) { @@ -71,26 +70,39 @@ func (plugin *secretPlugin) NewCleaner(volName string, podUID types.UID) (volume } func (plugin *secretPlugin) newCleanerInternal(volName string, podUID types.UID) (volume.Cleaner, error) { - return &secretVolume{volName, &api.ObjectReference{UID: podUID}, plugin, nil}, nil + return &secretVolume{volName, api.ObjectReference{UID: podUID}, plugin, api.ObjectReference{}}, nil } // secretVolume handles retrieving secrets from the API server // and placing them into the volume on the host. type secretVolume struct { volName string - podRef *api.ObjectReference + podRef api.ObjectReference plugin *secretPlugin - secretRef *api.ObjectReference + secretRef api.ObjectReference } func (sv *secretVolume) SetUp() error { - // TODO: explore tmpfs for secret volumes - hostPath := sv.GetPath() - glog.V(3).Infof("Setting up volume %v for pod %v at %v", sv.volName, sv.podRef.UID, hostPath) - err := os.MkdirAll(hostPath, 0777) + return sv.SetUpAt(sv.GetPath()) +} + +// This is the spec for the volume that this plugin wraps. +var wrappedVolumeSpec = &api.Volume{ + Name: "not-used", + VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{Medium: api.StorageTypeMemory}}, +} + +func (sv *secretVolume) SetUpAt(dir string) error { + glog.V(3).Infof("Setting up volume %v for pod %v at %v", sv.volName, sv.podRef.UID, dir) + + // Wrap EmptyDir, let it do the setup. + wrapped, err := sv.plugin.host.NewWrapperBuilder(wrappedVolumeSpec, &sv.podRef) if err != nil { return err } + if err := wrapped.SetUpAt(dir); err != nil { + return err + } kubeClient := sv.plugin.host.GetKubeClient() if kubeClient == nil { @@ -104,7 +116,7 @@ func (sv *secretVolume) SetUp() error { } for name, data := range secret.Data { - hostFilePath := path.Join(hostPath, name) + hostFilePath := path.Join(dir, name) err := ioutil.WriteFile(hostFilePath, data, 0777) if err != nil { glog.Errorf("Error writing secret data to host path: %v, %v", hostFilePath, err) @@ -120,14 +132,16 @@ func (sv *secretVolume) GetPath() string { } func (sv *secretVolume) TearDown() error { - glog.V(3).Infof("Tearing down volume %v for pod %v at %v", sv.volName, sv.podRef.UID, sv.GetPath()) - tmpDir, err := volume.RenameDirectory(sv.GetPath(), sv.volName+".deleting~") - if err != nil { - return err - } - err = os.RemoveAll(tmpDir) - if err != nil { - return err - } - return nil + return sv.TearDownAt(sv.GetPath()) +} + +func (sv *secretVolume) TearDownAt(dir string) error { + glog.V(3).Infof("Tearing down volume %v for pod %v at %v", sv.volName, sv.podRef.UID, dir) + + // Wrap EmptyDir, let it do the teardown. + wrapped, err := sv.plugin.host.NewWrapperCleaner(wrappedVolumeSpec, sv.podRef.UID) + if err != nil { + return err + } + return wrapped.TearDownAt(dir) } diff --git a/pkg/kubelet/volume/secret/secret_test.go b/pkg/kubelet/volume/secret/secret_test.go index d770ae9b352..01b36cbaf01 100644 --- a/pkg/kubelet/volume/secret/secret_test.go +++ b/pkg/kubelet/volume/secret/secret_test.go @@ -27,16 +27,18 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/empty_dir" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount" ) -func newTestHost(t *testing.T, fakeKubeClient client.Interface) volume.Host { +func newTestHost(t *testing.T, client client.Interface) volume.Host { tempDir, err := ioutil.TempDir("/tmp", "secret_volume_test.") if err != nil { t.Fatalf("can't make a temp rootdir: %v", err) } - return &volume.FakeHost{tempDir, fakeKubeClient} + return volume.NewFakeHost(tempDir, client, empty_dir.ProbeVolumePluginsWithMounter(&mount.FakeMounter{})) } func TestCanSupport(t *testing.T) { diff --git a/pkg/kubelet/volume/testing.go b/pkg/kubelet/volume/testing.go index 0e727e8ce20..baa17cdbf0e 100644 --- a/pkg/kubelet/volume/testing.go +++ b/pkg/kubelet/volume/testing.go @@ -25,26 +25,49 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/types" ) -// FakeHost is useful for testing volume plugins. -type FakeHost struct { - RootDir string - KubeClient client.Interface +// fakeHost is useful for testing volume plugins. +type fakeHost struct { + rootDir string + kubeClient client.Interface + pluginMgr PluginMgr } -func (f *FakeHost) GetPluginDir(podUID string) string { - return path.Join(f.RootDir, "plugins", podUID) +func NewFakeHost(rootDir string, kubeClient client.Interface, plugins []Plugin) *fakeHost { + host := &fakeHost{rootDir: rootDir, kubeClient: kubeClient} + host.pluginMgr.InitPlugins(plugins, host) + return host } -func (f *FakeHost) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string { - return path.Join(f.RootDir, "pods", string(podUID), "volumes", pluginName, volumeName) +func (f *fakeHost) GetPluginDir(podUID string) string { + return path.Join(f.rootDir, "plugins", podUID) } -func (f *FakeHost) GetPodPluginDir(podUID types.UID, pluginName string) string { - return path.Join(f.RootDir, "pods", string(podUID), "plugins", pluginName) +func (f *fakeHost) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string { + return path.Join(f.rootDir, "pods", string(podUID), "volumes", pluginName, volumeName) } -func (f *FakeHost) GetKubeClient() client.Interface { - return f.KubeClient +func (f *fakeHost) GetPodPluginDir(podUID types.UID, pluginName string) string { + return path.Join(f.rootDir, "pods", string(podUID), "plugins", pluginName) +} + +func (f *fakeHost) GetKubeClient() client.Interface { + return f.kubeClient +} + +func (f *fakeHost) NewWrapperBuilder(spec *api.Volume, podRef *api.ObjectReference) (Builder, error) { + plug, err := f.pluginMgr.FindPluginBySpec(spec) + if err != nil { + return nil, err + } + return plug.NewBuilder(spec, podRef) +} + +func (f *fakeHost) NewWrapperCleaner(spec *api.Volume, podUID types.UID) (Cleaner, error) { + plug, err := f.pluginMgr.FindPluginBySpec(spec) + if err != nil { + return nil, err + } + return plug.NewCleaner(spec.Name, podUID) } // FakePlugin is useful for for testing. It tries to be a fully compliant @@ -86,7 +109,11 @@ type FakeVolume struct { } func (fv *FakeVolume) SetUp() error { - return os.MkdirAll(fv.GetPath(), 0750) + return fv.SetUpAt(fv.GetPath()) +} + +func (fv *FakeVolume) SetUpAt(dir string) error { + return os.MkdirAll(dir, 0750) } func (fv *FakeVolume) GetPath() string { @@ -94,5 +121,9 @@ func (fv *FakeVolume) GetPath() string { } func (fv *FakeVolume) TearDown() error { - return os.RemoveAll(fv.GetPath()) + return fv.TearDownAt(fv.GetPath()) +} + +func (fv *FakeVolume) TearDownAt(dir string) error { + return os.RemoveAll(dir) } diff --git a/pkg/kubelet/volume/volume.go b/pkg/kubelet/volume/volume.go index cfa7faaf6ea..1ed7cfddb3d 100644 --- a/pkg/kubelet/volume/volume.go +++ b/pkg/kubelet/volume/volume.go @@ -33,17 +33,25 @@ type Interface interface { type Builder interface { // Uses Interface to provide the path for Docker binds. Interface - // SetUp prepares and mounts/unpacks the volume to a directory path. - // This may be called more than once, so implementations must be - // idempotent. + // SetUp prepares and mounts/unpacks the volume to a self-determined + // directory path. This may be called more than once, so + // implementations must be idempotent. SetUp() error + // SetUpAt prepares and mounts/unpacks the volume to the specified + // directory path, which may or may not exist yet. This may be called + // more than once, so implementations must be idempotent. + SetUpAt(dir string) error } // Cleaner interface provides method to cleanup/unmount the volumes. type Cleaner interface { Interface - // TearDown unmounts the volume and removes traces of the SetUp procedure. + // TearDown unmounts the volume from a self-determined directory and + // removes traces of the SetUp procedure. TearDown() error + // TearDown unmounts the volume from the specified directory and + // removes traces of the SetUp procedure. + TearDownAt(dir string) error } func RenameDirectory(oldPath, newName string) (string, error) { diff --git a/pkg/kubelet/volumes.go b/pkg/kubelet/volumes.go index 09850990524..5fda3388c50 100644 --- a/pkg/kubelet/volumes.go +++ b/pkg/kubelet/volumes.go @@ -53,23 +53,45 @@ func (vh *volumeHost) GetKubeClient() client.Interface { return vh.kubelet.kubeClient } -func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *api.Volume, podRef *api.ObjectReference) volume.Builder { - plugin, err := kl.volumePluginMgr.FindPluginBySpec(spec) +func (vh *volumeHost) NewWrapperBuilder(spec *api.Volume, podRef *api.ObjectReference) (volume.Builder, error) { + b, err := vh.kubelet.newVolumeBuilderFromPlugins(spec, podRef) + if err == nil && b == nil { + return nil, errUnsupportedVolumeType + } + return b, nil +} + +func (vh *volumeHost) NewWrapperCleaner(spec *api.Volume, podUID types.UID) (volume.Cleaner, error) { + plugin, err := vh.kubelet.volumePluginMgr.FindPluginBySpec(spec) if err != nil { - glog.Warningf("Can't use volume plugins for %s: %v", spew.Sprintf("%#v", *spec), err) - return nil + return nil, err } if plugin == nil { - glog.Errorf("No error, but nil volume plugin for %s", spew.Sprintf("%#v", *spec)) - return nil + // Not found but not an error + return nil, nil + } + c, err := plugin.NewCleaner(spec.Name, podUID) + if err == nil && c == nil { + return nil, errUnsupportedVolumeType + } + return c, nil +} + +func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *api.Volume, podRef *api.ObjectReference) (volume.Builder, error) { + plugin, err := kl.volumePluginMgr.FindPluginBySpec(spec) + if err != nil { + return nil, fmt.Errorf("can't use volume plugins for %s: %v", spew.Sprintf("%#v", *spec), err) + } + if plugin == nil { + // Not found but not an error + return nil, nil } builder, err := plugin.NewBuilder(spec, podRef) if err != nil { - glog.Warningf("Error instantiating volume plugin for %s: %v", spew.Sprintf("%#v", *spec), err) - return nil + return nil, fmt.Errorf("failed to instantiate volume plugin for %s: %v", spew.Sprintf("%#v", *spec), err) } glog.V(3).Infof("Used volume plugin %q for %s", plugin.Name(), spew.Sprintf("%#v", *spec)) - return builder + return builder, nil } func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (volumeMap, error) { @@ -84,7 +106,11 @@ func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (volumeMap, error) { } // Try to use a plugin for this volume. - builder := kl.newVolumeBuilderFromPlugins(volSpec, podRef) + builder, err := kl.newVolumeBuilderFromPlugins(volSpec, podRef) + if err != nil { + glog.Errorf("Could not create volume builder for pod %s: %v", pod.UID, err) + return nil, err + } if builder == nil { return nil, errUnsupportedVolumeType } @@ -131,7 +157,11 @@ func (kl *Kubelet) getPodVolumesFromDisk() map[string]volume.Cleaner { // or volume objects. // Try to use a plugin for this volume. - cleaner := kl.newVolumeCleanerFromPlugins(volumeKind, volumeName, podUID) + cleaner, err := kl.newVolumeCleanerFromPlugins(volumeKind, volumeName, podUID) + if err != nil { + glog.Errorf("Could not create volume cleaner for %s: %v", volumeNameDir.Name(), err) + continue + } if cleaner == nil { glog.Errorf("Could not create volume cleaner for %s: %v", volumeNameDir.Name(), errUnsupportedVolumeType) continue @@ -143,23 +173,21 @@ func (kl *Kubelet) getPodVolumesFromDisk() map[string]volume.Cleaner { return currentVolumes } -func (kl *Kubelet) newVolumeCleanerFromPlugins(kind string, name string, podUID types.UID) volume.Cleaner { +func (kl *Kubelet) newVolumeCleanerFromPlugins(kind string, name string, podUID types.UID) (volume.Cleaner, error) { plugName := volume.UnescapePluginName(kind) plugin, err := kl.volumePluginMgr.FindPluginByName(plugName) if err != nil { // TODO: Maybe we should launch a cleanup of this dir? - glog.Warningf("Can't use volume plugins for %s/%s: %v", podUID, kind, err) - return nil + return nil, fmt.Errorf("can't use volume plugins for %s/%s: %v", podUID, kind, err) } if plugin == nil { - glog.Errorf("No error, but nil volume plugin for %s/%s", podUID, kind) - return nil + // Not found but not an error. + return nil, nil } cleaner, err := plugin.NewCleaner(name, podUID) if err != nil { - glog.Warningf("Error instantiating volume plugin for %s/%s: %v", podUID, kind, err) - return nil + return nil, fmt.Errorf("failed to instantiate volume plugin for %s/%s: %v", podUID, kind, err) } glog.V(3).Infof("Used volume plugin %q for %s/%s", plugin.Name(), podUID, kind) - return cleaner + return cleaner, nil }