ad controller: lift nodeAttachedTo.mountedByNode

optimize adc.nodeUpdate(). Time complexity reduced from O(n) to O(1), where n is the number of nodes.

Data stored in nodeAttachedTo.mountedByNode is now at actualStateOfWorld.inUseVolumes.

This refactor also ensures that we can record the state update even if the volume is not present in ASW yet.

The added BenchmarkNodeUpdate result is reduced from 28076923 to 16030 ns/op.
The previous BenchmarkPopulateActualStateOfWorld result is also reduced from 13s to 8s.
This commit is contained in:
huweiwen 2023-12-20 13:17:32 +08:00 committed by 胡玮文
parent 3086d88dc6
commit 3a71fe57f7
5 changed files with 126 additions and 207 deletions

View File

@ -34,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
@ -380,7 +379,6 @@ func (adc *attachDetachController) populateActualStateOfWorld(logger klog.Logger
for _, node := range nodes { for _, node := range nodes {
nodeName := types.NodeName(node.Name) nodeName := types.NodeName(node.Name)
volumesInUse := sets.New(node.Status.VolumesInUse...)
for _, attachedVolume := range node.Status.VolumesAttached { for _, attachedVolume := range node.Status.VolumesAttached {
uniqueName := attachedVolume.Name uniqueName := attachedVolume.Name
@ -394,12 +392,8 @@ func (adc *attachDetachController) populateActualStateOfWorld(logger klog.Logger
logger.Error(err, "Failed to mark the volume as attached") logger.Error(err, "Failed to mark the volume as attached")
continue continue
} }
inUse := volumesInUse.Has(uniqueName)
err = adc.actualStateOfWorld.SetVolumeMountedByNode(logger, uniqueName, nodeName, inUse)
if err != nil {
logger.Error(err, "Failed to set volume mounted by node")
}
} }
adc.actualStateOfWorld.SetVolumesMountedByNode(logger, node.Status.VolumesInUse, nodeName)
adc.addNodeToDswp(node, types.NodeName(node.Name)) adc.addNodeToDswp(node, types.NodeName(node.Name))
} }
err = adc.processVolumeAttachments(logger) err = adc.processVolumeAttachments(logger)
@ -678,24 +672,7 @@ func (adc *attachDetachController) syncPVCByKey(logger klog.Logger, key string)
func (adc *attachDetachController) processVolumesInUse( func (adc *attachDetachController) processVolumesInUse(
logger klog.Logger, nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) { logger klog.Logger, nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) {
logger.V(4).Info("processVolumesInUse for node", "node", klog.KRef("", string(nodeName))) logger.V(4).Info("processVolumesInUse for node", "node", klog.KRef("", string(nodeName)))
for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) { adc.actualStateOfWorld.SetVolumesMountedByNode(logger, volumesInUse, nodeName)
mounted := false
for _, volumeInUse := range volumesInUse {
if attachedVolume.VolumeName == volumeInUse {
mounted = true
break
}
}
err := adc.actualStateOfWorld.SetVolumeMountedByNode(logger, attachedVolume.VolumeName, nodeName, mounted)
if err != nil {
logger.Info(
"SetVolumeMountedByNode returned an error",
"node", klog.KRef("", string(nodeName)),
"volumeName", attachedVolume.VolumeName,
"mounted", mounted,
"err", err)
}
}
} }
// Process Volume-Attachment objects. // Process Volume-Attachment objects.

View File

@ -19,6 +19,7 @@ package attachdetach
import ( import (
"context" "context"
"fmt" "fmt"
"runtime"
"testing" "testing"
"time" "time"
@ -44,44 +45,9 @@ const (
csiPDUniqueNamePrefix = "kubernetes.io/csi/pd.csi.storage.gke.io^projects/UNSPECIFIED/zones/UNSPECIFIED/disks/" csiPDUniqueNamePrefix = "kubernetes.io/csi/pd.csi.storage.gke.io^projects/UNSPECIFIED/zones/UNSPECIFIED/disks/"
) )
func Test_NewAttachDetachController_Positive(t *testing.T) { func createADC(t testing.TB, tCtx ktesting.TContext, fakeKubeClient *fake.Clientset,
// Arrange informerFactory informers.SharedInformerFactory, plugins []volume.VolumePlugin) *attachDetachController {
fakeKubeClient := controllervolumetesting.CreateTestClient()
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
// Act
tCtx := ktesting.Init(t)
_, err := NewAttachDetachController(
tCtx,
fakeKubeClient,
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1().CSINodes(),
informerFactory.Storage().V1().CSIDrivers(),
informerFactory.Storage().V1().VolumeAttachments(),
nil, /* cloud */
nil, /* plugins */
nil, /* prober */
false,
5*time.Second,
false,
DefaultTimerConfig,
)
// Assert
if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
}
}
func Test_AttachDetachControllerStateOfWorldPopulators_Positive(t *testing.T) {
// Arrange
fakeKubeClient := controllervolumetesting.CreateTestClient()
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
logger, tCtx := ktesting.NewTestContext(t)
adcObj, err := NewAttachDetachController( adcObj, err := NewAttachDetachController(
tCtx, tCtx,
fakeKubeClient, fakeKubeClient,
@ -93,10 +59,10 @@ func Test_AttachDetachControllerStateOfWorldPopulators_Positive(t *testing.T) {
informerFactory.Storage().V1().CSIDrivers(), informerFactory.Storage().V1().CSIDrivers(),
informerFactory.Storage().V1().VolumeAttachments(), informerFactory.Storage().V1().VolumeAttachments(),
nil, /* cloud */ nil, /* cloud */
controllervolumetesting.CreateTestPlugin(), plugins,
nil, /* prober */ nil, /* prober */
false, false,
5*time.Second, 1*time.Second,
false, false,
DefaultTimerConfig, DefaultTimerConfig,
) )
@ -104,13 +70,32 @@ func Test_AttachDetachControllerStateOfWorldPopulators_Positive(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err) t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
} }
adc := adcObj.(*attachDetachController) return adcObj.(*attachDetachController)
}
func Test_NewAttachDetachController_Positive(t *testing.T) {
// Arrange
fakeKubeClient := controllervolumetesting.CreateTestClient()
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
tCtx := ktesting.Init(t)
// Act
createADC(t, tCtx, fakeKubeClient, informerFactory, nil)
}
func Test_AttachDetachControllerStateOfWorldPopulators_Positive(t *testing.T) {
// Arrange
fakeKubeClient := controllervolumetesting.CreateTestClient()
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
logger, tCtx := ktesting.NewTestContext(t)
adc := createADC(t, tCtx, fakeKubeClient, informerFactory, controllervolumetesting.CreateTestPlugin())
// Act // Act
informerFactory.Start(tCtx.Done()) informerFactory.Start(tCtx.Done())
informerFactory.WaitForCacheSync(tCtx.Done()) informerFactory.WaitForCacheSync(tCtx.Done())
err = adc.populateActualStateOfWorld(logger) err := adc.populateActualStateOfWorld(logger)
if err != nil { if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err) t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
} }
@ -163,12 +148,12 @@ func Test_AttachDetachControllerStateOfWorldPopulators_Positive(t *testing.T) {
} }
} }
func BenchmarkPopulateActualStateOfWorld(b *testing.B) { func largeClusterClient(t testing.TB, numNodes int) *fake.Clientset {
// Arrange // Arrange
fakeKubeClient := fake.NewSimpleClientset() fakeKubeClient := fake.NewSimpleClientset()
// populate 10000 nodes, each with 100 volumes // populate numNodes nodes, each with 100 volumes
for i := 0; i < 10000; i++ { for i := 0; i < numNodes; i++ {
nodeName := fmt.Sprintf("node-%d", i) nodeName := fmt.Sprintf("node-%d", i)
node := &v1.Node{ node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -194,51 +179,64 @@ func BenchmarkPopulateActualStateOfWorld(b *testing.B) {
}, },
}, metav1.CreateOptions{}) }, metav1.CreateOptions{})
if err != nil { if err != nil {
b.Fatalf("failed to create PV: %v", err) t.Fatalf("failed to create PV: %v", err)
} }
} }
_, err := fakeKubeClient.CoreV1().Nodes().Create(context.Background(), node, metav1.CreateOptions{}) _, err := fakeKubeClient.CoreV1().Nodes().Create(context.Background(), node, metav1.CreateOptions{})
if err != nil { if err != nil {
b.Fatalf("failed to create node: %v", err) t.Fatalf("failed to create node: %v", err)
} }
} }
return fakeKubeClient
}
func BenchmarkPopulateActualStateOfWorld(b *testing.B) {
fakeKubeClient := largeClusterClient(b, 10000)
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
logger, tCtx := ktesting.NewTestContext(b) logger, tCtx := ktesting.NewTestContext(b)
adcObj, err := NewAttachDetachController( adc := createADC(b, tCtx, fakeKubeClient, informerFactory, nil)
tCtx,
fakeKubeClient,
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1().CSINodes(),
informerFactory.Storage().V1().CSIDrivers(),
informerFactory.Storage().V1().VolumeAttachments(),
nil, /* cloud */
nil, /* plugins */
nil, /* prober */
false,
5*time.Second,
false,
DefaultTimerConfig,
)
if err != nil {
b.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
}
adc := adcObj.(*attachDetachController)
// Act // Act
informerFactory.Start(tCtx.Done()) informerFactory.Start(tCtx.Done())
informerFactory.WaitForCacheSync(tCtx.Done()) informerFactory.WaitForCacheSync(tCtx.Done())
b.ResetTimer() b.ResetTimer()
err = adc.populateActualStateOfWorld(logger) for i := 0; i < b.N; i++ {
err := adc.populateActualStateOfWorld(logger)
if err != nil { if err != nil {
b.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err) b.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
} }
} }
}
func BenchmarkNodeUpdate(b *testing.B) {
fakeKubeClient := largeClusterClient(b, 3000)
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
logger, tCtx := ktesting.NewTestContext(b)
adc := createADC(b, tCtx, fakeKubeClient, informerFactory, nil)
informerFactory.Start(tCtx.Done())
informerFactory.WaitForCacheSync(tCtx.Done())
err := adc.populateActualStateOfWorld(logger.V(2))
if err != nil {
b.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
}
node, err := fakeKubeClient.CoreV1().Nodes().Get(context.Background(), "node-123", metav1.GetOptions{})
if err != nil {
b.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
}
// Act
runtime.GC()
b.ResetTimer()
for i := 0; i < b.N; i++ {
adc.nodeUpdate(logger, node, node)
}
}
func Test_AttachDetachControllerRecovery(t *testing.T) { func Test_AttachDetachControllerRecovery(t *testing.T) {
attachDetachRecoveryTestCase(t, []*v1.Pod{}, []*v1.Pod{}) attachDetachRecoveryTestCase(t, []*v1.Pod{}, []*v1.Pod{})
@ -528,28 +526,7 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
// Create the controller // Create the controller
logger, tCtx := ktesting.NewTestContext(t) logger, tCtx := ktesting.NewTestContext(t)
adcObj, err := NewAttachDetachController( adc := createADC(t, tCtx, fakeKubeClient, informerFactory, plugins)
tCtx,
fakeKubeClient,
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1().CSINodes(),
informerFactory.Storage().V1().CSIDrivers(),
informerFactory.Storage().V1().VolumeAttachments(),
nil, /* cloud */
plugins,
nil, /* prober */
false,
1*time.Second,
false,
DefaultTimerConfig,
)
if err != nil {
t.Fatalf("NewAttachDetachController failed with error. Expected: <no error> Actual: <%v>", err)
}
adc := adcObj.(*attachDetachController)
// Add existing objects (created by testplugin) to the respective informers // Add existing objects (created by testplugin) to the respective informers
pods, err := fakeKubeClient.CoreV1().Pods(v1.NamespaceAll).List(tCtx, metav1.ListOptions{}) pods, err := fakeKubeClient.CoreV1().Pods(v1.NamespaceAll).List(tCtx, metav1.ListOptions{})

View File

@ -29,6 +29,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util"
@ -60,14 +61,13 @@ type ActualStateOfWorld interface {
// the specified volume, the node is added. // the specified volume, the node is added.
AddVolumeNode(logger klog.Logger, uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string, attached bool) (v1.UniqueVolumeName, error) AddVolumeNode(logger klog.Logger, uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string, attached bool) (v1.UniqueVolumeName, error)
// SetVolumeMountedByNode sets the MountedByNode value for the given volume // SetVolumesMountedByNode sets all the volumes mounted by the given node.
// and node. When set to true the mounted parameter indicates the volume // These volumes should include attached volumes, not-yet-attached volumes,
// and may also include non-attachable volumes.
// When present in the volumeNames parameter, the volume
// is mounted by the given node, indicating it may not be safe to detach. // is mounted by the given node, indicating it may not be safe to detach.
// If no volume with the name volumeName exists in the store, an error is // Otherwise, the volume is not mounted by the given node.
// returned. SetVolumesMountedByNode(logger klog.Logger, volumeNames []v1.UniqueVolumeName, nodeName types.NodeName)
// If no node with the name nodeName exists in list of attached nodes for
// the specified volume, an error is returned.
SetVolumeMountedByNode(logger klog.Logger, volumeName v1.UniqueVolumeName, nodeName types.NodeName, mounted bool) error
// SetNodeStatusUpdateNeeded sets statusUpdateNeeded for the specified // SetNodeStatusUpdateNeeded sets statusUpdateNeeded for the specified
// node to true indicating the AttachedVolume field in the Node's Status // node to true indicating the AttachedVolume field in the Node's Status
@ -147,7 +147,7 @@ type AttachedVolume struct {
// MountedByNode indicates that this volume has been mounted by the node and // MountedByNode indicates that this volume has been mounted by the node and
// is unsafe to detach. // is unsafe to detach.
// The value is set and unset by SetVolumeMountedByNode(...). // The value is set and unset by SetVolumesMountedByNode(...).
MountedByNode bool MountedByNode bool
// DetachRequestedTime is used to capture the desire to detach this volume. // DetachRequestedTime is used to capture the desire to detach this volume.
@ -188,6 +188,7 @@ func NewActualStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) ActualStateO
return &actualStateOfWorld{ return &actualStateOfWorld{
attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume), attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume),
nodesToUpdateStatusFor: make(map[types.NodeName]nodeToUpdateStatusFor), nodesToUpdateStatusFor: make(map[types.NodeName]nodeToUpdateStatusFor),
inUseVolumes: make(map[types.NodeName]sets.Set[v1.UniqueVolumeName]),
volumePluginMgr: volumePluginMgr, volumePluginMgr: volumePluginMgr,
} }
} }
@ -205,6 +206,10 @@ type actualStateOfWorld struct {
// the node (including the list of volumes to report attached). // the node (including the list of volumes to report attached).
nodesToUpdateStatusFor map[types.NodeName]nodeToUpdateStatusFor nodesToUpdateStatusFor map[types.NodeName]nodeToUpdateStatusFor
// inUseVolumes is a map containing the set of volumes that are reported as
// in use by the kubelet.
inUseVolumes map[types.NodeName]sets.Set[v1.UniqueVolumeName]
// volumePluginMgr is the volume plugin manager used to create volume // volumePluginMgr is the volume plugin manager used to create volume
// plugin objects. // plugin objects.
volumePluginMgr *volume.VolumePluginMgr volumePluginMgr *volume.VolumePluginMgr
@ -239,10 +244,6 @@ type nodeAttachedTo struct {
// nodeName contains the name of this node. // nodeName contains the name of this node.
nodeName types.NodeName nodeName types.NodeName
// mountedByNode indicates that this node/volume combo is mounted by the
// node and is unsafe to detach
mountedByNode bool
// attachConfirmed indicates that the storage system verified the volume has been attached to this node. // attachConfirmed indicates that the storage system verified the volume has been attached to this node.
// This value is set to false when an attach operation fails and the volume may be attached or not. // This value is set to false when an attach operation fails and the volume may be attached or not.
attachedConfirmed bool attachedConfirmed bool
@ -363,10 +364,15 @@ func (asw *actualStateOfWorld) AddVolumeNode(
// Create object if it doesn't exist. // Create object if it doesn't exist.
node = nodeAttachedTo{ node = nodeAttachedTo{
nodeName: nodeName, nodeName: nodeName,
mountedByNode: true, // Assume mounted, until proven otherwise
attachedConfirmed: isAttached, attachedConfirmed: isAttached,
detachRequestedTime: time.Time{}, detachRequestedTime: time.Time{},
} }
// Assume mounted, until proven otherwise
if asw.inUseVolumes[nodeName] == nil {
asw.inUseVolumes[nodeName] = sets.New(volumeName)
} else {
asw.inUseVolumes[nodeName].Insert(volumeName)
}
} else { } else {
node.attachedConfirmed = isAttached node.attachedConfirmed = isAttached
logger.V(5).Info("Volume is already added to attachedVolume list to the node", logger.V(5).Info("Volume is already added to attachedVolume list to the node",
@ -384,24 +390,15 @@ func (asw *actualStateOfWorld) AddVolumeNode(
return volumeName, nil return volumeName, nil
} }
func (asw *actualStateOfWorld) SetVolumeMountedByNode( func (asw *actualStateOfWorld) SetVolumesMountedByNode(
logger klog.Logger, logger klog.Logger, volumeNames []v1.UniqueVolumeName, nodeName types.NodeName) {
volumeName v1.UniqueVolumeName, nodeName types.NodeName, mounted bool) error {
asw.Lock() asw.Lock()
defer asw.Unlock() defer asw.Unlock()
volumeObj, nodeObj, err := asw.getNodeAndVolume(volumeName, nodeName) asw.inUseVolumes[nodeName] = sets.New(volumeNames...)
if err != nil { logger.V(5).Info("SetVolumesMountedByNode volume to the node",
return fmt.Errorf("failed to SetVolumeMountedByNode with error: %v", err)
}
nodeObj.mountedByNode = mounted
volumeObj.nodesAttachedTo[nodeName] = nodeObj
logger.V(4).Info("SetVolumeMountedByNode volume to the node",
"node", klog.KRef("", string(nodeName)), "node", klog.KRef("", string(nodeName)),
"volumeName", volumeName, "volumeNames", volumeNames)
"mounted", mounted)
return nil
} }
func (asw *actualStateOfWorld) ResetDetachRequestTime( func (asw *actualStateOfWorld) ResetDetachRequestTime(
@ -604,7 +601,7 @@ func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume {
for _, nodeObj := range volumeObj.nodesAttachedTo { for _, nodeObj := range volumeObj.nodesAttachedTo {
attachedVolumes = append( attachedVolumes = append(
attachedVolumes, attachedVolumes,
getAttachedVolume(&volumeObj, &nodeObj)) asw.getAttachedVolume(&volumeObj, &nodeObj))
} }
} }
@ -622,7 +619,7 @@ func (asw *actualStateOfWorld) GetAttachedVolumesForNode(
if nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]; nodeExists { if nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]; nodeExists {
attachedVolumes = append( attachedVolumes = append(
attachedVolumes, attachedVolumes,
getAttachedVolume(&volumeObj, &nodeObj)) asw.getAttachedVolume(&volumeObj, &nodeObj))
} }
} }
@ -638,7 +635,7 @@ func (asw *actualStateOfWorld) GetAttachedVolumesPerNode() map[types.NodeName][]
for nodeName, nodeObj := range volumeObj.nodesAttachedTo { for nodeName, nodeObj := range volumeObj.nodesAttachedTo {
if nodeObj.attachedConfirmed { if nodeObj.attachedConfirmed {
volumes := attachedVolumesPerNode[nodeName] volumes := attachedVolumesPerNode[nodeName]
volumes = append(volumes, getAttachedVolume(&volumeObj, &nodeObj).AttachedVolume) volumes = append(volumes, asw.getAttachedVolume(&volumeObj, &nodeObj).AttachedVolume)
attachedVolumesPerNode[nodeName] = volumes attachedVolumesPerNode[nodeName] = volumes
} }
} }
@ -727,7 +724,7 @@ func (asw *actualStateOfWorld) getAttachedVolumeFromUpdateObject(volumesToReport
return attachedVolumes return attachedVolumes
} }
func getAttachedVolume( func (asw *actualStateOfWorld) getAttachedVolume(
attachedVolume *attachedVolume, attachedVolume *attachedVolume,
nodeAttachedTo *nodeAttachedTo) AttachedVolume { nodeAttachedTo *nodeAttachedTo) AttachedVolume {
return AttachedVolume{ return AttachedVolume{
@ -738,6 +735,6 @@ func getAttachedVolume(
DevicePath: attachedVolume.devicePath, DevicePath: attachedVolume.devicePath,
PluginIsAttachable: true, PluginIsAttachable: true,
}, },
MountedByNode: nodeAttachedTo.mountedByNode, MountedByNode: asw.inUseVolumes[nodeAttachedTo.nodeName].Has(attachedVolume.volumeName),
DetachRequestedTime: nodeAttachedTo.detachRequestedTime} DetachRequestedTime: nodeAttachedTo.detachRequestedTime}
} }

View File

@ -664,7 +664,7 @@ func Test_GetAttachedVolumes_Positive_OneVolumeTwoNodes(t *testing.T) {
// Populates data struct with one volume/node entry. // Populates data struct with one volume/node entry.
// Verifies mountedByNode is true and DetachRequestedTime is zero. // Verifies mountedByNode is true and DetachRequestedTime is zero.
func Test_SetVolumeMountedByNode_Positive_Set(t *testing.T) { func Test_SetVolumesMountedByNode_Positive_Set(t *testing.T) {
// Arrange // Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld(volumePluginMgr) asw := NewActualStateOfWorld(volumePluginMgr)
@ -690,9 +690,9 @@ func Test_SetVolumeMountedByNode_Positive_Set(t *testing.T) {
} }
// Populates data struct with one volume/node entry. // Populates data struct with one volume/node entry.
// Calls SetVolumeMountedByNode twice, first setting mounted to true then false. // Calls SetVolumesMountedByNode twice, first setting mounted to true then false.
// Verifies mountedByNode is false. // Verifies mountedByNode is false.
func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSet(t *testing.T) { func Test_SetVolumesMountedByNode_Positive_UnsetWithInitialSet(t *testing.T) {
// Arrange // Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld(volumePluginMgr) asw := NewActualStateOfWorld(volumePluginMgr)
@ -707,16 +707,8 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSet(t *testing.T) {
} }
// Act // Act
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, true /* mounted */) asw.SetVolumesMountedByNode(logger, []v1.UniqueVolumeName{generatedVolumeName}, nodeName)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, false /* mounted */) asw.SetVolumesMountedByNode(logger, nil, nodeName)
// Assert
if setVolumeMountedErr1 != nil {
t.Fatalf("SetVolumeMountedByNode1 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr1)
}
if setVolumeMountedErr2 != nil {
t.Fatalf("SetVolumeMountedByNode2 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr2)
}
attachedVolumes := asw.GetAttachedVolumes() attachedVolumes := asw.GetAttachedVolumes()
if len(attachedVolumes) != 1 { if len(attachedVolumes) != 1 {
@ -727,9 +719,9 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSet(t *testing.T) {
} }
// Populates data struct with one volume/node entry. // Populates data struct with one volume/node entry.
// Calls SetVolumeMountedByNode once, setting mounted to false. // Calls SetVolumesMountedByNode once, setting mounted to false.
// Verifies mountedByNode is false because value is overwritten // Verifies mountedByNode is false because value is overwritten
func Test_SetVolumeMountedByNode_Positive_UnsetWithoutInitialSet(t *testing.T) { func Test_SetVolumesMountedByNode_Positive_UnsetWithoutInitialSet(t *testing.T) {
// Arrange // Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld(volumePluginMgr) asw := NewActualStateOfWorld(volumePluginMgr)
@ -751,13 +743,9 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithoutInitialSet(t *testing.T) {
verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, devicePath, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */) verifyAttachedVolume(t, attachedVolumes, generatedVolumeName, string(volumeName), nodeName, devicePath, true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
// Act // Act
setVolumeMountedErr := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, false /* mounted */) asw.SetVolumesMountedByNode(logger, nil, nodeName)
// Assert // Assert
if setVolumeMountedErr != nil {
t.Fatalf("SetVolumeMountedByNode failed. Expected <no error> Actual: <%v>", setVolumeMountedErr)
}
attachedVolumes = asw.GetAttachedVolumes() attachedVolumes = asw.GetAttachedVolumes()
if len(attachedVolumes) != 1 { if len(attachedVolumes) != 1 {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes)) t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
@ -767,10 +755,10 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithoutInitialSet(t *testing.T) {
} }
// Populates data struct with one volume/node entry. // Populates data struct with one volume/node entry.
// Calls SetVolumeMountedByNode twice, first setting mounted to true then false. // Calls SetVolumesMountedByNode twice, first setting mounted to true then false.
// Calls AddVolumeNode to readd the same volume/node. // Calls AddVolumeNode to readd the same volume/node.
// Verifies mountedByNode is false and detachRequestedTime is zero. // Verifies mountedByNode is false and detachRequestedTime is zero.
func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetAddVolumeNodeNotReset(t *testing.T) { func Test_SetVolumesMountedByNode_Positive_UnsetWithInitialSetAddVolumeNodeNotReset(t *testing.T) {
// Arrange // Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld(volumePluginMgr) asw := NewActualStateOfWorld(volumePluginMgr)
@ -785,17 +773,11 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetAddVolumeNodeNotRes
} }
// Act // Act
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, true /* mounted */) asw.SetVolumesMountedByNode(logger, []v1.UniqueVolumeName{generatedVolumeName}, nodeName)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, false /* mounted */) asw.SetVolumesMountedByNode(logger, nil, nodeName)
generatedVolumeName, addErr = asw.AddVolumeNode(logger, volumeName, volumeSpec, nodeName, devicePath, true) generatedVolumeName, addErr = asw.AddVolumeNode(logger, volumeName, volumeSpec, nodeName, devicePath, true)
// Assert // Assert
if setVolumeMountedErr1 != nil {
t.Fatalf("SetVolumeMountedByNode1 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr1)
}
if setVolumeMountedErr2 != nil {
t.Fatalf("SetVolumeMountedByNode2 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr2)
}
if addErr != nil { if addErr != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr) t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr)
} }
@ -810,9 +792,9 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetAddVolumeNodeNotRes
// Populates data struct with one volume/node entry. // Populates data struct with one volume/node entry.
// Calls RemoveVolumeFromReportAsAttached() once on volume/node entry. // Calls RemoveVolumeFromReportAsAttached() once on volume/node entry.
// Calls SetVolumeMountedByNode() twice, first setting mounted to true then false. // Calls SetVolumesMountedByNode() twice, first setting mounted to true then false.
// Verifies mountedByNode is false and detachRequestedTime is NOT zero. // Verifies mountedByNode is false and detachRequestedTime is NOT zero.
func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequestedTimePerserved(t *testing.T) { func Test_SetVolumesMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequestedTimePerserved(t *testing.T) {
// Arrange // Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld(volumePluginMgr) asw := NewActualStateOfWorld(volumePluginMgr)
@ -836,17 +818,10 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequest
expectedDetachRequestedTime := asw.GetAttachedVolumes()[0].DetachRequestedTime expectedDetachRequestedTime := asw.GetAttachedVolumes()[0].DetachRequestedTime
// Act // Act
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, true /* mounted */) asw.SetVolumesMountedByNode(logger, []v1.UniqueVolumeName{generatedVolumeName}, nodeName)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, false /* mounted */) asw.SetVolumesMountedByNode(logger, nil, nodeName)
// Assert // Assert
if setVolumeMountedErr1 != nil {
t.Fatalf("SetVolumeMountedByNode1 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr1)
}
if setVolumeMountedErr2 != nil {
t.Fatalf("SetVolumeMountedByNode2 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr2)
}
attachedVolumes := asw.GetAttachedVolumes() attachedVolumes := asw.GetAttachedVolumes()
if len(attachedVolumes) != 1 { if len(attachedVolumes) != 1 {
t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes)) t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
@ -966,10 +941,10 @@ func Test_MarkDesireToDetach_Positive_MarkedAddVolumeNodeReset(t *testing.T) {
} }
// Populates data struct with one volume/node entry. // Populates data struct with one volume/node entry.
// Calls SetVolumeMountedByNode() twice, first setting mounted to true then false. // Calls SetVolumesMountedByNode() twice, first setting mounted to true then false.
// Calls RemoveVolumeFromReportAsAttached() once on volume/node entry. // Calls RemoveVolumeFromReportAsAttached() once on volume/node entry.
// Verifies mountedByNode is false and detachRequestedTime is NOT zero. // Verifies mountedByNode is false and detachRequestedTime is NOT zero.
func Test_RemoveVolumeFromReportAsAttached_Positive_UnsetWithInitialSetVolumeMountedByNodePreserved(t *testing.T) { func Test_RemoveVolumeFromReportAsAttached_Positive_UnsetWithInitialSetVolumesMountedByNodePreserved(t *testing.T) {
// Arrange // Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld(volumePluginMgr) asw := NewActualStateOfWorld(volumePluginMgr)
@ -982,15 +957,8 @@ func Test_RemoveVolumeFromReportAsAttached_Positive_UnsetWithInitialSetVolumeMou
if addErr != nil { if addErr != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr) t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr)
} }
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, true /* mounted */) asw.SetVolumesMountedByNode(logger, []v1.UniqueVolumeName{generatedVolumeName}, nodeName)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, false /* mounted */) asw.SetVolumesMountedByNode(logger, nil, nodeName)
if setVolumeMountedErr1 != nil {
t.Fatalf("SetVolumeMountedByNode1 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr1)
}
if setVolumeMountedErr2 != nil {
t.Fatalf("SetVolumeMountedByNode2 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr2)
}
// Act // Act
_, err := asw.SetDetachRequestTime(logger, generatedVolumeName, nodeName) _, err := asw.SetDetachRequestTime(logger, generatedVolumeName, nodeName)
if err != nil { if err != nil {

View File

@ -207,8 +207,8 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te
generatedVolumeName, generatedVolumeName,
nodeName) nodeName)
} }
asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, true /* mounted */) asw.SetVolumesMountedByNode(logger, []v1.UniqueVolumeName{generatedVolumeName}, nodeName)
asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, false /* mounted */) asw.SetVolumesMountedByNode(logger, nil, nodeName)
// Assert // Assert
waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin) waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
@ -364,8 +364,8 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
generatedVolumeName, generatedVolumeName,
nodeName) nodeName)
} }
asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, true /* mounted */) asw.SetVolumesMountedByNode(logger, []v1.UniqueVolumeName{generatedVolumeName}, nodeName)
asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName, false /* mounted */) asw.SetVolumesMountedByNode(logger, nil, nodeName)
// Assert // Assert
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin) verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
@ -619,7 +619,7 @@ func Test_Run_OneVolumeAttachAndDetachUncertainNodesWithReadWriteOnce(t *testing
// When volume is added to the node, it is set to mounted by default. Then the status will be updated by checking node status VolumeInUse. // When volume is added to the node, it is set to mounted by default. Then the status will be updated by checking node status VolumeInUse.
// Without this, the delete operation will be delayed due to mounted status // Without this, the delete operation will be delayed due to mounted status
asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName1, false /* mounted */) asw.SetVolumesMountedByNode(logger, nil, nodeName1)
dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1) dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
@ -832,7 +832,7 @@ func Test_Run_OneVolumeAttachAndDetachTimeoutNodesWithReadWriteOnce(t *testing.T
// When volume is added to the node, it is set to mounted by default. Then the status will be updated by checking node status VolumeInUse. // When volume is added to the node, it is set to mounted by default. Then the status will be updated by checking node status VolumeInUse.
// Without this, the delete operation will be delayed due to mounted status // Without this, the delete operation will be delayed due to mounted status
asw.SetVolumeMountedByNode(logger, generatedVolumeName, nodeName1, false /* mounted */) asw.SetVolumesMountedByNode(logger, nil, nodeName1)
dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1) dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)