volumes: SetNodeStatusUpdateNeeded on error

If an error happened during the UpdateNodeStatuses loop, there were some
code paths where we would not call SetNodeStatusUpdateNeeded, leaking
the state.  Add it to all paths by adding a function.

Part of #40583
This commit is contained in:
Justin Santa Barbara 2017-02-28 10:56:09 -05:00
parent 7043372d05
commit d420531f95
2 changed files with 69 additions and 53 deletions

View File

@ -22,6 +22,7 @@ go_library(
"//pkg/controller/volume/attachdetach/cache:go_default_library", "//pkg/controller/volume/attachdetach/cache:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
], ],
) )

View File

@ -25,6 +25,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
@ -83,65 +84,79 @@ func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
continue continue
} }
clonedNode, err := api.Scheme.DeepCopy(nodeObj) if err := nsu.updateNodeStatus(nodeName, nodeObj, attachedVolumes); err != nil {
if err != nil {
return fmt.Errorf("error cloning node %q: %v",
nodeName,
err)
}
node, ok := clonedNode.(*v1.Node)
if !ok || node == nil {
return fmt.Errorf(
"failed to cast %q object %#v to Node",
nodeName,
clonedNode)
}
// TODO: Change to pkg/util/node.UpdateNodeStatus.
oldData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf(
"failed to Marshal oldData for node %q. %v",
nodeName,
err)
}
node.Status.VolumesAttached = attachedVolumes
newData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf(
"failed to Marshal newData for node %q. %v",
nodeName,
err)
}
patchBytes, err :=
strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
if err != nil {
return fmt.Errorf(
"failed to CreateTwoWayMergePatch for node %q. %v",
nodeName,
err)
}
_, err = nsu.kubeClient.Core().Nodes().PatchStatus(string(nodeName), patchBytes)
if err != nil {
// If update node status fails, reset flag statusUpdateNeeded back to true // If update node status fails, reset flag statusUpdateNeeded back to true
// to indicate this node status needs to be updated again // to indicate this node status needs to be updated again
nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName) nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
return fmt.Errorf(
"failed to kubeClient.Core().Nodes().Patch for node %q. %v", glog.V(2).Infof(
"Could not update node status for %q; re-marking for update. %v",
nodeName, nodeName,
err) err)
}
glog.V(2).Infof(
"Updating status for node %q succeeded. patchBytes: %q VolumesAttached: %v",
nodeName,
string(patchBytes),
node.Status.VolumesAttached)
// We currently always return immediately on error
return err
}
} }
return nil return nil
} }
func (nsu *nodeStatusUpdater) updateNodeStatus(nodeName types.NodeName, nodeObj *v1.Node, attachedVolumes []v1.AttachedVolume) error {
clonedNode, err := api.Scheme.DeepCopy(nodeObj)
if err != nil {
return fmt.Errorf("error cloning node %q: %v",
nodeName,
err)
}
node, ok := clonedNode.(*v1.Node)
if !ok || node == nil {
return fmt.Errorf(
"failed to cast %q object %#v to Node",
nodeName,
clonedNode)
}
// TODO: Change to pkg/util/node.UpdateNodeStatus.
oldData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf(
"failed to Marshal oldData for node %q. %v",
nodeName,
err)
}
node.Status.VolumesAttached = attachedVolumes
newData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf(
"failed to Marshal newData for node %q. %v",
nodeName,
err)
}
patchBytes, err :=
strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
if err != nil {
return fmt.Errorf(
"failed to CreateTwoWayMergePatch for node %q. %v",
nodeName,
err)
}
_, err = nsu.kubeClient.Core().Nodes().PatchStatus(string(nodeName), patchBytes)
if err != nil {
return fmt.Errorf(
"failed to kubeClient.Core().Nodes().Patch for node %q. %v",
nodeName,
err)
}
glog.V(4).Infof(
"Updating status for node %q succeeded. patchBytes: %q VolumesAttached: %v",
nodeName,
string(patchBytes),
node.Status.VolumesAttached)
return nil
}