Fix issue in node status updating VolumeAttached list

During volume detach, the following sequence might happen in the reconciler:

1. A pod is being deleted.
2. The volume is removed from reportedAsAttached, so the node status updater
will update the VolumeAttached list.
3. Detach fails due to some issue.
4. The volume is added back to reportedAsAttached (by the error handling
inside the detach operation).
5. The reconciler processes the volume again and removes it from
reportedAsAttached.
6. Detach is not triggered because of exponential backoff; the detach call
fails with an exponential backoff error, so the volume is never added back.
7. Another pod using the same volume is added on the same node.
8. The reconciler loops again and will NOT try to trigger detach anymore.

At this point, the volume is still attached and present in the actual state
of the world, but the VolumeAttached list in node status no longer contains
it, which blocks volume mounting from kubelet.
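
The net effect can be shown with a small standalone sketch (plain Go, purely
illustrative; the two booleans stand in for the actual attach state and the
reported-as-attached entry, not the controller's real data structures):

package main

import "fmt"

func main() {
	attached := true           // actual state: the volume is attached to the node
	reportedAsAttached := true // stands in for the VolumeAttached entry in node status

	// Steps 1-2: the pod is deleted and the reconciler removes the volume from
	// reportedAsAttached before triggering detach.
	reportedAsAttached = false

	// Steps 3-4: the detach operation runs and fails; its error handling adds
	// the volume back so node status stays correct.
	reportedAsAttached = true

	// Steps 5-6: the next loop removes the volume again, but this time the
	// DetachVolume call is rejected by exponential backoff before the operation
	// (and its add-back) ever runs.
	reportedAsAttached = false

	// Steps 7-8: a new pod on the same node wants the volume, so the reconciler
	// stops trying to detach and nothing restores the reported entry.
	fmt.Printf("attached=%v, reportedAsAttached=%v\n", attached, reportedAsAttached)
	// Prints "attached=true, reportedAsAttached=false": kubelet mount is blocked.
}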

The first-round fix was to add the volume back to the list of volumes to be
reported as attached at step 6, when the detach call fails with an error
(exponential backoff). However, this can cause a performance problem if
detach keeps failing for a while: during that time the volume is repeatedly
removed from and added back to node status, which causes a surge of API
calls.

So the logic is changed to first check whether the operation is safe to
retry, meaning there is no pending operation and it is not within its
exponential backoff period, before calling detach. This way we avoid
repeatedly removing and adding the volume in node status.
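
In other words, a detach is considered safe to retry only when no operation is
pending for the volume/node key and the previous failure's exponential backoff
window has elapsed. Below is a minimal standalone sketch of that predicate
(illustrative names only; the real check is IsOperationSafeToRetry in
nestedpendingoperations, shown in the diff further down):

package main

import (
	"fmt"
	"time"
)

// operationState is an illustrative stand-in for what the pending-operations
// tracker keeps per volume/pod/node key.
type operationState struct {
	pending     bool          // an operation is currently running for this key
	lastErrorAt time.Time     // when the previous attempt failed
	backoff     time.Duration // current backoff window; grows after each failure
}

// safeToRetry captures the idea behind the new check: do not start another
// detach while an operation is pending or while the previous failure is still
// inside its exponential backoff window.
func (s operationState) safeToRetry(now time.Time) bool {
	if s.pending {
		return false
	}
	return now.Sub(s.lastErrorAt) >= s.backoff
}

func main() {
	s := operationState{lastErrorAt: time.Now(), backoff: 500 * time.Millisecond}
	fmt.Println(s.safeToRetry(time.Now()))                      // false: still in backoff
	fmt.Println(s.safeToRetry(time.Now().Add(2 * time.Second))) // true: window elapsed
}

With this check done up front, the volume is only removed from
reportedAsAttached when a detach will actually be attempted, so node status is
not churned while detach keeps backing off.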

Change-Id: I5d4e760c880d72937d34b9d3e904ecad125f802e
Jing Xu 2021-08-20 15:26:32 -07:00
parent f4e1558af0
commit 69b9f9b1f0
7 changed files with 188 additions and 14 deletions

View File

@ -28,7 +28,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"

View File

@ -151,13 +151,13 @@ func (rc *reconciler) reconcile() {
// The operation key format is different depending on whether the volume
// allows multi attach across different nodes.
if util.IsMultiAttachAllowed(attachedVolume.VolumeSpec) {
if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, attachedVolume.NodeName) {
klog.V(10).Infof("Operation for volume %q is already running for node %q. Can't start detach", attachedVolume.VolumeName, attachedVolume.NodeName)
if !rc.attacherDetacher.IsOperationSafeToRetry(attachedVolume.VolumeName, "" /* podName */, attachedVolume.NodeName, operationexecutor.DetachOperationName) {
klog.V(10).Infof("Operation for volume %q is already running or still in exponential backoff for node %q. Can't start detach", attachedVolume.VolumeName, attachedVolume.NodeName)
continue
}
} else {
if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, "" /* nodeName */) {
klog.V(10).Infof("Operation for volume %q is already running in the cluster. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
if !rc.attacherDetacher.IsOperationSafeToRetry(attachedVolume.VolumeName, "" /* podName */, "" /* nodeName */, operationexecutor.DetachOperationName) {
klog.V(10).Infof("Operation for volume %q is already running or still in exponential backoff in the cluster. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
continue
}
}
@ -193,6 +193,8 @@ func (rc *reconciler) reconcile() {
// Before triggering volume detach, mark volume as detached and update the node status
// If it fails to update node status, skip detach volume
// If volume detach operation fails, the volume needs to be added back to report as attached so that node status
// has the correct volume attachment information.
err = rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil {
klog.V(5).Infof("RemoveVolumeFromReportAsAttached failed while removing volume %q from node %q with: %v",
@ -222,10 +224,17 @@ func (rc *reconciler) reconcile() {
klog.Warningf(attachedVolume.GenerateMsgDetailed("attacherDetacher.DetachVolume started", fmt.Sprintf("This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", rc.maxWaitForUnmountDuration)))
}
}
if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.Errorf(attachedVolume.GenerateErrorDetailed("attacherDetacher.DetachVolume failed to start", err).Error())
if err != nil {
// Add volume back to ReportAsAttached if DetachVolume call failed so that node status updater will add it back to VolumeAttached list.
// This function is also called while executing the volume detach operation in operation_generator.
// It is needed here too because the DetachVolume call might fail before executing the actual operation in operation_executor (e.g., cannot find the volume plugin).
rc.actualStateOfWorld.AddVolumeToReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
if !exponentialbackoff.IsExponentialBackoff(err) {
// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.Errorf(attachedVolume.GenerateErrorDetailed("attacherDetacher.DetachVolume failed to start", err).Error())
}
}
}
}

View File

@ -20,7 +20,7 @@ import (
"testing"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
@ -36,7 +36,7 @@ import (
)
const (
reconcilerLoopPeriod time.Duration = 0 * time.Millisecond
reconcilerLoopPeriod time.Duration = 10 * time.Millisecond
syncLoopPeriod time.Duration = 100 * time.Minute
maxWaitForUnmountDuration time.Duration = 50 * time.Millisecond
)
@ -599,6 +599,103 @@ func Test_Run_OneVolumeAttachAndDetachUncertainNodesWithReadWriteOnce(t *testing
}
func Test_Run_OneVolumeDetachFailNodeWithReadWriteOnce(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeKubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
podName1 := "pod-uid1"
podName2 := "pod-uid2"
podName3 := "pod-uid3"
volumeName := v1.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
nodeName1 := k8stypes.NodeName(volumetesting.FailDetachNode)
nodeName2 := k8stypes.NodeName("node-name2")
dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)
// Act
ch := make(chan struct{})
go reconciler.Run(ch)
defer close(ch)
// Add the pod in which the volume is attached to the FailDetachNode
generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
time.Sleep(1000 * time.Millisecond)
// Volume is added to asw, volume should be reported as attached to the node.
waitForVolumeAddedToNode(t, generatedVolumeName, nodeName1, asw)
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, true, asw)
// Delete the pod, but detach will fail
dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
// The first detach will be triggered after at least 50ms (maxWaitForUnmountDuration in test).
// Right before the detach operation is performed, the volume will first be removed from being reported
// as attached on node status (RemoveVolumeFromReportAsAttached). After the detach operation, which is expected to fail,
// the controller then adds the volume back as attached.
// Here the test sleeps 100ms so that detach should already have been triggered at this point.
// verifyVolumeReportedAsAttachedToNode checks that the volume is in the list of attached volumes that need to be updated
// in node status. By calling this function (GetVolumesToReportAttached), node status should be updated, and the volume
// will not need to be updated again until new changes are applied (detach is triggered again).
time.Sleep(100 * time.Millisecond)
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, true, asw)
// After the first detach fails, the reconciler will wait for a period of time before retrying the detach.
// The wait time increases exponentially from an initial value of 0.5s (0.5, 1, 2, 4, ...).
// The test here waits for 100 milliseconds to make sure it is in the exponential backoff period after
// the first detach operation. At this point, volume status should not be updated.
time.Sleep(100 * time.Millisecond)
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName1, asw)
// Wait for 600ms to make sure the second detach operation is triggered. Again, the volume will be
// removed from being reported as attached on node status and then added back as attached.
// The volume will be in the list of attached volumes that need to be updated in node status.
time.Sleep(600 * time.Millisecond)
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, true, asw)
// Add a second pod which tries to attach the volume to the same node.
// After adding the pod to the same node, detach will not be triggered anymore.
generatedVolumeName, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName1)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
// Sleep 1s to verify that no further detach is triggered after the second pod is added.
time.Sleep(1000 * time.Millisecond)
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName1, asw)
// Add a third pod which tries to attach the volume to a different node.
// At this point, the volume is still attached to the first node. There is no status update for either node.
generatedVolumeName, podAddErr = dsw.AddPod(types.UniquePodName(podName3), controllervolumetesting.NewPod(podName3, podName3), volumeSpec, nodeName2)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName1, asw)
verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName2, asw)
}
// Creates a volume with accessMode ReadWriteOnce
// First create a pod which will try to attach the volume to a node named "timeout-node". The attach call for this node will
// fail with a timeout, but the volume will actually be attached to the node after the call.
@ -1181,6 +1278,22 @@ func verifyVolumeReportedAsAttachedToNode(
}
func verifyVolumeNoStatusUpdateNeeded(
t *testing.T,
volumeName v1.UniqueVolumeName,
nodeName k8stypes.NodeName,
asw cache.ActualStateOfWorld,
) {
volumes := asw.GetVolumesToReportAttached()
for _, volume := range volumes[nodeName] {
if volume.Name == volumeName {
t.Fatalf("Check volume <%v> is reported as need to update status on node <%v>, expected false",
volumeName,
nodeName)
}
}
}
func verifyNewDetacherCallCount(
t *testing.T,
expectZeroNewDetacherCallCount bool,

View File

@ -51,6 +51,8 @@ const (
// The node is marked as uncertain. The attach operation will fail and return timeout error
// for the first attach call. The following call will return successfully.
UncertainAttachNode = "uncertain-attach-node"
// The detach operation will keep failing on the node.
FailDetachNode = "fail-detach-node"
// The node is marked as timeout. The attach operation will always fail and return timeout error
// but the operation is actually succeeded.
TimeoutAttachNode = "timeout-attach-node"
@ -1083,6 +1085,10 @@ func (fv *FakeVolume) Detach(volumeName string, nodeName types.NodeName) error {
return fmt.Errorf("trying to detach volume %q that is not attached to the node %q", volumeName, node)
}
if nodeName == FailDetachNode {
return fmt.Errorf("fail to detach volume %q to node %q", volumeName, nodeName)
}
volumeNodes.Delete(node)
if volumeNodes.Len() == 0 {
delete(fv.VolumesAttached, volumeName)

View File

@ -28,7 +28,7 @@ import (
"fmt"
"sync"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
k8sRuntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
@ -106,6 +106,13 @@ type NestedPendingOperations interface {
volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName,
nodeName types.NodeName) bool
// IsOperationSafeToRetry returns false if an operation for the given volumeName
// and one of podName or nodeName is pending or in exponential backoff, otherwise it returns true
IsOperationSafeToRetry(
volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName,
nodeName types.NodeName, operationName string) bool
}
// NewNestedPendingOperations returns a new instance of NestedPendingOperations.
@ -185,6 +192,33 @@ func (grm *nestedPendingOperations) Run(
return nil
}
func (grm *nestedPendingOperations) IsOperationSafeToRetry(
volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName,
nodeName types.NodeName,
operationName string) bool {
grm.lock.RLock()
defer grm.lock.RUnlock()
opKey := operationKey{volumeName, podName, nodeName}
exist, previousOpIndex := grm.isOperationExists(opKey)
if !exist {
return true
}
previousOp := grm.operations[previousOpIndex]
if previousOp.operationPending {
return false
}
backOffErr := previousOp.expBackoff.SafeToRetry(fmt.Sprintf("%+v", opKey))
if backOffErr != nil {
if previousOp.operationName == operationName {
return false
}
}
return true
}
func (grm *nestedPendingOperations) IsOperationPending(
volumeName v1.UniqueVolumeName,

View File

@ -141,6 +141,9 @@ type OperationExecutor interface {
// IsOperationPending returns true if an operation for the given volumeName
// and one of podName or nodeName is pending, otherwise it returns false
IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName) bool
// IsOperationSafeToRetry returns false if an operation for the given volumeName
// and one of podName or nodeName is pending or in exponential backoff, otherwise it returns true
IsOperationSafeToRetry(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName, operationName string) bool
// ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume.
ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
// ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin
@ -664,6 +667,14 @@ func (oe *operationExecutor) IsOperationPending(
return oe.pendingOperations.IsOperationPending(volumeName, podName, nodeName)
}
func (oe *operationExecutor) IsOperationSafeToRetry(
volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName,
nodeName types.NodeName,
operationName string) bool {
return oe.pendingOperations.IsOperationSafeToRetry(volumeName, podName, nodeName, operationName)
}
func (oe *operationExecutor) AttachVolume(
volumeToAttach VolumeToAttach,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {

View File

@ -48,6 +48,7 @@ import (
const (
unknownVolumePlugin string = "UnknownVolumePlugin"
unknownAttachableVolumePlugin string = "UnknownAttachableVolumePlugin"
DetachOperationName string = "volume_detach"
)
// InTreeToCSITranslator contains methods required to check migratable status
@ -491,9 +492,9 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(
}
return volumetypes.GeneratedOperations{
OperationName: "volume_detach",
OperationName: DetachOperationName,
OperationFunc: detachVolumeFunc,
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, volumeToDetach.VolumeSpec), "volume_detach"),
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, volumeToDetach.VolumeSpec), DetachOperationName),
EventRecorderFunc: nil, // nil because we do not want to generate event on error
}, nil
}