Merge pull request #33616 from jingxu97/statusupdater-9-22

Automatic merge from submit-queue

Fix volume states out of sync problem after kubelet restarts

When the kubelet restarts, all information about the volumes is gone from the
actual and desired states. When the node status is then updated with mounted
volumes, the volume list may be empty even though volumes are still mounted,
which in turn causes the master to detach those volumes because they are not in
the mounted-volumes list. This fix makes sure the mounted-volumes list is only
updated after the reconciler starts its sync-states process. The sync-states
process scans the existing volume directories and reconstructs the actual state
if it is missing.
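
In essence, the node status updater now asks the volume manager whether the reconciler has finished at least one sync before publishing the mounted-volume list. Below is a minimal, self-contained sketch of that gating; the types are stand-ins for the kubelet's VolumeManager and node status rather than the real ones, with method names taken from the diff further down.

package sketch

// UniqueVolumeName stands in for api.UniqueVolumeName.
type UniqueVolumeName string

// volumeManager is a hypothetical, minimal stand-in for the kubelet's
// VolumeManager, showing only the two methods the node status updater
// relies on after this change.
type volumeManager interface {
    ReconcilerStatesHasBeenSynced() bool
    GetVolumesInUse() []UniqueVolumeName
}

// nodeStatus stands in for node.Status.
type nodeStatus struct {
    VolumesInUse []UniqueVolumeName
}

// setNodeVolumesInUse mirrors the gating added in kubelet_node_status.go:
// VolumesInUse is only published once the reconciler has scanned the volume
// directories at least once, so a freshly restarted kubelet never reports an
// empty list while volumes are still mounted.
func setNodeVolumesInUse(status *nodeStatus, vm volumeManager) {
    if vm.ReconcilerStatesHasBeenSynced() {
        status.VolumesInUse = vm.GetVolumesInUse()
    }
}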

This PR also fixes a problem with orphaned pods' directories. If a pod
directory has been unmounted but not yet deleted (e.g., because the kubelet
restarted in between), the cleanup routine now deletes the directory so that
the pod directory can be cleaned up (it is safe to delete the directory since
it is no longer mounted).
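
A minimal sketch of that cleanup rule, using the mount.IsNotMountPoint helper this diff adds to pkg/util/mount; the function name and error handling here are illustrative, not the exact kubelet code.

package sketch

import (
    "fmt"
    "os"

    "k8s.io/kubernetes/pkg/util/mount"
)

// removeUnmountedVolumePaths deletes the given per-pod volume directories,
// but only those verified to no longer be mount points; anything still
// mounted (or whose state cannot be determined) is left untouched.
func removeUnmountedVolumePaths(volumePaths []string) error {
    for _, p := range volumePaths {
        notMount, err := mount.IsNotMountPoint(p)
        if err != nil {
            return fmt.Errorf("could not check whether %q is a mount point: %v", p, err)
        }
        if !notMount {
            // Still mounted: deleting it could destroy data in use.
            return fmt.Errorf("volume path %q is still mounted, refusing to remove it", p)
        }
        // Safe to remove: the directory exists but nothing is mounted on it.
        if err := os.Remove(p); err != nil {
            return err
        }
    }
    return nil
}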

The third issue this PR fixes is that, when reconstructing a volume into the
actual state, the mounter must not be nil, since it is required for creating
container.VolumeMap. If it is nil, it may cause a nil pointer exception in the
kubelet.
The detailed design proposal is #33203.
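
The shape of that fix, as a self-contained sketch with stand-in interfaces (the real reconciler change appears further down in this diff; names here are illustrative): the mounter is created eagerly via the volume plugin during reconstruction, so a nil mounter is never stored and later dereferenced.

package sketch

import "fmt"

// Mounter and VolumePlugin are minimal stand-ins for the kubelet's volume
// interfaces, used only to illustrate the nil-mounter guard.
type Mounter interface {
    GetPath() string
}

type VolumePlugin interface {
    NewMounter(volumeName string) (Mounter, error)
}

// reconstructedVolume mirrors the new struct added to the reconciler: it
// carries a concrete mounter so downstream consumers (such as the map built
// for container.VolumeMap) never see a nil value.
type reconstructedVolume struct {
    volumeName string
    mounter    Mounter
}

// reconstructVolume creates the mounter up front and aborts reconstruction on
// any failure instead of storing nil.
func reconstructVolume(plugin VolumePlugin, volumeName string) (*reconstructedVolume, error) {
    m, err := plugin.NewMounter(volumeName)
    if err != nil {
        return nil, fmt.Errorf("NewMounter failed for volume %q: %v", volumeName, err)
    }
    if m == nil {
        return nil, fmt.Errorf("plugin returned a nil mounter for volume %q", volumeName)
    }
    return &reconstructedVolume{volumeName: volumeName, mounter: m}, nil
}
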
Kubernetes Submit Queue 2016-10-25 16:19:19 -07:00 committed by GitHub
commit d7f1484e6f
11 changed files with 231 additions and 100 deletions

View File

@ -239,9 +239,9 @@ func (kl *Kubelet) GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 {
return kl.volumeManager.GetExtraSupplementalGroupsForPod(pod)
}
// getPodVolumeNameListFromDisk returns a list of the volume names by reading the
// getPodVolumePathListFromDisk returns a list of the volume paths by reading the
// volume directories for the given pod from the disk.
func (kl *Kubelet) getPodVolumeNameListFromDisk(podUID types.UID) ([]string, error) {
func (kl *Kubelet) getPodVolumePathListFromDisk(podUID types.UID) ([]string, error) {
volumes := []string{}
podVolDir := kl.getPodVolumesDir(podUID)
volumePluginDirs, err := ioutil.ReadDir(podVolDir)
@ -254,9 +254,11 @@ func (kl *Kubelet) getPodVolumeNameListFromDisk(podUID types.UID) ([]string, err
volumePluginPath := path.Join(podVolDir, volumePluginName)
volumeDirs, err := util.ReadDirNoStat(volumePluginPath)
if err != nil {
return volumes, err
return volumes, fmt.Errorf("Could not read directory %s: %v", volumePluginPath, err)
}
for _, volumeDir := range volumeDirs {
volumes = append(volumes, path.Join(volumePluginPath, volumeDir))
}
volumes = append(volumes, volumeDirs...)
}
return volumes, nil
}

View File

@ -342,6 +342,8 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
}
// Update the current status on the API server
updatedNode, err := kl.kubeClient.Core().Nodes().UpdateStatus(node)
// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
// those volumes are already updated in the node's status
if err == nil {
kl.volumeManager.MarkVolumesAsReportedInUse(
updatedNode.Status.VolumesInUse)
@ -882,9 +884,13 @@ func (kl *Kubelet) recordNodeSchedulableEvent(node *api.Node) {
}
}
// Update VolumesInUse field in Node Status
// Update VolumesInUse field in Node Status only after states are synced up at least once
// in volume reconciler.
func (kl *Kubelet) setNodeVolumesInUseStatus(node *api.Node) {
node.Status.VolumesInUse = kl.volumeManager.GetVolumesInUse()
// Make sure to only update node status after reconciler starts syncing up states
if kl.volumeManager.ReconcilerStatesHasBeenSynced() {
node.Status.VolumesInUse = kl.volumeManager.GetVolumesInUse()
}
}
// setNodeStatus fills in the Status fields of the given Node, overwriting

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/types"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/selinux"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/volume"
@ -153,8 +154,20 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(
continue
}
// Check whether volume is still mounted on disk. If so, do not delete directory
if volumeNames, err := kl.getPodVolumeNameListFromDisk(uid); err != nil || len(volumeNames) != 0 {
glog.V(3).Infof("Orphaned pod %q found, but volumes are still mounted; err: %v, volumes: %v ", uid, err, volumeNames)
volumePaths, err := kl.getPodVolumePathListFromDisk(uid)
if err != nil {
glog.Errorf("Orphaned pod %q found, but error %v occurred while reading volume dir from disk", uid, err)
continue
} else if len(volumePaths) > 0 {
for _, path := range volumePaths {
notMount, err := mount.IsNotMountPoint(path)
if err == nil && notMount {
glog.V(2).Infof("Volume path %q is no longer mounted, remove it", path)
os.Remove(path)
} else {
glog.Errorf("Orphaned pod %q found, but it might still be mounted with error %v", uid, err)
}
}
continue
}

View File

@ -186,6 +186,7 @@ func IsRemountRequiredError(err error) bool {
type actualStateOfWorld struct {
// nodeName is the name of this node. This value is passed to Attach/Detach
nodeName types.NodeName
// attachedVolumes is a map containing the set of volumes the kubelet volume
// manager believes to be successfully attached to this node. Volume types
// that do not implement an attacher interface are assumed to be in this
@ -193,6 +194,7 @@ type actualStateOfWorld struct {
// The key in this map is the name of the volume and the value is an object
// containing more information about the attached volume.
attachedVolumes map[api.UniqueVolumeName]attachedVolume
// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
volumePluginMgr *volume.VolumePluginMgr

View File

@ -58,7 +58,8 @@ type DesiredStateOfWorld interface {
// ReportedInUse value is reset to false. The default ReportedInUse value
// for a newly created volume is false.
// When set to true this value indicates that the volume was successfully
// added to the VolumesInUse field in the node's status.
// added to the VolumesInUse field in the node's status. Mount operation needs
// to check this value before issuing the operation.
// If a volume in the reportedVolumes list does not exist in the list of
// volumes that should be attached to this node, it is skipped without error.
MarkVolumesReportedInUse(reportedVolumes []api.UniqueVolumeName)

View File

@ -37,7 +37,7 @@ import (
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume"
volumepkg "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
@ -59,6 +59,10 @@ type Reconciler interface {
// volumes that should be attached are attached and volumes that should
// be detached are detached and trigger attach/detach operations as needed.
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
// StatesHasBeenSynced returns true only after the syncStates process has started syncing
// states at least once after kubelet starts
StatesHasBeenSynced() bool
}
// NewReconciler returns a new instance of Reconciler.
@ -68,7 +72,7 @@ type Reconciler interface {
// this node, and therefore the volume manager should not
// loopSleepDuration - the amount of time the reconciler loop sleeps between
// successive executions
// reconstructDuration - the amount of time the reconstruct sleeps between
// syncDuration - the amount of time the syncStates sleeps between
// successive executions
// waitForAttachTimeout - the amount of time the Mount function will wait for
// the volume to be attached
@ -84,20 +88,20 @@ func NewReconciler(
kubeClient internalclientset.Interface,
controllerAttachDetachEnabled bool,
loopSleepDuration time.Duration,
reconstructDuration time.Duration,
syncDuration time.Duration,
waitForAttachTimeout time.Duration,
nodeName types.NodeName,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld,
operationExecutor operationexecutor.OperationExecutor,
mounter mount.Interface,
volumePluginMgr *volume.VolumePluginMgr,
volumePluginMgr *volumepkg.VolumePluginMgr,
kubeletPodsDir string) Reconciler {
return &reconciler{
kubeClient: kubeClient,
controllerAttachDetachEnabled: controllerAttachDetachEnabled,
loopSleepDuration: loopSleepDuration,
reconstructDuration: reconstructDuration,
syncDuration: syncDuration,
waitForAttachTimeout: waitForAttachTimeout,
nodeName: nodeName,
desiredStateOfWorld: desiredStateOfWorld,
@ -106,7 +110,7 @@ func NewReconciler(
mounter: mounter,
volumePluginMgr: volumePluginMgr,
kubeletPodsDir: kubeletPodsDir,
timeOfLastReconstruct: time.Now(),
timeOfLastSync: time.Time{},
}
}
@ -114,16 +118,16 @@ type reconciler struct {
kubeClient internalclientset.Interface
controllerAttachDetachEnabled bool
loopSleepDuration time.Duration
reconstructDuration time.Duration
syncDuration time.Duration
waitForAttachTimeout time.Duration
nodeName types.NodeName
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
operationExecutor operationexecutor.OperationExecutor
mounter mount.Interface
volumePluginMgr *volume.VolumePluginMgr
volumePluginMgr *volumepkg.VolumePluginMgr
kubeletPodsDir string
timeOfLastReconstruct time.Time
timeOfLastSync time.Time
}
func (rc *reconciler) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
@ -139,9 +143,9 @@ func (rc *reconciler) reconciliationLoopFunc(sourcesReady config.SourcesReady) f
// reconciler's reconstruct process may add incomplete volume information and cause confusion.
// In addition, if some sources are not ready, the reconstruct process may clean up pods' volumes
// that are still in use because desired states could not get a complete list of pods.
if sourcesReady.AllReady() && time.Since(rc.timeOfLastReconstruct) > rc.reconstructDuration {
if sourcesReady.AllReady() && time.Since(rc.timeOfLastSync) > rc.syncDuration {
glog.V(5).Infof("Sources are all ready, starting reconstruct state function")
rc.reconstruct()
rc.sync()
}
}
}
@ -292,12 +296,17 @@ func (rc *reconciler) reconcile() {
err)
}
if err == nil {
glog.Infof("MountVolume operation started for volume %q (spec.Name: %q) to pod %q (UID: %q). %s",
logMsg := fmt.Sprintf("MountVolume operation started for volume %q (spec.Name: %q) to pod %q (UID: %q). %s",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
remountingLogStr)
if remountingLogStr == "" {
glog.V(1).Infof(logMsg)
} else {
glog.V(5).Infof(logMsg)
}
}
}
}
@ -366,18 +375,22 @@ func (rc *reconciler) reconcile() {
}
}
// reconstruct process tries to observe the real world by scanning all pods' volume directories from the disk.
// sync process tries to observe the real world by scanning all pods' volume directories from the disk.
// If the actual and desired state of worlds are not consistent with the observed world, it means that some
// mounted volumes are left out probably during kubelet restart. This process will reconstruct
// the volumes and update the actual and desired states. In the following reconciler loop, those volumes will
// be cleaned up.
func (rc *reconciler) reconstruct() {
defer rc.updateReconstructTime()
rc.reconstructStates(rc.kubeletPodsDir)
func (rc *reconciler) sync() {
defer rc.updateLastSyncTime()
rc.syncStates(rc.kubeletPodsDir)
}
func (rc *reconciler) updateReconstructTime() {
rc.timeOfLastReconstruct = time.Now()
func (rc *reconciler) updateLastSyncTime() {
rc.timeOfLastSync = time.Now()
}
func (rc *reconciler) StatesHasBeenSynced() bool {
return !rc.timeOfLastSync.IsZero()
}
type podVolume struct {
@ -387,25 +400,39 @@ type podVolume struct {
pluginName string
}
type reconstructedVolume struct {
volumeName api.UniqueVolumeName
podName volumetypes.UniquePodName
volumeSpec *volumepkg.Spec
outerVolumeSpecName string
pod *api.Pod
pluginIsAttachable bool
volumeGidValue string
devicePath string
reportedInUse bool
mounter volumepkg.Mounter
}
// reconstructFromDisk scans the volume directories under the given pod directory. If the volume is not
// in either actual or desired state of world, or pending operation, this function will reconstruct
// the volume spec and put it in both the actual and desired state of worlds. If no running
// container is mounting the volume, the volume will be removed by desired state of world's populator and
// cleaned up by the reconciler.
func (rc *reconciler) reconstructStates(podsDir string) {
func (rc *reconciler) syncStates(podsDir string) {
// Get volumes information by reading the pod's directory
podVolumes, err := getVolumesFromPodDir(podsDir)
if err != nil {
glog.Errorf("Cannot get volumes from disk %v", err)
return
}
volumesNeedUpdate := make(map[api.UniqueVolumeName]*reconstructedVolume)
for _, volume := range podVolumes {
volumeToMount, err := rc.reconstructVolume(volume)
reconstructedVolume, err := rc.reconstructVolume(volume)
if err != nil {
glog.Errorf("Could not construct volume information: %v", err)
continue
}
// Check if there is a pending operation for the given pod and volume.
// Need to check pending operation before checking the actual and desired
// states to avoid race condition during checking. For example, the following
@ -413,26 +440,50 @@ func (rc *reconciler) reconstructStates(podsDir string) {
// 1. Checking the pod and it does not exist in either actual or desired state.
// 2. An operation for the given pod finishes and the actual state is updated.
// 3. Checking and there is no pending operation for the given pod.
if rc.operationExecutor.IsOperationPending(volumeToMount.VolumeName, volumeToMount.PodName) {
continue
}
desiredPods := rc.desiredStateOfWorld.GetPods()
actualPods := rc.actualStateOfWorld.GetPods()
if desiredPods[volume.podName] || actualPods[volume.podName] {
continue
// During state reconstruction period, no new volume operations could be issued. If the
// mounted path is not in either pending operation, or actual or desired states, this
// volume needs to be reconstructed back to the states.
pending := rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, reconstructedVolume.podName)
dswExist := rc.desiredStateOfWorld.PodExistsInVolume(reconstructedVolume.podName, reconstructedVolume.volumeName)
aswExist, _, _ := rc.actualStateOfWorld.PodExistsInVolume(reconstructedVolume.podName, reconstructedVolume.volumeName)
if !rc.StatesHasBeenSynced() {
// If this is the first time state is reconstructed after kubelet starts, a persistent volume found on disk must have
// been mounted before kubelet restarted, because no mount operations could have been started yet (node
// status has not been updated before this very first syncStates finishes, so VerifyControllerAttachedVolume would fail).
// In this case, the volume state should be put back into the actual state now, whether or not the desired state has it.
// This is to prevent node status from being updated to empty for attachable volumes, which could happen when
// a volume is discovered on disk and is part of the desired state, but is then quickly deleted
// from the desired state. In such a situation, if the volume is not added to the actual state, the node status updater will
// not get this volume from either the actual or desired state. In turn, the master controller might detach
// the volume while it is still mounted.
if aswExist || !reconstructedVolume.pluginIsAttachable {
continue
}
} else {
// Check pending first since no new operations could be started at this point.
// Otherwise there might be a race condition in checking actual states and pending operations
if pending || dswExist || aswExist {
continue
}
}
glog.V(3).Infof(
"Could not find pod information in desired or actual states or pending operation, update it in both states: %+v",
volumeToMount)
if err = rc.updateStates(volumeToMount); err != nil {
glog.V(2).Infof(
"Reconciler sync states: could not find pod information in desired or actual states or pending operation, update it in both states: %+v",
reconstructedVolume)
volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
}
if len(volumesNeedUpdate) > 0 {
if err = rc.updateStates(volumesNeedUpdate); err != nil {
glog.Errorf("Error occurred while reconstructing volumes from disk: %v", err)
}
}
}
// Reconstruct Volume object and volumeToMount data structure by reading the pod's volume directories
func (rc *reconciler) reconstructVolume(volume podVolume) (*operationexecutor.VolumeToMount, error) {
// Reconstruct Volume object and reconstructedVolume data structure by reading the pod's volume directories
func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, error) {
plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
if err != nil {
return nil, err
@ -461,48 +512,83 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*operationexecutor.Vo
uniqueVolumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(volume.podName, plugin, volumeSpec)
}
volumeToMount := &operationexecutor.VolumeToMount{
VolumeName: uniqueVolumeName,
PodName: volume.podName,
VolumeSpec: volumeSpec,
OuterVolumeSpecName: volumeName, /*volumeName is InnerVolumeSpecName. But this information will not be used for cleanup*/
Pod: pod,
PluginIsAttachable: attachablePlugin != nil,
VolumeGidValue: "",
DevicePath: "",
volumeMounter, newMounterErr := plugin.NewMounter(
volumeSpec,
pod,
volumepkg.VolumeOptions{})
if newMounterErr != nil {
return nil, fmt.Errorf(
"MountVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
uniqueVolumeName,
volumeSpec.Name(),
volume.podName,
pod.UID,
newMounterErr)
}
return volumeToMount, nil
reconstructedVolume := &reconstructedVolume{
volumeName: uniqueVolumeName,
podName: volume.podName,
volumeSpec: volumeSpec,
outerVolumeSpecName: volumeName, /* volumeName is InnerVolumeSpecName. But this information will not be used for cleanup */
pod: pod,
pluginIsAttachable: attachablePlugin != nil,
volumeGidValue: "",
devicePath: "",
mounter: volumeMounter,
}
return reconstructedVolume, nil
}
func (rc *reconciler) updateStates(volumeToMount *operationexecutor.VolumeToMount) error {
err := rc.actualStateOfWorld.MarkVolumeAsAttached(
volumeToMount.VolumeName, volumeToMount.VolumeSpec, "", volumeToMount.DevicePath)
if err != nil {
return fmt.Errorf("Could not add volume information to actual state of world: %v", err)
}
err = rc.actualStateOfWorld.AddPodToVolume(
volumeToMount.PodName,
types.UID(volumeToMount.PodName),
volumeToMount.VolumeName,
nil,
volumeToMount.OuterVolumeSpecName,
volumeToMount.DevicePath)
if err != nil {
return fmt.Errorf("Could not add pod to volume information to actual state of world: %v", err)
}
if volumeToMount.PluginIsAttachable {
err = rc.actualStateOfWorld.MarkDeviceAsMounted(volumeToMount.VolumeName)
if err != nil {
return fmt.Errorf("Could not mark device is mounted to actual state of world: %v", err)
func (rc *reconciler) updateStates(volumesNeedUpdate map[api.UniqueVolumeName]*reconstructedVolume) error {
// Get the node status to retrieve volume device path information.
node, fetchErr := rc.kubeClient.Core().Nodes().Get(string(rc.nodeName))
if fetchErr != nil {
glog.Errorf("updateStates in reconciler: could not get node status with error %v", fetchErr)
} else {
for _, attachedVolume := range node.Status.VolumesAttached {
if volume, exists := volumesNeedUpdate[attachedVolume.Name]; exists {
volume.devicePath = attachedVolume.DevicePath
volumesNeedUpdate[attachedVolume.Name] = volume
glog.V(4).Infof("Get devicePath from node status for volume (%q): %q", attachedVolume.Name, volume.devicePath)
}
}
}
_, err = rc.desiredStateOfWorld.AddPodToVolume(volumeToMount.PodName,
volumeToMount.Pod,
volumeToMount.VolumeSpec,
volumeToMount.OuterVolumeSpecName,
volumeToMount.VolumeGidValue)
if err != nil {
return fmt.Errorf("Could not add pod to volume information to desired state of world: %v", err)
for _, volume := range volumesNeedUpdate {
err := rc.actualStateOfWorld.MarkVolumeAsAttached(
volume.volumeName, volume.volumeSpec, "" /* nodeName */, volume.devicePath)
if err != nil {
glog.Errorf("Could not add volume information to actual state of world: %v", err)
continue
}
err = rc.actualStateOfWorld.AddPodToVolume(
volume.podName,
types.UID(volume.podName),
volume.volumeName,
volume.mounter,
volume.outerVolumeSpecName,
volume.devicePath)
if err != nil {
glog.Errorf("Could not add pod to volume information to actual state of world: %v", err)
continue
}
if volume.pluginIsAttachable {
err = rc.actualStateOfWorld.MarkDeviceAsMounted(volume.volumeName)
if err != nil {
glog.Errorf("Could not mark device is mounted to actual state of world: %v", err)
continue
}
}
_, err = rc.desiredStateOfWorld.AddPodToVolume(volume.podName,
volume.pod,
volume.volumeSpec,
volume.outerVolumeSpecName,
volume.volumeGidValue)
if err != nil {
glog.Errorf("Could not add pod to volume information to desired state of world: %v", err)
}
}
return nil
}

View File

@ -42,8 +42,8 @@ import (
const (
// reconcilerLoopSleepDuration is the amount of time the reconciler loop
// waits between successive executions
reconcilerLoopSleepDuration time.Duration = 0 * time.Millisecond
reconcilerReconstructSleepPeriod time.Duration = 10 * time.Minute
reconcilerLoopSleepDuration time.Duration = 0 * time.Millisecond
reconcilerSyncStatesSleepPeriod time.Duration = 10 * time.Minute
// waitForAttachTimeout is the maximum amount of time a
// operationexecutor.Mount call will wait for a volume to be attached.
waitForAttachTimeout time.Duration = 1 * time.Second
@ -65,7 +65,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
kubeClient,
false, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
reconcilerReconstructSleepPeriod,
reconcilerSyncStatesSleepPeriod,
waitForAttachTimeout,
nodeName,
dsw,
@ -102,7 +102,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
kubeClient,
false, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
reconcilerReconstructSleepPeriod,
reconcilerSyncStatesSleepPeriod,
waitForAttachTimeout,
nodeName,
dsw,
@ -173,7 +173,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
kubeClient,
true, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
reconcilerReconstructSleepPeriod,
reconcilerSyncStatesSleepPeriod,
waitForAttachTimeout,
nodeName,
dsw,
@ -245,7 +245,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
kubeClient,
false, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
reconcilerReconstructSleepPeriod,
reconcilerSyncStatesSleepPeriod,
waitForAttachTimeout,
nodeName,
dsw,
@ -328,7 +328,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
kubeClient,
true, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
reconcilerReconstructSleepPeriod,
reconcilerSyncStatesSleepPeriod,
waitForAttachTimeout,
nodeName,
dsw,

View File

@ -49,9 +49,9 @@ const (
// between successive executions
reconcilerLoopSleepPeriod time.Duration = 100 * time.Millisecond
// reconcilerReconstructSleepPeriod is the amount of time the reconciler reconstruct process
// reconcilerSyncStatesSleepPeriod is the amount of time the reconciler reconstruct process
// waits between successive executions
reconcilerReconstructSleepPeriod time.Duration = 3 * time.Minute
reconcilerSyncStatesSleepPeriod time.Duration = 3 * time.Minute
// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
// DesiredStateOfWorldPopulator loop waits between successive executions
@ -115,7 +115,7 @@ type VolumeManager interface {
// from annotations on persistent volumes that the pod depends on.
GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64
// Returns a list of all volumes that implement the volume.Attacher
// GetVolumesInUse returns a list of all volumes that implement the volume.Attacher
// interface and are currently in use according to the actual and desired
// state of the world caches. A volume is considered "in use" as soon as it
// is added to the desired state of world, indicating it *should* be
@ -126,6 +126,11 @@ type VolumeManager interface {
// restarts.
GetVolumesInUse() []api.UniqueVolumeName
// ReconcilerStatesHasBeenSynced returns true only after the actual states in the reconciler
// have been synced at least once after kubelet starts, so that it is safe to update the mounted
// volume list retrieved from the actual state.
ReconcilerStatesHasBeenSynced() bool
// VolumeIsAttached returns true if the given volume is attached to this
// node.
VolumeIsAttached(volumeName api.UniqueVolumeName) bool
@ -168,7 +173,7 @@ func NewVolumeManager(
kubeClient,
controllerAttachDetachEnabled,
reconcilerLoopSleepPeriod,
reconcilerReconstructSleepPeriod,
reconcilerSyncStatesSleepPeriod,
waitForAttachTimeout,
nodeName,
vm.desiredStateOfWorld,
@ -305,6 +310,10 @@ func (vm *volumeManager) GetVolumesInUse() []api.UniqueVolumeName {
return volumesToReportInUse
}
func (vm *volumeManager) ReconcilerStatesHasBeenSynced() bool {
return vm.reconciler.StatesHasBeenSynced()
}
func (vm *volumeManager) VolumeIsAttached(
volumeName api.UniqueVolumeName) bool {
return vm.actualStateOfWorld.VolumeExists(volumeName)

View File

@ -101,9 +101,10 @@ func isBind(options []string) (bool, []string) {
// doMount runs the mount command.
func doMount(mountCmd string, source string, target string, fstype string, options []string) error {
glog.V(5).Infof("Mounting %s %s %s %v with command: %q", source, target, fstype, options, mountCmd)
glog.V(4).Infof("Mounting %s %s %s %v with command: %q", source, target, fstype, options, mountCmd)
mountArgs := makeMountArgs(source, target, fstype, options)
glog.V(4).Infof("Mounting cmd (%s) with arguments (%s)", mountCmd, mountArgs)
command := exec.Command(mountCmd, mountArgs...)
output, err := command.CombinedOutput()
if err != nil {
@ -135,7 +136,7 @@ func makeMountArgs(source, target, fstype string, options []string) []string {
// Unmount unmounts the target.
func (mounter *Mounter) Unmount(target string) error {
glog.V(5).Infof("Unmounting %s", target)
glog.V(4).Infof("Unmounting %s", target)
command := exec.Command("umount", target)
output, err := command.CombinedOutput()
if err != nil {
@ -156,6 +157,10 @@ func (*Mounter) List() ([]MountPoint, error) {
// will return true. When in fact /tmp/b is a mount point. If this situation
// if of interest to you, don't use this function...
func (mounter *Mounter) IsLikelyNotMountPoint(file string) (bool, error) {
return IsNotMountPoint(file)
}
func IsNotMountPoint(file string) (bool, error) {
stat, err := os.Stat(file)
if err != nil {
return true, err
@ -173,9 +178,10 @@ func (mounter *Mounter) IsLikelyNotMountPoint(file string) (bool, error) {
}
// DeviceOpened checks if block device in use by calling Open with O_EXCL flag.
// Returns true if open returns errno EBUSY, and false if errno is nil.
// Returns an error if errno is any error other than EBUSY.
// Returns with error if pathname is not a device.
// If pathname is not a device, log and return false with nil error.
// If open returns errno EBUSY, return true with nil error.
// If open returns nil, return false with nil error.
// Otherwise, return false with error
func (mounter *Mounter) DeviceOpened(pathname string) (bool, error) {
return exclusiveOpenFailsOnDevice(pathname)
}
@ -187,12 +193,17 @@ func (mounter *Mounter) PathIsDevice(pathname string) (bool, error) {
}
func exclusiveOpenFailsOnDevice(pathname string) (bool, error) {
if isDevice, err := pathIsDevice(pathname); !isDevice {
isDevice, err := pathIsDevice(pathname)
if err != nil {
return false, fmt.Errorf(
"PathIsDevice failed for path %q: %v",
pathname,
err)
}
if !isDevice {
glog.Errorf("Path %q is not referring to a device.", pathname)
return false, nil
}
fd, errno := syscall.Open(pathname, syscall.O_RDONLY|syscall.O_EXCL, 0)
// If the device is in use, open will return an invalid fd.
// When this happens, it is expected that Close will fail and throw an error.

View File

@ -247,7 +247,7 @@ func (b *gcePersistentDiskMounter) SetUp(fsGroup *int64) error {
func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error {
// TODO: handle failed mounts here.
notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
glog.V(4).Infof("PersistentDisk set up: %s %v %v, pd name %v readOnly %v", dir, !notMnt, err, b.pdName, b.readOnly)
glog.V(4).Infof("GCE PersistentDisk set up: Dir (%s) PD name (%q) Mounted (%t) Error (%v), ReadOnly (%t)", dir, b.pdName, !notMnt, err, b.readOnly)
if err != nil && !os.IsNotExist(err) {
glog.Errorf("cannot validate mount point: %s %v", dir, err)
return err

View File

@ -758,11 +758,12 @@ func (oe *operationExecutor) generateMountVolumeFunc(
}
glog.Infof(
"MountVolume.MountDevice succeeded for volume %q (spec.Name: %q) pod %q (UID: %q).",
"MountVolume.MountDevice succeeded for volume %q (spec.Name: %q) pod %q (UID: %q) device mount path %q",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
volumeToMount.Pod.UID,
deviceMountPath)
// Update actual state of world to reflect volume is globally mounted
markDeviceMountedErr := actualStateOfWorld.MarkDeviceAsMounted(