Implement volume plugin wrappers

Convert git_repo and secret into wrappers around empty_dir.
This commit is contained in:
Tim Hockin 2015-03-07 13:38:50 -08:00
parent bdc1981eb5
commit 1725c23eb2
14 changed files with 286 additions and 138 deletions

View File

@ -29,11 +29,21 @@ import (
// This is the primary entrypoint for volume plugins. // This is the primary entrypoint for volume plugins.
func ProbeVolumePlugins() []volume.Plugin { func ProbeVolumePlugins() []volume.Plugin {
return []volume.Plugin{&emptyDirPlugin{nil, false}, &emptyDirPlugin{nil, true}} return ProbeVolumePluginsWithMounter(mount.New())
}
// ProbePluginsWithMounter is a convenience for testing other plugins which wrap this one.
//FIXME: alternative: pass mount.Interface to all ProbeVolumePlugins() functions? Opinions?
func ProbeVolumePluginsWithMounter(mounter mount.Interface) []volume.Plugin {
return []volume.Plugin{
&emptyDirPlugin{nil, mounter, false},
&emptyDirPlugin{nil, mounter, true},
}
} }
type emptyDirPlugin struct { type emptyDirPlugin struct {
host volume.Host host volume.Host
mounter mount.Interface
legacyMode bool // if set, plugin answers to the legacy name legacyMode bool // if set, plugin answers to the legacy name
} }
@ -72,7 +82,7 @@ func (plugin *emptyDirPlugin) CanSupport(spec *api.Volume) bool {
func (plugin *emptyDirPlugin) NewBuilder(spec *api.Volume, podRef *api.ObjectReference) (volume.Builder, error) { func (plugin *emptyDirPlugin) NewBuilder(spec *api.Volume, podRef *api.ObjectReference) (volume.Builder, error) {
// Inject real implementations here, test through the internal function. // Inject real implementations here, test through the internal function.
return plugin.newBuilderInternal(spec, podRef, mount.New(), &realMediumer{}) return plugin.newBuilderInternal(spec, podRef, plugin.mounter, &realMediumer{})
} }
func (plugin *emptyDirPlugin) newBuilderInternal(spec *api.Volume, podRef *api.ObjectReference, mounter mount.Interface, mediumer mediumer) (volume.Builder, error) { func (plugin *emptyDirPlugin) newBuilderInternal(spec *api.Volume, podRef *api.ObjectReference, mounter mount.Interface, mediumer mediumer) (volume.Builder, error) {
@ -88,8 +98,8 @@ func (plugin *emptyDirPlugin) newBuilderInternal(spec *api.Volume, podRef *api.O
podUID: podRef.UID, podUID: podRef.UID,
volName: spec.Name, volName: spec.Name,
medium: medium, medium: medium,
mediumer: mediumer,
mounter: mounter, mounter: mounter,
mediumer: mediumer,
plugin: plugin, plugin: plugin,
legacyMode: false, legacyMode: false,
}, nil }, nil
@ -97,7 +107,7 @@ func (plugin *emptyDirPlugin) newBuilderInternal(spec *api.Volume, podRef *api.O
func (plugin *emptyDirPlugin) NewCleaner(volName string, podUID types.UID) (volume.Cleaner, error) { func (plugin *emptyDirPlugin) NewCleaner(volName string, podUID types.UID) (volume.Cleaner, error) {
// Inject real implementations here, test through the internal function. // Inject real implementations here, test through the internal function.
return plugin.newCleanerInternal(volName, podUID, mount.New(), &realMediumer{}) return plugin.newCleanerInternal(volName, podUID, plugin.mounter, &realMediumer{})
} }
func (plugin *emptyDirPlugin) newCleanerInternal(volName string, podUID types.UID, mounter mount.Interface, mediumer mediumer) (volume.Cleaner, error) { func (plugin *emptyDirPlugin) newCleanerInternal(volName string, podUID types.UID, mounter mount.Interface, mediumer mediumer) (volume.Cleaner, error) {
@ -114,17 +124,6 @@ func (plugin *emptyDirPlugin) newCleanerInternal(volName string, podUID types.UI
plugin: plugin, plugin: plugin,
legacyMode: legacy, legacyMode: legacy,
} }
// Figure out the medium.
if medium, err := mediumer.GetMedium(ed.GetPath()); err != nil {
return nil, err
} else {
switch medium {
case mediumMemory:
ed.medium = api.StorageTypeMemory
default:
// assume StorageTypeDefault
}
}
return ed, nil return ed, nil
} }
@ -154,37 +153,42 @@ type emptyDir struct {
// SetUp creates new directory. // SetUp creates new directory.
func (ed *emptyDir) SetUp() error { func (ed *emptyDir) SetUp() error {
return ed.SetUpAt(ed.GetPath())
}
// SetUpAt creates new directory.
func (ed *emptyDir) SetUpAt(dir string) error {
if ed.legacyMode { if ed.legacyMode {
return fmt.Errorf("legacy mode: can not create new instances") return fmt.Errorf("legacy mode: can not create new instances")
} }
switch ed.medium { switch ed.medium {
case api.StorageTypeDefault: case api.StorageTypeDefault:
return ed.setupDefault() return ed.setupDefault(dir)
case api.StorageTypeMemory: case api.StorageTypeMemory:
return ed.setupTmpfs() return ed.setupTmpfs(dir)
default: default:
return fmt.Errorf("unknown storage medium %q", ed.medium) return fmt.Errorf("unknown storage medium %q", ed.medium)
} }
} }
func (ed *emptyDir) setupDefault() error { func (ed *emptyDir) setupDefault(dir string) error {
return os.MkdirAll(ed.GetPath(), 0750) return os.MkdirAll(dir, 0750)
} }
func (ed *emptyDir) setupTmpfs() error { func (ed *emptyDir) setupTmpfs(dir string) error {
if ed.mounter == nil { if ed.mounter == nil {
return fmt.Errorf("memory storage requested, but mounter is nil") return fmt.Errorf("memory storage requested, but mounter is nil")
} }
if err := os.MkdirAll(ed.GetPath(), 0750); err != nil { if err := os.MkdirAll(dir, 0750); err != nil {
return err return err
} }
// Make SetUp idempotent. // Make SetUp idempotent.
if medium, err := ed.mediumer.GetMedium(ed.GetPath()); err != nil { if medium, err := ed.mediumer.GetMedium(dir); err != nil {
return err return err
} else if medium == mediumMemory { } else if medium == mediumMemory {
return nil // current state is what we expect return nil // current state is what we expect
} }
return ed.mounter.Mount("tmpfs", ed.GetPath(), "tmpfs", 0, "") return ed.mounter.Mount("tmpfs", dir, "tmpfs", 0, "")
} }
func (ed *emptyDir) GetPath() string { func (ed *emptyDir) GetPath() string {
@ -197,18 +201,28 @@ func (ed *emptyDir) GetPath() string {
// TearDown simply discards everything in the directory. // TearDown simply discards everything in the directory.
func (ed *emptyDir) TearDown() error { func (ed *emptyDir) TearDown() error {
switch ed.medium { return ed.TearDownAt(ed.GetPath())
case api.StorageTypeDefault: }
return ed.teardownDefault()
case api.StorageTypeMemory: // TearDownAt simply discards everything in the directory.
return ed.teardownTmpfs() func (ed *emptyDir) TearDownAt(dir string) error {
// Figure out the medium.
if medium, err := ed.mediumer.GetMedium(dir); err != nil {
return err
} else {
switch medium {
case mediumMemory:
ed.medium = api.StorageTypeMemory
return ed.teardownTmpfs(dir)
default: default:
return fmt.Errorf("unknown storage medium %q", ed.medium) // assume StorageTypeDefault
return ed.teardownDefault(dir)
}
} }
} }
func (ed *emptyDir) teardownDefault() error { func (ed *emptyDir) teardownDefault(dir string) error {
tmpDir, err := volume.RenameDirectory(ed.GetPath(), ed.volName+".deleting~") tmpDir, err := volume.RenameDirectory(dir, ed.volName+".deleting~")
if err != nil { if err != nil {
return err return err
} }
@ -219,14 +233,14 @@ func (ed *emptyDir) teardownDefault() error {
return nil return nil
} }
func (ed *emptyDir) teardownTmpfs() error { func (ed *emptyDir) teardownTmpfs(dir string) error {
if ed.mounter == nil { if ed.mounter == nil {
return fmt.Errorf("memory storage requested, but mounter is nil") return fmt.Errorf("memory storage requested, but mounter is nil")
} }
if err := ed.mounter.Unmount(ed.GetPath(), 0); err != nil { if err := ed.mounter.Unmount(dir, 0); err != nil {
return err return err
} }
if err := os.RemoveAll(ed.GetPath()); err != nil { if err := os.RemoveAll(dir); err != nil {
return err return err
} }
return nil return nil

View File

@ -33,7 +33,7 @@ const basePath = "/tmp/fake"
// Construct an instance of a plugin, by name. // Construct an instance of a plugin, by name.
func makePluginUnderTest(t *testing.T, plugName string) volume.Plugin { func makePluginUnderTest(t *testing.T, plugName string) volume.Plugin {
plugMgr := volume.PluginMgr{} plugMgr := volume.PluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), &volume.FakeHost{basePath, nil}) plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeHost(basePath, nil, nil))
plug, err := plugMgr.FindPluginByName(plugName) plug, err := plugMgr.FindPluginByName(plugName)
if err != nil { if err != nil {

View File

@ -165,13 +165,18 @@ func detachDiskLogError(pd *gcePersistentDisk) {
// SetUp attaches the disk and bind mounts to the volume path. // SetUp attaches the disk and bind mounts to the volume path.
func (pd *gcePersistentDisk) SetUp() error { func (pd *gcePersistentDisk) SetUp() error {
return pd.SetUpAt(pd.GetPath())
}
// SetUpAt attaches the disk and bind mounts to the volume path.
func (pd *gcePersistentDisk) SetUpAt(dir string) error {
if pd.legacyMode { if pd.legacyMode {
return fmt.Errorf("legacy mode: can not create new instances") return fmt.Errorf("legacy mode: can not create new instances")
} }
// TODO: handle failed mounts here. // TODO: handle failed mounts here.
mountpoint, err := mount.IsMountPoint(pd.GetPath()) mountpoint, err := mount.IsMountPoint(dir)
glog.V(4).Infof("PersistentDisk set up: %s %v %v", pd.GetPath(), mountpoint, err) glog.V(4).Infof("PersistentDisk set up: %s %v %v", dir, mountpoint, err)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
return err return err
} }
@ -189,38 +194,37 @@ func (pd *gcePersistentDisk) SetUp() error {
flags = mount.FlagReadOnly flags = mount.FlagReadOnly
} }
volPath := pd.GetPath() if err := os.MkdirAll(dir, 0750); err != nil {
if err := os.MkdirAll(volPath, 0750); err != nil {
// TODO: we should really eject the attach/detach out into its own control loop. // TODO: we should really eject the attach/detach out into its own control loop.
detachDiskLogError(pd) detachDiskLogError(pd)
return err return err
} }
// Perform a bind mount to the full path to allow duplicate mounts of the same PD. // Perform a bind mount to the full path to allow duplicate mounts of the same PD.
err = pd.mounter.Mount(globalPDPath, pd.GetPath(), "", mount.FlagBind|flags, "") err = pd.mounter.Mount(globalPDPath, dir, "", mount.FlagBind|flags, "")
if err != nil { if err != nil {
mountpoint, mntErr := mount.IsMountPoint(pd.GetPath()) mountpoint, mntErr := mount.IsMountPoint(dir)
if mntErr != nil { if mntErr != nil {
glog.Errorf("isMountpoint check failed: %v", mntErr) glog.Errorf("isMountpoint check failed: %v", mntErr)
return err return err
} }
if mountpoint { if mountpoint {
if mntErr = pd.mounter.Unmount(pd.GetPath(), 0); mntErr != nil { if mntErr = pd.mounter.Unmount(dir, 0); mntErr != nil {
glog.Errorf("Failed to unmount: %v", mntErr) glog.Errorf("Failed to unmount: %v", mntErr)
return err return err
} }
mountpoint, mntErr := mount.IsMountPoint(pd.GetPath()) mountpoint, mntErr := mount.IsMountPoint(dir)
if mntErr != nil { if mntErr != nil {
glog.Errorf("isMountpoint check failed: %v", mntErr) glog.Errorf("isMountpoint check failed: %v", mntErr)
return err return err
} }
if mountpoint { if mountpoint {
// This is very odd, we don't expect it. We'll try again next sync loop. // This is very odd, we don't expect it. We'll try again next sync loop.
glog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", pd.GetPath()) glog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", dir)
return err return err
} }
} }
os.Remove(pd.GetPath()) os.Remove(dir)
// TODO: we should really eject the attach/detach out into its own control loop. // TODO: we should really eject the attach/detach out into its own control loop.
detachDiskLogError(pd) detachDiskLogError(pd)
return err return err
@ -244,20 +248,26 @@ func (pd *gcePersistentDisk) GetPath() string {
// Unmounts the bind mount, and detaches the disk only if the PD // Unmounts the bind mount, and detaches the disk only if the PD
// resource was the last reference to that disk on the kubelet. // resource was the last reference to that disk on the kubelet.
func (pd *gcePersistentDisk) TearDown() error { func (pd *gcePersistentDisk) TearDown() error {
mountpoint, err := mount.IsMountPoint(pd.GetPath()) return pd.TearDownAt(pd.GetPath())
}
// Unmounts the bind mount, and detaches the disk only if the PD
// resource was the last reference to that disk on the kubelet.
func (pd *gcePersistentDisk) TearDownAt(dir string) error {
mountpoint, err := mount.IsMountPoint(dir)
if err != nil { if err != nil {
return err return err
} }
if !mountpoint { if !mountpoint {
return os.Remove(pd.GetPath()) return os.Remove(dir)
} }
refs, err := mount.GetMountRefs(pd.mounter, pd.GetPath()) refs, err := mount.GetMountRefs(pd.mounter, dir)
if err != nil { if err != nil {
return err return err
} }
// Unmount the bind-mount inside this pod // Unmount the bind-mount inside this pod
if err := pd.mounter.Unmount(pd.GetPath(), 0); err != nil { if err := pd.mounter.Unmount(dir, 0); err != nil {
return err return err
} }
// If len(refs) is 1, then all bind mounts have been removed, and the // If len(refs) is 1, then all bind mounts have been removed, and the
@ -269,13 +279,13 @@ func (pd *gcePersistentDisk) TearDown() error {
return err return err
} }
} }
mountpoint, mntErr := mount.IsMountPoint(pd.GetPath()) mountpoint, mntErr := mount.IsMountPoint(dir)
if mntErr != nil { if mntErr != nil {
glog.Errorf("isMountpoint check failed: %v", mntErr) glog.Errorf("isMountpoint check failed: %v", mntErr)
return err return err
} }
if !mountpoint { if !mountpoint {
if err := os.Remove(pd.GetPath()); err != nil { if err := os.Remove(dir); err != nil {
return err return err
} }
} }

View File

@ -28,7 +28,7 @@ import (
func TestCanSupport(t *testing.T) { func TestCanSupport(t *testing.T) {
plugMgr := volume.PluginMgr{} plugMgr := volume.PluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), &volume.FakeHost{"/tmp/fake", nil}) plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeHost("/tmp/fake", nil, nil))
plug, err := plugMgr.FindPluginByName("kubernetes.io/gce-pd") plug, err := plugMgr.FindPluginByName("kubernetes.io/gce-pd")
if err != nil { if err != nil {
@ -66,7 +66,7 @@ func (fake *fakePDManager) DetachDisk(pd *gcePersistentDisk) error {
func TestPlugin(t *testing.T) { func TestPlugin(t *testing.T) {
plugMgr := volume.PluginMgr{} plugMgr := volume.PluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), &volume.FakeHost{"/tmp/fake", nil}) plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeHost("/tmp/fake", nil, nil))
plug, err := plugMgr.FindPluginByName("kubernetes.io/gce-pd") plug, err := plugMgr.FindPluginByName("kubernetes.io/gce-pd")
if err != nil { if err != nil {
@ -132,7 +132,7 @@ func TestPlugin(t *testing.T) {
func TestPluginLegacy(t *testing.T) { func TestPluginLegacy(t *testing.T) {
plugMgr := volume.PluginMgr{} plugMgr := volume.PluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), &volume.FakeHost{"/tmp/fake", nil}) plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeHost("/tmp/fake", nil, nil))
plug, err := plugMgr.FindPluginByName("gce-pd") plug, err := plugMgr.FindPluginByName("gce-pd")
if err != nil { if err != nil {

View File

@ -75,7 +75,7 @@ func (plugin *gitRepoPlugin) NewBuilder(spec *api.Volume, podRef *api.ObjectRefe
return nil, fmt.Errorf("legacy mode: can not create new instances") return nil, fmt.Errorf("legacy mode: can not create new instances")
} }
return &gitRepo{ return &gitRepo{
podUID: podRef.UID, podRef: *podRef,
volName: spec.Name, volName: spec.Name,
source: spec.GitRepo.Repository, source: spec.GitRepo.Repository,
revision: spec.GitRepo.Revision, revision: spec.GitRepo.Revision,
@ -91,7 +91,7 @@ func (plugin *gitRepoPlugin) NewCleaner(volName string, podUID types.UID) (volum
legacy = true legacy = true
} }
return &gitRepo{ return &gitRepo{
podUID: podUID, podRef: api.ObjectReference{UID: podUID},
volName: volName, volName: volName,
plugin: plugin, plugin: plugin,
legacyMode: legacy, legacyMode: legacy,
@ -102,7 +102,7 @@ func (plugin *gitRepoPlugin) NewCleaner(volName string, podUID types.UID) (volum
// These do not persist beyond the lifetime of a pod. // These do not persist beyond the lifetime of a pod.
type gitRepo struct { type gitRepo struct {
volName string volName string
podUID types.UID podRef api.ObjectReference
source string source string
revision string revision string
exec exec.Interface exec exec.Interface
@ -112,6 +112,17 @@ type gitRepo struct {
// SetUp creates new directory and clones a git repo. // SetUp creates new directory and clones a git repo.
func (gr *gitRepo) SetUp() error { func (gr *gitRepo) SetUp() error {
return gr.SetUpAt(gr.GetPath())
}
// This is the spec for the volume that this plugin wraps.
var wrappedVolumeSpec = &api.Volume{
Name: "not-used",
VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{}},
}
// SetUpAt creates new directory and clones a git repo.
func (gr *gitRepo) SetUpAt(dir string) error {
if gr.isReady() { if gr.isReady() {
return nil return nil
} }
@ -119,16 +130,20 @@ func (gr *gitRepo) SetUp() error {
return fmt.Errorf("legacy mode: can not create new instances") return fmt.Errorf("legacy mode: can not create new instances")
} }
volPath := gr.GetPath() // Wrap EmptyDir, let it do the setup.
if err := os.MkdirAll(volPath, 0750); err != nil { wrapped, err := gr.plugin.host.NewWrapperBuilder(wrappedVolumeSpec, &gr.podRef)
if err != nil {
return err
}
if err := wrapped.SetUpAt(dir); err != nil {
return err return err
} }
if output, err := gr.execCommand("git", []string{"clone", gr.source}, gr.GetPath()); err != nil { if output, err := gr.execCommand("git", []string{"clone", gr.source}, dir); err != nil {
return fmt.Errorf("failed to exec 'git clone %s': %s: %v", gr.source, output, err) return fmt.Errorf("failed to exec 'git clone %s': %s: %v", gr.source, output, err)
} }
files, err := ioutil.ReadDir(gr.GetPath()) files, err := ioutil.ReadDir(dir)
if err != nil { if err != nil {
return err return err
} }
@ -141,11 +156,11 @@ func (gr *gitRepo) SetUp() error {
return nil return nil
} }
dir := path.Join(gr.GetPath(), files[0].Name()) subdir := path.Join(dir, files[0].Name())
if output, err := gr.execCommand("git", []string{"checkout", gr.revision}, dir); err != nil { if output, err := gr.execCommand("git", []string{"checkout", gr.revision}, subdir); err != nil {
return fmt.Errorf("failed to exec 'git checkout %s': %s: %v", gr.revision, output, err) return fmt.Errorf("failed to exec 'git checkout %s': %s: %v", gr.revision, output, err)
} }
if output, err := gr.execCommand("git", []string{"reset", "--hard"}, dir); err != nil { if output, err := gr.execCommand("git", []string{"reset", "--hard"}, subdir); err != nil {
return fmt.Errorf("failed to exec 'git reset --hard': %s: %v", output, err) return fmt.Errorf("failed to exec 'git reset --hard': %s: %v", output, err)
} }
@ -154,7 +169,7 @@ func (gr *gitRepo) SetUp() error {
} }
func (gr *gitRepo) getMetaDir() string { func (gr *gitRepo) getMetaDir() string {
return path.Join(gr.plugin.host.GetPodPluginDir(gr.podUID, volume.EscapePluginName(gitRepoPluginName)), gr.volName) return path.Join(gr.plugin.host.GetPodPluginDir(gr.podRef.UID, volume.EscapePluginName(gitRepoPluginName)), gr.volName)
} }
func (gr *gitRepo) isReady() bool { func (gr *gitRepo) isReady() bool {
@ -197,18 +212,20 @@ func (gr *gitRepo) GetPath() string {
if gr.legacyMode { if gr.legacyMode {
name = gitRepoPluginLegacyName name = gitRepoPluginLegacyName
} }
return gr.plugin.host.GetPodVolumeDir(gr.podUID, volume.EscapePluginName(name), gr.volName) return gr.plugin.host.GetPodVolumeDir(gr.podRef.UID, volume.EscapePluginName(name), gr.volName)
} }
// TearDown simply deletes everything in the directory. // TearDown simply deletes everything in the directory.
func (gr *gitRepo) TearDown() error { func (gr *gitRepo) TearDown() error {
tmpDir, err := volume.RenameDirectory(gr.GetPath(), gr.volName+".deleting~") return gr.TearDownAt(gr.GetPath())
if err != nil { }
return err
} // TearDownAt simply deletes everything in the directory.
err = os.RemoveAll(tmpDir) func (gr *gitRepo) TearDownAt(dir string) error {
if err != nil { // Wrap EmptyDir, let it do the teardown.
return err wrapped, err := gr.plugin.host.NewWrapperCleaner(wrappedVolumeSpec, gr.podRef.UID)
} if err != nil {
return nil return err
}
return wrapped.TearDownAt(dir)
} }

View File

@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/empty_dir"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
) )
@ -35,7 +36,7 @@ func newTestHost(t *testing.T) volume.Host {
if err != nil { if err != nil {
t.Fatalf("can't make a temp rootdir: %v", err) t.Fatalf("can't make a temp rootdir: %v", err)
} }
return &volume.FakeHost{tempDir, nil} return volume.NewFakeHost(tempDir, nil, empty_dir.ProbeVolumePlugins())
} }
func TestCanSupport(t *testing.T) { func TestCanSupport(t *testing.T) {

View File

@ -17,6 +17,8 @@ limitations under the License.
package host_path package host_path
import ( import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
@ -71,6 +73,11 @@ func (hp *hostPath) SetUp() error {
return nil return nil
} }
// SetUpAt does not make sense for host paths - probably programmer error.
func (hp *hostPath) SetUpAt(dir string) error {
return fmt.Errorf("SetUpAt() does not make sense for host paths")
}
func (hp *hostPath) GetPath() string { func (hp *hostPath) GetPath() string {
return hp.path return hp.path
} }
@ -79,3 +86,8 @@ func (hp *hostPath) GetPath() string {
func (hp *hostPath) TearDown() error { func (hp *hostPath) TearDown() error {
return nil return nil
} }
// TearDownAt does not make sense for host paths - probably programmer error.
func (hp *hostPath) TearDownAt(dir string) error {
return fmt.Errorf("TearDownAt() does not make sense for host paths")
}

View File

@ -26,7 +26,7 @@ import (
func TestCanSupport(t *testing.T) { func TestCanSupport(t *testing.T) {
plugMgr := volume.PluginMgr{} plugMgr := volume.PluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), &volume.FakeHost{"fake", nil}) plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeHost("fake", nil, nil))
plug, err := plugMgr.FindPluginByName("kubernetes.io/host-path") plug, err := plugMgr.FindPluginByName("kubernetes.io/host-path")
if err != nil { if err != nil {
@ -45,7 +45,7 @@ func TestCanSupport(t *testing.T) {
func TestPlugin(t *testing.T) { func TestPlugin(t *testing.T) {
plugMgr := volume.PluginMgr{} plugMgr := volume.PluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), &volume.FakeHost{"fake", nil}) plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeHost("fake", nil, nil))
plug, err := plugMgr.FindPluginByName("kubernetes.io/host-path") plug, err := plugMgr.FindPluginByName("kubernetes.io/host-path")
if err != nil { if err != nil {

View File

@ -80,6 +80,17 @@ type Host interface {
// GetKubeClient returns a client interface // GetKubeClient returns a client interface
GetKubeClient() client.Interface GetKubeClient() client.Interface
// NewWrapperBuilder finds an appropriate plugin with which to handle
// the provided spec. This is used to implement volume plugins which
// "wrap" other plugins. For example, the "secret" volume is
// implemented in terms of the "emptyDir" volume.
NewWrapperBuilder(spec *api.Volume, podRef *api.ObjectReference) (Builder, error)
// NewWrapperCleaner finds an appropriate plugin with which to handle
// the provided spec. See comments on NewWrapperBuilder for more
// context.
NewWrapperCleaner(spec *api.Volume, podUID types.UID) (Cleaner, error)
} }
// PluginMgr tracks registered plugins. // PluginMgr tracks registered plugins.

View File

@ -19,7 +19,6 @@ package secret
import ( import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os"
"path" "path"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -63,7 +62,7 @@ func (plugin *secretPlugin) NewBuilder(spec *api.Volume, podRef *api.ObjectRefer
} }
func (plugin *secretPlugin) newBuilderInternal(spec *api.Volume, podRef *api.ObjectReference) (volume.Builder, error) { func (plugin *secretPlugin) newBuilderInternal(spec *api.Volume, podRef *api.ObjectReference) (volume.Builder, error) {
return &secretVolume{spec.Name, podRef, plugin, &spec.Secret.Target}, nil return &secretVolume{spec.Name, *podRef, plugin, spec.Secret.Target}, nil
} }
func (plugin *secretPlugin) NewCleaner(volName string, podUID types.UID) (volume.Cleaner, error) { func (plugin *secretPlugin) NewCleaner(volName string, podUID types.UID) (volume.Cleaner, error) {
@ -71,26 +70,39 @@ func (plugin *secretPlugin) NewCleaner(volName string, podUID types.UID) (volume
} }
func (plugin *secretPlugin) newCleanerInternal(volName string, podUID types.UID) (volume.Cleaner, error) { func (plugin *secretPlugin) newCleanerInternal(volName string, podUID types.UID) (volume.Cleaner, error) {
return &secretVolume{volName, &api.ObjectReference{UID: podUID}, plugin, nil}, nil return &secretVolume{volName, api.ObjectReference{UID: podUID}, plugin, api.ObjectReference{}}, nil
} }
// secretVolume handles retrieving secrets from the API server // secretVolume handles retrieving secrets from the API server
// and placing them into the volume on the host. // and placing them into the volume on the host.
type secretVolume struct { type secretVolume struct {
volName string volName string
podRef *api.ObjectReference podRef api.ObjectReference
plugin *secretPlugin plugin *secretPlugin
secretRef *api.ObjectReference secretRef api.ObjectReference
} }
func (sv *secretVolume) SetUp() error { func (sv *secretVolume) SetUp() error {
// TODO: explore tmpfs for secret volumes return sv.SetUpAt(sv.GetPath())
hostPath := sv.GetPath() }
glog.V(3).Infof("Setting up volume %v for pod %v at %v", sv.volName, sv.podRef.UID, hostPath)
err := os.MkdirAll(hostPath, 0777) // This is the spec for the volume that this plugin wraps.
var wrappedVolumeSpec = &api.Volume{
Name: "not-used",
VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{Medium: api.StorageTypeMemory}},
}
func (sv *secretVolume) SetUpAt(dir string) error {
glog.V(3).Infof("Setting up volume %v for pod %v at %v", sv.volName, sv.podRef.UID, dir)
// Wrap EmptyDir, let it do the setup.
wrapped, err := sv.plugin.host.NewWrapperBuilder(wrappedVolumeSpec, &sv.podRef)
if err != nil { if err != nil {
return err return err
} }
if err := wrapped.SetUpAt(dir); err != nil {
return err
}
kubeClient := sv.plugin.host.GetKubeClient() kubeClient := sv.plugin.host.GetKubeClient()
if kubeClient == nil { if kubeClient == nil {
@ -104,7 +116,7 @@ func (sv *secretVolume) SetUp() error {
} }
for name, data := range secret.Data { for name, data := range secret.Data {
hostFilePath := path.Join(hostPath, name) hostFilePath := path.Join(dir, name)
err := ioutil.WriteFile(hostFilePath, data, 0777) err := ioutil.WriteFile(hostFilePath, data, 0777)
if err != nil { if err != nil {
glog.Errorf("Error writing secret data to host path: %v, %v", hostFilePath, err) glog.Errorf("Error writing secret data to host path: %v, %v", hostFilePath, err)
@ -120,14 +132,16 @@ func (sv *secretVolume) GetPath() string {
} }
func (sv *secretVolume) TearDown() error { func (sv *secretVolume) TearDown() error {
glog.V(3).Infof("Tearing down volume %v for pod %v at %v", sv.volName, sv.podRef.UID, sv.GetPath()) return sv.TearDownAt(sv.GetPath())
tmpDir, err := volume.RenameDirectory(sv.GetPath(), sv.volName+".deleting~") }
if err != nil {
return err func (sv *secretVolume) TearDownAt(dir string) error {
} glog.V(3).Infof("Tearing down volume %v for pod %v at %v", sv.volName, sv.podRef.UID, dir)
err = os.RemoveAll(tmpDir)
if err != nil { // Wrap EmptyDir, let it do the teardown.
return err wrapped, err := sv.plugin.host.NewWrapperCleaner(wrappedVolumeSpec, sv.podRef.UID)
} if err != nil {
return nil return err
}
return wrapped.TearDownAt(dir)
} }

View File

@ -27,16 +27,18 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/empty_dir"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
) )
func newTestHost(t *testing.T, fakeKubeClient client.Interface) volume.Host { func newTestHost(t *testing.T, client client.Interface) volume.Host {
tempDir, err := ioutil.TempDir("/tmp", "secret_volume_test.") tempDir, err := ioutil.TempDir("/tmp", "secret_volume_test.")
if err != nil { if err != nil {
t.Fatalf("can't make a temp rootdir: %v", err) t.Fatalf("can't make a temp rootdir: %v", err)
} }
return &volume.FakeHost{tempDir, fakeKubeClient} return volume.NewFakeHost(tempDir, client, empty_dir.ProbeVolumePluginsWithMounter(&mount.FakeMounter{}))
} }
func TestCanSupport(t *testing.T) { func TestCanSupport(t *testing.T) {

View File

@ -25,26 +25,49 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
) )
// FakeHost is useful for testing volume plugins. // fakeHost is useful for testing volume plugins.
type FakeHost struct { type fakeHost struct {
RootDir string rootDir string
KubeClient client.Interface kubeClient client.Interface
pluginMgr PluginMgr
} }
func (f *FakeHost) GetPluginDir(podUID string) string { func NewFakeHost(rootDir string, kubeClient client.Interface, plugins []Plugin) *fakeHost {
return path.Join(f.RootDir, "plugins", podUID) host := &fakeHost{rootDir: rootDir, kubeClient: kubeClient}
host.pluginMgr.InitPlugins(plugins, host)
return host
} }
func (f *FakeHost) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string { func (f *fakeHost) GetPluginDir(podUID string) string {
return path.Join(f.RootDir, "pods", string(podUID), "volumes", pluginName, volumeName) return path.Join(f.rootDir, "plugins", podUID)
} }
func (f *FakeHost) GetPodPluginDir(podUID types.UID, pluginName string) string { func (f *fakeHost) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string {
return path.Join(f.RootDir, "pods", string(podUID), "plugins", pluginName) return path.Join(f.rootDir, "pods", string(podUID), "volumes", pluginName, volumeName)
} }
func (f *FakeHost) GetKubeClient() client.Interface { func (f *fakeHost) GetPodPluginDir(podUID types.UID, pluginName string) string {
return f.KubeClient return path.Join(f.rootDir, "pods", string(podUID), "plugins", pluginName)
}
func (f *fakeHost) GetKubeClient() client.Interface {
return f.kubeClient
}
func (f *fakeHost) NewWrapperBuilder(spec *api.Volume, podRef *api.ObjectReference) (Builder, error) {
plug, err := f.pluginMgr.FindPluginBySpec(spec)
if err != nil {
return nil, err
}
return plug.NewBuilder(spec, podRef)
}
func (f *fakeHost) NewWrapperCleaner(spec *api.Volume, podUID types.UID) (Cleaner, error) {
plug, err := f.pluginMgr.FindPluginBySpec(spec)
if err != nil {
return nil, err
}
return plug.NewCleaner(spec.Name, podUID)
} }
// FakePlugin is useful for for testing. It tries to be a fully compliant // FakePlugin is useful for for testing. It tries to be a fully compliant
@ -86,7 +109,11 @@ type FakeVolume struct {
} }
func (fv *FakeVolume) SetUp() error { func (fv *FakeVolume) SetUp() error {
return os.MkdirAll(fv.GetPath(), 0750) return fv.SetUpAt(fv.GetPath())
}
func (fv *FakeVolume) SetUpAt(dir string) error {
return os.MkdirAll(dir, 0750)
} }
func (fv *FakeVolume) GetPath() string { func (fv *FakeVolume) GetPath() string {
@ -94,5 +121,9 @@ func (fv *FakeVolume) GetPath() string {
} }
func (fv *FakeVolume) TearDown() error { func (fv *FakeVolume) TearDown() error {
return os.RemoveAll(fv.GetPath()) return fv.TearDownAt(fv.GetPath())
}
func (fv *FakeVolume) TearDownAt(dir string) error {
return os.RemoveAll(dir)
} }

View File

@ -33,17 +33,25 @@ type Interface interface {
type Builder interface { type Builder interface {
// Uses Interface to provide the path for Docker binds. // Uses Interface to provide the path for Docker binds.
Interface Interface
// SetUp prepares and mounts/unpacks the volume to a directory path. // SetUp prepares and mounts/unpacks the volume to a self-determined
// This may be called more than once, so implementations must be // directory path. This may be called more than once, so
// idempotent. // implementations must be idempotent.
SetUp() error SetUp() error
// SetUpAt prepares and mounts/unpacks the volume to the specified
// directory path, which may or may not exist yet. This may be called
// more than once, so implementations must be idempotent.
SetUpAt(dir string) error
} }
// Cleaner interface provides method to cleanup/unmount the volumes. // Cleaner interface provides method to cleanup/unmount the volumes.
type Cleaner interface { type Cleaner interface {
Interface Interface
// TearDown unmounts the volume and removes traces of the SetUp procedure. // TearDown unmounts the volume from a self-determined directory and
// removes traces of the SetUp procedure.
TearDown() error TearDown() error
// TearDown unmounts the volume from the specified directory and
// removes traces of the SetUp procedure.
TearDownAt(dir string) error
} }
func RenameDirectory(oldPath, newName string) (string, error) { func RenameDirectory(oldPath, newName string) (string, error) {

View File

@ -53,23 +53,45 @@ func (vh *volumeHost) GetKubeClient() client.Interface {
return vh.kubelet.kubeClient return vh.kubelet.kubeClient
} }
func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *api.Volume, podRef *api.ObjectReference) volume.Builder { func (vh *volumeHost) NewWrapperBuilder(spec *api.Volume, podRef *api.ObjectReference) (volume.Builder, error) {
plugin, err := kl.volumePluginMgr.FindPluginBySpec(spec) b, err := vh.kubelet.newVolumeBuilderFromPlugins(spec, podRef)
if err == nil && b == nil {
return nil, errUnsupportedVolumeType
}
return b, nil
}
func (vh *volumeHost) NewWrapperCleaner(spec *api.Volume, podUID types.UID) (volume.Cleaner, error) {
plugin, err := vh.kubelet.volumePluginMgr.FindPluginBySpec(spec)
if err != nil { if err != nil {
glog.Warningf("Can't use volume plugins for %s: %v", spew.Sprintf("%#v", *spec), err) return nil, err
return nil
} }
if plugin == nil { if plugin == nil {
glog.Errorf("No error, but nil volume plugin for %s", spew.Sprintf("%#v", *spec)) // Not found but not an error
return nil return nil, nil
}
c, err := plugin.NewCleaner(spec.Name, podUID)
if err == nil && c == nil {
return nil, errUnsupportedVolumeType
}
return c, nil
}
func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *api.Volume, podRef *api.ObjectReference) (volume.Builder, error) {
plugin, err := kl.volumePluginMgr.FindPluginBySpec(spec)
if err != nil {
return nil, fmt.Errorf("can't use volume plugins for %s: %v", spew.Sprintf("%#v", *spec), err)
}
if plugin == nil {
// Not found but not an error
return nil, nil
} }
builder, err := plugin.NewBuilder(spec, podRef) builder, err := plugin.NewBuilder(spec, podRef)
if err != nil { if err != nil {
glog.Warningf("Error instantiating volume plugin for %s: %v", spew.Sprintf("%#v", *spec), err) return nil, fmt.Errorf("failed to instantiate volume plugin for %s: %v", spew.Sprintf("%#v", *spec), err)
return nil
} }
glog.V(3).Infof("Used volume plugin %q for %s", plugin.Name(), spew.Sprintf("%#v", *spec)) glog.V(3).Infof("Used volume plugin %q for %s", plugin.Name(), spew.Sprintf("%#v", *spec))
return builder return builder, nil
} }
func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (volumeMap, error) { func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (volumeMap, error) {
@ -84,7 +106,11 @@ func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (volumeMap, error) {
} }
// Try to use a plugin for this volume. // Try to use a plugin for this volume.
builder := kl.newVolumeBuilderFromPlugins(volSpec, podRef) builder, err := kl.newVolumeBuilderFromPlugins(volSpec, podRef)
if err != nil {
glog.Errorf("Could not create volume builder for pod %s: %v", pod.UID, err)
return nil, err
}
if builder == nil { if builder == nil {
return nil, errUnsupportedVolumeType return nil, errUnsupportedVolumeType
} }
@ -131,7 +157,11 @@ func (kl *Kubelet) getPodVolumesFromDisk() map[string]volume.Cleaner {
// or volume objects. // or volume objects.
// Try to use a plugin for this volume. // Try to use a plugin for this volume.
cleaner := kl.newVolumeCleanerFromPlugins(volumeKind, volumeName, podUID) cleaner, err := kl.newVolumeCleanerFromPlugins(volumeKind, volumeName, podUID)
if err != nil {
glog.Errorf("Could not create volume cleaner for %s: %v", volumeNameDir.Name(), err)
continue
}
if cleaner == nil { if cleaner == nil {
glog.Errorf("Could not create volume cleaner for %s: %v", volumeNameDir.Name(), errUnsupportedVolumeType) glog.Errorf("Could not create volume cleaner for %s: %v", volumeNameDir.Name(), errUnsupportedVolumeType)
continue continue
@ -143,23 +173,21 @@ func (kl *Kubelet) getPodVolumesFromDisk() map[string]volume.Cleaner {
return currentVolumes return currentVolumes
} }
func (kl *Kubelet) newVolumeCleanerFromPlugins(kind string, name string, podUID types.UID) volume.Cleaner { func (kl *Kubelet) newVolumeCleanerFromPlugins(kind string, name string, podUID types.UID) (volume.Cleaner, error) {
plugName := volume.UnescapePluginName(kind) plugName := volume.UnescapePluginName(kind)
plugin, err := kl.volumePluginMgr.FindPluginByName(plugName) plugin, err := kl.volumePluginMgr.FindPluginByName(plugName)
if err != nil { if err != nil {
// TODO: Maybe we should launch a cleanup of this dir? // TODO: Maybe we should launch a cleanup of this dir?
glog.Warningf("Can't use volume plugins for %s/%s: %v", podUID, kind, err) return nil, fmt.Errorf("can't use volume plugins for %s/%s: %v", podUID, kind, err)
return nil
} }
if plugin == nil { if plugin == nil {
glog.Errorf("No error, but nil volume plugin for %s/%s", podUID, kind) // Not found but not an error.
return nil return nil, nil
} }
cleaner, err := plugin.NewCleaner(name, podUID) cleaner, err := plugin.NewCleaner(name, podUID)
if err != nil { if err != nil {
glog.Warningf("Error instantiating volume plugin for %s/%s: %v", podUID, kind, err) return nil, fmt.Errorf("failed to instantiate volume plugin for %s/%s: %v", podUID, kind, err)
return nil
} }
glog.V(3).Infof("Used volume plugin %q for %s/%s", plugin.Name(), podUID, kind) glog.V(3).Infof("Used volume plugin %q for %s/%s", plugin.Name(), podUID, kind)
return cleaner return cleaner, nil
} }