rbd: Use VolumeHost.GetExec() to execute stuff in volume plugins

This commit is contained in:
Jan Safranek 2017-08-22 15:49:21 +02:00
parent 07dea6b447
commit 73b101c14b
4 changed files with 55 additions and 44 deletions

View File

@ -29,7 +29,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
], ],
) )

View File

@ -33,7 +33,6 @@ import (
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
volutil "k8s.io/kubernetes/pkg/volume/util" volutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/volumehelper" "k8s.io/kubernetes/pkg/volume/util/volumehelper"
"k8s.io/utils/exec"
) )
var ( var (
@ -42,12 +41,11 @@ var (
// This is the primary entrypoint for volume plugins. // This is the primary entrypoint for volume plugins.
func ProbeVolumePlugins() []volume.VolumePlugin { func ProbeVolumePlugins() []volume.VolumePlugin {
return []volume.VolumePlugin{&rbdPlugin{nil, exec.New()}} return []volume.VolumePlugin{&rbdPlugin{nil}}
} }
type rbdPlugin struct { type rbdPlugin struct {
host volume.VolumeHost host volume.VolumeHost
exe exec.Interface
} }
var _ volume.VolumePlugin = &rbdPlugin{} var _ volume.VolumePlugin = &rbdPlugin{}
@ -131,7 +129,7 @@ func (plugin *rbdPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.Vol
} }
// Inject real implementations here, test through the internal function. // Inject real implementations here, test through the internal function.
return plugin.newMounterInternal(spec, pod.UID, &RBDUtil{}, plugin.host.GetMounter(plugin.GetPluginName()), secret) return plugin.newMounterInternal(spec, pod.UID, &RBDUtil{}, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName()), secret)
} }
func (plugin *rbdPlugin) getRBDVolumeSource(spec *volume.Spec) (*v1.RBDVolumeSource, bool) { func (plugin *rbdPlugin) getRBDVolumeSource(spec *volume.Spec) (*v1.RBDVolumeSource, bool) {
@ -144,7 +142,7 @@ func (plugin *rbdPlugin) getRBDVolumeSource(spec *volume.Spec) (*v1.RBDVolumeSou
} }
} }
func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager diskManager, mounter mount.Interface, secret string) (volume.Mounter, error) { func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager diskManager, mounter mount.Interface, exec mount.Exec, secret string) (volume.Mounter, error) {
source, readOnly := plugin.getRBDVolumeSource(spec) source, readOnly := plugin.getRBDVolumeSource(spec)
pool := source.RBDPool pool := source.RBDPool
id := source.RadosUser id := source.RadosUser
@ -158,7 +156,8 @@ func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID,
Pool: pool, Pool: pool,
ReadOnly: readOnly, ReadOnly: readOnly,
manager: manager, manager: manager,
mounter: volumehelper.NewSafeFormatAndMountFromHost(plugin.GetPluginName(), plugin.host), mounter: &mount.SafeFormatAndMount{Interface: mounter, Exec: exec},
exec: exec,
plugin: plugin, plugin: plugin,
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), plugin.host)), MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), plugin.host)),
}, },
@ -173,17 +172,18 @@ func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID,
func (plugin *rbdPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) { func (plugin *rbdPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
// Inject real implementations here, test through the internal function. // Inject real implementations here, test through the internal function.
return plugin.newUnmounterInternal(volName, podUID, &RBDUtil{}, plugin.host.GetMounter(plugin.GetPluginName())) return plugin.newUnmounterInternal(volName, podUID, &RBDUtil{}, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName()))
} }
func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID, manager diskManager, mounter mount.Interface) (volume.Unmounter, error) { func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID, manager diskManager, mounter mount.Interface, exec mount.Exec) (volume.Unmounter, error) {
return &rbdUnmounter{ return &rbdUnmounter{
rbdMounter: &rbdMounter{ rbdMounter: &rbdMounter{
rbd: &rbd{ rbd: &rbd{
podUID: podUID, podUID: podUID,
volName: volName, volName: volName,
manager: manager, manager: manager,
mounter: volumehelper.NewSafeFormatAndMountFromHost(plugin.GetPluginName(), plugin.host), mounter: &mount.SafeFormatAndMount{Interface: mounter, Exec: exec},
exec: exec,
plugin: plugin, plugin: plugin,
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)), MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)),
}, },
@ -246,6 +246,8 @@ func (plugin *rbdPlugin) newDeleterInternal(spec *volume.Spec, admin, secret str
Pool: spec.PersistentVolume.Spec.RBD.RBDPool, Pool: spec.PersistentVolume.Spec.RBD.RBDPool,
manager: manager, manager: manager,
plugin: plugin, plugin: plugin,
mounter: &mount.SafeFormatAndMount{Interface: plugin.host.GetMounter(plugin.GetPluginName())},
exec: plugin.host.GetExec(plugin.GetPluginName()),
}, },
Mon: spec.PersistentVolume.Spec.RBD.CephMonitors, Mon: spec.PersistentVolume.Spec.RBD.CephMonitors,
adminId: admin, adminId: admin,
@ -263,6 +265,8 @@ func (plugin *rbdPlugin) newProvisionerInternal(options volume.VolumeOptions, ma
rbd: &rbd{ rbd: &rbd{
manager: manager, manager: manager,
plugin: plugin, plugin: plugin,
mounter: &mount.SafeFormatAndMount{Interface: plugin.host.GetMounter(plugin.GetPluginName())},
exec: plugin.host.GetExec(plugin.GetPluginName()),
}, },
}, },
options: options, options: options,
@ -402,6 +406,7 @@ type rbd struct {
ReadOnly bool ReadOnly bool
plugin *rbdPlugin plugin *rbdPlugin
mounter *mount.SafeFormatAndMount mounter *mount.SafeFormatAndMount
exec mount.Exec
// Utility interface that provides API calls to the provider to attach/detach disks. // Utility interface that provides API calls to the provider to attach/detach disks.
manager diskManager manager diskManager
volume.MetricsProvider volume.MetricsProvider
@ -480,11 +485,6 @@ func (c *rbdUnmounter) TearDownAt(dir string) error {
return diskTearDown(c.manager, *c, dir, c.mounter) return diskTearDown(c.manager, *c, dir, c.mounter)
} }
func (plugin *rbdPlugin) execCommand(command string, args []string) ([]byte, error) {
cmd := plugin.exe.Command(command, args...)
return cmd.CombinedOutput()
}
func getVolumeSource( func getVolumeSource(
spec *volume.Spec) (*v1.RBDVolumeSource, bool, error) { spec *volume.Spec) (*v1.RBDVolumeSource, bool, error) {
if spec.Volume != nil && spec.Volume.RBD != nil { if spec.Volume != nil && spec.Volume.RBD != nil {

View File

@ -112,7 +112,8 @@ func doTestPlugin(t *testing.T, spec *volume.Spec) {
} }
fdm := NewFakeDiskManager() fdm := NewFakeDiskManager()
defer fdm.Cleanup() defer fdm.Cleanup()
mounter, err := plug.(*rbdPlugin).newMounterInternal(spec, types.UID("poduid"), fdm, &mount.FakeMounter{}, "secrets") exec := mount.NewFakeExec(nil)
mounter, err := plug.(*rbdPlugin).newMounterInternal(spec, types.UID("poduid"), fdm, &mount.FakeMounter{}, exec, "secrets")
if err != nil { if err != nil {
t.Errorf("Failed to make a new Mounter: %v", err) t.Errorf("Failed to make a new Mounter: %v", err)
} }
@ -137,7 +138,7 @@ func doTestPlugin(t *testing.T, spec *volume.Spec) {
} }
} }
unmounter, err := plug.(*rbdPlugin).newUnmounterInternal("vol1", types.UID("poduid"), fdm, &mount.FakeMounter{}) unmounter, err := plug.(*rbdPlugin).newUnmounterInternal("vol1", types.UID("poduid"), fdm, &mount.FakeMounter{}, exec)
if err != nil { if err != nil {
t.Errorf("Failed to make a new Unmounter: %v", err) t.Errorf("Failed to make a new Unmounter: %v", err)
} }

View File

@ -38,7 +38,6 @@ import (
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
"k8s.io/utils/exec"
) )
const ( const (
@ -58,15 +57,24 @@ func getDevFromImageAndPool(pool, image string) (string, bool) {
// https://github.com/torvalds/linux/blob/master/drivers/block/rbd.c // https://github.com/torvalds/linux/blob/master/drivers/block/rbd.c
name := f.Name() name := f.Name()
// first match pool, then match name // first match pool, then match name
po := path.Join(sys_path, name, "pool") poolFile := path.Join(sys_path, name, "pool")
img := path.Join(sys_path, name, "name") poolBytes, err := ioutil.ReadFile(poolFile)
exe := exec.New()
out, err := exe.Command("cat", po, img).CombinedOutput()
if err != nil { if err != nil {
glog.V(4).Infof("Error reading %s: %v", poolFile, err)
continue continue
} }
matched, err := regexp.MatchString("^"+pool+"\n"+image+"\n$", string(out)) if strings.TrimSpace(string(poolBytes)) != pool {
if err != nil || !matched { glog.V(4).Infof("Device %s is not %q: %q", name, pool, string(poolBytes))
continue
}
imgFile := path.Join(sys_path, name, "name")
imgBytes, err := ioutil.ReadFile(imgFile)
if err != nil {
glog.V(4).Infof("Error reading %s: %v", imgFile, err)
continue
}
if strings.TrimSpace(string(imgBytes)) != image {
glog.V(4).Infof("Device %s is not %q: %q", name, image, string(imgBytes))
continue continue
} }
// found a match, check if device exists // found a match, check if device exists
@ -142,8 +150,9 @@ func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error {
// for fencing, check if lock already held for this host // for fencing, check if lock already held for this host
// this edge case happens if host crashes in the middle of acquiring lock and mounting rbd // this edge case happens if host crashes in the middle of acquiring lock and mounting rbd
// for defencing, get the locker name, something like "client.1234" // for defencing, get the locker name, something like "client.1234"
cmd, err = b.plugin.execCommand("rbd", args := []string{"lock", "list", b.Image, "--pool", b.Pool, "--id", b.Id, "-m", mon}
append([]string{"lock", "list", b.Image, "--pool", b.Pool, "--id", b.Id, "-m", mon}, secret_opt...)) args = append(args, secret_opt...)
cmd, err = b.exec.Run("rbd", args...)
output = string(cmd) output = string(cmd)
glog.Infof("lock list output %q", output) glog.Infof("lock list output %q", output)
if err != nil { if err != nil {
@ -166,8 +175,9 @@ func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error {
if len(v) > 0 { if len(v) > 0 {
lockInfo := strings.Split(v[0], " ") lockInfo := strings.Split(v[0], " ")
if len(lockInfo) > 2 { if len(lockInfo) > 2 {
cmd, err = b.plugin.execCommand("rbd", args := []string{"lock", "remove", b.Image, lockInfo[1], lockInfo[0], "--pool", b.Pool, "--id", b.Id, "-m", mon}
append([]string{"lock", "remove", b.Image, lockInfo[1], lockInfo[0], "--pool", b.Pool, "--id", b.Id, "-m", mon}, secret_opt...)) args = append(args, secret_opt...)
cmd, err = b.exec.Run("rbd", args...)
glog.Infof("remove orphaned locker %s from client %s: err %v, output: %s", lockInfo[1], lockInfo[0], err, string(cmd)) glog.Infof("remove orphaned locker %s from client %s: err %v, output: %s", lockInfo[1], lockInfo[0], err, string(cmd))
} }
} }
@ -175,8 +185,9 @@ func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error {
} }
// hold a lock: rbd lock add // hold a lock: rbd lock add
cmd, err = b.plugin.execCommand("rbd", args := []string{"lock", "add", b.Image, lock_id, "--pool", b.Pool, "--id", b.Id, "-m", mon}
append([]string{"lock", "add", b.Image, lock_id, "--pool", b.Pool, "--id", b.Id, "-m", mon}, secret_opt...)) args = append(args, secret_opt...)
cmd, err = b.exec.Run("rbd", args...)
} else { } else {
// defencing, find locker name // defencing, find locker name
ind := strings.LastIndex(output, lock_id) - 1 ind := strings.LastIndex(output, lock_id) - 1
@ -187,8 +198,9 @@ func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error {
} }
} }
// remove a lock: rbd lock remove // remove a lock: rbd lock remove
cmd, err = b.plugin.execCommand("rbd", args := []string{"lock", "remove", b.Image, lock_id, locker, "--pool", b.Pool, "--id", b.Id, "-m", mon}
append([]string{"lock", "remove", b.Image, lock_id, locker, "--pool", b.Pool, "--id", b.Id, "-m", mon}, secret_opt...)) args = append(args, secret_opt...)
cmd, err = b.exec.Run("rbd", args...)
} }
if err == nil { if err == nil {
@ -268,8 +280,7 @@ func (util *RBDUtil) AttachDisk(b rbdMounter) error {
devicePath, found := waitForPath(b.Pool, b.Image, 1) devicePath, found := waitForPath(b.Pool, b.Image, 1)
if !found { if !found {
// modprobe _, err = b.exec.Run("modprobe", "rbd")
_, err = b.plugin.execCommand("modprobe", []string{"rbd"})
if err != nil { if err != nil {
glog.Warningf("rbd: failed to load rbd kernel module:%v", err) glog.Warningf("rbd: failed to load rbd kernel module:%v", err)
} }
@ -294,11 +305,11 @@ func (util *RBDUtil) AttachDisk(b rbdMounter) error {
mon := b.Mon[i%l] mon := b.Mon[i%l]
glog.V(1).Infof("rbd: map mon %s", mon) glog.V(1).Infof("rbd: map mon %s", mon)
if b.Secret != "" { if b.Secret != "" {
output, err = b.plugin.execCommand("rbd", output, err = b.exec.Run("rbd",
[]string{"map", b.Image, "--pool", b.Pool, "--id", b.Id, "-m", mon, "--key=" + b.Secret}) "map", b.Image, "--pool", b.Pool, "--id", b.Id, "-m", mon, "--key="+b.Secret)
} else { } else {
output, err = b.plugin.execCommand("rbd", output, err = b.exec.Run("rbd",
[]string{"map", b.Image, "--pool", b.Pool, "--id", b.Id, "-m", mon, "-k", b.Keyring}) "map", b.Image, "--pool", b.Pool, "--id", b.Id, "-m", mon, "-k", b.Keyring)
} }
if err == nil { if err == nil {
break break
@ -332,7 +343,7 @@ func (util *RBDUtil) DetachDisk(c rbdUnmounter, mntPath string) error {
// if device is no longer used, see if can unmap // if device is no longer used, see if can unmap
if cnt <= 1 { if cnt <= 1 {
// rbd unmap // rbd unmap
_, err = c.plugin.execCommand("rbd", []string{"unmap", device}) _, err = c.exec.Run("rbd", "unmap", device)
if err != nil { if err != nil {
return rbdErrors(err, fmt.Errorf("rbd: failed to unmap device %s:Error: %v", device, err)) return rbdErrors(err, fmt.Errorf("rbd: failed to unmap device %s:Error: %v", device, err))
} }
@ -374,7 +385,7 @@ func (util *RBDUtil) CreateImage(p *rbdVolumeProvisioner) (r *v1.RBDVolumeSource
features := strings.Join(p.rbdMounter.imageFeatures, ",") features := strings.Join(p.rbdMounter.imageFeatures, ",")
args = append(args, "--image-feature", features) args = append(args, "--image-feature", features)
} }
output, err = p.rbdMounter.plugin.execCommand("rbd", args) output, err = p.exec.Run("rbd", args...)
if err == nil { if err == nil {
break break
} else { } else {
@ -411,8 +422,8 @@ func (util *RBDUtil) DeleteImage(p *rbdVolumeDeleter) error {
for i := start; i < start+l; i++ { for i := start; i < start+l; i++ {
mon := p.rbdMounter.Mon[i%l] mon := p.rbdMounter.Mon[i%l]
glog.V(4).Infof("rbd: rm %s using mon %s, pool %s id %s key %s", p.rbdMounter.Image, mon, p.rbdMounter.Pool, p.rbdMounter.adminId, p.rbdMounter.adminSecret) glog.V(4).Infof("rbd: rm %s using mon %s, pool %s id %s key %s", p.rbdMounter.Image, mon, p.rbdMounter.Pool, p.rbdMounter.adminId, p.rbdMounter.adminSecret)
output, err = p.plugin.execCommand("rbd", output, err = p.exec.Run("rbd",
[]string{"rm", p.rbdMounter.Image, "--pool", p.rbdMounter.Pool, "--id", p.rbdMounter.adminId, "-m", mon, "--key=" + p.rbdMounter.adminSecret}) "rm", p.rbdMounter.Image, "--pool", p.rbdMounter.Pool, "--id", p.rbdMounter.adminId, "-m", mon, "--key="+p.rbdMounter.adminSecret)
if err == nil { if err == nil {
return nil return nil
} else { } else {
@ -437,8 +448,8 @@ func (util *RBDUtil) rbdStatus(b *rbdMounter) (bool, error) {
// Watchers: // Watchers:
// watcher=10.16.153.105:0/710245699 client.14163 cookie=1 // watcher=10.16.153.105:0/710245699 client.14163 cookie=1
glog.V(4).Infof("rbd: status %s using mon %s, pool %s id %s key %s", b.Image, mon, b.Pool, b.adminId, b.adminSecret) glog.V(4).Infof("rbd: status %s using mon %s, pool %s id %s key %s", b.Image, mon, b.Pool, b.adminId, b.adminSecret)
cmd, err = b.plugin.execCommand("rbd", cmd, err = b.exec.Run("rbd",
[]string{"status", b.Image, "--pool", b.Pool, "-m", mon, "--id", b.adminId, "--key=" + b.adminSecret}) "status", b.Image, "--pool", b.Pool, "-m", mon, "--id", b.adminId, "--key="+b.adminSecret)
output = string(cmd) output = string(cmd)
if err != nil { if err != nil {