cache portworx API port

- reused client whenever possible
- refactor get client function into explicit cluster-wide and local functions

Signed-off-by: Harsh Desai <harsh@portworx.com>
This commit is contained in:
Harsh Desai 2018-10-31 10:09:53 -07:00
parent c6a2d24fa2
commit db93c5fcbe
2 changed files with 110 additions and 75 deletions

View File

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"os" "os"
volumeclient "github.com/libopenstorage/openstorage/api/client/volume"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -43,7 +44,7 @@ func ProbeVolumePlugins() []volume.VolumePlugin {
type portworxVolumePlugin struct { type portworxVolumePlugin struct {
host volume.VolumeHost host volume.VolumeHost
util *PortworxVolumeUtil util *portworxVolumeUtil
} }
var _ volume.VolumePlugin = &portworxVolumePlugin{} var _ volume.VolumePlugin = &portworxVolumePlugin{}
@ -61,8 +62,18 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
} }
func (plugin *portworxVolumePlugin) Init(host volume.VolumeHost) error { func (plugin *portworxVolumePlugin) Init(host volume.VolumeHost) error {
client, err := volumeclient.NewDriverClient(
fmt.Sprintf("http://%s:%d", host.GetHostName(), osdMgmtDefaultPort),
pxdDriverName, osdDriverVersion, pxDriverName)
if err != nil {
return err
}
plugin.host = host plugin.host = host
plugin.util = &PortworxVolumeUtil{} plugin.util = &portworxVolumeUtil{
portworxClient: client,
}
return nil return nil
} }

View File

@ -43,13 +43,13 @@ const (
pxDriverName = "pxd-sched" pxDriverName = "pxd-sched"
) )
type PortworxVolumeUtil struct { type portworxVolumeUtil struct {
portworxClient *osdclient.Client portworxClient *osdclient.Client
} }
// CreateVolume creates a Portworx volume. // CreateVolume creates a Portworx volume.
func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int64, map[string]string, error) { func (util *portworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int64, map[string]string, error) {
driver, err := util.getPortworxDriver(p.plugin.host, false /*localOnly*/) driver, err := util.getPortworxDriver(p.plugin.host)
if err != nil || driver == nil { if err != nil || driver == nil {
klog.Errorf("Failed to get portworx driver. Err: %v", err) klog.Errorf("Failed to get portworx driver. Err: %v", err)
return "", 0, nil, err return "", 0, nil, err
@ -112,8 +112,8 @@ func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (stri
} }
// DeleteVolume deletes a Portworx volume // DeleteVolume deletes a Portworx volume
func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error { func (util *portworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
driver, err := util.getPortworxDriver(d.plugin.host, false /*localOnly*/) driver, err := util.getPortworxDriver(d.plugin.host)
if err != nil || driver == nil { if err != nil || driver == nil {
klog.Errorf("Failed to get portworx driver. Err: %v", err) klog.Errorf("Failed to get portworx driver. Err: %v", err)
return err return err
@ -128,8 +128,8 @@ func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
} }
// AttachVolume attaches a Portworx Volume // AttachVolume attaches a Portworx Volume
func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOptions map[string]string) (string, error) { func (util *portworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOptions map[string]string) (string, error) {
driver, err := util.getPortworxDriver(m.plugin.host, true /*localOnly*/) driver, err := util.getLocalPortworxDriver(m.plugin.host)
if err != nil || driver == nil { if err != nil || driver == nil {
klog.Errorf("Failed to get portworx driver. Err: %v", err) klog.Errorf("Failed to get portworx driver. Err: %v", err)
return "", err return "", err
@ -144,8 +144,8 @@ func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOpt
} }
// DetachVolume detaches a Portworx Volume // DetachVolume detaches a Portworx Volume
func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error { func (util *portworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
driver, err := util.getPortworxDriver(u.plugin.host, true /*localOnly*/) driver, err := util.getLocalPortworxDriver(u.plugin.host)
if err != nil || driver == nil { if err != nil || driver == nil {
klog.Errorf("Failed to get portworx driver. Err: %v", err) klog.Errorf("Failed to get portworx driver. Err: %v", err)
return err return err
@ -160,8 +160,8 @@ func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
} }
// MountVolume mounts a Portworx Volume on the specified mountPath // MountVolume mounts a Portworx Volume on the specified mountPath
func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error { func (util *portworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error {
driver, err := util.getPortworxDriver(m.plugin.host, true /*localOnly*/) driver, err := util.getLocalPortworxDriver(m.plugin.host)
if err != nil || driver == nil { if err != nil || driver == nil {
klog.Errorf("Failed to get portworx driver. Err: %v", err) klog.Errorf("Failed to get portworx driver. Err: %v", err)
return err return err
@ -176,8 +176,8 @@ func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath
} }
// UnmountVolume unmounts a Portworx Volume // UnmountVolume unmounts a Portworx Volume
func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error { func (util *portworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error {
driver, err := util.getPortworxDriver(u.plugin.host, true /*localOnly*/) driver, err := util.getLocalPortworxDriver(u.plugin.host)
if err != nil || driver == nil { if err != nil || driver == nil {
klog.Errorf("Failed to get portworx driver. Err: %v", err) klog.Errorf("Failed to get portworx driver. Err: %v", err)
return err return err
@ -191,8 +191,8 @@ func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountP
return nil return nil
} }
func (util *PortworxVolumeUtil) ResizeVolume(spec *volume.Spec, newSize resource.Quantity, volumeHost volume.VolumeHost) error { func (util *portworxVolumeUtil) ResizeVolume(spec *volume.Spec, newSize resource.Quantity, volumeHost volume.VolumeHost) error {
driver, err := util.getPortworxDriver(volumeHost, false /*localOnly*/) driver, err := util.getPortworxDriver(volumeHost)
if err != nil || driver == nil { if err != nil || driver == nil {
klog.Errorf("Failed to get portworx driver. Err: %v", err) klog.Errorf("Failed to get portworx driver. Err: %v", err)
return err return err
@ -268,21 +268,89 @@ func createDriverClient(hostname string, port int32) (*osdclient.Client, error)
} }
} }
// getPortworxDriver() returns a Portworx volume driver which can be used for volume operations // getPortworxDriver returns a Portworx volume driver which can be used for cluster wide operations.
// localOnly: If true, the returned driver will be connected to Portworx API server on volume host. // Operations like create and delete volume don't need to be restricted to local volume host since
// If false, driver will be connected to API server on volume host or Portworx k8s service cluster IP // any node in the Portworx cluster can co-ordinate the create/delete request and forward the operations to
// This flag is required to explicitly force certain operations (mount, unmount, detach, attach) to // the Portworx node that will own/owns the data.
// go to the volume host instead of the k8s service which might route it to any host. This pertains to how func (util *portworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) {
// Portworx mounts and attaches a volume to the running container. The node getting these requests needs to // check if existing saved client is valid
// see the pod container mounts (specifically /var/lib/kubelet/pods/<pod_id>) if isValid, _ := isClientValid(util.portworxClient); isValid {
// Operations like create and delete volume don't need to be restricted to local volume host since return volumeclient.VolumeDriver(util.portworxClient), nil
// any node in the Portworx cluster can co-ordinate the create/delete request and forward the operations to }
// the Portworx node that will own/owns the data.
func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost, localOnly bool) (volumeapi.VolumeDriver, error) { // create new client
var err error var err error
kubeClient := volumeHost.GetKubeClient() util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osdMgmtDefaultPort) // for backward compatibility
if err != nil || util.portworxClient == nil {
// Create client from portworx k8s service.
svc, err := getPortworxService(volumeHost)
if err != nil {
return nil, err
}
// The port here is always the default one since it's the service port
util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP, osdMgmtDefaultPort)
if err != nil || util.portworxClient == nil {
klog.Errorf("Failed to connect to portworx service. Err: %v", err)
return nil, err
}
glog.Infof("Using portworx cluster service at: %v:%d as api endpoint",
svc.Spec.ClusterIP, osdMgmtDefaultPort)
} else {
glog.Infof("Using portworx service at: %v:%d as api endpoint",
volumeHost.GetHostName(), osdMgmtDefaultPort)
}
return volumeclient.VolumeDriver(util.portworxClient), nil
}
// getLocalPortworxDriver returns driver connected to Portworx API server on volume host.
// This is required to force certain operations (mount, unmount, detach, attach) to
// go to the volume host instead of the k8s service which might route it to any host. This pertains to how
// Portworx mounts and attaches a volume to the running container. The node getting these requests needs to
// see the pod container mounts (specifically /var/lib/kubelet/pods/<pod_id>)
func (util *portworxVolumeUtil) getLocalPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) {
if util.portworxClient != nil {
// check if existing saved client is valid
if isValid, _ := isClientValid(util.portworxClient); isValid {
return volumeclient.VolumeDriver(util.portworxClient), nil
}
}
// Lookup port
svc, err := getPortworxService(volumeHost)
if err != nil {
return nil, err
}
osgMgmtPort := lookupPXAPIPortFromService(svc)
util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osgMgmtPort)
if err != nil {
return nil, err
}
glog.Infof("Using portworx local service at: %v:%d as api endpoint",
volumeHost.GetHostName(), osgMgmtPort)
return volumeclient.VolumeDriver(util.portworxClient), nil
}
// lookupPXAPIPortFromService goes over all the ports in the given service and returns the target
// port for osdMgmtDefaultPort
func lookupPXAPIPortFromService(svc *v1.Service) int32 {
for _, p := range svc.Spec.Ports {
if p.Port == osdMgmtDefaultPort {
return p.TargetPort.IntVal
}
}
return osdMgmtDefaultPort // default
}
// getPortworxService returns the portworx cluster service from the API server
func getPortworxService(host volume.VolumeHost) (*v1.Service, error) {
kubeClient := host.GetKubeClient()
if kubeClient == nil { if kubeClient == nil {
err = fmt.Errorf("Failed to get kubeclient when creating portworx client") err := fmt.Errorf("Failed to get kubeclient when creating portworx client")
glog.Errorf(err.Error()) glog.Errorf(err.Error())
return nil, err return nil, err
} }
@ -300,49 +368,5 @@ func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost,
return nil, err return nil, err
} }
var pxAPIPort int32 = osdMgmtDefaultPort return svc, nil
if localOnly {
pxAPIPort = lookupPXAPIPortFromService(svc)
util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), pxAPIPort)
if err != nil {
return nil, err
} else {
klog.V(4).Infof("Using portworx local service at: %v as api endpoint", volumeHost.GetHostName())
return volumeclient.VolumeDriver(util.portworxClient), nil
}
}
// check if existing saved client is valid
if isValid, _ := isClientValid(util.portworxClient); isValid {
return volumeclient.VolumeDriver(util.portworxClient), nil
}
// create new client
util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), pxAPIPort) // for backward compatibility
if err != nil || util.portworxClient == nil {
// Create client from portworx k8s service
util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP, pxAPIPort)
if err != nil || util.portworxClient == nil {
klog.Errorf("Failed to connect to portworx service. Err: %v", err)
return nil, err
}
klog.Infof("Using portworx cluster service at: %v as api endpoint", svc.Spec.ClusterIP)
} else {
klog.Infof("Using portworx service at: %v as api endpoint", volumeHost.GetHostName())
}
return volumeclient.VolumeDriver(util.portworxClient), nil
}
// lookupPXAPIPortFromService goes over all the ports in the given service and returns the target
// port for osdMgmtDefaultPort
func lookupPXAPIPortFromService(svc *v1.Service) int32 {
for _, p := range svc.Spec.Ports {
if p.Port == osdMgmtDefaultPort {
return p.TargetPort.IntVal
}
}
return osdMgmtDefaultPort // default
} }