diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go
index 9513d09a603..4d420e01706 100644
--- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go
+++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go
@@ -28,7 +28,7 @@ import (
 	"k8s.io/klog/v2"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/kubernetes/pkg/volume"
 	"k8s.io/kubernetes/pkg/volume/util"
diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go
index c3273013aee..51391630185 100644
--- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go
+++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go
@@ -151,13 +151,13 @@ func (rc *reconciler) reconcile() {
 			// The operation key format is different depending on whether the volume
 			// allows multi attach across different nodes.
 			if util.IsMultiAttachAllowed(attachedVolume.VolumeSpec) {
-				if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, attachedVolume.NodeName) {
-					klog.V(10).Infof("Operation for volume %q is already running for node %q. Can't start detach", attachedVolume.VolumeName, attachedVolume.NodeName)
+				if !rc.attacherDetacher.IsOperationSafeToRetry(attachedVolume.VolumeName, "" /* podName */, attachedVolume.NodeName, operationexecutor.DetachOperationName) {
+					klog.V(10).Infof("Operation for volume %q is already running or still in exponential backoff for node %q. Can't start detach", attachedVolume.VolumeName, attachedVolume.NodeName)
 					continue
 				}
 			} else {
-				if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, "" /* nodeName */) {
-					klog.V(10).Infof("Operation for volume %q is already running in the cluster. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
+				if !rc.attacherDetacher.IsOperationSafeToRetry(attachedVolume.VolumeName, "" /* podName */, "" /* nodeName */, operationexecutor.DetachOperationName) {
+					klog.V(10).Infof("Operation for volume %q is already running or still in exponential backoff in the cluster. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
 					continue
 				}
 			}
@@ -193,6 +193,8 @@ func (rc *reconciler) reconcile() {
 			// Before triggering volume detach, mark the volume as detached and update the node status.
 			// If updating the node status fails, skip the volume detach.
+			// If the volume detach operation fails, the volume needs to be added back to the
+			// report-as-attached list so that the node status has the correct volume attachment information.
 			err = rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
 			if err != nil {
 				klog.V(5).Infof("RemoveVolumeFromReportAsAttached failed while removing volume %q from node %q with: %v",
@@ -222,10 +224,17 @@ func (rc *reconciler) reconcile() {
 					klog.Warningf(attachedVolume.GenerateMsgDetailed("attacherDetacher.DetachVolume started", fmt.Sprintf("This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", rc.maxWaitForUnmountDuration)))
 				}
 			}
-			if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
-				// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
-				// Log all other errors.
-				klog.Errorf(attachedVolume.GenerateErrorDetailed("attacherDetacher.DetachVolume failed to start", err).Error())
+			if err != nil {
+				// Add the volume back to ReportAsAttached if the DetachVolume call failed, so that the node status updater adds it back to the VolumesAttached list.
+				// The same add-back happens while executing the volume detach operation in operation_generator. It is needed here too
+				// because the DetachVolume call can fail before the actual operation is executed in operation_executor (e.g., the volume plugin cannot be found).
+				rc.actualStateOfWorld.AddVolumeToReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
+
+				if !exponentialbackoff.IsExponentialBackoff(err) {
+					// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
+					// Log all other errors.
+					klog.Errorf(attachedVolume.GenerateErrorDetailed("attacherDetacher.DetachVolume failed to start", err).Error())
+				}
+			}
 		}
 	}
diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go
index 76bdf4fda40..552572ccaa4 100644
--- a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go
+++ b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go
@@ -20,7 +20,7 @@ import (
 	"testing"
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	k8stypes "k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
@@ -36,7 +36,7 @@ import (
 )
 
 const (
-	reconcilerLoopPeriod      time.Duration = 0 * time.Millisecond
+	reconcilerLoopPeriod      time.Duration = 10 * time.Millisecond
 	syncLoopPeriod            time.Duration = 100 * time.Minute
 	maxWaitForUnmountDuration time.Duration = 50 * time.Millisecond
 )
@@ -599,6 +599,103 @@ func Test_Run_OneVolumeAttachAndDetachUncertainNodesWithReadWriteOnce(t *testing
 }
 
+func Test_Run_OneVolumeDetachFailNodeWithReadWriteOnce(t *testing.T) {
+	// Arrange
+	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
+	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
+	asw := cache.NewActualStateOfWorld(volumePluginMgr)
+	fakeKubeClient := controllervolumetesting.CreateTestClient()
+	fakeRecorder := &record.FakeRecorder{}
+	fakeHandler := volumetesting.NewBlockVolumePathHandler()
+	ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
+		fakeKubeClient,
+		volumePluginMgr,
+		fakeRecorder,
+		false, /* checkNodeCapabilitiesBeforeMount */
+		fakeHandler))
+	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
+	reconciler := NewReconciler(
+		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
+	podName1 := "pod-uid1"
+	podName2 := "pod-uid2"
+	podName3 := "pod-uid3"
+	volumeName := v1.UniqueVolumeName("volume-name")
+	volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
+	volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
+	nodeName1 := k8stypes.NodeName(volumetesting.FailDetachNode)
+	nodeName2 := k8stypes.NodeName("node-name2")
+	dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
+	dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)
+
+	// Act
+	ch := make(chan struct{})
+	go reconciler.Run(ch)
+	defer close(ch)
+
+	// Add a pod whose volume gets attached to the FailDetachNode.
+	generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
+	if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: Actual: <%v>", podAddErr) + } + + time.Sleep(1000 * time.Millisecond) + // Volume is added to asw, volume should be reported as attached to the node. + waitForVolumeAddedToNode(t, generatedVolumeName, nodeName1, asw) + verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw) + verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, true, asw) + + // Delete the pod, but detach will fail + dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1) + + // The first detach will be triggered after at leaset 50ms (maxWaitForUnmountDuration in test). + // Right before detach operation is performed, the volume will be first removed from being reported + // as attached on node status (RemoveVolumeFromReportAsAttached). After detach operation which is expected to fail, + // controller then added the volume back as attached. + // Here it sleeps 100ms so that detach should be triggered already at this point. + // verifyVolumeReportedAsAttachedToNode will check volume is in the list of volume attached that needs to be updated + // in node status. By calling this function (GetVolumesToReportAttached), node status should be updated, and the volume + // will not need to be updated until new changes are applied (detach is triggered again) + time.Sleep(100 * time.Millisecond) + verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw) + verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, true, asw) + + // After the first detach fails, reconciler will wait for a period of time before retrying to detach. + // The wait time is increasing exponentially from initial value of 0.5s (0.5, 1, 2, 4, ...). + // The test here waits for 100 Millisecond to make sure it is in exponential backoff period after + // the first detach operation. At this point, volumes status should not be updated + time.Sleep(100 * time.Millisecond) + verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw) + verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName1, asw) + + // Wait for 600ms to make sure second detach operation triggered. Again, The volume will be + // removed from being reported as attached on node status and then added back as attached. + // The volume will be in the list of attached volumes that need to be updated to node status. + time.Sleep(600 * time.Millisecond) + verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw) + verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, true, asw) + + // Add a second pod which tries to attach the volume to the same node. + // After adding pod to the same node, detach will not be triggered any more. + generatedVolumeName, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName1) + if podAddErr != nil { + t.Fatalf("AddPod failed. Expected: Actual: <%v>", podAddErr) + } + // Sleep 1s to verify no detach are triggered after second pod is added in the future. + time.Sleep(1000 * time.Millisecond) + verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw) + verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName1, asw) + + // Add a third pod which tries to attach the volume to a different node. + // At this point, volume is still attached to first node. There are no status update for both nodes. 
+	generatedVolumeName, podAddErr = dsw.AddPod(types.UniquePodName(podName3), controllervolumetesting.NewPod(podName3, podName3), volumeSpec, nodeName2)
+	if podAddErr != nil {
+		t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
+	}
+	verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
+	verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName1, asw)
+	verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName2, asw)
+}
+
 // Creates a volume with accessMode ReadWriteOnce
 // First create a pod which will try to attach the volume to a node named "timeout-node". The attach call for this node will
 // fail with a timeout, but the volume will actually be attached to the node after the call.
@@ -1181,6 +1278,22 @@ func verifyVolumeReportedAsAttachedToNode(
 }
 
+func verifyVolumeNoStatusUpdateNeeded(
+	t *testing.T,
+	volumeName v1.UniqueVolumeName,
+	nodeName k8stypes.NodeName,
+	asw cache.ActualStateOfWorld,
+) {
+	volumes := asw.GetVolumesToReportAttached()
+	for _, volume := range volumes[nodeName] {
+		if volume.Name == volumeName {
+			t.Fatalf("Volume <%v> is reported as needing a status update on node <%v>, expected no update",
+				volumeName,
+				nodeName)
+		}
+	}
+}
+
 func verifyNewDetacherCallCount(
 	t *testing.T,
 	expectZeroNewDetacherCallCount bool,
diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go
index 8ca324ec925..55352fa9c74 100644
--- a/pkg/volume/testing/testing.go
+++ b/pkg/volume/testing/testing.go
@@ -51,6 +51,8 @@ const (
 	// The node is marked as uncertain. The attach operation will fail and return a timeout error
 	// for the first attach call. The following call will return successfully.
 	UncertainAttachNode = "uncertain-attach-node"
+	// The detach operation will keep failing on the node.
+	FailDetachNode = "fail-detach-node"
 	// The node is marked as timeout. The attach operation will always fail and return a timeout error,
 	// but the operation actually succeeds.
 	TimeoutAttachNode = "timeout-attach-node"
@@ -1083,6 +1085,10 @@ func (fv *FakeVolume) Detach(volumeName string, nodeName types.NodeName) error {
 		return fmt.Errorf("trying to detach volume %q that is not attached to the node %q", volumeName, node)
 	}
 
+	if nodeName == FailDetachNode {
+		return fmt.Errorf("fail to detach volume %q from node %q", volumeName, nodeName)
+	}
+
 	volumeNodes.Delete(node)
 	if volumeNodes.Len() == 0 {
 		delete(fv.VolumesAttached, volumeName)
diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go
index 652ed1c76d6..aded86aa21a 100644
--- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go
+++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go
@@ -28,7 +28,7 @@ import (
 	"fmt"
 	"sync"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
 	k8sRuntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/klog/v2"
@@ -106,6 +106,13 @@ type NestedPendingOperations interface {
 		volumeName v1.UniqueVolumeName,
 		podName volumetypes.UniquePodName,
 		nodeName types.NodeName) bool
+
+	// IsOperationSafeToRetry returns false if an operation for the given volumeName
+	// and one of podName or nodeName is pending or still in exponential backoff; otherwise it returns true.
+	IsOperationSafeToRetry(
+		volumeName v1.UniqueVolumeName,
+		podName volumetypes.UniquePodName,
+		nodeName types.NodeName, operationName string) bool
 }
 
 // NewNestedPendingOperations returns a new instance of NestedPendingOperations.
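For context on the interface addition above: IsOperationSafeToRetry differs from the existing IsOperationPending in that it also treats a previously failed operation that is still inside its exponential-backoff window as unsafe, and that backoff check only applies when the recorded operation has the same operationName. The following self-contained Go sketch models that decision with simplified stand-in types; the map key and operation struct are illustrative, not the real nestedPendingOperations internals:

package main

import (
	"fmt"
	"time"
)

// operation is a simplified stand-in for a recorded operation; the real code
// keeps richer state (operationPending, expBackoff) inside nestedPendingOperations.
type operation struct {
	name      string        // e.g. "volume_detach" (DetachOperationName in this patch)
	pending   bool          // true while an instance of the operation is running
	lastError time.Time     // when the previous attempt failed
	backoff   time.Duration // current exponential-backoff window
}

// isOperationSafeToRetry mirrors the decision made by the new
// IsOperationSafeToRetry: no record means safe; a pending operation is never
// safe; a backoff window only blocks retries of the same operation name.
func isOperationSafeToRetry(ops map[string]operation, key, operationName string, now time.Time) bool {
	op, exists := ops[key]
	if !exists {
		return true
	}
	if op.pending {
		return false
	}
	if now.Before(op.lastError.Add(op.backoff)) && op.name == operationName {
		return false
	}
	return true
}

func main() {
	now := time.Now()
	ops := map[string]operation{
		// The key stands in for the real operationKey{volumeName, podName, nodeName}.
		"volume-name//node-name1": {
			name:      "volume_detach",
			pending:   false,
			lastError: now.Add(-200 * time.Millisecond),
			backoff:   500 * time.Millisecond, // the real initial backoff is also 0.5s
		},
	}
	// Still 300ms inside the backoff window: not safe to retry the detach.
	fmt.Println(isOperationSafeToRetry(ops, "volume-name//node-name1", "volume_detach", now)) // false
	// The backoff does not block a different operation name on the same key.
	fmt.Println(isOperationSafeToRetry(ops, "volume-name//node-name1", "volume_attach", now)) // true
}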
@@ -185,6 +192,33 @@ func (grm *nestedPendingOperations) Run(
 	return nil
 }
 
+func (grm *nestedPendingOperations) IsOperationSafeToRetry(
+	volumeName v1.UniqueVolumeName,
+	podName volumetypes.UniquePodName,
+	nodeName types.NodeName,
+	operationName string) bool {
+
+	grm.lock.RLock()
+	defer grm.lock.RUnlock()
+
+	opKey := operationKey{volumeName, podName, nodeName}
+	exist, previousOpIndex := grm.isOperationExists(opKey)
+	if !exist {
+		return true
+	}
+	previousOp := grm.operations[previousOpIndex]
+	if previousOp.operationPending {
+		return false
+	}
+	backOffErr := previousOp.expBackoff.SafeToRetry(fmt.Sprintf("%+v", opKey))
+	if backOffErr != nil {
+		if previousOp.operationName == operationName {
+			return false
+		}
+	}
+
+	return true
+}
+
 func (grm *nestedPendingOperations) IsOperationPending(
 	volumeName v1.UniqueVolumeName,
diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go
index 6b177b8571e..ca4f6ca0cf9 100644
--- a/pkg/volume/util/operationexecutor/operation_executor.go
+++ b/pkg/volume/util/operationexecutor/operation_executor.go
@@ -141,6 +141,9 @@ type OperationExecutor interface {
 	// IsOperationPending returns true if an operation for the given volumeName
 	// and one of podName or nodeName is pending, otherwise it returns false
 	IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName) bool
+	// IsOperationSafeToRetry returns false if an operation for the given volumeName
+	// and one of podName or nodeName is pending or still in exponential backoff; otherwise it returns true.
+	IsOperationSafeToRetry(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName, operationName string) bool
 	// ExpandInUseVolume will resize the volume's file system to the expected size without unmounting the volume.
 	ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
 	// ReconstructVolumeOperation constructs a new volumeSpec created by the plugin and returns it
@@ -664,6 +667,14 @@ func (oe *operationExecutor) IsOperationPending(
 	return oe.pendingOperations.IsOperationPending(volumeName, podName, nodeName)
 }
 
+func (oe *operationExecutor) IsOperationSafeToRetry(
+	volumeName v1.UniqueVolumeName,
+	podName volumetypes.UniquePodName,
+	nodeName types.NodeName,
+	operationName string) bool {
+	return oe.pendingOperations.IsOperationSafeToRetry(volumeName, podName, nodeName, operationName)
+}
+
 func (oe *operationExecutor) AttachVolume(
 	volumeToAttach VolumeToAttach,
 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go
index e28d1996f7a..f4635a89196 100644
--- a/pkg/volume/util/operationexecutor/operation_generator.go
+++ b/pkg/volume/util/operationexecutor/operation_generator.go
@@ -48,6 +48,7 @@ import (
 const (
 	unknownVolumePlugin           string = "UnknownVolumePlugin"
 	unknownAttachableVolumePlugin string = "UnknownAttachableVolumePlugin"
+	// DetachOperationName is the name used to track detach operations for
+	// pending-operation and exponential-backoff bookkeeping and in the operation-complete hook.
+	DetachOperationName string = "volume_detach"
 )
 
 // InTreeToCSITranslator contains methods required to check migratable status
@@ -491,9 +492,9 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(
 	}
 
 	return volumetypes.GeneratedOperations{
-		OperationName:     "volume_detach",
+		OperationName:     DetachOperationName,
 		OperationFunc:     detachVolumeFunc,
-		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, volumeToDetach.VolumeSpec), "volume_detach"),
+		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, volumeToDetach.VolumeSpec), DetachOperationName),
 		EventRecorderFunc: nil, // nil because we do not want to generate event on error
 	}, nil
 }
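Taken together, the patch makes the reconciler's detach path safe to retry: the volume is removed from the node's report-as-attached list right before the detach, and added back whenever the DetachVolume call fails, whether the failure happens inside the generated operation or before it even starts. A self-contained Go sketch of that rollback flow, using simplified stand-in types rather than the real cache.ActualStateOfWorld:

package main

import "fmt"

// actualStateOfWorld is a simplified stand-in for the controller's
// report-as-attached bookkeeping; the real cache.ActualStateOfWorld is richer.
type actualStateOfWorld struct {
	reportAsAttached map[string]map[string]bool // nodeName -> volumeName -> attached
}

func (asw *actualStateOfWorld) removeVolumeFromReportAsAttached(volume, node string) {
	delete(asw.reportAsAttached[node], volume)
}

func (asw *actualStateOfWorld) addVolumeToReportAsAttached(volume, node string) {
	if asw.reportAsAttached[node] == nil {
		asw.reportAsAttached[node] = map[string]bool{}
	}
	asw.reportAsAttached[node][volume] = true
}

// reconcileDetach mirrors the control flow the patch establishes in
// reconciler.go: remove the volume from the node's report-as-attached list
// before detaching, and add it back if the detach call fails.
func reconcileDetach(asw *actualStateOfWorld, detach func() error, volume, node string) {
	asw.removeVolumeFromReportAsAttached(volume, node)
	if err := detach(); err != nil {
		// Without this add-back, a failed detach would leave the volume
		// missing from the node's VolumesAttached status even though it is
		// still attached, which is the bug this patch fixes.
		asw.addVolumeToReportAsAttached(volume, node)
	}
}

func main() {
	asw := &actualStateOfWorld{reportAsAttached: map[string]map[string]bool{
		"node-name1": {"volume-name": true},
	}}
	alwaysFail := func() error { return fmt.Errorf("fail to detach volume") }
	reconcileDetach(asw, alwaysFail, "volume-name", "node-name1")
	fmt.Println(asw.reportAsAttached["node-name1"]["volume-name"]) // true: still reported as attached
}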