diff --git a/pkg/volume/rbd/disk_manager.go b/pkg/volume/rbd/disk_manager.go index 7cf570b04ff..f999b82bbd3 100644 --- a/pkg/volume/rbd/disk_manager.go +++ b/pkg/volume/rbd/disk_manager.go @@ -62,7 +62,7 @@ func diskSetUp(manager diskManager, disk rbd, volPath string, mounter mount.Inte } // Perform a bind mount to the full path to allow duplicate mounts of the same disk. options := []string{"bind"} - if disk.readOnly { + if disk.ReadOnly { options = append(options, "ro") } err = mounter.Mount(globalPDPath, volPath, "", options) diff --git a/pkg/volume/rbd/rbd.go b/pkg/volume/rbd/rbd.go index c1a8dd5e1a6..b93d6a0e415 100644 --- a/pkg/volume/rbd/rbd.go +++ b/pkg/volume/rbd/rbd.go @@ -123,14 +123,14 @@ func (plugin *rbdPlugin) newBuilderInternal(spec *volume.Spec, podUID types.UID, return &rbd{ podUID: podUID, volName: spec.Name, - mon: source.CephMonitors, - image: source.RBDImage, - pool: pool, - id: id, - keyring: keyring, - secret: secret, + Mon: source.CephMonitors, + Image: source.RBDImage, + Pool: pool, + Id: id, + Keyring: keyring, + Secret: secret, fsType: source.FSType, - readOnly: source.ReadOnly, + ReadOnly: source.ReadOnly, manager: manager, mounter: mounter, plugin: plugin, @@ -153,16 +153,17 @@ func (plugin *rbdPlugin) newCleanerInternal(volName string, podUID types.UID, ma } type rbd struct { - volName string - podUID types.UID - mon []string - pool string - id string - image string - keyring string - secret string + volName string + podUID types.UID + // capitalized so they can be exported in persistRBD() + Mon []string + Pool string + Id string + Image string + Keyring string + Secret string fsType string - readOnly bool + ReadOnly bool plugin *rbdPlugin mounter mount.Interface // Utility interface that provides API calls to the provider to attach/detach disks. @@ -190,7 +191,7 @@ func (rbd *rbd) SetUpAt(dir string) error { // make mountpoint rw/ro work as expected //FIXME revisit pkg/util/mount and ensure rw/ro is implemented as expected mode := "rw" - if rbd.readOnly { + if rbd.ReadOnly { mode = "ro" } rbd.plugin.execCommand("mount", []string{"-o", "remount," + mode, globalPDPath, dir}) diff --git a/pkg/volume/rbd/rbd_util.go b/pkg/volume/rbd/rbd_util.go index 6425b1b88c0..55d1cf814c7 100644 --- a/pkg/volume/rbd/rbd_util.go +++ b/pkg/volume/rbd/rbd_util.go @@ -22,6 +22,7 @@ limitations under the License. package rbd import ( + "encoding/json" "errors" "fmt" "math/rand" @@ -31,6 +32,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/node" "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" "github.com/golang/glog" ) @@ -58,12 +60,125 @@ func makePDNameInternal(host volume.VolumeHost, pool string, image string) strin type RBDUtil struct{} func (util *RBDUtil) MakeGlobalPDName(rbd rbd) string { - return makePDNameInternal(rbd.plugin.host, rbd.pool, rbd.image) + return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image) +} + +func (util *RBDUtil) rbdLock(rbd rbd, lock bool) error { + var err error + var output, locker string + var cmd []byte + var secret_opt []string + + if rbd.Secret != "" { + secret_opt = []string{"--key=" + rbd.Secret} + } else { + secret_opt = []string{"-k", rbd.Keyring} + } + // construct lock id using host name and a magic prefix + lock_id := "kubelet_lock_magic_" + node.GetHostname("") + + l := len(rbd.Mon) + // avoid mount storm, pick a host randomly + start := rand.Int() % l + // iterate all hosts until mount succeeds. + for i := start; i < start+l; i++ { + mon := rbd.Mon[i%l] + // cmd "rbd lock list" serves two purposes: + // for fencing, check if lock already held for this host + // this edge case happens if host crashes in the middle of acquiring lock and mounting rbd + // for defencing, get the locker name, something like "client.1234" + cmd, err = rbd.plugin.execCommand("rbd", + append([]string{"lock", "list", rbd.Image, "--pool", rbd.Pool, "--id", rbd.Id, "-m", mon}, secret_opt...)) + output = string(cmd) + + if err != nil { + continue + } + + if lock { + // check if lock is already held for this host by matching lock_id and rbd lock id + if strings.Contains(output, lock_id) { + // this host already holds the lock, exit + glog.V(1).Infof("rbd: lock already held for %s", lock_id) + return nil + } + // hold a lock: rbd lock add + cmd, err = rbd.plugin.execCommand("rbd", + append([]string{"lock", "add", rbd.Image, lock_id, "--pool", rbd.Pool, "--id", rbd.Id, "-m", mon}, secret_opt...)) + } else { + // defencing, find locker name + ind := strings.LastIndex(output, lock_id) - 1 + for i := ind; i >= 0; i-- { + if output[i] == '\n' { + locker = output[(i + 1):ind] + break + } + } + // remove a lock: rbd lock remove + cmd, err = rbd.plugin.execCommand("rbd", + append([]string{"lock", "remove", rbd.Image, lock_id, locker, "--pool", rbd.Pool, "--id", rbd.Id, "-m", mon}, secret_opt...)) + } + + if err == nil { + //lock is acquired + break + } + } + return err +} + +func (util *RBDUtil) persistRBD(rbd rbd, mnt string) error { + file := path.Join(mnt, "rbd.json") + fp, err := os.Create(file) + if err != nil { + return fmt.Errorf("rbd: create err %s/%s", file, err) + } + defer fp.Close() + + encoder := json.NewEncoder(fp) + if err = encoder.Encode(rbd); err != nil { + return fmt.Errorf("rbd: encode err: %v.", err) + } + + return nil +} + +func (util *RBDUtil) loadRBD(rbd *rbd, mnt string) error { + file := path.Join(mnt, "rbd.json") + fp, err := os.Open(file) + if err != nil { + return fmt.Errorf("rbd: open err %s/%s", file, err) + } + defer fp.Close() + + decoder := json.NewDecoder(fp) + if err = decoder.Decode(rbd); err != nil { + return fmt.Errorf("rbd: decode err: %v.", err) + } + + return nil +} + +func (util *RBDUtil) fencing(rbd rbd) error { + // no need to fence readOnly + if rbd.ReadOnly { + return nil + } + return util.rbdLock(rbd, true) +} + +func (util *RBDUtil) defencing(rbd rbd) error { + // no need to fence readOnly + if rbd.ReadOnly { + return nil + } + + return util.rbdLock(rbd, false) } func (util *RBDUtil) AttachDisk(rbd rbd) error { var err error - devicePath := strings.Join([]string{"/dev/rbd", rbd.pool, rbd.image}, "/") + devicePath := strings.Join([]string{"/dev/rbd", rbd.Pool, rbd.Image}, "/") exist := waitForPathToExist(devicePath, 1) if !exist { // modprobe @@ -72,19 +187,19 @@ func (util *RBDUtil) AttachDisk(rbd rbd) error { return fmt.Errorf("rbd: failed to modprobe rbd error:%v", err) } // rbd map - l := len(rbd.mon) + l := len(rbd.Mon) // avoid mount storm, pick a host randomly start := rand.Int() % l // iterate all hosts until mount succeeds. for i := start; i < start+l; i++ { - mon := rbd.mon[i%l] + mon := rbd.Mon[i%l] glog.V(1).Infof("rbd: map mon %s", mon) - if rbd.secret != "" { + if rbd.Secret != "" { _, err = rbd.plugin.execCommand("rbd", - []string{"map", rbd.image, "--pool", rbd.pool, "--id", rbd.id, "-m", mon, "--key=" + rbd.secret}) + []string{"map", rbd.Image, "--pool", rbd.Pool, "--id", rbd.Id, "-m", mon, "--key=" + rbd.Secret}) } else { _, err = rbd.plugin.execCommand("rbd", - []string{"map", rbd.image, "--pool", rbd.pool, "--id", rbd.id, "-m", mon, "-k", rbd.keyring}) + []string{"map", rbd.Image, "--pool", rbd.Pool, "--id", rbd.Id, "-m", mon, "-k", rbd.Keyring}) } if err == nil { break @@ -113,6 +228,17 @@ func (util *RBDUtil) AttachDisk(rbd rbd) error { return fmt.Errorf("rbd: failed to mkdir %s, error", globalPDPath) } + // fence off other mappers + if err := util.fencing(rbd); err != nil { + return fmt.Errorf("rbd: image %s is locked by other nodes", rbd.Image) + } + // rbd lock remove needs ceph and image config + // but kubelet doesn't get them from apiserver during teardown + // so persit rbd config so upon disk detach, rbd lock can be removed + // since rbd json is persisted in the same local directory that is used as rbd mountpoint later, + // the json file remains invisible during rbd mount and thus won't be removed accidentally. + util.persistRBD(rbd, globalPDPath) + if err = rbd.mounter.Mount(devicePath, globalPDPath, rbd.fsType, nil); err != nil { err = fmt.Errorf("rbd: failed to mount rbd volume %s [%s] to %s, error %v", devicePath, rbd.fsType, globalPDPath, err) } @@ -135,6 +261,13 @@ func (util *RBDUtil) DetachDisk(rbd rbd, mntPath string) error { if err != nil { return fmt.Errorf("rbd: failed to unmap device %s:Error: %v", device, err) } + + // load ceph and image/pool info to remove fencing + if err := util.loadRBD(&rbd, mntPath); err == nil { + // remove rbd lock + util.defencing(rbd) + } + glog.Infof("rbd: successfully unmap device %s", device) } return nil