diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go index 4d420e01706..23e39d8aa69 100644 --- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go +++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go @@ -135,6 +135,11 @@ type ActualStateOfWorld interface { // is considered, before the detach operation is triggered). GetVolumesToReportAttached() map[types.NodeName][]v1.AttachedVolume + // GetVolumesToReportAttachedForNode returns the list of volumes that should be reported as + // attached for the given node. It reports a boolean indicating if there is an update for that + // node and the corresponding attachedVolumes list. + GetVolumesToReportAttachedForNode(name types.NodeName) (bool, []v1.AttachedVolume) + // GetNodesToUpdateStatusFor returns the map of nodeNames to nodeToUpdateStatusFor GetNodesToUpdateStatusFor() map[types.NodeName]nodeToUpdateStatusFor } @@ -647,24 +652,13 @@ func (asw *actualStateOfWorld) GetNodesForAttachedVolume(volumeName v1.UniqueVol } func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][]v1.AttachedVolume { - asw.RLock() - defer asw.RUnlock() + asw.Lock() + defer asw.Unlock() volumesToReportAttached := make(map[types.NodeName][]v1.AttachedVolume) for nodeName, nodeToUpdateObj := range asw.nodesToUpdateStatusFor { if nodeToUpdateObj.statusUpdateNeeded { - attachedVolumes := make( - []v1.AttachedVolume, - 0, - len(nodeToUpdateObj.volumesToReportAsAttached) /* len */) - for _, volume := range nodeToUpdateObj.volumesToReportAsAttached { - attachedVolumes = append(attachedVolumes, - v1.AttachedVolume{ - Name: volume, - DevicePath: asw.attachedVolumes[volume].devicePath, - }) - } - volumesToReportAttached[nodeToUpdateObj.nodeName] = attachedVolumes + volumesToReportAttached[nodeToUpdateObj.nodeName] = asw.getAttachedVolumeFromUpdateObject(nodeToUpdateObj.volumesToReportAsAttached) } // When 
GetVolumesToReportAttached is called by node status updater, the current status // of this node will be updated, so set the flag statusUpdateNeeded to false indicating @@ -677,10 +671,49 @@ func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][ return volumesToReportAttached } +func (asw *actualStateOfWorld) GetVolumesToReportAttachedForNode(nodeName types.NodeName) (bool, []v1.AttachedVolume) { + asw.Lock() + defer asw.Unlock() + + var attachedVolumes []v1.AttachedVolume + nodeToUpdateObj, ok := asw.nodesToUpdateStatusFor[nodeName] + if !ok { + return false, nil + } + if !nodeToUpdateObj.statusUpdateNeeded { + return false, nil + } + + attachedVolumes = asw.getAttachedVolumeFromUpdateObject(nodeToUpdateObj.volumesToReportAsAttached) + // When GetVolumesToReportAttachedForNode is called by node status updater, the current status + // of this node will be updated, so set the flag statusUpdateNeeded to false indicating + // the current status is already updated. + if err := asw.updateNodeStatusUpdateNeeded(nodeName, false); err != nil { + klog.Errorf("Failed to update statusUpdateNeeded field when getting volumes: %v", err) + } + + return true, attachedVolumes +} + func (asw *actualStateOfWorld) GetNodesToUpdateStatusFor() map[types.NodeName]nodeToUpdateStatusFor { return asw.nodesToUpdateStatusFor } +func (asw *actualStateOfWorld) getAttachedVolumeFromUpdateObject(volumesToReportAsAttached map[v1.UniqueVolumeName]v1.UniqueVolumeName) []v1.AttachedVolume { + var attachedVolumes = make( + []v1.AttachedVolume, + 0, + len(volumesToReportAsAttached) /* len */) + for _, volume := range volumesToReportAsAttached { + attachedVolumes = append(attachedVolumes, + v1.AttachedVolume{ + Name: volume, + DevicePath: asw.attachedVolumes[volume].devicePath, + }) + } + return attachedVolumes +} + func getAttachedVolume( attachedVolume *attachedVolume, nodeAttachedTo *nodeAttachedTo) AttachedVolume { diff --git 
a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go index 463e0e3fac7..c3ba55bd135 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go @@ -202,7 +202,7 @@ func (rc *reconciler) reconcile() { } // Update Node Status to indicate volume is no longer safe to mount. - err = rc.nodeStatusUpdater.UpdateNodeStatuses() + err = rc.nodeStatusUpdater.UpdateNodeStatusForNode(attachedVolume.NodeName) if err != nil { // Skip detaching this volume if unable to update node status klog.ErrorS(err, "UpdateNodeStatuses failed while attempting to report volume as attached", "volume", attachedVolume) diff --git a/pkg/controller/volume/attachdetach/statusupdater/fake_node_status_updater.go b/pkg/controller/volume/attachdetach/statusupdater/fake_node_status_updater.go index b78e80e3863..a321293321b 100644 --- a/pkg/controller/volume/attachdetach/statusupdater/fake_node_status_updater.go +++ b/pkg/controller/volume/attachdetach/statusupdater/fake_node_status_updater.go @@ -18,6 +18,7 @@ package statusupdater import ( "fmt" + "k8s.io/apimachinery/pkg/types" ) func NewFakeNodeStatusUpdater(returnError bool) NodeStatusUpdater { @@ -37,3 +38,11 @@ func (fnsu *fakeNodeStatusUpdater) UpdateNodeStatuses() error { return nil } + +func (fnsu *fakeNodeStatusUpdater) UpdateNodeStatusForNode(nodeName types.NodeName) error { + if fnsu.returnError { + return fmt.Errorf("fake error on update node status") + } + + return nil +} diff --git a/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go b/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go index 9396872d570..a8a9415b37a 100644 --- a/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go +++ b/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go @@ -19,6 +19,7 @@ limitations under the License. 
package statusupdater import ( + "fmt" "k8s.io/klog/v2" "k8s.io/api/core/v1" @@ -36,6 +37,8 @@ type NodeStatusUpdater interface { // Gets a list of node statuses that should be updated from the actual state // of the world and updates them. UpdateNodeStatuses() error + // Updates any pending status change for the given node only. + UpdateNodeStatusForNode(nodeName types.NodeName) error } // NewNodeStatusUpdater returns a new instance of NodeStatusUpdater. @@ -57,40 +60,65 @@ type nodeStatusUpdater struct { } func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error { + var nodeIssues int // TODO: investigate right behavior if nodeName is empty // kubernetes/kubernetes/issues/37777 nodesToUpdate := nsu.actualStateOfWorld.GetVolumesToReportAttached() for nodeName, attachedVolumes := range nodesToUpdate { - nodeObj, err := nsu.nodeLister.Get(string(nodeName)) - if errors.IsNotFound(err) { - // If node does not exist, its status cannot be updated. - // Do nothing so that there is no retry until node is created. - klog.V(2).Infof( - "Could not update node status. Failed to find node %q in NodeInformer cache. Error: '%v'", - nodeName, - err) - continue - } else if err != nil { - // For all other errors, log error and reset flag statusUpdateNeeded - // back to true to indicate this node status needs to be updated again. - klog.V(2).Infof("Error retrieving nodes from node lister. 
Error: %v", err) - nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName) - continue + err := nsu.processNodeVolumes(nodeName, attachedVolumes) + if err != nil { + nodeIssues += 1 } + } + if nodeIssues > 0 { + return fmt.Errorf("unable to update %d nodes", nodeIssues) + } + return nil +} - if err := nsu.updateNodeStatus(nodeName, nodeObj, attachedVolumes); err != nil { - // If update node status fails, reset flag statusUpdateNeeded back to true - // to indicate this node status needs to be updated again - nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName) +func (nsu *nodeStatusUpdater) UpdateNodeStatusForNode(nodeName types.NodeName) error { + needsUpdate, attachedVolumes := nsu.actualStateOfWorld.GetVolumesToReportAttachedForNode(nodeName) + if !needsUpdate { + return nil + } + return nsu.processNodeVolumes(nodeName, attachedVolumes) +} - klog.V(2).Infof( - "Could not update node status for %q; re-marking for update. %v", - nodeName, - err) +func (nsu *nodeStatusUpdater) processNodeVolumes(nodeName types.NodeName, attachedVolumes []v1.AttachedVolume) error { + nodeObj, err := nsu.nodeLister.Get(string(nodeName)) + if errors.IsNotFound(err) { + // If node does not exist, its status cannot be updated. + // Do nothing so that there is no retry until node is created. + klog.V(2).Infof( + "Could not update node status. Failed to find node %q in NodeInformer cache. Error: '%v'", + nodeName, + err) + return nil + } else if err != nil { + // For all other errors, log error and reset flag statusUpdateNeeded + // back to true to indicate this node status needs to be updated again. + klog.V(2).Infof("Error retrieving nodes from node lister. 
Error: %v", err) + nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName) + return err + } - // We currently always return immediately on error - return err - } + err = nsu.updateNodeStatus(nodeName, nodeObj, attachedVolumes) + if errors.IsNotFound(err) { + klog.V(2).Infof( + "Could not update node status for %q; node does not exist - skipping", + nodeName) + return nil + } else if err != nil { + // If update node status fails, reset flag statusUpdateNeeded back to true + // to indicate this node status needs to be updated again + nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName) + + klog.V(2).Infof( + "Could not update node status for %q; re-marking for update. %v", + nodeName, + err) + + return err } return nil }