diff --git a/pkg/volume/portworx/portworx.go b/pkg/volume/portworx/portworx.go index 212a4c23d79..e2cb8982986 100644 --- a/pkg/volume/portworx/portworx.go +++ b/pkg/volume/portworx/portworx.go @@ -20,6 +20,7 @@ import ( "fmt" "os" + volumeclient "github.com/libopenstorage/openstorage/api/client/volume" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,7 +44,7 @@ func ProbeVolumePlugins() []volume.VolumePlugin { type portworxVolumePlugin struct { host volume.VolumeHost - util *PortworxVolumeUtil + util *portworxVolumeUtil } var _ volume.VolumePlugin = &portworxVolumePlugin{} @@ -61,8 +62,18 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string { } func (plugin *portworxVolumePlugin) Init(host volume.VolumeHost) error { + client, err := volumeclient.NewDriverClient( + fmt.Sprintf("http://%s:%d", host.GetHostName(), osdMgmtDefaultPort), + pxdDriverName, osdDriverVersion, pxDriverName) + if err != nil { + return err + } + plugin.host = host - plugin.util = &PortworxVolumeUtil{} + plugin.util = &portworxVolumeUtil{ + portworxClient: client, + } + return nil } diff --git a/pkg/volume/portworx/portworx_util.go b/pkg/volume/portworx/portworx_util.go index 62b3e3f4d31..7ac5522dac1 100644 --- a/pkg/volume/portworx/portworx_util.go +++ b/pkg/volume/portworx/portworx_util.go @@ -34,22 +34,22 @@ import ( ) const ( - osdMgmtPort = "9001" - osdDriverVersion = "v1" - pxdDriverName = "pxd" - pvcClaimLabel = "pvc" - pvcNamespaceLabel = "namespace" - pxServiceName = "portworx-service" - pxDriverName = "pxd-sched" + osdMgmtDefaultPort = 9001 + osdDriverVersion = "v1" + pxdDriverName = "pxd" + pvcClaimLabel = "pvc" + pvcNamespaceLabel = "namespace" + pxServiceName = "portworx-service" + pxDriverName = "pxd-sched" ) -type PortworxVolumeUtil struct { +type portworxVolumeUtil struct { portworxClient *osdclient.Client } // CreateVolume creates a Portworx volume. -func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int64, map[string]string, error) { - driver, err := util.getPortworxDriver(p.plugin.host, false /*localOnly*/) +func (util *portworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int64, map[string]string, error) { + driver, err := util.getPortworxDriver(p.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return "", 0, nil, err @@ -112,8 +112,8 @@ func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (stri } // DeleteVolume deletes a Portworx volume -func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error { - driver, err := util.getPortworxDriver(d.plugin.host, false /*localOnly*/) +func (util *portworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error { + driver, err := util.getPortworxDriver(d.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return err @@ -128,8 +128,8 @@ func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error { } // AttachVolume attaches a Portworx Volume -func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOptions map[string]string) (string, error) { - driver, err := util.getPortworxDriver(m.plugin.host, true /*localOnly*/) +func (util *portworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOptions map[string]string) (string, error) { + driver, err := util.getLocalPortworxDriver(m.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return "", err @@ -144,8 +144,8 @@ func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOpt } // DetachVolume detaches a Portworx Volume -func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error { - driver, err := util.getPortworxDriver(u.plugin.host, true /*localOnly*/) +func (util *portworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error { + driver, err := util.getLocalPortworxDriver(u.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return err @@ -160,8 +160,8 @@ func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error { } // MountVolume mounts a Portworx Volume on the specified mountPath -func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error { - driver, err := util.getPortworxDriver(m.plugin.host, true /*localOnly*/) +func (util *portworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error { + driver, err := util.getLocalPortworxDriver(m.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return err @@ -176,8 +176,8 @@ func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath } // UnmountVolume unmounts a Portworx Volume -func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error { - driver, err := util.getPortworxDriver(u.plugin.host, true /*localOnly*/) +func (util *portworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error { + driver, err := util.getLocalPortworxDriver(u.plugin.host) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return err @@ -191,8 +191,8 @@ func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountP return nil } -func (util *PortworxVolumeUtil) ResizeVolume(spec *volume.Spec, newSize resource.Quantity, volumeHost volume.VolumeHost) error { - driver, err := util.getPortworxDriver(volumeHost, false /*localOnly*/) +func (util *portworxVolumeUtil) ResizeVolume(spec *volume.Spec, newSize resource.Quantity, volumeHost volume.VolumeHost) error { + driver, err := util.getPortworxDriver(volumeHost) if err != nil || driver == nil { klog.Errorf("Failed to get portworx driver. Err: %v", err) return err @@ -254,8 +254,8 @@ func isClientValid(client *osdclient.Client) (bool, error) { return true, nil } -func createDriverClient(hostname string) (*osdclient.Client, error) { - client, err := volumeclient.NewDriverClient("http://"+hostname+":"+osdMgmtPort, +func createDriverClient(hostname string, port int32) (*osdclient.Client, error) { + client, err := volumeclient.NewDriverClient(fmt.Sprintf("http://%s:%d", hostname, port), pxdDriverName, osdDriverVersion, pxDriverName) if err != nil { return nil, err @@ -268,65 +268,105 @@ func createDriverClient(hostname string) (*osdclient.Client, error) { } } -// getPortworxDriver() returns a Portworx volume driver which can be used for volume operations -// localOnly: If true, the returned driver will be connected to Portworx API server on volume host. -// If false, driver will be connected to API server on volume host or Portworx k8s service cluster IP -// This flag is required to explicitly force certain operations (mount, unmount, detach, attach) to -// go to the volume host instead of the k8s service which might route it to any host. This pertains to how -// Portworx mounts and attaches a volume to the running container. The node getting these requests needs to -// see the pod container mounts (specifically /var/lib/kubelet/pods/) -// Operations like create and delete volume don't need to be restricted to local volume host since -// any node in the Portworx cluster can co-ordinate the create/delete request and forward the operations to -// the Portworx node that will own/owns the data. -func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost, localOnly bool) (volumeapi.VolumeDriver, error) { - var err error - if localOnly { - util.portworxClient, err = createDriverClient(volumeHost.GetHostName()) - if err != nil { - return nil, err - } else { - klog.V(4).Infof("Using portworx local service at: %v as api endpoint", volumeHost.GetHostName()) - return volumeclient.VolumeDriver(util.portworxClient), nil - } - } - +// getPortworxDriver returns a Portworx volume driver which can be used for cluster wide operations. +// Operations like create and delete volume don't need to be restricted to local volume host since +// any node in the Portworx cluster can co-ordinate the create/delete request and forward the operations to +// the Portworx node that will own/owns the data. +func (util *portworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) { // check if existing saved client is valid if isValid, _ := isClientValid(util.portworxClient); isValid { return volumeclient.VolumeDriver(util.portworxClient), nil } // create new client - util.portworxClient, err = createDriverClient(volumeHost.GetHostName()) // for backward compatibility + var err error + util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osdMgmtDefaultPort) // for backward compatibility if err != nil || util.portworxClient == nil { - // Create client from portworx service - kubeClient := volumeHost.GetKubeClient() - if kubeClient == nil { - klog.Error("Failed to get kubeclient when creating portworx client") - return nil, nil - } - - opts := metav1.GetOptions{} - svc, err := kubeClient.CoreV1().Services(api.NamespaceSystem).Get(pxServiceName, opts) + // Create client from portworx k8s service. + svc, err := getPortworxService(volumeHost) if err != nil { - klog.Errorf("Failed to get service. Err: %v", err) return nil, err } - if svc == nil { - klog.Errorf("Service: %v not found. Consult Portworx docs to deploy it.", pxServiceName) - return nil, err - } - - util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP) + // The port here is always the default one since it's the service port + util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP, osdMgmtDefaultPort) if err != nil || util.portworxClient == nil { klog.Errorf("Failed to connect to portworx service. Err: %v", err) return nil, err } - klog.Infof("Using portworx cluster service at: %v as api endpoint", svc.Spec.ClusterIP) + klog.Infof("Using portworx cluster service at: %v:%d as api endpoint", + svc.Spec.ClusterIP, osdMgmtDefaultPort) } else { - klog.Infof("Using portworx service at: %v as api endpoint", volumeHost.GetHostName()) + klog.Infof("Using portworx service at: %v:%d as api endpoint", + volumeHost.GetHostName(), osdMgmtDefaultPort) } return volumeclient.VolumeDriver(util.portworxClient), nil } + +// getLocalPortworxDriver returns driver connected to Portworx API server on volume host. +// This is required to force certain operations (mount, unmount, detach, attach) to +// go to the volume host instead of the k8s service which might route it to any host. This pertains to how +// Portworx mounts and attaches a volume to the running container. The node getting these requests needs to +// see the pod container mounts (specifically /var/lib/kubelet/pods/) +func (util *portworxVolumeUtil) getLocalPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) { + if util.portworxClient != nil { + // check if existing saved client is valid + if isValid, _ := isClientValid(util.portworxClient); isValid { + return volumeclient.VolumeDriver(util.portworxClient), nil + } + } + + // Lookup port + svc, err := getPortworxService(volumeHost) + if err != nil { + return nil, err + } + + osgMgmtPort := lookupPXAPIPortFromService(svc) + util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osgMgmtPort) + if err != nil { + return nil, err + } + + klog.Infof("Using portworx local service at: %v:%d as api endpoint", + volumeHost.GetHostName(), osgMgmtPort) + return volumeclient.VolumeDriver(util.portworxClient), nil +} + +// lookupPXAPIPortFromService goes over all the ports in the given service and returns the target +// port for osdMgmtDefaultPort +func lookupPXAPIPortFromService(svc *v1.Service) int32 { + for _, p := range svc.Spec.Ports { + if p.Port == osdMgmtDefaultPort { + return p.TargetPort.IntVal + } + } + return osdMgmtDefaultPort // default +} + +// getPortworxService returns the portworx cluster service from the API server +func getPortworxService(host volume.VolumeHost) (*v1.Service, error) { + kubeClient := host.GetKubeClient() + if kubeClient == nil { + err := fmt.Errorf("Failed to get kubeclient when creating portworx client") + klog.Errorf(err.Error()) + return nil, err + } + + opts := metav1.GetOptions{} + svc, err := kubeClient.CoreV1().Services(api.NamespaceSystem).Get(pxServiceName, opts) + if err != nil { + klog.Errorf("Failed to get service. Err: %v", err) + return nil, err + } + + if svc == nil { + err = fmt.Errorf("Service: %v not found. Consult Portworx docs to deploy it.", pxServiceName) + klog.Errorf(err.Error()) + return nil, err + } + + return svc, nil +}