Fix nodes volumesAttached status not updated

The UpdateNodeStatuses code stops too early when updateNodeStatus returns an
error: it returns immediately, so the remaining nodes are never processed and
never get their statusUpdateNeeded flag put back to true, which means they
are not retried.
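
A minimal sketch of the difference, with a hypothetical updateNode helper
standing in for updateNodeStatus (node names and the failure are made up for
illustration):

package main

import "fmt"

// updateNode stands in for updateNodeStatus; node-b failing is arbitrary.
func updateNode(name string) error {
	if name == "node-b" {
		return fmt.Errorf("patch failed for %s", name)
	}
	return nil
}

// Old behavior: returning on the first error means node-c is never processed,
// and its statusUpdateNeeded flag (already cleared by the getter) is never
// put back to true, so it is not retried.
func updateAllReturnEarly(nodes []string) error {
	for _, n := range nodes {
		if err := updateNode(n); err != nil {
			return err
		}
	}
	return nil
}

// Intended behavior: keep going, count failures, report them at the end.
func updateAllContinue(nodes []string) error {
	var nodeIssues int
	for _, n := range nodes {
		if err := updateNode(n); err != nil {
			nodeIssues++
		}
	}
	if nodeIssues > 0 {
		return fmt.Errorf("unable to update %d nodes", nodeIssues)
	}
	return nil
}

func main() {
	nodes := []string{"node-a", "node-b", "node-c"}
	fmt.Println(updateAllReturnEarly(nodes)) // stops at node-b, node-c skipped
	fmt.Println(updateAllContinue(nodes))    // node-c still processed, 1 failure reported
}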

Looking at the call sites for UpdateNodeStatuses, this is not the only issue.
If the lister call fails with anything but a Not Found error, the error is
silently ignored, which is wrong in the detach path. The reconciler detach
path also calls UpdateNodeStatuses, but the real intent is to update only the
node currently being processed in the loop, and to skip the detach call if
updating that specific node's volumesAttached property fails. With the
current implementation, the detach is skipped when updating some other node
fails (not completely wrong, but not ideal) and, worse, it proceeds when
there is a lister error on the node in question, which means that node's
volumesAttached property has not been updated.
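
The intended per-node flow in the detach path looks roughly like this
(detachIfSafe, updateVolumesAttached and detachVolume are placeholder names
for illustration, not functions from the controller):

package main

import (
	"errors"
	"fmt"
)

var errLister = errors.New("node lister: transient error")

// updateVolumesAttached and detachVolume stand in for the status update and
// the detach operation; node-b is arbitrarily made to fail.
func updateVolumesAttached(nodeName string) error {
	if nodeName == "node-b" {
		return errLister // this class of error used to be silently ignored
	}
	return nil
}

func detachVolume(nodeName string) error {
	fmt.Println("detaching volume from", nodeName)
	return nil
}

// detachIfSafe captures the intent: update only the node being processed,
// and never detach while its volumesAttached status is stale.
func detachIfSafe(nodeName string) error {
	if err := updateVolumesAttached(nodeName); err != nil {
		return fmt.Errorf("skipping detach, %s status not updated: %w", nodeName, err)
	}
	return detachVolume(nodeName)
}

func main() {
	for _, n := range []string{"node-a", "node-b"} {
		if err := detachIfSafe(n); err != nil {
			fmt.Println(err)
		}
	}
}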

To fix these issues, introduce the following changes:
- [node_status_updater] introduce UpdateNodeStatusForNode, which does what
  UpdateNodeStatuses does but only for the provided node
- [node_status_updater] if the node lister call fails with anything but a Not
  Found error, return the error instead of ignoring it
- [node_status_updater] if updating a node's volumesAttached property fails,
  continue processing the remaining nodes
- [actual_state_of_world] introduce GetVolumesToReportAttachedForNode, which
  does what GetVolumesToReportAttached does but only for the node whose name
  is provided. It returns a bool indicating whether that node needs an update,
  along with the volumesAttached list, and is used by UpdateNodeStatusForNode
  (a toy sketch of the statusUpdateNeeded contract it relies on follows this
  list)
- [actual_state_of_world] take the write lock around
  updateNodeStatusUpdateNeeded, since it modifies the map content
- [reconciler] use UpdateNodeStatusForNode in the detach loop
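
A toy illustration of that statusUpdateNeeded contract (the world type and
its methods below are stand-ins for illustration, not the real
actualStateOfWorld cache or its API):

package main

import "fmt"

// world is a toy stand-in for the actualStateOfWorld cache, kept only to
// show the statusUpdateNeeded contract; it is not the real data structure.
type world struct {
	statusUpdateNeeded map[string]bool
	volumes            map[string][]string
}

// volumesForNode mimics GetVolumesToReportAttachedForNode: report whether
// the node has a pending update and, if so, clear the flag because the
// caller is about to write the status.
func (w *world) volumesForNode(node string) (bool, []string) {
	if !w.statusUpdateNeeded[node] {
		return false, nil
	}
	w.statusUpdateNeeded[node] = false
	return true, w.volumes[node]
}

// markForUpdate mimics SetNodeStatusUpdateNeeded: a failed write re-marks
// the node so the next pass retries it.
func (w *world) markForUpdate(node string) {
	w.statusUpdateNeeded[node] = true
}

func main() {
	w := &world{
		statusUpdateNeeded: map[string]bool{"node-a": true},
		volumes:            map[string][]string{"node-a": {"vol-1"}},
	}
	if ok, vols := w.volumesForNode("node-a"); ok {
		fmt.Println("would patch node-a with", vols)
		w.markForUpdate("node-a") // pretend the patch failed, so retry later
	}
	needsRetry, _ := w.volumesForNode("node-a")
	fmt.Println("needs another attempt:", needsRetry)
}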
Jean-Francois Remy 2022-02-16 10:28:46 -08:00
parent b68063fce7
commit f1717baaaa
4 changed files with 111 additions and 41 deletions


@@ -135,6 +135,11 @@ type ActualStateOfWorld interface {
// is considered, before the detach operation is triggered).
GetVolumesToReportAttached() map[types.NodeName][]v1.AttachedVolume
+ // GetVolumesToReportAttachedForNode returns the list of volumes that should be reported as
+ // attached for the given node. It reports a boolean indicating if there is an update for that
+ // node and the corresponding attachedVolumes list.
+ GetVolumesToReportAttachedForNode(name types.NodeName) (bool, []v1.AttachedVolume)
// GetNodesToUpdateStatusFor returns the map of nodeNames to nodeToUpdateStatusFor
GetNodesToUpdateStatusFor() map[types.NodeName]nodeToUpdateStatusFor
}
@@ -647,24 +652,13 @@ func (asw *actualStateOfWorld) GetNodesForAttachedVolume(volumeName v1.UniqueVol
}
func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][]v1.AttachedVolume {
- asw.RLock()
- defer asw.RUnlock()
+ asw.Lock()
+ defer asw.Unlock()
volumesToReportAttached := make(map[types.NodeName][]v1.AttachedVolume)
for nodeName, nodeToUpdateObj := range asw.nodesToUpdateStatusFor {
if nodeToUpdateObj.statusUpdateNeeded {
- attachedVolumes := make(
- []v1.AttachedVolume,
- 0,
- len(nodeToUpdateObj.volumesToReportAsAttached) /* len */)
- for _, volume := range nodeToUpdateObj.volumesToReportAsAttached {
- attachedVolumes = append(attachedVolumes,
- v1.AttachedVolume{
- Name: volume,
- DevicePath: asw.attachedVolumes[volume].devicePath,
- })
- }
- volumesToReportAttached[nodeToUpdateObj.nodeName] = attachedVolumes
+ volumesToReportAttached[nodeToUpdateObj.nodeName] = asw.getAttachedVolumeFromUpdateObject(nodeToUpdateObj.volumesToReportAsAttached)
}
// When GetVolumesToReportAttached is called by node status updater, the current status
// of this node will be updated, so set the flag statusUpdateNeeded to false indicating
@@ -677,10 +671,49 @@ func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][
return volumesToReportAttached
}
+ func (asw *actualStateOfWorld) GetVolumesToReportAttachedForNode(nodeName types.NodeName) (bool, []v1.AttachedVolume) {
+ asw.Lock()
+ defer asw.Unlock()
+ var attachedVolumes []v1.AttachedVolume
+ nodeToUpdateObj, ok := asw.nodesToUpdateStatusFor[nodeName]
+ if !ok {
+ return false, nil
+ }
+ if !nodeToUpdateObj.statusUpdateNeeded {
+ return false, nil
+ }
+ attachedVolumes = asw.getAttachedVolumeFromUpdateObject(nodeToUpdateObj.volumesToReportAsAttached)
+ // When GetVolumesToReportAttached is called by node status updater, the current status
+ // of this node will be updated, so set the flag statusUpdateNeeded to false indicating
+ // the current status is already updated.
+ if err := asw.updateNodeStatusUpdateNeeded(nodeName, false); err != nil {
+ klog.Errorf("Failed to update statusUpdateNeeded field when getting volumes: %v", err)
+ }
+ return true, attachedVolumes
+ }
func (asw *actualStateOfWorld) GetNodesToUpdateStatusFor() map[types.NodeName]nodeToUpdateStatusFor {
return asw.nodesToUpdateStatusFor
}
+ func (asw *actualStateOfWorld) getAttachedVolumeFromUpdateObject(volumesToReportAsAttached map[v1.UniqueVolumeName]v1.UniqueVolumeName) []v1.AttachedVolume {
+ var attachedVolumes = make(
+ []v1.AttachedVolume,
+ 0,
+ len(volumesToReportAsAttached) /* len */)
+ for _, volume := range volumesToReportAsAttached {
+ attachedVolumes = append(attachedVolumes,
+ v1.AttachedVolume{
+ Name: volume,
+ DevicePath: asw.attachedVolumes[volume].devicePath,
+ })
+ }
+ return attachedVolumes
+ }
func getAttachedVolume(
attachedVolume *attachedVolume,
nodeAttachedTo *nodeAttachedTo) AttachedVolume {


@@ -202,7 +202,7 @@ func (rc *reconciler) reconcile() {
}
// Update Node Status to indicate volume is no longer safe to mount.
- err = rc.nodeStatusUpdater.UpdateNodeStatuses()
+ err = rc.nodeStatusUpdater.UpdateNodeStatusForNode(attachedVolume.NodeName)
if err != nil {
// Skip detaching this volume if unable to update node status
klog.ErrorS(err, "UpdateNodeStatuses failed while attempting to report volume as attached", "volume", attachedVolume)


@@ -18,6 +18,7 @@ package statusupdater
import (
"fmt"
"k8s.io/apimachinery/pkg/types"
)
func NewFakeNodeStatusUpdater(returnError bool) NodeStatusUpdater {
@@ -37,3 +38,11 @@ func (fnsu *fakeNodeStatusUpdater) UpdateNodeStatuses() error {
return nil
}
+ func (fnsu *fakeNodeStatusUpdater) UpdateNodeStatusForNode(nodeName types.NodeName) error {
+ if fnsu.returnError {
+ return fmt.Errorf("fake error on update node status")
+ }
+ return nil
+ }


@@ -19,6 +19,7 @@ limitations under the License.
package statusupdater
import (
"fmt"
"k8s.io/klog/v2"
"k8s.io/api/core/v1"
@@ -36,6 +37,8 @@ type NodeStatusUpdater interface {
// Gets a list of node statuses that should be updated from the actual state
// of the world and updates them.
UpdateNodeStatuses() error
+ // Update any pending status change for the given node
+ UpdateNodeStatusForNode(nodeName types.NodeName) error
}
// NewNodeStatusUpdater returns a new instance of NodeStatusUpdater.
@@ -57,40 +60,65 @@ type nodeStatusUpdater struct {
}
func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
+ var nodeIssues int
// TODO: investigate right behavior if nodeName is empty
// kubernetes/kubernetes/issues/37777
nodesToUpdate := nsu.actualStateOfWorld.GetVolumesToReportAttached()
for nodeName, attachedVolumes := range nodesToUpdate {
- nodeObj, err := nsu.nodeLister.Get(string(nodeName))
- if errors.IsNotFound(err) {
- // If node does not exist, its status cannot be updated.
- // Do nothing so that there is no retry until node is created.
- klog.V(2).Infof(
- "Could not update node status. Failed to find node %q in NodeInformer cache. Error: '%v'",
- nodeName,
- err)
- continue
- } else if err != nil {
- // For all other errors, log error and reset flag statusUpdateNeeded
- // back to true to indicate this node status needs to be updated again.
- klog.V(2).Infof("Error retrieving nodes from node lister. Error: %v", err)
- nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
- continue
+ err := nsu.processNodeVolumes(nodeName, attachedVolumes)
+ if err != nil {
+ nodeIssues += 1
+ }
}
+ if nodeIssues > 0 {
+ return fmt.Errorf("unable to update %d nodes", nodeIssues)
+ }
return nil
}
- if err := nsu.updateNodeStatus(nodeName, nodeObj, attachedVolumes); err != nil {
- // If update node status fails, reset flag statusUpdateNeeded back to true
- // to indicate this node status needs to be updated again
- nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
+ func (nsu *nodeStatusUpdater) UpdateNodeStatusForNode(nodeName types.NodeName) error {
+ needsUpdate, attachedVolumes := nsu.actualStateOfWorld.GetVolumesToReportAttachedForNode(nodeName)
+ if !needsUpdate {
+ return nil
+ }
+ return nsu.processNodeVolumes(nodeName, attachedVolumes)
+ }
- klog.V(2).Infof(
- "Could not update node status for %q; re-marking for update. %v",
- nodeName,
- err)
+ func (nsu *nodeStatusUpdater) processNodeVolumes(nodeName types.NodeName, attachedVolumes []v1.AttachedVolume) error {
+ nodeObj, err := nsu.nodeLister.Get(string(nodeName))
+ if errors.IsNotFound(err) {
+ // If node does not exist, its status cannot be updated.
+ // Do nothing so that there is no retry until node is created.
+ klog.V(2).Infof(
+ "Could not update node status. Failed to find node %q in NodeInformer cache. Error: '%v'",
+ nodeName,
+ err)
+ return nil
+ } else if err != nil {
+ // For all other errors, log error and reset flag statusUpdateNeeded
+ // back to true to indicate this node status needs to be updated again.
+ klog.V(2).Infof("Error retrieving nodes from node lister. Error: %v", err)
+ nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
+ return err
+ }
- // We currently always return immediately on error
- return err
- }
+ err = nsu.updateNodeStatus(nodeName, nodeObj, attachedVolumes)
+ if errors.IsNotFound(err) {
+ klog.V(2).Infof(
+ "Could not update node status for %q; node does not exist - skipping",
+ nodeName)
+ return nil
+ } else if err != nil {
+ // If update node status fails, reset flag statusUpdateNeeded back to true
+ // to indicate this node status needs to be updated again
+ nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
+ klog.V(2).Infof(
+ "Could not update node status for %q; re-marking for update. %v",
+ nodeName,
+ err)
+ return err
+ }
+ return nil
}