From 335c5d959fbcb019d249f50e8642c608803be215 Mon Sep 17 00:00:00 2001 From: Serguei Bezverkhi Date: Thu, 30 Nov 2017 14:10:03 -0500 Subject: [PATCH] Adding support for Block Volume to rbd plugin --- pkg/volume/rbd/BUILD | 1 + pkg/volume/rbd/disk_manager.go | 4 + pkg/volume/rbd/rbd.go | 252 +++++++++++++++++++++++++++++++++ pkg/volume/rbd/rbd_test.go | 17 ++- pkg/volume/rbd/rbd_util.go | 70 ++++++++- 5 files changed, 337 insertions(+), 7 deletions(-) diff --git a/pkg/volume/rbd/BUILD b/pkg/volume/rbd/BUILD index ea058ed7c67..5a196a794e5 100644 --- a/pkg/volume/rbd/BUILD +++ b/pkg/volume/rbd/BUILD @@ -31,6 +31,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", ], ) diff --git a/pkg/volume/rbd/disk_manager.go b/pkg/volume/rbd/disk_manager.go index 9610ebc2b7a..70db99b3cff 100644 --- a/pkg/volume/rbd/disk_manager.go +++ b/pkg/volume/rbd/disk_manager.go @@ -37,12 +37,16 @@ import ( type diskManager interface { // MakeGlobalPDName creates global persistent disk path. MakeGlobalPDName(disk rbd) string + // MakeGlobalVDPDName creates global block disk path. + MakeGlobalVDPDName(disk rbd) string // Attaches the disk to the kubelet's host machine. // If it successfully attaches, the path to the device // is returned. Otherwise, an error will be returned. AttachDisk(disk rbdMounter) (string, error) // Detaches the disk from the kubelet's host machine. DetachDisk(plugin *rbdPlugin, deviceMountPath string, device string) error + // Detaches the block disk from the kubelet's host machine. + DetachBlockDisk(disk rbdDiskUnmapper, mntPath string) error // Creates a rbd image. CreateImage(provisioner *rbdVolumeProvisioner) (r *v1.RBDPersistentVolumeSource, volumeSizeGB int, err error) // Deletes a rbd image. diff --git a/pkg/volume/rbd/rbd.go b/pkg/volume/rbd/rbd.go index 6042e743395..0d4e8c21958 100644 --- a/pkg/volume/rbd/rbd.go +++ b/pkg/volume/rbd/rbd.go @@ -18,6 +18,8 @@ package rbd import ( "fmt" + "os" + "path/filepath" dstrings "strings" "github.com/golang/glog" @@ -55,6 +57,7 @@ var _ volume.DeletableVolumePlugin = &rbdPlugin{} var _ volume.ProvisionableVolumePlugin = &rbdPlugin{} var _ volume.AttachableVolumePlugin = &rbdPlugin{} var _ volume.ExpandableVolumePlugin = &rbdPlugin{} +var _ volume.BlockVolumePlugin = &rbdPlugin{} const ( rbdPluginName = "kubernetes.io/rbd" @@ -368,6 +371,127 @@ func (plugin *rbdPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*vol return volume.NewSpecFromVolume(rbdVolume), nil } +func (plugin *rbdPlugin) ConstructBlockVolumeSpec(podUID types.UID, volumeName, mapPath string) (*volume.Spec, error) { + pluginDir := plugin.host.GetVolumeDevicePluginDir(rbdPluginName) + blkutil := volutil.NewBlockVolumePathHandler() + + globalMapPathUUID, err := blkutil.FindGlobalMapPathUUIDFromPod(pluginDir, mapPath, podUID) + if err != nil { + return nil, err + } + glog.V(5).Infof("globalMapPathUUID: %v, err: %v", globalMapPathUUID, err) + globalMapPath := filepath.Dir(globalMapPathUUID) + if len(globalMapPath) == 1 { + return nil, fmt.Errorf("failed to retreive volume plugin information from globalMapPathUUID: %v", globalMapPathUUID) + } + return getVolumeSpecFromGlobalMapPath(globalMapPath) +} + +func getVolumeSpecFromGlobalMapPath(globalMapPath string) (*volume.Spec, error) { + // Retreive volume spec information from globalMapPath + // globalMapPath example: + // plugins/kubernetes.io/{PluginName}/{DefaultKubeletVolumeDevicesDirName}/{volumePluginDependentPath} + pool, image, err := getPoolAndImageFromMapPath(globalMapPath) + if err != nil { + return nil, err + } + block := v1.PersistentVolumeBlock + rbdVolume := &v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + RBD: &v1.RBDPersistentVolumeSource{ + RBDImage: image, + RBDPool: pool, + }, + }, + VolumeMode: &block, + }, + } + + return volume.NewSpecFromPersistentVolume(rbdVolume, true), nil +} + +func (plugin *rbdPlugin) NewBlockVolumeMapper(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.BlockVolumeMapper, error) { + + var uid types.UID + if pod != nil { + uid = pod.UID + } + secret := "" + // var err error + if pod != nil { + secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace) + if err != nil { + return nil, err + } + if len(secretName) > 0 && len(secretNs) > 0 { + // if secret is provideded, retrieve it + kubeClient := plugin.host.GetKubeClient() + if kubeClient == nil { + return nil, fmt.Errorf("Cannot get kube client") + } + secrets, err := kubeClient.Core().Secrets(secretNs).Get(secretName, metav1.GetOptions{}) + if err != nil { + err = fmt.Errorf("Couldn't get secret %v/%v err: %v", secretNs, secretName, err) + return nil, err + } + for _, data := range secrets.Data { + secret = string(data) + } + } + } + + return plugin.newBlockVolumeMapperInternal(spec, uid, &RBDUtil{}, secret, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName())) +} + +func (plugin *rbdPlugin) newBlockVolumeMapperInternal(spec *volume.Spec, podUID types.UID, manager diskManager, secret string, mounter mount.Interface, exec mount.Exec) (volume.BlockVolumeMapper, error) { + mon, err := getVolumeSourceMonitors(spec) + if err != nil { + return nil, err + } + img, err := getVolumeSourceImage(spec) + if err != nil { + return nil, err + } + pool, err := getVolumeSourcePool(spec) + if err != nil { + return nil, err + } + id, err := getVolumeSourceUser(spec) + if err != nil { + return nil, err + } + keyring, err := getVolumeSourceKeyRing(spec) + if err != nil { + return nil, err + } + ro, err := getVolumeSourceReadOnly(spec) + if err != nil { + return nil, err + } + + return &rbdDiskMapper{ + rbd: newRBD(podUID, spec.Name(), img, pool, ro, plugin, manager), + mon: mon, + id: id, + keyring: keyring, + secret: secret, + }, nil +} + +func (plugin *rbdPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) { + return plugin.newUnmapperInternal(volName, podUID, &RBDUtil{}) +} + +func (plugin *rbdPlugin) newUnmapperInternal(volName string, podUID types.UID, manager diskManager) (volume.BlockVolumeUnmapper, error) { + return &rbdDiskUnmapper{ + rbdDiskMapper: &rbdDiskMapper{ + rbd: newRBD(podUID, volName, "", "", false, plugin, manager), + mon: make([]string, 0), + }, + }, nil +} + func (plugin *rbdPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) { if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.RBD == nil { return nil, fmt.Errorf("spec.PersistentVolumeSource.Spec.RBD is nil") @@ -661,6 +785,134 @@ func (c *rbdUnmounter) TearDownAt(dir string) error { return nil } +var _ volume.BlockVolumeMapper = &rbdDiskMapper{} + +type rbdDiskMapper struct { + *rbd + mon []string + id string + keyring string + secret string + adminSecret string + adminId string + imageFormat string + imageFeatures []string +} + +var _ volume.BlockVolumeUnmapper = &rbdDiskUnmapper{} + +// GetGlobalMapPath returns global map path and error +// path: plugins/kubernetes.io/{PluginName}/volumeDevices/{rbd pool}-image-{rbd image-name}/{podUid} +func (rbd *rbd) GetGlobalMapPath(spec *volume.Spec) (string, error) { + return rbd.rbdGlobalMapPath(spec) +} + +// GetPodDeviceMapPath returns pod device map path and volume name +// path: pods/{podUid}/volumeDevices/kubernetes.io~rbd +// volumeName: pv0001 +func (rbd *rbd) GetPodDeviceMapPath() (string, string) { + return rbd.rbdPodDeviceMapPath() +} + +func (rbd *rbdDiskMapper) SetUpDevice() (string, error) { + return "", nil +} + +func (rbd *rbd) rbdGlobalMapPath(spec *volume.Spec) (string, error) { + var err error + mon, err := getVolumeSourceMonitors(spec) + if err != nil { + return "", err + } + img, err := getVolumeSourceImage(spec) + if err != nil { + return "", err + } + pool, err := getVolumeSourcePool(spec) + if err != nil { + return "", err + } + ro, err := getVolumeSourceReadOnly(spec) + if err != nil { + return "", err + } + + mounter := &rbdMounter{ + rbd: newRBD("", spec.Name(), img, pool, ro, rbd.plugin, &RBDUtil{}), + Mon: mon, + } + return rbd.manager.MakeGlobalVDPDName(*mounter.rbd), nil +} + +func (rbd *rbd) rbdPodDeviceMapPath() (string, string) { + name := rbdPluginName + return rbd.plugin.host.GetPodVolumeDeviceDir(rbd.podUID, strings.EscapeQualifiedNameForDisk(name)), rbd.volName +} + +type rbdDiskUnmapper struct { + *rbdDiskMapper +} + +func getPoolAndImageFromMapPath(mapPath string) (string, string, error) { + + pathParts := dstrings.Split(mapPath, "/") + if len(pathParts) < 2 { + return "", "", fmt.Errorf("corrupted mapPath") + } + rbdParts := dstrings.Split(pathParts[len(pathParts)-1], "-image-") + + if len(rbdParts) < 2 { + return "", "", fmt.Errorf("corrupted mapPath") + } + return string(rbdParts[0]), string(rbdParts[1]), nil +} + +func getBlockVolumeDevice(mapPath string) (string, error) { + pool, image, err := getPoolAndImageFromMapPath(mapPath) + if err != nil { + return "", err + } + // Getting full device path + device, found := getDevFromImageAndPool(pool, image) + if !found { + return "", err + } + return device, nil +} + +func (rbd *rbdDiskUnmapper) TearDownDevice(mapPath, _ string) error { + + device, err := getBlockVolumeDevice(mapPath) + if err != nil { + return fmt.Errorf("rbd: failed to get loopback for device: %v, err: %v", device, err) + } + blkUtil := volutil.NewBlockVolumePathHandler() + loop, err := volutil.BlockVolumePathHandler.GetLoopDevice(blkUtil, device) + if err != nil { + return fmt.Errorf("rbd: failed to get loopback for device: %v, err: %v", device, err) + } + // Remove loop device before detaching volume since volume detach operation gets busy if volume is opened by loopback. + err = volutil.BlockVolumePathHandler.RemoveLoopDevice(blkUtil, loop) + if err != nil { + return fmt.Errorf("rbd: failed to remove loopback :%v, err: %v", loop, err) + } + glog.V(4).Infof("rbd: successfully removed loop device: %s", loop) + + err = rbd.manager.DetachBlockDisk(*rbd, mapPath) + if err != nil { + return fmt.Errorf("rbd: failed to detach disk: %s\nError: %v", mapPath, err) + } + glog.V(4).Infof("rbd: %q is unmapped, deleting the directory", mapPath) + + err = os.RemoveAll(mapPath) + if err != nil { + return fmt.Errorf("rbd: failed to delete the directory: %s\nError: %v", mapPath, err) + } + glog.V(4).Infof("rbd: successfully detached disk: %s", mapPath) + + return nil +} + func getVolumeSourceMonitors(spec *volume.Spec) ([]string, error) { if spec.Volume != nil && spec.Volume.RBD != nil { return spec.Volume.RBD.CephMonitors, nil diff --git a/pkg/volume/rbd/rbd_test.go b/pkg/volume/rbd/rbd_test.go index d16ce140992..23b9b969f96 100644 --- a/pkg/volume/rbd/rbd_test.go +++ b/pkg/volume/rbd/rbd_test.go @@ -81,10 +81,14 @@ func (fake *fakeDiskManager) MakeGlobalPDName(rbd rbd) string { return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image) } +func (fake *fakeDiskManager) MakeGlobalVDPDName(rbd rbd) string { + return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image) +} + func (fake *fakeDiskManager) AttachDisk(b rbdMounter) (string, error) { fake.mutex.Lock() defer fake.mutex.Unlock() - fake.rbdMapIndex += 1 + fake.rbdMapIndex++ devicePath := fmt.Sprintf("/dev/rbd%d", fake.rbdMapIndex) fake.rbdDevices[devicePath] = true return devicePath, nil @@ -101,6 +105,17 @@ func (fake *fakeDiskManager) DetachDisk(r *rbdPlugin, deviceMountPath string, de return nil } +func (fake *fakeDiskManager) DetachBlockDisk(r rbdDiskUnmapper, device string) error { + fake.mutex.Lock() + defer fake.mutex.Unlock() + ok := fake.rbdDevices[device] + if !ok { + return fmt.Errorf("rbd: failed to detach device %s, it does not exist", device) + } + delete(fake.rbdDevices, device) + return nil +} + func (fake *fakeDiskManager) CreateImage(provisioner *rbdVolumeProvisioner) (r *v1.RBDPersistentVolumeSource, volumeSizeGB int, err error) { return nil, 0, fmt.Errorf("not implemented") } diff --git a/pkg/volume/rbd/rbd_util.go b/pkg/volume/rbd/rbd_util.go index 915e0e2e496..4f6d9e0121f 100644 --- a/pkg/volume/rbd/rbd_util.go +++ b/pkg/volume/rbd/rbd_util.go @@ -35,6 +35,7 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/wait" fileutil "k8s.io/kubernetes/pkg/util/file" "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/volume" @@ -46,6 +47,11 @@ const ( imageSizeStr = "size " sizeDivStr = " MB in" kubeLockMagic = "kubelet_lock_magic_" + // The following three values are used for 30 seconds timeout + // while waiting for RBD Watcher to expire. + rbdImageWatcherInitDelay = 1 * time.Second + rbdImageWatcherFactor = 1.4 + rbdImageWatcherSteps = 10 ) // search /sys/bus for rbd device that matches given pool and image @@ -109,6 +115,11 @@ func makePDNameInternal(host volume.VolumeHost, pool string, image string) strin return path.Join(host.GetPluginDir(rbdPluginName), "rbd", pool+"-image-"+image) } +// make a directory like /var/lib/kubelet/plugins/kubernetes.io/rbd/volumeDevices/pool-image-image +func makeVDPDNameInternal(host volume.VolumeHost, pool string, image string) string { + return path.Join(host.GetVolumeDevicePluginDir(rbdPluginName), pool+"-image-"+image) +} + // RBDUtil implements diskManager interface. type RBDUtil struct{} @@ -118,6 +129,10 @@ func (util *RBDUtil) MakeGlobalPDName(rbd rbd) string { return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image) } +func (util *RBDUtil) MakeGlobalVDPDName(rbd rbd) string { + return makeVDPDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image) +} + func rbdErrors(runErr, resultErr error) error { if err, ok := runErr.(*exec.Error); ok { if err.Err == exec.ErrNotFound { @@ -217,13 +232,27 @@ func (util *RBDUtil) AttachDisk(b rbdMounter) (string, error) { // Currently, we don't acquire advisory lock on image, but for backward // compatibility, we need to check if the image is being used by nodes running old kubelet. - found, rbdOutput, err := util.rbdStatus(&b) - if err != nil { - return "", fmt.Errorf("error: %v, rbd output: %v", err, rbdOutput) + // osd_client_watch_timeout defaults to 30 seconds, if the watcher stays active longer than 30 seconds, + // rbd image does not get mounted and failure message gets generated. + backoff := wait.Backoff{ + Duration: rbdImageWatcherInitDelay, + Factor: rbdImageWatcherFactor, + Steps: rbdImageWatcherSteps, } - if found { - glog.Infof("rbd image %s/%s is still being used ", b.Pool, b.Image) - return "", fmt.Errorf("rbd image %s/%s is still being used. rbd output: %s", b.Pool, b.Image, rbdOutput) + err := wait.ExponentialBackoff(backoff, func() (bool, error) { + used, rbdOutput, err := util.rbdStatus(&b) + if err != nil { + return false, fmt.Errorf("fail to check rbd image status with: (%v), rbd output: (%s)", err, rbdOutput) + } + return !used, nil + }) + // return error if rbd image has not become available for the specified timeout + if err == wait.ErrWaitTimeout { + return "", fmt.Errorf("rbd image %s/%s is still being used", b.Pool, b.Image) + } + // return error if any other errors were encountered during wating for the image to becme avialble + if err != nil { + return "", err } mon := util.kernelRBDMonitorsOpt(b.Mon) @@ -281,6 +310,35 @@ func (util *RBDUtil) DetachDisk(plugin *rbdPlugin, deviceMountPath string, devic return nil } +// DetachBlockDisk detaches the disk from the node. +func (util *RBDUtil) DetachBlockDisk(disk rbdDiskUnmapper, mapPath string) error { + + if pathExists, pathErr := volutil.PathExists(mapPath); pathErr != nil { + return fmt.Errorf("Error checking if path exists: %v", pathErr) + } else if !pathExists { + glog.Warningf("Warning: Unmap skipped because path does not exist: %v", mapPath) + return nil + } + // If we arrive here, device is no longer used, see if need to logout the target + device, err := getBlockVolumeDevice(mapPath) + if err != nil { + return err + } + + if len(device) == 0 { + return fmt.Errorf("DetachDisk failed , device is empty") + } + // rbd unmap + exec := disk.plugin.host.GetExec(disk.plugin.GetPluginName()) + output, err := exec.Run("rbd", "unmap", device) + if err != nil { + return rbdErrors(err, fmt.Errorf("rbd: failed to unmap device %s, error %v, rbd output: %s", device, err, string(output))) + } + glog.V(3).Infof("rbd: successfully unmap device %s", device) + + return nil +} + // cleanOldRBDFile read rbd info from rbd.json file and removes lock if found. // At last, it removes rbd.json file. func (util *RBDUtil) cleanOldRBDFile(plugin *rbdPlugin, rbdFile string) error {