Merge pull request #28714 from ciwang/robust-unmount

Automatic merge from submit-queue

Add OpenFile check if device is in use before unmount

Fixes #28252
This commit is contained in:
k8s-merge-robot 2016-07-18 17:45:54 -07:00 committed by GitHub
commit adef589e37
19 changed files with 227 additions and 25 deletions

View File

@ -51,6 +51,19 @@ func (mi *fakeMountInterface) IsLikelyNotMountPoint(file string) (bool, error) {
return false, fmt.Errorf("unsupported")
}
func (mi *fakeMountInterface) DeviceOpened(pathname string) (bool, error) {
for _, mp := range mi.mountPoints {
if mp.Device == pathname {
return true, nil
}
}
return false, nil
}
func (mi *fakeMountInterface) PathIsDevice(pathname string) (bool, error) {
return true, nil
}
func fakeContainerMgrMountInt() mount.Interface {
return &fakeMountInterface{
[]mount.MountPoint{

View File

@ -44,6 +44,19 @@ func (mi *fakeMountInterface) IsLikelyNotMountPoint(file string) (bool, error) {
return false, fmt.Errorf("unsupported")
}
func (mi *fakeMountInterface) DeviceOpened(pathname string) (bool, error) {
for _, mp := range mi.mountPoints {
if mp.Device == pathname {
return true, nil
}
}
return false, nil
}
func (mi *fakeMountInterface) PathIsDevice(pathname string) (bool, error) {
return true, nil
}
func fakeContainerMgrMountInt() mount.Interface {
return &fakeMountInterface{
[]mount.MountPoint{

View File

@ -405,6 +405,21 @@ func NewMainKubelet(
klet.podCache = kubecontainer.NewCache()
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
klet.volumePluginMgr, err =
NewInitializedVolumePluginMgr(klet, volumePlugins)
if err != nil {
return nil, err
}
klet.volumeManager, err = volumemanager.NewVolumeManager(
enableControllerAttachDetach,
hostname,
klet.podManager,
klet.kubeClient,
klet.volumePluginMgr,
klet.containerRuntime,
mounter)
// Initialize the runtime.
switch containerRuntime {
case "docker":
@ -510,7 +525,8 @@ func NewMainKubelet(
klet.podManager,
klet.kubeClient,
klet.volumePluginMgr,
klet.containerRuntime)
klet.containerRuntime,
mounter)
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {

View File

@ -284,13 +284,15 @@ func newTestKubeletWithImageList(
t.Fatalf("failed to initialize VolumePluginMgr: %v", err)
}
kubelet.mounter = &mount.FakeMounter{}
kubelet.volumeManager, err = kubeletvolume.NewVolumeManager(
controllerAttachDetachEnabled,
kubelet.hostname,
kubelet.podManager,
fakeKubeClient,
kubelet.volumePluginMgr,
fakeRuntime)
fakeRuntime,
kubelet.mounter)
if err != nil {
t.Fatalf("failed to initialize volume manager: %v", err)
}
@ -432,7 +434,6 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
func TestVolumeAttachAndMountControllerDisabled(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubelet.mounter = &mount.FakeMounter{}
pod := podWithUidNameNsSpec("12345678", "foo", "test", api.PodSpec{
Volumes: []api.Volume{
@ -503,7 +504,6 @@ func TestVolumeAttachAndMountControllerDisabled(t *testing.T) {
func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubelet.mounter = &mount.FakeMounter{}
pod := podWithUidNameNsSpec("12345678", "foo", "test", api.PodSpec{
Volumes: []api.Volume{
@ -615,7 +615,6 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
testKubelet := newTestKubelet(t, true /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubelet.mounter = &mount.FakeMounter{}
kubeClient := testKubelet.fakeKubeClient
kubeClient.AddReactor("get", "nodes",
func(action core.Action) (bool, runtime.Object, error) {
@ -710,7 +709,6 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
testKubelet := newTestKubelet(t, true /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubelet.mounter = &mount.FakeMounter{}
kubeClient := testKubelet.fakeKubeClient
kubeClient.AddReactor("get", "nodes",
func(action core.Action) (bool, runtime.Object, error) {

View File

@ -98,7 +98,8 @@ func TestRunOnce(t *testing.T) {
kb.podManager,
kb.kubeClient,
kb.volumePluginMgr,
fakeRuntime)
fakeRuntime,
kb.mounter)
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, kb.nonMasqueradeCIDR)
// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency

View File

@ -582,7 +582,8 @@ func (asw *actualStateOfWorld) newAttachedVolume(
VolumeName: attachedVolume.volumeName,
VolumeSpec: attachedVolume.spec,
NodeName: asw.nodeName,
PluginIsAttachable: attachedVolume.pluginIsAttachable},
PluginIsAttachable: attachedVolume.pluginIsAttachable,
DevicePath: attachedVolume.devicePath},
GloballyMounted: attachedVolume.globallyMounted}
}

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
@ -62,6 +63,7 @@ type Reconciler interface {
// operationExecutor - used to trigger attach/detach/mount/unmount operations
// safely (prevents more than one operation from being triggered on the same
// volume)
// mounter - mounter passed in from kubelet, passed down unmount path
func NewReconciler(
kubeClient internalclientset.Interface,
controllerAttachDetachEnabled bool,
@ -70,7 +72,8 @@ func NewReconciler(
hostName string,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld,
operationExecutor operationexecutor.OperationExecutor) Reconciler {
operationExecutor operationexecutor.OperationExecutor,
mounter mount.Interface) Reconciler {
return &reconciler{
kubeClient: kubeClient,
controllerAttachDetachEnabled: controllerAttachDetachEnabled,
@ -80,6 +83,7 @@ func NewReconciler(
desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld,
operationExecutor: operationExecutor,
mounter: mounter,
}
}
@ -92,6 +96,7 @@ type reconciler struct {
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
operationExecutor operationexecutor.OperationExecutor
mounter mount.Interface
}
func (rc *reconciler) Run(stopCh <-chan struct{}) {
@ -264,7 +269,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name())
err := rc.operationExecutor.UnmountDevice(
attachedVolume.AttachedVolume, rc.actualStateOfWorld)
attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
@ -62,7 +63,8 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
nodeName,
dsw,
asw,
oex)
oex,
&mount.FakeMounter{})
// Act
go reconciler.Run(wait.NeverStop)
@ -94,7 +96,8 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
nodeName,
dsw,
asw,
oex)
oex,
&mount.FakeMounter{})
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
@ -161,7 +164,8 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
nodeName,
dsw,
asw,
oex)
oex,
&mount.FakeMounter{})
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
@ -228,7 +232,8 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
nodeName,
dsw,
asw,
oex)
oex,
&mount.FakeMounter{})
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
@ -307,7 +312,8 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
nodeName,
dsw,
asw,
oex)
oex,
&mount.FakeMounter{})
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/populator"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
@ -142,7 +143,8 @@ func NewVolumeManager(
podManager pod.Manager,
kubeClient internalclientset.Interface,
volumePluginMgr *volume.VolumePluginMgr,
kubeContainerRuntime kubecontainer.Runtime) (VolumeManager, error) {
kubeContainerRuntime kubecontainer.Runtime,
mounter mount.Interface) (VolumeManager, error) {
vm := &volumeManager{
kubeClient: kubeClient,
volumePluginMgr: volumePluginMgr,
@ -161,7 +163,8 @@ func NewVolumeManager(
hostName,
vm.desiredStateOfWorld,
vm.actualStateOfWorld,
vm.operationExecutor)
vm.operationExecutor,
mounter)
vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
kubeClient,
desiredStateOfWorldPopulatorLoopSleepPeriod,

View File

@ -81,7 +81,7 @@ func (f *FakeMounter) Mount(source string, target string, fstype string, options
}
f.MountPoints = append(f.MountPoints, MountPoint{Device: source, Path: target, Type: fstype})
glog.V(5).Infof("Fake mounter: mouted %s to %s", source, target)
glog.V(5).Infof("Fake mounter: mounted %s to %s", source, target)
f.Log = append(f.Log, FakeAction{Action: FakeActionMount, Target: target, Source: source, FSType: fstype})
return nil
}
@ -93,7 +93,7 @@ func (f *FakeMounter) Unmount(target string) error {
newMountpoints := []MountPoint{}
for _, mp := range f.MountPoints {
if mp.Path == target {
glog.V(5).Infof("Fake mounter: unmouted %s from %s", mp.Device, target)
glog.V(5).Infof("Fake mounter: unmounted %s from %s", mp.Device, target)
// Don't copy it to newMountpoints
continue
}
@ -117,10 +117,26 @@ func (f *FakeMounter) IsLikelyNotMountPoint(file string) (bool, error) {
for _, mp := range f.MountPoints {
if mp.Path == file {
glog.V(5).Infof("isLikelyMountPoint for %s: monted %s, false", file, mp.Path)
glog.V(5).Infof("isLikelyMountPoint for %s: mounted %s, false", file, mp.Path)
return false, nil
}
}
glog.V(5).Infof("isLikelyMountPoint for %s: true", file)
return true, nil
}
func (f *FakeMounter) DeviceOpened(pathname string) (bool, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
for _, mp := range f.MountPoints {
if mp.Device == pathname {
return true, nil
}
}
return false, nil
}
func (f *FakeMounter) PathIsDevice(pathname string) (bool, error) {
return true, nil
}

View File

@ -19,9 +19,10 @@ limitations under the License.
package mount
import (
"path/filepath"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/exec"
"path/filepath"
)
type Interface interface {
@ -37,6 +38,11 @@ type Interface interface {
// IsLikelyNotMountPoint determines if a directory is a mountpoint.
// It should return ErrNotExist when the directory does not exist.
IsLikelyNotMountPoint(file string) (bool, error)
// DeviceOpened determines if the device is in use elsewhere
// on the system, i.e. still mounted.
DeviceOpened(pathname string) (bool, error)
// PathIsDevice determines if a path is a device.
PathIsDevice(pathname string) (bool, error)
}
// This represents a single line in /proc/mounts or /etc/fstab.

View File

@ -171,6 +171,56 @@ func (mounter *Mounter) IsLikelyNotMountPoint(file string) (bool, error) {
return true, nil
}
// DeviceOpened checks if block device in use by calling Open with O_EXCL flag.
// Returns true if open returns errno EBUSY, and false if errno is nil.
// Returns an error if errno is any error other than EBUSY.
// Returns with error if pathname is not a device.
func (mounter *Mounter) DeviceOpened(pathname string) (bool, error) {
return exclusiveOpenFailsOnDevice(pathname)
}
// PathIsDevice uses FileInfo returned from os.Stat to check if path refers
// to a device.
func (mounter *Mounter) PathIsDevice(pathname string) (bool, error) {
return pathIsDevice(pathname)
}
func exclusiveOpenFailsOnDevice(pathname string) (bool, error) {
if isDevice, err := pathIsDevice(pathname); !isDevice {
return false, fmt.Errorf(
"PathIsDevice failed for path %q: %v",
pathname,
err)
}
fd, errno := syscall.Open(pathname, syscall.O_RDONLY|syscall.O_EXCL, 0)
// If the device is in use, open will return an invalid fd.
// When this happens, it is expected that Close will fail and throw an error.
defer syscall.Close(fd)
if errno == nil {
// device not in use
return false, nil
} else if errno == syscall.EBUSY {
// device is in use
return true, nil
}
// error during call to Open
return false, errno
}
func pathIsDevice(pathname string) (bool, error) {
finfo, err := os.Stat(pathname)
// err in call to os.Stat
if err != nil {
return false, err
}
// path refers to a device
if finfo.Mode()&os.ModeDevice != 0 {
return true, nil
}
// path does not refer to device
return false, nil
}
func listProcMounts(mountFilePath string) ([]MountPoint, error) {
hash1, err := readProcMounts(mountFilePath, nil)
if err != nil {

View File

@ -36,6 +36,14 @@ func (mounter *Mounter) IsLikelyNotMountPoint(file string) (bool, error) {
return true, nil
}
func (mounter *Mounter) DeviceOpened(pathname string) (bool, error) {
return false, nil
}
func (mounter *Mounter) PathIsDevice(pathname string) (bool, error) {
return true, nil
}
func (mounter *SafeFormatAndMount) formatAndMount(source string, target string, fstype string, options []string) error {
return nil
}

View File

@ -203,6 +203,20 @@ func (n *NsenterMounter) IsLikelyNotMountPoint(file string) (bool, error) {
return true, nil
}
// DeviceOpened checks if block device in use by calling Open with O_EXCL flag.
// Returns true if open returns errno EBUSY, and false if errno is nil.
// Returns an error if errno is any error other than EBUSY.
// Returns with error if pathname is not a device.
func (n *NsenterMounter) DeviceOpened(pathname string) (bool, error) {
return exclusiveOpenFailsOnDevice(pathname)
}
// PathIsDevice uses FileInfo returned from os.Stat to check if path refers
// to a device.
func (n *NsenterMounter) PathIsDevice(pathname string) (bool, error) {
return pathIsDevice(pathname)
}
func (n *NsenterMounter) absHostPath(command string) string {
path, ok := n.paths[command]
if !ok {

View File

@ -41,3 +41,11 @@ func (*NsenterMounter) List() ([]MountPoint, error) {
func (*NsenterMounter) IsLikelyNotMountPoint(file string) (bool, error) {
return true, nil
}
func (*NsenterMounter) DeviceOpened(pathname string) (bool, error) {
return false, nil
}
func (*NsenterMounter) PathIsDevice(pathname string) (bool, error) {
return true, nil
}

View File

@ -116,6 +116,12 @@ func verifyDevicePath(devicePaths []string) (string, error) {
// Unmount the global mount path, which should be the only one, and delete it.
func unmountPDAndRemoveGlobalPath(globalMountPath string, mounter mount.Interface) error {
if pathExists, pathErr := pathExists(globalMountPath); pathErr != nil {
return fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists {
glog.V(5).Infof("Warning: Unmount skipped because path does not exist: %v", globalMountPath)
return nil
}
err := mounter.Unmount(globalMountPath)
os.Remove(globalMountPath)
return err

View File

@ -279,6 +279,12 @@ func pathExists(path string) (bool, error) {
// Unmount the global mount path, which should be the only one, and delete it.
func unmountPDAndRemoveGlobalPath(globalMountPath string, mounter mount.Interface) error {
if pathExists, pathErr := pathExists(globalMountPath); pathErr != nil {
return fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists {
glog.V(5).Infof("Warning: Unmount skipped because path does not exist: %v", globalMountPath)
return nil
}
err := mounter.Unmount(globalMountPath)
os.Remove(globalMountPath)
return err

View File

@ -125,7 +125,14 @@ func verifyDevicePath(devicePaths []string, sdBeforeSet sets.String) (string, er
}
// Unmount the global PD mount, which should be the only one, and delete it.
// Does nothing if globalMountPath does not exist.
func unmountPDAndRemoveGlobalPath(globalMountPath string, mounter mount.Interface) error {
if pathExists, pathErr := pathExists(globalMountPath); pathErr != nil {
return fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists {
glog.V(5).Infof("Warning: Unmount skipped because path does not exist: %v", globalMountPath)
return nil
}
err := mounter.Unmount(globalMountPath)
os.Remove(globalMountPath)
return err

View File

@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
@ -83,7 +84,7 @@ type OperationExecutor interface {
// UnmountDevice unmounts the volumes global mount path from the device (for
// attachable volumes only, freeing it for detach. It then updates the
// actual state of the world to reflect that.
UnmountDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
UnmountDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error
// VerifyControllerAttachedVolume checks if the specified volume is present
// in the specified nodes AttachedVolumes Status field. It uses kubeClient
@ -206,6 +207,10 @@ type AttachedVolume struct {
// PluginIsAttachable indicates that the plugin for this volume implements
// the volume.Attacher interface
PluginIsAttachable bool
// DevicePath contains the path on the node where the volume is attached.
// For non-attachable volumes this is empty.
DevicePath string
}
// MountedVolume represents a volume that has successfully been mounted to a pod.
@ -382,9 +387,10 @@ func (oe *operationExecutor) UnmountVolume(
func (oe *operationExecutor) UnmountDevice(
deviceToDetach AttachedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
actualStateOfWorld ActualStateOfWorldMounterUpdater,
mounter mount.Interface) error {
unmountDeviceFunc, err :=
oe.generateUnmountDeviceFunc(deviceToDetach, actualStateOfWorld)
oe.generateUnmountDeviceFunc(deviceToDetach, actualStateOfWorld, mounter)
if err != nil {
return err
}
@ -811,7 +817,8 @@ func (oe *operationExecutor) generateUnmountVolumeFunc(
func (oe *operationExecutor) generateUnmountDeviceFunc(
deviceToDetach AttachedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) {
actualStateOfWorld ActualStateOfWorldMounterUpdater,
mounter mount.Interface) (func() error, error) {
// Get attacher plugin
attachableVolumePlugin, err :=
oe.volumePluginMgr.FindAttachablePluginBySpec(deviceToDetach.VolumeSpec)
@ -863,6 +870,24 @@ func (oe *operationExecutor) generateUnmountDeviceFunc(
deviceToDetach.VolumeSpec.Name(),
unmountDeviceErr)
}
// Before logging that UnmountDevice succeeded and moving on,
// use mounter.DeviceOpened to check if the device is in use anywhere
// else on the system. Retry if it returns true.
deviceOpened, deviceOpenedErr := mounter.DeviceOpened(deviceToDetach.DevicePath)
if deviceOpenedErr != nil {
return fmt.Errorf(
"UnmountDevice.DeviceOpened failed for volume %q (spec.Name: %q) with: %v",
deviceToDetach.VolumeName,
deviceToDetach.VolumeSpec.Name(),
deviceOpenedErr)
}
// The device is still in use elsewhere. Caller will log and retry.
if deviceOpened {
return fmt.Errorf(
"UnmountDevice failed for volume %q (spec.Name: %q) because the device is in use when it was no longer expected to be in use",
deviceToDetach.VolumeName,
deviceToDetach.VolumeSpec.Name())
}
glog.Infof(
"UnmountDevice succeeded for volume %q (spec.Name: %q).",