From 72e899e1b585d7f43eef17feb2dfdaa648dd9843 Mon Sep 17 00:00:00 2001
From: jiangyaoguo
Date: Fri, 24 Jul 2015 17:20:42 +0800
Subject: [PATCH] refactor rbd volume to separate builder and cleaner

---
 pkg/volume/rbd/disk_manager.go | 16 +++---
 pkg/volume/rbd/rbd.go          | 88 ++++++++++++++++++++--------------
 pkg/volume/rbd/rbd_test.go     |  8 ++--
 pkg/volume/rbd/rbd_util.go     | 85 ++++++++++++++++----------------
 4 files changed, 105 insertions(+), 92 deletions(-)

diff --git a/pkg/volume/rbd/disk_manager.go b/pkg/volume/rbd/disk_manager.go
index f999b82bbd3..146a109e388 100644
--- a/pkg/volume/rbd/disk_manager.go
+++ b/pkg/volume/rbd/disk_manager.go
@@ -33,14 +33,14 @@ import (
 type diskManager interface {
     MakeGlobalPDName(disk rbd) string
     // Attaches the disk to the kubelet's host machine.
-    AttachDisk(disk rbd) error
+    AttachDisk(disk rbdBuilder) error
     // Detaches the disk from the kubelet's host machine.
-    DetachDisk(disk rbd, mntPath string) error
+    DetachDisk(disk rbdCleaner, mntPath string) error
 }

 // utility to mount a disk based filesystem
-func diskSetUp(manager diskManager, disk rbd, volPath string, mounter mount.Interface) error {
-    globalPDPath := manager.MakeGlobalPDName(disk)
+func diskSetUp(manager diskManager, b rbdBuilder, volPath string, mounter mount.Interface) error {
+    globalPDPath := manager.MakeGlobalPDName(*b.rbd)
     // TODO: handle failed mounts here.
     mountpoint, err := mounter.IsMountPoint(volPath)

@@ -51,7 +51,7 @@ func diskSetUp(manager diskManager, disk rbd, volPath string, mounter mount.Inte
     if mountpoint {
         return nil
     }
-    if err := manager.AttachDisk(disk); err != nil {
+    if err := manager.AttachDisk(b); err != nil {
         glog.Errorf("failed to attach disk")
         return err
     }
@@ -62,7 +62,7 @@ func diskSetUp(manager diskManager, disk rbd, volPath string, mounter mount.Inte
     }
     // Perform a bind mount to the full path to allow duplicate mounts of the same disk.
     options := []string{"bind"}
-    if disk.ReadOnly {
+    if b.ReadOnly {
         options = append(options, "ro")
     }
     err = mounter.Mount(globalPDPath, volPath, "", options)
@@ -74,7 +74,7 @@ func diskSetUp(manager diskManager, disk rbd, volPath string, mounter mount.Inte
 }

 // utility to tear down a disk based filesystem
-func diskTearDown(manager diskManager, disk rbd, volPath string, mounter mount.Interface) error {
+func diskTearDown(manager diskManager, c rbdCleaner, volPath string, mounter mount.Interface) error {
     mountpoint, err := mounter.IsMountPoint(volPath)
     if err != nil {
         glog.Errorf("cannot validate mountpoint %s", volPath)
@@ -97,7 +97,7 @@ func diskTearDown(manager diskManager, disk rbd, volPath string, mounter mount.I
     // remaining reference is the global mount. It is safe to detach.
     if len(refs) == 1 {
         mntPath := refs[0]
-        if err := manager.DetachDisk(disk, mntPath); err != nil {
+        if err := manager.DetachDisk(c, mntPath); err != nil {
             glog.Errorf("failed to detach disk from %s", mntPath)
             return err
         }
diff --git a/pkg/volume/rbd/rbd.go b/pkg/volume/rbd/rbd.go
index b93d6a0e415..1bcb12fc708 100644
--- a/pkg/volume/rbd/rbd.go
+++ b/pkg/volume/rbd/rbd.go
@@ -120,20 +120,22 @@ func (plugin *rbdPlugin) newBuilderInternal(spec *volume.Spec, podUID types.UID,
         keyring = "/etc/ceph/keyring"
     }

-    return &rbd{
-        podUID:   podUID,
-        volName:  spec.Name,
-        Mon:      source.CephMonitors,
-        Image:    source.RBDImage,
-        Pool:     pool,
-        Id:       id,
-        Keyring:  keyring,
-        Secret:   secret,
-        fsType:   source.FSType,
-        ReadOnly: source.ReadOnly,
-        manager:  manager,
-        mounter:  mounter,
-        plugin:   plugin,
+    return &rbdBuilder{
+        rbd: &rbd{
+            podUID:   podUID,
+            volName:  spec.Name,
+            Image:    source.RBDImage,
+            Pool:     pool,
+            ReadOnly: source.ReadOnly,
+            manager:  manager,
+            mounter:  mounter,
+            plugin:   plugin,
+        },
+        Mon:     source.CephMonitors,
+        Id:      id,
+        Keyring: keyring,
+        Secret:  secret,
+        fsType:  source.FSType,
     }, nil
 }

@@ -143,26 +145,20 @@ func (plugin *rbdPlugin) NewCleaner(volName string, podUID types.UID, mounter mo
 }

 func (plugin *rbdPlugin) newCleanerInternal(volName string, podUID types.UID, manager diskManager, mounter mount.Interface) (volume.Cleaner, error) {
-    return &rbd{
+    return &rbdCleaner{&rbd{
         podUID:  podUID,
         volName: volName,
         manager: manager,
         mounter: mounter,
         plugin:  plugin,
-    }, nil
+    }}, nil
 }

 type rbd struct {
-    volName string
-    podUID  types.UID
-    // capitalized so they can be exported in persistRBD()
-    Mon      []string
+    volName  string
+    podUID   types.UID
     Pool     string
-    Id       string
     Image    string
-    Keyring  string
-    Secret   string
-    fsType   string
     ReadOnly bool
     plugin   *rbdPlugin
     mounter  mount.Interface
@@ -176,37 +172,55 @@ func (rbd *rbd) GetPath() string {
     return rbd.plugin.host.GetPodVolumeDir(rbd.podUID, util.EscapeQualifiedNameForDisk(name), rbd.volName)
 }

-func (rbd *rbd) SetUp() error {
-    return rbd.SetUpAt(rbd.GetPath())
+type rbdBuilder struct {
+    *rbd
+    // capitalized so they can be exported in persistRBD()
+    Mon     []string
+    Id      string
+    Keyring string
+    Secret  string
+    fsType  string
 }

-func (rbd *rbd) SetUpAt(dir string) error {
+var _ volume.Builder = &rbdBuilder{}
+
+func (b *rbdBuilder) SetUp() error {
+    return b.SetUpAt(b.GetPath())
+}
+
+func (b *rbdBuilder) SetUpAt(dir string) error {
     // diskSetUp checks mountpoints and prevent repeated calls
-    err := diskSetUp(rbd.manager, *rbd, dir, rbd.mounter)
+    err := diskSetUp(b.manager, *b, dir, b.mounter)
     if err != nil {
         glog.Errorf("rbd: failed to setup")
         return err
     }

-    globalPDPath := rbd.manager.MakeGlobalPDName(*rbd)
+    globalPDPath := b.manager.MakeGlobalPDName(*b.rbd)
     // make mountpoint rw/ro work as expected
     //FIXME revisit pkg/util/mount and ensure rw/ro is implemented as expected
     mode := "rw"
-    if rbd.ReadOnly {
+    if b.ReadOnly {
         mode = "ro"
     }
-    rbd.plugin.execCommand("mount", []string{"-o", "remount," + mode, globalPDPath, dir})
+    b.plugin.execCommand("mount", []string{"-o", "remount," + mode, globalPDPath, dir})
     return nil
 }

-// Unmounts the bind mount, and detaches the disk only if the disk
-// resource was the last reference to that disk on the kubelet.
-func (rbd *rbd) TearDown() error {
-    return rbd.TearDownAt(rbd.GetPath())
+type rbdCleaner struct {
+    *rbd
 }

-func (rbd *rbd) TearDownAt(dir string) error {
-    return diskTearDown(rbd.manager, *rbd, dir, rbd.mounter)
+var _ volume.Cleaner = &rbdCleaner{}
+
+// Unmounts the bind mount, and detaches the disk only if the disk
+// resource was the last reference to that disk on the kubelet.
+func (c *rbdCleaner) TearDown() error {
+    return c.TearDownAt(c.GetPath())
+}
+
+func (c *rbdCleaner) TearDownAt(dir string) error {
+    return diskTearDown(c.manager, *c, dir, c.mounter)
 }

 func (plugin *rbdPlugin) execCommand(command string, args []string) ([]byte, error) {
diff --git a/pkg/volume/rbd/rbd_test.go b/pkg/volume/rbd/rbd_test.go
index 1b9a53419e7..94d3aa1c5ec 100644
--- a/pkg/volume/rbd/rbd_test.go
+++ b/pkg/volume/rbd/rbd_test.go
@@ -47,8 +47,8 @@ type fakeDiskManager struct{}
 func (fake *fakeDiskManager) MakeGlobalPDName(disk rbd) string {
     return "/tmp/fake_rbd_path"
 }
-func (fake *fakeDiskManager) AttachDisk(disk rbd) error {
-    globalPath := disk.manager.MakeGlobalPDName(disk)
+func (fake *fakeDiskManager) AttachDisk(b rbdBuilder) error {
+    globalPath := b.manager.MakeGlobalPDName(*b.rbd)
     err := os.MkdirAll(globalPath, 0750)
     if err != nil {
         return err
@@ -56,8 +56,8 @@ func (fake *fakeDiskManager) AttachDisk(disk rbd) error {
     return nil
 }

-func (fake *fakeDiskManager) DetachDisk(disk rbd, mntPath string) error {
-    globalPath := disk.manager.MakeGlobalPDName(disk)
+func (fake *fakeDiskManager) DetachDisk(c rbdCleaner, mntPath string) error {
+    globalPath := c.manager.MakeGlobalPDName(*c.rbd)
     err := os.RemoveAll(globalPath)
     if err != nil {
         return err
diff --git a/pkg/volume/rbd/rbd_util.go b/pkg/volume/rbd/rbd_util.go
index 55d1cf814c7..f40c358de0a 100644
--- a/pkg/volume/rbd/rbd_util.go
+++ b/pkg/volume/rbd/rbd_util.go
@@ -63,32 +63,32 @@ func (util *RBDUtil) MakeGlobalPDName(rbd rbd) string {
     return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image)
 }

-func (util *RBDUtil) rbdLock(rbd rbd, lock bool) error {
+func (util *RBDUtil) rbdLock(b rbdBuilder, lock bool) error {
     var err error
     var output, locker string
     var cmd []byte
     var secret_opt []string

-    if rbd.Secret != "" {
-        secret_opt = []string{"--key=" + rbd.Secret}
+    if b.Secret != "" {
+        secret_opt = []string{"--key=" + b.Secret}
     } else {
-        secret_opt = []string{"-k", rbd.Keyring}
+        secret_opt = []string{"-k", b.Keyring}
     }
     // construct lock id using host name and a magic prefix
     lock_id := "kubelet_lock_magic_" + node.GetHostname("")

-    l := len(rbd.Mon)
+    l := len(b.Mon)
     // avoid mount storm, pick a host randomly
     start := rand.Int() % l
     // iterate all hosts until mount succeeds.
     for i := start; i < start+l; i++ {
-        mon := rbd.Mon[i%l]
+        mon := b.Mon[i%l]
         // cmd "rbd lock list" serves two purposes:
         // for fencing, check if lock already held for this host
         // this edge case happens if host crashes in the middle of acquiring lock and mounting rbd
         // for defencing, get the locker name, something like "client.1234"
-        cmd, err = rbd.plugin.execCommand("rbd",
-            append([]string{"lock", "list", rbd.Image, "--pool", rbd.Pool, "--id", rbd.Id, "-m", mon}, secret_opt...))
+        cmd, err = b.plugin.execCommand("rbd",
+            append([]string{"lock", "list", b.Image, "--pool", b.Pool, "--id", b.Id, "-m", mon}, secret_opt...))
         output = string(cmd)

         if err != nil {
@@ -103,8 +103,8 @@ func (util *RBDUtil) rbdLock(rbd rbd, lock bool) error {
                 return nil
             }
             // hold a lock: rbd lock add
-            cmd, err = rbd.plugin.execCommand("rbd",
-                append([]string{"lock", "add", rbd.Image, lock_id, "--pool", rbd.Pool, "--id", rbd.Id, "-m", mon}, secret_opt...))
+            cmd, err = b.plugin.execCommand("rbd",
+                append([]string{"lock", "add", b.Image, lock_id, "--pool", b.Pool, "--id", b.Id, "-m", mon}, secret_opt...))
         } else {
             // defencing, find locker name
             ind := strings.LastIndex(output, lock_id) - 1
@@ -115,8 +115,8 @@ func (util *RBDUtil) rbdLock(rbd rbd, lock bool) error {
                 }
             }
             // remove a lock: rbd lock remove
-            cmd, err = rbd.plugin.execCommand("rbd",
-                append([]string{"lock", "remove", rbd.Image, lock_id, locker, "--pool", rbd.Pool, "--id", rbd.Id, "-m", mon}, secret_opt...))
+            cmd, err = b.plugin.execCommand("rbd",
+                append([]string{"lock", "remove", b.Image, lock_id, locker, "--pool", b.Pool, "--id", b.Id, "-m", mon}, secret_opt...))
         }

         if err == nil {
@@ -127,7 +127,7 @@ func (util *RBDUtil) rbdLock(rbd rbd, lock bool) error {
     return err
 }

-func (util *RBDUtil) persistRBD(rbd rbd, mnt string) error {
+func (util *RBDUtil) persistRBD(rbd rbdBuilder, mnt string) error {
     file := path.Join(mnt, "rbd.json")
     fp, err := os.Create(file)
     if err != nil {
@@ -159,47 +159,47 @@ func (util *RBDUtil) loadRBD(rbd *rbd, mnt string) error {
     return nil
 }

-func (util *RBDUtil) fencing(rbd rbd) error {
+func (util *RBDUtil) fencing(b rbdBuilder) error {
     // no need to fence readOnly
-    if rbd.ReadOnly {
+    if b.ReadOnly {
         return nil
     }
-    return util.rbdLock(rbd, true)
+    return util.rbdLock(b, true)
 }

-func (util *RBDUtil) defencing(rbd rbd) error {
+func (util *RBDUtil) defencing(c rbdCleaner) error {
     // no need to fence readOnly
-    if rbd.ReadOnly {
+    if c.ReadOnly {
         return nil
     }
-    return util.rbdLock(rbd, false)
+    return util.rbdLock(rbdBuilder{rbd: c.rbd}, false)
 }

-func (util *RBDUtil) AttachDisk(rbd rbd) error {
+func (util *RBDUtil) AttachDisk(b rbdBuilder) error {
     var err error
-    devicePath := strings.Join([]string{"/dev/rbd", rbd.Pool, rbd.Image}, "/")
+    devicePath := strings.Join([]string{"/dev/rbd", b.Pool, b.Image}, "/")
     exist := waitForPathToExist(devicePath, 1)

     if !exist {
         // modprobe
-        _, err = rbd.plugin.execCommand("modprobe", []string{"rbd"})
+        _, err = b.plugin.execCommand("modprobe", []string{"rbd"})
         if err != nil {
             return fmt.Errorf("rbd: failed to modprobe rbd error:%v", err)
         }
         // rbd map
-        l := len(rbd.Mon)
+        l := len(b.Mon)
         // avoid mount storm, pick a host randomly
         start := rand.Int() % l
         // iterate all hosts until mount succeeds.
         for i := start; i < start+l; i++ {
-            mon := rbd.Mon[i%l]
+            mon := b.Mon[i%l]
             glog.V(1).Infof("rbd: map mon %s", mon)
-            if rbd.Secret != "" {
-                _, err = rbd.plugin.execCommand("rbd",
-                    []string{"map", rbd.Image, "--pool", rbd.Pool, "--id", rbd.Id, "-m", mon, "--key=" + rbd.Secret})
+            if b.Secret != "" {
+                _, err = b.plugin.execCommand("rbd",
+                    []string{"map", b.Image, "--pool", b.Pool, "--id", b.Id, "-m", mon, "--key=" + b.Secret})
             } else {
-                _, err = rbd.plugin.execCommand("rbd",
-                    []string{"map", rbd.Image, "--pool", rbd.Pool, "--id", rbd.Id, "-m", mon, "-k", rbd.Keyring})
+                _, err = b.plugin.execCommand("rbd",
+                    []string{"map", b.Image, "--pool", b.Pool, "--id", b.Id, "-m", mon, "-k", b.Keyring})
             }
             if err == nil {
                 break
@@ -214,8 +214,8 @@ func (util *RBDUtil) AttachDisk(rbd rbd) error {
         return errors.New("Could not map image: Timeout after 10s")
     }
     // mount it
-    globalPDPath := rbd.manager.MakeGlobalPDName(rbd)
-    mountpoint, err := rbd.mounter.IsMountPoint(globalPDPath)
+    globalPDPath := b.manager.MakeGlobalPDName(*b.rbd)
+    mountpoint, err := b.mounter.IsMountPoint(globalPDPath)
     // in the first time, the path shouldn't exist and IsMountPoint is expected to get NotExist
     if err != nil && !os.IsNotExist(err) {
         return fmt.Errorf("rbd: %s failed to check mountpoint", globalPDPath)
@@ -229,43 +229,42 @@ func (util *RBDUtil) AttachDisk(rbd rbd) error {
     }

     // fence off other mappers
-    if err := util.fencing(rbd); err != nil {
-        return fmt.Errorf("rbd: image %s is locked by other nodes", rbd.Image)
+    if err := util.fencing(b); err != nil {
+        return fmt.Errorf("rbd: image %s is locked by other nodes", b.Image)
     }
     // rbd lock remove needs ceph and image config
     // but kubelet doesn't get them from apiserver during teardown
     // so persit rbd config so upon disk detach, rbd lock can be removed
     // since rbd json is persisted in the same local directory that is used as rbd mountpoint later,
     // the json file remains invisible during rbd mount and thus won't be removed accidentally.
-    util.persistRBD(rbd, globalPDPath)
+    util.persistRBD(b, globalPDPath)

-    if err = rbd.mounter.Mount(devicePath, globalPDPath, rbd.fsType, nil); err != nil {
-        err = fmt.Errorf("rbd: failed to mount rbd volume %s [%s] to %s, error %v", devicePath, rbd.fsType, globalPDPath, err)
+    if err = b.mounter.Mount(devicePath, globalPDPath, b.fsType, nil); err != nil {
+        err = fmt.Errorf("rbd: failed to mount rbd volume %s [%s] to %s, error %v", devicePath, b.fsType, globalPDPath, err)
     }
-
     return err
 }

-func (util *RBDUtil) DetachDisk(rbd rbd, mntPath string) error {
-    device, cnt, err := mount.GetDeviceNameFromMount(rbd.mounter, mntPath)
+func (util *RBDUtil) DetachDisk(c rbdCleaner, mntPath string) error {
+    device, cnt, err := mount.GetDeviceNameFromMount(c.mounter, mntPath)
     if err != nil {
         return fmt.Errorf("rbd detach disk: failed to get device from mnt: %s\nError: %v", mntPath, err)
     }
-    if err = rbd.mounter.Unmount(mntPath); err != nil {
+    if err = c.mounter.Unmount(mntPath); err != nil {
         return fmt.Errorf("rbd detach disk: failed to umount: %s\nError: %v", mntPath, err)
     }
     // if device is no longer used, see if can unmap
     if cnt <= 1 {
         // rbd unmap
-        _, err = rbd.plugin.execCommand("rbd", []string{"unmap", device})
+        _, err = c.plugin.execCommand("rbd", []string{"unmap", device})
         if err != nil {
             return fmt.Errorf("rbd: failed to unmap device %s:Error: %v", device, err)
         }
         // load ceph and image/pool info to remove fencing
-        if err := util.loadRBD(&rbd, mntPath); err == nil {
+        if err := util.loadRBD(c.rbd, mntPath); err == nil {
             // remove rbd lock
-            util.defencing(rbd)
+            util.defencing(c)
         }
         glog.Infof("rbd: successfully unmap device %s", device)
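Illustrative sketch (not part of the patch, and not the real plugin code): a minimal Go program showing the builder/cleaner split the diff introduces. Field names mirror the patch, but the values below are hypothetical and the real types in pkg/volume/rbd also carry plugin, mounter, and manager references. The point it shows: rbdBuilder embeds *rbd and adds the attach-time fields that persistRBD writes to rbd.json, while rbdCleaner embeds only the shared *rbd and relies on the persisted config (loadRBD, then rbdBuilder{rbd: c.rbd}) when removing the image lock at detach time.

package main

import "fmt"

// Shared core: only what both the setup and teardown paths need.
type rbd struct {
    volName  string
    podUID   string // types.UID in the real plugin
    Pool     string
    Image    string
    ReadOnly bool
}

// Builder view: adds the attach-time fields that get persisted to rbd.json.
type rbdBuilder struct {
    *rbd
    Mon     []string
    Id      string
    Keyring string
    Secret  string
    fsType  string
}

// Cleaner view: carries no credentials of its own; the real DetachDisk
// reloads them from rbd.json and rebuilds an rbdBuilder to drop the lock.
type rbdCleaner struct {
    *rbd
}

func main() {
    core := &rbd{volName: "vol1", Pool: "kube", Image: "foo"} // hypothetical values
    b := &rbdBuilder{
        rbd:     core,
        Mon:     []string{"10.0.0.1:6789"}, // hypothetical monitor address
        Id:      "admin",
        Keyring: "/etc/ceph/keyring",
    }
    c := &rbdCleaner{rbd: core}
    // Promoted fields resolve through the same embedded *rbd in both views.
    fmt.Println(b.Image, c.Image, b.ReadOnly == c.ReadOnly)
}

Because both views embed the same *rbd, the shared identity fields and methods such as GetPath stay in one place, while the credential-bearing fields live only on the builder.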