From db93c5fcbec3b89fba78459bedb7c3bead49d846 Mon Sep 17 00:00:00 2001 From: Harsh Desai Date: Wed, 31 Oct 2018 10:09:53 -0700 Subject: [PATCH] cache portworx API port - reused client whenever possible - refactor get client function into explicit cluster-wide and local functions Signed-off-by: Harsh Desai --- pkg/volume/portworx/portworx.go | 15 ++- pkg/volume/portworx/portworx_util.go | 170 +++++++++++++++------------ 2 files changed, 110 insertions(+), 75 deletions(-) diff --git a/pkg/volume/portworx/portworx.go b/pkg/volume/portworx/portworx.go index 212a4c23d79..e2cb8982986 100644 --- a/pkg/volume/portworx/portworx.go +++ b/pkg/volume/portworx/portworx.go @@ -20,6 +20,7 @@ import ( "fmt" "os" + volumeclient "github.com/libopenstorage/openstorage/api/client/volume" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,7 +44,7 @@ func ProbeVolumePlugins() []volume.VolumePlugin { type portworxVolumePlugin struct { host volume.VolumeHost - util *PortworxVolumeUtil + util *portworxVolumeUtil } var _ volume.VolumePlugin = &portworxVolumePlugin{} @@ -61,8 +62,18 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string { } func (plugin *portworxVolumePlugin) Init(host volume.VolumeHost) error { + client, err := volumeclient.NewDriverClient( + fmt.Sprintf("http://%s:%d", host.GetHostName(), osdMgmtDefaultPort), + pxdDriverName, osdDriverVersion, pxDriverName) + if err != nil { + return err + } + plugin.host = host - plugin.util = &PortworxVolumeUtil{} + plugin.util = &portworxVolumeUtil{ + portworxClient: client, + } + return nil } diff --git a/pkg/volume/portworx/portworx_util.go b/pkg/volume/portworx/portworx_util.go index 66e90e6a5e5..ea6092da770 100644 --- a/pkg/volume/portworx/portworx_util.go +++ b/pkg/volume/portworx/portworx_util.go @@ -43,13 +43,13 @@ const ( pxDriverName = "pxd-sched" ) -type PortworxVolumeUtil struct { +type portworxVolumeUtil struct { portworxClient *osdclient.Client } // CreateVolume creates a Portworx volume. -func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int64, map[string]string, error) { - driver, err := util.getPortworxDriver(p.plugin.host, false /*localOnly*/) +func (util *portworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int64, map[string]string, error) { + driver, err := util.getPortworxDriver(p.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return "", 0, nil, err @@ -112,8 +112,8 @@ func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (stri } // DeleteVolume deletes a Portworx volume -func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error { - driver, err := util.getPortworxDriver(d.plugin.host, false /*localOnly*/) +func (util *portworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error { + driver, err := util.getPortworxDriver(d.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return err @@ -128,8 +128,8 @@ func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error { } // AttachVolume attaches a Portworx Volume -func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOptions map[string]string) (string, error) { - driver, err := util.getPortworxDriver(m.plugin.host, true /*localOnly*/) +func (util *portworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOptions map[string]string) (string, error) { + driver, err := util.getLocalPortworxDriver(m.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return "", err @@ -144,8 +144,8 @@ func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOpt } // DetachVolume detaches a Portworx Volume -func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error { - driver, err := util.getPortworxDriver(u.plugin.host, true /*localOnly*/) +func (util *portworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error { + driver, err := util.getLocalPortworxDriver(u.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return err @@ -160,8 +160,8 @@ func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error { } // MountVolume mounts a Portworx Volume on the specified mountPath -func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error { - driver, err := util.getPortworxDriver(m.plugin.host, true /*localOnly*/) +func (util *portworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error { + driver, err := util.getLocalPortworxDriver(m.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return err @@ -176,8 +176,8 @@ func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath } // UnmountVolume unmounts a Portworx Volume -func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error { - driver, err := util.getPortworxDriver(u.plugin.host, true /*localOnly*/) +func (util *portworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error { + driver, err := util.getLocalPortworxDriver(u.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return err @@ -191,8 +191,8 @@ func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountP return nil } -func (util *PortworxVolumeUtil) ResizeVolume(spec *volume.Spec, newSize resource.Quantity, volumeHost volume.VolumeHost) error { - driver, err := util.getPortworxDriver(volumeHost, false /*localOnly*/) +func (util *portworxVolumeUtil) ResizeVolume(spec *volume.Spec, newSize resource.Quantity, volumeHost volume.VolumeHost) error { + driver, err := util.getPortworxDriver(volumeHost) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return err @@ -268,21 +268,89 @@ func createDriverClient(hostname string, port int32) (*osdclient.Client, error) } } -// getPortworxDriver() returns a Portworx volume driver which can be used for volume operations -// localOnly: If true, the returned driver will be connected to Portworx API server on volume host. -// If false, driver will be connected to API server on volume host or Portworx k8s service cluster IP -// This flag is required to explicitly force certain operations (mount, unmount, detach, attach) to -// go to the volume host instead of the k8s service which might route it to any host. This pertains to how -// Portworx mounts and attaches a volume to the running container. The node getting these requests needs to -// see the pod container mounts (specifically /var/lib/kubelet/pods/) -// Operations like create and delete volume don't need to be restricted to local volume host since -// any node in the Portworx cluster can co-ordinate the create/delete request and forward the operations to -// the Portworx node that will own/owns the data. -func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost, localOnly bool) (volumeapi.VolumeDriver, error) { +// getPortworxDriver returns a Portworx volume driver which can be used for cluster wide operations. +// Operations like create and delete volume don't need to be restricted to local volume host since +// any node in the Portworx cluster can co-ordinate the create/delete request and forward the operations to +// the Portworx node that will own/owns the data. +func (util *portworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) { + // check if existing saved client is valid + if isValid, _ := isClientValid(util.portworxClient); isValid { + return volumeclient.VolumeDriver(util.portworxClient), nil + } + + // create new client var err error - kubeClient := volumeHost.GetKubeClient() + util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osdMgmtDefaultPort) // for backward compatibility + if err != nil || util.portworxClient == nil { + // Create client from portworx k8s service. + svc, err := getPortworxService(volumeHost) + if err != nil { + return nil, err + } + + // The port here is always the default one since it's the service port + util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP, osdMgmtDefaultPort) + if err != nil || util.portworxClient == nil { + klog.Errorf("Failed to connect to portworx service. Err: %v", err) + return nil, err + } + + glog.Infof("Using portworx cluster service at: %v:%d as api endpoint", + svc.Spec.ClusterIP, osdMgmtDefaultPort) + } else { + glog.Infof("Using portworx service at: %v:%d as api endpoint", + volumeHost.GetHostName(), osdMgmtDefaultPort) + } + + return volumeclient.VolumeDriver(util.portworxClient), nil +} + +// getLocalPortworxDriver returns driver connected to Portworx API server on volume host. +// This is required to force certain operations (mount, unmount, detach, attach) to +// go to the volume host instead of the k8s service which might route it to any host. This pertains to how +// Portworx mounts and attaches a volume to the running container. The node getting these requests needs to +// see the pod container mounts (specifically /var/lib/kubelet/pods/) +func (util *portworxVolumeUtil) getLocalPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) { + if util.portworxClient != nil { + // check if existing saved client is valid + if isValid, _ := isClientValid(util.portworxClient); isValid { + return volumeclient.VolumeDriver(util.portworxClient), nil + } + } + + // Lookup port + svc, err := getPortworxService(volumeHost) + if err != nil { + return nil, err + } + + osgMgmtPort := lookupPXAPIPortFromService(svc) + util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osgMgmtPort) + if err != nil { + return nil, err + } + + glog.Infof("Using portworx local service at: %v:%d as api endpoint", + volumeHost.GetHostName(), osgMgmtPort) + return volumeclient.VolumeDriver(util.portworxClient), nil +} + +// lookupPXAPIPortFromService goes over all the ports in the given service and returns the target +// port for osdMgmtDefaultPort +func lookupPXAPIPortFromService(svc *v1.Service) int32 { + for _, p := range svc.Spec.Ports { + if p.Port == osdMgmtDefaultPort { + return p.TargetPort.IntVal + } + } + return osdMgmtDefaultPort // default +} + +// getPortworxService returns the portworx cluster service from the API server +func getPortworxService(host volume.VolumeHost) (*v1.Service, error) { + kubeClient := host.GetKubeClient() if kubeClient == nil { - err = fmt.Errorf("Failed to get kubeclient when creating portworx client") + err := fmt.Errorf("Failed to get kubeclient when creating portworx client") glog.Errorf(err.Error()) return nil, err } @@ -300,49 +368,5 @@ func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost, return nil, err } - var pxAPIPort int32 = osdMgmtDefaultPort - - if localOnly { - pxAPIPort = lookupPXAPIPortFromService(svc) - util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), pxAPIPort) - if err != nil { - return nil, err - } else { - klog.V(4).Infof("Using portworx local service at: %v as api endpoint", volumeHost.GetHostName()) - return volumeclient.VolumeDriver(util.portworxClient), nil - } - } - - // check if existing saved client is valid - if isValid, _ := isClientValid(util.portworxClient); isValid { - return volumeclient.VolumeDriver(util.portworxClient), nil - } - - // create new client - util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), pxAPIPort) // for backward compatibility - if err != nil || util.portworxClient == nil { - // Create client from portworx k8s service - util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP, pxAPIPort) - if err != nil || util.portworxClient == nil { - klog.Errorf("Failed to connect to portworx service. Err: %v", err) - return nil, err - } - - klog.Infof("Using portworx cluster service at: %v as api endpoint", svc.Spec.ClusterIP) - } else { - klog.Infof("Using portworx service at: %v as api endpoint", volumeHost.GetHostName()) - } - - return volumeclient.VolumeDriver(util.portworxClient), nil -} - -// lookupPXAPIPortFromService goes over all the ports in the given service and returns the target -// port for osdMgmtDefaultPort -func lookupPXAPIPortFromService(svc *v1.Service) int32 { - for _, p := range svc.Spec.Ports { - if p.Port == osdMgmtDefaultPort { - return p.TargetPort.IntVal - } - } - return osdMgmtDefaultPort // default + return svc, nil }