Merge pull request #51608 from cofyc/rbd_attach_detach

Automatic merge from submit-queue (batch tested with PRs 53730, 51608, 54459, 54534, 54585). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

RBD Plugin: Implement Attacher/Detacher interfaces.

**What this PR does / why we need it**:

This PR continues @rootfs's work in #33660. It implements the volume.Attacher/volume.Detacher interfaces to resolve RBD image locking and make the RBD plugin more robust.

Summary of the interfaces and what they do for the RBD plugin (see the sketch after this list):

- Attacher.Attach(): does nothing
- Attacher.VolumesAreAttached(): method to query volume attach status
- Attacher.GetDeviceMountPath(): method to get the device mount path
- Attacher.WaitForAttach(): kubelet maps the image on the node (and locks the image if needed)
- Attacher.MountDevice(): kubelet mounts the device at the device mount path
- Detacher.UnmountDevice(): kubelet unmounts the device from the device mount path (currently, we also need to unmap the image from the node here, and unlock the image if needed)
- Detacher.Detach(): does nothing
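
For reference, a condensed sketch of the two interfaces as the RBD plugin implements them — method signatures are copied from `attacher.go` in the diff below, and the comments summarize the RBD-specific behavior (types come from `k8s.io/api/core/v1`, `k8s.io/apimachinery/pkg/types`, and `pkg/volume`):

```go
type Attacher interface {
	// No-op for RBD; mapping happens on the node in WaitForAttach.
	Attach(spec *volume.Spec, nodeName types.NodeName) (string, error)
	// Always reports attached; status cannot be queried from outside the node.
	VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error)
	// Called by kubelet to map the RBD image on the node; returns the real device path.
	WaitForAttach(spec *volume.Spec, devicePath string, pod *v1.Pod, timeout time.Duration) (string, error)
	// Returns <plugin-dir>/rbd/<pool>-image-<image>.
	GetDeviceMountPath(spec *volume.Spec) (string, error)
	// Formats (if necessary) and mounts the mapped device at deviceMountPath.
	MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error
}

type Detacher interface {
	// Unmounts the global mount, unmaps the image, and removes any old lock.
	UnmountDevice(deviceMountPath string) error
	// No-op for RBD.
	Detach(deviceName string, nodeName types.NodeName) error
}
```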

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #50142.

**Special notes for your reviewer**:

RBD changes:

  1) Modify rbdPlugin to implement the volume.AttachableVolumePlugin interface.
  2) Add rbdAttacher/rbdDetacher structs to implement the
  volume.Attacher/Detacher interfaces.
  3) Add mount.SafeFormatAndMount/mount.Exec fields to rbdPlugin, and set them up in
  rbdPlugin.Init for later use. Attacher/Mounter/Unmounter/Detacher
  reference rbdPlugin to use its mounter and exec, which simplifies the
  code (see the sketch after this list).
  4) Add a testcase struct to abstract RBD plugin test cases, etc.
  5) Add a newRBD constructor to unify rbd struct initialization.
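
As a reference for item 3, the plugin wiring, condensed from `rbd.go` in this diff:

```go
// rbdPlugin now carries a shared exec and mounter, set up once in Init, so
// Attacher/Mounter/Unmounter/Detacher can all reference plugin.mounter and
// plugin.exec instead of constructing their own.
type rbdPlugin struct {
	host    volume.VolumeHost
	exec    mount.Exec
	mounter *mount.SafeFormatAndMount
}

func (plugin *rbdPlugin) Init(host volume.VolumeHost) error {
	plugin.host = host
	plugin.exec = host.GetExec(plugin.GetPluginName())
	plugin.mounter = volumehelper.NewSafeFormatAndMountFromHost(plugin.GetPluginName(), plugin.host)
	return nil
}
```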

Non-RBD changes:

  1) Fix FakeMounter.IsLikelyNotMountPoint to return ErrNotExist if the
  directory does not exist. The Mounter.IsLikelyNotMountPoint interface
  requires this, and the RBD plugin depends on it (see the sketch after
  this list).
  2) ~~Extend the Detacher.Detach method to pass `*volume.Spec`; the RBD plugin
  needs it to detach the device from the node.~~
  3) ~~Extend the volume.Spec struct to include a namespace string; the RBD plugin needs
  it to locate objects (e.g. secrets) in the Pod's namespace.~~
  4) ~~Update the RBAC bootstrap policy to allow the
  `system:controller:attachdetach-controller` cluster role to get
  Secrets objects. RBD attach/detach needs to access the secrets object in
  the Pod's namespace.~~
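
For item 1, the FakeMounter fix as it appears in the diff below: stat the path first and propagate the error, so callers can distinguish a missing path from an existing non-mountpoint (the rest of the method is elided):

```go
func (f *FakeMounter) IsLikelyNotMountPoint(file string) (bool, error) {
	f.mutex.Lock()
	defer f.mutex.Unlock()
	// Propagate ErrNotExist: the Mounter interface requires it, and the RBD
	// plugin relies on it to decide whether to create the mount path.
	_, err := os.Stat(file)
	if err != nil {
		return true, err
	}
	// ... symlink resolution and mount-log lookup follow as before ...
	return true, nil
}
```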

**Release note**:

```
NONE
```
Merged by Kubernetes Submit Queue on 2017-10-26 19:59:19 -07:00, committed by GitHub.
commit 90a35f1d16
9 changed files with 693 additions and 323 deletions


@ -78,6 +78,7 @@ func ProbeAttachableVolumePlugins() []volume.VolumePlugin {
allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)
return allPlugins
}


@ -17,6 +17,7 @@ limitations under the License.
package mount
import (
"os"
"path/filepath"
"sync"
@ -136,6 +137,11 @@ func (f *FakeMounter) IsLikelyNotMountPoint(file string) (bool, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
_, err := os.Stat(file)
if err != nil {
return true, err
}
// If file is a symlink, get its absolute path
absFile, err := filepath.EvalSymlinks(file)
if err != nil {


@ -18,6 +18,8 @@ package mount
import (
"fmt"
"io/ioutil"
"os"
"runtime"
"testing"
@ -50,6 +52,11 @@ func TestSafeFormatAndMount(t *testing.T) {
if runtime.GOOS == "darwin" || runtime.GOOS == "windows" {
t.Skipf("not supported on GOOS=%s", runtime.GOOS)
}
mntDir, err := ioutil.TempDir(os.TempDir(), "mount")
if err != nil {
t.Fatalf("failed to create tmp dir: %v", err)
}
defer os.RemoveAll(mntDir)
tests := []struct {
description string
fstype string
@ -207,7 +214,7 @@ func TestSafeFormatAndMount(t *testing.T) {
}
device := "/dev/foo"
dest := "/mnt/bar"
dest := mntDir
err := mounter.FormatAndMount(device, dest, test.fstype, test.mountOptions)
if test.expectedError == nil {
if err != nil {


@ -9,6 +9,7 @@ load(
go_library(
name = "go_default_library",
srcs = [
"attacher.go",
"disk_manager.go",
"doc.go",
"rbd.go",
@ -16,6 +17,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/volume/rbd",
deps = [
"//pkg/util/file:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/strings:go_default_library",
@ -42,10 +44,10 @@ go_test(
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/testing:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/util/testing:go_default_library",
],

pkg/volume/rbd/attacher.go (new file, +220 lines)

@ -0,0 +1,220 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
import (
"fmt"
"os"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
volutil "k8s.io/kubernetes/pkg/volume/util"
)
// NewAttacher implements AttachableVolumePlugin.NewAttacher.
func (plugin *rbdPlugin) NewAttacher() (volume.Attacher, error) {
return plugin.newAttacherInternal(&RBDUtil{})
}
func (plugin *rbdPlugin) newAttacherInternal(manager diskManager) (volume.Attacher, error) {
return &rbdAttacher{
plugin: plugin,
manager: manager,
}, nil
}
// NewDetacher implements AttachableVolumePlugin.NewDetacher.
func (plugin *rbdPlugin) NewDetacher() (volume.Detacher, error) {
return plugin.newDetacherInternal(&RBDUtil{})
}
func (plugin *rbdPlugin) newDetacherInternal(manager diskManager) (volume.Detacher, error) {
return &rbdDetacher{
plugin: plugin,
manager: manager,
}, nil
}
// GetDeviceMountRefs implements AttachableVolumePlugin.GetDeviceMountRefs.
func (plugin *rbdPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
mounter := plugin.host.GetMounter(plugin.GetPluginName())
return mount.GetMountRefs(mounter, deviceMountPath)
}
// rbdAttacher implements volume.Attacher interface.
type rbdAttacher struct {
plugin *rbdPlugin
manager diskManager
}
var _ volume.Attacher = &rbdAttacher{}
// Attach implements Attacher.Attach.
// We do not lock image here, because it requires kube-controller-manager to
// access external `rbd` utility. And there is no need since AttachDetach
// controller will not try to attach RWO volumes which are already attached to
// other nodes.
func (attacher *rbdAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
return "", nil
}
// VolumesAreAttached implements Attacher.VolumesAreAttached.
// There is no way to confirm whether the volume is attached or not from
// outside of the kubelet node. This method always needs to return true,
// like the iSCSI and FC plugins.
func (attacher *rbdAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
for _, spec := range specs {
volumesAttachedCheck[spec] = true
}
return volumesAttachedCheck, nil
}
// WaitForAttach implements Attacher.WaitForAttach. It's called by kubelet to
// attach the volume onto the node.
// This method is idempotent; callers are responsible for retrying on failure.
func (attacher *rbdAttacher) WaitForAttach(spec *volume.Spec, devicePath string, pod *v1.Pod, timeout time.Duration) (string, error) {
glog.V(4).Infof("rbd: waiting for attach volume (name: %s) for pod (name: %s, uid: %s)", spec.Name(), pod.Name, pod.UID)
mounter, err := attacher.plugin.createMounterFromVolumeSpecAndPod(spec, pod)
if err != nil {
glog.Warningf("failed to create mounter: %v", spec)
return "", err
}
realDevicePath, err := attacher.manager.AttachDisk(*mounter)
if err != nil {
return "", err
}
glog.V(3).Infof("rbd: successfully wait for attach volume (spec: %s, pool: %s, image: %s) at %s", spec.Name(), mounter.Pool, mounter.Image, realDevicePath)
return realDevicePath, nil
}
// GetDeviceMountPath implements Attacher.GetDeviceMountPath.
func (attacher *rbdAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
img, err := getVolumeSourceImage(spec)
if err != nil {
return "", err
}
pool, err := getVolumeSourcePool(spec)
if err != nil {
return "", err
}
return makePDNameInternal(attacher.plugin.host, pool, img), nil
}
// MountDevice implements Attacher.MountDevice. It is called by the kubelet to
// mount the device at the given mount path.
// This method is idempotent; callers are responsible for retrying on failure.
func (attacher *rbdAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
glog.V(4).Infof("rbd: mouting device %s to %s", devicePath, deviceMountPath)
notMnt, err := attacher.plugin.mounter.IsLikelyNotMountPoint(deviceMountPath)
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
return err
}
notMnt = true
} else {
return err
}
}
if !notMnt {
return nil
}
fstype, err := getVolumeSourceFSType(spec)
if err != nil {
return err
}
ro, err := getVolumeSourceReadOnly(spec)
if err != nil {
return err
}
options := []string{}
if ro {
options = append(options, "ro")
}
mountOptions := volume.MountOptionFromSpec(spec, options...)
err = attacher.plugin.mounter.FormatAndMount(devicePath, deviceMountPath, fstype, mountOptions)
if err != nil {
os.Remove(deviceMountPath)
return fmt.Errorf("rbd: failed to mount device %s at %s (fstype: %s), error %v", devicePath, deviceMountPath, fstype, err)
}
glog.V(3).Infof("rbd: successfully mount device %s at %s (fstype: %s)", devicePath, deviceMountPath, fstype)
return nil
}
// rbdDetacher implements volume.Detacher interface.
type rbdDetacher struct {
plugin *rbdPlugin
manager diskManager
}
var _ volume.Detacher = &rbdDetacher{}
// UnmountDevice implements Detacher.UnmountDevice. It unmounts the global
// mount of the RBD image. This is called once all bind mounts have been
// unmounted.
// Internally, it does four things:
// - Unmount device from deviceMountPath.
// - Detach device from the node.
// - Remove lock if found. (No need to check whether the volume is read-only,
//   because the device is no longer on the node, so it's safe to remove the lock.)
// - Finally, remove the deviceMountPath.
// This method is idempotent; callers are responsible for retrying on failure.
func (detacher *rbdDetacher) UnmountDevice(deviceMountPath string) error {
if pathExists, pathErr := volutil.PathExists(deviceMountPath); pathErr != nil {
return fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists {
glog.Warningf("Warning: Unmount skipped because path does not exist: %v", deviceMountPath)
return nil
}
devicePath, cnt, err := mount.GetDeviceNameFromMount(detacher.plugin.mounter, deviceMountPath)
if err != nil {
return err
}
if cnt > 1 {
return fmt.Errorf("rbd: more than 1 reference counts at %s", deviceMountPath)
}
if cnt == 1 {
// Unmount the device from the device mount point.
glog.V(4).Infof("rbd: unmouting device mountpoint %s", deviceMountPath)
if err = detacher.plugin.mounter.Unmount(deviceMountPath); err != nil {
return err
}
glog.V(3).Infof("rbd: successfully umount device mountpath %s", deviceMountPath)
}
glog.V(4).Infof("rbd: detaching device %s", devicePath)
err = detacher.manager.DetachDisk(detacher.plugin, deviceMountPath, devicePath)
if err != nil {
return err
}
glog.V(3).Infof("rbd: successfully detach device %s", devicePath)
err = os.Remove(deviceMountPath)
if err != nil {
return err
}
glog.V(3).Infof("rbd: successfully remove device mount point %s", deviceMountPath)
return nil
}
// Detach implements Detacher.Detach.
func (detacher *rbdDetacher) Detach(deviceName string, nodeName types.NodeName) error {
return nil
}
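
Taken together, kubelet drives the attach/detach lifecycle through these methods in the following order; a sketch of the sequence, mirroring the calls the updated `rbd_test.go` later in this diff exercises (error handling elided, names `attacher`/`detacher`/`spec`/`pod`/`nodeName` as in the test):

```go
// Attach is a no-op for RBD and returns an empty device path.
devicePath, _ := attacher.Attach(spec, nodeName)
// WaitForAttach maps the image on the node and returns the real /dev/rbdX path.
devicePath, _ = attacher.WaitForAttach(spec, devicePath, pod, 10*time.Second)
// Global mount point: <plugin-dir>/rbd/<pool>-image-<image>.
deviceMountPath, _ := attacher.GetDeviceMountPath(spec)
// Format (if necessary) and mount the device at the global mount point.
_ = attacher.MountDevice(spec, devicePath, deviceMountPath)
// ...pods bind-mount from deviceMountPath via Mounter.SetUp/SetUpAt...
// Teardown: unmount the global mount, unmap the image, remove any old lock.
_ = detacher.UnmountDevice(deviceMountPath)
// Detach is a no-op for RBD.
_ = detacher.Detach(devicePath, nodeName)
```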


@ -23,6 +23,7 @@ limitations under the License.
package rbd
import (
"fmt"
"os"
"github.com/golang/glog"
@ -33,23 +34,33 @@ import (
// Abstract interface to disk operations.
type diskManager interface {
// MakeGlobalPDName creates global persistent disk path.
MakeGlobalPDName(disk rbd) string
// Attaches the disk to the kubelet's host machine.
AttachDisk(disk rbdMounter) error
// If it successfully attaches, the path to the device
// is returned. Otherwise, an error will be returned.
AttachDisk(disk rbdMounter) (string, error)
// Detaches the disk from the kubelet's host machine.
DetachDisk(disk rbdUnmounter, mntPath string) error
// Creates a rbd image
DetachDisk(plugin *rbdPlugin, deviceMountPath string, device string) error
// Creates a rbd image.
CreateImage(provisioner *rbdVolumeProvisioner) (r *v1.RBDPersistentVolumeSource, volumeSizeGB int, err error)
// Deletes a rbd image
// Deletes a rbd image.
DeleteImage(deleter *rbdVolumeDeleter) error
}
// utility to mount a disk based filesystem
func diskSetUp(manager diskManager, b rbdMounter, volPath string, mounter mount.Interface, fsGroup *int64) error {
globalPDPath := manager.MakeGlobalPDName(*b.rbd)
// TODO: handle failed mounts here.
notMnt, err := mounter.IsLikelyNotMountPoint(volPath)
notMnt, err := mounter.IsLikelyNotMountPoint(globalPDPath)
if err != nil && !os.IsNotExist(err) {
glog.Errorf("cannot validate mountpoint: %s", globalPDPath)
return err
}
if notMnt {
return fmt.Errorf("no device is mounted at %s", globalPDPath)
}
notMnt, err = mounter.IsLikelyNotMountPoint(volPath)
if err != nil && !os.IsNotExist(err) {
glog.Errorf("cannot validate mountpoint: %s", volPath)
return err
@ -57,10 +68,6 @@ func diskSetUp(manager diskManager, b rbdMounter, volPath string, mounter mount.
if !notMnt {
return nil
}
if err := manager.AttachDisk(b); err != nil {
glog.Errorf("failed to attach disk")
return err
}
if err := os.MkdirAll(volPath, 0750); err != nil {
glog.Errorf("failed to mkdir:%s", volPath)
@ -89,43 +96,31 @@ func diskSetUp(manager diskManager, b rbdMounter, volPath string, mounter mount.
// utility to tear down a disk based filesystem
func diskTearDown(manager diskManager, c rbdUnmounter, volPath string, mounter mount.Interface) error {
notMnt, err := mounter.IsLikelyNotMountPoint(volPath)
if err != nil {
glog.Errorf("cannot validate mountpoint %s", volPath)
if err != nil && !os.IsNotExist(err) {
glog.Errorf("cannot validate mountpoint: %s", volPath)
return err
}
if notMnt {
glog.V(3).Infof("volume path %s is not a mountpoint, deleting", volPath)
return os.Remove(volPath)
}
refs, err := mount.GetMountRefs(mounter, volPath)
if err != nil {
glog.Errorf("failed to get reference count %s", volPath)
return err
}
// Unmount the bind-mount inside this pod.
if err := mounter.Unmount(volPath); err != nil {
glog.Errorf("failed to umount %s", volPath)
return err
}
// If len(refs) is 1, then all bind mounts have been removed, and the
// remaining reference is the global mount. It is safe to detach.
if len(refs) == 1 {
mntPath := refs[0]
if err := manager.DetachDisk(c, mntPath); err != nil {
glog.Errorf("failed to detach disk from %s", mntPath)
return err
}
}
notMnt, mntErr := mounter.IsLikelyNotMountPoint(volPath)
if mntErr != nil {
if err != nil && !os.IsNotExist(err) {
glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
return err
}
if notMnt {
if err := os.Remove(volPath); err != nil {
glog.V(2).Info("Error removing mountpoint ", volPath, ": ", err)
return err
}
}
return nil
}


@ -41,17 +41,21 @@ var (
// This is the primary entrypoint for volume plugins.
func ProbeVolumePlugins() []volume.VolumePlugin {
return []volume.VolumePlugin{&rbdPlugin{nil}}
return []volume.VolumePlugin{&rbdPlugin{nil, nil, nil}}
}
// rbdPlugin implements Volume.VolumePlugin.
type rbdPlugin struct {
host volume.VolumeHost
exec mount.Exec
mounter *mount.SafeFormatAndMount
}
var _ volume.VolumePlugin = &rbdPlugin{}
var _ volume.PersistentVolumePlugin = &rbdPlugin{}
var _ volume.DeletableVolumePlugin = &rbdPlugin{}
var _ volume.ProvisionableVolumePlugin = &rbdPlugin{}
var _ volume.AttachableVolumePlugin = &rbdPlugin{}
const (
rbdPluginName = "kubernetes.io/rbd"
@ -70,6 +74,8 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
func (plugin *rbdPlugin) Init(host volume.VolumeHost) error {
plugin.host = host
plugin.exec = host.GetExec(plugin.GetPluginName())
plugin.mounter = volumehelper.NewSafeFormatAndMountFromHost(plugin.GetPluginName(), plugin.host)
return nil
}
@ -120,6 +126,68 @@ func (plugin *rbdPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
}
}
func (plugin *rbdPlugin) createMounterFromVolumeSpecAndPod(spec *volume.Spec, pod *v1.Pod) (*rbdMounter, error) {
var err error
mon, err := getVolumeSourceMonitors(spec)
if err != nil {
return nil, err
}
img, err := getVolumeSourceImage(spec)
if err != nil {
return nil, err
}
fstype, err := getVolumeSourceFSType(spec)
if err != nil {
return nil, err
}
pool, err := getVolumeSourcePool(spec)
if err != nil {
return nil, err
}
id, err := getVolumeSourceUser(spec)
if err != nil {
return nil, err
}
keyring, err := getVolumeSourceKeyRing(spec)
if err != nil {
return nil, err
}
ro, err := getVolumeSourceReadOnly(spec)
if err != nil {
return nil, err
}
secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
if err != nil {
return nil, err
}
secret := ""
if len(secretName) > 0 && len(secretNs) > 0 {
// if secret is provided, retrieve it
kubeClient := plugin.host.GetKubeClient()
if kubeClient == nil {
return nil, fmt.Errorf("Cannot get kube client")
}
secrets, err := kubeClient.Core().Secrets(secretNs).Get(secretName, metav1.GetOptions{})
if err != nil {
err = fmt.Errorf("Couldn't get secret %v/%v err: %v", secretNs, secretName, err)
return nil, err
}
for _, data := range secrets.Data {
secret = string(data)
}
}
return &rbdMounter{
rbd: newRBD("", spec.Name(), img, pool, ro, plugin, &RBDUtil{}),
Mon: mon,
Id: id,
Keyring: keyring,
Secret: secret,
fsType: fstype,
}, nil
}
func (plugin *rbdPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
if err != nil {
@ -143,10 +211,10 @@ func (plugin *rbdPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.Vol
}
// Inject real implementations here, test through the internal function.
return plugin.newMounterInternal(spec, pod.UID, &RBDUtil{}, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName()), secret)
return plugin.newMounterInternal(spec, pod.UID, &RBDUtil{}, secret)
}
func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager diskManager, mounter mount.Interface, exec mount.Exec, secret string) (volume.Mounter, error) {
func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager diskManager, secret string) (volume.Mounter, error) {
mon, err := getVolumeSourceMonitors(spec)
if err != nil {
return nil, err
@ -177,18 +245,7 @@ func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID,
}
return &rbdMounter{
rbd: &rbd{
podUID: podUID,
volName: spec.Name(),
Image: img,
Pool: pool,
ReadOnly: ro,
manager: manager,
mounter: &mount.SafeFormatAndMount{Interface: mounter, Exec: exec},
exec: exec,
plugin: plugin,
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), plugin.host)),
},
rbd: newRBD(podUID, spec.Name(), img, pool, ro, plugin, manager),
Mon: mon,
Id: id,
Keyring: keyring,
@ -200,21 +257,13 @@ func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID,
func (plugin *rbdPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
// Inject real implementations here, test through the internal function.
return plugin.newUnmounterInternal(volName, podUID, &RBDUtil{}, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName()))
return plugin.newUnmounterInternal(volName, podUID, &RBDUtil{})
}
func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID, manager diskManager, mounter mount.Interface, exec mount.Exec) (volume.Unmounter, error) {
func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID, manager diskManager) (volume.Unmounter, error) {
return &rbdUnmounter{
rbdMounter: &rbdMounter{
rbd: &rbd{
podUID: podUID,
volName: volName,
manager: manager,
mounter: &mount.SafeFormatAndMount{Interface: mounter, Exec: exec},
exec: exec,
plugin: plugin,
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)),
},
rbd: newRBD(podUID, volName, "", "", false, plugin, manager),
Mon: make([]string, 0),
},
}, nil
@ -268,15 +317,7 @@ func (plugin *rbdPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
func (plugin *rbdPlugin) newDeleterInternal(spec *volume.Spec, admin, secret string, manager diskManager) (volume.Deleter, error) {
return &rbdVolumeDeleter{
rbdMounter: &rbdMounter{
rbd: &rbd{
volName: spec.Name(),
Image: spec.PersistentVolume.Spec.RBD.RBDImage,
Pool: spec.PersistentVolume.Spec.RBD.RBDPool,
manager: manager,
plugin: plugin,
mounter: &mount.SafeFormatAndMount{Interface: plugin.host.GetMounter(plugin.GetPluginName())},
exec: plugin.host.GetExec(plugin.GetPluginName()),
},
rbd: newRBD("", spec.Name(), spec.PersistentVolume.Spec.RBD.RBDImage, spec.PersistentVolume.Spec.RBD.RBDPool, false, plugin, manager),
Mon: spec.PersistentVolume.Spec.RBD.CephMonitors,
adminId: admin,
adminSecret: secret,
@ -290,22 +331,20 @@ func (plugin *rbdPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Pr
func (plugin *rbdPlugin) newProvisionerInternal(options volume.VolumeOptions, manager diskManager) (volume.Provisioner, error) {
return &rbdVolumeProvisioner{
rbdMounter: &rbdMounter{
rbd: &rbd{
manager: manager,
plugin: plugin,
mounter: &mount.SafeFormatAndMount{Interface: plugin.host.GetMounter(plugin.GetPluginName())},
exec: plugin.host.GetExec(plugin.GetPluginName()),
},
rbd: newRBD("", "", "", "", false, plugin, manager),
},
options: options,
}, nil
}
// rbdVolumeProvisioner implements volume.Provisioner interface.
type rbdVolumeProvisioner struct {
*rbdMounter
options volume.VolumeOptions
}
var _ volume.Provisioner = &rbdVolumeProvisioner{}
func (r *rbdVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
if !volume.AccessModesContainedInAll(r.plugin.GetAccessModes(), r.options.PVC.Spec.AccessModes) {
return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", r.options.PVC.Spec.AccessModes, r.plugin.GetAccessModes())
@ -419,10 +458,13 @@ func (r *rbdVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
return pv, nil
}
// rbdVolumeDeleter implements volume.Deleter interface.
type rbdVolumeDeleter struct {
*rbdMounter
}
var _ volume.Deleter = &rbdVolumeDeleter{}
func (r *rbdVolumeDeleter) GetPath() string {
return getPath(r.podUID, r.volName, r.plugin.host)
}
@ -431,6 +473,8 @@ func (r *rbdVolumeDeleter) Delete() error {
return r.manager.DeleteImage(r)
}
// rbd implements volume.Volume interface.
// It's embedded in Mounter/Unmounter/Deleter.
type rbd struct {
volName string
podUID types.UID
@ -445,11 +489,35 @@ type rbd struct {
volume.MetricsProvider `json:"-"`
}
var _ volume.Volume = &rbd{}
func (rbd *rbd) GetPath() string {
// safe to use PodVolumeDir now: volume teardown occurs before pod is cleaned up
return getPath(rbd.podUID, rbd.volName, rbd.plugin.host)
}
// newRBD creates a new rbd.
func newRBD(podUID types.UID, volName string, image string, pool string, readOnly bool, plugin *rbdPlugin, manager diskManager) *rbd {
return &rbd{
podUID: podUID,
volName: volName,
Image: image,
Pool: pool,
ReadOnly: readOnly,
plugin: plugin,
mounter: plugin.mounter,
exec: plugin.exec,
manager: manager,
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)),
}
}
// rbdMounter implements volume.Mounter interface.
// It contains information which need to be persisted in whole life cycle of PV
// on the node. It is persisted at the very beginning in the pod mount point
// directory.
// Note: Capitalized field names of this struct determine the information
// persisted on the disk, DO NOT change them. (TODO: refactor to use a dedicated struct?)
type rbdMounter struct {
*rbd
// capitalized so they can be exported in persistRBD()
@ -488,14 +556,16 @@ func (b *rbdMounter) SetUp(fsGroup *int64) error {
func (b *rbdMounter) SetUpAt(dir string, fsGroup *int64) error {
// diskSetUp checks mountpoints and prevent repeated calls
glog.V(4).Infof("rbd: attempting to SetUp and mount %s", dir)
glog.V(4).Infof("rbd: attempting to setup at %s", dir)
err := diskSetUp(b.manager, *b, dir, b.mounter, fsGroup)
if err != nil {
glog.Errorf("rbd: failed to setup mount %s %v", dir, err)
glog.Errorf("rbd: failed to setup at %s %v", dir, err)
}
glog.V(3).Infof("rbd: successfully setup at %s", dir)
return err
}
// rbdUnmounter implements volume.Unmounter interface.
type rbdUnmounter struct {
*rbdMounter
}
@ -509,13 +579,19 @@ func (c *rbdUnmounter) TearDown() error {
}
func (c *rbdUnmounter) TearDownAt(dir string) error {
glog.V(4).Infof("rbd: attempting to teardown at %s", dir)
if pathExists, pathErr := volutil.PathExists(dir); pathErr != nil {
return fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists {
glog.Warningf("Warning: Unmount skipped because path does not exist: %v", dir)
return nil
}
return diskTearDown(c.manager, *c, dir, c.mounter)
err := diskTearDown(c.manager, *c, dir, c.mounter)
if err != nil {
return err
}
glog.V(3).Infof("rbd: successfully teardown at %s", dir)
return nil
}
func getVolumeSourceMonitors(spec *volume.Spec) ([]string, error) {


@ -18,17 +18,17 @@ package rbd
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes/fake"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/pkg/util/mount"
@ -59,37 +59,43 @@ func TestCanSupport(t *testing.T) {
}
type fakeDiskManager struct {
tmpDir string
// Make sure we can run tests in parallel.
mutex sync.RWMutex
// Key format: "<pool>/<image>"
rbdImageLocks map[string]bool
rbdMapIndex int
rbdDevices map[string]bool
}
func NewFakeDiskManager() *fakeDiskManager {
return &fakeDiskManager{
tmpDir: utiltesting.MkTmpdirOrDie("rbd_test"),
rbdImageLocks: make(map[string]bool),
rbdMapIndex: 0,
rbdDevices: make(map[string]bool),
}
}
func (fake *fakeDiskManager) Cleanup() {
os.RemoveAll(fake.tmpDir)
func (fake *fakeDiskManager) MakeGlobalPDName(rbd rbd) string {
return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image)
}
func (fake *fakeDiskManager) MakeGlobalPDName(disk rbd) string {
return fake.tmpDir
}
func (fake *fakeDiskManager) AttachDisk(b rbdMounter) error {
globalPath := b.manager.MakeGlobalPDName(*b.rbd)
err := os.MkdirAll(globalPath, 0750)
if err != nil {
return err
}
return nil
func (fake *fakeDiskManager) AttachDisk(b rbdMounter) (string, error) {
fake.mutex.Lock()
defer fake.mutex.Unlock()
fake.rbdMapIndex += 1
devicePath := fmt.Sprintf("/dev/rbd%d", fake.rbdMapIndex)
fake.rbdDevices[devicePath] = true
return devicePath, nil
}
func (fake *fakeDiskManager) DetachDisk(c rbdUnmounter, mntPath string) error {
globalPath := c.manager.MakeGlobalPDName(*c.rbd)
err := os.RemoveAll(globalPath)
if err != nil {
return err
func (fake *fakeDiskManager) DetachDisk(r *rbdPlugin, deviceMountPath string, device string) error {
fake.mutex.Lock()
defer fake.mutex.Unlock()
ok := fake.rbdDevices[device]
if !ok {
return fmt.Errorf("rbd: failed to detach device %s, it does not exist", device)
}
delete(fake.rbdDevices, device)
return nil
}
@ -101,35 +107,111 @@ func (fake *fakeDiskManager) DeleteImage(deleter *rbdVolumeDeleter) error {
return fmt.Errorf("not implemented")
}
func doTestPlugin(t *testing.T, spec *volume.Spec) {
tmpDir, err := utiltesting.MkTmpdir("rbd_test")
if err != nil {
t.Fatalf("error creating temp dir: %v", err)
func (fake *fakeDiskManager) Fencing(r rbdMounter, nodeName string) error {
fake.mutex.Lock()
defer fake.mutex.Unlock()
key := fmt.Sprintf("%s/%s", r.Pool, r.Image)
isLocked, ok := fake.rbdImageLocks[key]
if ok && isLocked {
// not expected in testing
return fmt.Errorf("%s is already locked", key)
}
defer os.RemoveAll(tmpDir)
fake.rbdImageLocks[key] = true
return nil
}
func (fake *fakeDiskManager) Defencing(r rbdMounter, nodeName string) error {
fake.mutex.Lock()
defer fake.mutex.Unlock()
key := fmt.Sprintf("%s/%s", r.Pool, r.Image)
isLocked, ok := fake.rbdImageLocks[key]
if !ok || !isLocked {
// not expected in testing
return fmt.Errorf("%s is not locked", key)
}
delete(fake.rbdImageLocks, key)
return nil
}
func (fake *fakeDiskManager) IsLocked(r rbdMounter, nodeName string) (bool, error) {
fake.mutex.RLock()
defer fake.mutex.RUnlock()
key := fmt.Sprintf("%s/%s", r.Pool, r.Image)
isLocked, ok := fake.rbdImageLocks[key]
return ok && isLocked, nil
}
// checkMounterLog checks that fakeMounter has the expected number of log entries, and that the last action equals expectedAction.
func checkMounterLog(t *testing.T, fakeMounter *mount.FakeMounter, expected int, expectedAction mount.FakeAction) {
if len(fakeMounter.Log) != expected {
t.Fatalf("fakeMounter should have %d logs, actual: %d", expected, len(fakeMounter.Log))
}
lastIndex := len(fakeMounter.Log) - 1
lastAction := fakeMounter.Log[lastIndex]
if !reflect.DeepEqual(expectedAction, lastAction) {
t.Fatalf("fakeMounter.Log[%d] should be %v, not: %v", lastIndex, expectedAction, lastAction)
}
}
func doTestPlugin(t *testing.T, c *testcase) {
fakeVolumeHost := volumetest.NewFakeVolumeHost(c.root, nil, nil)
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil))
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, fakeVolumeHost)
plug, err := plugMgr.FindPluginByName("kubernetes.io/rbd")
if err != nil {
t.Errorf("Can't find the plugin by name")
}
fakeMounter := fakeVolumeHost.GetMounter(plug.GetPluginName()).(*mount.FakeMounter)
fakeNodeName := types.NodeName("localhost")
fdm := NewFakeDiskManager()
defer fdm.Cleanup()
exec := mount.NewFakeExec(nil)
mounter, err := plug.(*rbdPlugin).newMounterInternal(spec, types.UID("poduid"), fdm, &mount.FakeMounter{}, exec, "secrets")
// attacher
attacher, err := plug.(*rbdPlugin).newAttacherInternal(fdm)
if err != nil {
t.Errorf("Failed to make a new Attacher: %v", err)
}
deviceAttachPath, err := attacher.Attach(c.spec, fakeNodeName)
if err != nil {
t.Fatal(err)
}
devicePath, err := attacher.WaitForAttach(c.spec, deviceAttachPath, c.pod, time.Second*10)
if err != nil {
t.Fatal(err)
}
if devicePath != c.expectedDevicePath {
t.Errorf("Unexpected path, expected %q, not: %q", c.expectedDevicePath, devicePath)
}
deviceMountPath, err := attacher.GetDeviceMountPath(c.spec)
if err != nil {
t.Fatal(err)
}
if deviceMountPath != c.expectedDeviceMountPath {
t.Errorf("Unexpected mount path, expected %q, not: %q", c.expectedDeviceMountPath, deviceMountPath)
}
err = attacher.MountDevice(c.spec, devicePath, deviceMountPath)
if err != nil {
t.Fatal(err)
}
if _, err := os.Stat(deviceMountPath); err != nil {
if os.IsNotExist(err) {
t.Errorf("Attacher.MountDevice() failed, device mount path not created: %s", deviceMountPath)
} else {
t.Errorf("Attacher.MountDevice() failed: %v", err)
}
}
checkMounterLog(t, fakeMounter, 1, mount.FakeAction{Action: "mount", Target: c.expectedDeviceMountPath, Source: devicePath, FSType: "ext4"})
// mounter
mounter, err := plug.(*rbdPlugin).newMounterInternal(c.spec, c.pod.UID, fdm, "secrets")
if err != nil {
t.Errorf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Error("Got a nil Mounter")
}
path := mounter.GetPath()
expectedPath := fmt.Sprintf("%s/pods/poduid/volumes/kubernetes.io~rbd/vol1", tmpDir)
if path != expectedPath {
t.Errorf("Unexpected path, expected %q, got: %q", expectedPath, path)
if path != c.expectedPodMountPath {
t.Errorf("Unexpected path, expected %q, got: %q", c.expectedPodMountPath, path)
}
if err := mounter.SetUp(nil); err != nil {
@ -142,8 +224,10 @@ func doTestPlugin(t *testing.T, spec *volume.Spec) {
t.Errorf("SetUp() failed: %v", err)
}
}
checkMounterLog(t, fakeMounter, 2, mount.FakeAction{Action: "mount", Target: c.expectedPodMountPath, Source: devicePath, FSType: ""})
unmounter, err := plug.(*rbdPlugin).newUnmounterInternal("vol1", types.UID("poduid"), fdm, &mount.FakeMounter{}, exec)
// unmounter
unmounter, err := plug.(*rbdPlugin).newUnmounterInternal(c.spec.Name(), c.pod.UID, fdm)
if err != nil {
t.Errorf("Failed to make a new Unmounter: %v", err)
}
@ -159,38 +243,98 @@ func doTestPlugin(t *testing.T, spec *volume.Spec) {
} else if !os.IsNotExist(err) {
t.Errorf("SetUp() failed: %v", err)
}
checkMounterLog(t, fakeMounter, 3, mount.FakeAction{Action: "unmount", Target: c.expectedPodMountPath, Source: "", FSType: ""})
// detacher
detacher, err := plug.(*rbdPlugin).newDetacherInternal(fdm)
if err != nil {
t.Errorf("Failed to make a new Attacher: %v", err)
}
err = detacher.UnmountDevice(deviceMountPath)
if err != nil {
t.Fatalf("Detacher.UnmountDevice failed to unmount %s", deviceMountPath)
}
checkMounterLog(t, fakeMounter, 4, mount.FakeAction{Action: "unmount", Target: c.expectedDeviceMountPath, Source: "", FSType: ""})
err = detacher.Detach(deviceMountPath, fakeNodeName)
if err != nil {
t.Fatalf("Detacher.Detach failed to detach %s from %s", deviceMountPath, fakeNodeName)
}
}
func TestPluginVolume(t *testing.T) {
vol := &v1.Volume{
type testcase struct {
spec *volume.Spec
root string
pod *v1.Pod
expectedDevicePath string
expectedDeviceMountPath string
expectedPodMountPath string
}
func TestPlugin(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("rbd_test")
if err != nil {
t.Fatalf("error creating temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podUID := uuid.NewUUID()
var cases []*testcase
cases = append(cases, &testcase{
spec: volume.NewSpecFromVolume(&v1.Volume{
Name: "vol1",
VolumeSource: v1.VolumeSource{
RBD: &v1.RBDVolumeSource{
CephMonitors: []string{"a", "b"},
RBDImage: "bar",
RBDPool: "pool1",
RBDImage: "image1",
FSType: "ext4",
},
},
}
doTestPlugin(t, volume.NewSpecFromVolume(vol))
}
func TestPluginPersistentVolume(t *testing.T) {
vol := &v1.PersistentVolume{
}),
root: tmpDir,
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "vol1",
Name: "testpod",
Namespace: "testns",
UID: podUID,
},
},
expectedDevicePath: "/dev/rbd1",
expectedDeviceMountPath: fmt.Sprintf("%s/plugins/kubernetes.io/rbd/rbd/pool1-image-image1", tmpDir),
expectedPodMountPath: fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~rbd/vol1", tmpDir, podUID),
})
cases = append(cases, &testcase{
spec: volume.NewSpecFromPersistentVolume(&v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "vol2",
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
RBD: &v1.RBDPersistentVolumeSource{
CephMonitors: []string{"a", "b"},
RBDImage: "bar",
RBDPool: "pool2",
RBDImage: "image2",
FSType: "ext4",
},
},
},
}
}, false),
root: tmpDir,
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testpod",
Namespace: "testns",
UID: podUID,
},
},
expectedDevicePath: "/dev/rbd1",
expectedDeviceMountPath: fmt.Sprintf("%s/plugins/kubernetes.io/rbd/rbd/pool2-image-image2", tmpDir),
expectedPodMountPath: fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~rbd/vol2", tmpDir, podUID),
})
doTestPlugin(t, volume.NewSpecFromPersistentVolume(vol, false))
for i := 0; i < len(cases); i++ {
doTestPlugin(t, cases[i])
}
}
func TestPersistentClaimReadOnlyFlag(t *testing.T) {
@ -250,87 +394,6 @@ func TestPersistentClaimReadOnlyFlag(t *testing.T) {
}
}
func TestPersistAndLoadRBD(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("rbd_test")
if err != nil {
t.Fatalf("error creating temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
testcases := []struct {
rbdMounter rbdMounter
expectedJSONStr string
expectedLoadedRBDMounter rbdMounter
}{
{
rbdMounter{},
`{"Mon":null,"Id":"","Keyring":"","Secret":""}`,
rbdMounter{},
},
{
rbdMounter{
rbd: &rbd{
podUID: "poduid",
Pool: "kube",
Image: "some-test-image",
ReadOnly: false,
MetricsProvider: volume.NewMetricsStatFS("/tmp"),
},
Mon: []string{"127.0.0.1"},
Id: "kube",
Keyring: "",
Secret: "QVFEcTdKdFp4SmhtTFJBQUNwNDI3UnhGRzBvQ1Y0SUJwLy9pRUE9PQ==",
},
`
{
"Pool": "kube",
"Image": "some-test-image",
"ReadOnly": false,
"Mon": ["127.0.0.1"],
"Id": "kube",
"Keyring": "",
"Secret": "QVFEcTdKdFp4SmhtTFJBQUNwNDI3UnhGRzBvQ1Y0SUJwLy9pRUE9PQ=="
}
`,
rbdMounter{
rbd: &rbd{
Pool: "kube",
Image: "some-test-image",
ReadOnly: false,
},
Mon: []string{"127.0.0.1"},
Id: "kube",
Keyring: "",
Secret: "QVFEcTdKdFp4SmhtTFJBQUNwNDI3UnhGRzBvQ1Y0SUJwLy9pRUE9PQ==",
},
},
}
util := &RBDUtil{}
for _, c := range testcases {
err = util.persistRBD(c.rbdMounter, tmpDir)
if err != nil {
t.Errorf("failed to persist rbd: %v, err: %v", c.rbdMounter, err)
}
jsonFile := filepath.Join(tmpDir, "rbd.json")
jsonData, err := ioutil.ReadFile(jsonFile)
if err != nil {
t.Errorf("failed to read json file %s: %v", jsonFile, err)
}
if !assert.JSONEq(t, c.expectedJSONStr, string(jsonData)) {
t.Errorf("json file does not match expected one: %s, should be %s", string(jsonData), c.expectedJSONStr)
}
tmpRBDMounter := rbdMounter{}
err = util.loadRBD(&tmpRBDMounter, tmpDir)
if err != nil {
t.Errorf("faild to load rbd: %v", err)
}
if !reflect.DeepEqual(tmpRBDMounter, c.expectedLoadedRBDMounter) {
t.Errorf("loaded rbd does not equal to expected one: %v, should be %v", tmpRBDMounter, c.rbdMounter)
}
}
}
func TestGetSecretNameAndNamespace(t *testing.T) {
secretName := "test-secret-name"
secretNamespace := "test-secret-namespace"


@ -35,9 +35,10 @@ import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/util/mount"
fileutil "k8s.io/kubernetes/pkg/util/file"
"k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/volume"
volutil "k8s.io/kubernetes/pkg/volume/util"
)
const (
@ -107,11 +108,15 @@ func makePDNameInternal(host volume.VolumeHost, pool string, image string) strin
return path.Join(host.GetPluginDir(rbdPluginName), "rbd", pool+"-image-"+image)
}
// RBDUtil implements diskManager interface.
type RBDUtil struct{}
var _ diskManager = &RBDUtil{}
func (util *RBDUtil) MakeGlobalPDName(rbd rbd) string {
return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image)
}
func rbdErrors(runErr, resultErr error) error {
if runErr.Error() == rbdCmdErr {
return fmt.Errorf("rbd: rbd cmd not found")
@ -119,6 +124,8 @@ func rbdErrors(runErr, resultErr error) error {
return resultErr
}
// rbdLock acquires a lock on image if lock is true, otherwise releases if a
// lock is found on image.
func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error {
var err error
var output, locker string
@ -188,6 +195,9 @@ func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error {
args := []string{"lock", "add", b.Image, lock_id, "--pool", b.Pool, "--id", b.Id, "-m", mon}
args = append(args, secret_opt...)
cmd, err = b.exec.Run("rbd", args...)
if err == nil {
glog.V(4).Infof("rbd: successfully add lock (locker_id: %s) on image: %s/%s with id %s mon %s", lock_id, b.Pool, b.Image, b.Id, mon)
}
} else {
// defencing, find locker name
ind := strings.LastIndex(output, lock_id) - 1
@ -197,85 +207,38 @@ func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error {
break
}
}
// remove a lock: rbd lock remove
// remove a lock if found: rbd lock remove
if len(locker) > 0 {
args := []string{"lock", "remove", b.Image, lock_id, locker, "--pool", b.Pool, "--id", b.Id, "-m", mon}
args = append(args, secret_opt...)
cmd, err = b.exec.Run("rbd", args...)
if err == nil {
glog.V(4).Infof("rbd: successfully remove lock (locker_id: %s) on image: %s/%s with id %s mon %s", lock_id, b.Pool, b.Image, b.Id, mon)
}
}
}
if err == nil {
//lock is acquired
// break if operation succeeds
break
}
}
return err
}
func (util *RBDUtil) persistRBD(rbd rbdMounter, mnt string) error {
file := path.Join(mnt, "rbd.json")
fp, err := os.Create(file)
if err != nil {
return fmt.Errorf("rbd: create err %s/%s", file, err)
}
defer fp.Close()
encoder := json.NewEncoder(fp)
if err = encoder.Encode(rbd); err != nil {
return fmt.Errorf("rbd: encode err: %v.", err)
}
return nil
}
func (util *RBDUtil) loadRBD(mounter *rbdMounter, mnt string) error {
file := path.Join(mnt, "rbd.json")
fp, err := os.Open(file)
if err != nil {
return fmt.Errorf("rbd: open err %s/%s", file, err)
}
defer fp.Close()
decoder := json.NewDecoder(fp)
if err = decoder.Decode(mounter); err != nil {
return fmt.Errorf("rbd: decode err: %v.", err)
}
return nil
}
func (util *RBDUtil) fencing(b rbdMounter) error {
// no need to fence readOnly
if (&b).GetAttributes().ReadOnly {
return nil
}
return util.rbdLock(b, true)
}
func (util *RBDUtil) defencing(c rbdUnmounter) error {
// no need to fence readOnly
if c.ReadOnly {
return nil
}
return util.rbdLock(*c.rbdMounter, false)
}
func (util *RBDUtil) AttachDisk(b rbdMounter) error {
// AttachDisk attaches the disk on the node.
// If Volume is not read-only, acquire a lock on image first.
func (util *RBDUtil) AttachDisk(b rbdMounter) (string, error) {
var err error
var output []byte
// create mount point
globalPDPath := b.manager.MakeGlobalPDName(*b.rbd)
notMnt, err := b.mounter.IsLikelyNotMountPoint(globalPDPath)
// in the first time, the path shouldn't exist and IsLikelyNotMountPoint is expected to get NotExist
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("rbd: %s failed to check mountpoint", globalPDPath)
globalPDPath := util.MakeGlobalPDName(*b.rbd)
if pathExists, pathErr := volutil.PathExists(globalPDPath); pathErr != nil {
return "", fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists {
if err := os.MkdirAll(globalPDPath, 0750); err != nil {
return "", err
}
if !notMnt {
return nil
}
if err = os.MkdirAll(globalPDPath, 0750); err != nil {
return fmt.Errorf("rbd: failed to mkdir %s, error", globalPDPath)
}
devicePath, found := waitForPath(b.Pool, b.Image, 1)
@ -285,16 +248,16 @@ func (util *RBDUtil) AttachDisk(b rbdMounter) error {
glog.Warningf("rbd: failed to load rbd kernel module:%v", err)
}
// fence off other mappers
if err = util.fencing(b); err != nil {
return rbdErrors(err, fmt.Errorf("rbd: failed to lock image %s (maybe locked by other nodes), error %v", b.Image, err))
// Currently, we don't acquire an advisory lock on the image, but for backward
// compatibility, we need to check whether the image is in use by a node running an older kubelet.
found, err := util.rbdStatus(&b)
if err != nil {
return "", err
}
if found {
glog.Info("rbd is still being used ", b.Image)
return "", fmt.Errorf("rbd %s is still being used", b.Image)
}
// rbd lock remove needs ceph and image config
// but kubelet doesn't get them from apiserver during teardown
// so persit rbd config so upon disk detach, rbd lock can be removed
// since rbd json is persisted in the same local directory that is used as rbd mountpoint later,
// the json file remains invisible during rbd mount and thus won't be removed accidentally.
util.persistRBD(b, globalPDPath)
// rbd map
l := len(b.Mon)
@ -317,51 +280,80 @@ func (util *RBDUtil) AttachDisk(b rbdMounter) error {
glog.V(1).Infof("rbd: map error %v %s", err, string(output))
}
if err != nil {
return fmt.Errorf("rbd: map failed %v %s", err, string(output))
return "", fmt.Errorf("rbd: map failed %v %s", err, string(output))
}
devicePath, found = waitForPath(b.Pool, b.Image, 10)
if !found {
return errors.New("Could not map image: Timeout after 10s")
return "", errors.New("Could not map image: Timeout after 10s")
}
glog.V(3).Infof("rbd: successfully map image %s/%s to %s", b.Pool, b.Image, devicePath)
}
// mount it
if err = b.mounter.FormatAndMount(devicePath, globalPDPath, b.fsType, nil); err != nil {
err = fmt.Errorf("rbd: failed to mount rbd volume %s [%s] to %s, error %v", devicePath, b.fsType, globalPDPath, err)
}
glog.V(3).Infof("rbd: successfully mount image %s/%s at %s", b.Pool, b.Image, globalPDPath)
return err
return devicePath, err
}
func (util *RBDUtil) DetachDisk(c rbdUnmounter, mntPath string) error {
device, cnt, err := mount.GetDeviceNameFromMount(c.mounter, mntPath)
if err != nil {
return fmt.Errorf("rbd detach disk: failed to get device from mnt: %s\nError: %v", mntPath, err)
}
if err = c.mounter.Unmount(mntPath); err != nil {
return fmt.Errorf("rbd detach disk: failed to umount: %s\nError: %v", mntPath, err)
}
glog.V(3).Infof("rbd: successfully umount mountpoint %s", mntPath)
// if device is no longer used, see if can unmap
if cnt <= 1 {
// DetachDisk detaches the disk from the node.
// It detaches device from the node if device is provided, and removes the lock
// if there is persisted RBD info under deviceMountPath.
func (util *RBDUtil) DetachDisk(plugin *rbdPlugin, deviceMountPath string, device string) error {
var err error
if len(device) > 0 {
// rbd unmap
_, err = c.exec.Run("rbd", "unmap", device)
_, err = plugin.exec.Run("rbd", "unmap", device)
if err != nil {
return rbdErrors(err, fmt.Errorf("rbd: failed to unmap device %s:Error: %v", device, err))
}
// load ceph and image/pool info to remove fencing
if err := util.loadRBD(c.rbdMounter, mntPath); err == nil {
// remove rbd lock
util.defencing(c)
}
glog.V(3).Infof("rbd: successfully unmap device %s", device)
}
// Currently, we don't persist rbd info on the disk, but for backward
// compatibility, we need to clean it if found.
rbdFile := path.Join(deviceMountPath, "rbd.json")
exists, err := fileutil.FileExists(rbdFile)
if err != nil {
return err
}
if exists {
glog.V(3).Infof("rbd: old rbd.json is found under %s, cleaning it", deviceMountPath)
err = util.cleanOldRBDFile(plugin, rbdFile)
if err != nil {
glog.Errorf("rbd: failed to clean %s", rbdFile)
return err
}
glog.V(3).Infof("rbd: successfully remove %s", rbdFile)
}
return nil
}
// cleanOldRBDFile reads rbd info from the rbd.json file and removes the lock if found.
// Finally, it removes the rbd.json file.
func (util *RBDUtil) cleanOldRBDFile(plugin *rbdPlugin, rbdFile string) error {
mounter := &rbdMounter{
// util.rbdLock needs it to run command.
rbd: newRBD("", "", "", "", false, plugin, util),
}
fp, err := os.Open(rbdFile)
if err != nil {
return fmt.Errorf("rbd: open err %s/%s", rbdFile, err)
}
defer fp.Close()
decoder := json.NewDecoder(fp)
if err = decoder.Decode(mounter); err != nil {
return fmt.Errorf("rbd: decode err: %v.", err)
}
if err != nil {
glog.Errorf("failed to load rbd info from %s: %v", rbdFile, err)
return err
}
// remove rbd lock if found
// the disk is not attached to this node anymore, so the lock on image
// for this node can be removed safely
err = util.rbdLock(*mounter, false)
if err == nil {
os.Remove(rbdFile)
}
return err
}
func (util *RBDUtil) CreateImage(p *rbdVolumeProvisioner) (r *v1.RBDPersistentVolumeSource, size int, err error) {
var output []byte
capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
@ -442,6 +434,14 @@ func (util *RBDUtil) rbdStatus(b *rbdMounter) (bool, error) {
var output string
var cmd []byte
// If we don't have admin id/secret (e.g. attaching), fall back to user id/secret.
id := b.adminId
secret := b.adminSecret
if id == "" {
id = b.Id
secret = b.Secret
}
l := len(b.Mon)
start := rand.Int() % l
// iterate all hosts until rbd command succeeds.
@ -461,9 +461,9 @@ func (util *RBDUtil) rbdStatus(b *rbdMounter) (bool, error) {
// # image does not exist (exit=2)
// rbd: error opening image kubernetes-dynamic-pvc-<UUID>: (2) No such file or directory
//
glog.V(4).Infof("rbd: status %s using mon %s, pool %s id %s key %s", b.Image, mon, b.Pool, b.adminId, b.adminSecret)
glog.V(4).Infof("rbd: status %s using mon %s, pool %s id %s key %s", b.Image, mon, b.Pool, id, secret)
cmd, err = b.exec.Run("rbd",
"status", b.Image, "--pool", b.Pool, "-m", mon, "--id", b.adminId, "--key="+b.adminSecret)
"status", b.Image, "--pool", b.Pool, "-m", mon, "--id", id, "--key="+secret)
output = string(cmd)
// break if command succeeds