Merge pull request #108167 from jfremy/fix-107973

Fix nodes volumesAttached status not being updated
This commit is contained in:
Kubernetes Prow Robot 2022-03-01 12:49:54 -08:00 committed by GitHub
commit 66daef4aa7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 399 additions and 42 deletions

View File

@ -135,6 +135,11 @@ type ActualStateOfWorld interface {
// is considered, before the detach operation is triggered).
GetVolumesToReportAttached() map[types.NodeName][]v1.AttachedVolume
// GetVolumesToReportAttachedForNode returns the list of volumes that should be reported as
// attached for the given node. It reports a boolean indicating if there is an update for that
// node and the corresponding attachedVolumes list.
GetVolumesToReportAttachedForNode(name types.NodeName) (bool, []v1.AttachedVolume)
// GetNodesToUpdateStatusFor returns the map of nodeNames to nodeToUpdateStatusFor
GetNodesToUpdateStatusFor() map[types.NodeName]nodeToUpdateStatusFor
}
@ -647,24 +652,13 @@ func (asw *actualStateOfWorld) GetNodesForAttachedVolume(volumeName v1.UniqueVol
}
func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][]v1.AttachedVolume {
asw.RLock()
defer asw.RUnlock()
asw.Lock()
defer asw.Unlock()
volumesToReportAttached := make(map[types.NodeName][]v1.AttachedVolume)
for nodeName, nodeToUpdateObj := range asw.nodesToUpdateStatusFor {
if nodeToUpdateObj.statusUpdateNeeded {
attachedVolumes := make(
[]v1.AttachedVolume,
0,
len(nodeToUpdateObj.volumesToReportAsAttached) /* len */)
for _, volume := range nodeToUpdateObj.volumesToReportAsAttached {
attachedVolumes = append(attachedVolumes,
v1.AttachedVolume{
Name: volume,
DevicePath: asw.attachedVolumes[volume].devicePath,
})
}
volumesToReportAttached[nodeToUpdateObj.nodeName] = attachedVolumes
volumesToReportAttached[nodeToUpdateObj.nodeName] = asw.getAttachedVolumeFromUpdateObject(nodeToUpdateObj.volumesToReportAsAttached)
}
// When GetVolumesToReportAttached is called by node status updater, the current status
// of this node will be updated, so set the flag statusUpdateNeeded to false indicating
@ -677,10 +671,48 @@ func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][
return volumesToReportAttached
}
// GetVolumesToReportAttachedForNode reports whether the given node has a
// pending status update and, if so, returns the volumes that should be
// published in its status.volumesAttached field. Calling this method consumes
// the pending update: the node's statusUpdateNeeded flag is cleared, so a
// second call returns false until the actual state of the world changes again.
func (asw *actualStateOfWorld) GetVolumesToReportAttachedForNode(nodeName types.NodeName) (bool, []v1.AttachedVolume) {
	asw.Lock()
	defer asw.Unlock()

	updateObj, found := asw.nodesToUpdateStatusFor[nodeName]
	if !found || !updateObj.statusUpdateNeeded {
		// Unknown node, or nothing has changed since the last report.
		return false, nil
	}

	attached := asw.getAttachedVolumeFromUpdateObject(updateObj.volumesToReportAsAttached)

	// The node status updater is about to publish this state, so clear the
	// dirty flag; it is re-set by subsequent attach/detach operations.
	if err := asw.updateNodeStatusUpdateNeeded(nodeName, false); err != nil {
		klog.Errorf("Failed to update statusUpdateNeeded field when getting volumes: %v", err)
	}

	return true, attached
}
// GetNodesToUpdateStatusFor returns the map of nodeNames to nodeToUpdateStatusFor.
// NOTE(review): this returns the internal map directly, without taking the
// asw lock and without copying. Callers must treat the result as read-only;
// concurrent use alongside the locked methods of this type may race — confirm
// this accessor is only used from tests or single-threaded contexts.
func (asw *actualStateOfWorld) GetNodesToUpdateStatusFor() map[types.NodeName]nodeToUpdateStatusFor {
return asw.nodesToUpdateStatusFor
}
// getAttachedVolumeFromUpdateObject converts a node's volumesToReportAsAttached
// map into the []v1.AttachedVolume form used in Node.Status, resolving each
// volume's device path from the attachedVolumes cache.
// Callers must hold the asw lock.
func (asw *actualStateOfWorld) getAttachedVolumeFromUpdateObject(volumesToReportAsAttached map[v1.UniqueVolumeName]v1.UniqueVolumeName) []v1.AttachedVolume {
	result := make([]v1.AttachedVolume, 0, len(volumesToReportAsAttached))
	for _, volumeName := range volumesToReportAsAttached {
		result = append(result, v1.AttachedVolume{
			Name:       volumeName,
			DevicePath: asw.attachedVolumes[volumeName].devicePath,
		})
	}
	return result
}
func getAttachedVolume(
attachedVolume *attachedVolume,
nodeAttachedTo *nodeAttachedTo) AttachedVolume {

View File

@ -1447,6 +1447,70 @@ func Test_MarkVolumeAsUncertain(t *testing.T) {
verifyAttachedVolume(t, attachedVolumes, volumeName, string(volumeName), nodeName, "", true /* expectedMountedByNode */, false /* expectNonZeroDetachRequestedTime */)
}
// Calls AddVolumeNode() once with attached set to true.
// Verifies GetVolumesToReportAttachedForNode has an update for the node.
// Calls GetVolumesToReportAttachedForNode a second time for the node and
// verifies it does not report that an update is needed any more.
// Then calls RemoveVolumeFromReportAsAttached() and verifies
// GetVolumesToReportAttachedForNode reports an update is needed again,
// this time with an empty attached-volumes list.
func Test_GetVolumesToReportAttachedForNode_Positive(t *testing.T) {
	// Arrange
	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
	asw := NewActualStateOfWorld(volumePluginMgr)
	volumeName := v1.UniqueVolumeName("volume-name")
	volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
	nodeName := types.NodeName("node-name")
	devicePath := "fake/device/path"

	// Act
	generatedVolumeName, err := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath, true)

	// Assert
	if err != nil {
		t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", err)
	}

	// The fresh attachment must be pending for report, with exactly one volume.
	needsUpdate, attachedVolumes := asw.GetVolumesToReportAttachedForNode(nodeName)
	if !needsUpdate {
		t.Fatalf("Test_GetVolumesToReportAttachedForNode_Positive failed. Actual: <node %q does not need an update> Expect: <node exists in the reportedAsAttached map and needs an update>", nodeName)
	}
	if len(attachedVolumes) != 1 {
		t.Fatalf("len(attachedVolumes) Expected: <1> Actual: <%v>", len(attachedVolumes))
	}

	// The first call consumed the pending update, so a second call must report
	// nothing to do.
	needsUpdate, _ = asw.GetVolumesToReportAttachedForNode(nodeName)
	if needsUpdate {
		t.Fatalf("Test_GetVolumesToReportAttachedForNode_Positive failed. Actual: <node %q needs an update> Expect: <node exists in the reportedAsAttached map and does not need an update>", nodeName)
	}

	// Removing the volume from the report-as-attached set marks the node dirty
	// again; the reported list is now empty.
	removeVolumeDetachErr := asw.RemoveVolumeFromReportAsAttached(generatedVolumeName, nodeName)
	if removeVolumeDetachErr != nil {
		t.Fatalf("RemoveVolumeFromReportAsAttached failed. Expected: <no error> Actual: <%v>", removeVolumeDetachErr)
	}
	needsUpdate, attachedVolumes = asw.GetVolumesToReportAttachedForNode(nodeName)
	if !needsUpdate {
		t.Fatalf("Test_GetVolumesToReportAttachedForNode_Positive failed. Actual: <node %q does not need an update> Expect: <node exists in the reportedAsAttached map and needs an update>", nodeName)
	}
	if len(attachedVolumes) != 0 {
		t.Fatalf("len(attachedVolumes) Expected: <0> Actual: <%v>", len(attachedVolumes))
	}
}
// Verifies GetVolumesToReportAttachedForNode reports no update needed for an
// unknown node (one that was never added to the actual state of the world).
func Test_GetVolumesToReportAttachedForNode_UnknownNode(t *testing.T) {
	// Arrange
	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
	asw := NewActualStateOfWorld(volumePluginMgr)
	nodeName := types.NodeName("node-name")

	// Act / Assert: an unknown node must not be reported as needing an update.
	needsUpdate, _ := asw.GetVolumesToReportAttachedForNode(nodeName)
	if needsUpdate {
		t.Fatalf("Test_GetVolumesToReportAttachedForNode_UnknownNode failed. Actual: <node %q needs an update> Expect: <node does not exist in the reportedAsAttached map and does not need an update>", nodeName)
	}
}
func verifyAttachedVolume(
t *testing.T,
attachedVolumes []AttachedVolume,

View File

@ -202,10 +202,10 @@ func (rc *reconciler) reconcile() {
}
// Update Node Status to indicate volume is no longer safe to mount.
err = rc.nodeStatusUpdater.UpdateNodeStatuses()
err = rc.nodeStatusUpdater.UpdateNodeStatusForNode(attachedVolume.NodeName)
if err != nil {
// Skip detaching this volume if unable to update node status
klog.ErrorS(err, "UpdateNodeStatuses failed while attempting to report volume as attached", "volume", attachedVolume)
klog.ErrorS(err, "UpdateNodeStatusForNode failed while attempting to report volume as attached", "volume", attachedVolume)
continue
}

View File

@ -18,6 +18,7 @@ package statusupdater
import (
"fmt"
"k8s.io/apimachinery/pkg/types"
)
func NewFakeNodeStatusUpdater(returnError bool) NodeStatusUpdater {
@ -37,3 +38,11 @@ func (fnsu *fakeNodeStatusUpdater) UpdateNodeStatuses() error {
return nil
}
// UpdateNodeStatusForNode mirrors the fake's UpdateNodeStatuses behavior for a
// single node: it returns the configured fake error when returnError is set,
// and nil otherwise. The nodeName argument is not inspected.
func (fnsu *fakeNodeStatusUpdater) UpdateNodeStatusForNode(nodeName types.NodeName) error {
	if !fnsu.returnError {
		return nil
	}
	return fmt.Errorf("fake error on update node status")
}

View File

@ -19,6 +19,7 @@ limitations under the License.
package statusupdater
import (
"fmt"
"k8s.io/klog/v2"
"k8s.io/api/core/v1"
@ -36,6 +37,8 @@ type NodeStatusUpdater interface {
// Gets a list of node statuses that should be updated from the actual state
// of the world and updates them.
UpdateNodeStatuses() error
// Update any pending status change for the given node
UpdateNodeStatusForNode(nodeName types.NodeName) error
}
// NewNodeStatusUpdater returns a new instance of NodeStatusUpdater.
@ -57,40 +60,67 @@ type nodeStatusUpdater struct {
}
func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
var nodeIssues int
// TODO: investigate right behavior if nodeName is empty
// kubernetes/kubernetes/issues/37777
nodesToUpdate := nsu.actualStateOfWorld.GetVolumesToReportAttached()
for nodeName, attachedVolumes := range nodesToUpdate {
nodeObj, err := nsu.nodeLister.Get(string(nodeName))
if errors.IsNotFound(err) {
// If node does not exist, its status cannot be updated.
// Do nothing so that there is no retry until node is created.
klog.V(2).Infof(
"Could not update node status. Failed to find node %q in NodeInformer cache. Error: '%v'",
nodeName,
err)
continue
} else if err != nil {
// For all other errors, log error and reset flag statusUpdateNeeded
// back to true to indicate this node status needs to be updated again.
klog.V(2).Infof("Error retrieving nodes from node lister. Error: %v", err)
nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
continue
err := nsu.processNodeVolumes(nodeName, attachedVolumes)
if err != nil {
nodeIssues += 1
}
}
if nodeIssues > 0 {
return fmt.Errorf("unable to update %d nodes", nodeIssues)
}
return nil
}
if err := nsu.updateNodeStatus(nodeName, nodeObj, attachedVolumes); err != nil {
// If update node status fails, reset flag statusUpdateNeeded back to true
// to indicate this node status needs to be updated again
nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
// UpdateNodeStatusForNode publishes any pending volumesAttached change for a
// single node. When the actual state of the world has nothing new to report
// for the node, this is a no-op and returns nil.
func (nsu *nodeStatusUpdater) UpdateNodeStatusForNode(nodeName types.NodeName) error {
	isDirty, volumes := nsu.actualStateOfWorld.GetVolumesToReportAttachedForNode(nodeName)
	if isDirty {
		return nsu.processNodeVolumes(nodeName, volumes)
	}
	return nil
}
klog.V(2).Infof(
"Could not update node status for %q; re-marking for update. %v",
nodeName,
err)
func (nsu *nodeStatusUpdater) processNodeVolumes(nodeName types.NodeName, attachedVolumes []v1.AttachedVolume) error {
nodeObj, err := nsu.nodeLister.Get(string(nodeName))
if errors.IsNotFound(err) {
// If node does not exist, its status cannot be updated.
// Do nothing so that there is no retry until node is created.
klog.V(2).Infof(
"Could not update node status. Failed to find node %q in NodeInformer cache. Error: '%v'",
nodeName,
err)
return nil
} else if err != nil {
// For all other errors, log error and reset flag statusUpdateNeeded
// back to true to indicate this node status needs to be updated again.
klog.V(2).Infof("Error retrieving nodes from node lister. Error: %v", err)
nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
return err
}
// We currently always return immediately on error
return err
}
err = nsu.updateNodeStatus(nodeName, nodeObj, attachedVolumes)
if errors.IsNotFound(err) {
// If node does not exist, its status cannot be updated.
// Do nothing so that there is no retry until node is created.
klog.V(2).Infof(
"Could not update node status for %q; node does not exist - skipping",
nodeName)
return nil
} else if err != nil {
// If update node status fails, reset flag statusUpdateNeeded back to true
// to indicate this node status needs to be updated again
nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
klog.V(2).Infof(
"Could not update node status for %q; re-marking for update. %v",
nodeName,
err)
return err
}
return nil
}

View File

@ -0,0 +1,222 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package statusupdater
import (
"context"
"errors"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"testing"
)
// setupNodeStatusUpdate creates all the needed objects for testing.
// the initial environment has 2 nodes with no volumes attached
// and adds one volume to attach to each node to the actual state of the world
// NOTE(review): the ctx parameter is currently unused inside this helper;
// confirm whether it is reserved for future use by callers.
func setupNodeStatusUpdate(ctx context.Context, t *testing.T) (cache.ActualStateOfWorld, *fake.Clientset, NodeStatusUpdater) {
// Two bare Node objects with empty status; the tests assert on the
// status.volumesAttached entries the updater later writes.
testNode1 := corev1.Node{
TypeMeta: metav1.TypeMeta{
Kind: "Node",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "testnode-1",
},
Status: corev1.NodeStatus{},
}
testNode2 := corev1.Node{
TypeMeta: metav1.TypeMeta{
Kind: "Node",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "testnode-2",
},
Status: corev1.NodeStatus{},
}
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
// The fake clientset is seeded with both nodes so status updates can succeed.
fakeKubeClient := fake.NewSimpleClientset(&testNode1, &testNode2)
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
nodeInformer := informerFactory.Core().V1().Nodes()
nsu := NewNodeStatusUpdater(fakeKubeClient, nodeInformer.Lister(), asw)
// Prime the informer store directly — no informer run loop is started in
// this test setup, so the lister must be fed by hand.
err := nodeInformer.Informer().GetStore().Add(&testNode1)
if err != nil {
t.Fatalf(".Informer().GetStore().Add failed. Expected: <no error> Actual: <%v>", err)
}
err = nodeInformer.Informer().GetStore().Add(&testNode2)
if err != nil {
t.Fatalf(".Informer().GetStore().Add failed. Expected: <no error> Actual: <%v>", err)
}
// Record one attached volume per node in the actual state of the world so
// each node has exactly one pending status update to report.
volumeName1 := corev1.UniqueVolumeName("volume-name-1")
volumeName2 := corev1.UniqueVolumeName("volume-name-2")
volumeSpec1 := controllervolumetesting.GetTestVolumeSpec(string(volumeName1), volumeName1)
volumeSpec2 := controllervolumetesting.GetTestVolumeSpec(string(volumeName2), volumeName2)
nodeName1 := types.NodeName("testnode-1")
nodeName2 := types.NodeName("testnode-2")
devicePath := "fake/device/path"
_, err = asw.AddVolumeNode(volumeName1, volumeSpec1, nodeName1, devicePath, true)
if err != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", err)
}
_, err = asw.AddVolumeNode(volumeName2, volumeSpec2, nodeName2, devicePath, true)
if err != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", err)
}
return asw, fakeKubeClient, nsu
}
// TestNodeStatusUpdater_UpdateNodeStatuses_TwoNodesUpdate calls setup
// calls UpdateNodeStatuses()
// check that asw.GetVolumesToReportAttached reports nothing left to attach
// checks that each node status.volumesAttached is of length 1 and contains the correct volume
func TestNodeStatusUpdater_UpdateNodeStatuses_TwoNodesUpdate(t *testing.T) {
ctx := context.Background()
asw, fakeKubeClient, nsu := setupNodeStatusUpdate(ctx, t)
err := nsu.UpdateNodeStatuses()
if err != nil {
t.Fatalf("UpdateNodeStatuses failed. Expected: <no error> Actual: <%v>", err)
}
// A successful bulk update drains every pending report from the ASW.
needToReport := asw.GetVolumesToReportAttached()
if len(needToReport) != 0 {
t.Fatalf("len(asw.GetVolumesToReportAttached()) Expected: <0> Actual: <%v>", len(needToReport))
}
// Each node's status must now carry exactly the one volume attached to it
// during setup.
node, err := fakeKubeClient.CoreV1().Nodes().Get(ctx, "testnode-1", metav1.GetOptions{})
if err != nil {
t.Fatalf("Nodes().Get failed. Expected: <no error> Actual: <%v>", err)
}
if len(node.Status.VolumesAttached) != 1 {
t.Fatalf("len(node.Status.VolumesAttached) Expected: <1> Actual: <%v>", len(node.Status.VolumesAttached))
}
if node.Status.VolumesAttached[0].Name != "volume-name-1" {
t.Fatalf("volumeName Expected: <volume-name-1> Actual: <%s>", node.Status.VolumesAttached[0].Name)
}
node, err = fakeKubeClient.CoreV1().Nodes().Get(ctx, "testnode-2", metav1.GetOptions{})
if err != nil {
t.Fatalf("Nodes().Get failed. Expected: <no error> Actual: <%v>", err)
}
if len(node.Status.VolumesAttached) != 1 {
t.Fatalf("len(node.Status.VolumesAttached) Expected: <1> Actual: <%v>", len(node.Status.VolumesAttached))
}
if node.Status.VolumesAttached[0].Name != "volume-name-2" {
t.Fatalf("volumeName Expected: <volume-name-2> Actual: <%s>", node.Status.VolumesAttached[0].Name)
}
}
// TestNodeStatusUpdater_UpdateNodeStatuses_FailureInFirstUpdate injects a
// one-shot failure into the first "patch nodes" call, runs UpdateNodeStatuses,
// and verifies that only the failed node remains pending in the actual state
// of the world while the other node's status was updated.
func TestNodeStatusUpdater_UpdateNodeStatuses_FailureInFirstUpdate(t *testing.T) {
ctx := context.Background()
asw, fakeKubeClient, nsu := setupNodeStatusUpdate(ctx, t)
var failedNode string
failedOnce := false
failureErr := fmt.Errorf("test generated error")
// Fail exactly the first patch and record which node it hit; subsequent
// patches fall through to the default reactor (handled == false).
fakeKubeClient.PrependReactor("patch", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
patchAction := action.(core.PatchAction)
if !failedOnce {
failedNode = patchAction.GetName()
failedOnce = true
return true, nil, failureErr
}
return false, nil, nil
})
err := nsu.UpdateNodeStatuses()
// NOTE(review): this condition fatals only when the returned error wraps
// failureErr. UpdateNodeStatuses builds its aggregate error with fmt.Errorf
// without %w, so errors.Is is expected to be false here and the Fatalf never
// fires; if the intent was to require the injected error to surface, the
// check may be inverted — confirm against the intended contract.
if errors.Is(err, failureErr) {
t.Fatalf("UpdateNodeStatuses failed. Expected: <test generated error> Actual: <%v>", err)
}
// The failed node must have been re-marked as needing a status update.
needToReport := asw.GetVolumesToReportAttached()
if len(needToReport) != 1 {
t.Fatalf("len(asw.GetVolumesToReportAttached()) Expected: <1> Actual: <%v>", len(needToReport))
}
if _, ok := needToReport[types.NodeName(failedNode)]; !ok {
t.Fatalf("GetVolumesToReportAttached() did not report correct node Expected: <%s> Actual: <%v>", failedNode, needToReport)
}
nodes, err := fakeKubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Nodes().List failed. Expected: <no error> Actual: <%v>", err)
}
if len(nodes.Items) != 2 {
t.Fatalf("len(nodes.Items) Expected: <2> Actual: <%v>", len(nodes.Items))
}
// The failed node's status stays empty; the other node got its volume.
for _, node := range nodes.Items {
if node.Name == failedNode {
if len(node.Status.VolumesAttached) != 0 {
t.Fatalf("len(node.Status.VolumesAttached) Expected: <0> Actual: <%v>", len(node.Status.VolumesAttached))
}
} else {
if len(node.Status.VolumesAttached) != 1 {
t.Fatalf("len(node.Status.VolumesAttached) Expected: <1> Actual: <%v>", len(node.Status.VolumesAttached))
}
}
}
}
// TestNodeStatusUpdater_UpdateNodeStatusForNode calls setup,
// calls UpdateNodeStatusForNode on testnode-1,
// checks that asw.GetVolumesToReportAttached reports testnode-2 still needs to be reported,
// and checks that testnode-1 status.volumesAttached is of length 1 and contains the correct volume.
func TestNodeStatusUpdater_UpdateNodeStatusForNode(t *testing.T) {
	ctx := context.Background()
	asw, fakeKubeClient, nsu := setupNodeStatusUpdate(ctx, t)

	err := nsu.UpdateNodeStatusForNode("testnode-1")
	if err != nil {
		t.Fatalf("UpdateNodeStatusForNode failed. Expected: <no error> Actual: <%v>", err)
	}

	// Only testnode-1 was updated; testnode-2 must still be pending.
	needToReport := asw.GetVolumesToReportAttached()
	if len(needToReport) != 1 {
		t.Fatalf("len(asw.GetVolumesToReportAttached()) Expected: <1> Actual: <%v>", len(needToReport))
	}
	if _, ok := needToReport["testnode-2"]; !ok {
		t.Fatalf("GetVolumesToReportAttached() did not report correct node Expected: <testnode-2> Actual: <%v>", needToReport)
	}

	// testnode-1's status must now carry exactly its one attached volume.
	node, err := fakeKubeClient.CoreV1().Nodes().Get(ctx, "testnode-1", metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Nodes().Get failed. Expected: <no error> Actual: <%v>", err)
	}
	if len(node.Status.VolumesAttached) != 1 {
		t.Fatalf("len(node.Status.VolumesAttached) Expected: <1> Actual: <%v>", len(node.Status.VolumesAttached))
	}
	if node.Status.VolumesAttached[0].Name != "volume-name-1" {
		t.Fatalf("volumeName Expected: <volume-name-1> Actual: <%s>", node.Status.VolumesAttached[0].Name)
	}
}