Merge pull request #104526 from jingxu97/aug/volumeattach

Fix issue in node status updating VolumeAttached list

This commit is contained in commit debd6c1e9e.
@@ -28,7 +28,7 @@ import (
 
     "k8s.io/klog/v2"
 
-    "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/kubernetes/pkg/volume"
     "k8s.io/kubernetes/pkg/volume/util"
@@ -151,13 +151,13 @@ func (rc *reconciler) reconcile() {
             // The operation key format is different depending on whether the volume
             // allows multi attach across different nodes.
             if util.IsMultiAttachAllowed(attachedVolume.VolumeSpec) {
-                if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, attachedVolume.NodeName) {
-                    klog.V(10).Infof("Operation for volume %q is already running for node %q. Can't start detach", attachedVolume.VolumeName, attachedVolume.NodeName)
+                if !rc.attacherDetacher.IsOperationSafeToRetry(attachedVolume.VolumeName, "" /* podName */, attachedVolume.NodeName, operationexecutor.DetachOperationName) {
+                    klog.V(10).Infof("Operation for volume %q is already running or still in exponential backoff for node %q. Can't start detach", attachedVolume.VolumeName, attachedVolume.NodeName)
                     continue
                 }
             } else {
-                if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, "" /* nodeName */) {
-                    klog.V(10).Infof("Operation for volume %q is already running in the cluster. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
+                if !rc.attacherDetacher.IsOperationSafeToRetry(attachedVolume.VolumeName, "" /* podName */, "" /* nodeName */, operationexecutor.DetachOperationName) {
+                    klog.V(10).Infof("Operation for volume %q is already running or still in exponential backoff in the cluster. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
                     continue
                 }
             }
@@ -193,6 +193,8 @@ func (rc *reconciler) reconcile() {
 
             // Before triggering volume detach, mark volume as detached and update the node status
             // If it fails to update node status, skip detach volume
+            // If volume detach operation fails, the volume needs to be added back to report as attached so that node status
+            // has the correct volume attachment information.
             err = rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
             if err != nil {
                 klog.V(5).Infof("RemoveVolumeFromReportAsAttached failed while removing volume %q from node %q with: %v",
@@ -222,10 +224,17 @@ func (rc *reconciler) reconcile() {
                     klog.Warningf(attachedVolume.GenerateMsgDetailed("attacherDetacher.DetachVolume started", fmt.Sprintf("This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", rc.maxWaitForUnmountDuration)))
                 }
             }
-            if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
-                // Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
-                // Log all other errors.
-                klog.Errorf(attachedVolume.GenerateErrorDetailed("attacherDetacher.DetachVolume failed to start", err).Error())
+            if err != nil {
+                // Add volume back to ReportAsAttached if DetachVolume call failed so that node status updater will add it back to VolumeAttached list.
+                // This function is also called during executing the volume detach operation in operation_generator.
+                // It is needed here too because DetachVolume call might fail before executing the actual operation in operation_executor (e.g., cannot find volume plugin etc.)
+                rc.actualStateOfWorld.AddVolumeToReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
+
+                if !exponentialbackoff.IsExponentialBackoff(err) {
+                    // Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
+                    // Log all other errors.
+                    klog.Errorf(attachedVolume.GenerateErrorDetailed("attacherDetacher.DetachVolume failed to start", err).Error())
+                }
             }
         }
     }
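Read together, the reconciler hunks above make two changes: a failed DetachVolume call now puts the volume back on the report-as-attached list, so node status keeps the correct VolumeAttached entries, and a new detach attempt is gated on exponential backoff rather than only on a pending operation. A standalone sketch of that control flow (toy types and names, not code from this commit):

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // retryGate is a toy stand-in for NestedPendingOperations: an operation
    // must not be retried while it is pending or inside its backoff window.
    type retryGate struct {
        pending   bool
        nextRetry time.Time
        backoff   time.Duration
    }

    func (g *retryGate) safeToRetry() bool {
        return !g.pending && time.Now().After(g.nextRetry)
    }

    // fail records a failed attempt and doubles the wait before the next one.
    func (g *retryGate) fail() {
        g.nextRetry = time.Now().Add(g.backoff)
        g.backoff *= 2
    }

    func main() {
        gate := &retryGate{backoff: 500 * time.Millisecond}
        reported := map[string]bool{"volume-name": true} // stand-in for the node's VolumeAttached list

        for i := 0; i < 3; i++ {
            if !gate.safeToRetry() {
                fmt.Println("pending or in backoff; skip detach this pass")
                time.Sleep(300 * time.Millisecond)
                continue
            }
            delete(reported, "volume-name") // stop reporting as attached before detaching
            if err := errors.New("detach failed"); err != nil {
                reported["volume-name"] = true // add back so node status stays correct
                gate.fail()
            }
            fmt.Println("reported as attached:", reported["volume-name"])
        }
    }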
@@ -20,7 +20,7 @@ import (
     "testing"
     "time"
 
-    "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
     k8stypes "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/informers"
@@ -36,7 +36,7 @@ import (
 )
 
 const (
-    reconcilerLoopPeriod      time.Duration = 0 * time.Millisecond
+    reconcilerLoopPeriod      time.Duration = 10 * time.Millisecond
     syncLoopPeriod            time.Duration = 100 * time.Minute
     maxWaitForUnmountDuration time.Duration = 50 * time.Millisecond
 )
@@ -599,6 +599,103 @@ func Test_Run_OneVolumeAttachAndDetachUncertainNodesWithReadWriteOnce(t *testing
 
 }
 
+func Test_Run_OneVolumeDetachFailNodeWithReadWriteOnce(t *testing.T) {
+    // Arrange
+    volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
+    dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
+    asw := cache.NewActualStateOfWorld(volumePluginMgr)
+    fakeKubeClient := controllervolumetesting.CreateTestClient()
+    fakeRecorder := &record.FakeRecorder{}
+    fakeHandler := volumetesting.NewBlockVolumePathHandler()
+    ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
+        fakeKubeClient,
+        volumePluginMgr,
+        fakeRecorder,
+        false, /* checkNodeCapabilitiesBeforeMount */
+        fakeHandler))
+    nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
+    reconciler := NewReconciler(
+        reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
+    podName1 := "pod-uid1"
+    podName2 := "pod-uid2"
+    podName3 := "pod-uid3"
+    volumeName := v1.UniqueVolumeName("volume-name")
+    volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
+    volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
+    nodeName1 := k8stypes.NodeName(volumetesting.FailDetachNode)
+    nodeName2 := k8stypes.NodeName("node-name2")
+    dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
+    dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)
+
+    // Act
+    ch := make(chan struct{})
+    go reconciler.Run(ch)
+    defer close(ch)
+
+    // Add the pod whose volume is to be attached to the FailDetachNode
+    generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
+    if podAddErr != nil {
+        t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
+    }
+
+    time.Sleep(1000 * time.Millisecond)
+    // Volume is added to asw, volume should be reported as attached to the node.
+    waitForVolumeAddedToNode(t, generatedVolumeName, nodeName1, asw)
+    verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
+    verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, true, asw)
+
+    // Delete the pod, but detach will fail
+    dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
+
+    // The first detach will be triggered after at least 50ms (maxWaitForUnmountDuration in test).
+    // Right before the detach operation is performed, the volume will first be removed from being reported
+    // as attached on node status (RemoveVolumeFromReportAsAttached). After the detach operation, which is expected
+    // to fail, the controller adds the volume back as attached.
+    // Here it sleeps 100ms so that detach should already have been triggered at this point.
+    // verifyVolumeReportedAsAttachedToNode will check the volume is in the list of attached volumes that need to be
+    // updated in node status. By calling this function (GetVolumesToReportAttached), node status should be updated,
+    // and the volume will not need to be updated until new changes are applied (detach is triggered again)
+    time.Sleep(100 * time.Millisecond)
+    verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
+    verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, true, asw)
+
+    // After the first detach fails, the reconciler will wait for a period of time before retrying the detach.
+    // The wait time increases exponentially from an initial value of 0.5s (0.5, 1, 2, 4, ...).
+    // The test here waits for 100 milliseconds to make sure it is in the exponential backoff period after
+    // the first detach operation. At this point, volume status should not be updated.
+    time.Sleep(100 * time.Millisecond)
+    verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
+    verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName1, asw)
+
+    // Wait for 600ms to make sure the second detach operation is triggered. Again, the volume will be
+    // removed from being reported as attached on node status and then added back as attached.
+    // The volume will be in the list of attached volumes that need to be updated to node status.
+    time.Sleep(600 * time.Millisecond)
+    verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
+    verifyVolumeReportedAsAttachedToNode(t, generatedVolumeName, nodeName1, true, asw)
+
+    // Add a second pod which tries to attach the volume to the same node.
+    // After adding the pod to the same node, detach will not be triggered any more.
+    generatedVolumeName, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName1)
+    if podAddErr != nil {
+        t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
+    }
+    // Sleep 1s to verify no detach is triggered after the second pod is added.
+    time.Sleep(1000 * time.Millisecond)
+    verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
+    verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName1, asw)
+
+    // Add a third pod which tries to attach the volume to a different node.
+    // At this point, the volume is still attached to the first node. There is no status update for either node.
+    generatedVolumeName, podAddErr = dsw.AddPod(types.UniquePodName(podName3), controllervolumetesting.NewPod(podName3, podName3), volumeSpec, nodeName2)
+    if podAddErr != nil {
+        t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
+    }
+    verifyVolumeAttachedToNode(t, generatedVolumeName, nodeName1, cache.AttachStateAttached, asw)
+    verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName1, asw)
+    verifyVolumeNoStatusUpdateNeeded(t, generatedVolumeName, nodeName2, asw)
+}
+
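Under the constants used in this test (10 ms reconciler loop, 50 ms maxWaitForUnmountDuration) and the 0.5 s initial backoff described in the comments above, the sleeps in Test_Run_OneVolumeDetachFailNodeWithReadWriteOnce line up roughly as follows (approximate timings, assuming no scheduling delays):

    t ≈ 0 ms     pod deleted; the volume is no longer desired on the node
    t ≈ 50 ms    maxWaitForUnmountDuration expires; first detach attempt fails
    t ≈ 100 ms   first check: the volume was removed from, then re-added to, report-as-attached
    t ≈ 200 ms   second check: still inside the 500 ms backoff window, so no status update needed
    t ≈ 550 ms   backoff elapses; second detach attempt fails and the volume is re-added
    t ≈ 800 ms   third check: the volume again appears in the list needing a node status update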
 // Creates a volume with accessMode ReadWriteOnce
 // First create a pod which will try to attach the volume to a node named "timeout-node". The attach call for this node will
 // fail with a timeout, but the volume will actually be attached to the node after the call.
@@ -1181,6 +1278,22 @@ func verifyVolumeReportedAsAttachedToNode(
 
 }
 
+func verifyVolumeNoStatusUpdateNeeded(
+    t *testing.T,
+    volumeName v1.UniqueVolumeName,
+    nodeName k8stypes.NodeName,
+    asw cache.ActualStateOfWorld,
+) {
+    volumes := asw.GetVolumesToReportAttached()
+    for _, volume := range volumes[nodeName] {
+        if volume.Name == volumeName {
+            t.Fatalf("Volume <%v> is reported as needing a status update on node <%v>, expected false",
+                volumeName,
+                nodeName)
+        }
+    }
+}
+
 func verifyNewDetacherCallCount(
     t *testing.T,
     expectZeroNewDetacherCallCount bool,
@@ -51,6 +51,8 @@ const (
     // The node is marked as uncertain. The attach operation will fail and return timeout error
     // for the first attach call. The following call will return successfully.
     UncertainAttachNode = "uncertain-attach-node"
+    // The detach operation will keep failing on the node.
+    FailDetachNode = "fail-detach-node"
     // The node is marked as timeout. The attach operation will always fail and return timeout error
     // but the operation actually succeeded.
     TimeoutAttachNode = "timeout-attach-node"
@@ -1083,6 +1085,10 @@ func (fv *FakeVolume) Detach(volumeName string, nodeName types.NodeName) error {
         return fmt.Errorf("trying to detach volume %q that is not attached to the node %q", volumeName, node)
     }
 
+    if nodeName == FailDetachNode {
+        return fmt.Errorf("fail to detach volume %q from node %q", volumeName, nodeName)
+    }
+
     volumeNodes.Delete(node)
     if volumeNodes.Len() == 0 {
         delete(fv.VolumesAttached, volumeName)
@@ -28,7 +28,7 @@ import (
     "fmt"
     "sync"
 
-    "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/types"
     k8sRuntime "k8s.io/apimachinery/pkg/util/runtime"
     "k8s.io/klog/v2"
@@ -106,6 +106,13 @@ type NestedPendingOperations interface {
         volumeName v1.UniqueVolumeName,
         podName volumetypes.UniquePodName,
         nodeName types.NodeName) bool
+
+    // IsOperationSafeToRetry returns false if an operation for the given volumeName
+    // and one of podName or nodeName is pending or in exponential backoff, otherwise it returns true
+    IsOperationSafeToRetry(
+        volumeName v1.UniqueVolumeName,
+        podName volumetypes.UniquePodName,
+        nodeName types.NodeName, operationName string) bool
 }
 
 // NewNestedPendingOperations returns a new instance of NestedPendingOperations.
@@ -185,6 +192,33 @@ func (grm *nestedPendingOperations) Run(
 
     return nil
 }
+func (grm *nestedPendingOperations) IsOperationSafeToRetry(
+    volumeName v1.UniqueVolumeName,
+    podName volumetypes.UniquePodName,
+    nodeName types.NodeName,
+    operationName string) bool {
+
+    grm.lock.RLock()
+    defer grm.lock.RUnlock()
+
+    opKey := operationKey{volumeName, podName, nodeName}
+    exist, previousOpIndex := grm.isOperationExists(opKey)
+    if !exist {
+        return true
+    }
+    previousOp := grm.operations[previousOpIndex]
+    if previousOp.operationPending {
+        return false
+    }
+    backOffErr := previousOp.expBackoff.SafeToRetry(fmt.Sprintf("%+v", opKey))
+    if backOffErr != nil {
+        if previousOp.operationName == operationName {
+            return false
+        }
+    }
+
+    return true
+}
 
 func (grm *nestedPendingOperations) IsOperationPending(
     volumeName v1.UniqueVolumeName,
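A usage sketch for the new method (assuming the in-tree package paths at the time of this commit): on a fresh NestedPendingOperations nothing is pending and no backoff has been recorded, so any key is safe to retry.

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
        "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
        volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
    )

    func main() {
        ops := nestedpendingoperations.NewNestedPendingOperations(true /* exponentialBackOffOnError */)

        // No operation has ever been recorded for this key, so a retry is safe.
        safe := ops.IsOperationSafeToRetry(
            v1.UniqueVolumeName("fake-plugin/volume-name"),
            volumetypes.UniquePodName(""), // empty: the key is volume+node, not volume+pod
            types.NodeName("node-name"),
            operationexecutor.DetachOperationName)
        fmt.Println(safe) // true
    }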
@@ -141,6 +141,9 @@ type OperationExecutor interface {
     // IsOperationPending returns true if an operation for the given volumeName
     // and one of podName or nodeName is pending, otherwise it returns false
     IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName) bool
+    // IsOperationSafeToRetry returns false if an operation for the given volumeName
+    // and one of podName or nodeName is pending or in exponential backoff, otherwise it returns true
+    IsOperationSafeToRetry(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName, operationName string) bool
     // ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume.
     ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
     // ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin
@@ -664,6 +667,14 @@ func (oe *operationExecutor) IsOperationPending(
     return oe.pendingOperations.IsOperationPending(volumeName, podName, nodeName)
 }
 
+func (oe *operationExecutor) IsOperationSafeToRetry(
+    volumeName v1.UniqueVolumeName,
+    podName volumetypes.UniquePodName,
+    nodeName types.NodeName,
+    operationName string) bool {
+    return oe.pendingOperations.IsOperationSafeToRetry(volumeName, podName, nodeName, operationName)
+}
+
 func (oe *operationExecutor) AttachVolume(
     volumeToAttach VolumeToAttach,
     actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
@@ -48,6 +48,7 @@ import (
 const (
     unknownVolumePlugin           string = "UnknownVolumePlugin"
     unknownAttachableVolumePlugin string = "UnknownAttachableVolumePlugin"
+    DetachOperationName           string = "volume_detach"
 )
 
 // InTreeToCSITranslator contains methods required to check migratable status
@@ -491,9 +492,9 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(
     }
 
     return volumetypes.GeneratedOperations{
-        OperationName:     "volume_detach",
+        OperationName:     DetachOperationName,
         OperationFunc:     detachVolumeFunc,
-        CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, volumeToDetach.VolumeSpec), "volume_detach"),
+        CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, volumeToDetach.VolumeSpec), DetachOperationName),
         EventRecorderFunc: nil, // nil because we do not want to generate event on error
     }, nil
 }
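One detail worth noting in this last hunk: the operation name is no longer a string literal but the exported DetachOperationName constant. The reconciler's new backoff check compares operation names, so the name recorded by the generator and the name passed to IsOperationSafeToRetry must match exactly; a shared constant rules out drift. A minimal sketch of the failure mode this avoids (the mistyped literal below is hypothetical):

    package main

    import "fmt"

    // DetachOperationName mirrors the constant added in the generator above.
    const DetachOperationName = "volume_detach"

    func main() {
        recorded := DetachOperationName // name stored when the detach operation ran
        // A mistyped literal would never match, so the backoff for a failed
        // detach would be silently ignored and the retry issued too early.
        fmt.Println(recorded == "volume-detach")     // false: the bug
        fmt.Println(recorded == DetachOperationName) // true: constant on both sides
    }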