mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-15 14:14:39 +00:00
Use Portworx service as api endpoint for volume operations
This commit is contained in:
@@ -34,11 +34,12 @@ import (
|
|||||||
|
|
||||||
// This is the primary entrypoint for volume plugins.
|
// This is the primary entrypoint for volume plugins.
|
||||||
func ProbeVolumePlugins() []volume.VolumePlugin {
|
func ProbeVolumePlugins() []volume.VolumePlugin {
|
||||||
return []volume.VolumePlugin{&portworxVolumePlugin{nil}}
|
return []volume.VolumePlugin{&portworxVolumePlugin{nil, nil}}
|
||||||
}
|
}
|
||||||
|
|
||||||
type portworxVolumePlugin struct {
|
type portworxVolumePlugin struct {
|
||||||
host volume.VolumeHost
|
host volume.VolumeHost
|
||||||
|
util *PortworxVolumeUtil
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ volume.VolumePlugin = &portworxVolumePlugin{}
|
var _ volume.VolumePlugin = &portworxVolumePlugin{}
|
||||||
@@ -56,6 +57,7 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
|
|||||||
|
|
||||||
func (plugin *portworxVolumePlugin) Init(host volume.VolumeHost) error {
|
func (plugin *portworxVolumePlugin) Init(host volume.VolumeHost) error {
|
||||||
plugin.host = host
|
plugin.host = host
|
||||||
|
plugin.util = &PortworxVolumeUtil{}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -89,7 +91,7 @@ func (plugin *portworxVolumePlugin) GetAccessModes() []v1.PersistentVolumeAccess
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (plugin *portworxVolumePlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
|
func (plugin *portworxVolumePlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
|
||||||
return plugin.newMounterInternal(spec, pod.UID, &PortworxVolumeUtil{}, plugin.host.GetMounter())
|
return plugin.newMounterInternal(spec, pod.UID, plugin.util, plugin.host.GetMounter())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (plugin *portworxVolumePlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager portworxManager, mounter mount.Interface) (volume.Mounter, error) {
|
func (plugin *portworxVolumePlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager portworxManager, mounter mount.Interface) (volume.Mounter, error) {
|
||||||
@@ -117,10 +119,11 @@ func (plugin *portworxVolumePlugin) newMounterInternal(spec *volume.Spec, podUID
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (plugin *portworxVolumePlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
|
func (plugin *portworxVolumePlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
|
||||||
return plugin.newUnmounterInternal(volName, podUID, &PortworxVolumeUtil{}, plugin.host.GetMounter())
|
return plugin.newUnmounterInternal(volName, podUID, plugin.util, plugin.host.GetMounter())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (plugin *portworxVolumePlugin) newUnmounterInternal(volName string, podUID types.UID, manager portworxManager, mounter mount.Interface) (volume.Unmounter, error) {
|
func (plugin *portworxVolumePlugin) newUnmounterInternal(volName string, podUID types.UID, manager portworxManager,
|
||||||
|
mounter mount.Interface) (volume.Unmounter, error) {
|
||||||
return &portworxVolumeUnmounter{
|
return &portworxVolumeUnmounter{
|
||||||
&portworxVolume{
|
&portworxVolume{
|
||||||
podUID: podUID,
|
podUID: podUID,
|
||||||
@@ -133,13 +136,14 @@ func (plugin *portworxVolumePlugin) newUnmounterInternal(volName string, podUID
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (plugin *portworxVolumePlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
|
func (plugin *portworxVolumePlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
|
||||||
return plugin.newDeleterInternal(spec, &PortworxVolumeUtil{})
|
return plugin.newDeleterInternal(spec, plugin.util)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (plugin *portworxVolumePlugin) newDeleterInternal(spec *volume.Spec, manager portworxManager) (volume.Deleter, error) {
|
func (plugin *portworxVolumePlugin) newDeleterInternal(spec *volume.Spec, manager portworxManager) (volume.Deleter, error) {
|
||||||
if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.PortworxVolume == nil {
|
if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.PortworxVolume == nil {
|
||||||
return nil, fmt.Errorf("spec.PersistentVolumeSource.PortworxVolume is nil")
|
return nil, fmt.Errorf("spec.PersistentVolumeSource.PortworxVolume is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
return &portworxVolumeDeleter{
|
return &portworxVolumeDeleter{
|
||||||
portworxVolume: &portworxVolume{
|
portworxVolume: &portworxVolume{
|
||||||
volName: spec.Name(),
|
volName: spec.Name(),
|
||||||
@@ -150,7 +154,7 @@ func (plugin *portworxVolumePlugin) newDeleterInternal(spec *volume.Spec, manage
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (plugin *portworxVolumePlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
|
func (plugin *portworxVolumePlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
|
||||||
return plugin.newProvisionerInternal(options, &PortworxVolumeUtil{})
|
return plugin.newProvisionerInternal(options, plugin.util)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (plugin *portworxVolumePlugin) newProvisionerInternal(options volume.VolumeOptions, manager portworxManager) (volume.Provisioner, error) {
|
func (plugin *portworxVolumePlugin) newProvisionerInternal(options volume.VolumeOptions, manager portworxManager) (volume.Provisioner, error) {
|
||||||
|
@@ -22,19 +22,18 @@ import (
|
|||||||
osdclient "github.com/libopenstorage/openstorage/api/client"
|
osdclient "github.com/libopenstorage/openstorage/api/client"
|
||||||
volumeclient "github.com/libopenstorage/openstorage/api/client/volume"
|
volumeclient "github.com/libopenstorage/openstorage/api/client/volume"
|
||||||
osdspec "github.com/libopenstorage/openstorage/api/spec"
|
osdspec "github.com/libopenstorage/openstorage/api/spec"
|
||||||
osdvolume "github.com/libopenstorage/openstorage/volume"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/volume"
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
osdMgmtPort = "9001"
|
osdMgmtPort = "9001"
|
||||||
osdDriverVersion = "v1"
|
osdDriverVersion = "v1"
|
||||||
pxdDriverName = "pxd"
|
pxdDriverName = "pxd"
|
||||||
pwxSockName = "pwx"
|
pvcClaimLabel = "pvc"
|
||||||
pvcClaimLabel = "pvc"
|
pxServiceName = "portworx-service"
|
||||||
labelNodeRoleMaster = "node-role.kubernetes.io/master"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type PortworxVolumeUtil struct {
|
type PortworxVolumeUtil struct {
|
||||||
@@ -43,11 +42,17 @@ type PortworxVolumeUtil struct {
|
|||||||
|
|
||||||
// CreateVolume creates a Portworx volume.
|
// CreateVolume creates a Portworx volume.
|
||||||
func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int, map[string]string, error) {
|
func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int, map[string]string, error) {
|
||||||
client, err := util.osdClient(p.plugin.host)
|
if util.portworxClient == nil || !isValid(util.portworxClient) {
|
||||||
if err != nil {
|
var err error
|
||||||
return "", 0, nil, err
|
util.portworxClient, err = getPortworxClient(p.plugin.host)
|
||||||
|
if err != nil || util.portworxClient == nil {
|
||||||
|
glog.Errorf("Failed to get portworx client. Err: %v", err)
|
||||||
|
return "", 0, nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
driver := volumeclient.VolumeDriver(util.portworxClient)
|
||||||
|
|
||||||
capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
|
capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
|
||||||
// Portworx Volumes are specified in GB
|
// Portworx Volumes are specified in GB
|
||||||
requestGB := int(volume.RoundUpSize(capacity.Value(), 1024*1024*1024))
|
requestGB := int(volume.RoundUpSize(capacity.Value(), 1024*1024*1024))
|
||||||
@@ -65,7 +70,7 @@ func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (stri
|
|||||||
// Add claim Name as a part of Portworx Volume Labels
|
// Add claim Name as a part of Portworx Volume Labels
|
||||||
locator.VolumeLabels = make(map[string]string)
|
locator.VolumeLabels = make(map[string]string)
|
||||||
locator.VolumeLabels[pvcClaimLabel] = p.options.PVC.Name
|
locator.VolumeLabels[pvcClaimLabel] = p.options.PVC.Name
|
||||||
volumeID, err := client.Create(&locator, &source, spec)
|
volumeID, err := driver.Create(&locator, &source, spec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(2).Infof("Error creating Portworx Volume : %v", err)
|
glog.V(2).Infof("Error creating Portworx Volume : %v", err)
|
||||||
}
|
}
|
||||||
@@ -74,12 +79,17 @@ func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (stri
|
|||||||
|
|
||||||
// DeleteVolume deletes a Portworx volume
|
// DeleteVolume deletes a Portworx volume
|
||||||
func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
|
func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
|
||||||
client, err := util.osdClient(d.plugin.host)
|
if util.portworxClient == nil || !isValid(util.portworxClient) {
|
||||||
if err != nil {
|
var err error
|
||||||
return err
|
util.portworxClient, err = getPortworxClient(d.plugin.host)
|
||||||
|
if err != nil || util.portworxClient == nil {
|
||||||
|
glog.Errorf("Failed to get portworx client. Err: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = client.Delete(d.volumeID)
|
driver := volumeclient.VolumeDriver(util.portworxClient)
|
||||||
|
err := driver.Delete(d.volumeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(2).Infof("Error deleting Portworx Volume (%v): %v", d.volName, err)
|
glog.V(2).Infof("Error deleting Portworx Volume (%v): %v", d.volName, err)
|
||||||
return err
|
return err
|
||||||
@@ -89,12 +99,17 @@ func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
|
|||||||
|
|
||||||
// AttachVolume attaches a Portworx Volume
|
// AttachVolume attaches a Portworx Volume
|
||||||
func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter) (string, error) {
|
func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter) (string, error) {
|
||||||
client, err := util.osdClient(m.plugin.host)
|
if util.portworxClient == nil || !isValid(util.portworxClient) {
|
||||||
if err != nil {
|
var err error
|
||||||
return "", err
|
util.portworxClient, err = getPortworxClient(m.plugin.host)
|
||||||
|
if err != nil || util.portworxClient == nil {
|
||||||
|
glog.Errorf("Failed to get portworx client. Err: %v", err)
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
devicePath, err := client.Attach(m.volName)
|
driver := volumeclient.VolumeDriver(util.portworxClient)
|
||||||
|
devicePath, err := driver.Attach(m.volName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(2).Infof("Error attaching Portworx Volume (%v): %v", m.volName, err)
|
glog.V(2).Infof("Error attaching Portworx Volume (%v): %v", m.volName, err)
|
||||||
return "", err
|
return "", err
|
||||||
@@ -104,12 +119,17 @@ func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter) (string,
|
|||||||
|
|
||||||
// DetachVolume detaches a Portworx Volume
|
// DetachVolume detaches a Portworx Volume
|
||||||
func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
|
func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
|
||||||
client, err := util.osdClient(u.plugin.host)
|
if util.portworxClient == nil || !isValid(util.portworxClient) {
|
||||||
if err != nil {
|
var err error
|
||||||
return err
|
util.portworxClient, err = getPortworxClient(u.plugin.host)
|
||||||
|
if err != nil || util.portworxClient == nil {
|
||||||
|
glog.Errorf("Failed to get portworx client. Err: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = client.Detach(u.volName)
|
driver := volumeclient.VolumeDriver(util.portworxClient)
|
||||||
|
err := driver.Detach(u.volName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(2).Infof("Error detaching Portworx Volume (%v): %v", u.volName, err)
|
glog.V(2).Infof("Error detaching Portworx Volume (%v): %v", u.volName, err)
|
||||||
return err
|
return err
|
||||||
@@ -119,12 +139,17 @@ func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
|
|||||||
|
|
||||||
// MountVolume mounts a Portworx Volume on the specified mountPath
|
// MountVolume mounts a Portworx Volume on the specified mountPath
|
||||||
func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error {
|
func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error {
|
||||||
client, err := util.osdClient(m.plugin.host)
|
if util.portworxClient == nil || !isValid(util.portworxClient) {
|
||||||
if err != nil {
|
var err error
|
||||||
return err
|
util.portworxClient, err = getPortworxClient(m.plugin.host)
|
||||||
|
if err != nil || util.portworxClient == nil {
|
||||||
|
glog.Errorf("Failed to get portworx client. Err: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = client.Mount(m.volName, mountPath)
|
driver := volumeclient.VolumeDriver(util.portworxClient)
|
||||||
|
err := driver.Mount(m.volName, mountPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(2).Infof("Error mounting Portworx Volume (%v) on Path (%v): %v", m.volName, mountPath, err)
|
glog.V(2).Infof("Error mounting Portworx Volume (%v) on Path (%v): %v", m.volName, mountPath, err)
|
||||||
return err
|
return err
|
||||||
@@ -134,12 +159,17 @@ func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath
|
|||||||
|
|
||||||
// UnmountVolume unmounts a Portworx Volume
|
// UnmountVolume unmounts a Portworx Volume
|
||||||
func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error {
|
func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error {
|
||||||
client, err := util.osdClient(u.plugin.host)
|
if util.portworxClient == nil || !isValid(util.portworxClient) {
|
||||||
if err != nil {
|
var err error
|
||||||
return err
|
util.portworxClient, err = getPortworxClient(u.plugin.host)
|
||||||
|
if err != nil || util.portworxClient == nil {
|
||||||
|
glog.Errorf("Failed to get portworx client. Err: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = client.Unmount(u.volName, mountPath)
|
driver := volumeclient.VolumeDriver(util.portworxClient)
|
||||||
|
err := driver.Unmount(u.volName, mountPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(2).Infof("Error unmounting Portworx Volume (%v) on Path (%v): %v", u.volName, mountPath, err)
|
glog.V(2).Infof("Error unmounting Portworx Volume (%v) on Path (%v): %v", u.volName, mountPath, err)
|
||||||
return err
|
return err
|
||||||
@@ -147,67 +177,61 @@ func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountP
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (util *PortworxVolumeUtil) osdClient(volumeHost volume.VolumeHost) (osdvolume.VolumeDriver, error) {
|
func isValid(client *osdclient.Client) bool {
|
||||||
if util.portworxClient == nil {
|
_, err := client.Versions(osdapi.OsdVolumePath)
|
||||||
var e error
|
if err != nil {
|
||||||
|
glog.Errorf("portworx client failed driver versions check. Err: %v", err)
|
||||||
driverClient, err := getValidatedOsdClient(volumeHost.GetHostName())
|
return false
|
||||||
if err == nil && driverClient != nil {
|
|
||||||
util.portworxClient = driverClient
|
|
||||||
} else {
|
|
||||||
e = err
|
|
||||||
kubeClient := volumeHost.GetKubeClient()
|
|
||||||
if kubeClient != nil {
|
|
||||||
nodes, err := kubeClient.CoreV1().Nodes().List(metav1.ListOptions{})
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Failed to list k8s nodes. Err: $v", err.Error())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
OUTER:
|
|
||||||
for _, node := range nodes.Items {
|
|
||||||
if _, present := node.Labels[labelNodeRoleMaster]; present {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, n := range node.Status.Addresses {
|
|
||||||
driverClient, err = getValidatedOsdClient(n.Address)
|
|
||||||
if err != nil {
|
|
||||||
e = err
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if driverClient != nil {
|
|
||||||
util.portworxClient = driverClient
|
|
||||||
break OUTER
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if util.portworxClient == nil {
|
|
||||||
glog.Errorf("Failed to discover portworx api server.")
|
|
||||||
return nil, e
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return volumeclient.VolumeDriver(util.portworxClient), nil
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func getValidatedOsdClient(hostname string) (*osdclient.Client, error) {
|
func testDriverClient(hostname string) (*osdclient.Client, error) {
|
||||||
driverClient, err := volumeclient.NewDriverClient("http://"+hostname+":"+osdMgmtPort,
|
client, err := volumeclient.NewDriverClient("http://"+hostname+":"+osdMgmtPort,
|
||||||
pxdDriverName, osdDriverVersion)
|
pxdDriverName, osdDriverVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Warningf("Failed to create driver client with node: %v. Err: %v", hostname, err)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = driverClient.Versions(osdapi.OsdVolumePath)
|
if isValid(client) {
|
||||||
if err != nil {
|
return client, nil
|
||||||
glog.Warningf("node: %v failed driver versions check. Err: %v", hostname, err)
|
} else {
|
||||||
return nil, err
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return driverClient, nil
|
|
||||||
|
func getPortworxClient(volumeHost volume.VolumeHost) (*osdclient.Client, error) {
|
||||||
|
pxClient, err := testDriverClient(volumeHost.GetHostName()) // for backward compatibility
|
||||||
|
if err != nil || pxClient == nil {
|
||||||
|
// Create client from portworx service
|
||||||
|
kubeClient := volumeHost.GetKubeClient()
|
||||||
|
if kubeClient == nil {
|
||||||
|
glog.Error("Failed to get kubeclient when creating portworx client")
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := metav1.GetOptions{}
|
||||||
|
svc, err := kubeClient.CoreV1().Services(api.NamespaceSystem).Get(pxServiceName, opts)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to get service. Err: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if svc == nil {
|
||||||
|
glog.Errorf("Service: %v not found. Consult Portworx install docs to "+
|
||||||
|
"deploy it.", pxServiceName)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
pxClient, err = testDriverClient(svc.Spec.ClusterIP)
|
||||||
|
if err != nil || pxClient == nil {
|
||||||
|
glog.Errorf("Failed to connect to portworx service. Err: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.Infof("Using portworx service at: %v as api endpoint", svc.Spec.ClusterIP)
|
||||||
|
}
|
||||||
|
|
||||||
|
return pxClient, nil
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user