diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go
index bf9c9fe1299..716539b9fe0 100644
--- a/cmd/kube-controller-manager/app/plugins.go
+++ b/cmd/kube-controller-manager/app/plugins.go
@@ -78,6 +78,7 @@ func ProbeAttachableVolumePlugins() []volume.VolumePlugin {
     allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...)
     allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)
     allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...)
+    allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)
     return allPlugins
 }
diff --git a/pkg/util/mount/fake.go b/pkg/util/mount/fake.go
index d34510ae2a8..f4e2e411de1 100644
--- a/pkg/util/mount/fake.go
+++ b/pkg/util/mount/fake.go
@@ -17,6 +17,7 @@ limitations under the License.
 package mount
 
 import (
+    "os"
     "path/filepath"
     "sync"
 
@@ -136,6 +137,11 @@ func (f *FakeMounter) IsLikelyNotMountPoint(file string) (bool, error) {
     f.mutex.Lock()
     defer f.mutex.Unlock()
 
+    _, err := os.Stat(file)
+    if err != nil {
+        return true, err
+    }
+
     // If file is a symlink, get its absolute path
     absFile, err := filepath.EvalSymlinks(file)
     if err != nil {
diff --git a/pkg/util/mount/safe_format_and_mount_test.go b/pkg/util/mount/safe_format_and_mount_test.go
index 72b768f3bf4..a7e7cc29a23 100644
--- a/pkg/util/mount/safe_format_and_mount_test.go
+++ b/pkg/util/mount/safe_format_and_mount_test.go
@@ -18,6 +18,8 @@ package mount
 
 import (
     "fmt"
+    "io/ioutil"
+    "os"
     "runtime"
     "testing"
 
@@ -50,6 +52,11 @@ func TestSafeFormatAndMount(t *testing.T) {
     if runtime.GOOS == "darwin" || runtime.GOOS == "windows" {
         t.Skipf("not supported on GOOS=%s", runtime.GOOS)
     }
+    mntDir, err := ioutil.TempDir(os.TempDir(), "mount")
+    if err != nil {
+        t.Fatalf("failed to create tmp dir: %v", err)
+    }
+    defer os.RemoveAll(mntDir)
     tests := []struct {
         description  string
         fstype       string
@@ -207,7 +214,7 @@ func TestSafeFormatAndMount(t *testing.T) {
         }
 
         device := "/dev/foo"
-        dest := "/mnt/bar"
+        dest := mntDir
         err := mounter.FormatAndMount(device, dest, test.fstype, test.mountOptions)
         if test.expectedError == nil {
             if err != nil {
diff --git a/pkg/volume/rbd/BUILD b/pkg/volume/rbd/BUILD
index 2ead168a5c3..aef8b366b4e 100644
--- a/pkg/volume/rbd/BUILD
+++ b/pkg/volume/rbd/BUILD
@@ -9,6 +9,7 @@ load(
 go_library(
     name = "go_default_library",
     srcs = [
+        "attacher.go",
         "disk_manager.go",
         "doc.go",
         "rbd.go",
@@ -16,6 +17,7 @@ go_library(
     ],
     importpath = "k8s.io/kubernetes/pkg/volume/rbd",
     deps = [
+        "//pkg/util/file:go_default_library",
         "//pkg/util/mount:go_default_library",
         "//pkg/util/node:go_default_library",
         "//pkg/util/strings:go_default_library",
@@ -42,10 +44,10 @@ go_test(
        "//pkg/util/mount:go_default_library",
        "//pkg/volume:go_default_library",
        "//pkg/volume/testing:go_default_library",
-       "//vendor/github.com/stretchr/testify/assert:go_default_library",
        "//vendor/k8s.io/api/core/v1:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
+       "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
        "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
        "//vendor/k8s.io/client-go/util/testing:go_default_library",
    ],
diff --git a/pkg/volume/rbd/attacher.go b/pkg/volume/rbd/attacher.go
new file mode 100644
index 00000000000..7d2fab2bbb9
--- /dev/null
+++ b/pkg/volume/rbd/attacher.go
@@ -0,0 +1,220 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package rbd
+
+import (
+    "fmt"
+    "os"
+    "time"
+
+    "github.com/golang/glog"
+    "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/types"
+    "k8s.io/kubernetes/pkg/util/mount"
+    "k8s.io/kubernetes/pkg/volume"
+    volutil "k8s.io/kubernetes/pkg/volume/util"
+)
+
+// NewAttacher implements AttachableVolumePlugin.NewAttacher.
+func (plugin *rbdPlugin) NewAttacher() (volume.Attacher, error) {
+    return plugin.newAttacherInternal(&RBDUtil{})
+}
+
+func (plugin *rbdPlugin) newAttacherInternal(manager diskManager) (volume.Attacher, error) {
+    return &rbdAttacher{
+        plugin:  plugin,
+        manager: manager,
+    }, nil
+}
+
+// NewDetacher implements AttachableVolumePlugin.NewDetacher.
+func (plugin *rbdPlugin) NewDetacher() (volume.Detacher, error) {
+    return plugin.newDetacherInternal(&RBDUtil{})
+}
+
+func (plugin *rbdPlugin) newDetacherInternal(manager diskManager) (volume.Detacher, error) {
+    return &rbdDetacher{
+        plugin:  plugin,
+        manager: manager,
+    }, nil
+}
+
+// GetDeviceMountRefs implements AttachableVolumePlugin.GetDeviceMountRefs.
+func (plugin *rbdPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
+    mounter := plugin.host.GetMounter(plugin.GetPluginName())
+    return mount.GetMountRefs(mounter, deviceMountPath)
+}
+
+// rbdAttacher implements volume.Attacher interface.
+type rbdAttacher struct {
+    plugin  *rbdPlugin
+    manager diskManager
+}
+
+var _ volume.Attacher = &rbdAttacher{}
+
+// Attach implements Attacher.Attach.
+// We do not lock the image here, because locking requires the
+// kube-controller-manager to run the external `rbd` utility. There is also no
+// need to: the attach/detach controller will not try to attach RWO volumes
+// that are already attached to other nodes.
+func (attacher *rbdAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
+    return "", nil
+}
+
+// VolumesAreAttached implements Attacher.VolumesAreAttached.
+// There is no way to confirm whether the volume is attached or not from
+// outside of the kubelet node, so this method always returns true, like the
+// iSCSI and FC plugins.
+func (attacher *rbdAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
+    volumesAttachedCheck := make(map[*volume.Spec]bool)
+    for _, spec := range specs {
+        volumesAttachedCheck[spec] = true
+    }
+    return volumesAttachedCheck, nil
+}
+
+// WaitForAttach implements Attacher.WaitForAttach. It's called by the kubelet
+// to attach the volume to the node.
+// This method is idempotent, callers are responsible for retrying on failure.
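+// It maps the RBD image to the node via the `rbd` utility and returns the
+// real device path (for example /dev/rbd0) once the kernel has created it.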
+func (attacher *rbdAttacher) WaitForAttach(spec *volume.Spec, devicePath string, pod *v1.Pod, timeout time.Duration) (string, error) {
+    glog.V(4).Infof("rbd: waiting for attach volume (name: %s) for pod (name: %s, uid: %s)", spec.Name(), pod.Name, pod.UID)
+    mounter, err := attacher.plugin.createMounterFromVolumeSpecAndPod(spec, pod)
+    if err != nil {
+        glog.Warningf("failed to create mounter for spec %s: %v", spec.Name(), err)
+        return "", err
+    }
+    realDevicePath, err := attacher.manager.AttachDisk(*mounter)
+    if err != nil {
+        return "", err
+    }
+    glog.V(3).Infof("rbd: successfully waited for attach of volume (spec: %s, pool: %s, image: %s) at %s", spec.Name(), mounter.Pool, mounter.Image, realDevicePath)
+    return realDevicePath, nil
+}
+
+// GetDeviceMountPath implements Attacher.GetDeviceMountPath.
+func (attacher *rbdAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
+    img, err := getVolumeSourceImage(spec)
+    if err != nil {
+        return "", err
+    }
+    pool, err := getVolumeSourcePool(spec)
+    if err != nil {
+        return "", err
+    }
+    return makePDNameInternal(attacher.plugin.host, pool, img), nil
+}
+
+// MountDevice implements Attacher.MountDevice. It is called by the kubelet to
+// mount the device at the given mount path.
+// This method is idempotent, callers are responsible for retrying on failure.
+func (attacher *rbdAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
+    glog.V(4).Infof("rbd: mounting device %s to %s", devicePath, deviceMountPath)
+    notMnt, err := attacher.plugin.mounter.IsLikelyNotMountPoint(deviceMountPath)
+    if err != nil {
+        if os.IsNotExist(err) {
+            if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
+                return err
+            }
+            notMnt = true
+        } else {
+            return err
+        }
+    }
+    if !notMnt {
+        return nil
+    }
+    fstype, err := getVolumeSourceFSType(spec)
+    if err != nil {
+        return err
+    }
+    ro, err := getVolumeSourceReadOnly(spec)
+    if err != nil {
+        return err
+    }
+    options := []string{}
+    if ro {
+        options = append(options, "ro")
+    }
+    mountOptions := volume.MountOptionFromSpec(spec, options...)
+    err = attacher.plugin.mounter.FormatAndMount(devicePath, deviceMountPath, fstype, mountOptions)
+    if err != nil {
+        os.Remove(deviceMountPath)
+        return fmt.Errorf("rbd: failed to mount device %s at %s (fstype: %s), error %v", devicePath, deviceMountPath, fstype, err)
+    }
+    glog.V(3).Infof("rbd: successfully mounted device %s at %s (fstype: %s)", devicePath, deviceMountPath, fstype)
+    return nil
+}
+
+// rbdDetacher implements volume.Detacher interface.
+type rbdDetacher struct {
+    plugin  *rbdPlugin
+    manager diskManager
+}
+
+var _ volume.Detacher = &rbdDetacher{}
+
+// UnmountDevice implements Detacher.UnmountDevice. It unmounts the global
+// mount of the RBD image. This is called once all bind mounts have been
+// unmounted.
+// Internally, it does four things:
+//  - Unmount the device from deviceMountPath.
+//  - Detach the device from the node.
+//  - Remove the lock if one is found. (There is no need to check whether the
+//    volume is read-only: the device is no longer on the node, so the lock
+//    can be removed safely.)
+//  - Remove the deviceMountPath at last.
+// This method is idempotent, callers are responsible for retrying on failure.
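+// Note: the rbd device path (e.g. /dev/rbd0) is recovered from the mount
+// table via GetDeviceNameFromMount, so no volume spec is needed here.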
+func (detacher *rbdDetacher) UnmountDevice(deviceMountPath string) error {
+    if pathExists, pathErr := volutil.PathExists(deviceMountPath); pathErr != nil {
+        return fmt.Errorf("Error checking if path exists: %v", pathErr)
+    } else if !pathExists {
+        glog.Warningf("Warning: Unmount skipped because path does not exist: %v", deviceMountPath)
+        return nil
+    }
+    devicePath, cnt, err := mount.GetDeviceNameFromMount(detacher.plugin.mounter, deviceMountPath)
+    if err != nil {
+        return err
+    }
+    if cnt > 1 {
+        return fmt.Errorf("rbd: more than 1 reference count at %s", deviceMountPath)
+    }
+    if cnt == 1 {
+        // Unmount the device from the device mount point.
+        glog.V(4).Infof("rbd: unmounting device mountpoint %s", deviceMountPath)
+        if err = detacher.plugin.mounter.Unmount(deviceMountPath); err != nil {
+            return err
+        }
+        glog.V(3).Infof("rbd: successfully unmounted device mountpath %s", deviceMountPath)
+    }
+    glog.V(4).Infof("rbd: detaching device %s", devicePath)
+    err = detacher.manager.DetachDisk(detacher.plugin, deviceMountPath, devicePath)
+    if err != nil {
+        return err
+    }
+    glog.V(3).Infof("rbd: successfully detached device %s", devicePath)
+    err = os.Remove(deviceMountPath)
+    if err != nil {
+        return err
+    }
+    glog.V(3).Infof("rbd: successfully removed device mount point %s", deviceMountPath)
+    return nil
+}
+
+// Detach implements Detacher.Detach.
+func (detacher *rbdDetacher) Detach(deviceName string, nodeName types.NodeName) error {
+    return nil
+}
diff --git a/pkg/volume/rbd/disk_manager.go b/pkg/volume/rbd/disk_manager.go
index 24a090109fd..3ba83fe36cb 100644
--- a/pkg/volume/rbd/disk_manager.go
+++ b/pkg/volume/rbd/disk_manager.go
@@ -23,6 +23,7 @@ limitations under the License.
 package rbd
 
 import (
+    "fmt"
     "os"
 
     "github.com/golang/glog"
@@ -33,23 +34,33 @@ import (
 
 // Abstract interface to disk operations.
 type diskManager interface {
+    // MakeGlobalPDName creates the global persistent disk path.
     MakeGlobalPDName(disk rbd) string
     // Attaches the disk to the kubelet's host machine.
-    AttachDisk(disk rbdMounter) error
+    // If it successfully attaches, the path to the device
+    // is returned. Otherwise, an error will be returned.
+    AttachDisk(disk rbdMounter) (string, error)
     // Detaches the disk from the kubelet's host machine.
-    DetachDisk(disk rbdUnmounter, mntPath string) error
-    // Creates a rbd image
+    DetachDisk(plugin *rbdPlugin, deviceMountPath string, device string) error
+    // Creates an rbd image.
     CreateImage(provisioner *rbdVolumeProvisioner) (r *v1.RBDPersistentVolumeSource, volumeSizeGB int, err error)
-    // Deletes a rbd image
+    // Deletes an rbd image.
     DeleteImage(deleter *rbdVolumeDeleter) error
 }
 
 // utility to mount a disk based filesystem
 func diskSetUp(manager diskManager, b rbdMounter, volPath string, mounter mount.Interface, fsGroup *int64) error {
     globalPDPath := manager.MakeGlobalPDName(*b.rbd)
-    // TODO: handle failed mounts here.
-    notMnt, err := mounter.IsLikelyNotMountPoint(volPath)
+    notMnt, err := mounter.IsLikelyNotMountPoint(globalPDPath)
+    if err != nil && !os.IsNotExist(err) {
+        glog.Errorf("cannot validate mountpoint: %s", globalPDPath)
+        return err
+    }
+    if notMnt {
+        return fmt.Errorf("no device is mounted at %s", globalPDPath)
+    }
+    notMnt, err = mounter.IsLikelyNotMountPoint(volPath)
     if err != nil && !os.IsNotExist(err) {
         glog.Errorf("cannot validate mountpoint: %s", volPath)
         return err
@@ -57,10 +68,6 @@ func diskSetUp(manager diskManager, b rbdMounter, volPath string, mounter mount.
     if !notMnt {
         return nil
     }
-    if err := manager.AttachDisk(b); err != nil {
-        glog.Errorf("failed to attach disk")
-        return err
-    }
 
     if err := os.MkdirAll(volPath, 0750); err != nil {
         glog.Errorf("failed to mkdir:%s", volPath)
@@ -89,43 +96,31 @@ func diskSetUp(manager diskManager, b rbdMounter, volPath string, mounter mount.
 // utility to tear down a disk based filesystem
 func diskTearDown(manager diskManager, c rbdUnmounter, volPath string, mounter mount.Interface) error {
     notMnt, err := mounter.IsLikelyNotMountPoint(volPath)
-    if err != nil {
-        glog.Errorf("cannot validate mountpoint %s", volPath)
+    if err != nil && !os.IsNotExist(err) {
+        glog.Errorf("cannot validate mountpoint: %s", volPath)
         return err
     }
     if notMnt {
+        glog.V(3).Infof("volume path %s is not a mountpoint, deleting", volPath)
         return os.Remove(volPath)
     }
 
-    refs, err := mount.GetMountRefs(mounter, volPath)
-    if err != nil {
-        glog.Errorf("failed to get reference count %s", volPath)
-        return err
-    }
+    // Unmount the bind-mount inside this pod.
     if err := mounter.Unmount(volPath); err != nil {
         glog.Errorf("failed to umount %s", volPath)
         return err
     }
-    // If len(refs) is 1, then all bind mounts have been removed, and the
-    // remaining reference is the global mount. It is safe to detach.
-    if len(refs) == 1 {
-        mntPath := refs[0]
-        if err := manager.DetachDisk(c, mntPath); err != nil {
-            glog.Errorf("failed to detach disk from %s", mntPath)
-            return err
-        }
-    }
 
     notMnt, mntErr := mounter.IsLikelyNotMountPoint(volPath)
-    if mntErr != nil {
+    if mntErr != nil && !os.IsNotExist(mntErr) {
         glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
         return err
     }
     if notMnt {
         if err := os.Remove(volPath); err != nil {
+            glog.V(2).Info("Error removing mountpoint ", volPath, ": ", err)
             return err
         }
     }
     return nil
-
 }
diff --git a/pkg/volume/rbd/rbd.go b/pkg/volume/rbd/rbd.go
index 87ca9992f19..b3cf958a349 100644
--- a/pkg/volume/rbd/rbd.go
+++ b/pkg/volume/rbd/rbd.go
@@ -41,17 +41,21 @@ var (
 
 // This is the primary entrypoint for volume plugins.
 func ProbeVolumePlugins() []volume.VolumePlugin {
-    return []volume.VolumePlugin{&rbdPlugin{nil}}
+    return []volume.VolumePlugin{&rbdPlugin{nil, nil, nil}}
 }
 
+// rbdPlugin implements volume.VolumePlugin.
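+// It caches the exec and mounter implementations from the volume host in
+// Init, so that attachers, mounters, and provisioners created later can
+// share them (see newRBD).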
 type rbdPlugin struct {
-    host volume.VolumeHost
+    host    volume.VolumeHost
+    exec    mount.Exec
+    mounter *mount.SafeFormatAndMount
 }
 
 var _ volume.VolumePlugin = &rbdPlugin{}
 var _ volume.PersistentVolumePlugin = &rbdPlugin{}
 var _ volume.DeletableVolumePlugin = &rbdPlugin{}
 var _ volume.ProvisionableVolumePlugin = &rbdPlugin{}
+var _ volume.AttachableVolumePlugin = &rbdPlugin{}
 
 const (
     rbdPluginName = "kubernetes.io/rbd"
@@ -70,6 +74,8 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
 
 func (plugin *rbdPlugin) Init(host volume.VolumeHost) error {
     plugin.host = host
+    plugin.exec = host.GetExec(plugin.GetPluginName())
+    plugin.mounter = volumehelper.NewSafeFormatAndMountFromHost(plugin.GetPluginName(), plugin.host)
     return nil
 }
 
@@ -120,6 +126,68 @@ func (plugin *rbdPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
     }
 }
 
+func (plugin *rbdPlugin) createMounterFromVolumeSpecAndPod(spec *volume.Spec, pod *v1.Pod) (*rbdMounter, error) {
+    var err error
+    mon, err := getVolumeSourceMonitors(spec)
+    if err != nil {
+        return nil, err
+    }
+    img, err := getVolumeSourceImage(spec)
+    if err != nil {
+        return nil, err
+    }
+    fstype, err := getVolumeSourceFSType(spec)
+    if err != nil {
+        return nil, err
+    }
+    pool, err := getVolumeSourcePool(spec)
+    if err != nil {
+        return nil, err
+    }
+    id, err := getVolumeSourceUser(spec)
+    if err != nil {
+        return nil, err
+    }
+    keyring, err := getVolumeSourceKeyRing(spec)
+    if err != nil {
+        return nil, err
+    }
+    ro, err := getVolumeSourceReadOnly(spec)
+    if err != nil {
+        return nil, err
+    }
+
+    secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
+    if err != nil {
+        return nil, err
+    }
+    secret := ""
+    if len(secretName) > 0 && len(secretNs) > 0 {
+        // if a secret is provided, retrieve it
+        kubeClient := plugin.host.GetKubeClient()
+        if kubeClient == nil {
+            return nil, fmt.Errorf("Cannot get kube client")
+        }
+        secrets, err := kubeClient.Core().Secrets(secretNs).Get(secretName, metav1.GetOptions{})
+        if err != nil {
+            err = fmt.Errorf("Couldn't get secret %v/%v err: %v", secretNs, secretName, err)
+            return nil, err
+        }
+        for _, data := range secrets.Data {
+            secret = string(data)
+        }
+    }
+
+    return &rbdMounter{
+        rbd:     newRBD("", spec.Name(), img, pool, ro, plugin, &RBDUtil{}),
+        Mon:     mon,
+        Id:      id,
+        Keyring: keyring,
+        Secret:  secret,
+        fsType:  fstype,
+    }, nil
+}
+
 func (plugin *rbdPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
     secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
     if err != nil {
@@ -143,10 +211,10 @@ func (plugin *rbdPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.Vol
     }
 
     // Inject real implementations here, test through the internal function.
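+    // The mounter and exec implementations now come from the plugin itself
+    // (cached in Init), so they no longer need to be passed in here.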
-    return plugin.newMounterInternal(spec, pod.UID, &RBDUtil{}, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName()), secret)
+    return plugin.newMounterInternal(spec, pod.UID, &RBDUtil{}, secret)
 }
 
-func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager diskManager, mounter mount.Interface, exec mount.Exec, secret string) (volume.Mounter, error) {
+func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager diskManager, secret string) (volume.Mounter, error) {
     mon, err := getVolumeSourceMonitors(spec)
     if err != nil {
         return nil, err
@@ -177,18 +245,7 @@ func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID,
     }
 
     return &rbdMounter{
-        rbd: &rbd{
-            podUID:          podUID,
-            volName:         spec.Name(),
-            Image:           img,
-            Pool:            pool,
-            ReadOnly:        ro,
-            manager:         manager,
-            mounter:         &mount.SafeFormatAndMount{Interface: mounter, Exec: exec},
-            exec:            exec,
-            plugin:          plugin,
-            MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), plugin.host)),
-        },
+        rbd:     newRBD(podUID, spec.Name(), img, pool, ro, plugin, manager),
         Mon:     mon,
         Id:      id,
         Keyring: keyring,
@@ -200,21 +257,13 @@ func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID,
 
 func (plugin *rbdPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
     // Inject real implementations here, test through the internal function.
-    return plugin.newUnmounterInternal(volName, podUID, &RBDUtil{}, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName()))
+    return plugin.newUnmounterInternal(volName, podUID, &RBDUtil{})
 }
 
-func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID, manager diskManager, mounter mount.Interface, exec mount.Exec) (volume.Unmounter, error) {
+func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID, manager diskManager) (volume.Unmounter, error) {
     return &rbdUnmounter{
         rbdMounter: &rbdMounter{
-            rbd: &rbd{
-                podUID:          podUID,
-                volName:         volName,
-                manager:         manager,
-                mounter:         &mount.SafeFormatAndMount{Interface: mounter, Exec: exec},
-                exec:            exec,
-                plugin:          plugin,
-                MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)),
-            },
+            rbd: newRBD(podUID, volName, "", "", false, plugin, manager),
             Mon: make([]string, 0),
         },
     }, nil
@@ -268,15 +317,7 @@ func (plugin *rbdPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
 func (plugin *rbdPlugin) newDeleterInternal(spec *volume.Spec, admin, secret string, manager diskManager) (volume.Deleter, error) {
     return &rbdVolumeDeleter{
         rbdMounter: &rbdMounter{
-            rbd: &rbd{
-                volName: spec.Name(),
-                Image:   spec.PersistentVolume.Spec.RBD.RBDImage,
-                Pool:    spec.PersistentVolume.Spec.RBD.RBDPool,
-                manager: manager,
-                plugin:  plugin,
-                mounter: &mount.SafeFormatAndMount{Interface: plugin.host.GetMounter(plugin.GetPluginName())},
-                exec:    plugin.host.GetExec(plugin.GetPluginName()),
-            },
+            rbd: newRBD("", spec.Name(), spec.PersistentVolume.Spec.RBD.RBDImage, spec.PersistentVolume.Spec.RBD.RBDPool, false, plugin, manager),
             Mon:         spec.PersistentVolume.Spec.RBD.CephMonitors,
             adminId:     admin,
             adminSecret: secret,
@@ -290,22 +331,20 @@ func (plugin *rbdPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Pr
 func (plugin *rbdPlugin) newProvisionerInternal(options volume.VolumeOptions, manager diskManager) (volume.Provisioner, error) {
     return &rbdVolumeProvisioner{
         rbdMounter: &rbdMounter{
-            rbd: &rbd{
-                manager: manager,
-                plugin:  plugin,
-                mounter: &mount.SafeFormatAndMount{Interface: plugin.host.GetMounter(plugin.GetPluginName())},
-                exec:    plugin.host.GetExec(plugin.GetPluginName()),
-            },
+            rbd: newRBD("", "", "", "", false, plugin, manager),
         },
         options: options,
     }, nil
 }
 
+// rbdVolumeProvisioner implements volume.Provisioner interface.
 type rbdVolumeProvisioner struct {
     *rbdMounter
     options volume.VolumeOptions
 }
 
+var _ volume.Provisioner = &rbdVolumeProvisioner{}
+
 func (r *rbdVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
     if !volume.AccessModesContainedInAll(r.plugin.GetAccessModes(), r.options.PVC.Spec.AccessModes) {
         return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", r.options.PVC.Spec.AccessModes, r.plugin.GetAccessModes())
@@ -419,10 +458,13 @@ func (r *rbdVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
     return pv, nil
 }
 
+// rbdVolumeDeleter implements volume.Deleter interface.
 type rbdVolumeDeleter struct {
     *rbdMounter
 }
 
+var _ volume.Deleter = &rbdVolumeDeleter{}
+
 func (r *rbdVolumeDeleter) GetPath() string {
     return getPath(r.podUID, r.volName, r.plugin.host)
 }
@@ -431,6 +473,8 @@ func (r *rbdVolumeDeleter) Delete() error {
     return r.manager.DeleteImage(r)
 }
 
+// rbd implements volume.Volume interface.
+// It's embedded in Mounter/Unmounter/Deleter.
 type rbd struct {
     volName string
     podUID  types.UID
@@ -445,11 +489,35 @@ type rbd struct {
     volume.MetricsProvider `json:"-"`
 }
 
+var _ volume.Volume = &rbd{}
+
 func (rbd *rbd) GetPath() string {
     // safe to use PodVolumeDir now: volume teardown occurs before pod is cleaned up
     return getPath(rbd.podUID, rbd.volName, rbd.plugin.host)
 }
 
+// newRBD creates a new rbd.
+func newRBD(podUID types.UID, volName string, image string, pool string, readOnly bool, plugin *rbdPlugin, manager diskManager) *rbd {
+    return &rbd{
+        podUID:          podUID,
+        volName:         volName,
+        Image:           image,
+        Pool:            pool,
+        ReadOnly:        readOnly,
+        plugin:          plugin,
+        mounter:         plugin.mounter,
+        exec:            plugin.exec,
+        manager:         manager,
+        MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)),
+    }
+}
+
+// rbdMounter implements volume.Mounter interface.
+// It contains information that needs to be persisted for the whole life cycle
+// of the PV on the node. It is persisted at the very beginning in the pod
+// mount point directory.
+// Note: the capitalized field names of this struct determine the information
+// persisted on the disk, DO NOT change them. (TODO: refactor to use a
+// dedicated struct?)
 type rbdMounter struct {
     *rbd
     // capitalized so they can be exported in persistRBD()
@@ -488,14 +556,16 @@ func (b *rbdMounter) SetUp(fsGroup *int64) error {
 
 func (b *rbdMounter) SetUpAt(dir string, fsGroup *int64) error {
     // diskSetUp checks mountpoints and prevent repeated calls
-    glog.V(4).Infof("rbd: attempting to SetUp and mount %s", dir)
+    glog.V(4).Infof("rbd: attempting to setup at %s", dir)
     err := diskSetUp(b.manager, *b, dir, b.mounter, fsGroup)
     if err != nil {
-        glog.Errorf("rbd: failed to setup mount %s %v", dir, err)
+        glog.Errorf("rbd: failed to setup at %s: %v", dir, err)
+        return err
     }
+    glog.V(3).Infof("rbd: successfully setup at %s", dir)
     return err
 }
 
+// rbdUnmounter implements volume.Unmounter interface.
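+// During teardown only volName and podUID are set on the embedded rbdMounter;
+// image and pool are left empty because they are not needed to unmount the
+// pod directory.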
 type rbdUnmounter struct {
     *rbdMounter
 }
@@ -509,13 +579,19 @@ func (c *rbdUnmounter) TearDown() error {
 }
 
 func (c *rbdUnmounter) TearDownAt(dir string) error {
+    glog.V(4).Infof("rbd: attempting to teardown at %s", dir)
     if pathExists, pathErr := volutil.PathExists(dir); pathErr != nil {
         return fmt.Errorf("Error checking if path exists: %v", pathErr)
     } else if !pathExists {
         glog.Warningf("Warning: Unmount skipped because path does not exist: %v", dir)
         return nil
     }
-    return diskTearDown(c.manager, *c, dir, c.mounter)
+    err := diskTearDown(c.manager, *c, dir, c.mounter)
+    if err != nil {
+        return err
+    }
+    glog.V(3).Infof("rbd: successfully torn down at %s", dir)
+    return nil
 }
 
 func getVolumeSourceMonitors(spec *volume.Spec) ([]string, error) {
diff --git a/pkg/volume/rbd/rbd_test.go b/pkg/volume/rbd/rbd_test.go
index 8a0a098de36..4f67311918b 100644
--- a/pkg/volume/rbd/rbd_test.go
+++ b/pkg/volume/rbd/rbd_test.go
@@ -18,17 +18,17 @@ package rbd
 
 import (
     "fmt"
-    "io/ioutil"
     "os"
-    "path/filepath"
     "reflect"
     "strings"
+    "sync"
     "testing"
+    "time"
 
-    "github.com/stretchr/testify/assert"
     "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
+    "k8s.io/apimachinery/pkg/util/uuid"
     "k8s.io/client-go/kubernetes/fake"
     utiltesting "k8s.io/client-go/util/testing"
     "k8s.io/kubernetes/pkg/util/mount"
@@ -59,37 +59,43 @@ func TestCanSupport(t *testing.T) {
 }
 
 type fakeDiskManager struct {
-    tmpDir string
+    // Make sure we can run tests in parallel.
+    mutex sync.RWMutex
+    // Key format: "<pool>/<image>"
+    rbdImageLocks map[string]bool
+    rbdMapIndex   int
+    rbdDevices    map[string]bool
 }
 
 func NewFakeDiskManager() *fakeDiskManager {
     return &fakeDiskManager{
-        tmpDir: utiltesting.MkTmpdirOrDie("rbd_test"),
+        rbdImageLocks: make(map[string]bool),
+        rbdMapIndex:   0,
+        rbdDevices:    make(map[string]bool),
     }
 }
 
-func (fake *fakeDiskManager) Cleanup() {
-    os.RemoveAll(fake.tmpDir)
+func (fake *fakeDiskManager) MakeGlobalPDName(rbd rbd) string {
+    return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image)
 }
 
-func (fake *fakeDiskManager) MakeGlobalPDName(disk rbd) string {
-    return fake.tmpDir
-}
-
-func (fake *fakeDiskManager) AttachDisk(b rbdMounter) error {
-    globalPath := b.manager.MakeGlobalPDName(*b.rbd)
-    err := os.MkdirAll(globalPath, 0750)
-    if err != nil {
-        return err
-    }
-    return nil
+func (fake *fakeDiskManager) AttachDisk(b rbdMounter) (string, error) {
+    fake.mutex.Lock()
+    defer fake.mutex.Unlock()
+    fake.rbdMapIndex += 1
+    devicePath := fmt.Sprintf("/dev/rbd%d", fake.rbdMapIndex)
+    fake.rbdDevices[devicePath] = true
+    return devicePath, nil
 }
 
-func (fake *fakeDiskManager) DetachDisk(c rbdUnmounter, mntPath string) error {
-    globalPath := c.manager.MakeGlobalPDName(*c.rbd)
-    err := os.RemoveAll(globalPath)
-    if err != nil {
-        return err
+func (fake *fakeDiskManager) DetachDisk(r *rbdPlugin, deviceMountPath string, device string) error {
+    fake.mutex.Lock()
+    defer fake.mutex.Unlock()
+    ok := fake.rbdDevices[device]
+    if !ok {
+        return fmt.Errorf("rbd: failed to detach device %s, it does not exist", device)
     }
+    delete(fake.rbdDevices, device)
     return nil
 }
 
@@ -101,35 +107,111 @@ func (fake *fakeDiskManager) DeleteImage(deleter *rbdVolumeDeleter) error {
     return fmt.Errorf("not implemented")
 }
 
-func doTestPlugin(t *testing.T, spec *volume.Spec) {
-    tmpDir, err := utiltesting.MkTmpdir("rbd_test")
-    if err != nil {
-        t.Fatalf("error creating temp dir: %v", err)
+func (fake *fakeDiskManager) Fencing(r rbdMounter, nodeName string) error {
+    fake.mutex.Lock()
+    defer fake.mutex.Unlock()
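+    // Track fake advisory locks per image, keyed by "<pool>/<image>".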
+    key := fmt.Sprintf("%s/%s", r.Pool, r.Image)
+    isLocked, ok := fake.rbdImageLocks[key]
+    if ok && isLocked {
+        // not expected in testing
+        return fmt.Errorf("%s is already locked", key)
     }
-    defer os.RemoveAll(tmpDir)
+    fake.rbdImageLocks[key] = true
+    return nil
+}
 
+func (fake *fakeDiskManager) Defencing(r rbdMounter, nodeName string) error {
+    fake.mutex.Lock()
+    defer fake.mutex.Unlock()
+    key := fmt.Sprintf("%s/%s", r.Pool, r.Image)
+    isLocked, ok := fake.rbdImageLocks[key]
+    if !ok || !isLocked {
+        // not expected in testing
+        return fmt.Errorf("%s is not locked", key)
+    }
+    delete(fake.rbdImageLocks, key)
+    return nil
+}
+
+func (fake *fakeDiskManager) IsLocked(r rbdMounter, nodeName string) (bool, error) {
+    fake.mutex.RLock()
+    defer fake.mutex.RUnlock()
+    key := fmt.Sprintf("%s/%s", r.Pool, r.Image)
+    isLocked, ok := fake.rbdImageLocks[key]
+    return ok && isLocked, nil
+}
+
+// checkMounterLog checks that fakeMounter has the expected number of log
+// entries and that the last action equals expectedAction.
+func checkMounterLog(t *testing.T, fakeMounter *mount.FakeMounter, expected int, expectedAction mount.FakeAction) {
+    if len(fakeMounter.Log) != expected {
+        t.Fatalf("fakeMounter should have %d logs, actual: %d", expected, len(fakeMounter.Log))
+    }
+    lastIndex := len(fakeMounter.Log) - 1
+    lastAction := fakeMounter.Log[lastIndex]
+    if !reflect.DeepEqual(expectedAction, lastAction) {
+        t.Fatalf("fakeMounter.Log[%d] should be %v, not: %v", lastIndex, expectedAction, lastAction)
+    }
+}
+
+func doTestPlugin(t *testing.T, c *testcase) {
+    fakeVolumeHost := volumetest.NewFakeVolumeHost(c.root, nil, nil)
     plugMgr := volume.VolumePluginMgr{}
-    plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil))
-
+    plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, fakeVolumeHost)
     plug, err := plugMgr.FindPluginByName("kubernetes.io/rbd")
     if err != nil {
         t.Errorf("Can't find the plugin by name")
     }
+    fakeMounter := fakeVolumeHost.GetMounter(plug.GetPluginName()).(*mount.FakeMounter)
+    fakeNodeName := types.NodeName("localhost")
     fdm := NewFakeDiskManager()
-    defer fdm.Cleanup()
-    exec := mount.NewFakeExec(nil)
-    mounter, err := plug.(*rbdPlugin).newMounterInternal(spec, types.UID("poduid"), fdm, &mount.FakeMounter{}, exec, "secrets")
+
+    // attacher
+    attacher, err := plug.(*rbdPlugin).newAttacherInternal(fdm)
+    if err != nil {
+        t.Errorf("Failed to make a new Attacher: %v", err)
+    }
+    deviceAttachPath, err := attacher.Attach(c.spec, fakeNodeName)
+    if err != nil {
+        t.Fatal(err)
+    }
+    devicePath, err := attacher.WaitForAttach(c.spec, deviceAttachPath, c.pod, time.Second*10)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if devicePath != c.expectedDevicePath {
+        t.Errorf("Unexpected path, expected %q, got: %q", c.expectedDevicePath, devicePath)
+    }
+    deviceMountPath, err := attacher.GetDeviceMountPath(c.spec)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if deviceMountPath != c.expectedDeviceMountPath {
+        t.Errorf("Unexpected mount path, expected %q, got: %q", c.expectedDeviceMountPath, deviceMountPath)
+    }
+    err = attacher.MountDevice(c.spec, devicePath, deviceMountPath)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if _, err := os.Stat(deviceMountPath); err != nil {
+        if os.IsNotExist(err) {
+            t.Errorf("Attacher.MountDevice() failed, device mount path not created: %s", deviceMountPath)
+        } else {
+            t.Errorf("Attacher.MountDevice() failed: %v", err)
+        }
+    }
+    checkMounterLog(t, fakeMounter, 1, mount.FakeAction{Action: "mount", Target: c.expectedDeviceMountPath, Source: devicePath, FSType: "ext4"})
+
+    // mounter
+    mounter, err := plug.(*rbdPlugin).newMounterInternal(c.spec, c.pod.UID, fdm, "secrets")
     if err != nil {
         t.Errorf("Failed to make a new Mounter: %v", err)
     }
     if mounter == nil {
         t.Error("Got a nil Mounter")
     }
     path := mounter.GetPath()
-    expectedPath := fmt.Sprintf("%s/pods/poduid/volumes/kubernetes.io~rbd/vol1", tmpDir)
-    if path != expectedPath {
-        t.Errorf("Unexpected path, expected %q, got: %q", expectedPath, path)
+    if path != c.expectedPodMountPath {
+        t.Errorf("Unexpected path, expected %q, got: %q", c.expectedPodMountPath, path)
     }
 
     if err := mounter.SetUp(nil); err != nil {
@@ -142,8 +224,10 @@ func doTestPlugin(t *testing.T, spec *volume.Spec) {
             t.Errorf("SetUp() failed: %v", err)
         }
     }
+    checkMounterLog(t, fakeMounter, 2, mount.FakeAction{Action: "mount", Target: c.expectedPodMountPath, Source: devicePath, FSType: ""})
 
-    unmounter, err := plug.(*rbdPlugin).newUnmounterInternal("vol1", types.UID("poduid"), fdm, &mount.FakeMounter{}, exec)
+    // unmounter
+    unmounter, err := plug.(*rbdPlugin).newUnmounterInternal(c.spec.Name(), c.pod.UID, fdm)
     if err != nil {
         t.Errorf("Failed to make a new Unmounter: %v", err)
     }
@@ -159,38 +243,98 @@ func doTestPlugin(t *testing.T, spec *volume.Spec) {
     } else if !os.IsNotExist(err) {
         t.Errorf("SetUp() failed: %v", err)
     }
+    checkMounterLog(t, fakeMounter, 3, mount.FakeAction{Action: "unmount", Target: c.expectedPodMountPath, Source: "", FSType: ""})
+
+    // detacher
+    detacher, err := plug.(*rbdPlugin).newDetacherInternal(fdm)
+    if err != nil {
+        t.Errorf("Failed to make a new Detacher: %v", err)
+    }
+    err = detacher.UnmountDevice(deviceMountPath)
+    if err != nil {
+        t.Fatalf("Detacher.UnmountDevice failed to unmount %s", deviceMountPath)
+    }
+    checkMounterLog(t, fakeMounter, 4, mount.FakeAction{Action: "unmount", Target: c.expectedDeviceMountPath, Source: "", FSType: ""})
+    err = detacher.Detach(deviceMountPath, fakeNodeName)
+    if err != nil {
+        t.Fatalf("Detacher.Detach failed to detach %s from %s", deviceMountPath, fakeNodeName)
+    }
 }
 
-func TestPluginVolume(t *testing.T) {
-    vol := &v1.Volume{
-        Name: "vol1",
-        VolumeSource: v1.VolumeSource{
-            RBD: &v1.RBDVolumeSource{
-                CephMonitors: []string{"a", "b"},
-                RBDImage:     "bar",
-                FSType:       "ext4",
-            },
-        },
-    }
-    doTestPlugin(t, volume.NewSpecFromVolume(vol))
+type testcase struct {
+    spec                    *volume.Spec
+    root                    string
+    pod                     *v1.Pod
+    expectedDevicePath      string
+    expectedDeviceMountPath string
+    expectedPodMountPath    string
 }
 
-func TestPluginPersistentVolume(t *testing.T) {
-    vol := &v1.PersistentVolume{
-        ObjectMeta: metav1.ObjectMeta{
+func TestPlugin(t *testing.T) {
+    tmpDir, err := utiltesting.MkTmpdir("rbd_test")
+    if err != nil {
+        t.Fatalf("error creating temp dir: %v", err)
+    }
+    defer os.RemoveAll(tmpDir)
+
+    podUID := uuid.NewUUID()
+    var cases []*testcase
+    cases = append(cases, &testcase{
+        spec: volume.NewSpecFromVolume(&v1.Volume{
             Name: "vol1",
-        },
-        Spec: v1.PersistentVolumeSpec{
-            PersistentVolumeSource: v1.PersistentVolumeSource{
-                RBD: &v1.RBDPersistentVolumeSource{
+            VolumeSource: v1.VolumeSource{
+                RBD: &v1.RBDVolumeSource{
                     CephMonitors: []string{"a", "b"},
-                    RBDImage:     "bar",
+                    RBDPool:      "pool1",
+                    RBDImage:     "image1",
                     FSType:       "ext4",
                 },
             },
+        }),
+        root: tmpDir,
+        pod: &v1.Pod{
+            ObjectMeta: metav1.ObjectMeta{
+                Name:      "testpod",
+                Namespace: "testns",
+                UID:       podUID,
+            },
         },
-    }
+        expectedDevicePath:      "/dev/rbd1",
+        expectedDeviceMountPath: fmt.Sprintf("%s/plugins/kubernetes.io/rbd/rbd/pool1-image-image1", tmpDir),
+        expectedPodMountPath:    fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~rbd/vol1", tmpDir, podUID),
+    })
+    cases = append(cases, &testcase{
+        spec: volume.NewSpecFromPersistentVolume(&v1.PersistentVolume{
+            ObjectMeta: metav1.ObjectMeta{
+                Name: "vol2",
+            },
+            Spec: v1.PersistentVolumeSpec{
+                PersistentVolumeSource: v1.PersistentVolumeSource{
+                    RBD: &v1.RBDPersistentVolumeSource{
+                        CephMonitors: []string{"a", "b"},
+                        RBDPool:      "pool2",
+                        RBDImage:     "image2",
+                        FSType:       "ext4",
+                    },
+                },
+            },
+        }, false),
+        root: tmpDir,
+        pod: &v1.Pod{
+            ObjectMeta: metav1.ObjectMeta{
+                Name:      "testpod",
+                Namespace: "testns",
+                UID:       podUID,
+            },
+        },
+        expectedDevicePath:      "/dev/rbd1",
+        expectedDeviceMountPath: fmt.Sprintf("%s/plugins/kubernetes.io/rbd/rbd/pool2-image-image2", tmpDir),
+        expectedPodMountPath:    fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~rbd/vol2", tmpDir, podUID),
+    })
 
-    doTestPlugin(t, volume.NewSpecFromPersistentVolume(vol, false))
+    for i := 0; i < len(cases); i++ {
+        doTestPlugin(t, cases[i])
+    }
 }
 
 func TestPersistentClaimReadOnlyFlag(t *testing.T) {
@@ -250,87 +394,6 @@ func TestPersistentClaimReadOnlyFlag(t *testing.T) {
     }
 }
 
-func TestPersistAndLoadRBD(t *testing.T) {
-    tmpDir, err := utiltesting.MkTmpdir("rbd_test")
-    if err != nil {
-        t.Fatalf("error creating temp dir: %v", err)
-    }
-    defer os.RemoveAll(tmpDir)
-
-    testcases := []struct {
-        rbdMounter               rbdMounter
-        expectedJSONStr          string
-        expectedLoadedRBDMounter rbdMounter
-    }{
-        {
-            rbdMounter{},
-            `{"Mon":null,"Id":"","Keyring":"","Secret":""}`,
-            rbdMounter{},
-        },
-        {
-            rbdMounter{
-                rbd: &rbd{
-                    podUID:          "poduid",
-                    Pool:            "kube",
-                    Image:           "some-test-image",
-                    ReadOnly:        false,
-                    MetricsProvider: volume.NewMetricsStatFS("/tmp"),
-                },
-                Mon:     []string{"127.0.0.1"},
-                Id:      "kube",
-                Keyring: "",
-                Secret:  "QVFEcTdKdFp4SmhtTFJBQUNwNDI3UnhGRzBvQ1Y0SUJwLy9pRUE9PQ==",
-            },
-            `
-{
-    "Pool": "kube",
-    "Image": "some-test-image",
-    "ReadOnly": false,
-    "Mon": ["127.0.0.1"],
-    "Id": "kube",
-    "Keyring": "",
-    "Secret": "QVFEcTdKdFp4SmhtTFJBQUNwNDI3UnhGRzBvQ1Y0SUJwLy9pRUE9PQ=="
-}
-            `,
-            rbdMounter{
-                rbd: &rbd{
-                    Pool:     "kube",
-                    Image:    "some-test-image",
-                    ReadOnly: false,
-                },
-                Mon:     []string{"127.0.0.1"},
-                Id:      "kube",
-                Keyring: "",
-                Secret:  "QVFEcTdKdFp4SmhtTFJBQUNwNDI3UnhGRzBvQ1Y0SUJwLy9pRUE9PQ==",
-            },
-        },
-    }
-
-    util := &RBDUtil{}
-    for _, c := range testcases {
-        err = util.persistRBD(c.rbdMounter, tmpDir)
-        if err != nil {
-            t.Errorf("failed to persist rbd: %v, err: %v", c.rbdMounter, err)
-        }
-        jsonFile := filepath.Join(tmpDir, "rbd.json")
-        jsonData, err := ioutil.ReadFile(jsonFile)
-        if err != nil {
-            t.Errorf("failed to read json file %s: %v", jsonFile, err)
-        }
-        if !assert.JSONEq(t, c.expectedJSONStr, string(jsonData)) {
-            t.Errorf("json file does not match expected one: %s, should be %s", string(jsonData), c.expectedJSONStr)
-        }
-        tmpRBDMounter := rbdMounter{}
-        err = util.loadRBD(&tmpRBDMounter, tmpDir)
-        if err != nil {
-            t.Errorf("faild to load rbd: %v", err)
-        }
-        if !reflect.DeepEqual(tmpRBDMounter, c.expectedLoadedRBDMounter) {
-            t.Errorf("loaded rbd does not equal to expected one: %v, should be %v", tmpRBDMounter, c.rbdMounter)
-        }
-    }
-}
-
 func TestGetSecretNameAndNamespace(t *testing.T) {
     secretName := "test-secret-name"
     secretNamespace := "test-secret-namespace"
diff --git a/pkg/volume/rbd/rbd_util.go b/pkg/volume/rbd/rbd_util.go
index 8ec9dfa6a42..edf0efbb290 100644
--- a/pkg/volume/rbd/rbd_util.go
+++ b/pkg/volume/rbd/rbd_util.go
@@ -35,9 +35,10 @@ import (
 
     "github.com/golang/glog"
     "k8s.io/api/core/v1"
-    "k8s.io/kubernetes/pkg/util/mount"
+    fileutil "k8s.io/kubernetes/pkg/util/file"
     "k8s.io/kubernetes/pkg/util/node"
     "k8s.io/kubernetes/pkg/volume"
+    volutil "k8s.io/kubernetes/pkg/volume/util"
 )
 
 const (
@@ -107,11 +108,15 @@ func makePDNameInternal(host volume.VolumeHost, pool string, image string) strin
     return path.Join(host.GetPluginDir(rbdPluginName), "rbd", pool+"-image-"+image)
 }
 
+// RBDUtil implements diskManager interface.
 type RBDUtil struct{}
 
+var _ diskManager = &RBDUtil{}
+
 func (util *RBDUtil) MakeGlobalPDName(rbd rbd) string {
     return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image)
 }
+
 func rbdErrors(runErr, resultErr error) error {
     if runErr.Error() == rbdCmdErr {
         return fmt.Errorf("rbd: rbd cmd not found")
@@ -119,6 +124,8 @@ func rbdErrors(runErr, resultErr error) error {
     return resultErr
 }
 
+// rbdLock acquires a lock on the image if lock is true; otherwise it releases
+// the lock if one is found on the image.
 func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error {
     var err error
     var output, locker string
@@ -188,6 +195,9 @@ func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error {
             args := []string{"lock", "add", b.Image, lock_id, "--pool", b.Pool, "--id", b.Id, "-m", mon}
             args = append(args, secret_opt...)
             cmd, err = b.exec.Run("rbd", args...)
+            if err == nil {
+                glog.V(4).Infof("rbd: successfully added lock (locker_id: %s) on image: %s/%s with id %s mon %s", lock_id, b.Pool, b.Image, b.Id, mon)
+            }
         } else {
             // defencing, find locker name
             ind := strings.LastIndex(output, lock_id) - 1
@@ -197,85 +207,38 @@ func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error {
                     break
                 }
             }
-            // remove a lock: rbd lock remove
-            args := []string{"lock", "remove", b.Image, lock_id, locker, "--pool", b.Pool, "--id", b.Id, "-m", mon}
-            args = append(args, secret_opt...)
-            cmd, err = b.exec.Run("rbd", args...)
+            // remove a lock if found: rbd lock remove
+            if len(locker) > 0 {
+                args := []string{"lock", "remove", b.Image, lock_id, locker, "--pool", b.Pool, "--id", b.Id, "-m", mon}
+                args = append(args, secret_opt...)
+                cmd, err = b.exec.Run("rbd", args...)
+                if err == nil {
+                    glog.V(4).Infof("rbd: successfully removed lock (locker_id: %s) on image: %s/%s with id %s mon %s", lock_id, b.Pool, b.Image, b.Id, mon)
+                }
+            }
         }
 
         if err == nil {
-            //lock is acquired
+            // break if operation succeeds
             break
         }
     }
     return err
 }
 
-func (util *RBDUtil) persistRBD(rbd rbdMounter, mnt string) error {
-    file := path.Join(mnt, "rbd.json")
-    fp, err := os.Create(file)
-    if err != nil {
-        return fmt.Errorf("rbd: create err %s/%s", file, err)
-    }
-    defer fp.Close()
-
-    encoder := json.NewEncoder(fp)
-    if err = encoder.Encode(rbd); err != nil {
-        return fmt.Errorf("rbd: encode err: %v.", err)
-    }
-
-    return nil
-}
-
-func (util *RBDUtil) loadRBD(mounter *rbdMounter, mnt string) error {
-    file := path.Join(mnt, "rbd.json")
-    fp, err := os.Open(file)
-    if err != nil {
-        return fmt.Errorf("rbd: open err %s/%s", file, err)
-    }
-    defer fp.Close()
-
-    decoder := json.NewDecoder(fp)
-    if err = decoder.Decode(mounter); err != nil {
-        return fmt.Errorf("rbd: decode err: %v.", err)
-    }
-
-    return nil
-}
-
-func (util *RBDUtil) fencing(b rbdMounter) error {
-    // no need to fence readOnly
-    if (&b).GetAttributes().ReadOnly {
-        return nil
-    }
-    return util.rbdLock(b, true)
-}
-
-func (util *RBDUtil) defencing(c rbdUnmounter) error {
-    // no need to fence readOnly
-    if c.ReadOnly {
-        return nil
-    }
-
-    return util.rbdLock(*c.rbdMounter, false)
-}
-
-func (util *RBDUtil) AttachDisk(b rbdMounter) error {
+// AttachDisk attaches the disk on the node.
+// It returns the path of the mapped device, and fails if the image is still
+// being used by another node.
+func (util *RBDUtil) AttachDisk(b rbdMounter) (string, error) {
     var err error
     var output []byte
 
-    // create mount point
-    globalPDPath := b.manager.MakeGlobalPDName(*b.rbd)
-    notMnt, err := b.mounter.IsLikelyNotMountPoint(globalPDPath)
-    // in the first time, the path shouldn't exist and IsLikelyNotMountPoint is expected to get NotExist
-    if err != nil && !os.IsNotExist(err) {
-        return fmt.Errorf("rbd: %s failed to check mountpoint", globalPDPath)
-    }
-    if !notMnt {
-        return nil
-    }
-    if err = os.MkdirAll(globalPDPath, 0750); err != nil {
-        return fmt.Errorf("rbd: failed to mkdir %s, error", globalPDPath)
+    globalPDPath := util.MakeGlobalPDName(*b.rbd)
+    if pathExists, pathErr := volutil.PathExists(globalPDPath); pathErr != nil {
+        return "", fmt.Errorf("Error checking if path exists: %v", pathErr)
+    } else if !pathExists {
+        if err := os.MkdirAll(globalPDPath, 0750); err != nil {
+            return "", err
+        }
     }
 
     devicePath, found := waitForPath(b.Pool, b.Image, 1)
@@ -285,16 +248,16 @@ func (util *RBDUtil) AttachDisk(b rbdMounter) error {
             glog.Warningf("rbd: failed to load rbd kernel module:%v", err)
         }
 
-        // fence off other mappers
-        if err = util.fencing(b); err != nil {
-            return rbdErrors(err, fmt.Errorf("rbd: failed to lock image %s (maybe locked by other nodes), error %v", b.Image, err))
+        // Currently, we don't acquire an advisory lock on the image, but for
+        // backward compatibility, we need to check whether the image is being
+        // used by nodes running an old kubelet.
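+        // The image counts as in use while `rbd status` reports a watcher on
+        // it, i.e. while another client still has it mapped.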
+        found, err := util.rbdStatus(&b)
+        if err != nil {
+            return "", err
+        }
+        if found {
+            glog.Info("rbd is still being used ", b.Image)
+            return "", fmt.Errorf("rbd %s is still being used", b.Image)
+        }
-        // rbd lock remove needs ceph and image config
-        // but kubelet doesn't get them from apiserver during teardown
-        // so persit rbd config so upon disk detach, rbd lock can be removed
-        // since rbd json is persisted in the same local directory that is used as rbd mountpoint later,
-        // the json file remains invisible during rbd mount and thus won't be removed accidentally.
-        util.persistRBD(b, globalPDPath)
 
         // rbd map
         l := len(b.Mon)
@@ -317,51 +280,80 @@ func (util *RBDUtil) AttachDisk(b rbdMounter) error {
             glog.V(1).Infof("rbd: map error %v %s", err, string(output))
         }
         if err != nil {
-            return fmt.Errorf("rbd: map failed %v %s", err, string(output))
+            return "", fmt.Errorf("rbd: map failed %v %s", err, string(output))
        }
         devicePath, found = waitForPath(b.Pool, b.Image, 10)
         if !found {
-            return errors.New("Could not map image: Timeout after 10s")
+            return "", errors.New("Could not map image: Timeout after 10s")
         }
-        glog.V(3).Infof("rbd: successfully map image %s/%s to %s", b.Pool, b.Image, devicePath)
     }
-
-    // mount it
-    if err = b.mounter.FormatAndMount(devicePath, globalPDPath, b.fsType, nil); err != nil {
-        err = fmt.Errorf("rbd: failed to mount rbd volume %s [%s] to %s, error %v", devicePath, b.fsType, globalPDPath, err)
-    }
-    glog.V(3).Infof("rbd: successfully mount image %s/%s at %s", b.Pool, b.Image, globalPDPath)
-    return err
+    return devicePath, err
 }
 
-func (util *RBDUtil) DetachDisk(c rbdUnmounter, mntPath string) error {
-    device, cnt, err := mount.GetDeviceNameFromMount(c.mounter, mntPath)
-    if err != nil {
-        return fmt.Errorf("rbd detach disk: failed to get device from mnt: %s\nError: %v", mntPath, err)
-    }
-    if err = c.mounter.Unmount(mntPath); err != nil {
-        return fmt.Errorf("rbd detach disk: failed to umount: %s\nError: %v", mntPath, err)
-    }
-    glog.V(3).Infof("rbd: successfully umount mountpoint %s", mntPath)
-    // if device is no longer used, see if can unmap
-    if cnt <= 1 {
+// DetachDisk detaches the disk from the node.
+// It detaches the device from the node if a device path is provided, and
+// removes the advisory lock if there is persisted RBD info under
+// deviceMountPath.
+func (util *RBDUtil) DetachDisk(plugin *rbdPlugin, deviceMountPath string, device string) error {
+    var err error
+    if len(device) > 0 {
         // rbd unmap
-        _, err = c.exec.Run("rbd", "unmap", device)
+        _, err = plugin.exec.Run("rbd", "unmap", device)
         if err != nil {
             return rbdErrors(err, fmt.Errorf("rbd: failed to unmap device %s:Error: %v", device, err))
         }
-
-        // load ceph and image/pool info to remove fencing
-        if err := util.loadRBD(c.rbdMounter, mntPath); err == nil {
-            // remove rbd lock
-            util.defencing(c)
-        }
-
         glog.V(3).Infof("rbd: successfully unmap device %s", device)
     }
+    // Currently, we don't persist rbd info on the disk, but for backward
+    // compatibility, we need to clean it up if found.
+    rbdFile := path.Join(deviceMountPath, "rbd.json")
+    exists, err := fileutil.FileExists(rbdFile)
+    if err != nil {
+        return err
+    }
+    if exists {
+        glog.V(3).Infof("rbd: old rbd.json is found under %s, cleaning it", deviceMountPath)
+        err = util.cleanOldRBDFile(plugin, rbdFile)
+        if err != nil {
+            glog.Errorf("rbd: failed to clean %s", rbdFile)
+            return err
+        }
+        glog.V(3).Infof("rbd: successfully removed %s", rbdFile)
+    }
     return nil
 }
 
+// cleanOldRBDFile reads rbd info from the rbd.json file and removes the lock
+// if one is found.
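+// (Older kubelets persisted the rbd config as rbd.json under the mount point
+// so that the image lock could be removed at detach time.)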
+// Finally, it removes the rbd.json file.
+func (util *RBDUtil) cleanOldRBDFile(plugin *rbdPlugin, rbdFile string) error {
+    mounter := &rbdMounter{
+        // util.rbdLock needs it to run the rbd command.
+        rbd: newRBD("", "", "", "", false, plugin, util),
+    }
+    fp, err := os.Open(rbdFile)
+    if err != nil {
+        return fmt.Errorf("rbd: open err %s/%s", rbdFile, err)
+    }
+    defer fp.Close()
+
+    decoder := json.NewDecoder(fp)
+    if err = decoder.Decode(mounter); err != nil {
+        glog.Errorf("failed to load rbd info from %s: %v", rbdFile, err)
+        return fmt.Errorf("rbd: decode err: %v", err)
+    }
+
+    // Remove the rbd lock if found. The disk is not attached to this node
+    // anymore, so the lock on the image for this node can be removed safely.
+    err = util.rbdLock(*mounter, false)
+    if err == nil {
+        os.Remove(rbdFile)
+    }
+    return err
+}
+
 func (util *RBDUtil) CreateImage(p *rbdVolumeProvisioner) (r *v1.RBDPersistentVolumeSource, size int, err error) {
     var output []byte
     capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
@@ -442,6 +434,14 @@ func (util *RBDUtil) rbdStatus(b *rbdMounter) (bool, error) {
     var output string
     var cmd []byte
 
+    // If we don't have an admin id/secret (e.g. when attaching), fall back to
+    // the user id/secret.
+    id := b.adminId
+    secret := b.adminSecret
+    if id == "" {
+        id = b.Id
+        secret = b.Secret
+    }
+
     l := len(b.Mon)
     start := rand.Int() % l
     // iterate all hosts until rbd command succeeds.
@@ -461,9 +461,9 @@ func (util *RBDUtil) rbdStatus(b *rbdMounter) (bool, error) {
         // # image does not exist (exit=2)
         // rbd: error opening image kubernetes-dynamic-pvc-<uuid>: (2) No such file or directory
         //
-        glog.V(4).Infof("rbd: status %s using mon %s, pool %s id %s key %s", b.Image, mon, b.Pool, b.adminId, b.adminSecret)
+        glog.V(4).Infof("rbd: status %s using mon %s, pool %s id %s key %s", b.Image, mon, b.Pool, id, secret)
         cmd, err = b.exec.Run("rbd",
-            "status", b.Image, "--pool", b.Pool, "-m", mon, "--id", b.adminId, "--key="+b.adminSecret)
+            "status", b.Image, "--pool", b.Pool, "-m", mon, "--id", id, "--key="+secret)
         output = string(cmd)
 
         // break if command succeeds