mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 06:54:01 +00:00
Detangle Attach/Detach from GCE PD
This commit is contained in:
parent
f1e528eab6
commit
4858d0ab6f
@ -407,6 +407,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
|
||||
func TestMountExternalVolumes(t *testing.T) {
|
||||
testKubelet := newTestKubelet(t)
|
||||
kubelet := testKubelet.kubelet
|
||||
kubelet.mounter = &mount.FakeMounter{}
|
||||
plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
|
||||
kubelet.volumePluginMgr.InitPlugins([]volume.VolumePlugin{plug}, &volumeHost{kubelet})
|
||||
|
||||
|
@ -19,6 +19,7 @@ package kubelet
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
|
||||
@ -160,20 +161,28 @@ func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (kubecontainer.VolumeMap,
|
||||
return nil, err
|
||||
}
|
||||
if attacher != nil {
|
||||
err = attacher.Attach(volSpec, kl.hostname)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
devicePath, err := attacher.WaitForAttach(volSpec, maxWaitForVolumeOps)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If the device path is already mounted, avoid an expensive call to the
|
||||
// cloud provider.
|
||||
deviceMountPath := attacher.GetDeviceMountPath(volSpec)
|
||||
if err = attacher.MountDevice(volSpec, devicePath, deviceMountPath, kl.mounter); err != nil {
|
||||
notMountPoint, err := kl.mounter.IsLikelyNotMountPoint(deviceMountPath)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
if notMountPoint {
|
||||
err = attacher.Attach(volSpec, kl.hostname)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
devicePath, err := attacher.WaitForAttach(volSpec, maxWaitForVolumeOps)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = attacher.MountDevice(volSpec, devicePath, deviceMountPath, kl.mounter); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err = mounter.SetUp(fsGroup)
|
||||
|
220
pkg/volume/gce_pd/attacher.go
Normal file
220
pkg/volume/gce_pd/attacher.go
Normal file
@ -0,0 +1,220 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package gce_pd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/util/exec"
|
||||
"k8s.io/kubernetes/pkg/util/keymutex"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
)
|
||||
|
||||
type gcePersistentDiskAttacher struct {
|
||||
host volume.VolumeHost
|
||||
}
|
||||
|
||||
var _ volume.Attacher = &gcePersistentDiskAttacher{}
|
||||
|
||||
var _ volume.AttachableVolumePlugin = &gcePersistentDiskPlugin{}
|
||||
|
||||
// Singleton key mutex for keeping attach/detach operations for the
|
||||
// same PD atomic
|
||||
// TODO(swagiaal): Once the Mount/Unmount manager is implemented this
|
||||
// will no longer be needed and should be removed.
|
||||
var attachDetachMutex = keymutex.NewKeyMutex()
|
||||
|
||||
func (plugin *gcePersistentDiskPlugin) NewAttacher() (volume.Attacher, error) {
|
||||
return &gcePersistentDiskAttacher{host: plugin.host}, nil
|
||||
}
|
||||
|
||||
func (attacher *gcePersistentDiskAttacher) Attach(spec *volume.Spec, hostName string) error {
|
||||
volumeSource, readOnly := getVolumeSource(spec)
|
||||
pdName := volumeSource.PDName
|
||||
|
||||
// Block execution until any pending detach operations for this PD have completed
|
||||
attachDetachMutex.LockKey(pdName)
|
||||
defer attachDetachMutex.UnlockKey(pdName)
|
||||
|
||||
gceCloud, err := getCloudProvider(attacher.host.GetCloudProvider())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for numRetries := 0; numRetries < maxRetries; numRetries++ {
|
||||
if numRetries > 0 {
|
||||
glog.Warningf("Retrying attach for GCE PD %q (retry count=%v).", pdName, numRetries)
|
||||
}
|
||||
|
||||
if err = gceCloud.AttachDisk(pdName, hostName, readOnly); err != nil {
|
||||
glog.Errorf("Error attaching PD %q: %+v", pdName, err)
|
||||
time.Sleep(errorSleepDuration)
|
||||
continue
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (attacher *gcePersistentDiskAttacher) WaitForAttach(spec *volume.Spec, timeout time.Duration) (string, error) {
|
||||
ticker := time.NewTicker(checkSleepDuration)
|
||||
defer ticker.Stop()
|
||||
timer := time.NewTimer(timeout)
|
||||
defer timer.Stop()
|
||||
|
||||
volumeSource, _ := getVolumeSource(spec)
|
||||
pdName := volumeSource.PDName
|
||||
partition := ""
|
||||
if volumeSource.Partition != 0 {
|
||||
partition = strconv.Itoa(int(volumeSource.Partition))
|
||||
}
|
||||
|
||||
sdBefore, err := filepath.Glob(diskSDPattern)
|
||||
if err != nil {
|
||||
glog.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskSDPattern, err)
|
||||
}
|
||||
sdBeforeSet := sets.NewString(sdBefore...)
|
||||
|
||||
devicePaths := getDiskByIdPaths(pdName, partition)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
glog.V(5).Infof("Checking GCE PD %q is attached.", pdName)
|
||||
path, err := verifyDevicePath(devicePaths, sdBeforeSet)
|
||||
if err != nil {
|
||||
// Log error, if any, and continue checking periodically. See issue #11321
|
||||
glog.Errorf("Error verifying GCE PD (%q) is attached: %v", pdName, err)
|
||||
} else if path != "" {
|
||||
// A device path has successfully been created for the PD
|
||||
glog.Infof("Successfully found attached GCE PD %q.", pdName)
|
||||
return path, nil
|
||||
}
|
||||
case <-timer.C:
|
||||
return "", fmt.Errorf("Could not find attached GCE PD %q. Timeout waiting for mount paths to be created.", pdName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (attacher *gcePersistentDiskAttacher) GetDeviceMountPath(spec *volume.Spec) string {
|
||||
volumeSource, _ := getVolumeSource(spec)
|
||||
return makeGlobalPDName(attacher.host, volumeSource.PDName)
|
||||
}
|
||||
|
||||
func (attacher *gcePersistentDiskAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string, mounter mount.Interface) error {
|
||||
// Only mount the PD globally once.
|
||||
notMnt, err := mounter.IsLikelyNotMountPoint(deviceMountPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
|
||||
return err
|
||||
}
|
||||
notMnt = true
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
volumeSource, readOnly := getVolumeSource(spec)
|
||||
|
||||
options := []string{}
|
||||
if readOnly {
|
||||
options = append(options, "ro")
|
||||
}
|
||||
if notMnt {
|
||||
diskMounter := &mount.SafeFormatAndMount{Interface: mounter, Runner: exec.New()}
|
||||
err = diskMounter.FormatAndMount(devicePath, deviceMountPath, volumeSource.FSType, options)
|
||||
if err != nil {
|
||||
os.Remove(deviceMountPath)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type gcePersistentDiskDetacher struct {
|
||||
host volume.VolumeHost
|
||||
}
|
||||
|
||||
var _ volume.Detacher = &gcePersistentDiskDetacher{}
|
||||
|
||||
func (plugin *gcePersistentDiskPlugin) NewDetacher() (volume.Detacher, error) {
|
||||
return &gcePersistentDiskDetacher{host: plugin.host}, nil
|
||||
}
|
||||
|
||||
func (detacher *gcePersistentDiskDetacher) Detach(deviceMountPath string, hostName string) error {
|
||||
pdName := path.Base(deviceMountPath)
|
||||
|
||||
// Block execution until any pending attach/detach operations for this PD have completed
|
||||
attachDetachMutex.LockKey(pdName)
|
||||
defer attachDetachMutex.UnlockKey(pdName)
|
||||
|
||||
gceCloud, err := getCloudProvider(detacher.host.GetCloudProvider())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for numRetries := 0; numRetries < maxRetries; numRetries++ {
|
||||
if numRetries > 0 {
|
||||
glog.Warningf("Retrying detach for GCE PD %q (retry count=%v).", pdName, numRetries)
|
||||
}
|
||||
|
||||
if err = gceCloud.DetachDisk(pdName, hostName); err != nil {
|
||||
glog.Errorf("Error detaching PD %q: %v", pdName, err)
|
||||
time.Sleep(errorSleepDuration)
|
||||
continue
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (detacher *gcePersistentDiskDetacher) WaitForDetach(devicePath string, timeout time.Duration) error {
|
||||
ticker := time.NewTicker(checkSleepDuration)
|
||||
defer ticker.Stop()
|
||||
timer := time.NewTimer(timeout)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
glog.V(5).Infof("Checking device %q is detached.", devicePath)
|
||||
if pathExists, err := pathExists(devicePath); err != nil {
|
||||
return fmt.Errorf("Error checking if device path exists: %v", err)
|
||||
} else if !pathExists {
|
||||
return nil
|
||||
}
|
||||
case <-timer.C:
|
||||
return fmt.Errorf("Timeout reached; PD Device %v is still attached", devicePath)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (detacher *gcePersistentDiskDetacher) UnmountDevice(deviceMountPath string, mounter mount.Interface) error {
|
||||
return unmountPDAndRemoveGlobalPath(deviceMountPath, mounter)
|
||||
}
|
@ -26,7 +26,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/exec"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
"k8s.io/kubernetes/pkg/util/strings"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
@ -76,25 +75,30 @@ func (plugin *gcePersistentDiskPlugin) NewMounter(spec *volume.Spec, pod *api.Po
|
||||
return plugin.newMounterInternal(spec, pod.UID, &GCEDiskUtil{}, plugin.host.GetMounter())
|
||||
}
|
||||
|
||||
func (plugin *gcePersistentDiskPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Mounter, error) {
|
||||
// GCEPDs used directly in a pod have a ReadOnly flag set by the pod author.
|
||||
// GCEPDs used as a PersistentVolume gets the ReadOnly flag indirectly through the persistent-claim volume used to mount the PV
|
||||
func getVolumeSource(spec *volume.Spec) (*api.GCEPersistentDiskVolumeSource, bool) {
|
||||
var readOnly bool
|
||||
var volumeSource *api.GCEPersistentDiskVolumeSource
|
||||
|
||||
var gce *api.GCEPersistentDiskVolumeSource
|
||||
if spec.Volume != nil && spec.Volume.GCEPersistentDisk != nil {
|
||||
gce = spec.Volume.GCEPersistentDisk
|
||||
readOnly = gce.ReadOnly
|
||||
volumeSource = spec.Volume.GCEPersistentDisk
|
||||
readOnly = volumeSource.ReadOnly
|
||||
} else {
|
||||
gce = spec.PersistentVolume.Spec.GCEPersistentDisk
|
||||
volumeSource = spec.PersistentVolume.Spec.GCEPersistentDisk
|
||||
readOnly = spec.ReadOnly
|
||||
}
|
||||
|
||||
pdName := gce.PDName
|
||||
fsType := gce.FSType
|
||||
return volumeSource, readOnly
|
||||
}
|
||||
|
||||
func (plugin *gcePersistentDiskPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Mounter, error) {
|
||||
// GCEPDs used directly in a pod have a ReadOnly flag set by the pod author.
|
||||
// GCEPDs used as a PersistentVolume gets the ReadOnly flag indirectly through the persistent-claim volume used to mount the PV
|
||||
volumeSource, readOnly := getVolumeSource(spec)
|
||||
|
||||
pdName := volumeSource.PDName
|
||||
partition := ""
|
||||
if gce.Partition != 0 {
|
||||
partition = strconv.Itoa(int(gce.Partition))
|
||||
if volumeSource.Partition != 0 {
|
||||
partition = strconv.Itoa(int(volumeSource.Partition))
|
||||
}
|
||||
|
||||
return &gcePersistentDiskMounter{
|
||||
@ -107,9 +111,7 @@ func (plugin *gcePersistentDiskPlugin) newMounterInternal(spec *volume.Spec, pod
|
||||
manager: manager,
|
||||
plugin: plugin,
|
||||
},
|
||||
fsType: fsType,
|
||||
readOnly: readOnly,
|
||||
diskMounter: &mount.SafeFormatAndMount{Interface: mounter, Runner: exec.New()}}, nil
|
||||
readOnly: readOnly}, nil
|
||||
}
|
||||
|
||||
func (plugin *gcePersistentDiskPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
|
||||
@ -163,10 +165,6 @@ func (plugin *gcePersistentDiskPlugin) newProvisionerInternal(options volume.Vol
|
||||
|
||||
// Abstract interface to PD operations.
|
||||
type pdManager interface {
|
||||
// Attaches the disk to the kubelet's host machine.
|
||||
AttachAndMountDisk(b *gcePersistentDiskMounter, globalPDPath string) error
|
||||
// Detaches the disk from the kubelet's host machine.
|
||||
DetachDisk(c *gcePersistentDiskUnmounter) error
|
||||
// Creates a volume
|
||||
CreateVolume(provisioner *gcePersistentDiskProvisioner) (volumeID string, volumeSizeGB int, labels map[string]string, err error)
|
||||
// Deletes a volume
|
||||
@ -182,7 +180,7 @@ type gcePersistentDisk struct {
|
||||
pdName string
|
||||
// Specifies the partition to mount
|
||||
partition string
|
||||
// Utility interface that provides API calls to the provider to attach/detach disks.
|
||||
// Utility interface to provision and delete disks
|
||||
manager pdManager
|
||||
// Mounter interface that provides system calls to mount the global path to the pod local path.
|
||||
mounter mount.Interface
|
||||
@ -190,21 +188,10 @@ type gcePersistentDisk struct {
|
||||
volume.MetricsNil
|
||||
}
|
||||
|
||||
func detachDiskLogError(pd *gcePersistentDisk) {
|
||||
err := pd.manager.DetachDisk(&gcePersistentDiskUnmounter{pd})
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to detach disk: %v (%v)", pd, err)
|
||||
}
|
||||
}
|
||||
|
||||
type gcePersistentDiskMounter struct {
|
||||
*gcePersistentDisk
|
||||
// Filesystem type, optional.
|
||||
fsType string
|
||||
// Specifies whether the disk will be attached as read-only.
|
||||
// Specifies whether the disk will be mounted as read-only.
|
||||
readOnly bool
|
||||
// diskMounter provides the interface that is used to mount the actual block device.
|
||||
diskMounter *mount.SafeFormatAndMount
|
||||
}
|
||||
|
||||
var _ volume.Mounter = &gcePersistentDiskMounter{}
|
||||
@ -217,12 +204,12 @@ func (b *gcePersistentDiskMounter) GetAttributes() volume.Attributes {
|
||||
}
|
||||
}
|
||||
|
||||
// SetUp attaches the disk and bind mounts to the volume path.
|
||||
// SetUp bind mounts the disk global mount to the volume path.
|
||||
func (b *gcePersistentDiskMounter) SetUp(fsGroup *int64) error {
|
||||
return b.SetUpAt(b.GetPath(), fsGroup)
|
||||
}
|
||||
|
||||
// SetUpAt attaches the disk and bind mounts to the volume path.
|
||||
// SetUp bind mounts the disk global mount to the give volume path.
|
||||
func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error {
|
||||
// TODO: handle failed mounts here.
|
||||
notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
|
||||
@ -234,14 +221,7 @@ func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
globalPDPath := makeGlobalPDName(b.plugin.host, b.pdName)
|
||||
if err := b.manager.AttachAndMountDisk(b, globalPDPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(dir, 0750); err != nil {
|
||||
// TODO: we should really eject the attach/detach out into its own control loop.
|
||||
detachDiskLogError(b.gcePersistentDisk)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -250,6 +230,8 @@ func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error {
|
||||
if b.readOnly {
|
||||
options = append(options, "ro")
|
||||
}
|
||||
|
||||
globalPDPath := makeGlobalPDName(b.plugin.host, b.pdName)
|
||||
err = b.mounter.Mount(globalPDPath, dir, "", options)
|
||||
if err != nil {
|
||||
notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
|
||||
@ -274,8 +256,6 @@ func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error {
|
||||
}
|
||||
}
|
||||
os.Remove(dir)
|
||||
// TODO: we should really eject the attach/detach out into its own control loop.
|
||||
detachDiskLogError(b.gcePersistentDisk)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -301,14 +281,12 @@ type gcePersistentDiskUnmounter struct {
|
||||
|
||||
var _ volume.Unmounter = &gcePersistentDiskUnmounter{}
|
||||
|
||||
// Unmounts the bind mount, and detaches the disk only if the PD
|
||||
// resource was the last reference to that disk on the kubelet.
|
||||
// TearDown unmounts the bind mount
|
||||
func (c *gcePersistentDiskUnmounter) TearDown() error {
|
||||
return c.TearDownAt(c.GetPath())
|
||||
}
|
||||
|
||||
// Unmounts the bind mount, and detaches the disk only if the PD
|
||||
// resource was the last reference to that disk on the kubelet.
|
||||
// TearDownAt unmounts the bind mount
|
||||
func (c *gcePersistentDiskUnmounter) TearDownAt(dir string) error {
|
||||
notMnt, err := c.mounter.IsLikelyNotMountPoint(dir)
|
||||
if err != nil {
|
||||
@ -317,35 +295,18 @@ func (c *gcePersistentDiskUnmounter) TearDownAt(dir string) error {
|
||||
if notMnt {
|
||||
return os.Remove(dir)
|
||||
}
|
||||
|
||||
refs, err := mount.GetMountRefs(c.mounter, dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Unmount the bind-mount inside this pod
|
||||
if err := c.mounter.Unmount(dir); err != nil {
|
||||
return err
|
||||
}
|
||||
// If len(refs) is 1, then all bind mounts have been removed, and the
|
||||
// remaining reference is the global mount. It is safe to detach.
|
||||
if len(refs) == 1 {
|
||||
// c.pdName is not initially set for volume-unmounters, so set it here.
|
||||
c.pdName = path.Base(refs[0])
|
||||
if err := c.manager.DetachDisk(c); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
notMnt, mntErr := c.mounter.IsLikelyNotMountPoint(dir)
|
||||
if mntErr != nil {
|
||||
glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
|
||||
return err
|
||||
}
|
||||
if notMnt {
|
||||
if err := os.Remove(dir); err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Remove(dir)
|
||||
}
|
||||
return nil
|
||||
return fmt.Errorf("Failed to unmount volume dir")
|
||||
}
|
||||
|
||||
type gcePersistentDiskDeleter struct {
|
||||
@ -393,7 +354,6 @@ func (c *gcePersistentDiskProvisioner) Provision() (*api.PersistentVolume, error
|
||||
PersistentVolumeSource: api.PersistentVolumeSource{
|
||||
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
|
||||
PDName: volumeID,
|
||||
FSType: "ext4",
|
||||
Partition: 0,
|
||||
ReadOnly: false,
|
||||
},
|
||||
|
@ -84,33 +84,6 @@ func contains(modes []api.PersistentVolumeAccessMode, mode api.PersistentVolumeA
|
||||
}
|
||||
|
||||
type fakePDManager struct {
|
||||
attachCalled bool
|
||||
detachCalled bool
|
||||
}
|
||||
|
||||
// TODO(jonesdl) To fully test this, we could create a loopback device
|
||||
// and mount that instead.
|
||||
func (fake *fakePDManager) AttachAndMountDisk(b *gcePersistentDiskMounter, globalPDPath string) error {
|
||||
globalPath := makeGlobalPDName(b.plugin.host, b.pdName)
|
||||
err := os.MkdirAll(globalPath, 0750)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fake.attachCalled = true
|
||||
// Simulate the global mount so that the fakeMounter returns the
|
||||
// expected number of mounts for the attached disk.
|
||||
b.mounter.Mount(globalPath, globalPath, b.fsType, nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fake *fakePDManager) DetachDisk(c *gcePersistentDiskUnmounter) error {
|
||||
globalPath := makeGlobalPDName(c.plugin.host, c.pdName)
|
||||
err := os.RemoveAll(globalPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fake.detachCalled = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fake *fakePDManager) CreateVolume(c *gcePersistentDiskProvisioner) (volumeID string, volumeSizeGB int, labels map[string]string, err error) {
|
||||
@ -181,9 +154,6 @@ func TestPlugin(t *testing.T) {
|
||||
t.Errorf("SetUp() failed: %v", err)
|
||||
}
|
||||
}
|
||||
if !fakeManager.attachCalled {
|
||||
t.Errorf("Attach watch not called")
|
||||
}
|
||||
|
||||
fakeManager = &fakePDManager{}
|
||||
unmounter, err := plug.(*gcePersistentDiskPlugin).newUnmounterInternal("vol1", types.UID("poduid"), fakeManager, fakeMounter)
|
||||
@ -202,9 +172,6 @@ func TestPlugin(t *testing.T) {
|
||||
} else if !os.IsNotExist(err) {
|
||||
t.Errorf("SetUp() failed: %v", err)
|
||||
}
|
||||
if !fakeManager.detachCalled {
|
||||
t.Errorf("Detach watch not called")
|
||||
}
|
||||
|
||||
// Test Provisioner
|
||||
cap := resource.MustParse("100Mi")
|
||||
|
@ -25,10 +25,10 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
|
||||
"k8s.io/kubernetes/pkg/util/exec"
|
||||
"k8s.io/kubernetes/pkg/util/keymutex"
|
||||
"k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
)
|
||||
@ -46,74 +46,10 @@ const (
|
||||
errorSleepDuration = 5 * time.Second
|
||||
)
|
||||
|
||||
// Singleton key mutex for keeping attach/detach operations for the same PD atomic
|
||||
var attachDetachMutex = keymutex.NewKeyMutex()
|
||||
|
||||
type GCEDiskUtil struct{}
|
||||
|
||||
// Attaches a disk specified by a volume.GCEPersistentDisk to the current kubelet.
|
||||
// Mounts the disk to it's global path.
|
||||
func (diskUtil *GCEDiskUtil) AttachAndMountDisk(b *gcePersistentDiskMounter, globalPDPath string) error {
|
||||
glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Will block for existing operations, if any. (globalPDPath=%q)\r\n", b.pdName, globalPDPath)
|
||||
|
||||
// Block execution until any pending detach operations for this PD have completed
|
||||
attachDetachMutex.LockKey(b.pdName)
|
||||
defer attachDetachMutex.UnlockKey(b.pdName)
|
||||
|
||||
glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Awake and ready to execute. (globalPDPath=%q)\r\n", b.pdName, globalPDPath)
|
||||
|
||||
sdBefore, err := filepath.Glob(diskSDPattern)
|
||||
if err != nil {
|
||||
glog.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskSDPattern, err)
|
||||
}
|
||||
sdBeforeSet := sets.NewString(sdBefore...)
|
||||
|
||||
devicePath, err := attachDiskAndVerify(b, sdBeforeSet)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Only mount the PD globally once.
|
||||
notMnt, err := b.mounter.IsLikelyNotMountPoint(globalPDPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(globalPDPath, 0750); err != nil {
|
||||
return err
|
||||
}
|
||||
notMnt = true
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
options := []string{}
|
||||
if b.readOnly {
|
||||
options = append(options, "ro")
|
||||
}
|
||||
if notMnt {
|
||||
err = b.diskMounter.FormatAndMount(devicePath, globalPDPath, b.fsType, options)
|
||||
if err != nil {
|
||||
os.Remove(globalPDPath)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Unmounts the device and detaches the disk from the kubelet's host machine.
|
||||
func (util *GCEDiskUtil) DetachDisk(c *gcePersistentDiskUnmounter) error {
|
||||
glog.V(5).Infof("DetachDisk(...) for PD %q\r\n", c.pdName)
|
||||
|
||||
if err := unmountPDAndRemoveGlobalPath(c); err != nil {
|
||||
glog.Errorf("Error unmounting PD %q: %v", c.pdName, err)
|
||||
}
|
||||
|
||||
// Detach disk asynchronously so that the kubelet sync loop is not blocked.
|
||||
go detachDiskAndVerify(c)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (util *GCEDiskUtil) DeleteVolume(d *gcePersistentDiskDeleter) error {
|
||||
cloud, err := getCloudProvider(d.gcePersistentDisk.plugin)
|
||||
cloud, err := getCloudProvider(d.gcePersistentDisk.plugin.host.GetCloudProvider())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -129,7 +65,7 @@ func (util *GCEDiskUtil) DeleteVolume(d *gcePersistentDiskDeleter) error {
|
||||
// CreateVolume creates a GCE PD.
|
||||
// Returns: volumeID, volumeSizeGB, labels, error
|
||||
func (gceutil *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner) (string, int, map[string]string, error) {
|
||||
cloud, err := getCloudProvider(c.gcePersistentDisk.plugin)
|
||||
cloud, err := getCloudProvider(c.gcePersistentDisk.plugin.host.GetCloudProvider())
|
||||
if err != nil {
|
||||
return "", 0, nil, err
|
||||
}
|
||||
@ -163,46 +99,6 @@ func (gceutil *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner) (strin
|
||||
return name, int(requestGB), labels, nil
|
||||
}
|
||||
|
||||
// Attaches the specified persistent disk device to node, verifies that it is attached, and retries if it fails.
|
||||
func attachDiskAndVerify(b *gcePersistentDiskMounter, sdBeforeSet sets.String) (string, error) {
|
||||
devicePaths := getDiskByIdPaths(b.gcePersistentDisk)
|
||||
gceCloud, err := getCloudProvider(b.gcePersistentDisk.plugin)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
for numRetries := 0; numRetries < maxRetries; numRetries++ {
|
||||
|
||||
if numRetries > 0 {
|
||||
glog.Warningf("Retrying attach for GCE PD %q (retry count=%v).", b.pdName, numRetries)
|
||||
}
|
||||
|
||||
if err := gceCloud.AttachDisk(b.pdName, b.plugin.host.GetHostName(), b.readOnly); err != nil {
|
||||
glog.Errorf("Error attaching PD %q: %v", b.pdName, err)
|
||||
time.Sleep(errorSleepDuration)
|
||||
continue
|
||||
}
|
||||
|
||||
for numChecks := 0; numChecks < maxChecks; numChecks++ {
|
||||
path, err := verifyDevicePath(devicePaths, sdBeforeSet)
|
||||
if err != nil {
|
||||
// Log error, if any, and continue checking periodically. See issue #11321
|
||||
glog.Errorf("Error verifying GCE PD (%q) is attached: %v", b.pdName, err)
|
||||
} else if path != "" {
|
||||
// A device path has successfully been created for the PD
|
||||
glog.Infof("Successfully attached GCE PD %q.", b.pdName)
|
||||
return path, nil
|
||||
}
|
||||
|
||||
// Sleep then check again
|
||||
glog.V(3).Infof("Waiting for GCE PD %q to attach.", b.pdName)
|
||||
time.Sleep(checkSleepDuration)
|
||||
}
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("Could not attach GCE PD %q. Timeout waiting for mount paths to be created.", b.pdName)
|
||||
}
|
||||
|
||||
// Returns the first path that exists, or empty string if none exist.
|
||||
func verifyDevicePath(devicePaths []string, sdBeforeSet sets.String) (string, error) {
|
||||
if err := udevadmChangeToNewDrives(sdBeforeSet); err != nil {
|
||||
@ -221,65 +117,10 @@ func verifyDevicePath(devicePaths []string, sdBeforeSet sets.String) (string, er
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// Detaches the specified persistent disk device from node, verifies that it is detached, and retries if it fails.
|
||||
// This function is intended to be called asynchronously as a go routine.
|
||||
func detachDiskAndVerify(c *gcePersistentDiskUnmounter) {
|
||||
glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Will block for pending operations", c.pdName)
|
||||
defer runtime.HandleCrash()
|
||||
|
||||
// Block execution until any pending attach/detach operations for this PD have completed
|
||||
attachDetachMutex.LockKey(c.pdName)
|
||||
defer attachDetachMutex.UnlockKey(c.pdName)
|
||||
|
||||
glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Awake and ready to execute.", c.pdName)
|
||||
|
||||
devicePaths := getDiskByIdPaths(c.gcePersistentDisk)
|
||||
gceCloud, err := getCloudProvider(c.gcePersistentDisk.plugin)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to get GCECloudProvider while detaching %v ", err)
|
||||
return
|
||||
}
|
||||
|
||||
for numRetries := 0; numRetries < maxRetries; numRetries++ {
|
||||
|
||||
if numRetries > 0 {
|
||||
glog.Warningf("Retrying detach for GCE PD %q (retry count=%v).", c.pdName, numRetries)
|
||||
}
|
||||
|
||||
if err := gceCloud.DetachDisk(c.pdName, c.plugin.host.GetHostName()); err != nil {
|
||||
glog.Errorf("Error detaching PD %q: %v", c.pdName, err)
|
||||
time.Sleep(errorSleepDuration)
|
||||
continue
|
||||
}
|
||||
|
||||
for numChecks := 0; numChecks < maxChecks; numChecks++ {
|
||||
allPathsRemoved, err := verifyAllPathsRemoved(devicePaths)
|
||||
if err != nil {
|
||||
// Log error, if any, and continue checking periodically.
|
||||
glog.Errorf("Error verifying GCE PD (%q) is detached: %v", c.pdName, err)
|
||||
} else if allPathsRemoved {
|
||||
// All paths to the PD have been successfully removed
|
||||
unmountPDAndRemoveGlobalPath(c)
|
||||
glog.Infof("Successfully detached GCE PD %q.", c.pdName)
|
||||
return
|
||||
}
|
||||
|
||||
// Sleep then check again
|
||||
glog.V(3).Infof("Waiting for GCE PD %q to detach.", c.pdName)
|
||||
time.Sleep(checkSleepDuration)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
glog.Errorf("Failed to detach GCE PD %q. One or more mount paths was not removed.", c.pdName)
|
||||
}
|
||||
|
||||
// Unmount the global PD mount, which should be the only one, and delete it.
|
||||
func unmountPDAndRemoveGlobalPath(c *gcePersistentDiskUnmounter) error {
|
||||
globalPDPath := makeGlobalPDName(c.plugin.host, c.pdName)
|
||||
|
||||
err := c.mounter.Unmount(globalPDPath)
|
||||
os.Remove(globalPDPath)
|
||||
func unmountPDAndRemoveGlobalPath(globalMountPath string, mounter mount.Interface) error {
|
||||
err := mounter.Unmount(globalMountPath)
|
||||
os.Remove(globalMountPath)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -302,15 +143,15 @@ func verifyAllPathsRemoved(devicePaths []string) (bool, error) {
|
||||
}
|
||||
|
||||
// Returns list of all /dev/disk/by-id/* paths for given PD.
|
||||
func getDiskByIdPaths(pd *gcePersistentDisk) []string {
|
||||
func getDiskByIdPaths(pdName string, partition string) []string {
|
||||
devicePaths := []string{
|
||||
path.Join(diskByIdPath, diskGooglePrefix+pd.pdName),
|
||||
path.Join(diskByIdPath, diskScsiGooglePrefix+pd.pdName),
|
||||
path.Join(diskByIdPath, diskGooglePrefix+pdName),
|
||||
path.Join(diskByIdPath, diskScsiGooglePrefix+pdName),
|
||||
}
|
||||
|
||||
if pd.partition != "" {
|
||||
if partition != "" {
|
||||
for i, path := range devicePaths {
|
||||
devicePaths[i] = path + diskPartitionSuffix + pd.partition
|
||||
devicePaths[i] = path + diskPartitionSuffix + partition
|
||||
}
|
||||
}
|
||||
|
||||
@ -330,17 +171,9 @@ func pathExists(path string) (bool, error) {
|
||||
}
|
||||
|
||||
// Return cloud provider
|
||||
func getCloudProvider(plugin *gcePersistentDiskPlugin) (*gcecloud.GCECloud, error) {
|
||||
if plugin == nil {
|
||||
return nil, fmt.Errorf("Failed to get GCE Cloud Provider. plugin object is nil.")
|
||||
}
|
||||
if plugin.host == nil {
|
||||
return nil, fmt.Errorf("Failed to get GCE Cloud Provider. plugin.host object is nil.")
|
||||
}
|
||||
|
||||
func getCloudProvider(cloudProvider cloudprovider.Interface) (*gcecloud.GCECloud, error) {
|
||||
var err error
|
||||
for numRetries := 0; numRetries < maxRetries; numRetries++ {
|
||||
cloudProvider := plugin.host.GetCloudProvider()
|
||||
gceCloudProvider, ok := cloudProvider.(*gcecloud.GCECloud)
|
||||
if !ok || gceCloudProvider == nil {
|
||||
// Retry on error. See issue #11321
|
||||
@ -355,8 +188,10 @@ func getCloudProvider(plugin *gcePersistentDiskPlugin) (*gcecloud.GCECloud, erro
|
||||
return nil, fmt.Errorf("Failed to get GCE GCECloudProvider with error %v", err)
|
||||
}
|
||||
|
||||
// Calls "udevadm trigger --action=change" for newly created "/dev/sd*" drives (exist only in after set).
|
||||
// This is workaround for Issue #7972. Once the underlying issue has been resolved, this may be removed.
|
||||
// Triggers the application of udev rules by calling "udevadm trigger
|
||||
// --action=change" for newly created "/dev/sd*" drives (exist only in
|
||||
// after set). This is workaround for Issue #7972. Once the underlying
|
||||
// issue has been resolved, this may be removed.
|
||||
func udevadmChangeToNewDrives(sdBeforeSet sets.String) error {
|
||||
sdAfter, err := filepath.Glob(diskSDPattern)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user