From f1717baaaa13d3232e652664f17182f03cee10f5 Mon Sep 17 00:00:00 2001
From: Jean-Francois Remy
Date: Wed, 16 Feb 2022 10:28:46 -0800
Subject: [PATCH 1/2] Fix nodes volumesAttached status not updated

The UpdateNodeStatuses code stops too early if there is an error when
calling updateNodeStatus: it returns immediately, which means any
remaining node won't have its statusUpdateNeeded flag put back to true.

Looking at the call sites for UpdateNodeStatuses, it appears this is not
the only issue. If the lister call fails with anything but a Not Found
error, it is silently ignored, which is wrong in the detach path. Also,
the reconciler detach path calls UpdateNodeStatuses, but the real intent
is to only update the node currently processed in the loop, and not to
proceed with the detach call if there is an error updating that specific
node's volumesAttached property. With the current implementation, it
will not proceed if there is an error updating another node (which is
not completely wrong, but not ideal), and worse, it will proceed if
there is a lister error on that node, which means that node's
volumesAttached property won't have been updated.

To fix those issues, introduce the following changes:
- [node_status_updater] introduce UpdateNodeStatusForNode, which does
  what UpdateNodeStatuses does but only for the provided node
- [node_status_updater] if the node lister call fails with anything but
  a Not Found error, return an error instead of ignoring it
- [node_status_updater] if the update of a node's volumesAttached
  property fails, continue processing the other nodes
- [actual_state_of_world] introduce GetVolumesToReportAttachedForNode,
  which does what GetVolumesToReportAttached does but only for the node
  whose name is provided; it returns a bool indicating whether the node
  needs an update, along with the volumesAttached list. It is used by
  UpdateNodeStatusForNode
- [actual_state_of_world] use a write lock in
  updateNodeStatusUpdateNeeded; we're modifying the map content
- [reconciler] use UpdateNodeStatusForNode in the detach loop
---
 .../cache/actual_state_of_world.go            | 61 ++++++++++----
 .../attachdetach/reconciler/reconciler.go     |  2 +-
 .../statusupdater/fake_node_status_updater.go |  9 +++
 .../statusupdater/node_status_updater.go      | 80 +++++++++++++------
 4 files changed, 111 insertions(+), 41 deletions(-)

diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go
index 4d420e01706..23e39d8aa69 100644
--- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go
+++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go
@@ -135,6 +135,11 @@ type ActualStateOfWorld interface {
 	// is considered, before the detach operation is triggered).
 	GetVolumesToReportAttached() map[types.NodeName][]v1.AttachedVolume
 
+	// GetVolumesToReportAttachedForNode returns the list of volumes that should be reported as
+	// attached for the given node. It reports a boolean indicating if there is an update for that
+	// node and the corresponding attachedVolumes list.
+	GetVolumesToReportAttachedForNode(name types.NodeName) (bool, []v1.AttachedVolume)
+
 	// GetNodesToUpdateStatusFor returns the map of nodeNames to nodeToUpdateStatusFor
 	GetNodesToUpdateStatusFor() map[types.NodeName]nodeToUpdateStatusFor
 }
@@ -647,24 +652,13 @@ func (asw *actualStateOfWorld) GetNodesForAttachedVolume(volumeName v1.UniqueVol
 }
 
 func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][]v1.AttachedVolume {
-	asw.RLock()
-	defer asw.RUnlock()
+	asw.Lock()
+	defer asw.Unlock()
 
 	volumesToReportAttached := make(map[types.NodeName][]v1.AttachedVolume)
 	for nodeName, nodeToUpdateObj := range asw.nodesToUpdateStatusFor {
 		if nodeToUpdateObj.statusUpdateNeeded {
-			attachedVolumes := make(
-				[]v1.AttachedVolume,
-				0,
-				len(nodeToUpdateObj.volumesToReportAsAttached) /* len */)
-			for _, volume := range nodeToUpdateObj.volumesToReportAsAttached {
-				attachedVolumes = append(attachedVolumes,
-					v1.AttachedVolume{
-						Name:       volume,
-						DevicePath: asw.attachedVolumes[volume].devicePath,
-					})
-			}
-			volumesToReportAttached[nodeToUpdateObj.nodeName] = attachedVolumes
+			volumesToReportAttached[nodeToUpdateObj.nodeName] = asw.getAttachedVolumeFromUpdateObject(nodeToUpdateObj.volumesToReportAsAttached)
 		}
 		// When GetVolumesToReportAttached is called by node status updater, the current status
 		// of this node will be updated, so set the flag statusUpdateNeeded to false indicating
@@ -677,10 +671,49 @@ func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][
 	return volumesToReportAttached
 }
 
+func (asw *actualStateOfWorld) GetVolumesToReportAttachedForNode(nodeName types.NodeName) (bool, []v1.AttachedVolume) {
+	asw.Lock()
+	defer asw.Unlock()
+
+	var attachedVolumes []v1.AttachedVolume
+	nodeToUpdateObj, ok := asw.nodesToUpdateStatusFor[nodeName]
+	if !ok {
+		return false, nil
+	}
+	if !nodeToUpdateObj.statusUpdateNeeded {
+		return false, nil
+	}
+
+	attachedVolumes = asw.getAttachedVolumeFromUpdateObject(nodeToUpdateObj.volumesToReportAsAttached)
+	// When GetVolumesToReportAttached is called by node status updater, the current status
+	// of this node will be updated, so set the flag statusUpdateNeeded to false indicating
+	// the current status is already updated.
+	if err := asw.updateNodeStatusUpdateNeeded(nodeName, false); err != nil {
+		klog.Errorf("Failed to update statusUpdateNeeded field when getting volumes: %v", err)
+	}
+
+	return true, attachedVolumes
+}
+
 func (asw *actualStateOfWorld) GetNodesToUpdateStatusFor() map[types.NodeName]nodeToUpdateStatusFor {
 	return asw.nodesToUpdateStatusFor
 }
 
+func (asw *actualStateOfWorld) getAttachedVolumeFromUpdateObject(volumesToReportAsAttached map[v1.UniqueVolumeName]v1.UniqueVolumeName) []v1.AttachedVolume {
+	var attachedVolumes = make(
+		[]v1.AttachedVolume,
+		0,
+		len(volumesToReportAsAttached) /* len */)
+	for _, volume := range volumesToReportAsAttached {
+		attachedVolumes = append(attachedVolumes,
+			v1.AttachedVolume{
+				Name:       volume,
+				DevicePath: asw.attachedVolumes[volume].devicePath,
+			})
+	}
+	return attachedVolumes
+}
+
 func getAttachedVolume(
 	attachedVolume *attachedVolume,
 	nodeAttachedTo *nodeAttachedTo) AttachedVolume {
diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go
index 463e0e3fac7..c3ba55bd135 100644
--- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go
+++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go
@@ -202,7 +202,7 @@ func (rc *reconciler) reconcile() {
 			}
 
 			// Update Node Status to indicate volume is no longer safe to mount.
-			err = rc.nodeStatusUpdater.UpdateNodeStatuses()
+			err = rc.nodeStatusUpdater.UpdateNodeStatusForNode(attachedVolume.NodeName)
 			if err != nil {
 				// Skip detaching this volume if unable to update node status
 				klog.ErrorS(err, "UpdateNodeStatuses failed while attempting to report volume as attached", "volume", attachedVolume)
diff --git a/pkg/controller/volume/attachdetach/statusupdater/fake_node_status_updater.go b/pkg/controller/volume/attachdetach/statusupdater/fake_node_status_updater.go
index b78e80e3863..a321293321b 100644
--- a/pkg/controller/volume/attachdetach/statusupdater/fake_node_status_updater.go
+++ b/pkg/controller/volume/attachdetach/statusupdater/fake_node_status_updater.go
@@ -18,6 +18,7 @@ package statusupdater
 
 import (
 	"fmt"
+	"k8s.io/apimachinery/pkg/types"
 )
 
 func NewFakeNodeStatusUpdater(returnError bool) NodeStatusUpdater {
@@ -37,3 +38,11 @@ func (fnsu *fakeNodeStatusUpdater) UpdateNodeStatuses() error {
 
 	return nil
 }
+
+func (fnsu *fakeNodeStatusUpdater) UpdateNodeStatusForNode(nodeName types.NodeName) error {
+	if fnsu.returnError {
+		return fmt.Errorf("fake error on update node status")
+	}
+
+	return nil
+}
diff --git a/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go b/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go
index 9396872d570..a8a9415b37a 100644
--- a/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go
+++ b/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go
@@ -19,6 +19,7 @@ limitations under the License.
 package statusupdater
 
 import (
+	"fmt"
 	"k8s.io/klog/v2"
 
 	"k8s.io/api/core/v1"
@@ -36,6 +37,8 @@ type NodeStatusUpdater interface {
 	// Gets a list of node statuses that should be updated from the actual state
 	// of the world and updates them.
 	UpdateNodeStatuses() error
+	// Update any pending status change for the given node
+	UpdateNodeStatusForNode(nodeName types.NodeName) error
 }
 
 // NewNodeStatusUpdater returns a new instance of NodeStatusUpdater.
@@ -57,40 +60,65 @@ type nodeStatusUpdater struct {
 }
 
 func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
+	var nodeIssues int
 	// TODO: investigate right behavior if nodeName is empty
 	// kubernetes/kubernetes/issues/37777
 	nodesToUpdate := nsu.actualStateOfWorld.GetVolumesToReportAttached()
 	for nodeName, attachedVolumes := range nodesToUpdate {
-		nodeObj, err := nsu.nodeLister.Get(string(nodeName))
-		if errors.IsNotFound(err) {
-			// If node does not exist, its status cannot be updated.
-			// Do nothing so that there is no retry until node is created.
-			klog.V(2).Infof(
-				"Could not update node status. Failed to find node %q in NodeInformer cache. Error: '%v'",
-				nodeName,
-				err)
-			continue
-		} else if err != nil {
-			// For all other errors, log error and reset flag statusUpdateNeeded
-			// back to true to indicate this node status needs to be updated again.
-			klog.V(2).Infof("Error retrieving nodes from node lister. Error: %v", err)
-			nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
-			continue
+		err := nsu.processNodeVolumes(nodeName, attachedVolumes)
+		if err != nil {
+			nodeIssues += 1
 		}
+	}
+	if nodeIssues > 0 {
+		return fmt.Errorf("unable to update %d nodes", nodeIssues)
+	}
+	return nil
+}
 
-		if err := nsu.updateNodeStatus(nodeName, nodeObj, attachedVolumes); err != nil {
-			// If update node status fails, reset flag statusUpdateNeeded back to true
-			// to indicate this node status needs to be updated again
-			nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
+func (nsu *nodeStatusUpdater) UpdateNodeStatusForNode(nodeName types.NodeName) error {
+	needsUpdate, attachedVolumes := nsu.actualStateOfWorld.GetVolumesToReportAttachedForNode(nodeName)
+	if !needsUpdate {
+		return nil
+	}
+	return nsu.processNodeVolumes(nodeName, attachedVolumes)
+}
 
-			klog.V(2).Infof(
-				"Could not update node status for %q; re-marking for update. %v",
-				nodeName,
-				err)
+func (nsu *nodeStatusUpdater) processNodeVolumes(nodeName types.NodeName, attachedVolumes []v1.AttachedVolume) error {
+	nodeObj, err := nsu.nodeLister.Get(string(nodeName))
+	if errors.IsNotFound(err) {
+		// If node does not exist, its status cannot be updated.
+		// Do nothing so that there is no retry until node is created.
+		klog.V(2).Infof(
+			"Could not update node status. Failed to find node %q in NodeInformer cache. Error: '%v'",
+			nodeName,
+			err)
+		return nil
+	} else if err != nil {
+		// For all other errors, log error and reset flag statusUpdateNeeded
+		// back to true to indicate this node status needs to be updated again.
+		klog.V(2).Infof("Error retrieving nodes from node lister. Error: %v", err)
+		nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
+		return err
+	}
 
-			// We currently always return immediately on error
-			return err
-		}
+	err = nsu.updateNodeStatus(nodeName, nodeObj, attachedVolumes)
+	if errors.IsNotFound(err) {
+		klog.V(2).Infof(
+			"Could not update node status for %q; node does not exist - skipping",
+			nodeName)
+		return nil
+	} else if err != nil {
+		// If update node status fails, reset flag statusUpdateNeeded back to true
+		// to indicate this node status needs to be updated again
+		nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
+
+		klog.V(2).Infof(
+			"Could not update node status for %q; re-marking for update. %v",
+			nodeName,
+			err)
+
+		return err
 	}
 	return nil
 }

From e83184568d0d6a4337f3f3892efbf2cf94c18da0 Mon Sep 17 00:00:00 2001
From: Jean-Francois Remy
Date: Mon, 21 Feb 2022 13:51:15 -0800
Subject: [PATCH 2/2] Add unit tests

- actual_state_of_world_test.go: test the new method
  GetVolumesToReportAttachedForNode for an existing node and a
  non-existing node
- node_status_updater_test.go: test UpdateNodeStatuses and
  UpdateNodeStatusForNode in the nominal case, with 2 nodes getting one
  volume each. Test UpdateNodeStatuses with the first call to node.patch
  failing but the following one succeeding
- add comment in node_status_updater.go
- fix log line in reconciler.go
- rename variable in actual_state_of_world.go
---
 .../cache/actual_state_of_world.go            |   5 +-
 .../cache/actual_state_of_world_test.go       |  64 +++++
 .../attachdetach/reconciler/reconciler.go     |   2 +-
 .../statusupdater/node_status_updater.go      |   2 +
 .../statusupdater/node_status_updater_test.go | 222 ++++++++++++++++++
 5 files changed, 291 insertions(+), 4 deletions(-)
 create mode 100644 pkg/controller/volume/attachdetach/statusupdater/node_status_updater_test.go

diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go
index 23e39d8aa69..652d5421d9a 100644
--- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go
+++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go
@@ -675,7 +675,6 @@ func (asw *actualStateOfWorld) GetVolumesToReportAttachedForNode(nodeName types.
 	asw.Lock()
 	defer asw.Unlock()
 
-	var attachedVolumes []v1.AttachedVolume
 	nodeToUpdateObj, ok := asw.nodesToUpdateStatusFor[nodeName]
 	if !ok {
 		return false, nil
@@ -684,7 +683,7 @@ func (asw *actualStateOfWorld) GetVolumesToReportAttachedForNode(nodeName types.
 		return false, nil
 	}
 
-	attachedVolumes = asw.getAttachedVolumeFromUpdateObject(nodeToUpdateObj.volumesToReportAsAttached)
+	volumesToReportAttached := asw.getAttachedVolumeFromUpdateObject(nodeToUpdateObj.volumesToReportAsAttached)
 	// When GetVolumesToReportAttached is called by node status updater, the current status
 	// of this node will be updated, so set the flag statusUpdateNeeded to false indicating
 	// the current status is already updated.
@@ -692,7 +691,7 @@ func (asw *actualStateOfWorld) GetVolumesToReportAttachedForNode(nodeName types.
 		klog.Errorf("Failed to update statusUpdateNeeded field when getting volumes: %v", err)
 	}
 
-	return true, attachedVolumes
+	return true, volumesToReportAttached
 }
 
 func (asw *actualStateOfWorld) GetNodesToUpdateStatusFor() map[types.NodeName]nodeToUpdateStatusFor {
diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go
index 71406909a9a..984b4d7a7b1 100644
--- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go
+++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go
@@ -1447,6 +1447,70 @@ func Test_MarkVolumeAsUncertain(t *testing.T) {
 	verifyAttachedVolume(t, attachedVolumes, volumeName, string(volumeName), nodeName, "", true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
 }
 
+// Calls AddVolumeNode() once with attached set to true.
+// Verifies GetVolumesToReportAttachedForNode has an update for the node.
+// Call GetVolumesToReportAttachedForNode a second time for the node, verify it does not report
+// an update is needed any more
+// Then calls RemoveVolumeFromReportAsAttached()
+// Verifies GetVolumesToReportAttachedForNode reports an update is needed
+func Test_GetVolumesToReportAttachedForNode_Positive(t *testing.T) {
+	// Arrange
+	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
+	asw := NewActualStateOfWorld(volumePluginMgr)
+	volumeName := v1.UniqueVolumeName("volume-name")
+	volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
+
+	nodeName := types.NodeName("node-name")
+	devicePath := "fake/device/path"
+
+	// Act
+	generatedVolumeName, err := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath, true)
+
+	// Assert
+	if err != nil {
+		t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", err)
+	}
+
+	needsUpdate, attachedVolumes := asw.GetVolumesToReportAttachedForNode(nodeName)
+	if !needsUpdate {
+		t.Fatalf("GetVolumesToReportAttachedForNode_Positive_NewVolumeNewNodeWithTrueAttached failed. Actual: Expect: Actual: <%v>", len(attachedVolumes))
+	}
+
+	needsUpdate, _ = asw.GetVolumesToReportAttachedForNode(nodeName)
+	if needsUpdate {
+		t.Fatalf("GetVolumesToReportAttachedForNode_Positive_NewVolumeNewNodeWithTrueAttached failed. Actual: <true> Expect: <false>")
+	}
+
+	removeVolumeDetachErr := asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName)
+	if removeVolumeDetachErr != nil {
+		t.Fatalf("RemoveVolumeFromReportAsAttached failed. Expected: <no error> Actual: <%v>", removeVolumeDetachErr)
+	}
+
+	needsUpdate, attachedVolumes = asw.GetVolumesToReportAttachedForNode(nodeName)
+	if !needsUpdate {
+		t.Fatalf("GetVolumesToReportAttachedForNode_Positive_NewVolumeNewNodeWithTrueAttached failed. Actual: Expect: Actual: <%v>", len(attachedVolumes))
+	}
+}
+
+// Verifies GetVolumesToReportAttachedForNode reports no update needed for an unknown node.
+func Test_GetVolumesToReportAttachedForNode_UnknownNode(t *testing.T) {
+	// Arrange
+	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
+	asw := NewActualStateOfWorld(volumePluginMgr)
+	nodeName := types.NodeName("node-name")
+
+	needsUpdate, _ := asw.GetVolumesToReportAttachedForNode(nodeName)
+	if needsUpdate {
+		t.Fatalf("GetVolumesToReportAttachedForNode_UnknownNode failed. Actual: Expect: Actual: <%v>", err)
+	}
+	err = nodeInformer.Informer().GetStore().Add(&testNode2)
+	if err != nil {
+		t.Fatalf(".Informer().GetStore().Add failed. Expected: <no error> Actual: <%v>", err)
+	}
+
+	volumeName1 := corev1.UniqueVolumeName("volume-name-1")
+	volumeName2 := corev1.UniqueVolumeName("volume-name-2")
+	volumeSpec1 := controllervolumetesting.GetTestVolumeSpec(string(volumeName1), volumeName1)
+	volumeSpec2 := controllervolumetesting.GetTestVolumeSpec(string(volumeName2), volumeName2)
+
+	nodeName1 := types.NodeName("testnode-1")
+	nodeName2 := types.NodeName("testnode-2")
+	devicePath := "fake/device/path"
+
+	_, err = asw.AddVolumeNode(volumeName1, volumeSpec1, nodeName1, devicePath, true)
+	if err != nil {
+		t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", err)
+	}
+	_, err = asw.AddVolumeNode(volumeName2, volumeSpec2, nodeName2, devicePath, true)
+	if err != nil {
+		t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", err)
+	}
+
+	return asw, fakeKubeClient, nsu
+}
+
+// TestNodeStatusUpdater_UpdateNodeStatuses_TwoNodesUpdate calls setup
+// calls UpdateNodeStatuses()
+// check that asw.GetVolumesToReportAttached reports nothing left to attach
+// checks that each node status.volumesAttached is of length 1 and contains the correct volume
+func TestNodeStatusUpdater_UpdateNodeStatuses_TwoNodesUpdate(t *testing.T) {
+	ctx := context.Background()
+	asw, fakeKubeClient, nsu := setupNodeStatusUpdate(ctx, t)
+
+	err := nsu.UpdateNodeStatuses()
+	if err != nil {
+		t.Fatalf("UpdateNodeStatuses failed. Expected: <no error> Actual: <%v>", err)
+	}
+
+	needToReport := asw.GetVolumesToReportAttached()
+	if len(needToReport) != 0 {
+		t.Fatalf("len(asw.GetVolumesToReportAttached()) Expected: <0> Actual: <%v>", len(needToReport))
+	}
+
+	node, err := fakeKubeClient.CoreV1().Nodes().Get(ctx, "testnode-1", metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Nodes().Get failed. Expected: <no error> Actual: <%v>", err)
+	}
+	if len(node.Status.VolumesAttached) != 1 {
+		t.Fatalf("len(node.Status.VolumesAttached) Expected: <1> Actual: <%v>", len(node.Status.VolumesAttached))
+	}
+	if node.Status.VolumesAttached[0].Name != "volume-name-1" {
+		t.Fatalf("volumeName Expected: <volume-name-1> Actual: <%s>", node.Status.VolumesAttached[0].Name)
+	}
+
+	node, err = fakeKubeClient.CoreV1().Nodes().Get(ctx, "testnode-2", metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Nodes().Get failed. Expected: <no error> Actual: <%v>", err)
+	}
+	if len(node.Status.VolumesAttached) != 1 {
+		t.Fatalf("len(node.Status.VolumesAttached) Expected: <1> Actual: <%v>", len(node.Status.VolumesAttached))
+	}
+	if node.Status.VolumesAttached[0].Name != "volume-name-2" {
+		t.Fatalf("volumeName Expected: <volume-name-2> Actual: <%s>", node.Status.VolumesAttached[0].Name)
+	}
+}
+
+func TestNodeStatusUpdater_UpdateNodeStatuses_FailureInFirstUpdate(t *testing.T) {
+	ctx := context.Background()
+	asw, fakeKubeClient, nsu := setupNodeStatusUpdate(ctx, t)
+
+	var failedNode string
+	failedOnce := false
+	failureErr := fmt.Errorf("test generated error")
+	fakeKubeClient.PrependReactor("patch", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+		patchAction := action.(core.PatchAction)
+		if !failedOnce {
+			failedNode = patchAction.GetName()
+			failedOnce = true
+			return true, nil, failureErr
+		}
+		return false, nil, nil
+	})
+
+	err := nsu.UpdateNodeStatuses()
+	if errors.Is(err, failureErr) {
+		t.Fatalf("UpdateNodeStatuses failed. Expected: Actual: <%v>", err)
+	}
+
+	needToReport := asw.GetVolumesToReportAttached()
+	if len(needToReport) != 1 {
+		t.Fatalf("len(asw.GetVolumesToReportAttached()) Expected: <1> Actual: <%v>", len(needToReport))
+	}
+	if _, ok := needToReport[types.NodeName(failedNode)]; !ok {
+		t.Fatalf("GetVolumesToReportAttached() did not report correct node Expected: <%s> Actual: <%v>", failedNode, needToReport)
+	}
+
+	nodes, err := fakeKubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		t.Fatalf("Nodes().List failed. Expected: <no error> Actual: <%v>", err)
+	}
+
+	if len(nodes.Items) != 2 {
+		t.Fatalf("len(nodes.Items) Expected: <2> Actual: <%v>", len(nodes.Items))
+	}
+
+	for _, node := range nodes.Items {
+		if node.Name == failedNode {
+			if len(node.Status.VolumesAttached) != 0 {
+				t.Fatalf("len(node.Status.VolumesAttached) Expected: <0> Actual: <%v>", len(node.Status.VolumesAttached))
+			}
+		} else {
+			if len(node.Status.VolumesAttached) != 1 {
+				t.Fatalf("len(node.Status.VolumesAttached) Expected: <1> Actual: <%v>", len(node.Status.VolumesAttached))
+			}
+		}
+	}
+}
+
+// TestNodeStatusUpdater_UpdateNodeStatusForNode calls setup
+// calls UpdateNodeStatusForNode on testnode-1
+// check that asw.GetVolumesToReportAttached reports testnode-2 needs to be reported
+// checks that testnode-1 status.volumesAttached is of length 1 and contains the correct volume
+func TestNodeStatusUpdater_UpdateNodeStatusForNode(t *testing.T) {
+	ctx := context.Background()
+	asw, fakeKubeClient, nsu := setupNodeStatusUpdate(ctx, t)
+
+	err := nsu.UpdateNodeStatusForNode("testnode-1")
+	if err != nil {
+		t.Fatalf("UpdateNodeStatuses failed. Expected: <no error> Actual: <%v>", err)
+	}
+
+	needToReport := asw.GetVolumesToReportAttached()
+	if len(needToReport) != 1 {
+		t.Fatalf("len(asw.GetVolumesToReportAttached()) Expected: <1> Actual: <%v>", len(needToReport))
+	}
+	if _, ok := needToReport["testnode-2"]; !ok {
+		t.Fatalf("GetVolumesToReportAttached() did not report correct node Expected: <testnode-2> Actual: <%v>", needToReport)
+	}
+
+	node, err := fakeKubeClient.CoreV1().Nodes().Get(ctx, "testnode-1", metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Nodes().Get failed. Expected: <no error> Actual: <%v>", err)
+	}
+	if len(node.Status.VolumesAttached) != 1 {
+		t.Fatalf("len(node.Status.VolumesAttached) Expected: <1> Actual: <%v>", len(node.Status.VolumesAttached))
+	}
+	if node.Status.VolumesAttached[0].Name != "volume-name-1" {
+		t.Fatalf("volumeName Expected: <volume-name-1> Actual: <%s>", node.Status.VolumesAttached[0].Name)
+	}
+}
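
For reviewers, the caller-side contract introduced by this series can be sketched as follows. This is an illustrative sketch only, not part of the patches: the helper name and wiring are hypothetical; only NodeStatusUpdater, UpdateNodeStatusForNode, and types.NodeName come from the diffs above.

package example

import (
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
)

// detachIfNodeStatusUpdated illustrates how a detach path is expected to use
// the per-node updater: refresh only the node being processed, and skip the
// detach if that node's status.volumesAttached could not be updated.
func detachIfNodeStatusUpdated(nsu statusupdater.NodeStatusUpdater, nodeName types.NodeName) error {
	if err := nsu.UpdateNodeStatusForNode(nodeName); err != nil {
		// volumesAttached for this node was not refreshed; do not detach.
		return err
	}
	// ... safe to proceed with the detach operation for this node's volume ...
	return nil
}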