Node status updater now deletes the node entry in attach updates when node is missing in NodeInformer cache. Fixes #42438.

- Added RemoveNodeFromAttachUpdates as part of node status updater operations.
This commit is contained in:
Cheng Xing 2017-05-16 11:18:21 -07:00
parent ee0de5f376
commit f9dc2d5ca3
5 changed files with 115 additions and 7 deletions

View File

@ -125,6 +125,10 @@ type ActualStateOfWorld interface {
// GetNodesToUpdateStatusFor returns the map of nodeNames to nodeToUpdateStatusFor
GetNodesToUpdateStatusFor() map[types.NodeName]nodeToUpdateStatusFor
// Removes the given node from the record of attach updates. The node's entire
// volumesToReportAsAttached list is removed.
RemoveNodeFromAttachUpdates(nodeName types.NodeName) error
}
// AttachedVolume represents a volume that is attached to a node.
@ -260,6 +264,19 @@ func (asw *actualStateOfWorld) AddVolumeToReportAsAttached(
asw.addVolumeToReportAsAttached(volumeName, nodeName)
}
func (asw *actualStateOfWorld) RemoveNodeFromAttachUpdates(nodeName types.NodeName) error {
asw.Lock()
defer asw.Unlock()
_, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
if nodeToUpdateExists {
delete(asw.nodesToUpdateStatusFor, nodeName)
return nil
}
return fmt.Errorf("node %q does not exist in volumesToReportAsAttached list",
nodeName)
}
func (asw *actualStateOfWorld) AddVolumeNode(
uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) (v1.UniqueVolumeName, error) {
asw.Lock()

View File

@ -1165,6 +1165,89 @@ func Test_updateNodeStatusUpdateNeededError(t *testing.T) {
}
}
// Test_RemoveNodeFromAttachUpdates_Positive expects an entire node entry to be removed
// from nodesToUpdateStatusFor
func Test_RemoveNodeFromAttachUpdates_Positive(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := &actualStateOfWorld{
attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume),
nodesToUpdateStatusFor: make(map[types.NodeName]nodeToUpdateStatusFor),
volumePluginMgr: volumePluginMgr,
}
nodeName := types.NodeName("node-1")
nodeToUpdate := nodeToUpdateStatusFor{
nodeName: nodeName,
statusUpdateNeeded: true,
volumesToReportAsAttached: make(map[v1.UniqueVolumeName]v1.UniqueVolumeName),
}
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
// Act
err := asw.RemoveNodeFromAttachUpdates(nodeName)
// Assert
if err != nil {
t.Fatalf("RemoveNodeFromAttachUpdates should not return error, but got: %v", err)
}
if len(asw.nodesToUpdateStatusFor) > 0 {
t.Fatal("nodesToUpdateStatusFor should be empty as its only entry has been deleted.")
}
}
// Test_RemoveNodeFromAttachUpdates_Negative_NodeDoesNotExist expects an error to be thrown
// when nodeName is not in nodesToUpdateStatusFor.
func Test_RemoveNodeFromAttachUpdates_Negative_NodeDoesNotExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := &actualStateOfWorld{
attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume),
nodesToUpdateStatusFor: make(map[types.NodeName]nodeToUpdateStatusFor),
volumePluginMgr: volumePluginMgr,
}
nodeName := types.NodeName("node-1")
nodeToUpdate := nodeToUpdateStatusFor{
nodeName: nodeName,
statusUpdateNeeded: true,
volumesToReportAsAttached: make(map[v1.UniqueVolumeName]v1.UniqueVolumeName),
}
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
// Act
err := asw.RemoveNodeFromAttachUpdates("node-2")
// Assert
if err == nil {
t.Fatal("RemoveNodeFromAttachUpdates should return an error as the nodeName doesn't exist.")
}
if len(asw.nodesToUpdateStatusFor) != 1 {
t.Fatal("The length of nodesToUpdateStatusFor should not change because no operation was performed.")
}
}
// Test_RemoveNodeFromAttachUpdates_Negative_Empty expects an error to be thrown
// when a nodesToUpdateStatusFor is empty.
func Test_RemoveNodeFromAttachUpdates_Negative_Empty(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := &actualStateOfWorld{
attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume),
nodesToUpdateStatusFor: make(map[types.NodeName]nodeToUpdateStatusFor),
volumePluginMgr: volumePluginMgr,
}
// Act
err := asw.RemoveNodeFromAttachUpdates("node-1")
// Assert
if err == nil {
t.Fatal("RemoveNodeFromAttachUpdates should return an error as nodeToUpdateStatusFor is empty.")
}
if len(asw.nodesToUpdateStatusFor) != 0 {
t.Fatal("The length of nodesToUpdateStatusFor should be 0.")
}
}
func verifyAttachedVolume(
t *testing.T,
attachedVolumes []AttachedVolume,

View File

@ -21,6 +21,7 @@ go_library(
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
],
)

View File

@ -24,6 +24,7 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
@ -64,14 +65,20 @@ func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
nodesToUpdate := nsu.actualStateOfWorld.GetVolumesToReportAttached()
for nodeName, attachedVolumes := range nodesToUpdate {
nodeObj, err := nsu.nodeLister.Get(string(nodeName))
if nodeObj == nil || err != nil {
// If node does not exist, its status cannot be updated, log error and
// reset flag statusUpdateNeeded back to true to indicate this node status
// needs to be updated again
if errors.IsNotFound(err) {
// If node does not exist, its status cannot be updated.
// Remove the node entry from the collection of attach updates, preventing the
// status updater from unnecessarily updating the node.
glog.V(2).Infof(
"Could not update node status. Failed to find node %q in NodeInformer cache. %v",
"Could not update node status. Failed to find node %q in NodeInformer cache. Error: '%v'",
nodeName,
err)
nsu.actualStateOfWorld.RemoveNodeFromAttachUpdates(nodeName)
continue
} else if err != nil {
// For all other errors, log error and reset flag statusUpdateNeeded
// back to true to indicate this node status needs to be updated again.
glog.V(2).Infof("Error retrieving nodes from node lister. Error: %v", err)
nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
continue
}

View File

@ -160,11 +160,11 @@ type ActualStateOfWorldAttacherUpdater interface {
MarkVolumeAsDetached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
// Marks desire to detach the specified volume (remove the volume from the node's
// volumesToReportedAsAttached list)
// volumesToReportAsAttached list)
RemoveVolumeFromReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) error
// Unmarks the desire to detach for the specified volume (add the volume back to
// the node's volumesToReportedAsAttached list)
// the node's volumesToReportAsAttached list)
AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
}