From a768092f9a9d6312aa4fd0b036144d2dfa57cd40 Mon Sep 17 00:00:00 2001
From: Yecheng Fu
Date: Fri, 1 Sep 2017 05:33:47 +0000
Subject: [PATCH 1/4] RBD Plugin: Prepare to implement Attacher/Detacher
 interfaces.

1) Fix FakeMounter.IsLikelyNotMountPoint to return ErrNotExist if the
directory does not exist. The Mounter.IsLikelyNotMountPoint interface
requires this, and the RBD plugin depends on it.
---
 pkg/util/mount/fake.go                       | 6 ++++++
 pkg/util/mount/safe_format_and_mount_test.go | 9 ++++++++-
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/pkg/util/mount/fake.go b/pkg/util/mount/fake.go
index d34510ae2a8..f4e2e411de1 100644
--- a/pkg/util/mount/fake.go
+++ b/pkg/util/mount/fake.go
@@ -17,6 +17,7 @@ limitations under the License.
 package mount
 
 import (
+	"os"
 	"path/filepath"
 	"sync"
 
@@ -136,6 +137,11 @@ func (f *FakeMounter) IsLikelyNotMountPoint(file string) (bool, error) {
 	f.mutex.Lock()
 	defer f.mutex.Unlock()
 
+	_, err := os.Stat(file)
+	if err != nil {
+		return true, err
+	}
+
 	// If file is a symlink, get its absolute path
 	absFile, err := filepath.EvalSymlinks(file)
 	if err != nil {
diff --git a/pkg/util/mount/safe_format_and_mount_test.go b/pkg/util/mount/safe_format_and_mount_test.go
index 72b768f3bf4..a7e7cc29a23 100644
--- a/pkg/util/mount/safe_format_and_mount_test.go
+++ b/pkg/util/mount/safe_format_and_mount_test.go
@@ -18,6 +18,8 @@ package mount
 
 import (
 	"fmt"
+	"io/ioutil"
+	"os"
 	"runtime"
 	"testing"
 
@@ -50,6 +52,11 @@ func TestSafeFormatAndMount(t *testing.T) {
 	if runtime.GOOS == "darwin" || runtime.GOOS == "windows" {
 		t.Skipf("not supported on GOOS=%s", runtime.GOOS)
 	}
+	mntDir, err := ioutil.TempDir(os.TempDir(), "mount")
+	if err != nil {
+		t.Fatalf("failed to create tmp dir: %v", err)
+	}
+	defer os.RemoveAll(mntDir)
 	tests := []struct {
 		description   string
 		fstype        string
@@ -207,7 +214,7 @@ func TestSafeFormatAndMount(t *testing.T) {
 		}
 
 		device := "/dev/foo"
-		dest := "/mnt/bar"
+		dest := mntDir
 		err := mounter.FormatAndMount(device, dest, test.fstype, test.mountOptions)
 		if test.expectedError == nil {
 			if err != nil {

From ba0d275f3b6b133c2d891d728028c331c7620c8b Mon Sep 17 00:00:00 2001
From: Yecheng Fu
Date: Wed, 30 Aug 2017 16:52:11 +0000
Subject: [PATCH 2/4] RBD Plugin: Implement Attacher/Detacher interfaces.

1) Modify rbdPlugin to implement the volume.AttachableVolumePlugin
interface.
2) Add rbdAttacher/rbdDetacher structs to implement the
volume.Attacher/Detacher interfaces.
3) Add mount.SafeFormatAndMount/mount.Exec fields to rbdPlugin, and set
them up in rbdPlugin.Init for later use. Attacher/Mounter/Unmounter/Detacher
reference rbdPlugin to use its mounter and exec. This simplifies the code.
4) Add a testcase struct to abstract the RBD plugin test cases.
5) Add a newRBD constructor to unify rbd struct initialization.
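For reference, the call sequence the new interfaces support, mirroring what
the attach/detach controller and kubelet do (a minimal sketch in Go; the
plugin/spec/pod/nodeName values are placeholders and error handling is
omitted):

    attacher, _ := plugin.NewAttacher()
    // Attach is a no-op for RBD: the mapping happens on the node.
    _, _ = attacher.Attach(spec, nodeName)
    // On the node, kubelet maps the image and waits for the device.
    devicePath, _ := attacher.WaitForAttach(spec, "", pod, 10*time.Second)
    // Format (if needed) and mount the device at the global mount point.
    deviceMountPath, _ := attacher.GetDeviceMountPath(spec)
    _ = attacher.MountDevice(spec, devicePath, deviceMountPath)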
--- cmd/kube-controller-manager/app/plugins.go | 1 + pkg/volume/rbd/BUILD | 2 + pkg/volume/rbd/attacher.go | 219 +++++++++++++++++ pkg/volume/rbd/disk_manager.go | 51 ++-- pkg/volume/rbd/rbd.go | 166 +++++++++---- pkg/volume/rbd/rbd_test.go | 259 ++++++++++++++++----- pkg/volume/rbd/rbd_util.go | 114 +++++---- 7 files changed, 624 insertions(+), 188 deletions(-) create mode 100644 pkg/volume/rbd/attacher.go diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go index bf9c9fe1299..716539b9fe0 100644 --- a/cmd/kube-controller-manager/app/plugins.go +++ b/cmd/kube-controller-manager/app/plugins.go @@ -78,6 +78,7 @@ func ProbeAttachableVolumePlugins() []volume.VolumePlugin { allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...) allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...) allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...) return allPlugins } diff --git a/pkg/volume/rbd/BUILD b/pkg/volume/rbd/BUILD index 2ead168a5c3..5185d960231 100644 --- a/pkg/volume/rbd/BUILD +++ b/pkg/volume/rbd/BUILD @@ -9,6 +9,7 @@ load( go_library( name = "go_default_library", srcs = [ + "attacher.go", "disk_manager.go", "doc.go", "rbd.go", @@ -46,6 +47,7 @@ go_test( "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", "//vendor/k8s.io/client-go/util/testing:go_default_library", ], diff --git a/pkg/volume/rbd/attacher.go b/pkg/volume/rbd/attacher.go new file mode 100644 index 00000000000..a7df8dbeb63 --- /dev/null +++ b/pkg/volume/rbd/attacher.go @@ -0,0 +1,219 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rbd + +import ( + "fmt" + "os" + "time" + + "github.com/golang/glog" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/volume" + volutil "k8s.io/kubernetes/pkg/volume/util" +) + +// NewAttacher implements AttachableVolumePlugin.NewAttacher. +func (plugin *rbdPlugin) NewAttacher() (volume.Attacher, error) { + return plugin.newAttacherInternal(&RBDUtil{}) +} + +func (plugin *rbdPlugin) newAttacherInternal(manager diskManager) (volume.Attacher, error) { + return &rbdAttacher{ + plugin: plugin, + manager: manager, + }, nil +} + +// NewDetacher implements AttachableVolumePlugin.NewDetacher. +func (plugin *rbdPlugin) NewDetacher() (volume.Detacher, error) { + return plugin.newDetacherInternal(&RBDUtil{}) +} + +func (plugin *rbdPlugin) newDetacherInternal(manager diskManager) (volume.Detacher, error) { + return &rbdDetacher{ + plugin: plugin, + manager: manager, + }, nil +} + +// GetDeviceMountRefs implements AttachableVolumePlugin.GetDeviceMountRefs. 
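+// It returns all paths in the host's mount table that reference the device
+// mounted at deviceMountPath, i.e. the per-pod bind mounts of this volume.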
+func (plugin *rbdPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
+	mounter := plugin.host.GetMounter(plugin.GetPluginName())
+	return mount.GetMountRefs(mounter, deviceMountPath)
+}
+
+// rbdAttacher implements volume.Attacher interface.
+type rbdAttacher struct {
+	plugin  *rbdPlugin
+	manager diskManager
+}
+
+var _ volume.Attacher = &rbdAttacher{}
+
+// Attach implements Attacher.Attach.
+// We do not lock the image here, because that would require
+// kube-controller-manager to access the external `rbd` utility. There is
+// also no need to: the AttachDetach controller will not try to attach RWO
+// volumes which are already attached to other nodes.
+func (attacher *rbdAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
+	return "", nil
+}
+
+// VolumesAreAttached implements Attacher.VolumesAreAttached.
+// There is no way to confirm whether the volume is attached or not from
+// outside of the kubelet node. This method needs to always return true, like
+// the iSCSI and FC plugins.
+func (attacher *rbdAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
+	volumesAttachedCheck := make(map[*volume.Spec]bool)
+	for _, spec := range specs {
+		volumesAttachedCheck[spec] = true
+	}
+	return volumesAttachedCheck, nil
+}
+
+// WaitForAttach implements Attacher.WaitForAttach. It's called by the kubelet
+// to attach the volume onto the node.
+// This method is idempotent, callers are responsible for retrying on failure.
+func (attacher *rbdAttacher) WaitForAttach(spec *volume.Spec, devicePath string, pod *v1.Pod, timeout time.Duration) (string, error) {
+	glog.V(4).Infof("rbd: waiting for attach volume (name: %s) for pod (name: %s, uid: %s)", spec.Name(), pod.Name, pod.UID)
+	mounter, err := attacher.plugin.createMounterFromVolumeSpecAndPod(spec, pod)
+	if err != nil {
+		glog.Warningf("failed to create mounter: %v", spec)
+		return "", err
+	}
+	realDevicePath, err := attacher.manager.AttachDisk(*mounter)
+	if err != nil {
+		return "", err
+	}
+	glog.V(3).Infof("rbd: successfully waited for attach of volume (spec: %s, pool: %s, image: %s) at %s", spec.Name(), mounter.Pool, mounter.Image, realDevicePath)
+	return realDevicePath, nil
+}
+
+// GetDeviceMountPath implements Attacher.GetDeviceMountPath.
+func (attacher *rbdAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
+	img, err := getVolumeSourceImage(spec)
+	if err != nil {
+		return "", err
+	}
+	pool, err := getVolumeSourcePool(spec)
+	if err != nil {
+		return "", err
+	}
+	return makePDNameInternal(attacher.plugin.host, pool, img), nil
+}
+
+// MountDevice implements Attacher.MountDevice. It is called by the kubelet to
+// mount the device at the given mount path.
+// This method is idempotent, callers are responsible for retrying on failure.
+func (attacher *rbdAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
+	glog.V(4).Infof("rbd: mounting device %s to %s", devicePath, deviceMountPath)
+	notMnt, err := attacher.plugin.mounter.IsLikelyNotMountPoint(deviceMountPath)
+	if err != nil {
+		if os.IsNotExist(err) {
+			if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
+				return err
+			}
+			notMnt = true
+		} else {
+			return err
+		}
+	}
+	if !notMnt {
+		return nil
+	}
+	fstype, err := getVolumeSourceFSType(spec)
+	if err != nil {
+		return err
+	}
+	ro, err := getVolumeSourceReadOnly(spec)
+	if err != nil {
+		return err
+	}
+	options := []string{}
+	if ro {
+		options = append(options, "ro")
+	}
+	mountOptions := volume.MountOptionFromSpec(spec, options...)
+	err = attacher.plugin.mounter.FormatAndMount(devicePath, deviceMountPath, fstype, mountOptions)
+	if err != nil {
+		return fmt.Errorf("rbd: failed to mount device %s at %s (fstype: %s), error %v", devicePath, deviceMountPath, fstype, err)
+	}
+	glog.V(3).Infof("rbd: successfully mount device %s at %s (fstype: %s)", devicePath, deviceMountPath, fstype)
+	return nil
+}
+
+// rbdDetacher implements volume.Detacher interface.
+type rbdDetacher struct {
+	plugin  *rbdPlugin
+	manager diskManager
+}
+
+var _ volume.Detacher = &rbdDetacher{}
+
+// UnmountDevice implements Detacher.UnmountDevice. It unmounts the global
+// mount of the RBD image. This is called once all bind mounts have been
+// unmounted.
+// Internally, it does four things:
+//  - Unmount the device from deviceMountPath.
+//  - Detach the device from the node.
+//  - Remove the lock if one is found. (There is no need to check whether the
+//    volume is read-only: the device is not on the node anymore, so it is
+//    safe to remove the lock.)
+//  - Finally, remove the deviceMountPath.
+// This method is idempotent, callers are responsible for retrying on failure.
+func (detacher *rbdDetacher) UnmountDevice(deviceMountPath string) error {
+	if pathExists, pathErr := volutil.PathExists(deviceMountPath); pathErr != nil {
+		return fmt.Errorf("Error checking if path exists: %v", pathErr)
+	} else if !pathExists {
+		glog.Warningf("Warning: Unmount skipped because path does not exist: %v", deviceMountPath)
+		return nil
+	}
+	devicePath, cnt, err := mount.GetDeviceNameFromMount(detacher.plugin.mounter, deviceMountPath)
+	if err != nil {
+		return err
+	}
+	if cnt > 1 {
+		return fmt.Errorf("rbd: more than 1 reference counts at %s", deviceMountPath)
+	}
+	if cnt == 1 {
+		// Unmount the device from the device mount point.
+		glog.V(4).Infof("rbd: unmounting device mountpoint %s", deviceMountPath)
+		if err = detacher.plugin.mounter.Unmount(deviceMountPath); err != nil {
+			return err
+		}
+		glog.V(3).Infof("rbd: successfully umount device mountpath %s", deviceMountPath)
+	}
+	glog.V(4).Infof("rbd: detaching device %s", devicePath)
+	err = detacher.manager.DetachDisk(detacher.plugin, deviceMountPath, devicePath)
+	if err != nil {
+		return err
+	}
+	glog.V(3).Infof("rbd: successfully detach device %s", devicePath)
+	err = os.RemoveAll(deviceMountPath)
+	if err != nil {
+		return err
+	}
+	glog.V(3).Infof("rbd: successfully remove device mount point %s", deviceMountPath)
+	return nil
+}
+
+// Detach implements Detacher.Detach.
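+// It is a no-op for RBD: Attach is also a no-op, and unmapping the image
+// from the node already happens in UnmountDevice.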
+func (detacher *rbdDetacher) Detach(deviceName string, nodeName types.NodeName) error { + return nil +} diff --git a/pkg/volume/rbd/disk_manager.go b/pkg/volume/rbd/disk_manager.go index 24a090109fd..3ba83fe36cb 100644 --- a/pkg/volume/rbd/disk_manager.go +++ b/pkg/volume/rbd/disk_manager.go @@ -23,6 +23,7 @@ limitations under the License. package rbd import ( + "fmt" "os" "github.com/golang/glog" @@ -33,23 +34,33 @@ import ( // Abstract interface to disk operations. type diskManager interface { + // MakeGlobalPDName creates global persistent disk path. MakeGlobalPDName(disk rbd) string // Attaches the disk to the kubelet's host machine. - AttachDisk(disk rbdMounter) error + // If it successfully attaches, the path to the device + // is returned. Otherwise, an error will be returned. + AttachDisk(disk rbdMounter) (string, error) // Detaches the disk from the kubelet's host machine. - DetachDisk(disk rbdUnmounter, mntPath string) error - // Creates a rbd image + DetachDisk(plugin *rbdPlugin, deviceMountPath string, device string) error + // Creates a rbd image. CreateImage(provisioner *rbdVolumeProvisioner) (r *v1.RBDPersistentVolumeSource, volumeSizeGB int, err error) - // Deletes a rbd image + // Deletes a rbd image. DeleteImage(deleter *rbdVolumeDeleter) error } // utility to mount a disk based filesystem func diskSetUp(manager diskManager, b rbdMounter, volPath string, mounter mount.Interface, fsGroup *int64) error { globalPDPath := manager.MakeGlobalPDName(*b.rbd) - // TODO: handle failed mounts here. - notMnt, err := mounter.IsLikelyNotMountPoint(volPath) + notMnt, err := mounter.IsLikelyNotMountPoint(globalPDPath) + if err != nil && !os.IsNotExist(err) { + glog.Errorf("cannot validate mountpoint: %s", globalPDPath) + return err + } + if notMnt { + return fmt.Errorf("no device is mounted at %s", globalPDPath) + } + notMnt, err = mounter.IsLikelyNotMountPoint(volPath) if err != nil && !os.IsNotExist(err) { glog.Errorf("cannot validate mountpoint: %s", volPath) return err @@ -57,10 +68,6 @@ func diskSetUp(manager diskManager, b rbdMounter, volPath string, mounter mount. if !notMnt { return nil } - if err := manager.AttachDisk(b); err != nil { - glog.Errorf("failed to attach disk") - return err - } if err := os.MkdirAll(volPath, 0750); err != nil { glog.Errorf("failed to mkdir:%s", volPath) @@ -89,43 +96,31 @@ func diskSetUp(manager diskManager, b rbdMounter, volPath string, mounter mount. // utility to tear down a disk based filesystem func diskTearDown(manager diskManager, c rbdUnmounter, volPath string, mounter mount.Interface) error { notMnt, err := mounter.IsLikelyNotMountPoint(volPath) - if err != nil { - glog.Errorf("cannot validate mountpoint %s", volPath) + if err != nil && !os.IsNotExist(err) { + glog.Errorf("cannot validate mountpoint: %s", volPath) return err } if notMnt { + glog.V(3).Infof("volume path %s is not a mountpoint, deleting", volPath) return os.Remove(volPath) } - refs, err := mount.GetMountRefs(mounter, volPath) - if err != nil { - glog.Errorf("failed to get reference count %s", volPath) - return err - } + // Unmount the bind-mount inside this pod. if err := mounter.Unmount(volPath); err != nil { glog.Errorf("failed to umount %s", volPath) return err } - // If len(refs) is 1, then all bind mounts have been removed, and the - // remaining reference is the global mount. It is safe to detach. 
-	if len(refs) == 1 {
-		mntPath := refs[0]
-		if err := manager.DetachDisk(c, mntPath); err != nil {
-			glog.Errorf("failed to detach disk from %s", mntPath)
-			return err
-		}
-	}
 
 	notMnt, mntErr := mounter.IsLikelyNotMountPoint(volPath)
-	if mntErr != nil {
+	if mntErr != nil && !os.IsNotExist(mntErr) {
 		glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
-		return err
+		return mntErr
 	}
 	if notMnt {
 		if err := os.Remove(volPath); err != nil {
+			glog.V(2).Info("Error removing mountpoint ", volPath, ": ", err)
 			return err
 		}
 	}
 	return nil
-
 }
diff --git a/pkg/volume/rbd/rbd.go b/pkg/volume/rbd/rbd.go
index 87ca9992f19..b3cf958a349 100644
--- a/pkg/volume/rbd/rbd.go
+++ b/pkg/volume/rbd/rbd.go
@@ -41,17 +41,21 @@ var (
 
 // This is the primary entrypoint for volume plugins.
 func ProbeVolumePlugins() []volume.VolumePlugin {
-	return []volume.VolumePlugin{&rbdPlugin{nil}}
+	return []volume.VolumePlugin{&rbdPlugin{nil, nil, nil}}
 }
 
+// rbdPlugin implements volume.VolumePlugin.
 type rbdPlugin struct {
-	host volume.VolumeHost
+	host    volume.VolumeHost
+	exec    mount.Exec
+	mounter *mount.SafeFormatAndMount
 }
 
 var _ volume.VolumePlugin = &rbdPlugin{}
 var _ volume.PersistentVolumePlugin = &rbdPlugin{}
 var _ volume.DeletableVolumePlugin = &rbdPlugin{}
 var _ volume.ProvisionableVolumePlugin = &rbdPlugin{}
+var _ volume.AttachableVolumePlugin = &rbdPlugin{}
 
 const (
 	rbdPluginName = "kubernetes.io/rbd"
@@ -70,6 +74,8 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
 
 func (plugin *rbdPlugin) Init(host volume.VolumeHost) error {
 	plugin.host = host
+	plugin.exec = host.GetExec(plugin.GetPluginName())
+	plugin.mounter = volumehelper.NewSafeFormatAndMountFromHost(plugin.GetPluginName(), plugin.host)
 	return nil
 }
 
@@ -120,6 +126,68 @@ func (plugin *rbdPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	}
 }
 
+func (plugin *rbdPlugin) createMounterFromVolumeSpecAndPod(spec *volume.Spec, pod *v1.Pod) (*rbdMounter, error) {
+	var err error
+	mon, err := getVolumeSourceMonitors(spec)
+	if err != nil {
+		return nil, err
+	}
+	img, err := getVolumeSourceImage(spec)
+	if err != nil {
+		return nil, err
+	}
+	fstype, err := getVolumeSourceFSType(spec)
+	if err != nil {
+		return nil, err
+	}
+	pool, err := getVolumeSourcePool(spec)
+	if err != nil {
+		return nil, err
+	}
+	id, err := getVolumeSourceUser(spec)
+	if err != nil {
+		return nil, err
+	}
+	keyring, err := getVolumeSourceKeyRing(spec)
+	if err != nil {
+		return nil, err
+	}
+	ro, err := getVolumeSourceReadOnly(spec)
+	if err != nil {
+		return nil, err
+	}
+
+	secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
+	if err != nil {
+		return nil, err
+	}
+	secret := ""
+	if len(secretName) > 0 && len(secretNs) > 0 {
+		// if a secret is provided, retrieve it
+		kubeClient := plugin.host.GetKubeClient()
+		if kubeClient == nil {
+			return nil, fmt.Errorf("Cannot get kube client")
+		}
+		secrets, err := kubeClient.Core().Secrets(secretNs).Get(secretName, metav1.GetOptions{})
+		if err != nil {
+			err = fmt.Errorf("Couldn't get secret %v/%v err: %v", secretNs, secretName, err)
+			return nil, err
+		}
+		for _, data := range secrets.Data {
+			secret = string(data)
+		}
+	}
+
+	return &rbdMounter{
+		rbd:     newRBD("", spec.Name(), img, pool, ro, plugin, &RBDUtil{}),
+		Mon:     mon,
+		Id:      id,
+		Keyring: keyring,
+		Secret:  secret,
+		fsType:  fstype,
+	}, nil
+}
+
 func (plugin *rbdPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
 	secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
 	if err !=
nil { @@ -143,10 +211,10 @@ func (plugin *rbdPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.Vol } // Inject real implementations here, test through the internal function. - return plugin.newMounterInternal(spec, pod.UID, &RBDUtil{}, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName()), secret) + return plugin.newMounterInternal(spec, pod.UID, &RBDUtil{}, secret) } -func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager diskManager, mounter mount.Interface, exec mount.Exec, secret string) (volume.Mounter, error) { +func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager diskManager, secret string) (volume.Mounter, error) { mon, err := getVolumeSourceMonitors(spec) if err != nil { return nil, err @@ -177,18 +245,7 @@ func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, } return &rbdMounter{ - rbd: &rbd{ - podUID: podUID, - volName: spec.Name(), - Image: img, - Pool: pool, - ReadOnly: ro, - manager: manager, - mounter: &mount.SafeFormatAndMount{Interface: mounter, Exec: exec}, - exec: exec, - plugin: plugin, - MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), plugin.host)), - }, + rbd: newRBD(podUID, spec.Name(), img, pool, ro, plugin, manager), Mon: mon, Id: id, Keyring: keyring, @@ -200,21 +257,13 @@ func (plugin *rbdPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, func (plugin *rbdPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) { // Inject real implementations here, test through the internal function. - return plugin.newUnmounterInternal(volName, podUID, &RBDUtil{}, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName())) + return plugin.newUnmounterInternal(volName, podUID, &RBDUtil{}) } -func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID, manager diskManager, mounter mount.Interface, exec mount.Exec) (volume.Unmounter, error) { +func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID, manager diskManager) (volume.Unmounter, error) { return &rbdUnmounter{ rbdMounter: &rbdMounter{ - rbd: &rbd{ - podUID: podUID, - volName: volName, - manager: manager, - mounter: &mount.SafeFormatAndMount{Interface: mounter, Exec: exec}, - exec: exec, - plugin: plugin, - MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)), - }, + rbd: newRBD(podUID, volName, "", "", false, plugin, manager), Mon: make([]string, 0), }, }, nil @@ -268,15 +317,7 @@ func (plugin *rbdPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) { func (plugin *rbdPlugin) newDeleterInternal(spec *volume.Spec, admin, secret string, manager diskManager) (volume.Deleter, error) { return &rbdVolumeDeleter{ rbdMounter: &rbdMounter{ - rbd: &rbd{ - volName: spec.Name(), - Image: spec.PersistentVolume.Spec.RBD.RBDImage, - Pool: spec.PersistentVolume.Spec.RBD.RBDPool, - manager: manager, - plugin: plugin, - mounter: &mount.SafeFormatAndMount{Interface: plugin.host.GetMounter(plugin.GetPluginName())}, - exec: plugin.host.GetExec(plugin.GetPluginName()), - }, + rbd: newRBD("", spec.Name(), spec.PersistentVolume.Spec.RBD.RBDImage, spec.PersistentVolume.Spec.RBD.RBDPool, false, plugin, manager), Mon: spec.PersistentVolume.Spec.RBD.CephMonitors, adminId: admin, adminSecret: secret, @@ -290,22 +331,20 @@ func (plugin *rbdPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Pr func (plugin *rbdPlugin) 
newProvisionerInternal(options volume.VolumeOptions, manager diskManager) (volume.Provisioner, error) {
 	return &rbdVolumeProvisioner{
 		rbdMounter: &rbdMounter{
-			rbd: &rbd{
-				manager: manager,
-				plugin:  plugin,
-				mounter: &mount.SafeFormatAndMount{Interface: plugin.host.GetMounter(plugin.GetPluginName())},
-				exec:    plugin.host.GetExec(plugin.GetPluginName()),
-			},
+			rbd: newRBD("", "", "", "", false, plugin, manager),
 		},
 		options: options,
 	}, nil
 }
 
+// rbdVolumeProvisioner implements volume.Provisioner interface.
 type rbdVolumeProvisioner struct {
 	*rbdMounter
 	options volume.VolumeOptions
 }
 
+var _ volume.Provisioner = &rbdVolumeProvisioner{}
+
 func (r *rbdVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
 	if !volume.AccessModesContainedInAll(r.plugin.GetAccessModes(), r.options.PVC.Spec.AccessModes) {
 		return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", r.options.PVC.Spec.AccessModes, r.plugin.GetAccessModes())
@@ -419,10 +458,13 @@ func (r *rbdVolumeProvisioner) Provision() (*v1.PersistentVolume, error) {
 	return pv, nil
 }
 
+// rbdVolumeDeleter implements volume.Deleter interface.
 type rbdVolumeDeleter struct {
 	*rbdMounter
 }
 
+var _ volume.Deleter = &rbdVolumeDeleter{}
+
 func (r *rbdVolumeDeleter) GetPath() string {
 	return getPath(r.podUID, r.volName, r.plugin.host)
 }
@@ -431,6 +473,8 @@ func (r *rbdVolumeDeleter) Delete() error {
 	return r.manager.DeleteImage(r)
 }
 
+// rbd implements volume.Volume interface.
+// It's embedded in Mounter/Unmounter/Deleter.
 type rbd struct {
 	volName string
 	podUID  types.UID
@@ -445,11 +489,35 @@ type rbd struct {
 	volume.MetricsProvider `json:"-"`
 }
 
+var _ volume.Volume = &rbd{}
+
 func (rbd *rbd) GetPath() string {
 	// safe to use PodVolumeDir now: volume teardown occurs before pod is cleaned up
 	return getPath(rbd.podUID, rbd.volName, rbd.plugin.host)
 }
 
+// newRBD creates a new rbd.
+func newRBD(podUID types.UID, volName string, image string, pool string, readOnly bool, plugin *rbdPlugin, manager diskManager) *rbd {
+	return &rbd{
+		podUID:          podUID,
+		volName:         volName,
+		Image:           image,
+		Pool:            pool,
+		ReadOnly:        readOnly,
+		plugin:          plugin,
+		mounter:         plugin.mounter,
+		exec:            plugin.exec,
+		manager:         manager,
+		MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)),
+	}
+}
+
+// rbdMounter implements volume.Mounter interface.
+// It contains information that needs to be persisted for the whole life cycle
+// of the PV on the node. It is persisted at the very beginning in the pod
+// mount point directory.
+// Note: The capitalized field names of this struct determine the information
+// persisted on disk; DO NOT change them. (TODO: refactor to use a dedicated
+// struct?)
 type rbdMounter struct {
 	*rbd
 	// capitalized so they can be exported in persistRBD()
@@ -488,14 +556,16 @@ func (b *rbdMounter) SetUp(fsGroup *int64) error {
 
 func (b *rbdMounter) SetUpAt(dir string, fsGroup *int64) error {
 	// diskSetUp checks mountpoints and prevent repeated calls
-	glog.V(4).Infof("rbd: attempting to SetUp and mount %s", dir)
+	glog.V(4).Infof("rbd: attempting to setup at %s", dir)
 	err := diskSetUp(b.manager, *b, dir, b.mounter, fsGroup)
 	if err != nil {
-		glog.Errorf("rbd: failed to setup mount %s %v", dir, err)
+		glog.Errorf("rbd: failed to setup at %s %v", dir, err)
+		return err
 	}
+	glog.V(3).Infof("rbd: successfully setup at %s", dir)
 	return err
 }
 
+// rbdUnmounter implements volume.Unmounter interface.
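+// It reuses rbdMounter; only volName and podUID are set by
+// newUnmounterInternal, which is enough to compute the pod mount path to
+// tear down.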
type rbdUnmounter struct {
 	*rbdMounter
 }
 
@@ -509,13 +579,19 @@ func (c *rbdUnmounter) TearDown() error {
 }
 
 func (c *rbdUnmounter) TearDownAt(dir string) error {
+	glog.V(4).Infof("rbd: attempting to teardown at %s", dir)
 	if pathExists, pathErr := volutil.PathExists(dir); pathErr != nil {
 		return fmt.Errorf("Error checking if path exists: %v", pathErr)
 	} else if !pathExists {
 		glog.Warningf("Warning: Unmount skipped because path does not exist: %v", dir)
 		return nil
 	}
-	return diskTearDown(c.manager, *c, dir, c.mounter)
+	err := diskTearDown(c.manager, *c, dir, c.mounter)
+	if err != nil {
+		return err
+	}
+	glog.V(3).Infof("rbd: successfully teardown at %s", dir)
+	return nil
 }
 
 func getVolumeSourceMonitors(spec *volume.Spec) ([]string, error) {
diff --git a/pkg/volume/rbd/rbd_test.go b/pkg/volume/rbd/rbd_test.go
index 8a0a098de36..99192a2cb37 100644
--- a/pkg/volume/rbd/rbd_test.go
+++ b/pkg/volume/rbd/rbd_test.go
@@ -23,12 +23,15 @@ import (
 	"path/filepath"
 	"reflect"
 	"strings"
+	"sync"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/client-go/kubernetes/fake"
 	utiltesting "k8s.io/client-go/util/testing"
 	"k8s.io/kubernetes/pkg/util/mount"
@@ -59,37 +62,43 @@ func TestCanSupport(t *testing.T) {
 }
 
 type fakeDiskManager struct {
-	tmpDir string
+	// Make sure we can run tests in parallel.
+	mutex sync.RWMutex
+	// Key format: "<pool>/<image>"
+	rbdImageLocks map[string]bool
+	rbdMapIndex   int
+	rbdDevices    map[string]bool
 }
 
 func NewFakeDiskManager() *fakeDiskManager {
 	return &fakeDiskManager{
-		tmpDir: utiltesting.MkTmpdirOrDie("rbd_test"),
+		rbdImageLocks: make(map[string]bool),
+		rbdMapIndex:   0,
+		rbdDevices:    make(map[string]bool),
 	}
 }
 
-func (fake *fakeDiskManager) Cleanup() {
-	os.RemoveAll(fake.tmpDir)
+func (fake *fakeDiskManager) MakeGlobalPDName(rbd rbd) string {
+	return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image)
 }
 
-func (fake *fakeDiskManager) MakeGlobalPDName(disk rbd) string {
-	return fake.tmpDir
-}
-func (fake *fakeDiskManager) AttachDisk(b rbdMounter) error {
-	globalPath := b.manager.MakeGlobalPDName(*b.rbd)
-	err := os.MkdirAll(globalPath, 0750)
-	if err != nil {
-		return err
-	}
-	return nil
+func (fake *fakeDiskManager) AttachDisk(b rbdMounter) (string, error) {
+	fake.mutex.Lock()
+	defer fake.mutex.Unlock()
+	fake.rbdMapIndex += 1
+	devicePath := fmt.Sprintf("/dev/rbd%d", fake.rbdMapIndex)
+	fake.rbdDevices[devicePath] = true
+	return devicePath, nil
 }
 
-func (fake *fakeDiskManager) DetachDisk(c rbdUnmounter, mntPath string) error {
-	globalPath := c.manager.MakeGlobalPDName(*c.rbd)
-	err := os.RemoveAll(globalPath)
-	if err != nil {
-		return err
+func (fake *fakeDiskManager) DetachDisk(r *rbdPlugin, deviceMountPath string, device string) error {
+	fake.mutex.Lock()
+	defer fake.mutex.Unlock()
+	ok := fake.rbdDevices[device]
+	if !ok {
+		return fmt.Errorf("rbd: failed to detach device %s, it does not exist", device)
 	}
+	delete(fake.rbdDevices, device)
 	return nil
 }
 
@@ -101,35 +110,111 @@ func (fake *fakeDiskManager) DeleteImage(deleter *rbdVolumeDeleter) error {
 	return fmt.Errorf("not implemented")
 }
 
-func doTestPlugin(t *testing.T, spec *volume.Spec) {
-	tmpDir, err := utiltesting.MkTmpdir("rbd_test")
-	if err != nil {
-		t.Fatalf("error creating temp dir: %v", err)
+func (fake *fakeDiskManager) Fencing(r rbdMounter, nodeName string) error {
+	fake.mutex.Lock()
+	defer fake.mutex.Unlock()
+	key := fmt.Sprintf("%s/%s", r.Pool, r.Image)
+	isLocked, ok := fake.rbdImageLocks[key]
+	if ok && isLocked {
+		// not expected in testing
+		return fmt.Errorf("%s is already locked", key)
 	}
-	defer os.RemoveAll(tmpDir)
+	fake.rbdImageLocks[key] = true
+	return nil
+}
 
+func (fake *fakeDiskManager) Defencing(r rbdMounter, nodeName string) error {
+	fake.mutex.Lock()
+	defer fake.mutex.Unlock()
+	key := fmt.Sprintf("%s/%s", r.Pool, r.Image)
+	isLocked, ok := fake.rbdImageLocks[key]
+	if !ok || !isLocked {
+		// not expected in testing
+		return fmt.Errorf("%s is not locked", key)
+	}
+	delete(fake.rbdImageLocks, key)
+	return nil
+}
+
+func (fake *fakeDiskManager) IsLocked(r rbdMounter, nodeName string) (bool, error) {
+	fake.mutex.RLock()
+	defer fake.mutex.RUnlock()
+	key := fmt.Sprintf("%s/%s", r.Pool, r.Image)
+	isLocked, ok := fake.rbdImageLocks[key]
+	return ok && isLocked, nil
+}
+
+// checkMounterLog checks that fakeMounter has the expected number of logged
+// actions, and that the last action equals expectedAction.
+func checkMounterLog(t *testing.T, fakeMounter *mount.FakeMounter, expected int, expectedAction mount.FakeAction) {
+	if len(fakeMounter.Log) != expected {
+		t.Fatalf("fakeMounter should have %d logs, actual: %d", expected, len(fakeMounter.Log))
+	}
+	lastIndex := len(fakeMounter.Log) - 1
+	lastAction := fakeMounter.Log[lastIndex]
+	if !reflect.DeepEqual(expectedAction, lastAction) {
+		t.Fatalf("fakeMounter.Log[%d] should be %v, not: %v", lastIndex, expectedAction, lastAction)
+	}
+}
+
+func doTestPlugin(t *testing.T, c *testcase) {
+	fakeVolumeHost := volumetest.NewFakeVolumeHost(c.root, nil, nil)
 	plugMgr := volume.VolumePluginMgr{}
-	plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpDir, nil, nil))
-
+	plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, fakeVolumeHost)
 	plug, err := plugMgr.FindPluginByName("kubernetes.io/rbd")
 	if err != nil {
 		t.Errorf("Can't find the plugin by name")
 	}
+	fakeMounter := fakeVolumeHost.GetMounter(plug.GetPluginName()).(*mount.FakeMounter)
+	fakeNodeName := types.NodeName("localhost")
 	fdm := NewFakeDiskManager()
-	defer fdm.Cleanup()
-	exec := mount.NewFakeExec(nil)
-	mounter, err := plug.(*rbdPlugin).newMounterInternal(spec, types.UID("poduid"), fdm, &mount.FakeMounter{}, exec, "secrets")
+
+	// attacher
+	attacher, err := plug.(*rbdPlugin).newAttacherInternal(fdm)
+	if err != nil {
+		t.Errorf("Failed to make a new Attacher: %v", err)
+	}
+	deviceAttachPath, err := attacher.Attach(c.spec, fakeNodeName)
+	if err != nil {
+		t.Fatal(err)
+	}
+	devicePath, err := attacher.WaitForAttach(c.spec, deviceAttachPath, c.pod, time.Second*10)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if devicePath != c.expectedDevicePath {
+		t.Errorf("Unexpected path, expected %q, not: %q", c.expectedDevicePath, devicePath)
+	}
+	deviceMountPath, err := attacher.GetDeviceMountPath(c.spec)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if deviceMountPath != c.expectedDeviceMountPath {
+		t.Errorf("Unexpected mount path, expected %q, not: %q", c.expectedDeviceMountPath, deviceMountPath)
+	}
+	err = attacher.MountDevice(c.spec, devicePath, deviceMountPath)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if _, err := os.Stat(deviceMountPath); err != nil {
+		if os.IsNotExist(err) {
+			t.Errorf("Attacher.MountDevice() failed, device mount path not created: %s", deviceMountPath)
+		} else {
+			t.Errorf("Attacher.MountDevice() failed: %v", err)
+		}
+	}
+	checkMounterLog(t, fakeMounter, 1, mount.FakeAction{Action: "mount", Target: c.expectedDeviceMountPath, Source: devicePath, FSType: "ext4"})
+
+	// mounter
+	mounter, err := plug.(*rbdPlugin).newMounterInternal(c.spec, c.pod.UID, fdm, "secrets")
 	if err != nil {
 		t.Errorf("Failed to make a new Mounter: %v", err)
 	}
 	if mounter == nil {
 		t.Error("Got a nil Mounter")
 	}
 	path := mounter.GetPath()
-	expectedPath := fmt.Sprintf("%s/pods/poduid/volumes/kubernetes.io~rbd/vol1", tmpDir)
-	if path != expectedPath {
-		t.Errorf("Unexpected path, expected %q, got: %q", expectedPath, path)
+	if path != c.expectedPodMountPath {
+		t.Errorf("Unexpected path, expected %q, got: %q", c.expectedPodMountPath, path)
 	}
 
 	if err := mounter.SetUp(nil); err != nil {
@@ -142,8 +227,10 @@ func doTestPlugin(t *testing.T, c *testcase) {
 			t.Errorf("SetUp() failed: %v", err)
 		}
 	}
+	checkMounterLog(t, fakeMounter, 2, mount.FakeAction{Action: "mount", Target: c.expectedPodMountPath, Source: devicePath, FSType: ""})
 
-	unmounter, err := plug.(*rbdPlugin).newUnmounterInternal("vol1", types.UID("poduid"), fdm, &mount.FakeMounter{}, exec)
+	// unmounter
+	unmounter, err := plug.(*rbdPlugin).newUnmounterInternal(c.spec.Name(), c.pod.UID, fdm)
 	if err != nil {
 		t.Errorf("Failed to make a new Unmounter: %v", err)
 	}
@@ -159,38 +246,98 @@ func doTestPlugin(t *testing.T, c *testcase) {
 	} else if !os.IsNotExist(err) {
 		t.Errorf("SetUp() failed: %v", err)
 	}
+	checkMounterLog(t, fakeMounter, 3, mount.FakeAction{Action: "unmount", Target: c.expectedPodMountPath, Source: "", FSType: ""})
+
+	// detacher
+	detacher, err := plug.(*rbdPlugin).newDetacherInternal(fdm)
+	if err != nil {
+		t.Errorf("Failed to make a new Detacher: %v", err)
+	}
+	err = detacher.UnmountDevice(deviceMountPath)
+	if err != nil {
+		t.Fatalf("Detacher.UnmountDevice failed to unmount %s", deviceMountPath)
+	}
+	checkMounterLog(t, fakeMounter, 4, mount.FakeAction{Action: "unmount", Target: c.expectedDeviceMountPath, Source: "", FSType: ""})
+	err = detacher.Detach(deviceMountPath, fakeNodeName)
+	if err != nil {
+		t.Fatalf("Detacher.Detach failed to detach %s from %s", deviceMountPath, fakeNodeName)
+	}
 }
 
-func TestPluginVolume(t *testing.T) {
-	vol := &v1.Volume{
-		Name: "vol1",
-		VolumeSource: v1.VolumeSource{
-			RBD: &v1.RBDVolumeSource{
-				CephMonitors: []string{"a", "b"},
-				RBDImage:     "bar",
-				FSType:       "ext4",
-			},
-		},
-	}
-	doTestPlugin(t, volume.NewSpecFromVolume(vol))
+type testcase struct {
+	spec                    *volume.Spec
+	root                    string
+	pod                     *v1.Pod
+	expectedDevicePath      string
+	expectedDeviceMountPath string
+	expectedPodMountPath    string
 }
 
-func TestPluginPersistentVolume(t *testing.T) {
-	vol := &v1.PersistentVolume{
-		ObjectMeta: metav1.ObjectMeta{
+func TestPlugin(t *testing.T) {
+	tmpDir, err := utiltesting.MkTmpdir("rbd_test")
+	if err != nil {
+		t.Fatalf("error creating temp dir: %v", err)
+	}
+	defer os.RemoveAll(tmpDir)
+
+	podUID := uuid.NewUUID()
+	var cases []*testcase
+	cases = append(cases, &testcase{
+		spec: volume.NewSpecFromVolume(&v1.Volume{
 			Name: "vol1",
-		},
-		Spec: v1.PersistentVolumeSpec{
-			PersistentVolumeSource: v1.PersistentVolumeSource{
-				RBD: &v1.RBDPersistentVolumeSource{
+			VolumeSource: v1.VolumeSource{
+				RBD: &v1.RBDVolumeSource{
 					CephMonitors: []string{"a", "b"},
-					RBDImage:     "bar",
+					RBDPool:      "pool1",
+					RBDImage:     "image1",
 					FSType:       "ext4",
 				},
 			},
+		}),
+		root: tmpDir,
+		pod: &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "testpod",
+				Namespace: "testns",
+				UID:       podUID,
+			},
 		},
-	}
+		expectedDevicePath:      "/dev/rbd1",
+		expectedDeviceMountPath: fmt.Sprintf("%s/plugins/kubernetes.io/rbd/rbd/pool1-image-image1", tmpDir),
+		expectedPodMountPath:    fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~rbd/vol1",
tmpDir, podUID), + }) + cases = append(cases, &testcase{ + spec: volume.NewSpecFromPersistentVolume(&v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "vol2", + }, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + RBD: &v1.RBDPersistentVolumeSource{ + CephMonitors: []string{"a", "b"}, + RBDPool: "pool2", + RBDImage: "image2", + FSType: "ext4", + }, + }, + }, + }, false), + root: tmpDir, + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + Namespace: "testns", + UID: podUID, + }, + }, + expectedDevicePath: "/dev/rbd1", + expectedDeviceMountPath: fmt.Sprintf("%s/plugins/kubernetes.io/rbd/rbd/pool2-image-image2", tmpDir), + expectedPodMountPath: fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~rbd/vol2", tmpDir, podUID), + }) - doTestPlugin(t, volume.NewSpecFromPersistentVolume(vol, false)) + for i := 0; i < len(cases); i++ { + doTestPlugin(t, cases[i]) + } } func TestPersistentClaimReadOnlyFlag(t *testing.T) { diff --git a/pkg/volume/rbd/rbd_util.go b/pkg/volume/rbd/rbd_util.go index 8ec9dfa6a42..2e598a99f73 100644 --- a/pkg/volume/rbd/rbd_util.go +++ b/pkg/volume/rbd/rbd_util.go @@ -35,9 +35,9 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" - "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/volume" + volutil "k8s.io/kubernetes/pkg/volume/util" ) const ( @@ -107,11 +107,15 @@ func makePDNameInternal(host volume.VolumeHost, pool string, image string) strin return path.Join(host.GetPluginDir(rbdPluginName), "rbd", pool+"-image-"+image) } +// RBDUtil implements diskManager interface. type RBDUtil struct{} +var _ diskManager = &RBDUtil{} + func (util *RBDUtil) MakeGlobalPDName(rbd rbd) string { return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image) } + func rbdErrors(runErr, resultErr error) error { if runErr.Error() == rbdCmdErr { return fmt.Errorf("rbd: rbd cmd not found") @@ -119,6 +123,8 @@ func rbdErrors(runErr, resultErr error) error { return resultErr } +// rbdLock acquires a lock on image if lock is true, otherwise releases if a +// lock is found on image. func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error { var err error var output, locker string @@ -188,6 +194,9 @@ func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error { args := []string{"lock", "add", b.Image, lock_id, "--pool", b.Pool, "--id", b.Id, "-m", mon} args = append(args, secret_opt...) cmd, err = b.exec.Run("rbd", args...) + if err == nil { + glog.V(4).Infof("rbd: successfully add lock (locker_id: %s) on image: %s/%s with id %s mon %s", lock_id, b.Pool, b.Image, b.Id, mon) + } } else { // defencing, find locker name ind := strings.LastIndex(output, lock_id) - 1 @@ -197,14 +206,19 @@ func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error { break } } - // remove a lock: rbd lock remove - args := []string{"lock", "remove", b.Image, lock_id, locker, "--pool", b.Pool, "--id", b.Id, "-m", mon} - args = append(args, secret_opt...) - cmd, err = b.exec.Run("rbd", args...) + // remove a lock if found: rbd lock remove + if len(locker) > 0 { + args := []string{"lock", "remove", b.Image, lock_id, locker, "--pool", b.Pool, "--id", b.Id, "-m", mon} + args = append(args, secret_opt...) + cmd, err = b.exec.Run("rbd", args...) 
+ if err == nil { + glog.V(4).Infof("rbd: successfully remove lock (locker_id: %s) on image: %s/%s with id %s mon %s", lock_id, b.Pool, b.Image, b.Id, mon) + } + } } if err == nil { - //lock is acquired + // break if operation succeeds break } } @@ -251,31 +265,19 @@ func (util *RBDUtil) fencing(b rbdMounter) error { return util.rbdLock(b, true) } -func (util *RBDUtil) defencing(c rbdUnmounter) error { - // no need to fence readOnly - if c.ReadOnly { - return nil - } - - return util.rbdLock(*c.rbdMounter, false) -} - -func (util *RBDUtil) AttachDisk(b rbdMounter) error { +// AttachDisk attaches the disk on the node. +// If Volume is not read-only, acquire a lock on image first. +func (util *RBDUtil) AttachDisk(b rbdMounter) (string, error) { var err error var output []byte - // create mount point - globalPDPath := b.manager.MakeGlobalPDName(*b.rbd) - notMnt, err := b.mounter.IsLikelyNotMountPoint(globalPDPath) - // in the first time, the path shouldn't exist and IsLikelyNotMountPoint is expected to get NotExist - if err != nil && !os.IsNotExist(err) { - return fmt.Errorf("rbd: %s failed to check mountpoint", globalPDPath) - } - if !notMnt { - return nil - } - if err = os.MkdirAll(globalPDPath, 0750); err != nil { - return fmt.Errorf("rbd: failed to mkdir %s, error", globalPDPath) + globalPDPath := util.MakeGlobalPDName(*b.rbd) + if pathExists, pathErr := volutil.PathExists(globalPDPath); pathErr != nil { + return "", fmt.Errorf("Error checking if path exists: %v", pathErr) + } else if !pathExists { + if err := os.MkdirAll(globalPDPath, 0750); err != nil { + return "", err + } } devicePath, found := waitForPath(b.Pool, b.Image, 1) @@ -287,7 +289,7 @@ func (util *RBDUtil) AttachDisk(b rbdMounter) error { // fence off other mappers if err = util.fencing(b); err != nil { - return rbdErrors(err, fmt.Errorf("rbd: failed to lock image %s (maybe locked by other nodes), error %v", b.Image, err)) + return "", rbdErrors(err, fmt.Errorf("rbd: failed to lock image %s (maybe locked by other nodes), error %v", b.Image, err)) } // rbd lock remove needs ceph and image config // but kubelet doesn't get them from apiserver during teardown @@ -317,49 +319,43 @@ func (util *RBDUtil) AttachDisk(b rbdMounter) error { glog.V(1).Infof("rbd: map error %v %s", err, string(output)) } if err != nil { - return fmt.Errorf("rbd: map failed %v %s", err, string(output)) + return "", fmt.Errorf("rbd: map failed %v %s", err, string(output)) } devicePath, found = waitForPath(b.Pool, b.Image, 10) if !found { - return errors.New("Could not map image: Timeout after 10s") + return "", errors.New("Could not map image: Timeout after 10s") } - glog.V(3).Infof("rbd: successfully map image %s/%s to %s", b.Pool, b.Image, devicePath) } - - // mount it - if err = b.mounter.FormatAndMount(devicePath, globalPDPath, b.fsType, nil); err != nil { - err = fmt.Errorf("rbd: failed to mount rbd volume %s [%s] to %s, error %v", devicePath, b.fsType, globalPDPath, err) - } - glog.V(3).Infof("rbd: successfully mount image %s/%s at %s", b.Pool, b.Image, globalPDPath) - return err + return devicePath, err } -func (util *RBDUtil) DetachDisk(c rbdUnmounter, mntPath string) error { - device, cnt, err := mount.GetDeviceNameFromMount(c.mounter, mntPath) - if err != nil { - return fmt.Errorf("rbd detach disk: failed to get device from mnt: %s\nError: %v", mntPath, err) - } - if err = c.mounter.Unmount(mntPath); err != nil { - return fmt.Errorf("rbd detach disk: failed to umount: %s\nError: %v", mntPath, err) - } - glog.V(3).Infof("rbd: successfully umount 
mountpoint %s", mntPath) - // if device is no longer used, see if can unmap - if cnt <= 1 { +// DetachDisk detaches the disk from the node. +// It detaches device from the node if device is provided, and removes the lock +// if there is persisted RBD info under deviceMountPath. +func (util *RBDUtil) DetachDisk(plugin *rbdPlugin, deviceMountPath string, device string) error { + var err error + if len(device) > 0 { // rbd unmap - _, err = c.exec.Run("rbd", "unmap", device) + _, err = plugin.exec.Run("rbd", "unmap", device) if err != nil { return rbdErrors(err, fmt.Errorf("rbd: failed to unmap device %s:Error: %v", device, err)) } - - // load ceph and image/pool info to remove fencing - if err := util.loadRBD(c.rbdMounter, mntPath); err == nil { - // remove rbd lock - util.defencing(c) - } - glog.V(3).Infof("rbd: successfully unmap device %s", device) } - return nil + // load ceph and image/pool info to remove fencing + mounter := &rbdMounter{ + // util.rbdLock needs it to run command. + rbd: newRBD("", "", "", "", false, plugin, util), + } + err = util.loadRBD(mounter, deviceMountPath) + if err != nil { + glog.Errorf("failed to load rbd info from %s: %v", deviceMountPath, err) + return err + } + // remove rbd lock if found + // the disk is not attached to this node anymore, so the lock on image + // for this node can be removed safely + return util.rbdLock(*mounter, false) } func (util *RBDUtil) CreateImage(p *rbdVolumeProvisioner) (r *v1.RBDPersistentVolumeSource, size int, err error) { From 3e570ad36d4b1ac0cd96f9190bf5753e6c5bbc70 Mon Sep 17 00:00:00 2001 From: Yecheng Fu Date: Sat, 21 Oct 2017 12:41:18 +0800 Subject: [PATCH 3/4] RBD Plugin: Remove deviceMountPath before return on error Attach.MountDevice. --- pkg/volume/rbd/attacher.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/volume/rbd/attacher.go b/pkg/volume/rbd/attacher.go index a7df8dbeb63..f08f499ec7f 100644 --- a/pkg/volume/rbd/attacher.go +++ b/pkg/volume/rbd/attacher.go @@ -153,6 +153,7 @@ func (attacher *rbdAttacher) MountDevice(spec *volume.Spec, devicePath string, d mountOptions := volume.MountOptionFromSpec(spec, options...) err = attacher.plugin.mounter.FormatAndMount(devicePath, deviceMountPath, fstype, mountOptions) if err != nil { + os.Remove(deviceMountPath) return fmt.Errorf("rbd: failed to mount device %s at %s (fstype: %s), error %v", devicePath, deviceMountPath, fstype, err) } glog.V(3).Infof("rbd: successfully mount device %s at %s (fstype: %s)", devicePath, deviceMountPath, fstype) From f2af1af82fceacddf6958b31b1946d204c313574 Mon Sep 17 00:00:00 2001 From: Yecheng Fu Date: Tue, 24 Oct 2017 12:16:12 +0800 Subject: [PATCH 4/4] RBD Plugin: No need to acquire advisory lock any more! With central attachdetach controller, we don't need to lock the image any more. But for backward compatibility, we should: 1) Check if the image is still used by nodes running old kubelet in attaching. 2) Clean old rbd.json file and remove lock if found in detaching. 
--- pkg/volume/rbd/BUILD | 2 +- pkg/volume/rbd/attacher.go | 2 +- pkg/volume/rbd/rbd_test.go | 84 --------------------------- pkg/volume/rbd/rbd_util.go | 114 +++++++++++++++++++------------------ 4 files changed, 61 insertions(+), 141 deletions(-) diff --git a/pkg/volume/rbd/BUILD b/pkg/volume/rbd/BUILD index 5185d960231..aef8b366b4e 100644 --- a/pkg/volume/rbd/BUILD +++ b/pkg/volume/rbd/BUILD @@ -17,6 +17,7 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/volume/rbd", deps = [ + "//pkg/util/file:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/util/node:go_default_library", "//pkg/util/strings:go_default_library", @@ -43,7 +44,6 @@ go_test( "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/testing:go_default_library", - "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/volume/rbd/attacher.go b/pkg/volume/rbd/attacher.go index f08f499ec7f..7d2fab2bbb9 100644 --- a/pkg/volume/rbd/attacher.go +++ b/pkg/volume/rbd/attacher.go @@ -206,7 +206,7 @@ func (detacher *rbdDetacher) UnmountDevice(deviceMountPath string) error { return err } glog.V(3).Infof("rbd: successfully detach device %s", devicePath) - err = os.RemoveAll(deviceMountPath) + err = os.Remove(deviceMountPath) if err != nil { return err } diff --git a/pkg/volume/rbd/rbd_test.go b/pkg/volume/rbd/rbd_test.go index 99192a2cb37..4f67311918b 100644 --- a/pkg/volume/rbd/rbd_test.go +++ b/pkg/volume/rbd/rbd_test.go @@ -18,16 +18,13 @@ package rbd import ( "fmt" - "io/ioutil" "os" - "path/filepath" "reflect" "strings" "sync" "testing" "time" - "github.com/stretchr/testify/assert" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -397,87 +394,6 @@ func TestPersistentClaimReadOnlyFlag(t *testing.T) { } } -func TestPersistAndLoadRBD(t *testing.T) { - tmpDir, err := utiltesting.MkTmpdir("rbd_test") - if err != nil { - t.Fatalf("error creating temp dir: %v", err) - } - defer os.RemoveAll(tmpDir) - - testcases := []struct { - rbdMounter rbdMounter - expectedJSONStr string - expectedLoadedRBDMounter rbdMounter - }{ - { - rbdMounter{}, - `{"Mon":null,"Id":"","Keyring":"","Secret":""}`, - rbdMounter{}, - }, - { - rbdMounter{ - rbd: &rbd{ - podUID: "poduid", - Pool: "kube", - Image: "some-test-image", - ReadOnly: false, - MetricsProvider: volume.NewMetricsStatFS("/tmp"), - }, - Mon: []string{"127.0.0.1"}, - Id: "kube", - Keyring: "", - Secret: "QVFEcTdKdFp4SmhtTFJBQUNwNDI3UnhGRzBvQ1Y0SUJwLy9pRUE9PQ==", - }, - ` -{ - "Pool": "kube", - "Image": "some-test-image", - "ReadOnly": false, - "Mon": ["127.0.0.1"], - "Id": "kube", - "Keyring": "", - "Secret": "QVFEcTdKdFp4SmhtTFJBQUNwNDI3UnhGRzBvQ1Y0SUJwLy9pRUE9PQ==" -} - `, - rbdMounter{ - rbd: &rbd{ - Pool: "kube", - Image: "some-test-image", - ReadOnly: false, - }, - Mon: []string{"127.0.0.1"}, - Id: "kube", - Keyring: "", - Secret: "QVFEcTdKdFp4SmhtTFJBQUNwNDI3UnhGRzBvQ1Y0SUJwLy9pRUE9PQ==", - }, - }, - } - - util := &RBDUtil{} - for _, c := range testcases { - err = util.persistRBD(c.rbdMounter, tmpDir) - if err != nil { - t.Errorf("failed to persist rbd: %v, err: %v", c.rbdMounter, err) - } - jsonFile := filepath.Join(tmpDir, "rbd.json") - jsonData, err := ioutil.ReadFile(jsonFile) - if err != nil { - t.Errorf("failed to read json file %s: %v", jsonFile, err) - } - if !assert.JSONEq(t, 
c.expectedJSONStr, string(jsonData)) { - t.Errorf("json file does not match expected one: %s, should be %s", string(jsonData), c.expectedJSONStr) - } - tmpRBDMounter := rbdMounter{} - err = util.loadRBD(&tmpRBDMounter, tmpDir) - if err != nil { - t.Errorf("faild to load rbd: %v", err) - } - if !reflect.DeepEqual(tmpRBDMounter, c.expectedLoadedRBDMounter) { - t.Errorf("loaded rbd does not equal to expected one: %v, should be %v", tmpRBDMounter, c.rbdMounter) - } - } -} - func TestGetSecretNameAndNamespace(t *testing.T) { secretName := "test-secret-name" secretNamespace := "test-secret-namespace" diff --git a/pkg/volume/rbd/rbd_util.go b/pkg/volume/rbd/rbd_util.go index 2e598a99f73..edf0efbb290 100644 --- a/pkg/volume/rbd/rbd_util.go +++ b/pkg/volume/rbd/rbd_util.go @@ -35,6 +35,7 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" + fileutil "k8s.io/kubernetes/pkg/util/file" "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/volume" volutil "k8s.io/kubernetes/pkg/volume/util" @@ -225,46 +226,6 @@ func (util *RBDUtil) rbdLock(b rbdMounter, lock bool) error { return err } -func (util *RBDUtil) persistRBD(rbd rbdMounter, mnt string) error { - file := path.Join(mnt, "rbd.json") - fp, err := os.Create(file) - if err != nil { - return fmt.Errorf("rbd: create err %s/%s", file, err) - } - defer fp.Close() - - encoder := json.NewEncoder(fp) - if err = encoder.Encode(rbd); err != nil { - return fmt.Errorf("rbd: encode err: %v.", err) - } - - return nil -} - -func (util *RBDUtil) loadRBD(mounter *rbdMounter, mnt string) error { - file := path.Join(mnt, "rbd.json") - fp, err := os.Open(file) - if err != nil { - return fmt.Errorf("rbd: open err %s/%s", file, err) - } - defer fp.Close() - - decoder := json.NewDecoder(fp) - if err = decoder.Decode(mounter); err != nil { - return fmt.Errorf("rbd: decode err: %v.", err) - } - - return nil -} - -func (util *RBDUtil) fencing(b rbdMounter) error { - // no need to fence readOnly - if (&b).GetAttributes().ReadOnly { - return nil - } - return util.rbdLock(b, true) -} - // AttachDisk attaches the disk on the node. // If Volume is not read-only, acquire a lock on image first. func (util *RBDUtil) AttachDisk(b rbdMounter) (string, error) { @@ -287,16 +248,16 @@ func (util *RBDUtil) AttachDisk(b rbdMounter) (string, error) { glog.Warningf("rbd: failed to load rbd kernel module:%v", err) } - // fence off other mappers - if err = util.fencing(b); err != nil { - return "", rbdErrors(err, fmt.Errorf("rbd: failed to lock image %s (maybe locked by other nodes), error %v", b.Image, err)) + // Currently, we don't acquire advisory lock on image, but for backward + // compatibility, we need to check if the image is being used by nodes running old kubelet. + found, err := util.rbdStatus(&b) + if err != nil { + return "", err + } + if found { + glog.Info("rbd is still being used ", b.Image) + return "", fmt.Errorf("rbd %s is still being used", b.Image) } - // rbd lock remove needs ceph and image config - // but kubelet doesn't get them from apiserver during teardown - // so persit rbd config so upon disk detach, rbd lock can be removed - // since rbd json is persisted in the same local directory that is used as rbd mountpoint later, - // the json file remains invisible during rbd mount and thus won't be removed accidentally. 
-	util.persistRBD(b, globalPDPath)
 
 	// rbd map
 	l := len(b.Mon)
@@ -342,20 +303,55 @@ func (util *RBDUtil) DetachDisk(plugin *rbdPlugin, deviceMountPath string, devic
 		}
 		glog.V(3).Infof("rbd: successfully unmap device %s", device)
 	}
-	// load ceph and image/pool info to remove fencing
+	// Currently, we don't persist rbd info on the disk, but for backward
+	// compatibility, we need to clean it up if found.
+	rbdFile := path.Join(deviceMountPath, "rbd.json")
+	exists, err := fileutil.FileExists(rbdFile)
+	if err != nil {
+		return err
+	}
+	if exists {
+		glog.V(3).Infof("rbd: old rbd.json is found under %s, cleaning it", deviceMountPath)
+		err = util.cleanOldRBDFile(plugin, rbdFile)
+		if err != nil {
+			glog.Errorf("rbd: failed to clean %s", rbdFile)
+			return err
+		}
+		glog.V(3).Infof("rbd: successfully remove %s", rbdFile)
+	}
+	return nil
+}
+
+// cleanOldRBDFile reads rbd info from the rbd.json file and removes the lock
+// if one is found. Finally, it removes the rbd.json file itself.
+func (util *RBDUtil) cleanOldRBDFile(plugin *rbdPlugin, rbdFile string) error {
 	mounter := &rbdMounter{
 		// util.rbdLock needs it to run command.
 		rbd: newRBD("", "", "", "", false, plugin, util),
 	}
-	err = util.loadRBD(mounter, deviceMountPath)
+	fp, err := os.Open(rbdFile)
 	if err != nil {
-		glog.Errorf("failed to load rbd info from %s: %v", deviceMountPath, err)
+		return fmt.Errorf("rbd: open err %s/%s", rbdFile, err)
+	}
+	defer fp.Close()
+
+	decoder := json.NewDecoder(fp)
+	if err = decoder.Decode(mounter); err != nil {
+		return fmt.Errorf("rbd: decode err: %v.", err)
+	}
+
+	if err != nil {
+		glog.Errorf("failed to load rbd info from %s: %v", rbdFile, err)
 		return err
 	}
 	// remove rbd lock if found
 	// the disk is not attached to this node anymore, so the lock on image
 	// for this node can be removed safely
-	return util.rbdLock(*mounter, false)
+	err = util.rbdLock(*mounter, false)
+	if err == nil {
+		os.Remove(rbdFile)
+	}
+	return err
 }
 
 func (util *RBDUtil) CreateImage(p *rbdVolumeProvisioner) (r *v1.RBDPersistentVolumeSource, size int, err error) {
@@ -438,6 +434,14 @@ func (util *RBDUtil) rbdStatus(b *rbdMounter) (bool, error) {
 	var output string
 	var cmd []byte
 
+	// If we don't have an admin id/secret (e.g. when attaching), fall back
+	// to the user id/secret.
+	id := b.adminId
+	secret := b.adminSecret
+	if id == "" {
+		id = b.Id
+		secret = b.Secret
+	}
+
 	l := len(b.Mon)
 	start := rand.Int() % l
 	// iterate all hosts until rbd command succeeds.
@@ -457,9 +461,9 @@ func (util *RBDUtil) rbdStatus(b *rbdMounter) (bool, error) {
 		// # image does not exist (exit=2)
 		// rbd: error opening image kubernetes-dynamic-pvc-<uuid>: (2) No such file or directory
 		//
-		glog.V(4).Infof("rbd: status %s using mon %s, pool %s id %s key %s", b.Image, mon, b.Pool, b.adminId, b.adminSecret)
+		glog.V(4).Infof("rbd: status %s using mon %s, pool %s id %s key %s", b.Image, mon, b.Pool, id, secret)
 		cmd, err = b.exec.Run("rbd",
-			"status", b.Image, "--pool", b.Pool, "-m", mon, "--id", b.adminId, "--key="+b.adminSecret)
+			"status", b.Image, "--pool", b.Pool, "-m", mon, "--id", id, "--key="+secret)
 		output = string(cmd)
 		// break if command succeeds