mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #70392 from harsh-px/cfg-port
Lookup PX api port from k8s service
This commit is contained in:
commit
b548d92b7c
@ -20,6 +20,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
volumeclient "github.com/libopenstorage/openstorage/api/client/volume"
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
@ -43,7 +44,7 @@ func ProbeVolumePlugins() []volume.VolumePlugin {
|
|||||||
|
|
||||||
type portworxVolumePlugin struct {
|
type portworxVolumePlugin struct {
|
||||||
host volume.VolumeHost
|
host volume.VolumeHost
|
||||||
util *PortworxVolumeUtil
|
util *portworxVolumeUtil
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ volume.VolumePlugin = &portworxVolumePlugin{}
|
var _ volume.VolumePlugin = &portworxVolumePlugin{}
|
||||||
@ -61,8 +62,18 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (plugin *portworxVolumePlugin) Init(host volume.VolumeHost) error {
|
func (plugin *portworxVolumePlugin) Init(host volume.VolumeHost) error {
|
||||||
|
client, err := volumeclient.NewDriverClient(
|
||||||
|
fmt.Sprintf("http://%s:%d", host.GetHostName(), osdMgmtDefaultPort),
|
||||||
|
pxdDriverName, osdDriverVersion, pxDriverName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
plugin.host = host
|
plugin.host = host
|
||||||
plugin.util = &PortworxVolumeUtil{}
|
plugin.util = &portworxVolumeUtil{
|
||||||
|
portworxClient: client,
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,22 +34,22 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
osdMgmtPort = "9001"
|
osdMgmtDefaultPort = 9001
|
||||||
osdDriverVersion = "v1"
|
osdDriverVersion = "v1"
|
||||||
pxdDriverName = "pxd"
|
pxdDriverName = "pxd"
|
||||||
pvcClaimLabel = "pvc"
|
pvcClaimLabel = "pvc"
|
||||||
pvcNamespaceLabel = "namespace"
|
pvcNamespaceLabel = "namespace"
|
||||||
pxServiceName = "portworx-service"
|
pxServiceName = "portworx-service"
|
||||||
pxDriverName = "pxd-sched"
|
pxDriverName = "pxd-sched"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PortworxVolumeUtil struct {
|
type portworxVolumeUtil struct {
|
||||||
portworxClient *osdclient.Client
|
portworxClient *osdclient.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateVolume creates a Portworx volume.
|
// CreateVolume creates a Portworx volume.
|
||||||
func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int64, map[string]string, error) {
|
func (util *portworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int64, map[string]string, error) {
|
||||||
driver, err := util.getPortworxDriver(p.plugin.host, false /*localOnly*/)
|
driver, err := util.getPortworxDriver(p.plugin.host)
|
||||||
if err != nil || driver == nil {
|
if err != nil || driver == nil {
|
||||||
klog.Errorf("Failed to get portworx driver. Err: %v", err)
|
klog.Errorf("Failed to get portworx driver. Err: %v", err)
|
||||||
return "", 0, nil, err
|
return "", 0, nil, err
|
||||||
@ -112,8 +112,8 @@ func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (stri
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteVolume deletes a Portworx volume
|
// DeleteVolume deletes a Portworx volume
|
||||||
func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
|
func (util *portworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
|
||||||
driver, err := util.getPortworxDriver(d.plugin.host, false /*localOnly*/)
|
driver, err := util.getPortworxDriver(d.plugin.host)
|
||||||
if err != nil || driver == nil {
|
if err != nil || driver == nil {
|
||||||
klog.Errorf("Failed to get portworx driver. Err: %v", err)
|
klog.Errorf("Failed to get portworx driver. Err: %v", err)
|
||||||
return err
|
return err
|
||||||
@ -128,8 +128,8 @@ func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// AttachVolume attaches a Portworx Volume
|
// AttachVolume attaches a Portworx Volume
|
||||||
func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOptions map[string]string) (string, error) {
|
func (util *portworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOptions map[string]string) (string, error) {
|
||||||
driver, err := util.getPortworxDriver(m.plugin.host, true /*localOnly*/)
|
driver, err := util.getLocalPortworxDriver(m.plugin.host)
|
||||||
if err != nil || driver == nil {
|
if err != nil || driver == nil {
|
||||||
klog.Errorf("Failed to get portworx driver. Err: %v", err)
|
klog.Errorf("Failed to get portworx driver. Err: %v", err)
|
||||||
return "", err
|
return "", err
|
||||||
@ -144,8 +144,8 @@ func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOpt
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DetachVolume detaches a Portworx Volume
|
// DetachVolume detaches a Portworx Volume
|
||||||
func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
|
func (util *portworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
|
||||||
driver, err := util.getPortworxDriver(u.plugin.host, true /*localOnly*/)
|
driver, err := util.getLocalPortworxDriver(u.plugin.host)
|
||||||
if err != nil || driver == nil {
|
if err != nil || driver == nil {
|
||||||
klog.Errorf("Failed to get portworx driver. Err: %v", err)
|
klog.Errorf("Failed to get portworx driver. Err: %v", err)
|
||||||
return err
|
return err
|
||||||
@ -160,8 +160,8 @@ func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// MountVolume mounts a Portworx Volume on the specified mountPath
|
// MountVolume mounts a Portworx Volume on the specified mountPath
|
||||||
func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error {
|
func (util *portworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error {
|
||||||
driver, err := util.getPortworxDriver(m.plugin.host, true /*localOnly*/)
|
driver, err := util.getLocalPortworxDriver(m.plugin.host)
|
||||||
if err != nil || driver == nil {
|
if err != nil || driver == nil {
|
||||||
klog.Errorf("Failed to get portworx driver. Err: %v", err)
|
klog.Errorf("Failed to get portworx driver. Err: %v", err)
|
||||||
return err
|
return err
|
||||||
@ -176,8 +176,8 @@ func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath
|
|||||||
}
|
}
|
||||||
|
|
||||||
// UnmountVolume unmounts a Portworx Volume
|
// UnmountVolume unmounts a Portworx Volume
|
||||||
func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error {
|
func (util *portworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error {
|
||||||
driver, err := util.getPortworxDriver(u.plugin.host, true /*localOnly*/)
|
driver, err := util.getLocalPortworxDriver(u.plugin.host)
|
||||||
if err != nil || driver == nil {
|
if err != nil || driver == nil {
|
||||||
klog.Errorf("Failed to get portworx driver. Err: %v", err)
|
klog.Errorf("Failed to get portworx driver. Err: %v", err)
|
||||||
return err
|
return err
|
||||||
@ -191,8 +191,8 @@ func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountP
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (util *PortworxVolumeUtil) ResizeVolume(spec *volume.Spec, newSize resource.Quantity, volumeHost volume.VolumeHost) error {
|
func (util *portworxVolumeUtil) ResizeVolume(spec *volume.Spec, newSize resource.Quantity, volumeHost volume.VolumeHost) error {
|
||||||
driver, err := util.getPortworxDriver(volumeHost, false /*localOnly*/)
|
driver, err := util.getPortworxDriver(volumeHost)
|
||||||
if err != nil || driver == nil {
|
if err != nil || driver == nil {
|
||||||
klog.Errorf("Failed to get portworx driver. Err: %v", err)
|
klog.Errorf("Failed to get portworx driver. Err: %v", err)
|
||||||
return err
|
return err
|
||||||
@ -254,8 +254,8 @@ func isClientValid(client *osdclient.Client) (bool, error) {
|
|||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createDriverClient(hostname string) (*osdclient.Client, error) {
|
func createDriverClient(hostname string, port int32) (*osdclient.Client, error) {
|
||||||
client, err := volumeclient.NewDriverClient("http://"+hostname+":"+osdMgmtPort,
|
client, err := volumeclient.NewDriverClient(fmt.Sprintf("http://%s:%d", hostname, port),
|
||||||
pxdDriverName, osdDriverVersion, pxDriverName)
|
pxdDriverName, osdDriverVersion, pxDriverName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -268,65 +268,105 @@ func createDriverClient(hostname string) (*osdclient.Client, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getPortworxDriver() returns a Portworx volume driver which can be used for volume operations
|
// getPortworxDriver returns a Portworx volume driver which can be used for cluster wide operations.
|
||||||
// localOnly: If true, the returned driver will be connected to Portworx API server on volume host.
|
// Operations like create and delete volume don't need to be restricted to local volume host since
|
||||||
// If false, driver will be connected to API server on volume host or Portworx k8s service cluster IP
|
// any node in the Portworx cluster can co-ordinate the create/delete request and forward the operations to
|
||||||
// This flag is required to explicitly force certain operations (mount, unmount, detach, attach) to
|
// the Portworx node that will own/owns the data.
|
||||||
// go to the volume host instead of the k8s service which might route it to any host. This pertains to how
|
func (util *portworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) {
|
||||||
// Portworx mounts and attaches a volume to the running container. The node getting these requests needs to
|
|
||||||
// see the pod container mounts (specifically /var/lib/kubelet/pods/<pod_id>)
|
|
||||||
// Operations like create and delete volume don't need to be restricted to local volume host since
|
|
||||||
// any node in the Portworx cluster can co-ordinate the create/delete request and forward the operations to
|
|
||||||
// the Portworx node that will own/owns the data.
|
|
||||||
func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost, localOnly bool) (volumeapi.VolumeDriver, error) {
|
|
||||||
var err error
|
|
||||||
if localOnly {
|
|
||||||
util.portworxClient, err = createDriverClient(volumeHost.GetHostName())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
} else {
|
|
||||||
klog.V(4).Infof("Using portworx local service at: %v as api endpoint", volumeHost.GetHostName())
|
|
||||||
return volumeclient.VolumeDriver(util.portworxClient), nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if existing saved client is valid
|
// check if existing saved client is valid
|
||||||
if isValid, _ := isClientValid(util.portworxClient); isValid {
|
if isValid, _ := isClientValid(util.portworxClient); isValid {
|
||||||
return volumeclient.VolumeDriver(util.portworxClient), nil
|
return volumeclient.VolumeDriver(util.portworxClient), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// create new client
|
// create new client
|
||||||
util.portworxClient, err = createDriverClient(volumeHost.GetHostName()) // for backward compatibility
|
var err error
|
||||||
|
util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osdMgmtDefaultPort) // for backward compatibility
|
||||||
if err != nil || util.portworxClient == nil {
|
if err != nil || util.portworxClient == nil {
|
||||||
// Create client from portworx service
|
// Create client from portworx k8s service.
|
||||||
kubeClient := volumeHost.GetKubeClient()
|
svc, err := getPortworxService(volumeHost)
|
||||||
if kubeClient == nil {
|
|
||||||
klog.Error("Failed to get kubeclient when creating portworx client")
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
opts := metav1.GetOptions{}
|
|
||||||
svc, err := kubeClient.CoreV1().Services(api.NamespaceSystem).Get(pxServiceName, opts)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to get service. Err: %v", err)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if svc == nil {
|
// The port here is always the default one since it's the service port
|
||||||
klog.Errorf("Service: %v not found. Consult Portworx docs to deploy it.", pxServiceName)
|
util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP, osdMgmtDefaultPort)
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP)
|
|
||||||
if err != nil || util.portworxClient == nil {
|
if err != nil || util.portworxClient == nil {
|
||||||
klog.Errorf("Failed to connect to portworx service. Err: %v", err)
|
klog.Errorf("Failed to connect to portworx service. Err: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.Infof("Using portworx cluster service at: %v as api endpoint", svc.Spec.ClusterIP)
|
klog.Infof("Using portworx cluster service at: %v:%d as api endpoint",
|
||||||
|
svc.Spec.ClusterIP, osdMgmtDefaultPort)
|
||||||
} else {
|
} else {
|
||||||
klog.Infof("Using portworx service at: %v as api endpoint", volumeHost.GetHostName())
|
klog.Infof("Using portworx service at: %v:%d as api endpoint",
|
||||||
|
volumeHost.GetHostName(), osdMgmtDefaultPort)
|
||||||
}
|
}
|
||||||
|
|
||||||
return volumeclient.VolumeDriver(util.portworxClient), nil
|
return volumeclient.VolumeDriver(util.portworxClient), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getLocalPortworxDriver returns driver connected to Portworx API server on volume host.
|
||||||
|
// This is required to force certain operations (mount, unmount, detach, attach) to
|
||||||
|
// go to the volume host instead of the k8s service which might route it to any host. This pertains to how
|
||||||
|
// Portworx mounts and attaches a volume to the running container. The node getting these requests needs to
|
||||||
|
// see the pod container mounts (specifically /var/lib/kubelet/pods/<pod_id>)
|
||||||
|
func (util *portworxVolumeUtil) getLocalPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) {
|
||||||
|
if util.portworxClient != nil {
|
||||||
|
// check if existing saved client is valid
|
||||||
|
if isValid, _ := isClientValid(util.portworxClient); isValid {
|
||||||
|
return volumeclient.VolumeDriver(util.portworxClient), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lookup port
|
||||||
|
svc, err := getPortworxService(volumeHost)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
osgMgmtPort := lookupPXAPIPortFromService(svc)
|
||||||
|
util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osgMgmtPort)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
klog.Infof("Using portworx local service at: %v:%d as api endpoint",
|
||||||
|
volumeHost.GetHostName(), osgMgmtPort)
|
||||||
|
return volumeclient.VolumeDriver(util.portworxClient), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// lookupPXAPIPortFromService goes over all the ports in the given service and returns the target
|
||||||
|
// port for osdMgmtDefaultPort
|
||||||
|
func lookupPXAPIPortFromService(svc *v1.Service) int32 {
|
||||||
|
for _, p := range svc.Spec.Ports {
|
||||||
|
if p.Port == osdMgmtDefaultPort {
|
||||||
|
return p.TargetPort.IntVal
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return osdMgmtDefaultPort // default
|
||||||
|
}
|
||||||
|
|
||||||
|
// getPortworxService returns the portworx cluster service from the API server
|
||||||
|
func getPortworxService(host volume.VolumeHost) (*v1.Service, error) {
|
||||||
|
kubeClient := host.GetKubeClient()
|
||||||
|
if kubeClient == nil {
|
||||||
|
err := fmt.Errorf("Failed to get kubeclient when creating portworx client")
|
||||||
|
klog.Errorf(err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := metav1.GetOptions{}
|
||||||
|
svc, err := kubeClient.CoreV1().Services(api.NamespaceSystem).Get(pxServiceName, opts)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Failed to get service. Err: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if svc == nil {
|
||||||
|
err = fmt.Errorf("Service: %v not found. Consult Portworx docs to deploy it.", pxServiceName)
|
||||||
|
klog.Errorf(err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return svc, nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user