Merge pull request #45518 from portworx/px-remote

Automatic merge from submit-queue (batch tested with PRs 45518, 46127, 46146, 45932, 45003)

Remove requirement to run the Portworx volume driver on master node

**What this PR does / why we need it**:
This change removes requirement to run the Portworx volume driver on Kubernetes master node.

**Special notes for your reviewer**:
Before this pull request, in order to use a Portworx volume, users had to run the Portworx container on the master node. Since it isn't ideal (and impossible on GKE) to schedule any pods on the master node, this PR removes that requirement.
This commit is contained in:
Kubernetes Submit Queue 2017-05-25 11:45:59 -07:00 committed by GitHub
commit b017a7a392
3 changed files with 101 additions and 44 deletions

View File

@ -32,12 +32,12 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/libopenstorage/openstorage/api:go_default_library",
"//vendor/github.com/libopenstorage/openstorage/api/client:go_default_library",

View File

@ -29,16 +29,16 @@ import (
"k8s.io/kubernetes/pkg/util/mount"
kstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
)
// This is the primary entrypoint for volume plugins.
func ProbeVolumePlugins() []volume.VolumePlugin {
return []volume.VolumePlugin{&portworxVolumePlugin{nil}}
return []volume.VolumePlugin{&portworxVolumePlugin{nil, nil}}
}
type portworxVolumePlugin struct {
host volume.VolumeHost
util *PortworxVolumeUtil
}
var _ volume.VolumePlugin = &portworxVolumePlugin{}
@ -56,6 +56,7 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
func (plugin *portworxVolumePlugin) Init(host volume.VolumeHost) error {
plugin.host = host
plugin.util = &PortworxVolumeUtil{}
return nil
}
@ -89,7 +90,7 @@ func (plugin *portworxVolumePlugin) GetAccessModes() []v1.PersistentVolumeAccess
}
func (plugin *portworxVolumePlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
return plugin.newMounterInternal(spec, pod.UID, &PortworxVolumeUtil{}, plugin.host.GetMounter())
return plugin.newMounterInternal(spec, pod.UID, plugin.util, plugin.host.GetMounter())
}
func (plugin *portworxVolumePlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager portworxManager, mounter mount.Interface) (volume.Mounter, error) {
@ -117,10 +118,11 @@ func (plugin *portworxVolumePlugin) newMounterInternal(spec *volume.Spec, podUID
}
func (plugin *portworxVolumePlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
return plugin.newUnmounterInternal(volName, podUID, &PortworxVolumeUtil{}, plugin.host.GetMounter())
return plugin.newUnmounterInternal(volName, podUID, plugin.util, plugin.host.GetMounter())
}
func (plugin *portworxVolumePlugin) newUnmounterInternal(volName string, podUID types.UID, manager portworxManager, mounter mount.Interface) (volume.Unmounter, error) {
func (plugin *portworxVolumePlugin) newUnmounterInternal(volName string, podUID types.UID, manager portworxManager,
mounter mount.Interface) (volume.Unmounter, error) {
return &portworxVolumeUnmounter{
&portworxVolume{
podUID: podUID,
@ -133,13 +135,14 @@ func (plugin *portworxVolumePlugin) newUnmounterInternal(volName string, podUID
}
func (plugin *portworxVolumePlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
return plugin.newDeleterInternal(spec, &PortworxVolumeUtil{})
return plugin.newDeleterInternal(spec, plugin.util)
}
func (plugin *portworxVolumePlugin) newDeleterInternal(spec *volume.Spec, manager portworxManager) (volume.Deleter, error) {
if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.PortworxVolume == nil {
return nil, fmt.Errorf("spec.PersistentVolumeSource.PortworxVolume is nil")
}
return &portworxVolumeDeleter{
portworxVolume: &portworxVolume{
volName: spec.Name(),
@ -150,7 +153,7 @@ func (plugin *portworxVolumePlugin) newDeleterInternal(spec *volume.Spec, manage
}
func (plugin *portworxVolumePlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
return plugin.newProvisionerInternal(options, &PortworxVolumeUtil{})
return plugin.newProvisionerInternal(options, plugin.util)
}
func (plugin *portworxVolumePlugin) newProvisionerInternal(options volume.VolumeOptions, manager portworxManager) (volume.Provisioner, error) {
@ -311,11 +314,6 @@ func (c *portworxVolumeUnmounter) TearDown() error {
// resource was the last reference to that disk on the kubelet.
func (c *portworxVolumeUnmounter) TearDownAt(dir string) error {
glog.V(4).Infof("Portworx Volume TearDown of %s", dir)
// Unmount the bind mount inside the pod
if err := util.UnmountPath(dir, c.mounter); err != nil {
return err
}
// Call Portworx Unmount for Portworx's book-keeping.
if err := c.manager.UnmountVolume(c, dir); err != nil {
return err

View File

@ -22,7 +22,9 @@ import (
osdclient "github.com/libopenstorage/openstorage/api/client"
volumeclient "github.com/libopenstorage/openstorage/api/client/volume"
osdspec "github.com/libopenstorage/openstorage/api/spec"
osdvolume "github.com/libopenstorage/openstorage/volume"
volumeapi "github.com/libopenstorage/openstorage/volume"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/volume"
)
@ -31,8 +33,8 @@ const (
osdMgmtPort = "9001"
osdDriverVersion = "v1"
pxdDriverName = "pxd"
pwxSockName = "pwx"
pvcClaimLabel = "pvc"
pxServiceName = "portworx-service"
)
type PortworxVolumeUtil struct {
@ -41,9 +43,9 @@ type PortworxVolumeUtil struct {
// CreateVolume creates a Portworx volume.
func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int, map[string]string, error) {
hostname := p.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(p.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return "", 0, nil, err
}
@ -64,7 +66,7 @@ func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (stri
// Add claim Name as a part of Portworx Volume Labels
locator.VolumeLabels = make(map[string]string)
locator.VolumeLabels[pvcClaimLabel] = p.options.PVC.Name
volumeID, err := client.Create(&locator, &source, spec)
volumeID, err := driver.Create(&locator, &source, spec)
if err != nil {
glog.V(2).Infof("Error creating Portworx Volume : %v", err)
}
@ -73,13 +75,13 @@ func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (stri
// DeleteVolume deletes a Portworx volume
func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
hostname := d.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(d.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err
}
err = client.Delete(d.volumeID)
err = driver.Delete(d.volumeID)
if err != nil {
glog.V(2).Infof("Error deleting Portworx Volume (%v): %v", d.volName, err)
return err
@ -89,13 +91,13 @@ func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
// AttachVolume attaches a Portworx Volume
func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter) (string, error) {
hostname := m.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(m.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return "", err
}
devicePath, err := client.Attach(m.volName)
devicePath, err := driver.Attach(m.volName)
if err != nil {
glog.V(2).Infof("Error attaching Portworx Volume (%v): %v", m.volName, err)
return "", err
@ -105,13 +107,13 @@ func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter) (string,
// DetachVolume detaches a Portworx Volume
func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
hostname := u.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(u.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err
}
err = client.Detach(u.volName)
err = driver.Detach(u.volName)
if err != nil {
glog.V(2).Infof("Error detaching Portworx Volume (%v): %v", u.volName, err)
return err
@ -121,13 +123,13 @@ func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
// MountVolume mounts a Portworx Volume on the specified mountPath
func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error {
hostname := m.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(m.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err
}
err = client.Mount(m.volName, mountPath)
err = driver.Mount(m.volName, mountPath)
if err != nil {
glog.V(2).Infof("Error mounting Portworx Volume (%v) on Path (%v): %v", m.volName, mountPath, err)
return err
@ -137,13 +139,13 @@ func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath
// UnmountVolume unmounts a Portworx Volume
func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error {
hostname := u.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(u.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err
}
err = client.Unmount(u.volName, mountPath)
err = driver.Unmount(u.volName, mountPath)
if err != nil {
glog.V(2).Infof("Error unmounting Portworx Volume (%v) on Path (%v): %v", u.volName, mountPath, err)
return err
@ -151,14 +153,71 @@ func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountP
return nil
}
func (util *PortworxVolumeUtil) osdClient(hostname string) (osdvolume.VolumeDriver, error) {
osdEndpoint := "http://" + hostname + ":" + osdMgmtPort
if util.portworxClient == nil {
driverClient, err := volumeclient.NewDriverClient(osdEndpoint, pxdDriverName, osdDriverVersion)
func isClientValid(client *osdclient.Client) (bool, error) {
if client == nil {
return false, nil
}
_, err := client.Versions(osdapi.OsdVolumePath)
if err != nil {
glog.Errorf("portworx client failed driver versions check. Err: %v", err)
return false, err
}
return true, nil
}
func createDriverClient(hostname string) (*osdclient.Client, error) {
client, err := volumeclient.NewDriverClient("http://"+hostname+":"+osdMgmtPort,
pxdDriverName, osdDriverVersion)
if err != nil {
return nil, err
}
if isValid, err := isClientValid(client); isValid {
return client, nil
} else {
return nil, err
}
}
func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) {
if isValid, _ := isClientValid(util.portworxClient); isValid {
return volumeclient.VolumeDriver(util.portworxClient), nil
}
// create new client
var err error
util.portworxClient, err = createDriverClient(volumeHost.GetHostName()) // for backward compatibility
if err != nil || util.portworxClient == nil {
// Create client from portworx service
kubeClient := volumeHost.GetKubeClient()
if kubeClient == nil {
glog.Error("Failed to get kubeclient when creating portworx client")
return nil, nil
}
opts := metav1.GetOptions{}
svc, err := kubeClient.CoreV1().Services(api.NamespaceSystem).Get(pxServiceName, opts)
if err != nil {
glog.Errorf("Failed to get service. Err: %v", err)
return nil, err
}
util.portworxClient = driverClient
if svc == nil {
glog.Errorf("Service: %v not found. Consult Portworx docs to deploy it.", pxServiceName)
return nil, err
}
util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP)
if err != nil || util.portworxClient == nil {
glog.Errorf("Failed to connect to portworx service. Err: %v", err)
return nil, err
}
glog.Infof("Using portworx service at: %v as api endpoint", svc.Spec.ClusterIP)
} else {
glog.Infof("Using portworx service at: %v as api endpoint", volumeHost.GetHostName())
}
return volumeclient.VolumeDriver(util.portworxClient), nil