diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go index 4d420e01706..652d5421d9a 100644 --- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go +++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go @@ -135,6 +135,11 @@ type ActualStateOfWorld interface { // is considered, before the detach operation is triggered). GetVolumesToReportAttached() map[types.NodeName][]v1.AttachedVolume + // GetVolumesToReportAttachedForNode returns the list of volumes that should be reported as + // attached for the given node. It reports a boolean indicating if there is an update for that + // node and the corresponding attachedVolumes list. + GetVolumesToReportAttachedForNode(name types.NodeName) (bool, []v1.AttachedVolume) + // GetNodesToUpdateStatusFor returns the map of nodeNames to nodeToUpdateStatusFor GetNodesToUpdateStatusFor() map[types.NodeName]nodeToUpdateStatusFor } @@ -647,24 +652,13 @@ func (asw *actualStateOfWorld) GetNodesForAttachedVolume(volumeName v1.UniqueVol } func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][]v1.AttachedVolume { - asw.RLock() - defer asw.RUnlock() + asw.Lock() + defer asw.Unlock() volumesToReportAttached := make(map[types.NodeName][]v1.AttachedVolume) for nodeName, nodeToUpdateObj := range asw.nodesToUpdateStatusFor { if nodeToUpdateObj.statusUpdateNeeded { - attachedVolumes := make( - []v1.AttachedVolume, - 0, - len(nodeToUpdateObj.volumesToReportAsAttached) /* len */) - for _, volume := range nodeToUpdateObj.volumesToReportAsAttached { - attachedVolumes = append(attachedVolumes, - v1.AttachedVolume{ - Name: volume, - DevicePath: asw.attachedVolumes[volume].devicePath, - }) - } - volumesToReportAttached[nodeToUpdateObj.nodeName] = attachedVolumes + volumesToReportAttached[nodeToUpdateObj.nodeName] = asw.getAttachedVolumeFromUpdateObject(nodeToUpdateObj.volumesToReportAsAttached) } // When GetVolumesToReportAttached is called by node status updater, the current status // of this node will be updated, so set the flag statusUpdateNeeded to false indicating @@ -677,10 +671,48 @@ func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][ return volumesToReportAttached } +func (asw *actualStateOfWorld) GetVolumesToReportAttachedForNode(nodeName types.NodeName) (bool, []v1.AttachedVolume) { + asw.Lock() + defer asw.Unlock() + + nodeToUpdateObj, ok := asw.nodesToUpdateStatusFor[nodeName] + if !ok { + return false, nil + } + if !nodeToUpdateObj.statusUpdateNeeded { + return false, nil + } + + volumesToReportAttached := asw.getAttachedVolumeFromUpdateObject(nodeToUpdateObj.volumesToReportAsAttached) + // When GetVolumesToReportAttached is called by node status updater, the current status + // of this node will be updated, so set the flag statusUpdateNeeded to false indicating + // the current status is already updated. + if err := asw.updateNodeStatusUpdateNeeded(nodeName, false); err != nil { + klog.Errorf("Failed to update statusUpdateNeeded field when getting volumes: %v", err) + } + + return true, volumesToReportAttached +} + func (asw *actualStateOfWorld) GetNodesToUpdateStatusFor() map[types.NodeName]nodeToUpdateStatusFor { return asw.nodesToUpdateStatusFor } +func (asw *actualStateOfWorld) getAttachedVolumeFromUpdateObject(volumesToReportAsAttached map[v1.UniqueVolumeName]v1.UniqueVolumeName) []v1.AttachedVolume { + var attachedVolumes = make( + []v1.AttachedVolume, + 0, + len(volumesToReportAsAttached) /* len */) + for _, volume := range volumesToReportAsAttached { + attachedVolumes = append(attachedVolumes, + v1.AttachedVolume{ + Name: volume, + DevicePath: asw.attachedVolumes[volume].devicePath, + }) + } + return attachedVolumes +} + func getAttachedVolume( attachedVolume *attachedVolume, nodeAttachedTo *nodeAttachedTo) AttachedVolume { diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go index 71406909a9a..984b4d7a7b1 100644 --- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go +++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go @@ -1447,6 +1447,70 @@ func Test_MarkVolumeAsUncertain(t *testing.T) { verifyAttachedVolume(t, attachedVolumes, volumeName, string(volumeName), nodeName, "", true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */) } +// Calls AddVolumeNode() once with attached set to true. +// Verifies GetVolumesToReportAttachedForNode has an update for the node. +// Call GetVolumesToReportAttachedForNode a second time for the node, verify it does not report +// an update is needed any more +// Then calls RemoveVolumeFromReportAsAttached() +// Verifies GetVolumesToReportAttachedForNode reports an update is needed +func Test_GetVolumesToReportAttachedForNode_Positive(t *testing.T) { + // Arrange + volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) + asw := NewActualStateOfWorld(volumePluginMgr) + volumeName := v1.UniqueVolumeName("volume-name") + volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) + + nodeName := types.NodeName("node-name") + devicePath := "fake/device/path" + + // Act + generatedVolumeName, err := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath, true) + + // Assert + if err != nil { + t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", err) + } + + needsUpdate, attachedVolumes := asw.GetVolumesToReportAttachedForNode(nodeName) + if !needsUpdate { + t.Fatalf("GetVolumesToReportAttachedForNode_Positive_NewVolumeNewNodeWithTrueAttached failed. Actual: Expect: Actual: <%v>", len(attachedVolumes)) + } + + needsUpdate, _ = asw.GetVolumesToReportAttachedForNode(nodeName) + if needsUpdate { + t.Fatalf("GetVolumesToReportAttachedForNode_Positive_NewVolumeNewNodeWithTrueAttached failed. Actual: Expect: Actual: <%v>", removeVolumeDetachErr) + } + + needsUpdate, attachedVolumes = asw.GetVolumesToReportAttachedForNode(nodeName) + if !needsUpdate { + t.Fatalf("GetVolumesToReportAttachedForNode_Positive_NewVolumeNewNodeWithTrueAttached failed. Actual: Expect: Actual: <%v>", len(attachedVolumes)) + } +} + +// Verifies GetVolumesToReportAttachedForNode reports no update needed for an unknown node. +func Test_GetVolumesToReportAttachedForNode_UnknownNode(t *testing.T) { + // Arrange + volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) + asw := NewActualStateOfWorld(volumePluginMgr) + nodeName := types.NodeName("node-name") + + needsUpdate, _ := asw.GetVolumesToReportAttachedForNode(nodeName) + if needsUpdate { + t.Fatalf("GetVolumesToReportAttachedForNode_UnknownNode failed. Actual: Expect: 0 { + return fmt.Errorf("unable to update %d nodes", nodeIssues) + } + return nil +} - if err := nsu.updateNodeStatus(nodeName, nodeObj, attachedVolumes); err != nil { - // If update node status fails, reset flag statusUpdateNeeded back to true - // to indicate this node status needs to be updated again - nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName) +func (nsu *nodeStatusUpdater) UpdateNodeStatusForNode(nodeName types.NodeName) error { + needsUpdate, attachedVolumes := nsu.actualStateOfWorld.GetVolumesToReportAttachedForNode(nodeName) + if !needsUpdate { + return nil + } + return nsu.processNodeVolumes(nodeName, attachedVolumes) +} - klog.V(2).Infof( - "Could not update node status for %q; re-marking for update. %v", - nodeName, - err) +func (nsu *nodeStatusUpdater) processNodeVolumes(nodeName types.NodeName, attachedVolumes []v1.AttachedVolume) error { + nodeObj, err := nsu.nodeLister.Get(string(nodeName)) + if errors.IsNotFound(err) { + // If node does not exist, its status cannot be updated. + // Do nothing so that there is no retry until node is created. + klog.V(2).Infof( + "Could not update node status. Failed to find node %q in NodeInformer cache. Error: '%v'", + nodeName, + err) + return nil + } else if err != nil { + // For all other errors, log error and reset flag statusUpdateNeeded + // back to true to indicate this node status needs to be updated again. + klog.V(2).Infof("Error retrieving nodes from node lister. Error: %v", err) + nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName) + return err + } - // We currently always return immediately on error - return err - } + err = nsu.updateNodeStatus(nodeName, nodeObj, attachedVolumes) + if errors.IsNotFound(err) { + // If node does not exist, its status cannot be updated. + // Do nothing so that there is no retry until node is created. + klog.V(2).Infof( + "Could not update node status for %q; node does not exist - skipping", + nodeName) + return nil + } else if err != nil { + // If update node status fails, reset flag statusUpdateNeeded back to true + // to indicate this node status needs to be updated again + nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName) + + klog.V(2).Infof( + "Could not update node status for %q; re-marking for update. %v", + nodeName, + err) + + return err } return nil } diff --git a/pkg/controller/volume/attachdetach/statusupdater/node_status_updater_test.go b/pkg/controller/volume/attachdetach/statusupdater/node_status_updater_test.go new file mode 100644 index 00000000000..c51067b7c69 --- /dev/null +++ b/pkg/controller/volume/attachdetach/statusupdater/node_status_updater_test.go @@ -0,0 +1,222 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statusupdater + +import ( + "context" + "errors" + "fmt" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" + controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" + volumetesting "k8s.io/kubernetes/pkg/volume/testing" + "testing" +) + +// setupNodeStatusUpdate creates all the needed objects for testing. +// the initial environment has 2 nodes with no volumes attached +// and adds one volume to attach to each node to the actual state of the world +func setupNodeStatusUpdate(ctx context.Context, t *testing.T) (cache.ActualStateOfWorld, *fake.Clientset, NodeStatusUpdater) { + testNode1 := corev1.Node{ + TypeMeta: metav1.TypeMeta{ + Kind: "Node", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "testnode-1", + }, + Status: corev1.NodeStatus{}, + } + testNode2 := corev1.Node{ + TypeMeta: metav1.TypeMeta{ + Kind: "Node", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "testnode-2", + }, + Status: corev1.NodeStatus{}, + } + volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t) + asw := cache.NewActualStateOfWorld(volumePluginMgr) + fakeKubeClient := fake.NewSimpleClientset(&testNode1, &testNode2) + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + nodeInformer := informerFactory.Core().V1().Nodes() + nsu := NewNodeStatusUpdater(fakeKubeClient, nodeInformer.Lister(), asw) + + err := nodeInformer.Informer().GetStore().Add(&testNode1) + if err != nil { + t.Fatalf(".Informer().GetStore().Add failed. Expected: Actual: <%v>", err) + } + err = nodeInformer.Informer().GetStore().Add(&testNode2) + if err != nil { + t.Fatalf(".Informer().GetStore().Add failed. Expected: Actual: <%v>", err) + } + + volumeName1 := corev1.UniqueVolumeName("volume-name-1") + volumeName2 := corev1.UniqueVolumeName("volume-name-2") + volumeSpec1 := controllervolumetesting.GetTestVolumeSpec(string(volumeName1), volumeName1) + volumeSpec2 := controllervolumetesting.GetTestVolumeSpec(string(volumeName2), volumeName2) + + nodeName1 := types.NodeName("testnode-1") + nodeName2 := types.NodeName("testnode-2") + devicePath := "fake/device/path" + + _, err = asw.AddVolumeNode(volumeName1, volumeSpec1, nodeName1, devicePath, true) + if err != nil { + t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", err) + } + _, err = asw.AddVolumeNode(volumeName2, volumeSpec2, nodeName2, devicePath, true) + if err != nil { + t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", err) + } + + return asw, fakeKubeClient, nsu +} + +// TestNodeStatusUpdater_UpdateNodeStatuses_TwoNodesUpdate calls setup +// calls UpdateNodeStatuses() +// check that asw.GetVolumesToReportAttached reports nothing left to attach +// checks that each node status.volumesAttached is of length 1 and contains the correct volume +func TestNodeStatusUpdater_UpdateNodeStatuses_TwoNodesUpdate(t *testing.T) { + ctx := context.Background() + asw, fakeKubeClient, nsu := setupNodeStatusUpdate(ctx, t) + + err := nsu.UpdateNodeStatuses() + if err != nil { + t.Fatalf("UpdateNodeStatuses failed. Expected: Actual: <%v>", err) + } + + needToReport := asw.GetVolumesToReportAttached() + if len(needToReport) != 0 { + t.Fatalf("len(asw.GetVolumesToReportAttached()) Expected: <0> Actual: <%v>", len(needToReport)) + } + + node, err := fakeKubeClient.CoreV1().Nodes().Get(ctx, "testnode-1", metav1.GetOptions{}) + if err != nil { + t.Fatalf("Nodes().Get failed. Expected: Actual: <%v>", err) + } + if len(node.Status.VolumesAttached) != 1 { + t.Fatalf("len(node.Status.VolumesAttached) Expected: <1> Actual: <%v>", len(node.Status.VolumesAttached)) + } + if node.Status.VolumesAttached[0].Name != "volume-name-1" { + t.Fatalf("volumeName Expected: Actual: <%s>", node.Status.VolumesAttached[0].Name) + } + + node, err = fakeKubeClient.CoreV1().Nodes().Get(ctx, "testnode-2", metav1.GetOptions{}) + if err != nil { + t.Fatalf("Nodes().Get failed. Expected: Actual: <%v>", err) + } + if len(node.Status.VolumesAttached) != 1 { + t.Fatalf("len(node.Status.VolumesAttached) Expected: <1> Actual: <%v>", len(node.Status.VolumesAttached)) + } + if node.Status.VolumesAttached[0].Name != "volume-name-2" { + t.Fatalf("volumeName Expected: Actual: <%s>", node.Status.VolumesAttached[0].Name) + } +} + +func TestNodeStatusUpdater_UpdateNodeStatuses_FailureInFirstUpdate(t *testing.T) { + ctx := context.Background() + asw, fakeKubeClient, nsu := setupNodeStatusUpdate(ctx, t) + + var failedNode string + failedOnce := false + failureErr := fmt.Errorf("test generated error") + fakeKubeClient.PrependReactor("patch", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) { + patchAction := action.(core.PatchAction) + if !failedOnce { + failedNode = patchAction.GetName() + failedOnce = true + return true, nil, failureErr + } + return false, nil, nil + }) + + err := nsu.UpdateNodeStatuses() + if errors.Is(err, failureErr) { + t.Fatalf("UpdateNodeStatuses failed. Expected: Actual: <%v>", err) + } + + needToReport := asw.GetVolumesToReportAttached() + if len(needToReport) != 1 { + t.Fatalf("len(asw.GetVolumesToReportAttached()) Expected: <1> Actual: <%v>", len(needToReport)) + } + if _, ok := needToReport[types.NodeName(failedNode)]; !ok { + t.Fatalf("GetVolumesToReportAttached() did not report correct node Expected: <%s> Actual: <%v>", failedNode, needToReport) + } + + nodes, err := fakeKubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + t.Fatalf("Nodes().List failed. Expected: Actual: <%v>", err) + } + + if len(nodes.Items) != 2 { + t.Fatalf("len(nodes.Items) Expected: <2> Actual: <%v>", len(nodes.Items)) + } + + for _, node := range nodes.Items { + if node.Name == failedNode { + if len(node.Status.VolumesAttached) != 0 { + t.Fatalf("len(node.Status.VolumesAttached) Expected: <0> Actual: <%v>", len(node.Status.VolumesAttached)) + } + } else { + if len(node.Status.VolumesAttached) != 1 { + t.Fatalf("len(node.Status.VolumesAttached) Expected: <1> Actual: <%v>", len(node.Status.VolumesAttached)) + } + } + } +} + +// TestNodeStatusUpdater_UpdateNodeStatusForNode calls setup +// calls UpdateNodeStatusesForNode on testnode-1 +// check that asw.GetVolumesToReportAttached reports testnode-2 needs to be reported +// checks that testnode-1 status.volumesAttached is of length 1 and contains the correct volume +func TestNodeStatusUpdater_UpdateNodeStatusForNode(t *testing.T) { + ctx := context.Background() + asw, fakeKubeClient, nsu := setupNodeStatusUpdate(ctx, t) + + err := nsu.UpdateNodeStatusForNode("testnode-1") + if err != nil { + t.Fatalf("UpdateNodeStatuses failed. Expected: Actual: <%v>", err) + } + + needToReport := asw.GetVolumesToReportAttached() + if len(needToReport) != 1 { + t.Fatalf("len(asw.GetVolumesToReportAttached()) Expected: <1> Actual: <%v>", len(needToReport)) + } + if _, ok := needToReport["testnode-2"]; !ok { + t.Fatalf("GetVolumesToReportAttached() did not report correct node Expected: Actual: <%v>", needToReport) + } + + node, err := fakeKubeClient.CoreV1().Nodes().Get(ctx, "testnode-1", metav1.GetOptions{}) + if err != nil { + t.Fatalf("Nodes().Get failed. Expected: Actual: <%v>", err) + } + if len(node.Status.VolumesAttached) != 1 { + t.Fatalf("len(node.Status.VolumesAttached) Expected: <1> Actual: <%v>", len(node.Status.VolumesAttached)) + } + if node.Status.VolumesAttached[0].Name != "volume-name-1" { + t.Fatalf("volumeName Expected: Actual: <%s>", node.Status.VolumesAttached[0].Name) + } +}