diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 3e7d214d9d4..347fcd801bc 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -407,6 +407,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { func TestMountExternalVolumes(t *testing.T) { testKubelet := newTestKubelet(t) kubelet := testKubelet.kubelet + kubelet.mounter = &mount.FakeMounter{} plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil} kubelet.volumePluginMgr.InitPlugins([]volume.VolumePlugin{plug}, &volumeHost{kubelet}) diff --git a/pkg/kubelet/volumes.go b/pkg/kubelet/volumes.go index 80aabab5fcc..fe2a78f6940 100644 --- a/pkg/kubelet/volumes.go +++ b/pkg/kubelet/volumes.go @@ -19,6 +19,7 @@ package kubelet import ( "fmt" "io/ioutil" + "os" "path" "strconv" @@ -160,20 +161,28 @@ func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (kubecontainer.VolumeMap, return nil, err } if attacher != nil { - err = attacher.Attach(volSpec, kl.hostname) - if err != nil { - return nil, err - } - - devicePath, err := attacher.WaitForAttach(volSpec, maxWaitForVolumeOps) - if err != nil { - return nil, err - } - + // If the device path is already mounted, avoid an expensive call to the + // cloud provider. deviceMountPath := attacher.GetDeviceMountPath(volSpec) - if err = attacher.MountDevice(volSpec, devicePath, deviceMountPath, kl.mounter); err != nil { + notMountPoint, err := kl.mounter.IsLikelyNotMountPoint(deviceMountPath) + if err != nil && !os.IsNotExist(err) { return nil, err } + if notMountPoint { + err = attacher.Attach(volSpec, kl.hostname) + if err != nil { + return nil, err + } + + devicePath, err := attacher.WaitForAttach(volSpec, maxWaitForVolumeOps) + if err != nil { + return nil, err + } + + if err = attacher.MountDevice(volSpec, devicePath, deviceMountPath, kl.mounter); err != nil { + return nil, err + } + } } err = mounter.SetUp(fsGroup) diff --git a/pkg/volume/gce_pd/attacher.go b/pkg/volume/gce_pd/attacher.go new file mode 100644 index 00000000000..5ad0b77ad80 --- /dev/null +++ b/pkg/volume/gce_pd/attacher.go @@ -0,0 +1,220 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gce_pd + +import ( + "fmt" + "os" + "path" + "path/filepath" + "strconv" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/util/exec" + "k8s.io/kubernetes/pkg/util/keymutex" + "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/volume" +) + +type gcePersistentDiskAttacher struct { + host volume.VolumeHost +} + +var _ volume.Attacher = &gcePersistentDiskAttacher{} + +var _ volume.AttachableVolumePlugin = &gcePersistentDiskPlugin{} + +// Singleton key mutex for keeping attach/detach operations for the +// same PD atomic +// TODO(swagiaal): Once the Mount/Unmount manager is implemented this +// will no longer be needed and should be removed. +var attachDetachMutex = keymutex.NewKeyMutex() + +func (plugin *gcePersistentDiskPlugin) NewAttacher() (volume.Attacher, error) { + return &gcePersistentDiskAttacher{host: plugin.host}, nil +} + +func (attacher *gcePersistentDiskAttacher) Attach(spec *volume.Spec, hostName string) error { + volumeSource, readOnly := getVolumeSource(spec) + pdName := volumeSource.PDName + + // Block execution until any pending detach operations for this PD have completed + attachDetachMutex.LockKey(pdName) + defer attachDetachMutex.UnlockKey(pdName) + + gceCloud, err := getCloudProvider(attacher.host.GetCloudProvider()) + if err != nil { + return err + } + + for numRetries := 0; numRetries < maxRetries; numRetries++ { + if numRetries > 0 { + glog.Warningf("Retrying attach for GCE PD %q (retry count=%v).", pdName, numRetries) + } + + if err = gceCloud.AttachDisk(pdName, hostName, readOnly); err != nil { + glog.Errorf("Error attaching PD %q: %+v", pdName, err) + time.Sleep(errorSleepDuration) + continue + } + + return nil + } + + return err +} + +func (attacher *gcePersistentDiskAttacher) WaitForAttach(spec *volume.Spec, timeout time.Duration) (string, error) { + ticker := time.NewTicker(checkSleepDuration) + defer ticker.Stop() + timer := time.NewTimer(timeout) + defer timer.Stop() + + volumeSource, _ := getVolumeSource(spec) + pdName := volumeSource.PDName + partition := "" + if volumeSource.Partition != 0 { + partition = strconv.Itoa(int(volumeSource.Partition)) + } + + sdBefore, err := filepath.Glob(diskSDPattern) + if err != nil { + glog.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskSDPattern, err) + } + sdBeforeSet := sets.NewString(sdBefore...) + + devicePaths := getDiskByIdPaths(pdName, partition) + for { + select { + case <-ticker.C: + glog.V(5).Infof("Checking GCE PD %q is attached.", pdName) + path, err := verifyDevicePath(devicePaths, sdBeforeSet) + if err != nil { + // Log error, if any, and continue checking periodically. See issue #11321 + glog.Errorf("Error verifying GCE PD (%q) is attached: %v", pdName, err) + } else if path != "" { + // A device path has successfully been created for the PD + glog.Infof("Successfully found attached GCE PD %q.", pdName) + return path, nil + } + case <-timer.C: + return "", fmt.Errorf("Could not find attached GCE PD %q. Timeout waiting for mount paths to be created.", pdName) + } + } +} + +func (attacher *gcePersistentDiskAttacher) GetDeviceMountPath(spec *volume.Spec) string { + volumeSource, _ := getVolumeSource(spec) + return makeGlobalPDName(attacher.host, volumeSource.PDName) +} + +func (attacher *gcePersistentDiskAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string, mounter mount.Interface) error { + // Only mount the PD globally once. + notMnt, err := mounter.IsLikelyNotMountPoint(deviceMountPath) + if err != nil { + if os.IsNotExist(err) { + if err := os.MkdirAll(deviceMountPath, 0750); err != nil { + return err + } + notMnt = true + } else { + return err + } + } + + volumeSource, readOnly := getVolumeSource(spec) + + options := []string{} + if readOnly { + options = append(options, "ro") + } + if notMnt { + diskMounter := &mount.SafeFormatAndMount{Interface: mounter, Runner: exec.New()} + err = diskMounter.FormatAndMount(devicePath, deviceMountPath, volumeSource.FSType, options) + if err != nil { + os.Remove(deviceMountPath) + return err + } + } + return nil +} + +type gcePersistentDiskDetacher struct { + host volume.VolumeHost +} + +var _ volume.Detacher = &gcePersistentDiskDetacher{} + +func (plugin *gcePersistentDiskPlugin) NewDetacher() (volume.Detacher, error) { + return &gcePersistentDiskDetacher{host: plugin.host}, nil +} + +func (detacher *gcePersistentDiskDetacher) Detach(deviceMountPath string, hostName string) error { + pdName := path.Base(deviceMountPath) + + // Block execution until any pending attach/detach operations for this PD have completed + attachDetachMutex.LockKey(pdName) + defer attachDetachMutex.UnlockKey(pdName) + + gceCloud, err := getCloudProvider(detacher.host.GetCloudProvider()) + if err != nil { + return err + } + + for numRetries := 0; numRetries < maxRetries; numRetries++ { + if numRetries > 0 { + glog.Warningf("Retrying detach for GCE PD %q (retry count=%v).", pdName, numRetries) + } + + if err = gceCloud.DetachDisk(pdName, hostName); err != nil { + glog.Errorf("Error detaching PD %q: %v", pdName, err) + time.Sleep(errorSleepDuration) + continue + } + + return nil + } + + return err +} + +func (detacher *gcePersistentDiskDetacher) WaitForDetach(devicePath string, timeout time.Duration) error { + ticker := time.NewTicker(checkSleepDuration) + defer ticker.Stop() + timer := time.NewTimer(timeout) + defer timer.Stop() + + for { + select { + case <-ticker.C: + glog.V(5).Infof("Checking device %q is detached.", devicePath) + if pathExists, err := pathExists(devicePath); err != nil { + return fmt.Errorf("Error checking if device path exists: %v", err) + } else if !pathExists { + return nil + } + case <-timer.C: + return fmt.Errorf("Timeout reached; PD Device %v is still attached", devicePath) + } + } +} + +func (detacher *gcePersistentDiskDetacher) UnmountDevice(deviceMountPath string, mounter mount.Interface) error { + return unmountPDAndRemoveGlobalPath(deviceMountPath, mounter) +} diff --git a/pkg/volume/gce_pd/gce_pd.go b/pkg/volume/gce_pd/gce_pd.go index 32ea1043420..c923406574b 100644 --- a/pkg/volume/gce_pd/gce_pd.go +++ b/pkg/volume/gce_pd/gce_pd.go @@ -26,7 +26,6 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/strings" "k8s.io/kubernetes/pkg/volume" @@ -76,25 +75,30 @@ func (plugin *gcePersistentDiskPlugin) NewMounter(spec *volume.Spec, pod *api.Po return plugin.newMounterInternal(spec, pod.UID, &GCEDiskUtil{}, plugin.host.GetMounter()) } -func (plugin *gcePersistentDiskPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Mounter, error) { - // GCEPDs used directly in a pod have a ReadOnly flag set by the pod author. - // GCEPDs used as a PersistentVolume gets the ReadOnly flag indirectly through the persistent-claim volume used to mount the PV +func getVolumeSource(spec *volume.Spec) (*api.GCEPersistentDiskVolumeSource, bool) { var readOnly bool + var volumeSource *api.GCEPersistentDiskVolumeSource - var gce *api.GCEPersistentDiskVolumeSource if spec.Volume != nil && spec.Volume.GCEPersistentDisk != nil { - gce = spec.Volume.GCEPersistentDisk - readOnly = gce.ReadOnly + volumeSource = spec.Volume.GCEPersistentDisk + readOnly = volumeSource.ReadOnly } else { - gce = spec.PersistentVolume.Spec.GCEPersistentDisk + volumeSource = spec.PersistentVolume.Spec.GCEPersistentDisk readOnly = spec.ReadOnly } - pdName := gce.PDName - fsType := gce.FSType + return volumeSource, readOnly +} + +func (plugin *gcePersistentDiskPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Mounter, error) { + // GCEPDs used directly in a pod have a ReadOnly flag set by the pod author. + // GCEPDs used as a PersistentVolume gets the ReadOnly flag indirectly through the persistent-claim volume used to mount the PV + volumeSource, readOnly := getVolumeSource(spec) + + pdName := volumeSource.PDName partition := "" - if gce.Partition != 0 { - partition = strconv.Itoa(int(gce.Partition)) + if volumeSource.Partition != 0 { + partition = strconv.Itoa(int(volumeSource.Partition)) } return &gcePersistentDiskMounter{ @@ -107,9 +111,7 @@ func (plugin *gcePersistentDiskPlugin) newMounterInternal(spec *volume.Spec, pod manager: manager, plugin: plugin, }, - fsType: fsType, - readOnly: readOnly, - diskMounter: &mount.SafeFormatAndMount{Interface: mounter, Runner: exec.New()}}, nil + readOnly: readOnly}, nil } func (plugin *gcePersistentDiskPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) { @@ -163,10 +165,6 @@ func (plugin *gcePersistentDiskPlugin) newProvisionerInternal(options volume.Vol // Abstract interface to PD operations. type pdManager interface { - // Attaches the disk to the kubelet's host machine. - AttachAndMountDisk(b *gcePersistentDiskMounter, globalPDPath string) error - // Detaches the disk from the kubelet's host machine. - DetachDisk(c *gcePersistentDiskUnmounter) error // Creates a volume CreateVolume(provisioner *gcePersistentDiskProvisioner) (volumeID string, volumeSizeGB int, labels map[string]string, err error) // Deletes a volume @@ -182,7 +180,7 @@ type gcePersistentDisk struct { pdName string // Specifies the partition to mount partition string - // Utility interface that provides API calls to the provider to attach/detach disks. + // Utility interface to provision and delete disks manager pdManager // Mounter interface that provides system calls to mount the global path to the pod local path. mounter mount.Interface @@ -190,21 +188,10 @@ type gcePersistentDisk struct { volume.MetricsNil } -func detachDiskLogError(pd *gcePersistentDisk) { - err := pd.manager.DetachDisk(&gcePersistentDiskUnmounter{pd}) - if err != nil { - glog.Warningf("Failed to detach disk: %v (%v)", pd, err) - } -} - type gcePersistentDiskMounter struct { *gcePersistentDisk - // Filesystem type, optional. - fsType string - // Specifies whether the disk will be attached as read-only. + // Specifies whether the disk will be mounted as read-only. readOnly bool - // diskMounter provides the interface that is used to mount the actual block device. - diskMounter *mount.SafeFormatAndMount } var _ volume.Mounter = &gcePersistentDiskMounter{} @@ -217,12 +204,12 @@ func (b *gcePersistentDiskMounter) GetAttributes() volume.Attributes { } } -// SetUp attaches the disk and bind mounts to the volume path. +// SetUp bind mounts the disk global mount to the volume path. func (b *gcePersistentDiskMounter) SetUp(fsGroup *int64) error { return b.SetUpAt(b.GetPath(), fsGroup) } -// SetUpAt attaches the disk and bind mounts to the volume path. +// SetUp bind mounts the disk global mount to the give volume path. func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error { // TODO: handle failed mounts here. notMnt, err := b.mounter.IsLikelyNotMountPoint(dir) @@ -234,14 +221,7 @@ func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error { return nil } - globalPDPath := makeGlobalPDName(b.plugin.host, b.pdName) - if err := b.manager.AttachAndMountDisk(b, globalPDPath); err != nil { - return err - } - if err := os.MkdirAll(dir, 0750); err != nil { - // TODO: we should really eject the attach/detach out into its own control loop. - detachDiskLogError(b.gcePersistentDisk) return err } @@ -250,6 +230,8 @@ func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error { if b.readOnly { options = append(options, "ro") } + + globalPDPath := makeGlobalPDName(b.plugin.host, b.pdName) err = b.mounter.Mount(globalPDPath, dir, "", options) if err != nil { notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir) @@ -274,8 +256,6 @@ func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error { } } os.Remove(dir) - // TODO: we should really eject the attach/detach out into its own control loop. - detachDiskLogError(b.gcePersistentDisk) return err } @@ -301,14 +281,12 @@ type gcePersistentDiskUnmounter struct { var _ volume.Unmounter = &gcePersistentDiskUnmounter{} -// Unmounts the bind mount, and detaches the disk only if the PD -// resource was the last reference to that disk on the kubelet. +// TearDown unmounts the bind mount func (c *gcePersistentDiskUnmounter) TearDown() error { return c.TearDownAt(c.GetPath()) } -// Unmounts the bind mount, and detaches the disk only if the PD -// resource was the last reference to that disk on the kubelet. +// TearDownAt unmounts the bind mount func (c *gcePersistentDiskUnmounter) TearDownAt(dir string) error { notMnt, err := c.mounter.IsLikelyNotMountPoint(dir) if err != nil { @@ -317,35 +295,18 @@ func (c *gcePersistentDiskUnmounter) TearDownAt(dir string) error { if notMnt { return os.Remove(dir) } - - refs, err := mount.GetMountRefs(c.mounter, dir) - if err != nil { - return err - } - // Unmount the bind-mount inside this pod if err := c.mounter.Unmount(dir); err != nil { return err } - // If len(refs) is 1, then all bind mounts have been removed, and the - // remaining reference is the global mount. It is safe to detach. - if len(refs) == 1 { - // c.pdName is not initially set for volume-unmounters, so set it here. - c.pdName = path.Base(refs[0]) - if err := c.manager.DetachDisk(c); err != nil { - return err - } - } notMnt, mntErr := c.mounter.IsLikelyNotMountPoint(dir) if mntErr != nil { glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr) return err } if notMnt { - if err := os.Remove(dir); err != nil { - return err - } + return os.Remove(dir) } - return nil + return fmt.Errorf("Failed to unmount volume dir") } type gcePersistentDiskDeleter struct { @@ -393,7 +354,6 @@ func (c *gcePersistentDiskProvisioner) Provision() (*api.PersistentVolume, error PersistentVolumeSource: api.PersistentVolumeSource{ GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{ PDName: volumeID, - FSType: "ext4", Partition: 0, ReadOnly: false, }, diff --git a/pkg/volume/gce_pd/gce_pd_test.go b/pkg/volume/gce_pd/gce_pd_test.go index 9c2bf7e5ffe..e0ae1f51587 100644 --- a/pkg/volume/gce_pd/gce_pd_test.go +++ b/pkg/volume/gce_pd/gce_pd_test.go @@ -84,33 +84,6 @@ func contains(modes []api.PersistentVolumeAccessMode, mode api.PersistentVolumeA } type fakePDManager struct { - attachCalled bool - detachCalled bool -} - -// TODO(jonesdl) To fully test this, we could create a loopback device -// and mount that instead. -func (fake *fakePDManager) AttachAndMountDisk(b *gcePersistentDiskMounter, globalPDPath string) error { - globalPath := makeGlobalPDName(b.plugin.host, b.pdName) - err := os.MkdirAll(globalPath, 0750) - if err != nil { - return err - } - fake.attachCalled = true - // Simulate the global mount so that the fakeMounter returns the - // expected number of mounts for the attached disk. - b.mounter.Mount(globalPath, globalPath, b.fsType, nil) - return nil -} - -func (fake *fakePDManager) DetachDisk(c *gcePersistentDiskUnmounter) error { - globalPath := makeGlobalPDName(c.plugin.host, c.pdName) - err := os.RemoveAll(globalPath) - if err != nil { - return err - } - fake.detachCalled = true - return nil } func (fake *fakePDManager) CreateVolume(c *gcePersistentDiskProvisioner) (volumeID string, volumeSizeGB int, labels map[string]string, err error) { @@ -181,9 +154,6 @@ func TestPlugin(t *testing.T) { t.Errorf("SetUp() failed: %v", err) } } - if !fakeManager.attachCalled { - t.Errorf("Attach watch not called") - } fakeManager = &fakePDManager{} unmounter, err := plug.(*gcePersistentDiskPlugin).newUnmounterInternal("vol1", types.UID("poduid"), fakeManager, fakeMounter) @@ -202,9 +172,6 @@ func TestPlugin(t *testing.T) { } else if !os.IsNotExist(err) { t.Errorf("SetUp() failed: %v", err) } - if !fakeManager.detachCalled { - t.Errorf("Detach watch not called") - } // Test Provisioner cap := resource.MustParse("100Mi") diff --git a/pkg/volume/gce_pd/gce_util.go b/pkg/volume/gce_pd/gce_util.go index af9171f76ab..523262720aa 100644 --- a/pkg/volume/gce_pd/gce_util.go +++ b/pkg/volume/gce_pd/gce_util.go @@ -25,10 +25,10 @@ import ( "time" "github.com/golang/glog" + "k8s.io/kubernetes/pkg/cloudprovider" gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" "k8s.io/kubernetes/pkg/util/exec" - "k8s.io/kubernetes/pkg/util/keymutex" - "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/volume" ) @@ -46,74 +46,10 @@ const ( errorSleepDuration = 5 * time.Second ) -// Singleton key mutex for keeping attach/detach operations for the same PD atomic -var attachDetachMutex = keymutex.NewKeyMutex() - type GCEDiskUtil struct{} -// Attaches a disk specified by a volume.GCEPersistentDisk to the current kubelet. -// Mounts the disk to it's global path. -func (diskUtil *GCEDiskUtil) AttachAndMountDisk(b *gcePersistentDiskMounter, globalPDPath string) error { - glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Will block for existing operations, if any. (globalPDPath=%q)\r\n", b.pdName, globalPDPath) - - // Block execution until any pending detach operations for this PD have completed - attachDetachMutex.LockKey(b.pdName) - defer attachDetachMutex.UnlockKey(b.pdName) - - glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Awake and ready to execute. (globalPDPath=%q)\r\n", b.pdName, globalPDPath) - - sdBefore, err := filepath.Glob(diskSDPattern) - if err != nil { - glog.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskSDPattern, err) - } - sdBeforeSet := sets.NewString(sdBefore...) - - devicePath, err := attachDiskAndVerify(b, sdBeforeSet) - if err != nil { - return err - } - - // Only mount the PD globally once. - notMnt, err := b.mounter.IsLikelyNotMountPoint(globalPDPath) - if err != nil { - if os.IsNotExist(err) { - if err := os.MkdirAll(globalPDPath, 0750); err != nil { - return err - } - notMnt = true - } else { - return err - } - } - options := []string{} - if b.readOnly { - options = append(options, "ro") - } - if notMnt { - err = b.diskMounter.FormatAndMount(devicePath, globalPDPath, b.fsType, options) - if err != nil { - os.Remove(globalPDPath) - return err - } - } - return nil -} - -// Unmounts the device and detaches the disk from the kubelet's host machine. -func (util *GCEDiskUtil) DetachDisk(c *gcePersistentDiskUnmounter) error { - glog.V(5).Infof("DetachDisk(...) for PD %q\r\n", c.pdName) - - if err := unmountPDAndRemoveGlobalPath(c); err != nil { - glog.Errorf("Error unmounting PD %q: %v", c.pdName, err) - } - - // Detach disk asynchronously so that the kubelet sync loop is not blocked. - go detachDiskAndVerify(c) - return nil -} - func (util *GCEDiskUtil) DeleteVolume(d *gcePersistentDiskDeleter) error { - cloud, err := getCloudProvider(d.gcePersistentDisk.plugin) + cloud, err := getCloudProvider(d.gcePersistentDisk.plugin.host.GetCloudProvider()) if err != nil { return err } @@ -129,7 +65,7 @@ func (util *GCEDiskUtil) DeleteVolume(d *gcePersistentDiskDeleter) error { // CreateVolume creates a GCE PD. // Returns: volumeID, volumeSizeGB, labels, error func (gceutil *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner) (string, int, map[string]string, error) { - cloud, err := getCloudProvider(c.gcePersistentDisk.plugin) + cloud, err := getCloudProvider(c.gcePersistentDisk.plugin.host.GetCloudProvider()) if err != nil { return "", 0, nil, err } @@ -163,46 +99,6 @@ func (gceutil *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner) (strin return name, int(requestGB), labels, nil } -// Attaches the specified persistent disk device to node, verifies that it is attached, and retries if it fails. -func attachDiskAndVerify(b *gcePersistentDiskMounter, sdBeforeSet sets.String) (string, error) { - devicePaths := getDiskByIdPaths(b.gcePersistentDisk) - gceCloud, err := getCloudProvider(b.gcePersistentDisk.plugin) - if err != nil { - return "", err - } - - for numRetries := 0; numRetries < maxRetries; numRetries++ { - - if numRetries > 0 { - glog.Warningf("Retrying attach for GCE PD %q (retry count=%v).", b.pdName, numRetries) - } - - if err := gceCloud.AttachDisk(b.pdName, b.plugin.host.GetHostName(), b.readOnly); err != nil { - glog.Errorf("Error attaching PD %q: %v", b.pdName, err) - time.Sleep(errorSleepDuration) - continue - } - - for numChecks := 0; numChecks < maxChecks; numChecks++ { - path, err := verifyDevicePath(devicePaths, sdBeforeSet) - if err != nil { - // Log error, if any, and continue checking periodically. See issue #11321 - glog.Errorf("Error verifying GCE PD (%q) is attached: %v", b.pdName, err) - } else if path != "" { - // A device path has successfully been created for the PD - glog.Infof("Successfully attached GCE PD %q.", b.pdName) - return path, nil - } - - // Sleep then check again - glog.V(3).Infof("Waiting for GCE PD %q to attach.", b.pdName) - time.Sleep(checkSleepDuration) - } - } - - return "", fmt.Errorf("Could not attach GCE PD %q. Timeout waiting for mount paths to be created.", b.pdName) -} - // Returns the first path that exists, or empty string if none exist. func verifyDevicePath(devicePaths []string, sdBeforeSet sets.String) (string, error) { if err := udevadmChangeToNewDrives(sdBeforeSet); err != nil { @@ -221,65 +117,10 @@ func verifyDevicePath(devicePaths []string, sdBeforeSet sets.String) (string, er return "", nil } -// Detaches the specified persistent disk device from node, verifies that it is detached, and retries if it fails. -// This function is intended to be called asynchronously as a go routine. -func detachDiskAndVerify(c *gcePersistentDiskUnmounter) { - glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Will block for pending operations", c.pdName) - defer runtime.HandleCrash() - - // Block execution until any pending attach/detach operations for this PD have completed - attachDetachMutex.LockKey(c.pdName) - defer attachDetachMutex.UnlockKey(c.pdName) - - glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Awake and ready to execute.", c.pdName) - - devicePaths := getDiskByIdPaths(c.gcePersistentDisk) - gceCloud, err := getCloudProvider(c.gcePersistentDisk.plugin) - if err != nil { - glog.Errorf("Failed to get GCECloudProvider while detaching %v ", err) - return - } - - for numRetries := 0; numRetries < maxRetries; numRetries++ { - - if numRetries > 0 { - glog.Warningf("Retrying detach for GCE PD %q (retry count=%v).", c.pdName, numRetries) - } - - if err := gceCloud.DetachDisk(c.pdName, c.plugin.host.GetHostName()); err != nil { - glog.Errorf("Error detaching PD %q: %v", c.pdName, err) - time.Sleep(errorSleepDuration) - continue - } - - for numChecks := 0; numChecks < maxChecks; numChecks++ { - allPathsRemoved, err := verifyAllPathsRemoved(devicePaths) - if err != nil { - // Log error, if any, and continue checking periodically. - glog.Errorf("Error verifying GCE PD (%q) is detached: %v", c.pdName, err) - } else if allPathsRemoved { - // All paths to the PD have been successfully removed - unmountPDAndRemoveGlobalPath(c) - glog.Infof("Successfully detached GCE PD %q.", c.pdName) - return - } - - // Sleep then check again - glog.V(3).Infof("Waiting for GCE PD %q to detach.", c.pdName) - time.Sleep(checkSleepDuration) - } - - } - - glog.Errorf("Failed to detach GCE PD %q. One or more mount paths was not removed.", c.pdName) -} - // Unmount the global PD mount, which should be the only one, and delete it. -func unmountPDAndRemoveGlobalPath(c *gcePersistentDiskUnmounter) error { - globalPDPath := makeGlobalPDName(c.plugin.host, c.pdName) - - err := c.mounter.Unmount(globalPDPath) - os.Remove(globalPDPath) +func unmountPDAndRemoveGlobalPath(globalMountPath string, mounter mount.Interface) error { + err := mounter.Unmount(globalMountPath) + os.Remove(globalMountPath) return err } @@ -302,15 +143,15 @@ func verifyAllPathsRemoved(devicePaths []string) (bool, error) { } // Returns list of all /dev/disk/by-id/* paths for given PD. -func getDiskByIdPaths(pd *gcePersistentDisk) []string { +func getDiskByIdPaths(pdName string, partition string) []string { devicePaths := []string{ - path.Join(diskByIdPath, diskGooglePrefix+pd.pdName), - path.Join(diskByIdPath, diskScsiGooglePrefix+pd.pdName), + path.Join(diskByIdPath, diskGooglePrefix+pdName), + path.Join(diskByIdPath, diskScsiGooglePrefix+pdName), } - if pd.partition != "" { + if partition != "" { for i, path := range devicePaths { - devicePaths[i] = path + diskPartitionSuffix + pd.partition + devicePaths[i] = path + diskPartitionSuffix + partition } } @@ -330,17 +171,9 @@ func pathExists(path string) (bool, error) { } // Return cloud provider -func getCloudProvider(plugin *gcePersistentDiskPlugin) (*gcecloud.GCECloud, error) { - if plugin == nil { - return nil, fmt.Errorf("Failed to get GCE Cloud Provider. plugin object is nil.") - } - if plugin.host == nil { - return nil, fmt.Errorf("Failed to get GCE Cloud Provider. plugin.host object is nil.") - } - +func getCloudProvider(cloudProvider cloudprovider.Interface) (*gcecloud.GCECloud, error) { var err error for numRetries := 0; numRetries < maxRetries; numRetries++ { - cloudProvider := plugin.host.GetCloudProvider() gceCloudProvider, ok := cloudProvider.(*gcecloud.GCECloud) if !ok || gceCloudProvider == nil { // Retry on error. See issue #11321 @@ -355,8 +188,10 @@ func getCloudProvider(plugin *gcePersistentDiskPlugin) (*gcecloud.GCECloud, erro return nil, fmt.Errorf("Failed to get GCE GCECloudProvider with error %v", err) } -// Calls "udevadm trigger --action=change" for newly created "/dev/sd*" drives (exist only in after set). -// This is workaround for Issue #7972. Once the underlying issue has been resolved, this may be removed. +// Triggers the application of udev rules by calling "udevadm trigger +// --action=change" for newly created "/dev/sd*" drives (exist only in +// after set). This is workaround for Issue #7972. Once the underlying +// issue has been resolved, this may be removed. func udevadmChangeToNewDrives(sdBeforeSet sets.String) error { sdAfter, err := filepath.Glob(diskSDPattern) if err != nil {