Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-25 04:33:26 +00:00)
Merge pull request #27970 from jingxu97/restartKubelet-6-22
Automatic merge from submit-queue

Add volume reconstruct/cleanup logic in kubelet volume manager

Currently, kubelet volume management works on the concept of desired and actual worlds of state. The volume manager periodically compares the two worlds and performs volume mount/unmount and/or attach/detach operations. When the kubelet restarts, the caches of both worlds are lost. Although the desired world can be recovered through the apiserver, the actual world cannot, which means some volumes can never be cleaned up if their information has already been deleted from the apiserver. This change adds reconstruction of the actual world by reading the pod directories from disk. The reconstructed volume information is added to both the desired world and the actual world if it cannot be found in either. The rest of the logic is unchanged: the desired-world populator may remove a volume entry once it is no longer in the apiserver, at which point the volume manager invokes unmount to clean it up.

Fixes https://github.com/kubernetes/kubernetes/issues/27653
Commit: 79ed7064ca
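In outline: the actual state of the world is rebuilt by scanning the pod directories that survive a kubelet restart, and anything found on disk but unknown to both caches is injected into both, so the normal populator/reconciler path can later unmount it. The following is a minimal, self-contained sketch of that flow; the names (`scanPodVolumeDirs`, the map keys) are illustrative stand-ins, not the kubelet's real API:

```go
package main

import "fmt"

type volumeEntry struct {
	podUID, pluginName, volumeName string
}

// scanPodVolumeDirs stands in for reading
// /var/lib/kubelet/pods/<uid>/volumes/<plugin>/<name> from disk.
func scanPodVolumeDirs() []volumeEntry {
	return []volumeEntry{{"uid-1", "kubernetes.io~empty-dir", "cache"}}
}

func main() {
	desired := map[string]bool{} // desired world, repopulated from the apiserver
	actual := map[string]bool{}  // actual world, lost on kubelet restart

	// Reconstruct: any volume found on disk but missing from both worlds is
	// added to both, so the existing populator/reconciler machinery can later
	// unmount it once the apiserver confirms the pod is gone.
	for _, v := range scanPodVolumeDirs() {
		key := v.podUID + "/" + v.volumeName
		if !desired[key] && !actual[key] {
			desired[key], actual[key] = true, true
			fmt.Println("reconstructed:", key)
		}
	}
}
```

The important property is that reconstruction never needs its own cleanup path: once an entry exists in both worlds, the pre-existing "populator removes it from the desired world, reconciler unmounts it" machinery takes over.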
@@ -29,6 +29,13 @@ import (
 	"github.com/spf13/pflag"
 )
 
+const (
+	DefaultKubeletPodsDirName       = "pods"
+	DefaultKubeletVolumesDirName    = "volumes"
+	DefaultKubeletPluginsDirName    = "plugins"
+	DefaultKubeletContainersDirName = "containers"
+)
+
 // KubeletServer encapsulates all of the parameters necessary for starting up
 // a kubelet. These can either be set via command line or directly.
 type KubeletServer struct {
@@ -1056,6 +1056,10 @@ func (plugin *mockVolumePlugin) RequiresRemount() bool {
 	return false
 }
 
+func (plugin *mockVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*vol.Spec, error) {
+	return nil, nil
+}
+
 func (plugin *mockVolumePlugin) NewMounter(spec *vol.Spec, podRef *api.Pod, opts vol.VolumeOptions) (vol.Mounter, error) {
 	return nil, fmt.Errorf("Mounter is not supported by this plugin")
 }
@@ -50,6 +50,9 @@ func (mi *fakeMountInterface) List() ([]mount.MountPoint, error) {
 func (mi *fakeMountInterface) IsLikelyNotMountPoint(file string) (bool, error) {
 	return false, fmt.Errorf("unsupported")
 }
+func (mi *fakeMountInterface) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
+	return "", nil
+}
 
 func (mi *fakeMountInterface) DeviceOpened(pathname string) (bool, error) {
 	for _, mp := range mi.mountPoints {
@@ -514,6 +514,7 @@ func NewMainKubelet(
 		return nil, err
 	}
 
+	// setup volumeManager
 	klet.volumeManager, err = volumemanager.NewVolumeManager(
 		enableControllerAttachDetach,
 		nodeName,
@@ -521,7 +522,8 @@ func NewMainKubelet(
 		klet.kubeClient,
 		klet.volumePluginMgr,
 		klet.containerRuntime,
-		mounter)
+		mounter,
+		klet.getPodsDir())
 
 	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
 	if err != nil {
@@ -957,7 +959,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 	}
 
 	// Start volume manager
-	go kl.volumeManager.Run(wait.NeverStop)
+	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
 
 	if kl.kubeClient != nil {
 		// Start syncing node status immediately, this may set up things the runtime needs to run.
@@ -1731,16 +1733,21 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(
 		if allPods.Has(string(uid)) {
 			continue
 		}
+		// If volumes have not been unmounted/detached, do not delete directory.
+		// Doing so may result in corruption of data.
 		if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist {
-			// If volumes have not been unmounted/detached, do not delete directory.
-			// Doing so may result in corruption of data.
 			glog.V(3).Infof("Orphaned pod %q found, but volumes are not cleaned up; err: %v", uid, err)
 			continue
 		}
+		// Check whether volume is still mounted on disk. If so, do not delete directory
+		if volumeNames, err := kl.getPodVolumeNameListFromDisk(uid); err != nil || len(volumeNames) != 0 {
+			glog.V(3).Infof("Orphaned pod %q found, but volumes are still mounted; err: %v, volumes: %v", uid, err, volumeNames)
+			continue
+		}
 
 		glog.V(3).Infof("Orphaned pod %q found, removing", uid)
 		if err := os.RemoveAll(kl.getPodDir(uid)); err != nil {
-			glog.Infof("Failed to remove orphaned pod %q dir; err: %v", uid, err)
+			glog.Errorf("Failed to remove orphaned pod %q dir; err: %v", uid, err)
 			errlist = append(errlist, err)
 		}
 	}
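The hunk above makes directory removal conditional on two independent signals — the volume manager's in-memory view and the on-disk volume directories. A minimal sketch of that ordering, with `podVolumesExist` and `readVolumeNames` as hypothetical stand-ins for the kubelet helpers:

```go
package sketch

import (
	"log"
	"os"
	"path/filepath"
)

// cleanupOrphans mirrors the order of checks added to cleanupOrphanedPodDirs.
func cleanupOrphans(podsDir string, orphanedUIDs []string,
	podVolumesExist func(uid string) bool,
	readVolumeNames func(uid string) ([]string, error)) {
	for _, uid := range orphanedUIDs {
		// Guard 1: the volume manager still believes volumes are mounted.
		if podVolumesExist(uid) {
			continue
		}
		// Guard 2: volume directories still exist on disk; deleting the pod
		// directory now could corrupt data that is still mounted.
		if names, err := readVolumeNames(uid); err != nil || len(names) != 0 {
			continue
		}
		// Only once both guards pass is removal safe.
		if err := os.RemoveAll(filepath.Join(podsDir, uid)); err != nil {
			log.Printf("failed to remove orphaned pod %q dir: %v", uid, err)
		}
	}
}
```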
@@ -18,14 +18,17 @@ package kubelet
 
 import (
 	"fmt"
+	"io/ioutil"
 	"net"
 	"path"
 
 	"github.com/golang/glog"
+	"k8s.io/kubernetes/cmd/kubelet/app/options"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/kubelet/cm"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util"
 	nodeutil "k8s.io/kubernetes/pkg/util/node"
 )
 
@@ -40,7 +43,7 @@ func (kl *Kubelet) getRootDir() string {
 // getPodsDir returns the full path to the directory under which pod
 // directories are created.
 func (kl *Kubelet) getPodsDir() string {
-	return path.Join(kl.getRootDir(), "pods")
+	return path.Join(kl.getRootDir(), options.DefaultKubeletPodsDirName)
 }
 
 // getPluginsDir returns the full path to the directory under which plugin
@@ -48,7 +51,7 @@ func (kl *Kubelet) getPodsDir() string {
 // they need to persist. Plugins should create subdirectories under this named
 // after their own names.
 func (kl *Kubelet) getPluginsDir() string {
-	return path.Join(kl.getRootDir(), "plugins")
+	return path.Join(kl.getRootDir(), options.DefaultKubeletPluginsDirName)
 }
 
 // getPluginDir returns a data directory name for a given plugin name.
@@ -90,7 +93,7 @@ func (kl *Kubelet) getPodDir(podUID types.UID) string {
 // which volumes are created for the specified pod. This directory may not
 // exist if the pod does not exist.
 func (kl *Kubelet) getPodVolumesDir(podUID types.UID) string {
-	return path.Join(kl.getPodDir(podUID), "volumes")
+	return path.Join(kl.getPodDir(podUID), options.DefaultKubeletVolumesDirName)
 }
 
 // getPodVolumeDir returns the full path to the directory which represents the
@@ -104,7 +107,7 @@ func (kl *Kubelet) getPodVolumeDir(podUID types.UID, pluginName string, volumeNa
 // which plugins may store data for the specified pod. This directory may not
 // exist if the pod does not exist.
 func (kl *Kubelet) getPodPluginsDir(podUID types.UID) string {
-	return path.Join(kl.getPodDir(podUID), "plugins")
+	return path.Join(kl.getPodDir(podUID), options.DefaultKubeletPluginsDirName)
 }
 
 // getPodPluginDir returns a data directory name for a given plugin name for a
@@ -126,7 +129,7 @@ func (kl *Kubelet) getPodContainerDir(podUID types.UID, ctrName string) string {
 	// old && new = use new (but warn)
 	oldPath := path.Join(kl.getPodDir(podUID), ctrName)
 	oldExists := dirExists(oldPath)
-	newPath := path.Join(kl.getPodDir(podUID), "containers", ctrName)
+	newPath := path.Join(kl.getPodDir(podUID), options.DefaultKubeletContainersDirName, ctrName)
 	newExists := dirExists(newPath)
 	if oldExists && !newExists {
 		return oldPath
@@ -234,3 +237,32 @@ func (kl *Kubelet) getHostIPAnyWay() (net.IP, error) {
 func (kl *Kubelet) GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 {
 	return kl.volumeManager.GetExtraSupplementalGroupsForPod(pod)
 }
+
+// getPodVolumeNameListFromDisk returns a list of the volume names by reading the
+// volume directories for the given pod from the disk.
+func (kl *Kubelet) getPodVolumeNameListFromDisk(podUID types.UID) ([]string, error) {
+	volumes := []string{}
+	podVolDir := kl.getPodVolumesDir(podUID)
+	volumePluginDirs, err := ioutil.ReadDir(podVolDir)
+	if err != nil {
+		glog.Errorf("Could not read directory %s: %v", podVolDir, err)
+		return volumes, err
+	}
+	for _, volumePluginDir := range volumePluginDirs {
+		volumePluginName := volumePluginDir.Name()
+		volumePluginPath := path.Join(podVolDir, volumePluginName)
+		volumeDirs, volumeDirsStatErrs, err := util.ReadDirNoExit(volumePluginPath)
+		if err != nil {
+			return volumes, fmt.Errorf("Could not read directory %s: %v", volumePluginPath, err)
+		}
+		for i, volumeDir := range volumeDirs {
+			if volumeDir != nil {
+				volumes = append(volumes, volumeDir.Name())
+				continue
+			}
+			glog.Errorf("Could not read directory %s: %v", podVolDir, volumeDirsStatErrs[i])
+		}
+	}
+	return volumes, nil
+}
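A detail worth noting in the hunk above: the inner listing uses `util.ReadDirNoExit` rather than `ioutil.ReadDir`, and the call site tolerates a `nil` entry per directory. The sketch below captures the contract this code appears to rely on — one `FileInfo`-or-error pair per name, instead of aborting the whole scan on the first `Lstat` failure. This is an assumption about the helper's behavior, not a copy of its implementation:

```go
package sketch

import (
	"os"
	"path/filepath"
)

// readDirNoExit lists a directory but keeps going past per-entry stat
// failures, so a single wedged mount point cannot hide its sibling
// volume directories from the caller.
func readDirNoExit(dirname string) ([]os.FileInfo, []error, error) {
	f, err := os.Open(dirname)
	if err != nil {
		return nil, nil, err
	}
	defer f.Close()

	names, err := f.Readdirnames(-1)
	if err != nil {
		return nil, nil, err
	}

	infos := make([]os.FileInfo, 0, len(names))
	errs := make([]error, 0, len(names))
	for _, name := range names {
		info, lerr := os.Lstat(filepath.Join(dirname, name))
		infos = append(infos, info) // nil on error, mirroring the call site's nil check
		errs = append(errs, lerr)
	}
	return infos, errs, nil
}
```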
@@ -248,7 +248,8 @@ func newTestKubeletWithImageList(
 		fakeKubeClient,
 		kubelet.volumePluginMgr,
 		fakeRuntime,
-		kubelet.mounter)
+		kubelet.mounter,
+		kubelet.getPodsDir())
 	if err != nil {
 		t.Fatalf("failed to initialize volume manager: %v", err)
 	}
@@ -404,8 +405,7 @@ func TestVolumeAttachAndMountControllerDisabled(t *testing.T) {
 		},
 	})
 
-	stopCh := make(chan struct{})
-	go kubelet.volumeManager.Run(stopCh)
+	stopCh := runVolumeManager(kubelet)
 	defer func() {
 		close(stopCh)
 	}()
@@ -474,8 +474,7 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
 		},
 	})
 
-	stopCh := make(chan struct{})
-	go kubelet.volumeManager.Run(stopCh)
+	stopCh := runVolumeManager(kubelet)
 	defer func() {
 		close(stopCh)
 	}()
@@ -603,8 +602,7 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
 		},
 	})
 
-	stopCh := make(chan struct{})
-	go kubelet.volumeManager.Run(stopCh)
+	stopCh := runVolumeManager(kubelet)
 	defer func() {
 		close(stopCh)
 	}()
@@ -697,8 +695,7 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
 		},
 	})
 
-	stopCh := make(chan struct{})
-	go kubelet.volumeManager.Run(stopCh)
+	stopCh := runVolumeManager(kubelet)
 	defer func() {
 		close(stopCh)
 	}()
@@ -856,8 +853,7 @@ func TestPodVolumesExist(t *testing.T) {
 		},
 	}
 
-	stopCh := make(chan struct{})
-	go kubelet.volumeManager.Run(stopCh)
+	stopCh := runVolumeManager(kubelet)
 	defer func() {
 		close(stopCh)
 	}()
@@ -3939,3 +3935,9 @@ func simulateVolumeInUseUpdate(
 		}
 	}
 }
+
+func runVolumeManager(kubelet *Kubelet) chan struct{} {
+	stopCh := make(chan struct{})
+	go kubelet.volumeManager.Run(kubelet.sourcesReady, stopCh)
+	return stopCh
+}
@@ -99,7 +99,8 @@ func TestRunOnce(t *testing.T) {
 		kb.kubeClient,
 		kb.volumePluginMgr,
 		fakeRuntime,
-		kb.mounter)
+		kb.mounter,
+		kb.getPodsDir())
 
 	kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, kb.nonMasqueradeCIDR)
 	// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
@@ -136,6 +136,11 @@ type ActualStateOfWorld interface {
 	// have no mountedPods. This list can be used to determine which volumes are
 	// no longer referenced and may be globally unmounted and detached.
 	GetUnmountedVolumes() []AttachedVolume
+
+	// GetPods generates and returns a map of pods, indexed by the pod's
+	// unique name. The map can be used to determine which pods are currently
+	// in the actual state of the world.
+	GetPods() map[volumetypes.UniquePodName]bool
 }
 
 // MountedVolume represents a volume that has successfully been mounted to a pod.
@@ -573,6 +578,21 @@ func (asw *actualStateOfWorld) GetUnmountedVolumes() []AttachedVolume {
 	return unmountedVolumes
}
 
+func (asw *actualStateOfWorld) GetPods() map[volumetypes.UniquePodName]bool {
+	asw.RLock()
+	defer asw.RUnlock()
+
+	podList := make(map[volumetypes.UniquePodName]bool)
+	for _, volumeObj := range asw.attachedVolumes {
+		for podName := range volumeObj.mountedPods {
+			if !podList[podName] {
+				podList[podName] = true
+			}
+		}
+	}
+	return podList
+}
+
 func (asw *actualStateOfWorld) newAttachedVolume(
 	attachedVolume *attachedVolume) AttachedVolume {
 	return AttachedVolume{
@@ -92,6 +92,11 @@ type DesiredStateOfWorld interface {
 	// attached to this node and the pods they should be mounted to based on the
 	// current desired state of the world.
 	GetVolumesToMount() []VolumeToMount
+
+	// GetPods generates and returns a map of pods, indexed by the pod's
+	// unique name. The map can be used to determine which pods are currently
+	// in the desired state of the world.
+	GetPods() map[types.UniquePodName]bool
 }
 
 // VolumeToMount represents a volume that is attached to this node and needs to
@@ -117,6 +122,7 @@ type desiredStateOfWorld struct {
 	// volumePluginMgr is the volume plugin manager used to create volume
 	// plugin objects.
 	volumePluginMgr *volume.VolumePluginMgr
+
 	sync.RWMutex
 }
 
@@ -203,7 +209,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
 	} else {
 		// For non-attachable volumes, generate a unique name based on the pod
 		// namespace and name and the name of the volume within the pod.
-		volumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, outerVolumeSpecName)
+		volumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, volumeSpec)
 	}
 
 	volumeObj, volumeExists := dsw.volumesToMount[volumeName]
@@ -296,6 +302,21 @@ func (dsw *desiredStateOfWorld) PodExistsInVolume(
 	return podExists
 }
 
+func (dsw *desiredStateOfWorld) GetPods() map[types.UniquePodName]bool {
+	dsw.RLock()
+	defer dsw.RUnlock()
+
+	podList := make(map[types.UniquePodName]bool)
+	for _, volumeObj := range dsw.volumesToMount {
+		for podName := range volumeObj.podsToMount {
+			if !podList[podName] {
+				podList[podName] = true
+			}
+		}
+	}
+	return podList
+}
+
 func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount {
 	dsw.RLock()
 	defer dsw.RUnlock()
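Both caches now expose `GetPods` returning a `map[UniquePodName]bool` that is used purely as a set: the reconstruct pass only asks membership questions. A toy illustration (the `uniquePodName` type and pod names are made up):

```go
package main

import "fmt"

type uniquePodName string

func main() {
	desiredPods := map[uniquePodName]bool{"ns0/pod-a": true}
	actualPods := map[uniquePodName]bool{"ns0/pod-b": true}

	// Mirrors the reconstruct pass: skip any pod the volume manager already
	// tracks in either world; reconstruct only the truly unknown ones.
	for _, pod := range []uniquePodName{"ns0/pod-a", "ns0/pod-c"} {
		if desiredPods[pod] || actualPods[pod] {
			fmt.Println(pod, "already tracked; skip reconstruction")
			continue
		}
		fmt.Println(pod, "unknown to both worlds; reconstruct its volumes")
	}
}
```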
@@ -20,16 +20,27 @@ limitations under the License.
 package reconciler
 
 import (
+	"fmt"
+	"io/ioutil"
+	"path"
 	"time"
 
 	"github.com/golang/glog"
+	"k8s.io/kubernetes/cmd/kubelet/app/options"
+	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	"k8s.io/kubernetes/pkg/kubelet/config"
 	"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
+	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
 	"k8s.io/kubernetes/pkg/util/mount"
+	"k8s.io/kubernetes/pkg/util/strings"
 	"k8s.io/kubernetes/pkg/util/wait"
+	"k8s.io/kubernetes/pkg/volume"
 	"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
 	"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
+	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
+	"k8s.io/kubernetes/pkg/volume/util/volumehelper"
 )
 
 // Reconciler runs a periodic loop to reconcile the desired state of the world
@@ -46,7 +57,7 @@ type Reconciler interface {
 	// If attach/detach management is enabled, the manager will also check if
 	// volumes that should be attached are attached and volumes that should
 	// be detached are detached and trigger attach/detach operations as needed.
-	Run(stopCh <-chan struct{})
+	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
 }
 
 // NewReconciler returns a new instance of Reconciler.
@@ -56,6 +67,8 @@ type Reconciler interface {
 //   this node, and therefore the volume manager should not
 // loopSleepDuration - the amount of time the reconciler loop sleeps between
 //   successive executions
+// reconstructDuration - the amount of time the reconstruct sleeps between
+//   successive executions
 // waitForAttachTimeout - the amount of time the Mount function will wait for
 //   the volume to be attached
 // hostName - the hostname for this node, used by Attach and Detach methods
@@ -65,26 +78,34 @@ type Reconciler interface {
 //   safely (prevents more than one operation from being triggered on the same
 //   volume)
 // mounter - mounter passed in from kubelet, passed down unmount path
+// volumePluginMgr - volume plugin manager passed from kubelet
 func NewReconciler(
 	kubeClient internalclientset.Interface,
 	controllerAttachDetachEnabled bool,
 	loopSleepDuration time.Duration,
+	reconstructDuration time.Duration,
 	waitForAttachTimeout time.Duration,
 	hostName string,
 	desiredStateOfWorld cache.DesiredStateOfWorld,
 	actualStateOfWorld cache.ActualStateOfWorld,
 	operationExecutor operationexecutor.OperationExecutor,
-	mounter mount.Interface) Reconciler {
+	mounter mount.Interface,
+	volumePluginMgr *volume.VolumePluginMgr,
+	kubeletPodsDir string) Reconciler {
 	return &reconciler{
 		kubeClient:                    kubeClient,
 		controllerAttachDetachEnabled: controllerAttachDetachEnabled,
 		loopSleepDuration:             loopSleepDuration,
+		reconstructDuration:           reconstructDuration,
 		waitForAttachTimeout:          waitForAttachTimeout,
 		hostName:                      hostName,
 		desiredStateOfWorld:           desiredStateOfWorld,
 		actualStateOfWorld:            actualStateOfWorld,
 		operationExecutor:             operationExecutor,
 		mounter:                       mounter,
+		volumePluginMgr:               volumePluginMgr,
+		kubeletPodsDir:                kubeletPodsDir,
+		timeOfLastReconstruct:         time.Now(),
 	}
 }
 
@@ -92,149 +113,95 @@ type reconciler struct {
 	kubeClient                    internalclientset.Interface
 	controllerAttachDetachEnabled bool
 	loopSleepDuration             time.Duration
+	reconstructDuration           time.Duration
 	waitForAttachTimeout          time.Duration
 	hostName                      string
 	desiredStateOfWorld           cache.DesiredStateOfWorld
 	actualStateOfWorld            cache.ActualStateOfWorld
 	operationExecutor             operationexecutor.OperationExecutor
 	mounter                       mount.Interface
+	volumePluginMgr               *volume.VolumePluginMgr
+	kubeletPodsDir                string
+	timeOfLastReconstruct         time.Time
 }
 
-func (rc *reconciler) Run(stopCh <-chan struct{}) {
-	wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
+func (rc *reconciler) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
+	wait.Until(rc.reconciliationLoopFunc(sourcesReady), rc.loopSleepDuration, stopCh)
 }
 
-func (rc *reconciler) reconciliationLoopFunc() func() {
+func (rc *reconciler) reconciliationLoopFunc(sourcesReady config.SourcesReady) func() {
 	return func() {
-		// Unmounts are triggered before mounts so that a volume that was
-		// referenced by a pod that was deleted and is now referenced by another
-		// pod is unmounted from the first pod before being mounted to the new
-		// pod.
+		rc.reconcile()
 
-		// Ensure volumes that should be unmounted are unmounted.
-		for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
-			if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
-				// Volume is mounted, unmount it
-				glog.V(12).Infof("Attempting to start UnmountVolume for volume %q (spec.Name: %q) from pod %q (UID: %q).",
+		// Add all sources ready check so that reconciler's reconstruct process will start after
+		// desired state of world is populated with pod volume information from different sources. Otherwise,
+		// reconciler's reconstruct process may add incomplete volume information and cause confusion.
+		// In addition, if some sources are not ready, the reconstruct process may clean up pods' volumes
+		// that are still in use because desired states could not get a complete list of pods.
+		if sourcesReady.AllReady() && time.Since(rc.timeOfLastReconstruct) > rc.reconstructDuration {
+			glog.V(5).Infof("Sources are all ready, starting reconstruct state function")
+			rc.reconstruct()
+		}
+	}
+}
+
+func (rc *reconciler) reconcile() {
+	// Unmounts are triggered before mounts so that a volume that was
+	// referenced by a pod that was deleted and is now referenced by another
+	// pod is unmounted from the first pod before being mounted to the new
+	// pod.
+
+	// Ensure volumes that should be unmounted are unmounted.
+	for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
+		if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
+			// Volume is mounted, unmount it
+			glog.V(12).Infof("Attempting to start UnmountVolume for volume %q (spec.Name: %q) from pod %q (UID: %q).",
+				mountedVolume.VolumeName,
+				mountedVolume.OuterVolumeSpecName,
+				mountedVolume.PodName,
+				mountedVolume.PodUID)
+			err := rc.operationExecutor.UnmountVolume(
+				mountedVolume.MountedVolume, rc.actualStateOfWorld)
+			if err != nil &&
+				!nestedpendingoperations.IsAlreadyExists(err) &&
+				!exponentialbackoff.IsExponentialBackoff(err) {
+				// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
+				// Log all other errors.
+				glog.Errorf(
+					"operationExecutor.UnmountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
+					mountedVolume.VolumeName,
+					mountedVolume.OuterVolumeSpecName,
+					mountedVolume.PodName,
+					mountedVolume.PodUID,
+					rc.controllerAttachDetachEnabled,
+					err)
+			}
+			if err == nil {
+				glog.Infof("UnmountVolume operation started for volume %q (spec.Name: %q) from pod %q (UID: %q).",
 					mountedVolume.VolumeName,
 					mountedVolume.OuterVolumeSpecName,
 					mountedVolume.PodName,
 					mountedVolume.PodUID)
-				err := rc.operationExecutor.UnmountVolume(
-					mountedVolume.MountedVolume, rc.actualStateOfWorld)
-				if err != nil &&
-					!nestedpendingoperations.IsAlreadyExists(err) &&
-					!exponentialbackoff.IsExponentialBackoff(err) {
-					// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
-					// Log all other errors.
-					glog.Errorf(
-						"operationExecutor.UnmountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
-						mountedVolume.VolumeName,
-						mountedVolume.OuterVolumeSpecName,
-						mountedVolume.PodName,
-						mountedVolume.PodUID,
-						rc.controllerAttachDetachEnabled,
-						err)
-				}
-				if err == nil {
-					glog.Infof("UnmountVolume operation started for volume %q (spec.Name: %q) from pod %q (UID: %q).",
-						mountedVolume.VolumeName,
-						mountedVolume.OuterVolumeSpecName,
-						mountedVolume.PodName,
-						mountedVolume.PodUID)
-				}
 			}
 		}
+	}
 
 	// Ensure volumes that should be attached/mounted are attached/mounted.
 	for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
 		volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
 		volumeToMount.DevicePath = devicePath
 		if cache.IsVolumeNotAttachedError(err) {
 			if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
 				// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
 				// for controller to finish attaching volume.
 				glog.V(12).Infof("Attempting to start VerifyControllerAttachedVolume for volume %q (spec.Name: %q) pod %q (UID: %q)",
-					volumeToMount.VolumeName,
-					volumeToMount.VolumeSpec.Name(),
-					volumeToMount.PodName,
-					volumeToMount.Pod.UID)
-				err := rc.operationExecutor.VerifyControllerAttachedVolume(
-					volumeToMount.VolumeToMount,
-					rc.hostName,
-					rc.actualStateOfWorld)
-				if err != nil &&
-					!nestedpendingoperations.IsAlreadyExists(err) &&
-					!exponentialbackoff.IsExponentialBackoff(err) {
-					// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
-					// Log all other errors.
-					glog.Errorf(
-						"operationExecutor.VerifyControllerAttachedVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
-						volumeToMount.VolumeName,
-						volumeToMount.VolumeSpec.Name(),
-						volumeToMount.PodName,
-						volumeToMount.Pod.UID,
-						rc.controllerAttachDetachEnabled,
-						err)
-				}
-				if err == nil {
-					glog.Infof("VerifyControllerAttachedVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)",
-						volumeToMount.VolumeName,
-						volumeToMount.VolumeSpec.Name(),
-						volumeToMount.PodName,
-						volumeToMount.Pod.UID)
-				}
-			} else {
-				// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
-				// so attach it
-				volumeToAttach := operationexecutor.VolumeToAttach{
-					VolumeName: volumeToMount.VolumeName,
-					VolumeSpec: volumeToMount.VolumeSpec,
-					NodeName:   rc.hostName,
-				}
-				glog.V(12).Infof("Attempting to start AttachVolume for volume %q (spec.Name: %q) pod %q (UID: %q)",
-					volumeToMount.VolumeName,
-					volumeToMount.VolumeSpec.Name(),
-					volumeToMount.PodName,
-					volumeToMount.Pod.UID)
-				err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
-				if err != nil &&
-					!nestedpendingoperations.IsAlreadyExists(err) &&
-					!exponentialbackoff.IsExponentialBackoff(err) {
-					// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
-					// Log all other errors.
-					glog.Errorf(
-						"operationExecutor.AttachVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
-						volumeToMount.VolumeName,
-						volumeToMount.VolumeSpec.Name(),
-						volumeToMount.PodName,
-						volumeToMount.Pod.UID,
-						rc.controllerAttachDetachEnabled,
-						err)
-				}
-				if err == nil {
-					glog.Infof("AttachVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)",
-						volumeToMount.VolumeName,
-						volumeToMount.VolumeSpec.Name(),
-						volumeToMount.PodName,
-						volumeToMount.Pod.UID)
-				}
-			}
-		} else if !volMounted || cache.IsRemountRequiredError(err) {
-			// Volume is not mounted, or is already mounted, but requires remounting
-			remountingLogStr := ""
-			if cache.IsRemountRequiredError(err) {
-				remountingLogStr = "Volume is already mounted to pod, but remount was requested."
-			}
-			glog.V(12).Infof("Attempting to start MountVolume for volume %q (spec.Name: %q) to pod %q (UID: %q). %s",
 				volumeToMount.VolumeName,
 				volumeToMount.VolumeSpec.Name(),
 				volumeToMount.PodName,
-				volumeToMount.Pod.UID,
-				remountingLogStr)
-			err := rc.operationExecutor.MountVolume(
-				rc.waitForAttachTimeout,
+				volumeToMount.Pod.UID)
+			err := rc.operationExecutor.VerifyControllerAttachedVolume(
 				volumeToMount.VolumeToMount,
+				rc.hostName,
 				rc.actualStateOfWorld)
 			if err != nil &&
 				!nestedpendingoperations.IsAlreadyExists(err) &&
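The restructuring above separates the hot path (`reconcile`, every loop iteration) from the cold path (`reconstruct`, gated on all pod sources being ready and on `reconstructDuration` having elapsed). A compilable toy version of that gating, where `allReady` stands in for `config.SourcesReady.AllReady`:

```go
package main

import (
	"fmt"
	"time"
)

// loopFunc mimics the new reconciliationLoopFunc shape: reconcile on every
// tick, reconstruct only when sources are ready and the interval has passed.
func loopFunc(allReady func() bool, reconstructDuration time.Duration) func() {
	last := time.Now()
	return func() {
		fmt.Println("reconcile()")
		if allReady() && time.Since(last) > reconstructDuration {
			fmt.Println("reconstruct()")
			last = time.Now()
		}
	}
}

func main() {
	f := loopFunc(func() bool { return true }, 0)
	f() // prints both reconcile() and reconstruct()
}
```

Running reconstruct before the static-pod and apiserver sources are ready would risk treating still-in-use volumes as orphans, which is exactly what the comment block in the diff warns about.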
@@ -242,7 +209,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
 				// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
 				// Log all other errors.
 				glog.Errorf(
-					"operationExecutor.MountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
+					"operationExecutor.VerifyControllerAttachedVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
 					volumeToMount.VolumeName,
 					volumeToMount.VolumeSpec.Name(),
 					volumeToMount.PodName,
@ -251,77 +218,341 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
|
|||||||
err)
|
err)
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
glog.Infof("MountVolume operation started for volume %q (spec.Name: %q) to pod %q (UID: %q). %s",
|
glog.Infof("VerifyControllerAttachedVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)",
|
||||||
|
volumeToMount.VolumeName,
|
||||||
|
volumeToMount.VolumeSpec.Name(),
|
||||||
|
volumeToMount.PodName,
|
||||||
|
volumeToMount.Pod.UID)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
|
||||||
|
// so attach it
|
||||||
|
volumeToAttach := operationexecutor.VolumeToAttach{
|
||||||
|
VolumeName: volumeToMount.VolumeName,
|
||||||
|
VolumeSpec: volumeToMount.VolumeSpec,
|
||||||
|
NodeName: rc.hostName,
|
||||||
|
}
|
||||||
|
glog.V(12).Infof("Attempting to start AttachVolume for volume %q (spec.Name: %q) pod %q (UID: %q)",
|
||||||
|
volumeToMount.VolumeName,
|
||||||
|
volumeToMount.VolumeSpec.Name(),
|
||||||
|
volumeToMount.PodName,
|
||||||
|
volumeToMount.Pod.UID)
|
||||||
|
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
|
||||||
|
if err != nil &&
|
||||||
|
!nestedpendingoperations.IsAlreadyExists(err) &&
|
||||||
|
!exponentialbackoff.IsExponentialBackoff(err) {
|
||||||
|
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
|
||||||
|
// Log all other errors.
|
||||||
|
glog.Errorf(
|
||||||
|
"operationExecutor.AttachVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
|
||||||
volumeToMount.VolumeName,
|
volumeToMount.VolumeName,
|
||||||
volumeToMount.VolumeSpec.Name(),
|
volumeToMount.VolumeSpec.Name(),
|
||||||
volumeToMount.PodName,
|
volumeToMount.PodName,
|
||||||
volumeToMount.Pod.UID,
|
volumeToMount.Pod.UID,
|
||||||
remountingLogStr)
|
rc.controllerAttachDetachEnabled,
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
glog.Infof("AttachVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)",
|
||||||
|
volumeToMount.VolumeName,
|
||||||
|
volumeToMount.VolumeSpec.Name(),
|
||||||
|
volumeToMount.PodName,
|
||||||
|
volumeToMount.Pod.UID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if !volMounted || cache.IsRemountRequiredError(err) {
|
||||||
|
// Volume is not mounted, or is already mounted, but requires remounting
|
||||||
|
remountingLogStr := ""
|
||||||
|
if cache.IsRemountRequiredError(err) {
|
||||||
|
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
|
||||||
|
}
|
||||||
|
glog.V(12).Infof("Attempting to start MountVolume for volume %q (spec.Name: %q) to pod %q (UID: %q). %s",
|
||||||
|
volumeToMount.VolumeName,
|
||||||
|
volumeToMount.VolumeSpec.Name(),
|
||||||
|
volumeToMount.PodName,
|
||||||
|
volumeToMount.Pod.UID,
|
||||||
|
remountingLogStr)
|
||||||
|
err := rc.operationExecutor.MountVolume(
|
||||||
|
rc.waitForAttachTimeout,
|
||||||
|
volumeToMount.VolumeToMount,
|
||||||
|
rc.actualStateOfWorld)
|
||||||
|
if err != nil &&
|
||||||
|
!nestedpendingoperations.IsAlreadyExists(err) &&
|
||||||
|
!exponentialbackoff.IsExponentialBackoff(err) {
|
||||||
|
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
|
||||||
|
// Log all other errors.
|
||||||
|
glog.Errorf(
|
||||||
|
"operationExecutor.MountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
|
||||||
|
volumeToMount.VolumeName,
|
||||||
|
volumeToMount.VolumeSpec.Name(),
|
||||||
|
volumeToMount.PodName,
|
||||||
|
volumeToMount.Pod.UID,
|
||||||
|
rc.controllerAttachDetachEnabled,
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
glog.Infof("MountVolume operation started for volume %q (spec.Name: %q) to pod %q (UID: %q). %s",
|
||||||
|
volumeToMount.VolumeName,
|
||||||
|
volumeToMount.VolumeSpec.Name(),
|
||||||
|
volumeToMount.PodName,
|
||||||
|
volumeToMount.Pod.UID,
|
||||||
|
remountingLogStr)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure devices that should be detached/unmounted are detached/unmounted.
|
// Ensure devices that should be detached/unmounted are detached/unmounted.
|
||||||
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
|
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
|
||||||
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) {
|
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) {
|
||||||
if attachedVolume.GloballyMounted {
|
if attachedVolume.GloballyMounted {
|
||||||
// Volume is globally mounted to device, unmount it
|
// Volume is globally mounted to device, unmount it
|
||||||
glog.V(12).Infof("Attempting to start UnmountDevice for volume %q (spec.Name: %q)",
|
glog.V(12).Infof("Attempting to start UnmountDevice for volume %q (spec.Name: %q)",
|
||||||
|
attachedVolume.VolumeName,
|
||||||
|
attachedVolume.VolumeSpec.Name())
|
||||||
|
err := rc.operationExecutor.UnmountDevice(
|
||||||
|
attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
|
||||||
|
if err != nil &&
|
||||||
|
!nestedpendingoperations.IsAlreadyExists(err) &&
|
||||||
|
!exponentialbackoff.IsExponentialBackoff(err) {
|
||||||
|
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
|
||||||
|
// Log all other errors.
|
||||||
|
glog.Errorf(
|
||||||
|
"operationExecutor.UnmountDevice failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
|
||||||
|
attachedVolume.VolumeName,
|
||||||
|
attachedVolume.VolumeSpec.Name(),
|
||||||
|
rc.controllerAttachDetachEnabled,
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
glog.Infof("UnmountDevice operation started for volume %q (spec.Name: %q)",
|
||||||
attachedVolume.VolumeName,
|
attachedVolume.VolumeName,
|
||||||
attachedVolume.VolumeSpec.Name())
|
attachedVolume.VolumeSpec.Name())
|
||||||
err := rc.operationExecutor.UnmountDevice(
|
}
|
||||||
attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
|
} else {
|
||||||
|
// Volume is attached to node, detach it
|
||||||
|
if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
|
||||||
|
// Kubelet not responsible for detaching or this volume has a non-attachable volume plugin,
|
||||||
|
// so just remove it to actualStateOfWorld without attach.
|
||||||
|
rc.actualStateOfWorld.MarkVolumeAsDetached(
|
||||||
|
attachedVolume.VolumeName, rc.hostName)
|
||||||
|
} else {
|
||||||
|
// Only detach if kubelet detach is enabled
|
||||||
|
glog.V(12).Infof("Attempting to start DetachVolume for volume %q (spec.Name: %q)",
|
||||||
|
attachedVolume.VolumeName,
|
||||||
|
attachedVolume.VolumeSpec.Name())
|
||||||
|
err := rc.operationExecutor.DetachVolume(
|
||||||
|
attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
|
||||||
if err != nil &&
|
if err != nil &&
|
||||||
!nestedpendingoperations.IsAlreadyExists(err) &&
|
!nestedpendingoperations.IsAlreadyExists(err) &&
|
||||||
!exponentialbackoff.IsExponentialBackoff(err) {
|
!exponentialbackoff.IsExponentialBackoff(err) {
|
||||||
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
|
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
|
||||||
// Log all other errors.
|
// Log all other errors.
|
||||||
glog.Errorf(
|
glog.Errorf(
|
||||||
"operationExecutor.UnmountDevice failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
|
"operationExecutor.DetachVolume failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
|
||||||
attachedVolume.VolumeName,
|
attachedVolume.VolumeName,
|
||||||
attachedVolume.VolumeSpec.Name(),
|
attachedVolume.VolumeSpec.Name(),
|
||||||
rc.controllerAttachDetachEnabled,
|
rc.controllerAttachDetachEnabled,
|
||||||
err)
|
err)
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
glog.Infof("UnmountDevice operation started for volume %q (spec.Name: %q)",
|
glog.Infof("DetachVolume operation started for volume %q (spec.Name: %q)",
|
||||||
attachedVolume.VolumeName,
|
attachedVolume.VolumeName,
|
||||||
attachedVolume.VolumeSpec.Name())
|
attachedVolume.VolumeSpec.Name())
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
// Volume is attached to node, detach it
|
|
||||||
if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
|
|
||||||
// Kubelet not responsible for detaching or this volume has a non-attachable volume plugin,
|
|
||||||
// so just remove it to actualStateOfWorld without attach.
|
|
||||||
rc.actualStateOfWorld.MarkVolumeAsDetached(
|
|
||||||
attachedVolume.VolumeName, rc.hostName)
|
|
||||||
} else {
|
|
||||||
// Only detach if kubelet detach is enabled
|
|
||||||
glog.V(12).Infof("Attempting to start DetachVolume for volume %q (spec.Name: %q)",
|
|
||||||
attachedVolume.VolumeName,
|
|
||||||
attachedVolume.VolumeSpec.Name())
|
|
||||||
err := rc.operationExecutor.DetachVolume(
|
|
||||||
attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
|
|
||||||
if err != nil &&
|
|
||||||
!nestedpendingoperations.IsAlreadyExists(err) &&
|
|
||||||
!exponentialbackoff.IsExponentialBackoff(err) {
|
|
||||||
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
|
|
||||||
// Log all other errors.
|
|
||||||
glog.Errorf(
|
|
||||||
"operationExecutor.DetachVolume failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
|
|
||||||
attachedVolume.VolumeName,
|
|
||||||
attachedVolume.VolumeSpec.Name(),
|
|
||||||
rc.controllerAttachDetachEnabled,
|
|
||||||
err)
|
|
||||||
}
|
|
||||||
if err == nil {
|
|
||||||
glog.Infof("DetachVolume operation started for volume %q (spec.Name: %q)",
|
|
||||||
attachedVolume.VolumeName,
|
|
||||||
attachedVolume.VolumeSpec.Name())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reconstruct process tries to observe the real world by scanning all pods' volume directories from the disk.
|
||||||
|
// If the actual and desired state of worlds are not consistent with the observed world, it means that some
|
||||||
|
// mounted volumes are left out probably during kubelet restart. This process will reconstruct
|
||||||
|
// the volumes and udpate the actual and desired states. In the following reconciler loop, those volumes will
|
||||||
|
// be cleaned up.
|
||||||
|
func (rc *reconciler) reconstruct() {
|
||||||
|
defer rc.updateReconstructTime()
|
||||||
|
rc.reconstructStates(rc.kubeletPodsDir)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *reconciler) updateReconstructTime() {
|
||||||
|
rc.timeOfLastReconstruct = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
type podVolume struct {
|
||||||
|
podName volumetypes.UniquePodName
|
||||||
|
volumeSpecName string
|
||||||
|
mountPath string
|
||||||
|
pluginName string
|
||||||
|
}
|
||||||
|
|
||||||
|
// reconstructFromDisk scans the volume directories under the given pod directory. If the volume is not
|
||||||
|
// in either actual or desired state of world, or pending operation, this function will reconstuct
|
||||||
|
// the volume spec and put it in both the actual and desired state of worlds. If no running
|
||||||
|
// container is mounting the volume, the volume will be removed by desired state of world's populator and
|
||||||
|
// cleaned up by the reconciler.
|
||||||
|
func (rc *reconciler) reconstructStates(podsDir string) {
|
||||||
|
// Get volumes information by reading the pod's directory
|
||||||
|
podVolumes, err := getVolumesFromPodDir(podsDir)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Cannot get volumes from disk %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, volume := range podVolumes {
|
||||||
|
volumeToMount, err := rc.reconstructVolume(volume)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Could not construct volume information: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if there is an pending operation for the given pod and volume.
|
||||||
|
// Need to check pending operation before checking the actual and desired
|
||||||
|
// states to avoid race condition during checking. For exmaple, the following
|
||||||
|
// might happen if pending operation is checked after checking actual and desired states.
|
||||||
|
// 1. Checking the pod and it does not exist in either actual or desired state.
|
||||||
|
// 2. An operation for the given pod finishes and the actual state is updated.
|
||||||
|
// 3. Checking and there is no pending operation for the given pod.
|
||||||
|
if rc.operationExecutor.IsOperationPending(volumeToMount.VolumeName, volumeToMount.PodName) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
desiredPods := rc.desiredStateOfWorld.GetPods()
|
||||||
|
+		actualPods := rc.actualStateOfWorld.GetPods()
+		if desiredPods[volume.podName] || actualPods[volume.podName] {
+			continue
+		}
+
+		glog.V(3).Infof(
+			"Could not find pod information in desired or actual states or pending operation, update it in both states: %+v",
+			volumeToMount)
+		if err = rc.updateStates(volumeToMount); err != nil {
+			glog.Errorf("Error occurred during reconstructing volume from disk: %v", err)
+		}
+	}
+}
+
+// Reconstruct Volume object and volumeToMount data structure by reading the pod's volume directories
+func (rc *reconciler) reconstructVolume(volume podVolume) (*operationexecutor.VolumeToMount, error) {
+	plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
+	if err != nil {
+		return nil, err
+	}
+	volumeSpec, err := plugin.ConstructVolumeSpec(volume.volumeSpecName, volume.mountPath)
+	if err != nil {
+		return nil, err
+	}
+	volumeName, err := plugin.GetVolumeName(volumeSpec)
+	if err != nil {
+		return nil, err
+	}
+	pod := &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			UID: types.UID(volume.podName),
+		},
+	}
+	attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginByName(volume.pluginName)
+	if err != nil {
+		return nil, err
+	}
+	var uniqueVolumeName api.UniqueVolumeName
+	if attachablePlugin != nil {
+		uniqueVolumeName = volumehelper.GetUniqueVolumeName(volume.pluginName, volumeName)
+	} else {
+		uniqueVolumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(volume.podName, plugin, volumeSpec)
+	}
+
+	volumeToMount := &operationexecutor.VolumeToMount{
+		VolumeName:          uniqueVolumeName,
+		PodName:             volume.podName,
+		VolumeSpec:          volumeSpec,
+		OuterVolumeSpecName: volumeName, /* volumeName is InnerVolumeSpecName, but this information will not be used for cleanup */
+		Pod:                 pod,
+		PluginIsAttachable:  attachablePlugin != nil,
+		VolumeGidValue:      "",
+		DevicePath:          "",
+	}
+	return volumeToMount, nil
+}
+
+func (rc *reconciler) updateStates(volumeToMount *operationexecutor.VolumeToMount) error {
+	err := rc.actualStateOfWorld.MarkVolumeAsAttached(
+		volumeToMount.VolumeName, volumeToMount.VolumeSpec, "", volumeToMount.DevicePath)
+	if err != nil {
+		return fmt.Errorf("Could not add volume information to actual state of world: %v", err)
+	}
+	err = rc.actualStateOfWorld.AddPodToVolume(
+		volumeToMount.PodName,
+		types.UID(volumeToMount.PodName),
+		volumeToMount.VolumeName,
+		nil,
+		volumeToMount.OuterVolumeSpecName,
+		volumeToMount.DevicePath)
+	if err != nil {
+		return fmt.Errorf("Could not add pod to volume information to actual state of world: %v", err)
+	}
+	if volumeToMount.PluginIsAttachable {
+		err = rc.actualStateOfWorld.MarkDeviceAsMounted(volumeToMount.VolumeName)
+		if err != nil {
+			return fmt.Errorf("Could not mark device as mounted in actual state of world: %v", err)
+		}
+	}
+	_, err = rc.desiredStateOfWorld.AddPodToVolume(volumeToMount.PodName,
+		volumeToMount.Pod,
+		volumeToMount.VolumeSpec,
+		volumeToMount.OuterVolumeSpecName,
+		volumeToMount.VolumeGidValue)
+	if err != nil {
+		return fmt.Errorf("Could not add pod to volume information to desired state of world: %v", err)
+	}
+	return nil
+}
+
+// getVolumesFromPodDir scans through the volumes directories under the given pod directory.
+// It returns a list of pod volume information including pod's uid, volume's plugin name, mount path,
+// and volume spec name.
+func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
+	podsDirInfo, err := ioutil.ReadDir(podDir)
+	if err != nil {
+		return nil, err
+	}
+	volumes := []podVolume{}
+	for i := range podsDirInfo {
+		if !podsDirInfo[i].IsDir() {
+			continue
+		}
+		podName := podsDirInfo[i].Name()
+		podDir := path.Join(podDir, podName)
+		volumesDir := path.Join(podDir, options.DefaultKubeletVolumesDirName)
+		volumesDirInfo, err := ioutil.ReadDir(volumesDir)
+		if err != nil {
+			glog.Errorf("Could not read volume directory %q: %v", volumesDir, err)
+			continue
+		}
+		for _, volumeDir := range volumesDirInfo {
+			pluginName := volumeDir.Name()
+			volumePluginPath := path.Join(volumesDir, pluginName)
+
+			volumePluginDirs, err := ioutil.ReadDir(volumePluginPath)
+			if err != nil {
+				glog.Errorf("Could not read volume plugin directory %q: %v", volumePluginPath, err)
+				continue
+			}
+
+			unescapePluginName := strings.UnescapeQualifiedNameForDisk(pluginName)
+			for _, volumeNameDir := range volumePluginDirs {
+				if volumeNameDir != nil {
+					volumeName := volumeNameDir.Name()
+					mountPath := path.Join(volumePluginPath, volumeName)
+					volumes = append(volumes, podVolume{
+						podName:        volumetypes.UniquePodName(podName),
+						volumeSpecName: volumeName,
+						mountPath:      mountPath,
+						pluginName:     unescapePluginName,
+					})
+				}
+			}
+		}
+	}
+	glog.V(10).Infof("Get volumes from pod directory %q %+v", podDir, volumes)
+	return volumes, nil
+}
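To make the directory layout that getVolumesFromPodDir walks concrete, here is a small standalone sketch (not the kubelet's code) that builds a fake pods/<podUID>/volumes/<escaped-plugin>/<volumeName> tree and scans it the same way. The podVolume struct and the directory names here are simplified stand-ins for the kubelet's internal types; the only real convention reused is that '/' in a plugin name is escaped as '~' on disk.

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path"
	"strings"
)

// podVolume mirrors the fields the reconciler recovers from disk
// (a simplified stand-in for the kubelet's internal type).
type podVolume struct {
	podName, volumeSpecName, mountPath, pluginName string
}

func main() {
	root, _ := ioutil.TempDir("", "pods")
	defer os.RemoveAll(root)

	// pods/<podUID>/volumes/<escaped plugin name>/<volume name>
	mount := path.Join(root, "pod-uid-1", "volumes", "kubernetes.io~empty-dir", "cache")
	if err := os.MkdirAll(mount, 0755); err != nil {
		panic(err)
	}

	var volumes []podVolume
	podsDirInfo, _ := ioutil.ReadDir(root)
	for _, podDir := range podsDirInfo {
		volumesDir := path.Join(root, podDir.Name(), "volumes")
		pluginDirs, err := ioutil.ReadDir(volumesDir)
		if err != nil {
			continue // a pod dir without a volumes dir is skipped, not fatal
		}
		for _, pluginDir := range pluginDirs {
			// on disk, '/' in the plugin name is escaped as '~'
			pluginName := strings.Replace(pluginDir.Name(), "~", "/", -1)
			volumeDirs, _ := ioutil.ReadDir(path.Join(volumesDir, pluginDir.Name()))
			for _, v := range volumeDirs {
				volumes = append(volumes, podVolume{
					podName:        podDir.Name(),
					volumeSpecName: v.Name(),
					mountPath:      path.Join(volumesDir, pluginDir.Name(), v.Name()),
					pluginName:     pluginName,
				})
			}
		}
	}
	fmt.Printf("%+v\n", volumes)
}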
@ -25,9 +25,11 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
 	"k8s.io/kubernetes/pkg/client/testing/core"
+	"k8s.io/kubernetes/pkg/kubelet/config"
 	"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util/mount"
+	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/volume"
 	volumetesting "k8s.io/kubernetes/pkg/volume/testing"
@ -38,12 +40,13 @@ import (
 const (
 	// reconcilerLoopSleepDuration is the amount of time the reconciler loop
 	// waits between successive executions
 	reconcilerLoopSleepDuration time.Duration = 0 * time.Millisecond
+	reconcilerReconstructSleepPeriod time.Duration = 10 * time.Minute
 	// waitForAttachTimeout is the maximum amount of time a
 	// operationexecutor.Mount call will wait for a volume to be attached.
 	waitForAttachTimeout time.Duration = 1 * time.Second
 	nodeName string = "myhostname"
+	kubeletPodsDir string = "fake-dir"
 )

 // Calls Run()
@ -59,15 +62,18 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
 		kubeClient,
 		false, /* controllerAttachDetachEnabled */
 		reconcilerLoopSleepDuration,
+		reconcilerReconstructSleepPeriod,
 		waitForAttachTimeout,
 		nodeName,
 		dsw,
 		asw,
 		oex,
-		&mount.FakeMounter{})
+		&mount.FakeMounter{},
+		volumePluginMgr,
+		kubeletPodsDir)

 	// Act
-	go reconciler.Run(wait.NeverStop)
+	runReconciler(reconciler)

 	// Assert
 	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
@ -92,12 +98,15 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
 		kubeClient,
 		false, /* controllerAttachDetachEnabled */
 		reconcilerLoopSleepDuration,
+		reconcilerReconstructSleepPeriod,
 		waitForAttachTimeout,
 		nodeName,
 		dsw,
 		asw,
 		oex,
-		&mount.FakeMounter{})
+		&mount.FakeMounter{},
+		volumePluginMgr,
+		kubeletPodsDir)
 	pod := &api.Pod{
 		ObjectMeta: api.ObjectMeta{
 			Name: "pod1",
@ -128,9 +137,8 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
 	}

 	// Act
-	go reconciler.Run(wait.NeverStop)
+	runReconciler(reconciler)
 	waitForMount(t, fakePlugin, generatedVolumeName, asw)

 	// Assert
 	assert.NoError(t, volumetesting.VerifyAttachCallCount(
 		1 /* expectedAttachCallCount */, fakePlugin))
@ -160,12 +168,15 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
 		kubeClient,
 		true, /* controllerAttachDetachEnabled */
 		reconcilerLoopSleepDuration,
+		reconcilerReconstructSleepPeriod,
 		waitForAttachTimeout,
 		nodeName,
 		dsw,
 		asw,
 		oex,
-		&mount.FakeMounter{})
+		&mount.FakeMounter{},
+		volumePluginMgr,
+		kubeletPodsDir)
 	pod := &api.Pod{
 		ObjectMeta: api.ObjectMeta{
 			Name: "pod1",
@ -197,7 +208,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
 	}

 	// Act
-	go reconciler.Run(wait.NeverStop)
+	runReconciler(reconciler)
 	waitForMount(t, fakePlugin, generatedVolumeName, asw)

 	// Assert
@ -228,12 +239,15 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
 		kubeClient,
 		false, /* controllerAttachDetachEnabled */
 		reconcilerLoopSleepDuration,
+		reconcilerReconstructSleepPeriod,
 		waitForAttachTimeout,
 		nodeName,
 		dsw,
 		asw,
 		oex,
-		&mount.FakeMounter{})
+		&mount.FakeMounter{},
+		volumePluginMgr,
+		kubeletPodsDir)
 	pod := &api.Pod{
 		ObjectMeta: api.ObjectMeta{
 			Name: "pod1",
@ -264,9 +278,8 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
 	}

 	// Act
-	go reconciler.Run(wait.NeverStop)
+	runReconciler(reconciler)
 	waitForMount(t, fakePlugin, generatedVolumeName, asw)

 	// Assert
 	assert.NoError(t, volumetesting.VerifyAttachCallCount(
 		1 /* expectedAttachCallCount */, fakePlugin))
@ -308,12 +321,15 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
 		kubeClient,
 		true, /* controllerAttachDetachEnabled */
 		reconcilerLoopSleepDuration,
+		reconcilerReconstructSleepPeriod,
 		waitForAttachTimeout,
 		nodeName,
 		dsw,
 		asw,
 		oex,
-		&mount.FakeMounter{})
+		&mount.FakeMounter{},
+		volumePluginMgr,
+		kubeletPodsDir)
 	pod := &api.Pod{
 		ObjectMeta: api.ObjectMeta{
 			Name: "pod1",
@ -344,7 +360,8 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
 	}

 	// Act
-	go reconciler.Run(wait.NeverStop)
+	runReconciler(reconciler)
+
 	dsw.MarkVolumesReportedInUse([]api.UniqueVolumeName{generatedVolumeName})
 	waitForMount(t, fakePlugin, generatedVolumeName, asw)

@ -445,3 +462,8 @@ func createTestClient() *fake.Clientset {
 	})
 	return fakeClient
 }
+
+func runReconciler(reconciler Reconciler) {
+	sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return false })
+	go reconciler.Run(sourcesReady, wait.NeverStop)
+}
@ -24,6 +24,7 @@ import (
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	"k8s.io/kubernetes/pkg/kubelet/config"
 	"k8s.io/kubernetes/pkg/kubelet/container"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/pod"
@ -46,6 +47,10 @@ const (
 	// between successive executions
 	reconcilerLoopSleepPeriod time.Duration = 100 * time.Millisecond

+	// reconcilerReconstructSleepPeriod is the amount of time the reconciler reconstruct process
+	// waits between successive executions
+	reconcilerReconstructSleepPeriod time.Duration = 3 * time.Minute
+
 	// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
 	// DesiredStateOfWorldPopulator loop waits between successive executions
 	desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 100 * time.Millisecond
@ -76,6 +81,10 @@ const (
 	// operation is waiting it only blocks other operations on the same device,
 	// other devices are not affected.
 	waitForAttachTimeout time.Duration = 10 * time.Minute
+
+	// reconcilerStartGracePeriod is the maximum amount of time volume manager
+	// can wait to start reconciler
+	reconcilerStartGracePeriod time.Duration = 60 * time.Second
 )

 // VolumeManager runs a set of asynchronous loops that figure out which volumes
@ -83,7 +92,7 @@ const (
 // this node and makes it so.
 type VolumeManager interface {
 	// Starts the volume manager and all the asynchronous loops that it controls
-	Run(stopCh <-chan struct{})
+	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})

 	// WaitForAttachAndMount processes the volumes referenced in the specified
 	// pod and blocks until they are all attached and mounted (reflected in
@ -138,7 +147,8 @@ func NewVolumeManager(
 	kubeClient internalclientset.Interface,
 	volumePluginMgr *volume.VolumePluginMgr,
 	kubeContainerRuntime kubecontainer.Runtime,
-	mounter mount.Interface) (VolumeManager, error) {
+	mounter mount.Interface,
+	kubeletPodsDir string) (VolumeManager, error) {
 	vm := &volumeManager{
 		kubeClient:      kubeClient,
 		volumePluginMgr: volumePluginMgr,
@ -153,12 +163,15 @@ func NewVolumeManager(
 		kubeClient,
 		controllerAttachDetachEnabled,
 		reconcilerLoopSleepPeriod,
+		reconcilerReconstructSleepPeriod,
 		waitForAttachTimeout,
 		hostName,
 		vm.desiredStateOfWorld,
 		vm.actualStateOfWorld,
 		vm.operationExecutor,
-		mounter)
+		mounter,
+		volumePluginMgr,
+		kubeletPodsDir)
 	vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
 		kubeClient,
 		desiredStateOfWorldPopulatorLoopSleepPeriod,
@ -208,12 +221,14 @@ type volumeManager struct {
 	desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
 }

-func (vm *volumeManager) Run(stopCh <-chan struct{}) {
+func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
 	defer runtime.HandleCrash()
-	glog.Infof("Starting Kubelet Volume Manager")

-	go vm.reconciler.Run(stopCh)
 	go vm.desiredStateOfWorldPopulator.Run(stopCh)
+	glog.V(2).Infof("The desired_state_of_world populator starts")
+
+	glog.Infof("Starting Kubelet Volume Manager")
+	go vm.reconciler.Run(sourcesReady, stopCh)

 	<-stopCh
 	glog.Infof("Shutting down Kubelet Volume Manager")
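The config.SourcesReady argument threaded through Run lets the reconciler hold off its one-time disk reconstruction until the kubelet's pod sources (apiserver, file, http) have all delivered at least once; reconstructing earlier would re-add volumes for pods the desired world simply has not heard about yet. A minimal sketch of that kind of gate follows; the sourcesReady interface, AllReady predicate, and timings here are illustrative stand-ins, not the real kubelet API:

package main

import (
	"fmt"
	"time"
)

// sourcesReady is a stand-in for config.SourcesReady: it reports whether
// every configured pod source has been seen at least once.
type sourcesReady interface {
	AllReady() bool
}

type fakeSources struct{ readyAt time.Time }

func (f *fakeSources) AllReady() bool { return time.Now().After(f.readyAt) }

// waitAndReconstruct polls until sources are ready or a grace period
// expires, then runs the one-time reconstruction, mirroring the shape of
// deferring a disk scan behind a readiness signal.
func waitAndReconstruct(sources sourcesReady, grace time.Duration, reconstruct func()) {
	deadline := time.Now().Add(grace)
	for !sources.AllReady() && time.Now().Before(deadline) {
		time.Sleep(10 * time.Millisecond)
	}
	reconstruct()
}

func main() {
	s := &fakeSources{readyAt: time.Now().Add(50 * time.Millisecond)}
	waitAndReconstruct(s, time.Second, func() { fmt.Println("reconstructing volumes from disk") })
}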
@ -26,11 +26,13 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
+	"k8s.io/kubernetes/pkg/kubelet/config"
 	containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
 	"k8s.io/kubernetes/pkg/kubelet/pod"
 	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 	podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
 	"k8s.io/kubernetes/pkg/util/mount"
+	"k8s.io/kubernetes/pkg/util/sets"
 	utiltesting "k8s.io/kubernetes/pkg/util/testing"
 	"k8s.io/kubernetes/pkg/volume"
 	volumetest "k8s.io/kubernetes/pkg/volume/testing"
@ -58,8 +60,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
 		t.Fatalf("Failed to initialize volume manager: %v", err)
 	}

-	stopCh := make(chan struct{})
-	go manager.Run(stopCh)
+	stopCh := runVolumeManager(manager)
 	defer close(stopCh)

 	podManager.SetPods([]*api.Pod{pod})
@ -149,8 +150,10 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
 			continue
 		}

-		stopCh := make(chan struct{})
-		go manager.Run(stopCh)
+		stopCh := runVolumeManager(manager)
+		defer func() {
+			close(stopCh)
+		}()

 		podManager.SetPods([]*api.Pod{pod})

@ -170,8 +173,6 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
 		if !reflect.DeepEqual(tc.expected, actual) {
 			t.Errorf("Expected supplemental groups %v, got %v", tc.expected, actual)
 		}
-
-		close(stopCh)
 	}
 }

@ -190,7 +191,8 @@ func newTestVolumeManager(
 		kubeClient,
 		plugMgr,
 		&containertest.FakeRuntime{},
-		&mount.FakeMounter{})
+		&mount.FakeMounter{},
+		"")
 	return vm, err
 }

@ -276,3 +278,12 @@ func simulateVolumeInUseUpdate(
 		}
 	}
 }
+
+func runVolumeManager(manager VolumeManager) chan struct{} {
+	stopCh := make(chan struct{})
+	//readyCh := make(chan bool, 1)
+	//readyCh <- true
+	sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
+	go manager.Run(sourcesReady, stopCh)
+	return stopCh
+}
@ -58,6 +58,10 @@ type GoRoutineMap interface {
 	// necessary during tests - the test should wait until all operations finish
 	// and evaluate results after that.
 	Wait()
+
+	// IsOperationPending returns true if the operation is pending, otherwise
+	// returns false
+	IsOperationPending(operationName string) bool
 }

 // NewGoRoutineMap returns a new instance of GoRoutineMap.
@ -75,7 +79,7 @@ type goRoutineMap struct {
 	operations                map[string]operation
 	exponentialBackOffOnError bool
 	cond                      *sync.Cond
-	lock                      sync.Mutex
+	lock                      sync.RWMutex
 }

 type operation struct {
@ -150,6 +154,16 @@ func (grm *goRoutineMap) operationComplete(
 	}
 }

+func (grm *goRoutineMap) IsOperationPending(operationName string) bool {
+	grm.lock.RLock()
+	defer grm.lock.RUnlock()
+	existingOp, exists := grm.operations[operationName]
+	if exists && existingOp.operationPending {
+		return true
+	}
+	return false
+}
+
 func (grm *goRoutineMap) Wait() {
 	grm.lock.Lock()
 	defer grm.lock.Unlock()
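Switching the lock to sync.RWMutex lets the new IsOperationPending accessor query in-flight work under a read lock without serializing against writers. A minimal, dependency-free sketch of the same pattern (this is an analogue of the shape above, not the real GoRoutineMap):

package main

import (
	"fmt"
	"sync"
)

// pendingSet is a tiny analogue of goRoutineMap: writers register and
// complete operations under the write lock, readers query under RLock.
type pendingSet struct {
	mu      sync.RWMutex
	pending map[string]bool
}

func newPendingSet() *pendingSet { return &pendingSet{pending: map[string]bool{}} }

func (p *pendingSet) start(name string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.pending[name] = true
}

func (p *pendingSet) complete(name string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.pending, name)
}

// isOperationPending mirrors the RLock/RUnlock shape of the new method.
func (p *pendingSet) isOperationPending(name string) bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.pending[name]
}

func main() {
	p := newPendingSet()
	p.start("unmount-vol1")
	fmt.Println(p.isOperationPending("unmount-vol1")) // true
	p.complete("unmount-vol1")
	fmt.Println(p.isOperationPending("unmount-vol1")) // false
}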
@ -140,3 +140,7 @@ func (f *FakeMounter) DeviceOpened(pathname string) (bool, error) {
 func (f *FakeMounter) PathIsDevice(pathname string) (bool, error) {
 	return true, nil
 }
+
+func (f *FakeMounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
+	return getDeviceNameFromMount(f, mountPath, pluginDir)
+}
@ -19,7 +19,10 @@ limitations under the License.
 package mount

 import (
+	"fmt"
+	"path"
 	"path/filepath"
+	"strings"

 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/util/exec"
@ -43,6 +46,9 @@ type Interface interface {
 	DeviceOpened(pathname string) (bool, error)
 	// PathIsDevice determines if a path is a device.
 	PathIsDevice(pathname string) (bool, error)
+	// GetDeviceNameFromMount finds the device name by checking the mount path
+	// to get the global mount path which matches its plugin directory
+	GetDeviceNameFromMount(mountPath, pluginDir string) (string, error)
 }

 // This represents a single line in /proc/mounts or /etc/fstab.
@ -151,3 +157,25 @@ func GetDeviceNameFromMount(mounter Interface, mountPath string) (string, int, e
 	}
 	return device, refCount, nil
 }
+
+// getDeviceNameFromMount finds the device name from /proc/mounts in which
+// the mount path reference should match the given plugin directory. In case no mount path reference
+// matches, returns the volume name taken from its given mountPath
+func getDeviceNameFromMount(mounter Interface, mountPath, pluginDir string) (string, error) {
+	refs, err := GetMountRefs(mounter, mountPath)
+	if err != nil {
+		glog.V(4).Infof("GetMountRefs failed for mount path %q: %v", mountPath, err)
+		return "", err
+	}
+	if len(refs) == 0 {
+		glog.V(4).Infof("Directory %s is not mounted", mountPath)
+		return "", fmt.Errorf("directory %s is not mounted", mountPath)
+	}
+	for _, ref := range refs {
+		if strings.HasPrefix(ref, pluginDir) {
+			return path.Base(ref), nil
+		}
+	}
+
+	return path.Base(mountPath), nil
+}
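The lookup logic is easiest to see with concrete paths: among all mount references for the same device, the helper prefers the one under the plugin's global mount directory and returns its base name, falling back to the last element of the pod-level mount path. The sketch below replays that core loop on a pre-computed reference list; the paths are illustrative, not taken from a real node:

package main

import (
	"fmt"
	"path"
	"strings"
)

// pickDeviceName replays the prefix scan at the heart of
// getDeviceNameFromMount, given mount references already collected.
func pickDeviceName(refs []string, mountPath, pluginDir string) string {
	for _, ref := range refs {
		if strings.HasPrefix(ref, pluginDir) {
			// the global mount dir is named after the device/volume ID
			return path.Base(ref)
		}
	}
	// fallback: last path element of the pod-level mount
	return path.Base(mountPath)
}

func main() {
	pluginDir := "/var/lib/kubelet/plugins/kubernetes.io/aws-ebs"
	refs := []string{
		"/var/lib/kubelet/pods/uid1/volumes/kubernetes.io~aws-ebs/myvol",
		"/var/lib/kubelet/plugins/kubernetes.io/aws-ebs/mounts/vol-0abc123",
	}
	fmt.Println(pickDeviceName(refs, refs[0], pluginDir)) // vol-0abc123
}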
@ -222,6 +222,11 @@ func pathIsDevice(pathname string) (bool, error) {
 	return false, nil
 }

+// GetDeviceNameFromMount: given a mount point, find the device name from its global mount point
+func (mounter *Mounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
+	return getDeviceNameFromMount(mounter, mountPath, pluginDir)
+}
+
 func listProcMounts(mountFilePath string) ([]MountPoint, error) {
 	hash1, err := readProcMounts(mountFilePath, nil)
 	if err != nil {
@ -217,6 +217,11 @@ func (n *NsenterMounter) PathIsDevice(pathname string) (bool, error) {
 	return pathIsDevice(pathname)
 }

+// GetDeviceNameFromMount: given a mount point, find the volume id from checking /proc/mounts
+func (n *NsenterMounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
+	return getDeviceNameFromMount(n, mountPath, pluginDir)
+}
+
 func (n *NsenterMounter) absHostPath(command string) string {
 	path, ok := n.paths[command]
 	if !ok {
@ -49,3 +49,7 @@ func (*NsenterMounter) DeviceOpened(pathname string) (bool, error) {
 func (*NsenterMounter) PathIsDevice(pathname string) (bool, error) {
 	return true, nil
 }
+
+func (*NsenterMounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
+	return "", nil
+}
@ -51,6 +51,11 @@ func (plugin *awsElasticBlockStorePlugin) NewAttacher() (volume.Attacher, error)
 	}, nil
 }

+func (plugin *awsElasticBlockStorePlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
+	mounter := plugin.host.GetMounter()
+	return mount.GetMountRefs(mounter, deviceMountPath)
+}
+
 func (attacher *awsElasticBlockStoreAttacher) Attach(spec *volume.Spec, hostName string) (string, error) {
 	volumeSource, readOnly, err := getVolumeSource(spec)
 	if err != nil {
@ -188,6 +188,24 @@ func getVolumeSource(
 	return nil, false, fmt.Errorf("Spec does not reference an AWS EBS volume type")
 }

+func (plugin *awsElasticBlockStorePlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
+	mounter := plugin.host.GetMounter()
+	pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
+	sourceName, err := mounter.GetDeviceNameFromMount(mountPath, pluginDir)
+	if err != nil {
+		return nil, err
+	}
+	awsVolume := &api.Volume{
+		Name: volName,
+		VolumeSource: api.VolumeSource{
+			AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{
+				VolumeID: sourceName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(awsVolume), nil
+}
+
 // Abstract interface to PD operations.
 type ebsManager interface {
 	CreateVolume(provisioner *awsElasticBlockStoreProvisioner) (volumeID string, volumeSizeGB int, labels map[string]string, err error)
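For an attachable plugin like EBS, the reconstruction chain is: pod mount path, then GetDeviceNameFromMount to recover the volume ID from the global mount, then a skeletal api.Volume wrapped into a spec. A compressed, dependency-free sketch of that chain follows; the deviceNamer type and the string-map "spec" are simplifications standing in for mount.Interface and *volume.Spec:

package main

import "fmt"

// deviceNamer stands in for the mount interface's new lookup method.
type deviceNamer func(mountPath, pluginDir string) (string, error)

// constructEBSSpec mimics the shape of the EBS ConstructVolumeSpec above:
// recover the volume ID from the global mount, then rebuild a minimal spec.
func constructEBSSpec(namer deviceNamer, volName, mountPath, pluginDir string) (map[string]string, error) {
	volumeID, err := namer(mountPath, pluginDir)
	if err != nil {
		return nil, err
	}
	// The real implementation returns *volume.Spec wrapping an api.Volume;
	// a string map keeps this sketch self-contained.
	return map[string]string{"name": volName, "awsElasticBlockStore.volumeID": volumeID}, nil
}

func main() {
	namer := func(mountPath, pluginDir string) (string, error) { return "vol-0abc123", nil }
	spec, _ := constructEBSSpec(namer, "myvol",
		"/var/lib/kubelet/pods/uid1/volumes/kubernetes.io~aws-ebs/myvol",
		"/var/lib/kubelet/plugins/kubernetes.io/aws-ebs")
	fmt.Println(spec)
}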
@ -124,6 +124,19 @@ func (plugin *azureFilePlugin) newUnmounterInternal(volName string, podUID types
 	}}, nil
 }

+func (plugin *azureFilePlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
+	azureVolume := &api.Volume{
+		Name: volName,
+		VolumeSource: api.VolumeSource{
+			AzureFile: &api.AzureFileVolumeSource{
+				SecretName: volName,
+				ShareName:  volName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(azureVolume), nil
+}
+
 // azureFile volumes represent mount of an AzureFile share.
 type azureFile struct {
 	volName string
@ -154,6 +154,19 @@ func (plugin *cephfsPlugin) newUnmounterInternal(volName string, podUID types.UI
 	}, nil
 }

+func (plugin *cephfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	cephfsVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			CephFS: &api.CephFSVolumeSource{
+				Monitors: []string{},
+				Path:     volumeName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(cephfsVolume), nil
+}
+
 // CephFS volumes represent a bare host file or directory mount of an CephFS export.
 type cephfs struct {
 	volName string
@ -53,6 +53,11 @@ func (plugin *cinderPlugin) NewAttacher() (volume.Attacher, error) {
 	}, nil
 }

+func (plugin *cinderPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
+	mounter := plugin.host.GetMounter()
+	return mount.GetMountRefs(mounter, deviceMountPath)
+}
+
 func (attacher *cinderDiskAttacher) Attach(spec *volume.Spec, hostName string) (string, error) {
 	volumeSource, _, err := getVolumeSource(spec)
 	if err != nil {
@ -204,6 +204,25 @@ func (plugin *cinderPlugin) getCloudProvider() (CinderProvider, error) {
 	}
 }

+func (plugin *cinderPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	mounter := plugin.host.GetMounter()
+	pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
+	sourceName, err := mounter.GetDeviceNameFromMount(mountPath, pluginDir)
+	if err != nil {
+		return nil, err
+	}
+	glog.V(4).Infof("Found volume %s mounted to %s", sourceName, mountPath)
+	cinderVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			Cinder: &api.CinderVolumeSource{
+				VolumeID: sourceName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(cinderVolume), nil
+}
+
 // Abstract interface to PD operations.
 type cdManager interface {
 	// Attaches the disk to the kubelet's host machine.
@ -86,6 +86,16 @@ func (plugin *configMapPlugin) NewUnmounter(volName string, podUID types.UID) (v
 	return &configMapVolumeUnmounter{&configMapVolume{volName, podUID, plugin, plugin.host.GetMounter(), plugin.host.GetWriter(), volume.MetricsNil{}}}, nil
 }

+func (plugin *configMapPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	configMapVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			ConfigMap: &api.ConfigMapVolumeSource{},
+		},
+	}
+	return volume.NewSpecFromVolume(configMapVolume), nil
+}
+
 type configMapVolume struct {
 	volName string
 	podUID  types.UID
@ -106,6 +106,16 @@ func (plugin *downwardAPIPlugin) NewUnmounter(volName string, podUID types.UID)
 	}, nil
 }

+func (plugin *downwardAPIPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	downwardAPIVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			DownwardAPI: &api.DownwardAPIVolumeSource{},
+		},
+	}
+	return volume.NewSpecFromVolume(downwardAPIVolume), nil
+}
+
 // downwardAPIVolume retrieves downward API data and placing them into the volume on the host.
 type downwardAPIVolume struct {
 	volName string
@ -128,6 +128,16 @@ func (plugin *emptyDirPlugin) newUnmounterInternal(volName string, podUID types.
 	return ed, nil
 }

+func (plugin *emptyDirPlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
+	emptyDirVolume := &api.Volume{
+		Name: volName,
+		VolumeSource: api.VolumeSource{
+			EmptyDir: &api.EmptyDirVolumeSource{},
+		},
+	}
+	return volume.NewSpecFromVolume(emptyDirVolume), nil
+}
+
 // mountDetector abstracts how to find what kind of mount a path is backed by.
 type mountDetector interface {
 	// GetMountMedium determines what type of medium a given path is backed
@ -141,6 +141,16 @@ func (plugin *fcPlugin) execCommand(command string, args []string) ([]byte, erro
 	return cmd.CombinedOutput()
 }

+func (plugin *fcPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	fcVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			FC: &api.FCVolumeSource{},
+		},
+	}
+	return volume.NewSpecFromVolume(fcVolume), nil
+}
+
 type fcDisk struct {
 	volName string
 	podUID  types.UID
@ -179,6 +179,18 @@ func (plugin *flexVolumePlugin) newUnmounterInternal(volName string, podUID type
 	}, nil
 }

+func (plugin *flexVolumePlugin) ConstructVolumeSpec(volumeName, sourceName string) (*volume.Spec, error) {
+	flexVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			FlexVolume: &api.FlexVolumeSource{
+				Driver: sourceName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(flexVolume), nil
+}
+
 // flexVolume is the disk resource provided by this plugin.
 type flexVolumeDisk struct {
 	// podUID is the UID of the pod.
@ -117,6 +117,18 @@ func (p *flockerPlugin) NewUnmounter(datasetName string, podUID types.UID) (volu
 	return nil, nil
 }

+func (p *flockerPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	flockerVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			Flocker: &api.FlockerVolumeSource{
+				DatasetName: volumeName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(flockerVolume), nil
+}
+
 type flockerMounter struct {
 	*flocker
 	client flockerclient.Clientable
@ -53,6 +53,11 @@ func (plugin *gcePersistentDiskPlugin) NewAttacher() (volume.Attacher, error) {
 	}, nil
 }

+func (plugin *gcePersistentDiskPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
+	mounter := plugin.host.GetMounter()
+	return mount.GetMountRefs(mounter, deviceMountPath)
+}
+
 // Attach checks with the GCE cloud provider if the specified volume is already
 // attached to the specified node. If the volume is attached, it succeeds
 // (returns nil). If it is not, Attach issues a call to the GCE cloud provider
@ -182,6 +182,24 @@ func (plugin *gcePersistentDiskPlugin) newProvisionerInternal(options volume.Vol
 	}, nil
 }

+func (plugin *gcePersistentDiskPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	mounter := plugin.host.GetMounter()
+	pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
+	sourceName, err := mounter.GetDeviceNameFromMount(mountPath, pluginDir)
+	if err != nil {
+		return nil, err
+	}
+	gceVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
+				PDName: sourceName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(gceVolume), nil
+}
+
 // Abstract interface to PD operations.
 type pdManager interface {
 	// Creates a volume
@ -107,6 +107,16 @@ func (plugin *gitRepoPlugin) NewUnmounter(volName string, podUID types.UID) (vol
 	}, nil
 }

+func (plugin *gitRepoPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	gitVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			GitRepo: &api.GitRepoVolumeSource{},
+		},
+	}
+	return volume.NewSpecFromVolume(gitVolume), nil
+}
+
 // gitRepo volumes are directories which are pre-filled from a git repository.
 // These do not persist beyond the lifetime of a pod.
 type gitRepoVolume struct {
@ -145,6 +145,19 @@ func (plugin *glusterfsPlugin) execCommand(command string, args []string) ([]byt
 	return cmd.CombinedOutput()
 }

+func (plugin *glusterfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	glusterfsVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			Glusterfs: &api.GlusterfsVolumeSource{
+				EndpointsName: volumeName,
+				Path:          volumeName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(glusterfsVolume), nil
+}
+
 // Glusterfs volumes represent a bare host file or directory mount of an Glusterfs export.
 type glusterfs struct {
 	volName string
@ -138,6 +138,18 @@ func (plugin *hostPathPlugin) NewProvisioner(options volume.VolumeOptions) (volu
 	return plugin.newProvisionerFunc(options, plugin.host)
 }

+func (plugin *hostPathPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	hostPathVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			HostPath: &api.HostPathVolumeSource{
+				Path: volumeName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(hostPathVolume), nil
+}
+
 func newRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
 	if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil {
 		return nil, fmt.Errorf("spec.PersistentVolumeSource.HostPath is nil")
@ -146,6 +146,19 @@ func (plugin *iscsiPlugin) execCommand(command string, args []string) ([]byte, e
 	return cmd.CombinedOutput()
 }

+func (plugin *iscsiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	iscsiVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			ISCSI: &api.ISCSIVolumeSource{
+				TargetPortal: volumeName,
+				IQN:          volumeName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(iscsiVolume), nil
+}
+
 type iscsiDisk struct {
 	volName string
 	podUID  types.UID
@ -136,6 +136,18 @@ func (plugin *nfsPlugin) NewRecycler(pvName string, spec *volume.Spec) (volume.R
 	return plugin.newRecyclerFunc(pvName, spec, plugin.host, plugin.config)
 }

+func (plugin *nfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	nfsVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			NFS: &api.NFSVolumeSource{
+				Path: volumeName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(nfsVolume), nil
+}
+
 // NFS volumes represent a bare host file or directory mount of an NFS export.
 type nfs struct {
 	volName string
@ -98,6 +98,12 @@ type VolumePlugin interface {
 	// - name: The volume name, as per the api.Volume spec.
 	// - podUID: The UID of the enclosing pod
 	NewUnmounter(name string, podUID types.UID) (Unmounter, error)
+
+	// ConstructVolumeSpec constructs a volume spec based on the given volume name
+	// and mountPath. The spec may have incomplete information due to limited
+	// information from input. This function is used by volume manager to reconstruct
+	// volume spec by reading the volume directories from disk
+	ConstructVolumeSpec(volumeName, mountPath string) (*Spec, error)
 }

 // PersistentVolumePlugin is an extended interface of VolumePlugin and is used
@ -151,6 +157,7 @@ type AttachableVolumePlugin interface {
 	VolumePlugin
 	NewAttacher() (Attacher, error)
 	NewDetacher() (Detacher, error)
+	GetDeviceMountRefs(deviceMountPath string) ([]string, error)
 }

 // VolumeHost is an interface that plugins can use to access the kubelet.
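With these interface changes, every plugin must now implement ConstructVolumeSpec, and attachable plugins additionally GetDeviceMountRefs. A minimal hypothetical stub that satisfies both contracts, against a simplified Spec type rather than the real volume package, just to show the required shape:

package main

import "fmt"

// Spec is a simplified stand-in for volume.Spec.
type Spec struct{ Name string }

// volumePlugin captures just the method this commit adds to VolumePlugin.
type volumePlugin interface {
	ConstructVolumeSpec(volumeName, mountPath string) (*Spec, error)
}

// attachableVolumePlugin mirrors the extended AttachableVolumePlugin contract.
type attachableVolumePlugin interface {
	volumePlugin
	GetDeviceMountRefs(deviceMountPath string) ([]string, error)
}

// stubPlugin is a hypothetical example, not one of the real in-tree plugins:
// it rebuilds only a skeleton spec from the name, as the non-attachable
// plugins above do.
type stubPlugin struct{}

func (stubPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*Spec, error) {
	return &Spec{Name: volumeName}, nil
}

func (stubPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
	return nil, nil
}

func main() {
	var p attachableVolumePlugin = stubPlugin{}
	spec, _ := p.ConstructVolumeSpec("vol1", "/var/lib/kubelet/pods/uid/volumes/plugin/vol1")
	fmt.Println(spec.Name)
}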
@ -165,6 +165,18 @@ func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID,
 	}, nil
 }

+func (plugin *rbdPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	rbdVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			RBD: &api.RBDVolumeSource{
+				CephMonitors: []string{},
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(rbdVolume), nil
+}
+
 type rbd struct {
 	volName string
 	podUID  types.UID
@ -110,6 +110,18 @@ func (plugin *secretPlugin) NewUnmounter(volName string, podUID types.UID) (volu
 	}, nil
 }

+func (plugin *secretPlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
+	secretVolume := &api.Volume{
+		Name: volName,
+		VolumeSource: api.VolumeSource{
+			Secret: &api.SecretVolumeSource{
+				SecretName: volName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(secretVolume), nil
+}
+
 type secretVolume struct {
 	volName string
 	podUID  types.UID
@ -288,6 +288,14 @@ func (plugin *FakeVolumePlugin) GetAccessModes() []api.PersistentVolumeAccessMod
 	return []api.PersistentVolumeAccessMode{}
 }

+func (plugin *FakeVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*Spec, error) {
+	return nil, nil
+}
+
+func (plugin *FakeVolumePlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
+	return []string{}, nil
+}
+
 type FakeVolume struct {
 	sync.RWMutex
 	PodUID types.UID
@ -58,6 +58,10 @@ type NestedPendingOperations interface {
 	// necessary during tests - the test should wait until all operations finish
 	// and evaluate results after that.
 	Wait()
+
+	// IsOperationPending returns true if an operation for the given volumeName and podName is pending,
+	// otherwise it returns false
+	IsOperationPending(volumeName api.UniqueVolumeName, podName types.UniquePodName) bool
 }

 // NewNestedPendingOperations returns a new instance of NestedPendingOperations.
@ -74,7 +78,7 @@ type nestedPendingOperations struct {
 	operations                []operation
 	exponentialBackOffOnError bool
 	cond                      *sync.Cond
-	lock                      sync.Mutex
+	lock                      sync.RWMutex
 }

 type operation struct {
@ -90,29 +94,9 @@ func (grm *nestedPendingOperations) Run(
 	operationFunc func() error) error {
 	grm.lock.Lock()
 	defer grm.lock.Unlock()
-	var previousOp operation
-	opExists := false
-	previousOpIndex := -1
-	for previousOpIndex, previousOp = range grm.operations {
-		if previousOp.volumeName != volumeName {
-			// No match, keep searching
-			continue
-		}
-
-		if previousOp.podName != emptyUniquePodName &&
-			podName != emptyUniquePodName &&
-			previousOp.podName != podName {
-			// No match, keep searching
-			continue
-		}
-
-		// Match
-		opExists = true
-		break
-	}
-
+	opExists, previousOpIndex := grm.isOperationExists(volumeName, podName)
 	if opExists {
+		previousOp := grm.operations[previousOpIndex]
 		// Operation already exists
 		if previousOp.operationPending {
 			// Operation is pending
@ -153,6 +137,43 @@ func (grm *nestedPendingOperations) Run(
 	return nil
 }

+func (grm *nestedPendingOperations) IsOperationPending(
+	volumeName api.UniqueVolumeName,
+	podName types.UniquePodName) bool {
+
+	grm.lock.RLock()
+	defer grm.lock.RUnlock()
+
+	exist, previousOpIndex := grm.isOperationExists(volumeName, podName)
+	if exist && grm.operations[previousOpIndex].operationPending {
+		return true
+	}
+	return false
+}
+
+func (grm *nestedPendingOperations) isOperationExists(
+	volumeName api.UniqueVolumeName,
+	podName types.UniquePodName) (bool, int) {
+
+	for previousOpIndex, previousOp := range grm.operations {
+		if previousOp.volumeName != volumeName {
+			// No match, keep searching
+			continue
+		}
+
+		if previousOp.podName != emptyUniquePodName &&
+			podName != emptyUniquePodName &&
+			previousOp.podName != podName {
+			// No match, keep searching
+			continue
+		}
+
+		// Match
+		return true, previousOpIndex
+	}
+	return false, -1
+}
+
 func (grm *nestedPendingOperations) getOperation(
 	volumeName api.UniqueVolumeName,
 	podName types.UniquePodName) (uint, error) {
@ -99,6 +99,10 @@ type OperationExecutor interface {
 	// object, for example) then an error is returned which triggers exponential
 	// back off on retries.
 	VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName string, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
+
+	// IsOperationPending returns true if an operation for the given volumeName and podName is pending,
+	// otherwise it returns false
+	IsOperationPending(volumeName api.UniqueVolumeName, podName volumetypes.UniquePodName) bool
 }

 // NewOperationExecutor returns a new instance of OperationExecutor.
@ -339,6 +343,10 @@ type operationExecutor struct {
 	pendingOperations nestedpendingoperations.NestedPendingOperations
 }

+func (oe *operationExecutor) IsOperationPending(volumeName api.UniqueVolumeName, podName volumetypes.UniquePodName) bool {
+	return oe.pendingOperations.IsOperationPending(volumeName, podName)
+}
+
 func (oe *operationExecutor) AttachVolume(
 	volumeToAttach VolumeToAttach,
 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
@ -391,6 +399,7 @@ func (oe *operationExecutor) MountVolume(
 func (oe *operationExecutor) UnmountVolume(
 	volumeToUnmount MountedVolume,
 	actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
+
 	unmountFunc, err :=
 		oe.generateUnmountVolumeFunc(volumeToUnmount, actualStateOfWorld)
 	if err != nil {
@ -811,11 +820,14 @@ func (oe *operationExecutor) generateUnmountVolumeFunc(
 		}

 		glog.Infof(
-			"UnmountVolume.TearDown succeeded for volume %q (volume.spec.Name: %q) pod %q (UID: %q).",
+			"UnmountVolume.TearDown succeeded for volume %q (OuterVolumeSpecName: %q) pod %q (UID: %q). InnerVolumeSpecName %q. PluginName %q, VolumeGidValue %q",
 			volumeToUnmount.VolumeName,
 			volumeToUnmount.OuterVolumeSpecName,
 			volumeToUnmount.PodName,
-			volumeToUnmount.PodUID)
+			volumeToUnmount.PodUID,
+			volumeToUnmount.InnerVolumeSpecName,
+			volumeToUnmount.PluginName,
+			volumeToUnmount.VolumeGidValue)

 		// Update actual state of world
 		markVolMountedErr := actualStateOfWorld.MarkVolumeAsUnmounted(
@ -879,7 +891,17 @@ func (oe *operationExecutor) generateUnmountDeviceFunc(
 				deviceToDetach.VolumeSpec.Name(),
 				err)
 		}
+		refs, err := attachableVolumePlugin.GetDeviceMountRefs(deviceMountPath)
+		if err != nil || len(refs) > 0 {
+			if err == nil {
+				err = fmt.Errorf("The device mount path %q is still mounted by other references %v", deviceMountPath, refs)
+			}
+			return fmt.Errorf(
+				"GetDeviceMountRefs check failed for volume %q (spec.Name: %q) with: %v",
+				deviceToDetach.VolumeName,
+				deviceToDetach.VolumeSpec.Name(),
+				err)
+		}
 		// Execute unmount
 		unmountDeviceErr := volumeDetacher.UnmountDevice(deviceMountPath)
 		if unmountDeviceErr != nil {
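The new GetDeviceMountRefs guard exists so a globally mounted device is never torn down while some pod still bind-mounts it. The decision reduces to a small predicate; a dependency-free sketch with an illustrative path (not taken from a real node):

package main

import "fmt"

// canUnmountDevice mirrors the new guard in generateUnmountDeviceFunc:
// proceed only when the reference lookup succeeded and reported no
// remaining references to the device mount.
func canUnmountDevice(refs []string, err error) error {
	if err != nil {
		return err
	}
	if len(refs) > 0 {
		return fmt.Errorf("device still mounted by other references %v", refs)
	}
	return nil
}

func main() {
	// Illustrative only: one pod still references the device.
	stillUsed := []string{"/var/lib/kubelet/pods/uid1/volumes/kubernetes.io~aws-ebs/myvol"}
	fmt.Println(canUnmountDevice(stillUsed, nil)) // error: refuse to unmount
	fmt.Println(canUnmountDevice(nil, nil))       // nil: safe to unmount
}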
@ -54,8 +54,10 @@ func GetUniqueVolumeName(pluginName, volumeName string) api.UniqueVolumeName {

 // GetUniqueVolumeNameForNonAttachableVolume returns the unique volume name
 // for a non-attachable volume.
-func GetUniqueVolumeNameForNonAttachableVolume(podName types.UniquePodName, volumePlugin volume.VolumePlugin, podSpecName string) api.UniqueVolumeName {
-	return api.UniqueVolumeName(fmt.Sprintf("%s/%v-%s", volumePlugin.GetPluginName(), podName, podSpecName))
+func GetUniqueVolumeNameForNonAttachableVolume(
+	podName types.UniquePodName, volumePlugin volume.VolumePlugin, volumeSpec *volume.Spec) api.UniqueVolumeName {
+	return api.UniqueVolumeName(
+		fmt.Sprintf("%s/%v-%s", volumePlugin.GetPluginName(), podName, volumeSpec.Name()))
 }

 // GetUniqueVolumeNameFromSpec uses the given VolumePlugin to generate a unique
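The signature change matters because the spec's own Name() is what reconstruction recovers from disk, so deriving the unique name from the spec keeps reconstructed volumes consistent with normally-mounted ones. The non-attachable name shape reduces to a simple fmt.Sprintf, shown here with illustrative inputs (plugin name, pod UID, and volume name are made up):

package main

import "fmt"

func main() {
	// Non-attachable volumes are scoped by pod and use the spec's name,
	// matching the "%s/%v-%s" format in the function above.
	nonAttachable := fmt.Sprintf("%s/%v-%s", "kubernetes.io/empty-dir", "pod-uid-1", "cache")
	fmt.Println(nonAttachable) // kubernetes.io/empty-dir/pod-uid-1-cache
}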
@ -135,6 +135,18 @@ func (plugin *vsphereVolumePlugin) getCloudProvider() (*vsphere.VSphere, error)
 	return vs, nil
 }

+func (plugin *vsphereVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
+	vsphereVolume := &api.Volume{
+		Name: volumeName,
+		VolumeSource: api.VolumeSource{
+			VsphereVolume: &api.VsphereVirtualDiskVolumeSource{
+				VolumePath: volumeName,
+			},
+		},
+	}
+	return volume.NewSpecFromVolume(vsphereVolume), nil
+}
+
 // Abstract interface to disk operations.
 type vdManager interface {
 	// Attaches the disk to the kubelet's host machine.