Merge pull request #31251 from rootfs/rbd-prov3

Automatic merge from submit-queue

support storage class in Ceph RBD volume

Replaces WIP PR #30959, using the PV annotation idea from @jsafrane.

@kubernetes/sig-storage @johscheuer @elsonrodriguez
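For context: the provisioner records the admin credentials it used as annotations on the provisioned PV, so the deleter can find them again later (see `paramToAnnotations`/`annotationsToParam` in the diff below). A provisioned PV would carry annotations along these lines — a sketch, with the annotation keys taken from the diff and the PV name and values illustrative:

```yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: pvc-1cfa23b3-664b-11e6-9eb9-90b11c09520d
  annotations:
    # written by paramToAnnotations at provision time,
    # read back by annotationsToParam at delete time
    rbd.kubernetes.io/admin: admin
    rbd.kubernetes.io/adminsecretname: ceph-secret-admin
    rbd.kubernetes.io/adminsecretnamespace: kube-system
```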
Kubernetes Submit Queue, 2016-09-10 07:03:14 -07:00 (committed by GitHub)
11 changed files with 511 additions and 21 deletions


@@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/volume/glusterfs" "k8s.io/kubernetes/pkg/volume/glusterfs"
"k8s.io/kubernetes/pkg/volume/host_path" "k8s.io/kubernetes/pkg/volume/host_path"
"k8s.io/kubernetes/pkg/volume/nfs" "k8s.io/kubernetes/pkg/volume/nfs"
"k8s.io/kubernetes/pkg/volume/rbd"
"k8s.io/kubernetes/pkg/volume/vsphere_volume" "k8s.io/kubernetes/pkg/volume/vsphere_volume"
) )
@@ -102,7 +103,8 @@ func ProbeControllerVolumePlugins(cloud cloudprovider.Interface, config componen
    }
    allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(nfsConfig)...)
    allPlugins = append(allPlugins, glusterfs.ProbeVolumePlugins()...)
    // add rbd provisioner
    allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)
    if cloud != nil {
        switch {
        case aws.ProviderName == cloud.ProviderName():


@@ -121,6 +121,32 @@ parameters:
* `type`: [VolumeType](http://docs.openstack.org/admin-guide/dashboard-manage-volumes.html) created in Cinder. Default is empty.
* `availability`: Availability Zone. Default is empty.
#### Ceph RBD
```yaml
apiVersion: extensions/v1beta1
kind: StorageClass
metadata:
  name: fast
provisioner: kubernetes.io/rbd
parameters:
  monitors: 10.16.153.105:6789
  adminId: kube
  adminSecretName: ceph-secret
  adminSecretNamespace: kube-system
  pool: kube
  userId: kube
  userSecretName: ceph-secret-user
```
* `monitors`: Ceph monitors, comma-delimited. This parameter is required.
* `adminId`: Ceph client ID that is capable of creating images in the pool. Default is "admin".
* `adminSecretName`: Secret name for `adminId`. This parameter is required.
* `adminSecretNamespace`: The namespace for `adminSecretName`. Default is "default".
* `pool`: Ceph RBD pool. Default is "rbd".
* `userId`: Ceph client ID that is used to map the RBD image. Default is the same as `adminId`.
* `userSecretName`: The name of the Ceph Secret for `userId` to map the RBD image. It must exist in the same namespace as the PVCs. This parameter is required.
### User provisioning requests
Users request dynamically provisioned storage by including a storage class in their `PersistentVolumeClaim`.
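For example, a claim requesting the `fast` class defined above might look like the following sketch; in this release the class is selected through the `volume.beta.kubernetes.io/storage-class` annotation, as noted below:

```yaml
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: claim1
  annotations:
    volume.beta.kubernetes.io/storage-class: fast
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 3Gi
```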
@@ -152,6 +178,8 @@ In the future, the storage class may remain in an annotation or become a field o
### Sample output
#### GCE
This example uses GCE but any provisioner would follow the same flow.
First we note there are no Persistent Volumes in the cluster. After creating a storage class and a claim including that storage class, we see a new PV is created.
@@ -184,6 +212,75 @@ $ kubectl get pv
```
#### Ceph RBD
First, create the Ceph admin's Secret in the system namespace; here the Secret is created in `kube-system`:
```
$ kubectl create -f examples/experimental/persistent-volume-provisioning/rbd/ceph-secret-admin.yaml --namespace=kube-system
```
Then create the RBD storage class:
```
$ kubectl create -f examples/experimental/persistent-volume-provisioning/rbd/rbd-storage-class.yaml
```
Before creating a PVC in the user's namespace (e.g. `myns`), make sure the Ceph user's Secret exists; if not, create the Secret:
```
$ kubectl create -f examples/experimental/persistent-volume-provisioning/rbd/ceph-secret-user.yaml --namespace=myns
```
Now create a PVC in the user's namespace (e.g. `myns`):
```
$ kubectl create -f examples/experimental/persistent-volume-provisioning/claim1.json --namespace=myns
```
Check that the PV and PVC have been created:
```
$ kubectl describe pvc --namespace=myns
Name: claim1
Namespace: myns
Status: Bound
Volume: pvc-1cfa23b3-664b-11e6-9eb9-90b11c09520d
Labels: <none>
Capacity: 3Gi
Access Modes: RWO
No events.
$ kubectl describe pv
Name: pvc-1cfa23b3-664b-11e6-9eb9-90b11c09520d
Labels: <none>
Status: Bound
Claim: myns/claim1
Reclaim Policy: Delete
Access Modes: RWO
Capacity: 3Gi
Message:
Source:
Type: RBD (a Rados Block Device mount on the host that shares a pod's lifetime)
CephMonitors: [10.16.153.105:6789]
RBDImage: kubernetes-dynamic-pvc-1cfb1862-664b-11e6-9a5d-90b11c09520d
FSType:
RBDPool: kube
RadosUser: kube
Keyring: /etc/ceph/keyring
SecretRef: &{ceph-secret-user}
ReadOnly: false
No events.
```
Create a Pod to use the PVC:
```
$ kubectl create -f examples/experimental/persistent-volume-provisioning/rbd/pod.yaml --namespace=myns
```
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/examples/experimental/persistent-volume-provisioning/README.md?pixel)]()
<!-- END MUNGE: GENERATED_ANALYTICS -->


@@ -0,0 +1,6 @@
apiVersion: v1
kind: Secret
metadata:
  name: ceph-secret-admin
data:
  key: QVFEQ1pMdFhPUnQrSmhBQUFYaERWNHJsZ3BsMmNjcDR6RFZST0E9PQ==
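For reference, the `key` value above is a base64-encoded Ceph secret (the sample value is illustrative); assuming a working `ceph` CLI, a real value can be produced with `ceph auth get-key client.admin | base64`.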


@@ -0,0 +1,6 @@
apiVersion: v1
kind: Secret
metadata:
  name: ceph-secret-user
data:
  key: QVFBTWdYaFZ3QkNlRGhBQTlubFBhRnlmVVNhdEdENGRyRldEdlE9PQ==


@@ -0,0 +1,23 @@
apiVersion: v1
kind: ReplicationController
metadata:
  name: server
spec:
  replicas: 1
  selector:
    role: server
  template:
    metadata:
      labels:
        role: server
    spec:
      containers:
      - name: server
        image: nginx
        volumeMounts:
        - mountPath: /var/lib/www/html
          name: mypvc
      volumes:
      - name: mypvc
        persistentVolumeClaim:
          claimName: claim1


@@ -0,0 +1,14 @@
apiVersion: extensions/v1beta1
kind: StorageClass
metadata:
  name: slow
provisioner: kubernetes.io/rbd
parameters:
  monitors: 10.16.153.105:6789
  adminId: admin
  adminSecretName: ceph-secret-admin
  adminSecretNamespace: "kube-system"
  pool: kube
  userId: kube
  userSecretName: ceph-secret-user


@@ -26,6 +26,7 @@ import (
"os" "os"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
) )
@@ -37,6 +38,10 @@ type diskManager interface {
    AttachDisk(disk rbdMounter) error
    // Detaches the disk from the kubelet's host machine.
    DetachDisk(disk rbdUnmounter, mntPath string) error
    // Creates an rbd image.
    CreateImage(provisioner *rbdVolumeProvisioner) (r *api.RBDVolumeSource, volumeSizeMB int, err error)
    // Deletes an rbd image.
    DeleteImage(deleter *rbdVolumeDeleter) error
}
// utility to mount a disk based filesystem


@@ -18,14 +18,19 @@ package rbd
import (
    "fmt"
    dstrings "strings"

    "github.com/golang/glog"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/resource"
    clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
    "k8s.io/kubernetes/pkg/types"
    "k8s.io/kubernetes/pkg/util/exec"
    "k8s.io/kubernetes/pkg/util/mount"
    "k8s.io/kubernetes/pkg/util/strings"
    "k8s.io/kubernetes/pkg/util/uuid"
    "k8s.io/kubernetes/pkg/volume"
    volutil "k8s.io/kubernetes/pkg/volume/util"
)
// This is the primary entrypoint for volume plugins.
@@ -40,9 +45,15 @@ type rbdPlugin struct {
var _ volume.VolumePlugin = &rbdPlugin{}
var _ volume.PersistentVolumePlugin = &rbdPlugin{}
var _ volume.DeletableVolumePlugin = &rbdPlugin{}
var _ volume.ProvisionableVolumePlugin = &rbdPlugin{}

const (
    rbdPluginName               = "kubernetes.io/rbd"
    annCephAdminID              = "rbd.kubernetes.io/admin"
    annCephAdminSecretName      = "rbd.kubernetes.io/adminsecretname"
    annCephAdminSecretNameSpace = "rbd.kubernetes.io/adminsecretnamespace"
    secretKeyName               = "key" // key name used in secret
)
func (plugin *rbdPlugin) Init(host volume.VolumeHost) error {
@@ -86,26 +97,17 @@ func (plugin *rbdPlugin) GetAccessModes() []api.PersistentVolumeAccessMode {
}

func (plugin *rbdPlugin) NewMounter(spec *volume.Spec, pod *api.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
    var secret string
    var err error
    source, _ := plugin.getRBDVolumeSource(spec)
    if source.SecretRef != nil {
        if secret, err = parseSecret(pod.Namespace, source.SecretRef.Name, plugin.host.GetKubeClient()); err != nil {
            glog.Errorf("Couldn't get secret from %v/%v", pod.Namespace, source.SecretRef)
            return nil, err
        }
    }
    // Inject real implementations here, test through the internal function.
    return plugin.newMounterInternal(spec, pod.UID, &RBDUtil{}, plugin.host.GetMounter(), secret)
}
@@ -177,6 +179,188 @@ func (plugin *rbdPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*vol
    return volume.NewSpecFromVolume(rbdVolume), nil
}

func (plugin *rbdPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
    if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.RBD == nil {
        return nil, fmt.Errorf("spec.PersistentVolume.Spec.RBD is nil")
    }
    admin, adminSecretName, adminSecretNamespace, err := annotationsToParam(spec.PersistentVolume)
    if err != nil {
        return nil, fmt.Errorf("cannot find Ceph credentials to delete rbd PV: %v", err)
    }
    secret, err := parseSecret(adminSecretNamespace, adminSecretName, plugin.host.GetKubeClient())
    if err != nil {
        // log the error but don't return yet
        glog.Errorf("failed to get admin secret from [%q/%q]: %v", adminSecretNamespace, adminSecretName, err)
    }
    return plugin.newDeleterInternal(spec, admin, secret, &RBDUtil{})
}

func (plugin *rbdPlugin) newDeleterInternal(spec *volume.Spec, admin, secret string, manager diskManager) (volume.Deleter, error) {
    return &rbdVolumeDeleter{
        rbdMounter: &rbdMounter{
            rbd: &rbd{
                volName: spec.Name(),
                Image:   spec.PersistentVolume.Spec.RBD.RBDImage,
                Pool:    spec.PersistentVolume.Spec.RBD.RBDPool,
                manager: manager,
                plugin:  plugin,
            },
            Mon:         spec.PersistentVolume.Spec.RBD.CephMonitors,
            adminId:     admin,
            adminSecret: secret,
        }}, nil
}

func (plugin *rbdPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
    if len(options.AccessModes) == 0 {
        options.AccessModes = plugin.GetAccessModes()
    }
    return plugin.newProvisionerInternal(options, &RBDUtil{})
}

func (plugin *rbdPlugin) newProvisionerInternal(options volume.VolumeOptions, manager diskManager) (volume.Provisioner, error) {
    return &rbdVolumeProvisioner{
        rbdMounter: &rbdMounter{
            rbd: &rbd{
                manager: manager,
                plugin:  plugin,
            },
        },
        options: options,
    }, nil
}

type rbdVolumeProvisioner struct {
    *rbdMounter
    options volume.VolumeOptions
}

func (r *rbdVolumeProvisioner) Provision() (*api.PersistentVolume, error) {
    if r.options.Selector != nil {
        return nil, fmt.Errorf("claim Selector is not supported")
    }
    var err error
    adminSecretName := ""
    adminSecretNamespace := "default"
    secretName := ""
    secret := ""

    for k, v := range r.options.Parameters {
        switch dstrings.ToLower(k) {
        case "monitors":
            arr := dstrings.Split(v, ",")
            for _, m := range arr {
                r.Mon = append(r.Mon, m)
            }
        case "adminid":
            r.adminId = v
        case "adminsecretname":
            adminSecretName = v
        case "adminsecretnamespace":
            adminSecretNamespace = v
        case "userid":
            r.Id = v
        case "pool":
            r.Pool = v
        case "usersecretname":
            secretName = v
        default:
            return nil, fmt.Errorf("invalid option %q for volume plugin %s", k, r.plugin.GetPluginName())
        }
    }
    // sanity check
    if adminSecretName == "" {
        return nil, fmt.Errorf("missing Ceph admin secret name")
    }
    if secret, err = parseSecret(adminSecretNamespace, adminSecretName, r.plugin.host.GetKubeClient()); err != nil {
        // log the error but don't return yet
        glog.Errorf("failed to get admin secret from [%q/%q]: %v", adminSecretNamespace, adminSecretName, err)
    }
    r.adminSecret = secret
    if len(r.Mon) < 1 {
        return nil, fmt.Errorf("missing Ceph monitors")
    }
    if secretName == "" {
        return nil, fmt.Errorf("missing user secret name")
    }
    if r.adminId == "" {
        r.adminId = "admin"
    }
    if r.Pool == "" {
        r.Pool = "rbd"
    }
    if r.Id == "" {
        r.Id = r.adminId
    }

    // create a random image name
    image := fmt.Sprintf("kubernetes-dynamic-pvc-%s", uuid.NewUUID())
    r.rbdMounter.Image = image
    rbd, sizeMB, err := r.manager.CreateImage(r)
    if err != nil {
        glog.Errorf("rbd: create volume failed, err: %v", err)
        return nil, fmt.Errorf("rbd: create volume failed, err: %v", err)
    }
    glog.Infof("successfully created rbd image %q", image)

    pv := new(api.PersistentVolume)
    rbd.SecretRef = new(api.LocalObjectReference)
    rbd.SecretRef.Name = secretName
    rbd.RadosUser = r.Id
    pv.Spec.PersistentVolumeSource.RBD = rbd
    pv.Spec.PersistentVolumeReclaimPolicy = r.options.PersistentVolumeReclaimPolicy
    pv.Spec.AccessModes = r.options.AccessModes
    pv.Spec.Capacity = api.ResourceList{
        api.ResourceName(api.ResourceStorage): resource.MustParse(fmt.Sprintf("%dMi", sizeMB)),
    }
    // store the admin credentials in PV annotations so the deleter can retrieve them later
    paramToAnnotations(r.adminId, adminSecretNamespace, adminSecretName, pv)
    return pv, nil
}

type rbdVolumeDeleter struct {
    *rbdMounter
}

func (r *rbdVolumeDeleter) GetPath() string {
    name := rbdPluginName
    return r.plugin.host.GetPodVolumeDir(r.podUID, strings.EscapeQualifiedNameForDisk(name), r.volName)
}

func (r *rbdVolumeDeleter) Delete() error {
    return r.manager.DeleteImage(r)
}

func paramToAnnotations(admin, adminSecretNamespace, adminSecretName string, pv *api.PersistentVolume) {
    if pv.Annotations == nil {
        pv.Annotations = make(map[string]string)
    }
    pv.Annotations[annCephAdminID] = admin
    pv.Annotations[annCephAdminSecretName] = adminSecretName
    pv.Annotations[annCephAdminSecretNameSpace] = adminSecretNamespace
}

func annotationsToParam(pv *api.PersistentVolume) (string, string, string, error) {
    if pv.Annotations == nil {
        return "", "", "", fmt.Errorf("PV has no annotations, cannot get Ceph admin credentials")
    }
    var admin, adminSecretName, adminSecretNamespace string
    found := false
    admin, found = pv.Annotations[annCephAdminID]
    if !found {
        return "", "", "", fmt.Errorf("cannot get Ceph admin id from PV annotations")
    }
    adminSecretName, found = pv.Annotations[annCephAdminSecretName]
    if !found {
        return "", "", "", fmt.Errorf("cannot get Ceph admin secret name from PV annotations")
    }
    adminSecretNamespace, found = pv.Annotations[annCephAdminSecretNameSpace]
    if !found {
        return "", "", "", fmt.Errorf("cannot get Ceph admin secret namespace from PV annotations")
    }
    return admin, adminSecretName, adminSecretNamespace, nil
}

type rbd struct {
    volName string
    podUID  types.UID
@@ -199,11 +383,13 @@ func (rbd *rbd) GetPath() string {
type rbdMounter struct {
    *rbd
    // capitalized so they can be exported in persistRBD()
    Mon         []string
    Id          string
    Keyring     string
    Secret      string
    fsType      string
    adminSecret string
    adminId     string
}
var _ volume.Mounter = &rbdMounter{}
@@ -262,3 +448,24 @@ func getVolumeSource(
    return nil, false, fmt.Errorf("Spec does not reference a RBD volume type")
}

// parseSecret locates the secret value by key name.
func parseSecret(namespace, secretName string, kubeClient clientset.Interface) (string, error) {
    secretMap, err := volutil.GetSecret(namespace, secretName, kubeClient)
    if err != nil {
        glog.Errorf("failed to get secret from [%q/%q]", namespace, secretName)
        return "", fmt.Errorf("failed to get secret from [%q/%q]", namespace, secretName)
    }
    if len(secretMap) == 0 {
        return "", fmt.Errorf("empty secret map")
    }
    secret := ""
    for k, v := range secretMap {
        if k == secretKeyName {
            return v, nil
        }
        secret = v
    }
    // If the "key" entry is not found, the last secret in the map wins, as before.
    return secret, nil
}


@@ -87,6 +87,14 @@ func (fake *fakeDiskManager) DetachDisk(c rbdUnmounter, mntPath string) error {
    return nil
}

func (fake *fakeDiskManager) CreateImage(provisioner *rbdVolumeProvisioner) (r *api.RBDVolumeSource, volumeSizeMB int, err error) {
    return nil, 0, fmt.Errorf("not implemented")
}

func (fake *fakeDiskManager) DeleteImage(deleter *rbdVolumeDeleter) error {
    return fmt.Errorf("not implemented")
}

func doTestPlugin(t *testing.T, spec *volume.Spec) {
    tmpDir, err := utiltesting.MkTmpdir("rbd_test")
    if err != nil {


@@ -34,12 +34,17 @@ import (
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
) )

const (
    imageWatcherStr = "watcher="
)
// search /sys/bus for rbd device that matches given pool and image
func getDevFromImageAndPool(pool, image string) (string, bool) {
    // /sys/bus/rbd/devices/X/name and /sys/bus/rbd/devices/X/pool
@@ -307,3 +312,102 @@ func (util *RBDUtil) DetachDisk(c rbdUnmounter, mntPath string) error {
    }
    return nil
}

func (util *RBDUtil) CreateImage(p *rbdVolumeProvisioner) (r *api.RBDVolumeSource, size int, err error) {
    volSizeBytes := p.options.Capacity.Value()
    // convert to MiB, the unit rbd defaults to
    sz := int(volume.RoundUpSize(volSizeBytes, 1024*1024))
    volSz := fmt.Sprintf("%d", sz)
    // rbd create
    l := len(p.rbdMounter.Mon)
    // pick a monitor randomly
    start := rand.Int() % l
    // iterate over all monitors until create succeeds.
    for i := start; i < start+l; i++ {
        mon := p.Mon[i%l]
        glog.V(4).Infof("rbd: create %s size %s using mon %s, pool %s id %s key %s", p.rbdMounter.Image, volSz, mon, p.rbdMounter.Pool, p.rbdMounter.adminId, p.rbdMounter.adminSecret)
        var output []byte
        output, err = p.rbdMounter.plugin.execCommand("rbd",
            []string{"create", p.rbdMounter.Image, "--size", volSz, "--pool", p.rbdMounter.Pool, "--id", p.rbdMounter.adminId, "-m", mon, "--key=" + p.rbdMounter.adminSecret, "--image-format", "1"})
        if err == nil {
            break
        } else {
            glog.Warningf("failed to create rbd image, output %v", string(output))
        }
    }
    if err != nil {
        glog.Errorf("rbd: Error creating rbd image: %v", err)
        return nil, 0, err
    }
    return &api.RBDVolumeSource{
        CephMonitors: p.rbdMounter.Mon,
        RBDImage:     p.rbdMounter.Image,
        RBDPool:      p.rbdMounter.Pool,
    }, sz, nil
}

func (util *RBDUtil) DeleteImage(p *rbdVolumeDeleter) error {
    var output []byte
    found, err := util.rbdStatus(p.rbdMounter)
    if err != nil {
        return err
    }
    if found {
        glog.Info("rbd is still being used ", p.rbdMounter.Image)
        return fmt.Errorf("rbd %s is still being used", p.rbdMounter.Image)
    }
    // rbd rm
    l := len(p.rbdMounter.Mon)
    // pick a monitor randomly
    start := rand.Int() % l
    // iterate over all monitors until rm succeeds.
    for i := start; i < start+l; i++ {
        mon := p.rbdMounter.Mon[i%l]
        glog.V(4).Infof("rbd: rm %s using mon %s, pool %s id %s key %s", p.rbdMounter.Image, mon, p.rbdMounter.Pool, p.rbdMounter.adminId, p.rbdMounter.adminSecret)
        output, err = p.plugin.execCommand("rbd",
            []string{"rm", p.rbdMounter.Image, "--pool", p.rbdMounter.Pool, "--id", p.rbdMounter.adminId, "-m", mon, "--key=" + p.rbdMounter.adminSecret})
        if err == nil {
            return nil
        } else {
            glog.Errorf("failed to delete rbd image, error %v output %v", err, string(output))
        }
    }
    return err
}

// run the rbd status command to check whether there is a watcher on the image
func (util *RBDUtil) rbdStatus(b *rbdMounter) (bool, error) {
    var err error
    var output string
    var cmd []byte

    l := len(b.Mon)
    start := rand.Int() % l
    // iterate over all monitors until the status command succeeds.
    for i := start; i < start+l; i++ {
        mon := b.Mon[i%l]
        // "rbd status" lists the image's watchers with output like:
        //   Watchers:
        //     watcher=10.16.153.105:0/710245699 client.14163 cookie=1
        glog.V(4).Infof("rbd: status %s using mon %s, pool %s id %s key %s", b.Image, mon, b.Pool, b.adminId, b.adminSecret)
        cmd, err = b.plugin.execCommand("rbd",
            []string{"status", b.Image, "--pool", b.Pool, "-m", mon, "--id", b.adminId, "--key=" + b.adminSecret})
        output = string(cmd)
        if err != nil {
            // ignore the exit code; just check the output for the watcher string
            glog.Warningf("failed to execute rbd status on mon %s", mon)
        }
        if strings.Contains(output, imageWatcherStr) {
            glog.V(4).Infof("rbd: watchers on %s: %s", b.Image, output)
            return true, nil
        } else {
            glog.Warningf("rbd: no watchers on %s", b.Image)
            return false, nil
        }
    }
    return false, nil
}


@@ -22,6 +22,7 @@ import (
"path" "path"
"github.com/golang/glog" "github.com/golang/glog"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
) )
@@ -108,3 +109,20 @@ func PathExists(path string) (bool, error) {
        return false, err
    }
}

// GetSecret locates a secret by name and namespace and returns its data as a map.
func GetSecret(namespace, secretName string, kubeClient clientset.Interface) (map[string]string, error) {
    secret := make(map[string]string)
    if kubeClient == nil {
        return secret, fmt.Errorf("cannot get kube client")
    }
    secrets, err := kubeClient.Core().Secrets(namespace).Get(secretName)
    if err != nil {
        return secret, err
    }
    for name, data := range secrets.Data {
        secret[name] = string(data)
    }
    return secret, nil
}