Merge pull request #122411 from huww98/lift-mountedByNode

ad controller: lift nodeAttachedTo.mountedByNode
Kubernetes Prow Robot 2024-04-18 00:00:14 -07:00 committed by GitHub
commit 854af6aba6
5 changed files with 126 additions and 207 deletions
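At a glance: the attach/detach controller used to record "mounted by node" one volume at a time through SetVolumeMountedByNode, with an error returned per call. This commit lifts that state out of the per-(volume, node) nodeAttachedTo struct into a per-node set in the cache, behind a new batch method. A minimal before/after sketch of the caller side, simplified from the diff below:

	// Before: one call, and one error to handle, per attached volume.
	for _, av := range asw.GetAttachedVolumesForNode(nodeName) {
		mounted := inUse.Has(av.VolumeName) // inUse built from node.Status.VolumesInUse
		if err := asw.SetVolumeMountedByNode(logger, av.VolumeName, nodeName, mounted); err != nil {
			logger.Error(err, "Failed to set volume mounted by node")
		}
	}

	// After: hand the kubelet-reported list to the cache in one call;
	// the new method cannot fail, so no per-volume error handling remains.
	asw.SetVolumesMountedByNode(logger, node.Status.VolumesInUse, nodeName)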

View File

@@ -34,7 +34,6 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	coreinformers "k8s.io/client-go/informers/core/v1"
@@ -380,7 +379,6 @@ func (adc *attachDetachController) populateActualStateOfWorld(logger klog.Logger) error {
 	for _, node := range nodes {
 		nodeName := types.NodeName(node.Name)
-		volumesInUse := sets.New(node.Status.VolumesInUse...)
 		for _, attachedVolume := range node.Status.VolumesAttached {
 			uniqueName := attachedVolume.Name
@@ -394,12 +392,8 @@ func (adc *attachDetachController) populateActualStateOfWorld(logger klog.Logger) error {
 				logger.Error(err, "Failed to mark the volume as attached")
 				continue
 			}
-			inUse := volumesInUse.Has(uniqueName)
-			err = adc.actualStateOfWorld.SetVolumeMountedByNode(logger, uniqueName, nodeName, inUse)
-			if err != nil {
-				logger.Error(err, "Failed to set volume mounted by node")
-			}
 		}
+		adc.actualStateOfWorld.SetVolumesMountedByNode(logger, node.Status.VolumesInUse, nodeName)
 		adc.addNodeToDswp(node, types.NodeName(node.Name))
 	}
 	err = adc.processVolumeAttachments(logger)
@@ -678,24 +672,7 @@ func (adc *attachDetachController) syncPVCByKey(logger klog.Logger, key string)
 func (adc *attachDetachController) processVolumesInUse(
 	logger klog.Logger, nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) {
 	logger.V(4).Info("processVolumesInUse for node", "node", klog.KRef("", string(nodeName)))
-	for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
-		mounted := false
-		for _, volumeInUse := range volumesInUse {
-			if attachedVolume.VolumeName == volumeInUse {
-				mounted = true
-				break
-			}
-		}
-		err := adc.actualStateOfWorld.SetVolumeMountedByNode(logger, attachedVolume.VolumeName, nodeName, mounted)
-		if err != nil {
-			logger.Info(
-				"SetVolumeMountedByNode returned an error",
-				"node", klog.KRef("", string(nodeName)),
-				"volumeName", attachedVolume.VolumeName,
-				"mounted", mounted,
-				"err", err)
-		}
-	}
+	adc.actualStateOfWorld.SetVolumesMountedByNode(logger, volumesInUse, nodeName)
 }

 // Process Volume-Attachment objects.
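Worth noting: the removed processVolumesInUse body rescanned volumesInUse once per attached volume, so a node update cost O(attached × inUse) comparisons. The replacement stores the reported list as a set keyed by node, making each later mounted-state check a constant-time lookup. A small, self-contained illustration of the sets API the new code relies on (k8s.io/apimachinery/pkg/util/sets; the volume names are invented for the example):

	package main

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/util/sets"
	)

	func main() {
		// Build the set once from the node's reported in-use list.
		inUse := sets.New(
			"kubernetes.io/csi/pd.csi.storage.gke.io^disk-1",
			"kubernetes.io/csi/pd.csi.storage.gke.io^disk-2",
		)

		// Membership checks are O(1), replacing the per-volume linear scan.
		fmt.Println(inUse.Has("kubernetes.io/csi/pd.csi.storage.gke.io^disk-1")) // true
		fmt.Println(inUse.Has("kubernetes.io/csi/pd.csi.storage.gke.io^disk-9")) // false
	}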

View File

@@ -19,6 +19,7 @@ package attachdetach
 import (
 	"context"
 	"fmt"
+	"runtime"
 	"testing"
 	"time"
@@ -44,44 +45,9 @@ const (
 	csiPDUniqueNamePrefix = "kubernetes.io/csi/pd.csi.storage.gke.io^projects/UNSPECIFIED/zones/UNSPECIFIED/disks/"
 )

-func Test_NewAttachDetachController_Positive(t *testing.T) {
-	// Arrange
-	fakeKubeClient := controllervolumetesting.CreateTestClient()
-	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
-
-	// Act
-	tCtx := ktesting.Init(t)
-	_, err := NewAttachDetachController(
-		tCtx,
-		fakeKubeClient,
-		informerFactory.Core().V1().Pods(),
-		informerFactory.Core().V1().Nodes(),
-		informerFactory.Core().V1().PersistentVolumeClaims(),
-		informerFactory.Core().V1().PersistentVolumes(),
-		informerFactory.Storage().V1().CSINodes(),
-		informerFactory.Storage().V1().CSIDrivers(),
-		informerFactory.Storage().V1().VolumeAttachments(),
-		nil, /* cloud */
-		nil, /* plugins */
-		nil, /* prober */
-		false,
-		5*time.Second,
-		false,
-		DefaultTimerConfig,
-	)
-
-	// Assert
-	if err != nil {
-		t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
-	}
-}
-
-func Test_AttachDetachControllerStateOfWorldPopulators_Positive(t *testing.T) {
-	// Arrange
-	fakeKubeClient := controllervolumetesting.CreateTestClient()
-	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
-	logger, tCtx := ktesting.NewTestContext(t)
+func createADC(t testing.TB, tCtx ktesting.TContext, fakeKubeClient *fake.Clientset,
+	informerFactory informers.SharedInformerFactory, plugins []volume.VolumePlugin) *attachDetachController {
 	adcObj, err := NewAttachDetachController(
 		tCtx,
 		fakeKubeClient,
@@ -93,10 +59,10 @@ func Test_AttachDetachControllerStateOfWorldPopulators_Positive(t *testing.T) {
 		informerFactory.Storage().V1().CSIDrivers(),
 		informerFactory.Storage().V1().VolumeAttachments(),
 		nil, /* cloud */
-		controllervolumetesting.CreateTestPlugin(),
+		plugins,
 		nil, /* prober */
 		false,
-		5*time.Second,
+		1*time.Second,
 		false,
 		DefaultTimerConfig,
 	)
@@ -104,13 +70,32 @@
 	if err != nil {
 		t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
 	}
-	adc := adcObj.(*attachDetachController)
+	return adcObj.(*attachDetachController)
+}
+
+func Test_NewAttachDetachController_Positive(t *testing.T) {
+	// Arrange
+	fakeKubeClient := controllervolumetesting.CreateTestClient()
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	tCtx := ktesting.Init(t)
+
+	// Act
+	createADC(t, tCtx, fakeKubeClient, informerFactory, nil)
+}
+
+func Test_AttachDetachControllerStateOfWorldPopulators_Positive(t *testing.T) {
+	// Arrange
+	fakeKubeClient := controllervolumetesting.CreateTestClient()
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	logger, tCtx := ktesting.NewTestContext(t)
+	adc := createADC(t, tCtx, fakeKubeClient, informerFactory, controllervolumetesting.CreateTestPlugin())

 	// Act
 	informerFactory.Start(tCtx.Done())
 	informerFactory.WaitForCacheSync(tCtx.Done())

-	err = adc.populateActualStateOfWorld(logger)
+	err := adc.populateActualStateOfWorld(logger)
 	if err != nil {
 		t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
 	}
@@ -163,12 +148,12 @@
 	}
 }

-func BenchmarkPopulateActualStateOfWorld(b *testing.B) {
+func largeClusterClient(t testing.TB, numNodes int) *fake.Clientset {
 	// Arrange
 	fakeKubeClient := fake.NewSimpleClientset()
-	// populate 10000 nodes, each with 100 volumes
-	for i := 0; i < 10000; i++ {
+	// populate numNodes nodes, each with 100 volumes
+	for i := 0; i < numNodes; i++ {
 		nodeName := fmt.Sprintf("node-%d", i)
 		node := &v1.Node{
 			ObjectMeta: metav1.ObjectMeta{
@@ -194,50 +179,63 @@
 				},
 			}, metav1.CreateOptions{})
 			if err != nil {
-				b.Fatalf("failed to create PV: %v", err)
+				t.Fatalf("failed to create PV: %v", err)
 			}
 		}
 		_, err := fakeKubeClient.CoreV1().Nodes().Create(context.Background(), node, metav1.CreateOptions{})
 		if err != nil {
-			b.Fatalf("failed to create node: %v", err)
+			t.Fatalf("failed to create node: %v", err)
 		}
 	}
+	return fakeKubeClient
+}
+
+func BenchmarkPopulateActualStateOfWorld(b *testing.B) {
+	fakeKubeClient := largeClusterClient(b, 10000)
 	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
 	logger, tCtx := ktesting.NewTestContext(b)
-	adcObj, err := NewAttachDetachController(
-		tCtx,
-		fakeKubeClient,
-		informerFactory.Core().V1().Pods(),
-		informerFactory.Core().V1().Nodes(),
-		informerFactory.Core().V1().PersistentVolumeClaims(),
-		informerFactory.Core().V1().PersistentVolumes(),
-		informerFactory.Storage().V1().CSINodes(),
-		informerFactory.Storage().V1().CSIDrivers(),
-		informerFactory.Storage().V1().VolumeAttachments(),
-		nil, /* cloud */
-		nil, /* plugins */
-		nil, /* prober */
-		false,
-		5*time.Second,
-		false,
-		DefaultTimerConfig,
-	)
-	if err != nil {
-		b.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
-	}
-	adc := adcObj.(*attachDetachController)
+	adc := createADC(b, tCtx, fakeKubeClient, informerFactory, nil)

 	// Act
 	informerFactory.Start(tCtx.Done())
 	informerFactory.WaitForCacheSync(tCtx.Done())
 	b.ResetTimer()
-	err = adc.populateActualStateOfWorld(logger)
-	if err != nil {
-		b.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
+	for i := 0; i < b.N; i++ {
+		err := adc.populateActualStateOfWorld(logger)
+		if err != nil {
+			b.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
+		}
 	}
 }
+
+func BenchmarkNodeUpdate(b *testing.B) {
+	fakeKubeClient := largeClusterClient(b, 3000)
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	logger, tCtx := ktesting.NewTestContext(b)
+	adc := createADC(b, tCtx, fakeKubeClient, informerFactory, nil)
+	informerFactory.Start(tCtx.Done())
+	informerFactory.WaitForCacheSync(tCtx.Done())
+	err := adc.populateActualStateOfWorld(logger.V(2))
+	if err != nil {
+		b.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
+	}
+	node, err := fakeKubeClient.CoreV1().Nodes().Get(context.Background(), "node-123", metav1.GetOptions{})
+	if err != nil {
+		b.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
+	}
+
+	// Act
+	runtime.GC()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		adc.nodeUpdate(logger, node, node)
+	}
+}
func Test_AttachDetachControllerRecovery(t *testing.T) {
@@ -528,28 +526,7 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
 	// Create the controller
 	logger, tCtx := ktesting.NewTestContext(t)
-	adcObj, err := NewAttachDetachController(
-		tCtx,
-		fakeKubeClient,
-		informerFactory.Core().V1().Pods(),
-		informerFactory.Core().V1().Nodes(),
-		informerFactory.Core().V1().PersistentVolumeClaims(),
-		informerFactory.Core().V1().PersistentVolumes(),
-		informerFactory.Storage().V1().CSINodes(),
-		informerFactory.Storage().V1().CSIDrivers(),
-		informerFactory.Storage().V1().VolumeAttachments(),
-		nil, /* cloud */
-		plugins,
-		nil, /* prober */
-		false,
-		1*time.Second,
-		false,
-		DefaultTimerConfig,
-	)
-	if err != nil {
-		t.Fatalf("NewAttachDetachController failed with error. Expected: <no error> Actual: <%v>", err)
-	}
-	adc := adcObj.(*attachDetachController)
+	adc := createADC(t, tCtx, fakeKubeClient, informerFactory, plugins)

 	// Add existing objects (created by testplugin) to the respective informers
 	pods, err := fakeKubeClient.CoreV1().Pods(v1.NamespaceAll).List(tCtx, metav1.ListOptions{})
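The two benchmarks above share largeClusterClient and follow the standard Go benchmark shape: build the cluster fixture once, then time only the b.N loop. b.ResetTimer() discards the setup cost, and the explicit runtime.GC() in BenchmarkNodeUpdate keeps leftover garbage from the 3000-node populate phase out of the measured allocations. Assuming they live in the usual attach/detach controller package (the path is inferred, not shown in this view), they can be run with:

	go test -run=NONE -benchmem \
		-bench='BenchmarkPopulateActualStateOfWorld|BenchmarkNodeUpdate' \
		./pkg/controller/volume/attachdetach/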

View File

@@ -29,6 +29,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/volume"
 	"k8s.io/kubernetes/pkg/volume/util"
@@ -60,14 +61,13 @@ type ActualStateOfWorld interface {
 	// the specified volume, the node is added.
 	AddVolumeNode(logger klog.Logger, uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string, attached bool) (v1.UniqueVolumeName, error)

-	// SetVolumeMountedByNode sets the MountedByNode value for the given volume
-	// and node. When set to true the mounted parameter indicates the volume
+	// SetVolumesMountedByNode sets all the volumes mounted by the given node.
+	// These volumes should include attached volumes, not-yet-attached volumes,
+	// and may also include non-attachable volumes.
+	// When present in the volumeNames parameter, the volume
 	// is mounted by the given node, indicating it may not be safe to detach.
-	// If no volume with the name volumeName exists in the store, an error is
-	// returned.
-	// If no node with the name nodeName exists in list of attached nodes for
-	// the specified volume, an error is returned.
-	SetVolumeMountedByNode(logger klog.Logger, volumeName v1.UniqueVolumeName, nodeName types.NodeName, mounted bool) error
+	// Otherwise, the volume is not mounted by the given node.
+	SetVolumesMountedByNode(logger klog.Logger, volumeNames []v1.UniqueVolumeName, nodeName types.NodeName)

 	// SetNodeStatusUpdateNeeded sets statusUpdateNeeded for the specified
 	// node to true indicating the AttachedVolume field in the Node's Status
@@ -147,7 +147,7 @@ type AttachedVolume struct {
 	// MountedByNode indicates that this volume has been mounted by the node and
 	// is unsafe to detach.
-	// The value is set and unset by SetVolumeMountedByNode(...).
+	// The value is set and unset by SetVolumesMountedByNode(...).
 	MountedByNode bool

 	// DetachRequestedTime is used to capture the desire to detach this volume.
@@ -188,6 +188,7 @@ func NewActualStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) ActualStateOfWorld {
 	return &actualStateOfWorld{
 		attachedVolumes:        make(map[v1.UniqueVolumeName]attachedVolume),
 		nodesToUpdateStatusFor: make(map[types.NodeName]nodeToUpdateStatusFor),
+		inUseVolumes:           make(map[types.NodeName]sets.Set[v1.UniqueVolumeName]),
 		volumePluginMgr:        volumePluginMgr,
 	}
 }
@@ -205,6 +206,10 @@ type actualStateOfWorld struct {
 	// the node (including the list of volumes to report attached).
 	nodesToUpdateStatusFor map[types.NodeName]nodeToUpdateStatusFor

+	// inUseVolumes is a map containing the set of volumes that are reported as
+	// in use by the kubelet.
+	inUseVolumes map[types.NodeName]sets.Set[v1.UniqueVolumeName]
+
 	// volumePluginMgr is the volume plugin manager used to create volume
 	// plugin objects.
 	volumePluginMgr *volume.VolumePluginMgr
@@ -239,10 +244,6 @@ type nodeAttachedTo struct {
 	// nodeName contains the name of this node.
 	nodeName types.NodeName

-	// mountedByNode indicates that this node/volume combo is mounted by the
-	// node and is unsafe to detach
-	mountedByNode bool
-
 	// attachConfirmed indicates that the storage system verified the volume has been attached to this node.
 	// This value is set to false when an attach operation fails and the volume may be attached or not.
 	attachedConfirmed bool
@@ -363,10 +364,15 @@ func (asw *actualStateOfWorld) AddVolumeNode(
 		// Create object if it doesn't exist.
 		node = nodeAttachedTo{
 			nodeName:            nodeName,
-			mountedByNode:       true, // Assume mounted, until proven otherwise
 			attachedConfirmed:   isAttached,
 			detachRequestedTime: time.Time{},
 		}
+		// Assume mounted, until proven otherwise
+		if asw.inUseVolumes[nodeName] == nil {
+			asw.inUseVolumes[nodeName] = sets.New(volumeName)
+		} else {
+			asw.inUseVolumes[nodeName].Insert(volumeName)
+		}
 	} else {
 		node.attachedConfirmed = isAttached
 		logger.V(5).Info("Volume is already added to attachedVolume list to the node",
@@ -384,24 +390,15 @@ func (asw *actualStateOfWorld) AddVolumeNode(
 	return volumeName, nil
 }

-func (asw *actualStateOfWorld) SetVolumeMountedByNode(
-	logger klog.Logger,
-	volumeName v1.UniqueVolumeName, nodeName types.NodeName, mounted bool) error {
+func (asw *actualStateOfWorld) SetVolumesMountedByNode(
+	logger klog.Logger, volumeNames []v1.UniqueVolumeName, nodeName types.NodeName) {
 	asw.Lock()
 	defer asw.Unlock()

-	volumeObj, nodeObj, err := asw.getNodeAndVolume(volumeName, nodeName)
-	if err != nil {
-		return fmt.Errorf("failed to SetVolumeMountedByNode with error: %v", err)
-	}
-
-	nodeObj.mountedByNode = mounted
-	volumeObj.nodesAttachedTo[nodeName] = nodeObj
-	logger.V(4).Info("SetVolumeMountedByNode volume to the node",
+	asw.inUseVolumes[nodeName] = sets.New(volumeNames...)
+	logger.V(5).Info("SetVolumesMountedByNode volume to the node",
 		"node", klog.KRef("", string(nodeName)),
-		"volumeName", volumeName,
-		"mounted", mounted)
-	return nil
+		"volumeNames", volumeNames)
 }

 func (asw *actualStateOfWorld) ResetDetachRequestTime(
@@ -604,7 +601,7 @@ func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume {
 	for _, nodeObj := range volumeObj.nodesAttachedTo {
 		attachedVolumes = append(
 			attachedVolumes,
-			getAttachedVolume(&volumeObj, &nodeObj))
+			asw.getAttachedVolume(&volumeObj, &nodeObj))
 	}
 }
@@ -622,7 +619,7 @@
 	if nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]; nodeExists {
 		attachedVolumes = append(
 			attachedVolumes,
-			getAttachedVolume(&volumeObj, &nodeObj))
+			asw.getAttachedVolume(&volumeObj, &nodeObj))
 	}
 }
@@ -638,7 +635,7 @@ func (asw *actualStateOfWorld) GetAttachedVolumesPerNode() map[types.NodeName][]AttachedVolume {
 	for nodeName, nodeObj := range volumeObj.nodesAttachedTo {
 		if nodeObj.attachedConfirmed {
 			volumes := attachedVolumesPerNode[nodeName]
-			volumes = append(volumes, getAttachedVolume(&volumeObj, &nodeObj).AttachedVolume)
+			volumes = append(volumes, asw.getAttachedVolume(&volumeObj, &nodeObj).AttachedVolume)
 			attachedVolumesPerNode[nodeName] = volumes
 		}
 	}
@@ -727,7 +724,7 @@ func (asw *actualStateOfWorld) getAttachedVolumeFromUpdateObject(volumesToReport
 	return attachedVolumes
 }

-func getAttachedVolume(
+func (asw *actualStateOfWorld) getAttachedVolume(
 	attachedVolume *attachedVolume,
 	nodeAttachedTo *nodeAttachedTo) AttachedVolume {
 	return AttachedVolume{
@@ -738,6 +735,6 @@ func getAttachedVolume(
 			DevicePath:         attachedVolume.devicePath,
 			PluginIsAttachable: true,
 		},
-		MountedByNode:       nodeAttachedTo.mountedByNode,
+		MountedByNode:       asw.inUseVolumes[nodeAttachedTo.nodeName].Has(attachedVolume.volumeName),
 		DetachRequestedTime: nodeAttachedTo.detachRequestedTime}
 }
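The net effect on the cache data model: mounted state is no longer a bool carried by every nodeAttachedTo, it is one set per node, and MountedByNode on AttachedVolume becomes a derived value. A sketch of the resulting shape (field and method names taken from the diff; everything unrelated elided):

	type actualStateOfWorld struct {
		// per node: the volumes the kubelet last reported as in use
		inUseVolumes map[types.NodeName]sets.Set[v1.UniqueVolumeName]
		// attachedVolumes, nodesToUpdateStatusFor, lock, ... elided
	}

	// MountedByNode is computed, not stored:
	//
	//	asw.inUseVolumes[node].Has(volume)
	//
	// Indexing a missing node yields a nil set, and Has on a nil
	// sets.Set[T] (a nil map underneath) safely returns false, which is
	// why getAttachedVolume can read the map without an existence check.

This is also why getAttachedVolume gains an asw receiver in this commit: it now consults controller-wide state instead of reading a field off its two arguments.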

View File

@@ -664,7 +664,7 @@ func Test_GetAttachedVolumes_Positive_OneVolumeTwoNodes(t *testing.T) {
 // Populates data struct with one volume/node entry.
 // Verifies mountedByNode is true and DetachRequestedTime is zero.
-func Test_SetVolumeMountedByNode_Positive_Set(t *testing.T) {
+func Test_SetVolumesMountedByNode_Positive_Set(t *testing.T) {
 	// Arrange
 	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
 	asw := NewActualStateOfWorld(volumePluginMgr)
@@ -690,9 +690,9 @@ func Test_SetVolumeMountedByNode_Positive_Set(t *testing.T) {
 }

 // Populates data struct with one volume/node entry.
-// Calls SetVolumeMountedByNode twice, first setting mounted to true then false.
+// Calls SetVolumesMountedByNode twice, first setting mounted to true then false.
 // Verifies mountedByNode is false.
-func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSet(t *testing.T) {
+func Test_SetVolumesMountedByNode_Positive_UnsetWithInitialSet(t *testing.T) {
 	// Arrange
 	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
 	asw := NewActualStateOfWorld(volumePluginMgr)
@@ -707,16 +707,8 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSet(t *testing.T) {
 	}

 	// Act
-	setVolumeMountedErr1 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, true /* mounted */)
-	setVolumeMountedErr2 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, false /* mounted */)
-
-	// Assert
-	if setVolumeMountedErr1 != nil {
-		t.Fatalf("SetVolumeMountedByNode1 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr1)
-	}
-	if setVolumeMountedErr2 != nil {
-		t.Fatalf("SetVolumeMountedByNode2 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr2)
-	}
+	asw.SetVolumesMountedByNode(logger, []v1.UniqueVolumeName{generatedVolumeName}, nodeName)
+	asw.SetVolumesMountedByNode(logger, nil, nodeName)

 	attachedVolumes := asw.GetAttachedVolumes()
 	if len(attachedVolumes) != 1 {
@@ -727,9 +719,9 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSet(t *testing.T) {
 }

 // Populates data struct with one volume/node entry.
-// Calls SetVolumeMountedByNode once, setting mounted to false.
+// Calls SetVolumesMountedByNode once, setting mounted to false.
 // Verifies mountedByNode is false because value is overwritten
-func Test_SetVolumeMountedByNode_Positive_UnsetWithoutInitialSet(t *testing.T) {
+func Test_SetVolumesMountedByNode_Positive_UnsetWithoutInitialSet(t *testing.T) {
 	// Arrange
 	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
 	asw := NewActualStateOfWorld(volumePluginMgr)
@@ -751,13 +743,9 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithoutInitialSet(t *testing.T) {
 	verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, devicePath, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)

 	// Act
-	setVolumeMountedErr := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, false /* mounted */)
+	asw.SetVolumesMountedByNode(logger, nil, nodeName)

 	// Assert
-	if setVolumeMountedErr != nil {
-		t.Fatalf("SetVolumeMountedByNode failed. Expected <no error> Actual: <%v>", setVolumeMountedErr)
-	}
-
 	attachedVolumes = asw.GetAttachedVolumes()
 	if len(attachedVolumes) != 1 {
 		t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
@@ -767,10 +755,10 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithoutInitialSet(t *testing.T) {
 }

 // Populates data struct with one volume/node entry.
-// Calls SetVolumeMountedByNode twice, first setting mounted to true then false.
+// Calls SetVolumesMountedByNode twice, first setting mounted to true then false.
 // Calls AddVolumeNode to readd the same volume/node.
 // Verifies mountedByNode is false and detachRequestedTime is zero.
-func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetAddVolumeNodeNotReset(t *testing.T) {
+func Test_SetVolumesMountedByNode_Positive_UnsetWithInitialSetAddVolumeNodeNotReset(t *testing.T) {
 	// Arrange
 	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
 	asw := NewActualStateOfWorld(volumePluginMgr)
@@ -785,17 +773,11 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetAddVolumeNodeNotReset(t *testing.T) {
 	}

 	// Act
-	setVolumeMountedErr1 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, true /* mounted */)
-	setVolumeMountedErr2 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, false /* mounted */)
+	asw.SetVolumesMountedByNode(logger, []v1.UniqueVolumeName{generatedVolumeName}, nodeName)
+	asw.SetVolumesMountedByNode(logger, nil, nodeName)
 	generatedVolumeName, addErr = asw.AddVolumeNode(logger, volumeName, volumeSpec, nodeName, devicePath, true)

 	// Assert
-	if setVolumeMountedErr1 != nil {
-		t.Fatalf("SetVolumeMountedByNode1 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr1)
-	}
-	if setVolumeMountedErr2 != nil {
-		t.Fatalf("SetVolumeMountedByNode2 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr2)
-	}
 	if addErr != nil {
 		t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr)
 	}
@@ -810,9 +792,9 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetAddVolumeNodeNotReset(t *testing.T) {
 // Populates data struct with one volume/node entry.
 // Calls RemoveVolumeFromReportAsAttached() once on volume/node entry.
-// Calls SetVolumeMountedByNode() twice, first setting mounted to true then false.
+// Calls SetVolumesMountedByNode() twice, first setting mounted to true then false.
 // Verifies mountedByNode is false and detachRequestedTime is NOT zero.
-func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequestedTimePerserved(t *testing.T) {
+func Test_SetVolumesMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequestedTimePerserved(t *testing.T) {
 	// Arrange
 	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
 	asw := NewActualStateOfWorld(volumePluginMgr)
@@ -836,17 +818,10 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequestedTimePerserved(t *testing.T) {
 	expectedDetachRequestedTime := asw.GetAttachedVolumes()[0].DetachRequestedTime

 	// Act
-	setVolumeMountedErr1 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, true /* mounted */)
-	setVolumeMountedErr2 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, false /* mounted */)
+	asw.SetVolumesMountedByNode(logger, []v1.UniqueVolumeName{generatedVolumeName}, nodeName)
+	asw.SetVolumesMountedByNode(logger, nil, nodeName)

 	// Assert
-	if setVolumeMountedErr1 != nil {
-		t.Fatalf("SetVolumeMountedByNode1 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr1)
-	}
-	if setVolumeMountedErr2 != nil {
-		t.Fatalf("SetVolumeMountedByNode2 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr2)
-	}
 	attachedVolumes := asw.GetAttachedVolumes()
 	if len(attachedVolumes) != 1 {
 		t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
@@ -966,10 +941,10 @@ func Test_MarkDesireToDetach_Positive_MarkedAddVolumeNodeReset(t *testing.T) {
 }

 // Populates data struct with one volume/node entry.
-// Calls SetVolumeMountedByNode() twice, first setting mounted to true then false.
+// Calls SetVolumesMountedByNode() twice, first setting mounted to true then false.
 // Calls RemoveVolumeFromReportAsAttached() once on volume/node entry.
 // Verifies mountedByNode is false and detachRequestedTime is NOT zero.
-func Test_RemoveVolumeFromReportAsAttached_Positive_UnsetWithInitialSetVolumeMountedByNodePreserved(t *testing.T) {
+func Test_RemoveVolumeFromReportAsAttached_Positive_UnsetWithInitialSetVolumesMountedByNodePreserved(t *testing.T) {
 	// Arrange
 	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
 	asw := NewActualStateOfWorld(volumePluginMgr)
@@ -982,15 +957,8 @@ func Test_RemoveVolumeFromReportAsAttached_Positive_UnsetWithInitialSetVolumeMountedByNodePreserved(t *testing.T) {
 	if addErr != nil {
 		t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr)
 	}
-	setVolumeMountedErr1 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, true /* mounted */)
-	setVolumeMountedErr2 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, false /* mounted */)
-	if setVolumeMountedErr1 != nil {
-		t.Fatalf("SetVolumeMountedByNode1 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr1)
-	}
-	if setVolumeMountedErr2 != nil {
-		t.Fatalf("SetVolumeMountedByNode2 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr2)
-	}
+	asw.SetVolumesMountedByNode(logger, []v1.UniqueVolumeName{generatedVolumeName}, nodeName)
+	asw.SetVolumesMountedByNode(logger, nil, nodeName)

 	// Act
 	_, err := asw.SetDetachRequestTime(logger, generatedVolumeName, nodeName)
 	if err != nil {

View File

@@ -207,8 +207,8 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *testing.T) {
 			generatedVolumeName,
 			nodeName)
 	}
-	asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, true /* mounted */)
-	asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, false /* mounted */)
+	asw.SetVolumesMountedByNode(logger, []v1.UniqueVolumeName{generatedVolumeName}, nodeName)
+	asw.SetVolumesMountedByNode(logger, nil, nodeName)

 	// Assert
 	waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
@@ -364,8 +364,8 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
 			generatedVolumeName,
 			nodeName)
 	}
-	asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, true /* mounted */)
-	asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, false /* mounted */)
+	asw.SetVolumesMountedByNode(logger, []v1.UniqueVolumeName{generatedVolumeName}, nodeName)
+	asw.SetVolumesMountedByNode(logger, nil, nodeName)

 	// Assert
 	verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
@@ -619,7 +619,7 @@ func Test_Run_OneVolumeAttachAndDetachUncertainNodesWithReadWriteOnce(t *testing.T) {
 	// When volume is added to the node, it is set to mounted by default. Then the status will be updated by checking node status VolumeInUse.
 	// Without this, the delete operation will be delayed due to mounted status
-	asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName1, false /* mounted */)
+	asw.SetVolumesMountedByNode(logger, nil, nodeName1)

 	dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
@@ -824,7 +824,7 @@ func Test_Run_OneVolumeAttachAndDetachTimeoutNodesWithReadWriteOnce(t *testing.T) {
 	// When volume is added to the node, it is set to mounted by default. Then the status will be updated by checking node status VolumeInUse.
 	// Without this, the delete operation will be delayed due to mounted status
-	asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName1, false /* mounted */)
+	asw.SetVolumesMountedByNode(logger, nil, nodeName1)

 	dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
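Across the test updates, the old true/false flag pairs translate mechanically into whole-list updates: passing the volume's generated name marks it in use, and passing nil clears every volume for that node. The second call overwrites the first, since the method replaces the node's set rather than merging into it. Condensed from the tests above:

	// was: SetVolumeMountedByNode(..., true) then SetVolumeMountedByNode(..., false)
	asw.SetVolumesMountedByNode(logger, []v1.UniqueVolumeName{generatedVolumeName}, nodeName)
	asw.SetVolumesMountedByNode(logger, nil, nodeName) // now reports nothing mounted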