From c6a2d24fa2c48d4a2270cf9a0ccfd60992f0b732 Mon Sep 17 00:00:00 2001 From: Harsh Desai Date: Mon, 22 Oct 2018 12:34:58 -0700 Subject: [PATCH 1/3] Lookup PX api port from k8s service Fixes #70033 Signed-off-by: Harsh Desai --- pkg/volume/portworx/portworx_util.go | 78 +++++++++++++++++----------- 1 file changed, 47 insertions(+), 31 deletions(-) diff --git a/pkg/volume/portworx/portworx_util.go b/pkg/volume/portworx/portworx_util.go index 62b3e3f4d31..66e90e6a5e5 100644 --- a/pkg/volume/portworx/portworx_util.go +++ b/pkg/volume/portworx/portworx_util.go @@ -34,13 +34,13 @@ import ( ) const ( - osdMgmtPort = "9001" - osdDriverVersion = "v1" - pxdDriverName = "pxd" - pvcClaimLabel = "pvc" - pvcNamespaceLabel = "namespace" - pxServiceName = "portworx-service" - pxDriverName = "pxd-sched" + osdMgmtDefaultPort = 9001 + osdDriverVersion = "v1" + pxdDriverName = "pxd" + pvcClaimLabel = "pvc" + pvcNamespaceLabel = "namespace" + pxServiceName = "portworx-service" + pxDriverName = "pxd-sched" ) type PortworxVolumeUtil struct { @@ -254,8 +254,8 @@ func isClientValid(client *osdclient.Client) (bool, error) { return true, nil } -func createDriverClient(hostname string) (*osdclient.Client, error) { - client, err := volumeclient.NewDriverClient("http://"+hostname+":"+osdMgmtPort, +func createDriverClient(hostname string, port int32) (*osdclient.Client, error) { + client, err := volumeclient.NewDriverClient(fmt.Sprintf("http://%s:%d", hostname, port), pxdDriverName, osdDriverVersion, pxDriverName) if err != nil { return nil, err @@ -280,8 +280,31 @@ func createDriverClient(hostname string) (*osdclient.Client, error) { // the Portworx node that will own/owns the data. func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost, localOnly bool) (volumeapi.VolumeDriver, error) { var err error + kubeClient := volumeHost.GetKubeClient() + if kubeClient == nil { + err = fmt.Errorf("Failed to get kubeclient when creating portworx client") + glog.Errorf(err.Error()) + return nil, err + } + + opts := metav1.GetOptions{} + svc, err := kubeClient.CoreV1().Services(api.NamespaceSystem).Get(pxServiceName, opts) + if err != nil { + glog.Errorf("Failed to get service. Err: %v", err) + return nil, err + } + + if svc == nil { + err = fmt.Errorf("Service: %v not found. Consult Portworx docs to deploy it.", pxServiceName) + glog.Errorf(err.Error()) + return nil, err + } + + var pxAPIPort int32 = osdMgmtDefaultPort + if localOnly { - util.portworxClient, err = createDriverClient(volumeHost.GetHostName()) + pxAPIPort = lookupPXAPIPortFromService(svc) + util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), pxAPIPort) if err != nil { return nil, err } else { @@ -296,28 +319,10 @@ func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost, } // create new client - util.portworxClient, err = createDriverClient(volumeHost.GetHostName()) // for backward compatibility + util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), pxAPIPort) // for backward compatibility if err != nil || util.portworxClient == nil { - // Create client from portworx service - kubeClient := volumeHost.GetKubeClient() - if kubeClient == nil { - klog.Error("Failed to get kubeclient when creating portworx client") - return nil, nil - } - - opts := metav1.GetOptions{} - svc, err := kubeClient.CoreV1().Services(api.NamespaceSystem).Get(pxServiceName, opts) - if err != nil { - klog.Errorf("Failed to get service. Err: %v", err) - return nil, err - } - - if svc == nil { - klog.Errorf("Service: %v not found. Consult Portworx docs to deploy it.", pxServiceName) - return nil, err - } - - util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP) + // Create client from portworx k8s service + util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP, pxAPIPort) if err != nil || util.portworxClient == nil { klog.Errorf("Failed to connect to portworx service. Err: %v", err) return nil, err @@ -330,3 +335,14 @@ func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost, return volumeclient.VolumeDriver(util.portworxClient), nil } + +// lookupPXAPIPortFromService goes over all the ports in the given service and returns the target +// port for osdMgmtDefaultPort +func lookupPXAPIPortFromService(svc *v1.Service) int32 { + for _, p := range svc.Spec.Ports { + if p.Port == osdMgmtDefaultPort { + return p.TargetPort.IntVal + } + } + return osdMgmtDefaultPort // default +} From db93c5fcbec3b89fba78459bedb7c3bead49d846 Mon Sep 17 00:00:00 2001 From: Harsh Desai Date: Wed, 31 Oct 2018 10:09:53 -0700 Subject: [PATCH 2/3] cache portworx API port - reused client whenever possible - refactor get client function into explicit cluster-wide and local functions Signed-off-by: Harsh Desai --- pkg/volume/portworx/portworx.go | 15 ++- pkg/volume/portworx/portworx_util.go | 170 +++++++++++++++------------ 2 files changed, 110 insertions(+), 75 deletions(-) diff --git a/pkg/volume/portworx/portworx.go b/pkg/volume/portworx/portworx.go index 212a4c23d79..e2cb8982986 100644 --- a/pkg/volume/portworx/portworx.go +++ b/pkg/volume/portworx/portworx.go @@ -20,6 +20,7 @@ import ( "fmt" "os" + volumeclient "github.com/libopenstorage/openstorage/api/client/volume" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,7 +44,7 @@ func ProbeVolumePlugins() []volume.VolumePlugin { type portworxVolumePlugin struct { host volume.VolumeHost - util *PortworxVolumeUtil + util *portworxVolumeUtil } var _ volume.VolumePlugin = &portworxVolumePlugin{} @@ -61,8 +62,18 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string { } func (plugin *portworxVolumePlugin) Init(host volume.VolumeHost) error { + client, err := volumeclient.NewDriverClient( + fmt.Sprintf("http://%s:%d", host.GetHostName(), osdMgmtDefaultPort), + pxdDriverName, osdDriverVersion, pxDriverName) + if err != nil { + return err + } + plugin.host = host - plugin.util = &PortworxVolumeUtil{} + plugin.util = &portworxVolumeUtil{ + portworxClient: client, + } + return nil } diff --git a/pkg/volume/portworx/portworx_util.go b/pkg/volume/portworx/portworx_util.go index 66e90e6a5e5..ea6092da770 100644 --- a/pkg/volume/portworx/portworx_util.go +++ b/pkg/volume/portworx/portworx_util.go @@ -43,13 +43,13 @@ const ( pxDriverName = "pxd-sched" ) -type PortworxVolumeUtil struct { +type portworxVolumeUtil struct { portworxClient *osdclient.Client } // CreateVolume creates a Portworx volume. -func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int64, map[string]string, error) { - driver, err := util.getPortworxDriver(p.plugin.host, false /*localOnly*/) +func (util *portworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int64, map[string]string, error) { + driver, err := util.getPortworxDriver(p.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return "", 0, nil, err @@ -112,8 +112,8 @@ func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (stri } // DeleteVolume deletes a Portworx volume -func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error { - driver, err := util.getPortworxDriver(d.plugin.host, false /*localOnly*/) +func (util *portworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error { + driver, err := util.getPortworxDriver(d.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return err @@ -128,8 +128,8 @@ func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error { } // AttachVolume attaches a Portworx Volume -func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOptions map[string]string) (string, error) { - driver, err := util.getPortworxDriver(m.plugin.host, true /*localOnly*/) +func (util *portworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOptions map[string]string) (string, error) { + driver, err := util.getLocalPortworxDriver(m.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return "", err @@ -144,8 +144,8 @@ func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOpt } // DetachVolume detaches a Portworx Volume -func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error { - driver, err := util.getPortworxDriver(u.plugin.host, true /*localOnly*/) +func (util *portworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error { + driver, err := util.getLocalPortworxDriver(u.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return err @@ -160,8 +160,8 @@ func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error { } // MountVolume mounts a Portworx Volume on the specified mountPath -func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error { - driver, err := util.getPortworxDriver(m.plugin.host, true /*localOnly*/) +func (util *portworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error { + driver, err := util.getLocalPortworxDriver(m.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return err @@ -176,8 +176,8 @@ func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath } // UnmountVolume unmounts a Portworx Volume -func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error { - driver, err := util.getPortworxDriver(u.plugin.host, true /*localOnly*/) +func (util *portworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error { + driver, err := util.getLocalPortworxDriver(u.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return err @@ -191,8 +191,8 @@ func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountP return nil } -func (util *PortworxVolumeUtil) ResizeVolume(spec *volume.Spec, newSize resource.Quantity, volumeHost volume.VolumeHost) error { - driver, err := util.getPortworxDriver(volumeHost, false /*localOnly*/) +func (util *portworxVolumeUtil) ResizeVolume(spec *volume.Spec, newSize resource.Quantity, volumeHost volume.VolumeHost) error { + driver, err := util.getPortworxDriver(volumeHost) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return err @@ -268,21 +268,89 @@ func createDriverClient(hostname string, port int32) (*osdclient.Client, error) } } -// getPortworxDriver() returns a Portworx volume driver which can be used for volume operations -// localOnly: If true, the returned driver will be connected to Portworx API server on volume host. -// If false, driver will be connected to API server on volume host or Portworx k8s service cluster IP -// This flag is required to explicitly force certain operations (mount, unmount, detach, attach) to -// go to the volume host instead of the k8s service which might route it to any host. This pertains to how -// Portworx mounts and attaches a volume to the running container. The node getting these requests needs to -// see the pod container mounts (specifically /var/lib/kubelet/pods/) -// Operations like create and delete volume don't need to be restricted to local volume host since -// any node in the Portworx cluster can co-ordinate the create/delete request and forward the operations to -// the Portworx node that will own/owns the data. -func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost, localOnly bool) (volumeapi.VolumeDriver, error) { +// getPortworxDriver returns a Portworx volume driver which can be used for cluster wide operations. +// Operations like create and delete volume don't need to be restricted to local volume host since +// any node in the Portworx cluster can co-ordinate the create/delete request and forward the operations to +// the Portworx node that will own/owns the data. +func (util *portworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) { + // check if existing saved client is valid + if isValid, _ := isClientValid(util.portworxClient); isValid { + return volumeclient.VolumeDriver(util.portworxClient), nil + } + + // create new client var err error - kubeClient := volumeHost.GetKubeClient() + util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osdMgmtDefaultPort) // for backward compatibility + if err != nil || util.portworxClient == nil { + // Create client from portworx k8s service. + svc, err := getPortworxService(volumeHost) + if err != nil { + return nil, err + } + + // The port here is always the default one since it's the service port + util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP, osdMgmtDefaultPort) + if err != nil || util.portworxClient == nil { + klog.Errorf("Failed to connect to portworx service. Err: %v", err) + return nil, err + } + + glog.Infof("Using portworx cluster service at: %v:%d as api endpoint", + svc.Spec.ClusterIP, osdMgmtDefaultPort) + } else { + glog.Infof("Using portworx service at: %v:%d as api endpoint", + volumeHost.GetHostName(), osdMgmtDefaultPort) + } + + return volumeclient.VolumeDriver(util.portworxClient), nil +} + +// getLocalPortworxDriver returns driver connected to Portworx API server on volume host. +// This is required to force certain operations (mount, unmount, detach, attach) to +// go to the volume host instead of the k8s service which might route it to any host. This pertains to how +// Portworx mounts and attaches a volume to the running container. The node getting these requests needs to +// see the pod container mounts (specifically /var/lib/kubelet/pods/) +func (util *portworxVolumeUtil) getLocalPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) { + if util.portworxClient != nil { + // check if existing saved client is valid + if isValid, _ := isClientValid(util.portworxClient); isValid { + return volumeclient.VolumeDriver(util.portworxClient), nil + } + } + + // Lookup port + svc, err := getPortworxService(volumeHost) + if err != nil { + return nil, err + } + + osgMgmtPort := lookupPXAPIPortFromService(svc) + util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osgMgmtPort) + if err != nil { + return nil, err + } + + glog.Infof("Using portworx local service at: %v:%d as api endpoint", + volumeHost.GetHostName(), osgMgmtPort) + return volumeclient.VolumeDriver(util.portworxClient), nil +} + +// lookupPXAPIPortFromService goes over all the ports in the given service and returns the target +// port for osdMgmtDefaultPort +func lookupPXAPIPortFromService(svc *v1.Service) int32 { + for _, p := range svc.Spec.Ports { + if p.Port == osdMgmtDefaultPort { + return p.TargetPort.IntVal + } + } + return osdMgmtDefaultPort // default +} + +// getPortworxService returns the portworx cluster service from the API server +func getPortworxService(host volume.VolumeHost) (*v1.Service, error) { + kubeClient := host.GetKubeClient() if kubeClient == nil { - err = fmt.Errorf("Failed to get kubeclient when creating portworx client") + err := fmt.Errorf("Failed to get kubeclient when creating portworx client") glog.Errorf(err.Error()) return nil, err } @@ -300,49 +368,5 @@ func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost, return nil, err } - var pxAPIPort int32 = osdMgmtDefaultPort - - if localOnly { - pxAPIPort = lookupPXAPIPortFromService(svc) - util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), pxAPIPort) - if err != nil { - return nil, err - } else { - klog.V(4).Infof("Using portworx local service at: %v as api endpoint", volumeHost.GetHostName()) - return volumeclient.VolumeDriver(util.portworxClient), nil - } - } - - // check if existing saved client is valid - if isValid, _ := isClientValid(util.portworxClient); isValid { - return volumeclient.VolumeDriver(util.portworxClient), nil - } - - // create new client - util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), pxAPIPort) // for backward compatibility - if err != nil || util.portworxClient == nil { - // Create client from portworx k8s service - util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP, pxAPIPort) - if err != nil || util.portworxClient == nil { - klog.Errorf("Failed to connect to portworx service. Err: %v", err) - return nil, err - } - - klog.Infof("Using portworx cluster service at: %v as api endpoint", svc.Spec.ClusterIP) - } else { - klog.Infof("Using portworx service at: %v as api endpoint", volumeHost.GetHostName()) - } - - return volumeclient.VolumeDriver(util.portworxClient), nil -} - -// lookupPXAPIPortFromService goes over all the ports in the given service and returns the target -// port for osdMgmtDefaultPort -func lookupPXAPIPortFromService(svc *v1.Service) int32 { - for _, p := range svc.Spec.Ports { - if p.Port == osdMgmtDefaultPort { - return p.TargetPort.IntVal - } - } - return osdMgmtDefaultPort // default + return svc, nil } From fdc60629c9b7640f2711645e9ad55ebc4a52ce47 Mon Sep 17 00:00:00 2001 From: Harsh Desai Date: Mon, 12 Nov 2018 10:59:57 -0800 Subject: [PATCH 3/3] Update portworx to move from glog to klog Signed-off-by: Harsh Desai --- pkg/volume/portworx/portworx_util.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/volume/portworx/portworx_util.go b/pkg/volume/portworx/portworx_util.go index ea6092da770..7ac5522dac1 100644 --- a/pkg/volume/portworx/portworx_util.go +++ b/pkg/volume/portworx/portworx_util.go @@ -295,10 +295,10 @@ func (util *portworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost) return nil, err } - glog.Infof("Using portworx cluster service at: %v:%d as api endpoint", + klog.Infof("Using portworx cluster service at: %v:%d as api endpoint", svc.Spec.ClusterIP, osdMgmtDefaultPort) } else { - glog.Infof("Using portworx service at: %v:%d as api endpoint", + klog.Infof("Using portworx service at: %v:%d as api endpoint", volumeHost.GetHostName(), osdMgmtDefaultPort) } @@ -330,7 +330,7 @@ func (util *portworxVolumeUtil) getLocalPortworxDriver(volumeHost volume.VolumeH return nil, err } - glog.Infof("Using portworx local service at: %v:%d as api endpoint", + klog.Infof("Using portworx local service at: %v:%d as api endpoint", volumeHost.GetHostName(), osgMgmtPort) return volumeclient.VolumeDriver(util.portworxClient), nil } @@ -351,20 +351,20 @@ func getPortworxService(host volume.VolumeHost) (*v1.Service, error) { kubeClient := host.GetKubeClient() if kubeClient == nil { err := fmt.Errorf("Failed to get kubeclient when creating portworx client") - glog.Errorf(err.Error()) + klog.Errorf(err.Error()) return nil, err } opts := metav1.GetOptions{} svc, err := kubeClient.CoreV1().Services(api.NamespaceSystem).Get(pxServiceName, opts) if err != nil { - glog.Errorf("Failed to get service. Err: %v", err) + klog.Errorf("Failed to get service. Err: %v", err) return nil, err } if svc == nil { err = fmt.Errorf("Service: %v not found. Consult Portworx docs to deploy it.", pxServiceName) - glog.Errorf(err.Error()) + klog.Errorf(err.Error()) return nil, err }