RBD Plugin: Implement Attacher/Detacher interfaces.

1) Modify rbdPlugin to implement volume.AttachableVolumePlugin
   interface.
2) Add rbdAttacher/rbdDetacher structs to implement
   volume.Attacher/Detacher interfaces.
3) Add mount.SafeFormatAndMount/mount.Exec fields to rbdPlugin, and
   setup them in rbdPlugin.Init for later uses.
   Attacher/Mounter/Unmounter/Detacher reference rbdPlugin to use mounter
   and exec. This simplifies code.
4) Add testcase struct to abstract RBD Plugin test case, etc.
5) Add newRBD constructor to unify rbd struct initialization.
This commit is contained in:
Yecheng Fu
2017-08-30 16:52:11 +00:00
parent a768092f9a
commit ba0d275f3b
7 changed files with 624 additions and 188 deletions

View File

@@ -78,6 +78,7 @@ func ProbeAttachableVolumePlugins() []volume.VolumePlugin {
allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...) allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...) allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...) allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)
return allPlugins return allPlugins
} }

View File

@@ -9,6 +9,7 @@ load(
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"attacher.go",
"disk_manager.go", "disk_manager.go",
"doc.go", "doc.go",
"rbd.go", "rbd.go",
@@ -46,6 +47,7 @@ go_test(
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/util/testing:go_default_library", "//vendor/k8s.io/client-go/util/testing:go_default_library",
], ],

219
pkg/volume/rbd/attacher.go Normal file
View File

@@ -0,0 +1,219 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
import (
"fmt"
"os"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
volutil "k8s.io/kubernetes/pkg/volume/util"
)
// NewAttacher implements AttachableVolumePlugin.NewAttacher.
func (plugin *rbdPlugin) NewAttacher() (volume.Attacher, error) {
return plugin.newAttacherInternal(&RBDUtil{})
}
func (plugin *rbdPlugin) newAttacherInternal(manager diskManager) (volume.Attacher, error) {
return &rbdAttacher{
plugin: plugin,
manager: manager,
}, nil
}
// NewDetacher implements AttachableVolumePlugin.NewDetacher.
func (plugin *rbdPlugin) NewDetacher() (volume.Detacher, error) {
return plugin.newDetacherInternal(&RBDUtil{})
}
func (plugin *rbdPlugin) newDetacherInternal(manager diskManager) (volume.Detacher, error) {
return &rbdDetacher{
plugin: plugin,
manager: manager,
}, nil
}
// GetDeviceMountRefs implements AttachableVolumePlugin.GetDeviceMountRefs.
func (plugin *rbdPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
mounter := plugin.host.GetMounter(plugin.GetPluginName())
return mount.GetMountRefs(mounter, deviceMountPath)
}
// rbdAttacher implements volume.Attacher interface.
type rbdAttacher struct {
plugin *rbdPlugin
manager diskManager
}
var _ volume.Attacher = &rbdAttacher{}
// Attach implements Attacher.Attach.
// We do not lock image here, because it requires kube-controller-manager to
// access external `rbd` utility. And there is no need since AttachDetach
// controller will not try to attach RWO volumes which are already attached to
// other nodes.
func (attacher *rbdAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
return "", nil
}
// VolumesAreAttached implements Attacher.VolumesAreAttached.
// There is no way to confirm whether the volume is attached or not from
// outside of the kubelet node. This method needs to return true always, like
// iSCSI, FC plugin.
func (attacher *rbdAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
for _, spec := range specs {
volumesAttachedCheck[spec] = true
}
return volumesAttachedCheck, nil
}
// WaitForAttach implements Attacher.WaitForAttach. It's called by kublet to
// attach volume onto the node.
// This method is idempotent, callers are responsible for retrying on failure.
func (attacher *rbdAttacher) WaitForAttach(spec *volume.Spec, devicePath string, pod *v1.Pod, timeout time.Duration) (string, error) {
glog.V(4).Infof("rbd: waiting for attach volume (name: %s) for pod (name: %s, uid: %s)", spec.Name(), pod.Name, pod.UID)
mounter, err := attacher.plugin.createMounterFromVolumeSpecAndPod(spec, pod)
if err != nil {
glog.Warningf("failed to create mounter: %v", spec)
return "", err
}
realDevicePath, err := attacher.manager.AttachDisk(*mounter)
if err != nil {
return "", err
}
glog.V(3).Infof("rbd: successfully wait for attach volume (spec: %s, pool: %s, image: %s) at %s", spec.Name(), mounter.Pool, mounter.Image, realDevicePath)
return realDevicePath, nil
}
// GetDeviceMountPath implements Attacher.GetDeviceMountPath.
func (attacher *rbdAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
img, err := getVolumeSourceImage(spec)
if err != nil {
return "", err
}
pool, err := getVolumeSourcePool(spec)
if err != nil {
return "", err
}
return makePDNameInternal(attacher.plugin.host, pool, img), nil
}
// MountDevice implements Attacher.MountDevice. It is called by the kubelet to
// mount device at the given mount path.
// This method is idempotent, callers are responsible for retrying on failure.
func (attacher *rbdAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
glog.V(4).Infof("rbd: mouting device %s to %s", devicePath, deviceMountPath)
notMnt, err := attacher.plugin.mounter.IsLikelyNotMountPoint(deviceMountPath)
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
return err
}
notMnt = true
} else {
return err
}
}
if !notMnt {
return nil
}
fstype, err := getVolumeSourceFSType(spec)
if err != nil {
return err
}
ro, err := getVolumeSourceReadOnly(spec)
if err != nil {
return err
}
options := []string{}
if ro {
options = append(options, "ro")
}
mountOptions := volume.MountOptionFromSpec(spec, options...)
err = attacher.plugin.mounter.FormatAndMount(devicePath, deviceMountPath, fstype, mountOptions)
if err != nil {
return fmt.Errorf("rbd: failed to mount device %s at %s (fstype: %s), error %v", devicePath, deviceMountPath, fstype, err)
}
glog.V(3).Infof("rbd: successfully mount device %s at %s (fstype: %s)", devicePath, deviceMountPath, fstype)
return nil
}
// rbdDetacher implements volume.Detacher interface.
type rbdDetacher struct {
plugin *rbdPlugin
manager diskManager
}
var _ volume.Detacher = &rbdDetacher{}
// UnmountDevice implements Detacher.UnmountDevice. It unmounts the global
// mount of the RBD image. This is called once all bind mounts have been
// unmounted.
// Internally, it does four things:
// - Unmount device from deviceMountPath.
// - Detach device from the node.
// - Remove lock if found. (No need to check volume readonly or not, because
// device is not on the node anymore, it's safe to remove lock.)
// - Remove the deviceMountPath at last.
// This method is idempotent, callers are responsible for retrying on failure.
func (detacher *rbdDetacher) UnmountDevice(deviceMountPath string) error {
if pathExists, pathErr := volutil.PathExists(deviceMountPath); pathErr != nil {
return fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists {
glog.Warningf("Warning: Unmount skipped because path does not exist: %v", deviceMountPath)
return nil
}
devicePath, cnt, err := mount.GetDeviceNameFromMount(detacher.plugin.mounter, deviceMountPath)
if err != nil {
return err
}
if cnt > 1 {
return fmt.Errorf("rbd: more than 1 reference counts at %s", deviceMountPath)
}
if cnt == 1 {
// Unmount the device from the device mount point.
glog.V(4).Infof("rbd: unmouting device mountpoint %s", deviceMountPath)
if err = detacher.plugin.mounter.Unmount(deviceMountPath); err != nil {
return err
}
glog.V(3).Infof("rbd: successfully umount device mountpath %s", deviceMountPath)
}
glog.V(4).Infof("rbd: detaching device %s", devicePath)
err = detacher.manager.DetachDisk(detacher.plugin, deviceMountPath, devicePath)
if err != nil {
return err
}
glog.V(3).Infof("rbd: successfully detach device %s", devicePath)
err = os.RemoveAll(deviceMountPath)
if err != nil {
return err
}
glog.V(3).Infof("rbd: successfully remove device mount point %s", deviceMountPath)
return nil
}
// Detach implements Detacher.Detach.
func (detacher *rbdDetacher) Detach(deviceName string, nodeName types.NodeName) error {
return nil
}

View File

@@ -23,6 +23,7 @@ limitations under the License.
package rbd package rbd
import ( import (
"fmt"
"os" "os"
"github.com/golang/glog" "github.com/golang/glog"
@@ -33,23 +34,33 @@ import (
// Abstract interface to disk operations. // Abstract interface to disk operations.
type diskManager interface { type diskManager interface {
// MakeGlobalPDName creates global persistent disk path.
MakeGlobalPDName(disk rbd) string MakeGlobalPDName(disk rbd) string
// Attaches the disk to the kubelet's host machine. // Attaches the disk to the kubelet's host machine.
AttachDisk(disk rbdMounter) error // If it successfully attaches, the path to the device
// is returned. Otherwise, an error will be returned.
AttachDisk(disk rbdMounter) (string, error)
// Detaches the disk from the kubelet's host machine. // Detaches the disk from the kubelet's host machine.
DetachDisk(disk rbdUnmounter, mntPath string) error DetachDisk(plugin *rbdPlugin, deviceMountPath string, device string) error
// Creates a rbd image // Creates a rbd image.
CreateImage(provisioner *rbdVolumeProvisioner) (r *v1.RBDPersistentVolumeSource, volumeSizeGB int, err error) CreateImage(provisioner *rbdVolumeProvisioner) (r *v1.RBDPersistentVolumeSource, volumeSizeGB int, err error)
// Deletes a rbd image // Deletes a rbd image.
DeleteImage(deleter *rbdVolumeDeleter) error DeleteImage(deleter *rbdVolumeDeleter) error
} }
// utility to mount a disk based filesystem // utility to mount a disk based filesystem
func diskSetUp(manager diskManager, b rbdMounter, volPath string, mounter mount.Interface, fsGroup *int64) error { func diskSetUp(manager diskManager, b rbdMounter, volPath string, mounter mount.Interface, fsGroup *int64) error {
globalPDPath := manager.MakeGlobalPDName(*b.rbd) globalPDPath := manager.MakeGlobalPDName(*b.rbd)
// TODO: handle failed mounts here. notMnt, err := mounter.IsLikelyNotMountPoint(globalPDPath)
notMnt, err := mounter.IsLikelyNotMountPoint(volPath) if err != nil && !os.IsNotExist(err) {
glog.Errorf("cannot validate mountpoint: %s", globalPDPath)
return err
}
if notMnt {
return fmt.Errorf("no device is mounted at %s", globalPDPath)
}
notMnt, err = mounter.IsLikelyNotMountPoint(volPath)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
glog.Errorf("cannot validate mountpoint: %s", volPath) glog.Errorf("cannot validate mountpoint: %s", volPath)
return err return err
@@ -57,10 +68,6 @@ func diskSetUp(manager diskManager, b rbdMounter, volPath string, mounter mount.
if !notMnt { if !notMnt {
return nil return nil
} }
if err := manager.AttachDisk(b); err != nil {
glog.Errorf("failed to attach disk")
return err
}
if err := os.MkdirAll(volPath, 0750); err != nil { if err := os.MkdirAll(volPath, 0750); err != nil {
glog.Errorf("failed to mkdir:%s", volPath) glog.Errorf("failed to mkdir:%s", volPath)
@@ -89,43 +96,31 @@ func diskSetUp(manager diskManager, b rbdMounter, volPath string, mounter mount.
// utility to tear down a disk based filesystem // utility to tear down a disk based filesystem
func diskTearDown(manager diskManager, c rbdUnmounter, volPath string, mounter mount.Interface) error { func diskTearDown(manager diskManager, c rbdUnmounter, volPath string, mounter mount.Interface) error {
notMnt, err := mounter.IsLikelyNotMountPoint(volPath) notMnt, err := mounter.IsLikelyNotMountPoint(volPath)
if err != nil { if err != nil && !os.IsNotExist(err) {
glog.Errorf("cannot validate mountpoint %s", volPath) glog.Errorf("cannot validate mountpoint: %s", volPath)
return err return err
} }
if notMnt { if notMnt {
glog.V(3).Infof("volume path %s is not a mountpoint, deleting", volPath)
return os.Remove(volPath) return os.Remove(volPath)
} }
refs, err := mount.GetMountRefs(mounter, volPath) // Unmount the bind-mount inside this pod.
if err != nil {
glog.Errorf("failed to get reference count %s", volPath)
return err
}
if err := mounter.Unmount(volPath); err != nil { if err := mounter.Unmount(volPath); err != nil {
glog.Errorf("failed to umount %s", volPath) glog.Errorf("failed to umount %s", volPath)
return err return err
} }
// If len(refs) is 1, then all bind mounts have been removed, and the
// remaining reference is the global mount. It is safe to detach.
if len(refs) == 1 {
mntPath := refs[0]
if err := manager.DetachDisk(c, mntPath); err != nil {
glog.Errorf("failed to detach disk from %s", mntPath)
return err
}
}
notMnt, mntErr := mounter.IsLikelyNotMountPoint(volPath) notMnt, mntErr := mounter.IsLikelyNotMountPoint(volPath)
if mntErr != nil { if err != nil && !os.IsNotExist(err) {
glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr) glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
return err return err
} }
if notMnt { if notMnt {
if err := os.Remove(volPath); err != nil { if err := os.Remove(volPath); err != nil {
glog.V(2).Info("Error removing mountpoint ", volPath, ": ", err)
return err return err
} }
} }
return nil return nil
} }

View File

@@ -41,17 +41,21 @@ var (
// This is the primary entrypoint for volume plugins. // This is the primary entrypoint for volume plugins.
func ProbeVolumePlugins() []volume.VolumePlugin { func ProbeVolumePlugins() []volume.VolumePlugin {
return []volume.VolumePlugin{&rbdPlugin{nil}} return []volume.VolumePlugin{&rbdPlugin{nil, nil, nil}}
} }
// rbdPlugin implements Volume.VolumePlugin.
type rbdPlugin struct { type rbdPlugin struct {
host volume.VolumeHost host volume.VolumeHost
exec mount.Exec
mounter *mount.SafeFormatAndMount
} }
var _ volume.VolumePlugin = &rbdPlugin{} var _ volume.VolumePlugin = &rbdPlugin{}
var _ volume.PersistentVolumePlugin = &rbdPlugin{} var _ volume.PersistentVolumePlugin = &rbdPlugin{}
var _ volume.DeletableVolumePlugin = &rbdPlugin{} var _ volume.DeletableVolumePlugin = &rbdPlugin{}
var _ volume.ProvisionableVolumePlugin = &rbdPlugin{} var _ volume.ProvisionableVolumePlugin = &rbdPlugin{}
var _ volume.AttachableVolumePlugin = &rbdPlugin{}
const ( const (
rbdPluginName = "kubernetes.io/rbd" rbdPluginName = "kubernetes.io/rbd"
@@ -70,6 +74,8 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
func (plugin *rbdPlugin) Init(host volume.VolumeHost) error { func (plugin *rbdPlugin) Init(host volume.VolumeHost) error {
plugin.host = host plugin.host = host
plugin.exec = host.GetExec(plugin.GetPluginName())
plugin.mounter = volumehelper.NewSafeFormatAndMountFromHost(plugin.GetPluginName(), plugin.host)
return nil return nil
} }
@@ -120,6 +126,68 @@ func (plugin *rbdPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
} }
} }
func (plugin *rbdPlugin) createMounterFromVolumeSpecAndPod(spec *volume.Spec, pod *v1.Pod) (*rbdMounter, error) {
var err error
mon, err := getVolumeSourceMonitors(spec)
if err != nil {
return nil, err
}
img, err := getVolumeSourceImage(spec)
if err != nil {
return nil, err
}
fstype, err := getVolumeSourceFSType(spec)
if err != nil {
return nil, err
}
pool, err := getVolumeSourcePool(spec)
if err != nil {
return nil, err
}
id, err := getVolumeSourceUser(spec)
if err != nil {
return nil, err
}
keyring, err := getVolumeSourceKeyRing(spec)
if err != nil {
return nil, err
}
ro, err := getVolumeSourceReadOnly(spec)
if err != nil {
return nil, err
}
secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
if err != nil {
return nil, err
}
secret := ""
if len(secretName) > 0 && len(secretNs) > 0 {
// if secret is provideded, retrieve it
kubeClient := plugin.host.GetKubeClient()
if kubeClient == nil {
return nil, fmt.Errorf("Cannot get kube client")
}
secrets, err := kubeClient.Core().Secrets(secretNs).Get(secretName, metav1.GetOptions{})
if err != nil {
err = fmt.Errorf("Couldn't get secret %v/%v err: %v", secretNs, secretName, err)
return nil, err
}
for _, data := range secrets.Data {
secret = string(data)
}
}
return &rbdMounter{
rbd: newRBD("", spec.Name(), img, pool, ro, plugin, &RBDUtil{}),
Mon: mon,
Id: id,
Keyring: keyring,
Secret: secret,
fsType: fstype,
}, nil
}
func (plugin *rbdPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) { func (plugin *rbdPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace) secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
if err != nil { if err != nil {
@@ -143,10 +211,10 @@ func (plugin *rbdPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.Vol
} }
// Inject real implementations here, test through the internal function. // Inject real implementations here, test through the internal function.
return plugin.newMounterInternal(spec, pod.UID, &RBDUtil{}, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName()), secret) return plugin.newMounterInternal(spec, pod.UID, &RBDUtil{}, secret)
} }
func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager diskManager, mounter mount.Interface, exec mount.Exec, secret string) (volume.Mounter, error) { func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager diskManager, secret string) (volume.Mounter, error) {
mon, err := getVolumeSourceMonitors(spec) mon, err := getVolumeSourceMonitors(spec)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -177,18 +245,7 @@ func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID,
} }
return &rbdMounter{ return &rbdMounter{
rbd: &rbd{ rbd: newRBD(podUID, spec.Name(), img, pool, ro, plugin, manager),
podUID: podUID,
volName: spec.Name(),
Image: img,
Pool: pool,
ReadOnly: ro,
manager: manager,
mounter: &mount.SafeFormatAndMount{Interface: mounter, Exec: exec},
exec: exec,
plugin: plugin,
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), plugin.host)),
},
Mon: mon, Mon: mon,
Id: id, Id: id,
Keyring: keyring, Keyring: keyring,
@@ -200,21 +257,13 @@ func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID,
func (plugin *rbdPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) { func (plugin *rbdPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
// Inject real implementations here, test through the internal function. // Inject real implementations here, test through the internal function.
return plugin.newUnmounterInternal(volName, podUID, &RBDUtil{}, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName())) return plugin.newUnmounterInternal(volName, podUID, &RBDUtil{})
} }
func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID, manager diskManager, mounter mount.Interface, exec mount.Exec) (volume.Unmounter, error) { func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID, manager diskManager) (volume.Unmounter, error) {
return &rbdUnmounter{ return &rbdUnmounter{
rbdMounter: &rbdMounter{ rbdMounter: &rbdMounter{
rbd: &rbd{ rbd: newRBD(podUID, volName, "", "", false, plugin, manager),
podUID: podUID,
volName: volName,
manager: manager,
mounter: &mount.SafeFormatAndMount{Interface: mounter, Exec: exec},
exec: exec,
plugin: plugin,
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)),
},
Mon: make([]string, 0), Mon: make([]string, 0),
}, },
}, nil }, nil
@@ -268,15 +317,7 @@ func (plugin *rbdPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
func (plugin *rbdPlugin) newDeleterInternal(spec *volume.Spec, admin, secret string, manager diskManager) (volume.Deleter, error) { func (plugin *rbdPlugin) newDeleterInternal(spec *volume.Spec, admin, secret string, manager diskManager) (volume.Deleter, error) {
return &rbdVolumeDeleter{ return &rbdVolumeDeleter{
rbdMounter: &rbdMounter{ rbdMounter: &rbdMounter{
rbd: &rbd{ rbd: newRBD("", spec.Name(), spec.PersistentVolume.Spec.RBD.RBDImage, spec.PersistentVolume.Spec.RBD.RBDPool, false, plugin, manager),
volName: spec.Name(),
Image: spec.PersistentVolume.Spec.RBD.RBDImage,
Pool: spec.PersistentVolume.Spec.RBD.RBDPool,
manager: manager,
plugin: plugin,
mounter: &mount.SafeFormatAndMount{Interface: plugin.host.GetMounter(plugin.GetPluginName())},
exec: plugin.host.GetExec(plugin.GetPluginName()),
},
Mon: spec.PersistentVolume.Spec.RBD.CephMonitors, Mon: spec.PersistentVolume.Spec.RBD.CephMonitors,
adminId: admin, adminId: admin,
adminSecret: secret, adminSecret: secret,
@@ -290,22 +331,20 @@ func (plugin *rbdPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Pr
func (plugin *rbdPlugin) newProvisionerInternal(options volume.VolumeOptions, manager diskManager) (volume.Provisioner, error) { func (plugin *rbdPlugin) newProvisionerInternal(options volume.VolumeOptions, manager diskManager) (volume.Provisioner, error) {
return &rbdVolumeProvisioner{ return &rbdVolumeProvisioner{
rbdMounter: &rbdMounter{ rbdMounter: &rbdMounter{
rbd: &rbd{ rbd: newRBD("", "", "", "", false, plugin, manager),
manager: manager,
plugin: plugin,
mounter: &mount.SafeFormatAndMount{Interface: plugin.host.GetMounter(plugin.GetPluginName())},
exec: plugin.host.GetExec(plugin.GetPluginName()),
},
}, },
options: options, options: options,
}, nil }, nil
} }
// rbdVolumeProvisioner implements volume.Provisioner interface.
type rbdVolumeProvisioner struct { type rbdVolumeProvisioner struct {
*rbdMounter *rbdMounter
options volume.VolumeOptions options volume.VolumeOptions
} }
var _ volume.Provisioner = &rbdVolumeProvisioner{}
func (r *rbdVolumeProvisioner) Provision() (*v1.PersistentVolume, error) { func (r *rbdVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
if !volume.AccessModesContainedInAll(r.plugin.GetAccessModes(), r.options.PVC.Spec.AccessModes) { if !volume.AccessModesContainedInAll(r.plugin.GetAccessModes(), r.options.PVC.Spec.AccessModes) {
return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", r.options.PVC.Spec.AccessModes, r.plugin.GetAccessModes()) return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", r.options.PVC.Spec.AccessModes, r.plugin.GetAccessModes())
@@ -419,10 +458,13 @@ func (r *rbdVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
return pv, nil return pv, nil
} }
// rbdVolumeDeleter implements volume.Deleter interface.
type rbdVolumeDeleter struct { type rbdVolumeDeleter struct {
*rbdMounter *rbdMounter
} }
var _ volume.Deleter = &rbdVolumeDeleter{}
func (r *rbdVolumeDeleter) GetPath() string { func (r *rbdVolumeDeleter) GetPath() string {
return getPath(r.podUID, r.volName, r.plugin.host) return getPath(r.podUID, r.volName, r.plugin.host)
} }
@@ -431,6 +473,8 @@ func (r *rbdVolumeDeleter) Delete() error {
return r.manager.DeleteImage(r) return r.manager.DeleteImage(r)
} }
// rbd implmenets volume.Volume interface.
// It's embedded in Mounter/Unmounter/Deleter.
type rbd struct { type rbd struct {
volName string volName string
podUID types.UID podUID types.UID
@@ -445,11 +489,35 @@ type rbd struct {
volume.MetricsProvider `json:"-"` volume.MetricsProvider `json:"-"`
} }
var _ volume.Volume = &rbd{}
func (rbd *rbd) GetPath() string { func (rbd *rbd) GetPath() string {
// safe to use PodVolumeDir now: volume teardown occurs before pod is cleaned up // safe to use PodVolumeDir now: volume teardown occurs before pod is cleaned up
return getPath(rbd.podUID, rbd.volName, rbd.plugin.host) return getPath(rbd.podUID, rbd.volName, rbd.plugin.host)
} }
// newRBD creates a new rbd.
func newRBD(podUID types.UID, volName string, image string, pool string, readOnly bool, plugin *rbdPlugin, manager diskManager) *rbd {
return &rbd{
podUID: podUID,
volName: volName,
Image: image,
Pool: pool,
ReadOnly: readOnly,
plugin: plugin,
mounter: plugin.mounter,
exec: plugin.exec,
manager: manager,
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)),
}
}
// rbdMounter implements volume.Mounter interface.
// It contains information which need to be persisted in whole life cycle of PV
// on the node. It is persisted at the very beginning in the pod mount point
// directory.
// Note: Capitalized field names of this struct determines the information
// persisted on the disk, DO NOT change them. (TODO: refactoring to use a dedicated struct?)
type rbdMounter struct { type rbdMounter struct {
*rbd *rbd
// capitalized so they can be exported in persistRBD() // capitalized so they can be exported in persistRBD()
@@ -488,14 +556,16 @@ func (b *rbdMounter) SetUp(fsGroup *int64) error {
func (b *rbdMounter) SetUpAt(dir string, fsGroup *int64) error { func (b *rbdMounter) SetUpAt(dir string, fsGroup *int64) error {
// diskSetUp checks mountpoints and prevent repeated calls // diskSetUp checks mountpoints and prevent repeated calls
glog.V(4).Infof("rbd: attempting to SetUp and mount %s", dir) glog.V(4).Infof("rbd: attempting to setup at %s", dir)
err := diskSetUp(b.manager, *b, dir, b.mounter, fsGroup) err := diskSetUp(b.manager, *b, dir, b.mounter, fsGroup)
if err != nil { if err != nil {
glog.Errorf("rbd: failed to setup mount %s %v", dir, err) glog.Errorf("rbd: failed to setup at %s %v", dir, err)
} }
glog.V(3).Infof("rbd: successfully setup at %s", dir)
return err return err
} }
// rbdUnmounter implements volume.Unmounter interface.
type rbdUnmounter struct { type rbdUnmounter struct {
*rbdMounter *rbdMounter
} }
@@ -509,13 +579,19 @@ func (c *rbdUnmounter) TearDown() error {
} }
func (c *rbdUnmounter) TearDownAt(dir string) error { func (c *rbdUnmounter) TearDownAt(dir string) error {
glog.V(4).Infof("rbd: attempting to teardown at %s", dir)
if pathExists, pathErr := volutil.PathExists(dir); pathErr != nil { if pathExists, pathErr := volutil.PathExists(dir); pathErr != nil {
return fmt.Errorf("Error checking if path exists: %v", pathErr) return fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists { } else if !pathExists {
glog.Warningf("Warning: Unmount skipped because path does not exist: %v", dir) glog.Warningf("Warning: Unmount skipped because path does not exist: %v", dir)
return nil return nil
} }
return diskTearDown(c.manager, *c, dir, c.mounter) err := diskTearDown(c.manager, *c, dir, c.mounter)
if err != nil {
return err
}
glog.V(3).Infof("rbd: successfully teardown at %s", dir)
return nil
} }
func getVolumeSourceMonitors(spec *volume.Spec) ([]string, error) { func getVolumeSourceMonitors(spec *volume.Spec) ([]string, error) {

View File

@@ -23,12 +23,15 @@ import (
"path/filepath" "path/filepath"
"reflect" "reflect"
"strings" "strings"
"sync"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
utiltesting "k8s.io/client-go/util/testing" utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
@@ -59,37 +62,43 @@ func TestCanSupport(t *testing.T) {
} }
type fakeDiskManager struct { type fakeDiskManager struct {
tmpDir string // Make sure we can run tests in parallel.
mutex sync.RWMutex
// Key format: "<pool>/<image>"
rbdImageLocks map[string]bool
rbdMapIndex int
rbdDevices map[string]bool
} }
func NewFakeDiskManager() *fakeDiskManager { func NewFakeDiskManager() *fakeDiskManager {
return &fakeDiskManager{ return &fakeDiskManager{
tmpDir: utiltesting.MkTmpdirOrDie("rbd_test"), rbdImageLocks: make(map[string]bool),
rbdMapIndex: 0,
rbdDevices: make(map[string]bool),
} }
} }
func (fake *fakeDiskManager) Cleanup() { func (fake *fakeDiskManager) MakeGlobalPDName(rbd rbd) string {
os.RemoveAll(fake.tmpDir) return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image)
} }
func (fake *fakeDiskManager) MakeGlobalPDName(disk rbd) string { func (fake *fakeDiskManager) AttachDisk(b rbdMounter) (string, error) {
return fake.tmpDir fake.mutex.Lock()
} defer fake.mutex.Unlock()
func (fake *fakeDiskManager) AttachDisk(b rbdMounter) error { fake.rbdMapIndex += 1
globalPath := b.manager.MakeGlobalPDName(*b.rbd) devicePath := fmt.Sprintf("/dev/rbd%d", fake.rbdMapIndex)
err := os.MkdirAll(globalPath, 0750) fake.rbdDevices[devicePath] = true
if err != nil { return devicePath, nil
return err
}
return nil
} }
func (fake *fakeDiskManager) DetachDisk(c rbdUnmounter, mntPath string) error { func (fake *fakeDiskManager) DetachDisk(r *rbdPlugin, deviceMountPath string, device string) error {
globalPath := c.manager.MakeGlobalPDName(*c.rbd) fake.mutex.Lock()
err := os.RemoveAll(globalPath) defer fake.mutex.Unlock()
if err != nil { ok := fake.rbdDevices[device]
return err if !ok {
return fmt.Errorf("rbd: failed to detach device %s, it does not exist", device)
} }
delete(fake.rbdDevices, device)
return nil return nil
} }
@@ -101,35 +110,111 @@ func (fake *fakeDiskManager) DeleteImage(deleter *rbdVolumeDeleter) error {
return fmt.Errorf("not implemented") return fmt.Errorf("not implemented")
} }
func doTestPlugin(t *testing.T, spec *volume.Spec) { func (fake *fakeDiskManager) Fencing(r rbdMounter, nodeName string) error {
tmpDir, err := utiltesting.MkTmpdir("rbd_test") fake.mutex.Lock()
if err != nil { defer fake.mutex.Unlock()
t.Fatalf("error creating temp dir: %v", err) key := fmt.Sprintf("%s/%s", r.Pool, r.Image)
isLocked, ok := fake.rbdImageLocks[key]
if ok && isLocked {
// not expected in testing
return fmt.Errorf("%s is already locked", key)
} }
defer os.RemoveAll(tmpDir) fake.rbdImageLocks[key] = true
return nil
}
func (fake *fakeDiskManager) Defencing(r rbdMounter, nodeName string) error {
fake.mutex.Lock()
defer fake.mutex.Unlock()
key := fmt.Sprintf("%s/%s", r.Pool, r.Image)
isLocked, ok := fake.rbdImageLocks[key]
if !ok || !isLocked {
// not expected in testing
return fmt.Errorf("%s is not locked", key)
}
delete(fake.rbdImageLocks, key)
return nil
}
func (fake *fakeDiskManager) IsLocked(r rbdMounter, nodeName string) (bool, error) {
fake.mutex.RLock()
defer fake.mutex.RUnlock()
key := fmt.Sprintf("%s/%s", r.Pool, r.Image)
isLocked, ok := fake.rbdImageLocks[key]
return ok && isLocked, nil
}
// checkMounterLog checks fakeMounter must have expected logs, and the last action msut equal to expectedAction.
func checkMounterLog(t *testing.T, fakeMounter *mount.FakeMounter, expected int, expectedAction mount.FakeAction) {
if len(fakeMounter.Log) != expected {
t.Fatalf("fakeMounter should have %d logs, actual: %d", expected, len(fakeMounter.Log))
}
lastIndex := len(fakeMounter.Log) - 1
lastAction := fakeMounter.Log[lastIndex]
if !reflect.DeepEqual(expectedAction, lastAction) {
t.Fatalf("fakeMounter.Log[%d] should be %v, not: %v", lastIndex, expectedAction, lastAction)
}
}
func doTestPlugin(t *testing.T, c *testcase) {
fakeVolumeHost := volumetest.NewFakeVolumeHost(c.root, nil, nil)
plugMgr := volume.VolumePluginMgr{} plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, fakeVolumeHost)
plug, err := plugMgr.FindPluginByName("kubernetes.io/rbd") plug, err := plugMgr.FindPluginByName("kubernetes.io/rbd")
if err != nil { if err != nil {
t.Errorf("Can't find the plugin by name") t.Errorf("Can't find the plugin by name")
} }
fakeMounter := fakeVolumeHost.GetMounter(plug.GetPluginName()).(*mount.FakeMounter)
fakeNodeName := types.NodeName("localhost")
fdm := NewFakeDiskManager() fdm := NewFakeDiskManager()
defer fdm.Cleanup()
exec := mount.NewFakeExec(nil) // attacher
mounter, err := plug.(*rbdPlugin).newMounterInternal(spec, types.UID("poduid"), fdm, &mount.FakeMounter{}, exec, "secrets") attacher, err := plug.(*rbdPlugin).newAttacherInternal(fdm)
if err != nil {
t.Errorf("Failed to make a new Attacher: %v", err)
}
deviceAttachPath, err := attacher.Attach(c.spec, fakeNodeName)
if err != nil {
t.Fatal(err)
}
devicePath, err := attacher.WaitForAttach(c.spec, deviceAttachPath, c.pod, time.Second*10)
if err != nil {
t.Fatal(err)
}
if devicePath != c.expectedDevicePath {
t.Errorf("Unexpected path, expected %q, not: %q", c.expectedDevicePath, devicePath)
}
deviceMountPath, err := attacher.GetDeviceMountPath(c.spec)
if err != nil {
t.Fatal(err)
}
if deviceMountPath != c.expectedDeviceMountPath {
t.Errorf("Unexpected mount path, expected %q, not: %q", c.expectedDeviceMountPath, deviceMountPath)
}
err = attacher.MountDevice(c.spec, devicePath, deviceMountPath)
if err != nil {
t.Fatal(err)
}
if _, err := os.Stat(deviceMountPath); err != nil {
if os.IsNotExist(err) {
t.Errorf("Attacher.MountDevice() failed, device mount path not created: %s", deviceMountPath)
} else {
t.Errorf("Attacher.MountDevice() failed: %v", err)
}
}
checkMounterLog(t, fakeMounter, 1, mount.FakeAction{Action: "mount", Target: c.expectedDeviceMountPath, Source: devicePath, FSType: "ext4"})
// mounter
mounter, err := plug.(*rbdPlugin).newMounterInternal(c.spec, c.pod.UID, fdm, "secrets")
if err != nil { if err != nil {
t.Errorf("Failed to make a new Mounter: %v", err) t.Errorf("Failed to make a new Mounter: %v", err)
} }
if mounter == nil { if mounter == nil {
t.Error("Got a nil Mounter") t.Error("Got a nil Mounter")
} }
path := mounter.GetPath() path := mounter.GetPath()
expectedPath := fmt.Sprintf("%s/pods/poduid/volumes/kubernetes.io~rbd/vol1", tmpDir) if path != c.expectedPodMountPath {
if path != expectedPath { t.Errorf("Unexpected path, expected %q, got: %q", c.expectedPodMountPath, path)
t.Errorf("Unexpected path, expected %q, got: %q", expectedPath, path)
} }
if err := mounter.SetUp(nil); err != nil { if err := mounter.SetUp(nil); err != nil {
@@ -142,8 +227,10 @@ func doTestPlugin(t *testing.T, spec *volume.Spec) {
t.Errorf("SetUp() failed: %v", err) t.Errorf("SetUp() failed: %v", err)
} }
} }
checkMounterLog(t, fakeMounter, 2, mount.FakeAction{Action: "mount", Target: c.expectedPodMountPath, Source: devicePath, FSType: ""})
unmounter, err := plug.(*rbdPlugin).newUnmounterInternal("vol1", types.UID("poduid"), fdm, &mount.FakeMounter{}, exec) // unmounter
unmounter, err := plug.(*rbdPlugin).newUnmounterInternal(c.spec.Name(), c.pod.UID, fdm)
if err != nil { if err != nil {
t.Errorf("Failed to make a new Unmounter: %v", err) t.Errorf("Failed to make a new Unmounter: %v", err)
} }
@@ -159,38 +246,98 @@ func doTestPlugin(t *testing.T, spec *volume.Spec) {
} else if !os.IsNotExist(err) { } else if !os.IsNotExist(err) {
t.Errorf("SetUp() failed: %v", err) t.Errorf("SetUp() failed: %v", err)
} }
checkMounterLog(t, fakeMounter, 3, mount.FakeAction{Action: "unmount", Target: c.expectedPodMountPath, Source: "", FSType: ""})
// detacher
detacher, err := plug.(*rbdPlugin).newDetacherInternal(fdm)
if err != nil {
t.Errorf("Failed to make a new Attacher: %v", err)
}
err = detacher.UnmountDevice(deviceMountPath)
if err != nil {
t.Fatalf("Detacher.UnmountDevice failed to unmount %s", deviceMountPath)
}
checkMounterLog(t, fakeMounter, 4, mount.FakeAction{Action: "unmount", Target: c.expectedDeviceMountPath, Source: "", FSType: ""})
err = detacher.Detach(deviceMountPath, fakeNodeName)
if err != nil {
t.Fatalf("Detacher.Detach failed to detach %s from %s", deviceMountPath, fakeNodeName)
}
} }
func TestPluginVolume(t *testing.T) { type testcase struct {
vol := &v1.Volume{ spec *volume.Spec
root string
pod *v1.Pod
expectedDevicePath string
expectedDeviceMountPath string
expectedPodMountPath string
}
func TestPlugin(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("rbd_test")
if err != nil {
t.Fatalf("error creating temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podUID := uuid.NewUUID()
var cases []*testcase
cases = append(cases, &testcase{
spec: volume.NewSpecFromVolume(&v1.Volume{
Name: "vol1", Name: "vol1",
VolumeSource: v1.VolumeSource{ VolumeSource: v1.VolumeSource{
RBD: &v1.RBDVolumeSource{ RBD: &v1.RBDVolumeSource{
CephMonitors: []string{"a", "b"}, CephMonitors: []string{"a", "b"},
RBDImage: "bar", RBDPool: "pool1",
RBDImage: "image1",
FSType: "ext4", FSType: "ext4",
}, },
}, },
} }),
doTestPlugin(t, volume.NewSpecFromVolume(vol)) root: tmpDir,
} pod: &v1.Pod{
func TestPluginPersistentVolume(t *testing.T) {
vol := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "vol1", Name: "testpod",
Namespace: "testns",
UID: podUID,
},
},
expectedDevicePath: "/dev/rbd1",
expectedDeviceMountPath: fmt.Sprintf("%s/plugins/kubernetes.io/rbd/rbd/pool1-image-image1", tmpDir),
expectedPodMountPath: fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~rbd/vol1", tmpDir, podUID),
})
cases = append(cases, &testcase{
spec: volume.NewSpecFromPersistentVolume(&v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "vol2",
}, },
Spec: v1.PersistentVolumeSpec{ Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{ PersistentVolumeSource: v1.PersistentVolumeSource{
RBD: &v1.RBDPersistentVolumeSource{ RBD: &v1.RBDPersistentVolumeSource{
CephMonitors: []string{"a", "b"}, CephMonitors: []string{"a", "b"},
RBDImage: "bar", RBDPool: "pool2",
RBDImage: "image2",
FSType: "ext4", FSType: "ext4",
}, },
}, },
}, },
} }, false),
root: tmpDir,
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testpod",
Namespace: "testns",
UID: podUID,
},
},
expectedDevicePath: "/dev/rbd1",
expectedDeviceMountPath: fmt.Sprintf("%s/plugins/kubernetes.io/rbd/rbd/pool2-image-image2", tmpDir),
expectedPodMountPath: fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~rbd/vol2", tmpDir, podUID),
})
doTestPlugin(t, volume.NewSpecFromPersistentVolume(vol, false)) for i := 0; i < len(cases); i++ {
doTestPlugin(t, cases[i])
}
} }
func TestPersistentClaimReadOnlyFlag(t *testing.T) { func TestPersistentClaimReadOnlyFlag(t *testing.T) {

View File

@@ -35,9 +35,9 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
volutil "k8s.io/kubernetes/pkg/volume/util"
) )
const ( const (
@@ -107,11 +107,15 @@ func makePDNameInternal(host volume.VolumeHost, pool string, image string) strin
return path.Join(host.GetPluginDir(rbdPluginName), "rbd", pool+"-image-"+image) return path.Join(host.GetPluginDir(rbdPluginName), "rbd", pool+"-image-"+image)
} }
// RBDUtil implements diskManager interface.
type RBDUtil struct{} type RBDUtil struct{}
var _ diskManager = &RBDUtil{}
func (util *RBDUtil) MakeGlobalPDName(rbd rbd) string { func (util *RBDUtil) MakeGlobalPDName(rbd rbd) string {
return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image) return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image)
} }
func rbdErrors(runErr, resultErr error) error { func rbdErrors(runErr, resultErr error) error {
if runErr.Error() == rbdCmdErr { if runErr.Error() == rbdCmdErr {
return fmt.Errorf("rbd: rbd cmd not found") return fmt.Errorf("rbd: rbd cmd not found")
@@ -119,6 +123,8 @@ func rbdErrors(runErr, resultErr error) error {
return resultErr return resultErr
} }
// rbdLock acquires a lock on image if lock is true, otherwise releases if a
// lock is found on image.
func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error { func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error {
var err error var err error
var output, locker string var output, locker string
@@ -188,6 +194,9 @@ func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error {
args := []string{"lock", "add", b.Image, lock_id, "--pool", b.Pool, "--id", b.Id, "-m", mon} args := []string{"lock", "add", b.Image, lock_id, "--pool", b.Pool, "--id", b.Id, "-m", mon}
args = append(args, secret_opt...) args = append(args, secret_opt...)
cmd, err = b.exec.Run("rbd", args...) cmd, err = b.exec.Run("rbd", args...)
if err == nil {
glog.V(4).Infof("rbd: successfully add lock (locker_id: %s) on image: %s/%s with id %s mon %s", lock_id, b.Pool, b.Image, b.Id, mon)
}
} else { } else {
// defencing, find locker name // defencing, find locker name
ind := strings.LastIndex(output, lock_id) - 1 ind := strings.LastIndex(output, lock_id) - 1
@@ -197,14 +206,19 @@ func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error {
break break
} }
} }
// remove a lock: rbd lock remove // remove a lock if found: rbd lock remove
if len(locker) > 0 {
args := []string{"lock", "remove", b.Image, lock_id, locker, "--pool", b.Pool, "--id", b.Id, "-m", mon} args := []string{"lock", "remove", b.Image, lock_id, locker, "--pool", b.Pool, "--id", b.Id, "-m", mon}
args = append(args, secret_opt...) args = append(args, secret_opt...)
cmd, err = b.exec.Run("rbd", args...) cmd, err = b.exec.Run("rbd", args...)
if err == nil {
glog.V(4).Infof("rbd: successfully remove lock (locker_id: %s) on image: %s/%s with id %s mon %s", lock_id, b.Pool, b.Image, b.Id, mon)
}
}
} }
if err == nil { if err == nil {
//lock is acquired // break if operation succeeds
break break
} }
} }
@@ -251,31 +265,19 @@ func (util *RBDUtil) fencing(b rbdMounter) error {
return util.rbdLock(b, true) return util.rbdLock(b, true)
} }
func (util *RBDUtil) defencing(c rbdUnmounter) error { // AttachDisk attaches the disk on the node.
// no need to fence readOnly // If Volume is not read-only, acquire a lock on image first.
if c.ReadOnly { func (util *RBDUtil) AttachDisk(b rbdMounter) (string, error) {
return nil
}
return util.rbdLock(*c.rbdMounter, false)
}
func (util *RBDUtil) AttachDisk(b rbdMounter) error {
var err error var err error
var output []byte var output []byte
// create mount point globalPDPath := util.MakeGlobalPDName(*b.rbd)
globalPDPath := b.manager.MakeGlobalPDName(*b.rbd) if pathExists, pathErr := volutil.PathExists(globalPDPath); pathErr != nil {
notMnt, err := b.mounter.IsLikelyNotMountPoint(globalPDPath) return "", fmt.Errorf("Error checking if path exists: %v", pathErr)
// in the first time, the path shouldn't exist and IsLikelyNotMountPoint is expected to get NotExist } else if !pathExists {
if err != nil && !os.IsNotExist(err) { if err := os.MkdirAll(globalPDPath, 0750); err != nil {
return fmt.Errorf("rbd: %s failed to check mountpoint", globalPDPath) return "", err
} }
if !notMnt {
return nil
}
if err = os.MkdirAll(globalPDPath, 0750); err != nil {
return fmt.Errorf("rbd: failed to mkdir %s, error", globalPDPath)
} }
devicePath, found := waitForPath(b.Pool, b.Image, 1) devicePath, found := waitForPath(b.Pool, b.Image, 1)
@@ -287,7 +289,7 @@ func (util *RBDUtil) AttachDisk(b rbdMounter) error {
// fence off other mappers // fence off other mappers
if err = util.fencing(b); err != nil { if err = util.fencing(b); err != nil {
return rbdErrors(err, fmt.Errorf("rbd: failed to lock image %s (maybe locked by other nodes), error %v", b.Image, err)) return "", rbdErrors(err, fmt.Errorf("rbd: failed to lock image %s (maybe locked by other nodes), error %v", b.Image, err))
} }
// rbd lock remove needs ceph and image config // rbd lock remove needs ceph and image config
// but kubelet doesn't get them from apiserver during teardown // but kubelet doesn't get them from apiserver during teardown
@@ -317,49 +319,43 @@ func (util *RBDUtil) AttachDisk(b rbdMounter) error {
glog.V(1).Infof("rbd: map error %v %s", err, string(output)) glog.V(1).Infof("rbd: map error %v %s", err, string(output))
} }
if err != nil { if err != nil {
return fmt.Errorf("rbd: map failed %v %s", err, string(output)) return "", fmt.Errorf("rbd: map failed %v %s", err, string(output))
} }
devicePath, found = waitForPath(b.Pool, b.Image, 10) devicePath, found = waitForPath(b.Pool, b.Image, 10)
if !found { if !found {
return errors.New("Could not map image: Timeout after 10s") return "", errors.New("Could not map image: Timeout after 10s")
} }
glog.V(3).Infof("rbd: successfully map image %s/%s to %s", b.Pool, b.Image, devicePath)
} }
return devicePath, err
// mount it
if err = b.mounter.FormatAndMount(devicePath, globalPDPath, b.fsType, nil); err != nil {
err = fmt.Errorf("rbd: failed to mount rbd volume %s [%s] to %s, error %v", devicePath, b.fsType, globalPDPath, err)
}
glog.V(3).Infof("rbd: successfully mount image %s/%s at %s", b.Pool, b.Image, globalPDPath)
return err
} }
func (util *RBDUtil) DetachDisk(c rbdUnmounter, mntPath string) error { // DetachDisk detaches the disk from the node.
device, cnt, err := mount.GetDeviceNameFromMount(c.mounter, mntPath) // It detaches device from the node if device is provided, and removes the lock
if err != nil { // if there is persisted RBD info under deviceMountPath.
return fmt.Errorf("rbd detach disk: failed to get device from mnt: %s\nError: %v", mntPath, err) func (util *RBDUtil) DetachDisk(plugin *rbdPlugin, deviceMountPath string, device string) error {
} var err error
if err = c.mounter.Unmount(mntPath); err != nil { if len(device) > 0 {
return fmt.Errorf("rbd detach disk: failed to umount: %s\nError: %v", mntPath, err)
}
glog.V(3).Infof("rbd: successfully umount mountpoint %s", mntPath)
// if device is no longer used, see if can unmap
if cnt <= 1 {
// rbd unmap // rbd unmap
_, err = c.exec.Run("rbd", "unmap", device) _, err = plugin.exec.Run("rbd", "unmap", device)
if err != nil { if err != nil {
return rbdErrors(err, fmt.Errorf("rbd: failed to unmap device %s:Error: %v", device, err)) return rbdErrors(err, fmt.Errorf("rbd: failed to unmap device %s:Error: %v", device, err))
} }
// load ceph and image/pool info to remove fencing
if err := util.loadRBD(c.rbdMounter, mntPath); err == nil {
// remove rbd lock
util.defencing(c)
}
glog.V(3).Infof("rbd: successfully unmap device %s", device) glog.V(3).Infof("rbd: successfully unmap device %s", device)
} }
return nil // load ceph and image/pool info to remove fencing
mounter := &rbdMounter{
// util.rbdLock needs it to run command.
rbd: newRBD("", "", "", "", false, plugin, util),
}
err = util.loadRBD(mounter, deviceMountPath)
if err != nil {
glog.Errorf("failed to load rbd info from %s: %v", deviceMountPath, err)
return err
}
// remove rbd lock if found
// the disk is not attached to this node anymore, so the lock on image
// for this node can be removed safely
return util.rbdLock(*mounter, false)
} }
func (util *RBDUtil) CreateImage(p *rbdVolumeProvisioner) (r *v1.RBDPersistentVolumeSource, size int, err error) { func (util *RBDUtil) CreateImage(p *rbdVolumeProvisioner) (r *v1.RBDPersistentVolumeSource, size int, err error) {