Merge pull request #48898 from portworx/fix-px-volume-calls

Automatic merge from submit-queue (batch tested with PRs 48997, 48595, 48898, 48711, 48972)

Use local PX endpoint for mount, unmount, detach and attach calls

**What this PR does / why we need it**:
This PR fixes an issue with Setup and TearDown of Portworx volumes which has side-effects such a Pod using a Portworx volume not being able to start on the minion.

**Which issue this PR fixes**: fixes #49034 
This PR addresses an issue that fails to mount, attach, unmount or detach a volume when Kubernetes sends these requests to Portworx when it's API server on that particular minion is down. 

Portworx mount, unmount, attach and detach requests need to be received on the minion where the pod is running. So these calls need to talk to the Portworx API server running locally on that node (and NOT to the Portworx k8s service since it may route the request to any node in the cluster). This PR explicitly makes such requests local only.

**Release note**:
```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-07-17 13:23:52 -07:00 committed by GitHub
commit 2c1c33d416
2 changed files with 45 additions and 19 deletions

View File

@ -266,7 +266,7 @@ func (b *portworxVolumeMounter) SetUp(fsGroup *int64) error {
// SetUpAt attaches the disk and bind mounts to the volume path. // SetUpAt attaches the disk and bind mounts to the volume path.
func (b *portworxVolumeMounter) SetUpAt(dir string, fsGroup *int64) error { func (b *portworxVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
notMnt, err := b.mounter.IsLikelyNotMountPoint(dir) notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
glog.V(4).Infof("Portworx Volume set up: %s %v %v", dir, !notMnt, err) glog.Infof("Portworx Volume set up. Dir: %s %v %v", dir, !notMnt, err)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
glog.Errorf("Cannot validate mountpoint: %s", dir) glog.Errorf("Cannot validate mountpoint: %s", dir)
return err return err
@ -291,7 +291,7 @@ func (b *portworxVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
if !b.readOnly { if !b.readOnly {
volume.SetVolumeOwnership(b, fsGroup) volume.SetVolumeOwnership(b, fsGroup)
} }
glog.V(4).Infof("Portworx Volume %s mounted to %s", b.volumeID, dir) glog.Infof("Portworx Volume %s setup at %s", b.volumeID, dir)
return nil return nil
} }
@ -314,8 +314,8 @@ func (c *portworxVolumeUnmounter) TearDown() error {
// Unmounts the bind mount, and detaches the disk only if the PD // Unmounts the bind mount, and detaches the disk only if the PD
// resource was the last reference to that disk on the kubelet. // resource was the last reference to that disk on the kubelet.
func (c *portworxVolumeUnmounter) TearDownAt(dir string) error { func (c *portworxVolumeUnmounter) TearDownAt(dir string) error {
glog.V(4).Infof("Portworx Volume TearDown of %s", dir) glog.Infof("Portworx Volume TearDown of %s", dir)
// Call Portworx Unmount for Portworx's book-keeping.
if err := c.manager.UnmountVolume(c, dir); err != nil { if err := c.manager.UnmountVolume(c, dir); err != nil {
return err return err
} }

View File

@ -43,12 +43,14 @@ type PortworxVolumeUtil struct {
// CreateVolume creates a Portworx volume. // CreateVolume creates a Portworx volume.
func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int, map[string]string, error) { func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int, map[string]string, error) {
driver, err := util.getPortworxDriver(p.plugin.host) driver, err := util.getPortworxDriver(p.plugin.host, false /*localOnly*/)
if err != nil || driver == nil { if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err) glog.Errorf("Failed to get portworx driver. Err: %v", err)
return "", 0, nil, err return "", 0, nil, err
} }
glog.Infof("Creating Portworx volume for PVC: %v", p.options.PVC.Name)
capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)] capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
// Portworx Volumes are specified in GB // Portworx Volumes are specified in GB
requestGB := int(volume.RoundUpSize(capacity.Value(), 1024*1024*1024)) requestGB := int(volume.RoundUpSize(capacity.Value(), 1024*1024*1024))
@ -56,6 +58,7 @@ func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (stri
specHandler := osdspec.NewSpecHandler() specHandler := osdspec.NewSpecHandler()
spec, err := specHandler.SpecFromOpts(p.options.Parameters) spec, err := specHandler.SpecFromOpts(p.options.Parameters)
if err != nil { if err != nil {
glog.Errorf("Error parsing parameters for PVC: %v. Err: %v", p.options.PVC.Name, err)
return "", 0, nil, err return "", 0, nil, err
} }
spec.Size = uint64(requestGB * 1024 * 1024 * 1024) spec.Size = uint64(requestGB * 1024 * 1024 * 1024)
@ -68,14 +71,16 @@ func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (stri
locator.VolumeLabels[pvcClaimLabel] = p.options.PVC.Name locator.VolumeLabels[pvcClaimLabel] = p.options.PVC.Name
volumeID, err := driver.Create(&locator, &source, spec) volumeID, err := driver.Create(&locator, &source, spec)
if err != nil { if err != nil {
glog.V(2).Infof("Error creating Portworx Volume : %v", err) glog.Errorf("Error creating Portworx Volume : %v", err)
} }
glog.Infof("Successfully created Portworx volume for PVC: %v", p.options.PVC.Name)
return volumeID, requestGB, nil, err return volumeID, requestGB, nil, err
} }
// DeleteVolume deletes a Portworx volume // DeleteVolume deletes a Portworx volume
func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error { func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
driver, err := util.getPortworxDriver(d.plugin.host) driver, err := util.getPortworxDriver(d.plugin.host, false /*localOnly*/)
if err != nil || driver == nil { if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err) glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err return err
@ -83,7 +88,7 @@ func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
err = driver.Delete(d.volumeID) err = driver.Delete(d.volumeID)
if err != nil { if err != nil {
glog.V(2).Infof("Error deleting Portworx Volume (%v): %v", d.volName, err) glog.Errorf("Error deleting Portworx Volume (%v): %v", d.volName, err)
return err return err
} }
return nil return nil
@ -91,7 +96,7 @@ func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
// AttachVolume attaches a Portworx Volume // AttachVolume attaches a Portworx Volume
func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter) (string, error) { func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter) (string, error) {
driver, err := util.getPortworxDriver(m.plugin.host) driver, err := util.getPortworxDriver(m.plugin.host, true /*localOnly*/)
if err != nil || driver == nil { if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err) glog.Errorf("Failed to get portworx driver. Err: %v", err)
return "", err return "", err
@ -99,7 +104,7 @@ func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter) (string,
devicePath, err := driver.Attach(m.volName) devicePath, err := driver.Attach(m.volName)
if err != nil { if err != nil {
glog.V(2).Infof("Error attaching Portworx Volume (%v): %v", m.volName, err) glog.Errorf("Error attaching Portworx Volume (%v): %v", m.volName, err)
return "", err return "", err
} }
return devicePath, nil return devicePath, nil
@ -107,7 +112,7 @@ func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter) (string,
// DetachVolume detaches a Portworx Volume // DetachVolume detaches a Portworx Volume
func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error { func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
driver, err := util.getPortworxDriver(u.plugin.host) driver, err := util.getPortworxDriver(u.plugin.host, true /*localOnly*/)
if err != nil || driver == nil { if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err) glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err return err
@ -115,7 +120,7 @@ func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
err = driver.Detach(u.volName) err = driver.Detach(u.volName)
if err != nil { if err != nil {
glog.V(2).Infof("Error detaching Portworx Volume (%v): %v", u.volName, err) glog.Errorf("Error detaching Portworx Volume (%v): %v", u.volName, err)
return err return err
} }
return nil return nil
@ -123,7 +128,7 @@ func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
// MountVolume mounts a Portworx Volume on the specified mountPath // MountVolume mounts a Portworx Volume on the specified mountPath
func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error { func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error {
driver, err := util.getPortworxDriver(m.plugin.host) driver, err := util.getPortworxDriver(m.plugin.host, true /*localOnly*/)
if err != nil || driver == nil { if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err) glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err return err
@ -131,7 +136,7 @@ func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath
err = driver.Mount(m.volName, mountPath) err = driver.Mount(m.volName, mountPath)
if err != nil { if err != nil {
glog.V(2).Infof("Error mounting Portworx Volume (%v) on Path (%v): %v", m.volName, mountPath, err) glog.Errorf("Error mounting Portworx Volume (%v) on Path (%v): %v", m.volName, mountPath, err)
return err return err
} }
return nil return nil
@ -139,7 +144,7 @@ func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath
// UnmountVolume unmounts a Portworx Volume // UnmountVolume unmounts a Portworx Volume
func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error { func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error {
driver, err := util.getPortworxDriver(u.plugin.host) driver, err := util.getPortworxDriver(u.plugin.host, true /*localOnly*/)
if err != nil || driver == nil { if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err) glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err return err
@ -147,7 +152,7 @@ func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountP
err = driver.Unmount(u.volName, mountPath) err = driver.Unmount(u.volName, mountPath)
if err != nil { if err != nil {
glog.V(2).Infof("Error unmounting Portworx Volume (%v) on Path (%v): %v", u.volName, mountPath, err) glog.Errorf("Error unmounting Portworx Volume (%v) on Path (%v): %v", u.volName, mountPath, err)
return err return err
} }
return nil return nil
@ -181,13 +186,34 @@ func createDriverClient(hostname string) (*osdclient.Client, error) {
} }
} }
func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) { // getPortworxDriver() returns a Portworx volume driver which can be used for volume operations
// localOnly: If true, the returned driver will be connected to Portworx API server on volume host.
// If false, driver will be connected to API server on volume host or Portworx k8s service cluster IP
// This flag is required to explicitly force certain operations (mount, unmount, detach, attach) to
// go to the volume host instead of the k8s service which might route it to any host. This pertains to how
// Portworx mounts and attaches a volume to the running container. The node getting these requests needs to
// see the pod container mounts (specifically /var/lib/kubelet/pods/<pod_id>)
// Operations like create and delete volume don't need to be restricted to local volume host since
// any node in the Portworx cluster can co-ordinate the create/delete request and forward the operations to
// the Portworx node that will own/owns the data.
func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost, localOnly bool) (volumeapi.VolumeDriver, error) {
var err error
if localOnly {
util.portworxClient, err = createDriverClient(volumeHost.GetHostName())
if err != nil {
return nil, err
} else {
glog.V(4).Infof("Using portworx local service at: %v as api endpoint", volumeHost.GetHostName())
return volumeclient.VolumeDriver(util.portworxClient), nil
}
}
// check if existing saved client is valid
if isValid, _ := isClientValid(util.portworxClient); isValid { if isValid, _ := isClientValid(util.portworxClient); isValid {
return volumeclient.VolumeDriver(util.portworxClient), nil return volumeclient.VolumeDriver(util.portworxClient), nil
} }
// create new client // create new client
var err error
util.portworxClient, err = createDriverClient(volumeHost.GetHostName()) // for backward compatibility util.portworxClient, err = createDriverClient(volumeHost.GetHostName()) // for backward compatibility
if err != nil || util.portworxClient == nil { if err != nil || util.portworxClient == nil {
// Create client from portworx service // Create client from portworx service
@ -215,7 +241,7 @@ func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost)
return nil, err return nil, err
} }
glog.Infof("Using portworx service at: %v as api endpoint", svc.Spec.ClusterIP) glog.Infof("Using portworx cluster service at: %v as api endpoint", svc.Spec.ClusterIP)
} else { } else {
glog.Infof("Using portworx service at: %v as api endpoint", volumeHost.GetHostName()) glog.Infof("Using portworx service at: %v as api endpoint", volumeHost.GetHostName())
} }