Mark VolumeInUse before checking if it is Attached

Ensure that kubelet marks VolumeInUse before checking if it is Attached.
Also ensure that the attach/detach controller always fetches a fresh
copy of the node object before detach (instead of relying on the node
informer cache).
saadali 2016-06-26 17:33:01 -07:00
parent ff7c280200
commit e06b32b1ef
12 changed files with 525 additions and 83 deletions
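For orientation, the core of this change is an ordering handshake between the kubelet and the attach/detach controller: the kubelet publishes a volume in the node's Status.VolumesInUse before it proceeds to mount, and the controller re-fetches the node object and refuses to detach while the volume is still listed there. The Go sketch below illustrates that handshake with simplified stand-in types (UniqueVolumeName and nodeStatus here are illustrative, not the real Kubernetes API types):

package main

import "fmt"

// UniqueVolumeName stands in for api.UniqueVolumeName.
type UniqueVolumeName string

// nodeStatus models the Status.VolumesInUse field of a Node object.
type nodeStatus struct {
	VolumesInUse []UniqueVolumeName
}

func (s *nodeStatus) inUse(v UniqueVolumeName) bool {
	for _, u := range s.VolumesInUse {
		if u == v {
			return true
		}
	}
	return false
}

// kubeletMount reports the volume as in use on the node status *before*
// mounting it -- the ordering this commit enforces.
func kubeletMount(status *nodeStatus, v UniqueVolumeName) {
	status.VolumesInUse = append(status.VolumesInUse, v) // report first
	fmt.Printf("kubelet: reported %q in use, now mounting\n", v)
}

// controllerDetach models verifySafeToDetach: it consults the freshly
// fetched node status and refuses to detach a volume still in use.
func controllerDetach(status *nodeStatus, v UniqueVolumeName) error {
	if status.inUse(v) {
		return fmt.Errorf("volume %q still in use by node, not detaching", v)
	}
	fmt.Printf("controller: %q is safe to detach\n", v)
	return nil
}

func main() {
	status := &nodeStatus{}
	kubeletMount(status, "fake/vol1")
	if err := controllerDetach(status, "fake/vol1"); err != nil {
		fmt.Println(err) // detach is correctly refused while in use
	}
}

Because the in-use report always lands before the mount, a detach decision based on a fresh read of the node object can never race ahead of an in-flight mount.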

View File

@ -50,7 +50,7 @@ const (
// attach detach controller will wait for a volume to be safely unmounted
// from its node. Once this time has expired, the controller will assume the
// node or kubelet are unresponsive and will detach the volume anyway.
reconcilerMaxWaitForUnmountDuration time.Duration = 3 * time.Minute
reconcilerMaxWaitForUnmountDuration time.Duration = 6 * time.Minute
// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
// DesiredStateOfWorldPopulator loop waits between successive executions

View File

@ -109,7 +109,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
if !attachedVolume.MountedByNode {
glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, rc.actualStateOfWorld)
err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, true /* verifySafeToDetach */, rc.actualStateOfWorld)
if err == nil {
glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
}
@ -129,7 +129,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
// If volume is not safe to detach (is mounted), wait a max amount of time before detaching anyway.
if timeElapsed > rc.maxWaitForUnmountDuration {
glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q. Volume is not safe to detach, but maxWaitForUnmountDuration expired.", attachedVolume.VolumeName, attachedVolume.NodeName)
err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, rc.actualStateOfWorld)
err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
if err == nil {
glog.Infof("Started DetachVolume for volume %q from node %q due to maxWaitForUnmountDuration expiry.", attachedVolume.VolumeName, attachedVolume.NodeName)
}

View File

@ -3404,7 +3404,11 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
return err
}
// Update the current status on the API server
_, err = kl.kubeClient.Core().Nodes().UpdateStatus(node)
updatedNode, err := kl.kubeClient.Core().Nodes().UpdateStatus(node)
if err == nil {
kl.volumeManager.MarkVolumesAsReportedInUse(
updatedNode.Status.VolumesInUse)
}
return err
}

View File

@ -597,7 +597,7 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
}
// Verify volumes detached and no longer reported as in use
err = waitForVolumeDetach(kubelet.volumeManager)
err = waitForVolumeDetach(api.UniqueVolumeName("fake/vol1"), kubelet.volumeManager)
if err != nil {
t.Error(err)
}
@ -611,7 +611,6 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
if err != nil {
t.Error(err)
}
}
func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
@ -657,6 +656,13 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
}()
kubelet.podManager.SetPods([]*api.Pod{pod})
// Fake node status update
go simulateVolumeInUseUpdate(
api.UniqueVolumeName("fake/vol1"),
stopCh,
kubelet.volumeManager)
err := kubelet.volumeManager.WaitForAttachAndMount(pod)
if err != nil {
t.Errorf("Expected success: %v", err)
@ -747,6 +753,12 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
// Add pod
kubelet.podManager.SetPods([]*api.Pod{pod})
// Fake node status update
go simulateVolumeInUseUpdate(
api.UniqueVolumeName("fake/vol1"),
stopCh,
kubelet.volumeManager)
// Verify volumes attached
err := kubelet.volumeManager.WaitForAttachAndMount(pod)
if err != nil {
@ -815,7 +827,7 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
}
// Verify volumes detached and no longer reported as in use
err = waitForVolumeDetach(kubelet.volumeManager)
err = waitForVolumeDetach(api.UniqueVolumeName("fake/vol1"), kubelet.volumeManager)
if err != nil {
t.Error(err)
}
@ -828,7 +840,6 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
if err != nil {
t.Error(err)
}
}
func TestPodVolumesExist(t *testing.T) {
@ -4987,19 +4998,15 @@ func waitForVolumeUnmount(
}
func waitForVolumeDetach(
volumeName api.UniqueVolumeName,
volumeManager kubeletvolume.VolumeManager) error {
attachedVolumes := []api.UniqueVolumeName{}
err := retryWithExponentialBackOff(
time.Duration(50*time.Millisecond),
func() (bool, error) {
// Verify volumes detached
attachedVolumes = volumeManager.GetVolumesInUse()
if len(attachedVolumes) != 0 {
return false, nil
}
return true, nil
volumeAttached := volumeManager.VolumeIsAttached(volumeName)
return !volumeAttached, nil
},
)
@ -5020,3 +5027,20 @@ func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.Conditio
}
return wait.ExponentialBackoff(backoff, fn)
}
func simulateVolumeInUseUpdate(
volumeName api.UniqueVolumeName,
stopCh <-chan struct{},
volumeManager kubeletvolume.VolumeManager) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
volumeManager.MarkVolumesAsReportedInUse(
[]api.UniqueVolumeName{volumeName})
case <-stopCh:
return
}
}
}

View File

@ -117,6 +117,11 @@ type ActualStateOfWorld interface {
// volumes that do not need to update contents should not fail.
PodExistsInVolume(podName volumetypes.UniquePodName, volumeName api.UniqueVolumeName) (bool, string, error)
// VolumeExists returns true if the given volume exists in the list of
// attached volumes in the cache, indicating the volume is attached to this
// node.
VolumeExists(volumeName api.UniqueVolumeName) bool
// GetMountedVolumes generates and returns a list of volumes and the pods
// they are successfully attached and mounted for based on the current
// actual state of the world.
@ -127,12 +132,17 @@ type ActualStateOfWorld interface {
// current actual state of the world.
GetMountedVolumesForPod(podName volumetypes.UniquePodName) []MountedVolume
// GetAttachedVolumes generates and returns a list of all attached volumes.
GetAttachedVolumes() []AttachedVolume
// GetGloballyMountedVolumes generates and returns a list of all attached
// volumes that are globally mounted. This list can be used to determine
// which volumes should be reported as "in use" in the node's VolumesInUse
// status field. Globally mounted here refers to the shared plugin mount
// point for the attachable volume from which the pod specific mount points
// are created (via bind mount).
GetGloballyMountedVolumes() []AttachedVolume
// GetUnmountedVolumes generates and returns a list of attached volumes that
// have no mountedPods. This list can be used to determine which volumes are
// no longer referenced and may be detached.
// no longer referenced and may be globally unmounted and detached.
GetUnmountedVolumes() []AttachedVolume
}
@ -492,6 +502,15 @@ func (asw *actualStateOfWorld) PodExistsInVolume(
return podExists, volumeObj.devicePath, nil
}
func (asw *actualStateOfWorld) VolumeExists(
volumeName api.UniqueVolumeName) bool {
asw.RLock()
defer asw.RUnlock()
_, volumeExists := asw.attachedVolumes[volumeName]
return volumeExists
}
func (asw *actualStateOfWorld) GetMountedVolumes() []MountedVolume {
asw.RLock()
defer asw.RUnlock()
@ -525,17 +544,20 @@ func (asw *actualStateOfWorld) GetMountedVolumesForPod(
return mountedVolume
}
func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume {
func (asw *actualStateOfWorld) GetGloballyMountedVolumes() []AttachedVolume {
asw.RLock()
defer asw.RUnlock()
unmountedVolumes := make([]AttachedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
globallyMountedVolumes := make(
[]AttachedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
for _, volumeObj := range asw.attachedVolumes {
unmountedVolumes = append(
unmountedVolumes,
asw.getAttachedVolume(&volumeObj))
if volumeObj.globallyMounted {
globallyMountedVolumes = append(
globallyMountedVolumes,
asw.newAttachedVolume(&volumeObj))
}
}
return unmountedVolumes
return globallyMountedVolumes
}
func (asw *actualStateOfWorld) GetUnmountedVolumes() []AttachedVolume {
@ -546,14 +568,14 @@ func (asw *actualStateOfWorld) GetUnmountedVolumes() []AttachedVolume {
if len(volumeObj.mountedPods) == 0 {
unmountedVolumes = append(
unmountedVolumes,
asw.getAttachedVolume(&volumeObj))
asw.newAttachedVolume(&volumeObj))
}
}
return unmountedVolumes
}
func (asw *actualStateOfWorld) getAttachedVolume(
func (asw *actualStateOfWorld) newAttachedVolume(
attachedVolume *attachedVolume) AttachedVolume {
return AttachedVolume{
AttachedVolume: operationexecutor.AttachedVolume{

View File

@ -27,7 +27,8 @@ import (
)
// Calls AddVolume() once to add volume
// Verifies newly added volume exists in GetAttachedVolumes()
// Verifies newly added volume exists in GetUnmountedVolumes()
// Verifies newly added volume doesn't exist in GetGloballyMountedVolumes()
func Test_AddVolume_Positive_NewVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
@ -61,12 +62,15 @@ func Test_AddVolume_Positive_NewVolume(t *testing.T) {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExistsInAttachedVolumes(t, generatedVolumeName, asw)
verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
verifyVolumeExistsInUnmountedVolumes(t, generatedVolumeName, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
}
// Calls AddVolume() twice to add the same volume
// Verifies newly added volume exists in GetAttachedVolumes() and second call
// doesn't fail
// Verifies second call doesn't fail
// Verifies newly added volume exists in GetUnmountedVolumes()
// Verifies newly added volume doesn't exist in GetGloballyMountedVolumes()
func Test_AddVolume_Positive_ExistingVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
@ -105,7 +109,9 @@ func Test_AddVolume_Positive_ExistingVolume(t *testing.T) {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExistsInAttachedVolumes(t, generatedVolumeName, asw)
verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
verifyVolumeExistsInUnmountedVolumes(t, generatedVolumeName, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
}
// Populates data struct with a volume
@ -160,7 +166,9 @@ func Test_AddPodToVolume_Positive_ExistingVolumeNewNode(t *testing.T) {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExistsInAttachedVolumes(t, generatedVolumeName, asw)
verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
verifyVolumeDoesntExistInUnmountedVolumes(t, generatedVolumeName, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
verifyPodExistsInVolumeAsw(t, podName, generatedVolumeName, "fake/device/path" /* expectedDevicePath */, asw)
}
@ -223,7 +231,9 @@ func Test_AddPodToVolume_Positive_ExistingVolumeExistingNode(t *testing.T) {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExistsInAttachedVolumes(t, generatedVolumeName, asw)
verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
verifyVolumeDoesntExistInUnmountedVolumes(t, generatedVolumeName, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
verifyPodExistsInVolumeAsw(t, podName, generatedVolumeName, "fake/device/path" /* expectedDevicePath */, asw)
}
@ -280,7 +290,9 @@ func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) {
t.Fatalf("AddPodToVolume did not fail. Expected: <\"no volume with the name ... exists in the list of attached volumes\"> Actual: <no error>")
}
verifyVolumeDoesntExistInAttachedVolumes(t, volumeName, asw)
verifyVolumeExistsAsw(t, volumeName, false /* shouldExist */, asw)
verifyVolumeDoesntExistInUnmountedVolumes(t, volumeName, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, volumeName, asw)
verifyPodDoesntExistInVolumeAsw(
t,
podName,
@ -289,28 +301,116 @@ func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) {
asw)
}
func verifyVolumeExistsInAttachedVolumes(
// Calls AddVolume() once to add volume
// Calls MarkDeviceAsMounted() to mark volume as globally mounted.
// Verifies newly added volume exists in GetUnmountedVolumes()
// Verifies newly added volume exists in GetGloballyMountedVolumes()
func Test_MarkDeviceAsMounted_Positive_NewVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "volume-name",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device1",
},
},
},
},
},
}
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
devicePath := "fake/device/path"
generatedVolumeName, err := asw.AddVolume(volumeSpec, devicePath)
if err != nil {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
}
// Act
err = asw.MarkDeviceAsMounted(generatedVolumeName)
// Assert
if err != nil {
t.Fatalf("MarkDeviceAsMounted failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
verifyVolumeExistsInUnmountedVolumes(t, generatedVolumeName, asw)
verifyVolumeExistsInGloballyMountedVolumes(t, generatedVolumeName, asw)
}
func verifyVolumeExistsInGloballyMountedVolumes(
t *testing.T, expectedVolumeName api.UniqueVolumeName, asw ActualStateOfWorld) {
attachedVolumes := asw.GetAttachedVolumes()
for _, volume := range attachedVolumes {
globallyMountedVolumes := asw.GetGloballyMountedVolumes()
for _, volume := range globallyMountedVolumes {
if volume.VolumeName == expectedVolumeName {
return
}
}
t.Fatalf(
"Could not find volume %v in the list of attached volumes for actual state of world %+v",
"Could not find volume %v in the list of GloballyMountedVolumes for actual state of world %+v",
expectedVolumeName,
attachedVolumes)
globallyMountedVolumes)
}
func verifyVolumeDoesntExistInAttachedVolumes(
func verifyVolumeDoesntExistInGloballyMountedVolumes(
t *testing.T, volumeToCheck api.UniqueVolumeName, asw ActualStateOfWorld) {
attachedVolumes := asw.GetAttachedVolumes()
for _, volume := range attachedVolumes {
globallyMountedVolumes := asw.GetGloballyMountedVolumes()
for _, volume := range globallyMountedVolumes {
if volume.VolumeName == volumeToCheck {
t.Fatalf(
"Found volume %v in the list of attached volumes. Expected it not to exist.",
"Found volume %v in the list of GloballyMountedVolumes. Expected it not to exist.",
volumeToCheck)
}
}
}
func verifyVolumeExistsAsw(
t *testing.T,
expectedVolumeName api.UniqueVolumeName,
shouldExist bool,
asw ActualStateOfWorld) {
volumeExists := asw.VolumeExists(expectedVolumeName)
if shouldExist != volumeExists {
t.Fatalf(
"VolumeExists(%q) response incorrect. Expected: <%v> Actual: <%v>",
expectedVolumeName,
shouldExist,
volumeExists)
}
}
func verifyVolumeExistsInUnmountedVolumes(
t *testing.T, expectedVolumeName api.UniqueVolumeName, asw ActualStateOfWorld) {
unmountedVolumes := asw.GetUnmountedVolumes()
for _, volume := range unmountedVolumes {
if volume.VolumeName == expectedVolumeName {
return
}
}
t.Fatalf(
"Could not find volume %v in the list of UnmountedVolumes for actual state of world %+v",
expectedVolumeName,
unmountedVolumes)
}
func verifyVolumeDoesntExistInUnmountedVolumes(
t *testing.T, volumeToCheck api.UniqueVolumeName, asw ActualStateOfWorld) {
unmountedVolumes := asw.GetUnmountedVolumes()
for _, volume := range unmountedVolumes {
if volume.VolumeName == volumeToCheck {
t.Fatalf(
"Found volume %v in the list of UnmountedVolumes. Expected it not to exist.",
volumeToCheck)
}
}

View File

@ -53,6 +53,16 @@ type DesiredStateOfWorld interface {
// volume, this is a no-op.
AddPodToVolume(podName types.UniquePodName, pod *api.Pod, volumeSpec *volume.Spec, outerVolumeSpecName string, volumeGidValue string) (api.UniqueVolumeName, error)
// MarkVolumesReportedInUse sets the ReportedInUse value to true for the
// reportedVolumes. For volumes not in the reportedVolumes list, the
// ReportedInUse value is reset to false. The default ReportedInUse value
// for a newly created volume is false.
// When set to true this value indicates that the volume was successfully
// added to the VolumesInUse field in the node's status.
// If a volume in the reportedVolumes list does not exist in the list of
// volumes that should be attached to this node, it is skipped without error.
MarkVolumesReportedInUse(reportedVolumes []api.UniqueVolumeName)
// DeletePodFromVolume removes the given pod from the given volume in the
// cache indicating the specified pod no longer requires the specified
// volume.
@ -128,6 +138,10 @@ type volumeToMount struct {
// volumeGidValue contains the value of the GID annotation, if present.
volumeGidValue string
// reportedInUse indicates that the volume was successfully added to the
// VolumesInUse field in the node's status.
reportedInUse bool
}
// The pod object represents a pod that references the underlying volume and
@ -186,6 +200,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
podsToMount: make(map[types.UniquePodName]podToMount),
pluginIsAttachable: dsw.isAttachableVolume(volumeSpec),
volumeGidValue: volumeGidValue,
reportedInUse: false,
}
dsw.volumesToMount[volumeName] = volumeObj
}
@ -203,6 +218,25 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
return volumeName, nil
}
func (dsw *desiredStateOfWorld) MarkVolumesReportedInUse(
reportedVolumes []api.UniqueVolumeName) {
dsw.Lock()
defer dsw.Unlock()
reportedVolumesMap := make(
map[api.UniqueVolumeName]bool, len(reportedVolumes) /* capacity */)
for _, reportedVolume := range reportedVolumes {
reportedVolumesMap[reportedVolume] = true
}
for volumeName, volumeObj := range dsw.volumesToMount {
_, volumeReported := reportedVolumesMap[volumeName]
volumeObj.reportedInUse = volumeReported
dsw.volumesToMount[volumeName] = volumeObj
}
}
func (dsw *desiredStateOfWorld) DeletePodFromVolume(
podName types.UniquePodName, volumeName api.UniqueVolumeName) {
dsw.Lock()
@ -266,7 +300,8 @@ func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount {
VolumeSpec: podObj.spec,
PluginIsAttachable: volumeObj.pluginIsAttachable,
OuterVolumeSpecName: podObj.outerVolumeSpecName,
VolumeGidValue: volumeObj.volumeGidValue}})
VolumeGidValue: volumeObj.volumeGidValue,
ReportedInUse: volumeObj.reportedInUse}})
}
}
return volumesToMount

View File

@ -64,8 +64,9 @@ func Test_AddPodToVolume_Positive_NewPodNewVolume(t *testing.T) {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExists(t, generatedVolumeName, dsw)
verifyVolumeExistsInVolumesToMount(t, generatedVolumeName, dsw)
verifyVolumeExistsDsw(t, generatedVolumeName, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolumeName, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, podName, generatedVolumeName, dsw)
}
@ -107,8 +108,9 @@ func Test_AddPodToVolume_Positive_ExistingPodExistingVolume(t *testing.T) {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExists(t, generatedVolumeName, dsw)
verifyVolumeExistsInVolumesToMount(t, generatedVolumeName, dsw)
verifyVolumeExistsDsw(t, generatedVolumeName, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolumeName, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, podName, generatedVolumeName, dsw)
}
@ -145,8 +147,9 @@ func Test_DeletePodFromVolume_Positive_PodExistsVolumeExists(t *testing.T) {
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExists(t, generatedVolumeName, dsw)
verifyVolumeExistsInVolumesToMount(t, generatedVolumeName, dsw)
verifyVolumeExistsDsw(t, generatedVolumeName, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolumeName, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, podName, generatedVolumeName, dsw)
// Act
@ -158,7 +161,140 @@ func Test_DeletePodFromVolume_Positive_PodExistsVolumeExists(t *testing.T) {
verifyPodDoesntExistInVolumeDsw(t, podName, generatedVolumeName, dsw)
}
func verifyVolumeExists(
// Calls AddPodToVolume() to add three new volumes to data struct
// Verifies newly added pod/volume exists via PodExistsInVolume()
// VolumeExists() and GetVolumesToMount()
// Marks only second volume as reported in use.
// Verifies only that volume is marked reported in use
// Marks only first volume as reported in use.
// Verifies only that volume is marked reported in use
func Test_MarkVolumesReportedInUse_Positive_NewPodNewVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)
pod1 := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "volume1-name",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device1",
},
},
},
},
},
}
volume1Spec := &volume.Spec{Volume: &pod1.Spec.Volumes[0]}
pod1Name := volumehelper.GetUniquePodName(pod1)
pod2 := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod2",
UID: "pod2uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "volume2-name",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device2",
},
},
},
},
},
}
volume2Spec := &volume.Spec{Volume: &pod2.Spec.Volumes[0]}
pod2Name := volumehelper.GetUniquePodName(pod2)
pod3 := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod3",
UID: "pod3uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "volume3-name",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device3",
},
},
},
},
},
}
volume3Spec := &volume.Spec{Volume: &pod3.Spec.Volumes[0]}
pod3Name := volumehelper.GetUniquePodName(pod3)
generatedVolume1Name, err := dsw.AddPodToVolume(
pod1Name, pod1, volume1Spec, volume1Spec.Name(), "" /* volumeGidValue */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
generatedVolume2Name, err := dsw.AddPodToVolume(
pod2Name, pod2, volume2Spec, volume2Spec.Name(), "" /* volumeGidValue */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
generatedVolume3Name, err := dsw.AddPodToVolume(
pod3Name, pod3, volume3Spec, volume3Spec.Name(), "" /* volumeGidValue */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
// Act
volumesReportedInUse := []api.UniqueVolumeName{generatedVolume2Name}
dsw.MarkVolumesReportedInUse(volumesReportedInUse)
// Assert
verifyVolumeExistsDsw(t, generatedVolume1Name, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolume1Name, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, pod1Name, generatedVolume1Name, dsw)
verifyVolumeExistsDsw(t, generatedVolume2Name, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolume2Name, true /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, pod2Name, generatedVolume2Name, dsw)
verifyVolumeExistsDsw(t, generatedVolume3Name, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolume3Name, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, pod3Name, generatedVolume3Name, dsw)
// Act
volumesReportedInUse = []api.UniqueVolumeName{generatedVolume3Name}
dsw.MarkVolumesReportedInUse(volumesReportedInUse)
// Assert
verifyVolumeExistsDsw(t, generatedVolume1Name, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolume1Name, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, pod1Name, generatedVolume1Name, dsw)
verifyVolumeExistsDsw(t, generatedVolume2Name, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolume2Name, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, pod2Name, generatedVolume2Name, dsw)
verifyVolumeExistsDsw(t, generatedVolume3Name, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolume3Name, true /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, pod3Name, generatedVolume3Name, dsw)
}
func verifyVolumeExistsDsw(
t *testing.T, expectedVolumeName api.UniqueVolumeName, dsw DesiredStateOfWorld) {
volumeExists := dsw.VolumeExists(expectedVolumeName)
if !volumeExists {
@ -181,10 +317,21 @@ func verifyVolumeDoesntExist(
}
func verifyVolumeExistsInVolumesToMount(
t *testing.T, expectedVolumeName api.UniqueVolumeName, dsw DesiredStateOfWorld) {
t *testing.T,
expectedVolumeName api.UniqueVolumeName,
expectReportedInUse bool,
dsw DesiredStateOfWorld) {
volumesToMount := dsw.GetVolumesToMount()
for _, volume := range volumesToMount {
if volume.VolumeName == expectedVolumeName {
if volume.ReportedInUse != expectReportedInUse {
t.Fatalf(
"Found volume %v in the list of VolumesToMount, but ReportedInUse incorrect. Expected: <%v> Actual: <%v>",
expectedVolumeName,
expectReportedInUse,
volume.ReportedInUse)
}
return
}
}

View File

@ -295,7 +295,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name())
err := rc.operationExecutor.DetachVolume(
attachedVolume.AttachedVolume, rc.actualStateOfWorld)
attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {

View File

@ -116,7 +116,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := volumehelper.GetUniquePodName(pod)
_, err := dsw.AddPodToVolume(
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
// Assert
@ -126,7 +126,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
// Act
go reconciler.Run(wait.NeverStop)
waitForAttach(t, fakePlugin, asw)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyAttachCallCount(
@ -183,8 +183,9 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := volumehelper.GetUniquePodName(pod)
_, err := dsw.AddPodToVolume(
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
dsw.MarkVolumesReportedInUse([]api.UniqueVolumeName{generatedVolumeName})
// Assert
if err != nil {
@ -193,7 +194,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
// Act
go reconciler.Run(wait.NeverStop)
waitForAttach(t, fakePlugin, asw)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
@ -259,7 +260,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
// Act
go reconciler.Run(wait.NeverStop)
waitForAttach(t, fakePlugin, asw)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyAttachCallCount(
@ -275,7 +276,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
// Act
dsw.DeletePodFromVolume(podName, generatedVolumeName)
waitForDetach(t, fakePlugin, asw)
waitForDetach(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyTearDownCallCount(
@ -338,7 +339,8 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
// Act
go reconciler.Run(wait.NeverStop)
waitForAttach(t, fakePlugin, asw)
dsw.MarkVolumesReportedInUse([]api.UniqueVolumeName{generatedVolumeName})
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
@ -353,7 +355,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
// Act
dsw.DeletePodFromVolume(podName, generatedVolumeName)
waitForDetach(t, fakePlugin, asw)
waitForDetach(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyTearDownCallCount(
@ -361,16 +363,19 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
}
func waitForAttach(
func waitForMount(
t *testing.T,
fakePlugin *volumetesting.FakeVolumePlugin,
volumeName api.UniqueVolumeName,
asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(5*time.Millisecond),
func() (bool, error) {
mountedVolumes := asw.GetMountedVolumes()
if len(mountedVolumes) > 0 {
return true, nil
for _, mountedVolume := range mountedVolumes {
if mountedVolume.VolumeName == volumeName {
return true, nil
}
}
return false, nil
@ -378,28 +383,28 @@ func waitForAttach(
)
if err != nil {
t.Fatalf("Timed out waiting for len of asw.GetMountedVolumes() to become non-zero.")
t.Fatalf("Timed out waiting for volume %q to be attached.", volumeName)
}
}
func waitForDetach(
t *testing.T,
fakePlugin *volumetesting.FakeVolumePlugin,
volumeName api.UniqueVolumeName,
asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(5*time.Millisecond),
func() (bool, error) {
attachedVolumes := asw.GetAttachedVolumes()
if len(attachedVolumes) == 0 {
return true, nil
if asw.VolumeExists(volumeName) {
return false, nil
}
return false, nil
return true, nil
},
)
if err != nil {
t.Fatalf("Timed out waiting for len of asw.attachedVolumes() to become zero.")
t.Fatalf("Timed out waiting for volume %q to be detached.", volumeName)
}
}

View File

@ -102,10 +102,24 @@ type VolumeManager interface {
// pod object is bad, and should be avoided.
GetVolumesForPodAndAppendSupplementalGroups(pod *api.Pod) container.VolumeMap
// Returns a list of all volumes that are currently attached according to
// the actual state of the world cache and implement the volume.Attacher
// interface.
// Returns a list of all volumes that implement the volume.Attacher
// interface and are currently in use according to the actual and desired
// state of the world caches. A volume is considered "in use" as soon as it
// is added to the desired state of world, indicating it *should* be
// attached to this node and remains "in use" until it is removed from both
// the desired state of the world and the actual state of the world, or it
// has been unmounted (as indicated in actual state of world).
// TODO(#27653): VolumesInUse should be handled gracefully on kubelet
// restarts.
GetVolumesInUse() []api.UniqueVolumeName
// VolumeIsAttached returns true if the given volume is attached to this
// node.
VolumeIsAttached(volumeName api.UniqueVolumeName) bool
// Marks the specified volumes as having successfully been reported as "in
// use" in the node's volume status.
MarkVolumesAsReportedInUse(volumesReportedAsInUse []api.UniqueVolumeName)
}
// NewVolumeManager returns a new concrete instance implementing the
@ -209,16 +223,47 @@ func (vm *volumeManager) GetVolumesForPodAndAppendSupplementalGroups(
}
func (vm *volumeManager) GetVolumesInUse() []api.UniqueVolumeName {
attachedVolumes := vm.actualStateOfWorld.GetAttachedVolumes()
volumesInUse :=
make([]api.UniqueVolumeName, 0 /* len */, len(attachedVolumes) /* cap */)
for _, attachedVolume := range attachedVolumes {
if attachedVolume.PluginIsAttachable {
volumesInUse = append(volumesInUse, attachedVolume.VolumeName)
// Report volumes in desired state of world and actual state of world so
// that volumes are marked in use as soon as the decision is made that the
// volume *should* be attached to this node until it is safely unmounted.
desiredVolumes := vm.desiredStateOfWorld.GetVolumesToMount()
mountedVolumes := vm.actualStateOfWorld.GetGloballyMountedVolumes()
volumesToReportInUse :=
make(
[]api.UniqueVolumeName,
0, /* len */
len(desiredVolumes)+len(mountedVolumes) /* cap */)
desiredVolumesMap :=
make(
map[api.UniqueVolumeName]bool,
len(desiredVolumes)+len(mountedVolumes) /* cap */)
for _, volume := range desiredVolumes {
if volume.PluginIsAttachable {
desiredVolumesMap[volume.VolumeName] = true
volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
}
}
return volumesInUse
for _, volume := range mountedVolumes {
if volume.PluginIsAttachable {
if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
}
}
}
return volumesToReportInUse
}
func (vm *volumeManager) VolumeIsAttached(
volumeName api.UniqueVolumeName) bool {
return vm.actualStateOfWorld.VolumeExists(volumeName)
}
func (vm *volumeManager) MarkVolumesAsReportedInUse(
volumesReportedAsInUse []api.UniqueVolumeName) {
vm.desiredStateOfWorld.MarkVolumesReportedInUse(volumesReportedAsInUse)
}
// getVolumesForPodHelper is a helper method that implements the common logic for

View File

@ -60,8 +60,10 @@ type OperationExecutor interface {
// DetachVolume detaches the volume from the node specified in
// volumeToDetach, and updates the actual state of the world to reflect
// that.
DetachVolume(volumeToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
// that. If verifySafeToDetach is set, a call is made to fetch the node
// object, which is used to verify that the volume does not appear in the
// node's Status.VolumesInUse list (the operation fails with an error if
// it does).
DetachVolume(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
// MountVolume mounts the volume to the pod specified in volumeToMount.
// Specifically it will:
@ -183,6 +185,10 @@ type VolumeToMount struct {
// DevicePath contains the path on the node where the volume is attached.
// For non-attachable volumes this is empty.
DevicePath string
// ReportedInUse indicates that the volume was successfully added to the
// VolumesInUse field in the node's status.
ReportedInUse bool
}
// AttachedVolume represents a volume that is attached to a node.
@ -335,9 +341,10 @@ func (oe *operationExecutor) AttachVolume(
func (oe *operationExecutor) DetachVolume(
volumeToDetach AttachedVolume,
verifySafeToDetach bool,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
detachFunc, err :=
oe.generateDetachVolumeFunc(volumeToDetach, actualStateOfWorld)
oe.generateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld)
if err != nil {
return err
}
@ -465,6 +472,7 @@ func (oe *operationExecutor) generateAttachVolumeFunc(
func (oe *operationExecutor) generateDetachVolumeFunc(
volumeToDetach AttachedVolume,
verifySafeToDetach bool,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
// Get attacher plugin
attachableVolumePlugin, err :=
@ -500,6 +508,44 @@ func (oe *operationExecutor) generateDetachVolumeFunc(
}
return func() error {
if verifySafeToDetach {
// Fetch current node object
node, fetchErr := oe.kubeClient.Core().Nodes().Get(volumeToDetach.NodeName)
if fetchErr != nil {
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q with: %v",
volumeToDetach.VolumeName,
volumeToDetach.VolumeSpec.Name(),
volumeToDetach.NodeName,
fetchErr)
}
if node == nil {
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q. Error: node object retrieved from API server is nil.",
volumeToDetach.VolumeName,
volumeToDetach.VolumeSpec.Name(),
volumeToDetach.NodeName)
}
for _, inUseVolume := range node.Status.VolumesInUse {
if inUseVolume == volumeToDetach.VolumeName {
return fmt.Errorf("DetachVolume failed for volume %q (spec.Name: %q) from node %q. Error: volume is still in use by node, according to Node status.",
volumeToDetach.VolumeName,
volumeToDetach.VolumeSpec.Name(),
volumeToDetach.NodeName)
}
}
// Volume is not marked as in use by the node; it is safe to detach.
glog.Infof("Verified volume is safe to detach for volume %q (spec.Name: %q) from node %q.",
volumeToDetach.VolumeName,
volumeToDetach.VolumeSpec.Name(),
volumeToDetach.NodeName)
}
// Execute detach
detachErr := volumeDetacher.Detach(volumeName, volumeToDetach.NodeName)
if detachErr != nil {
@ -864,6 +910,20 @@ func (oe *operationExecutor) generateVerifyControllerAttachedVolumeFunc(
return nil
}
if !volumeToMount.ReportedInUse {
// If the given volume has not yet been added to the list of
// VolumesInUse in the node's volume status, do not proceed, return
// error. Caller will log and retry. The node status is updated
// periodically by kubelet, so it may take as much as 10 seconds
// before this clears.
// Issue #28141 to enable on demand status updates.
return fmt.Errorf("Volume %q (spec.Name: %q) pod %q (UID: %q) has not yet been added to the list of VolumesInUse in the node's volume status.",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
}
// Fetch current node object
node, fetchErr := oe.kubeClient.Core().Nodes().Get(nodeName)
if fetchErr != nil {