refactor method to pkg/util/node

This commit is contained in:
Di Xu 2017-07-31 13:08:42 +08:00
parent 210626577b
commit 13a355c837
5 changed files with 27 additions and 59 deletions

View File

@ -183,7 +183,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud
if !nodeAddressesChangeDetected(node.Status.Addresses, newNode.Status.Addresses) { if !nodeAddressesChangeDetected(node.Status.Addresses, newNode.Status.Addresses) {
return return
} }
_, err = nodeutil.PatchNodeStatus(cnc.kubeClient.CoreV1(), types.NodeName(node.Name), node, newNode) _, _, err = nodeutil.PatchNodeStatus(cnc.kubeClient.CoreV1(), types.NodeName(node.Name), node, newNode)
if err != nil { if err != nil {
glog.Errorf("Error patching node with cloud ip addresses = [%v]", err) glog.Errorf("Error patching node with cloud ip addresses = [%v]", err)
} }

View File

@ -14,11 +14,11 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater", importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater",
deps = [ deps = [
"//pkg/controller/volume/attachdetach/cache:go_default_library", "//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/util/node:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library", "//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
], ],

View File

@ -19,18 +19,15 @@ limitations under the License.
package statusupdater package statusupdater
import ( import (
"encoding/json"
"fmt"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
nodeutil "k8s.io/kubernetes/pkg/util/node"
) )
// NodeStatusUpdater defines a set of operations for updating the // NodeStatusUpdater defines a set of operations for updating the
@ -100,47 +97,12 @@ func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
func (nsu *nodeStatusUpdater) updateNodeStatus(nodeName types.NodeName, nodeObj *v1.Node, attachedVolumes []v1.AttachedVolume) error { func (nsu *nodeStatusUpdater) updateNodeStatus(nodeName types.NodeName, nodeObj *v1.Node, attachedVolumes []v1.AttachedVolume) error {
node := nodeObj.DeepCopy() node := nodeObj.DeepCopy()
// TODO: Change to pkg/util/node.UpdateNodeStatus.
oldData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf(
"failed to Marshal oldData for node %q. %v",
nodeName,
err)
}
node.Status.VolumesAttached = attachedVolumes node.Status.VolumesAttached = attachedVolumes
_, patchBytes, err := nodeutil.PatchNodeStatus(nsu.kubeClient.CoreV1(), nodeName, nodeObj, node)
newData, err := json.Marshal(node)
if err != nil { if err != nil {
return fmt.Errorf( return err
"failed to Marshal newData for node %q. %v",
nodeName,
err)
} }
patchBytes, err := glog.V(4).Infof("Updating status %q for node %q succeeded. VolumesAttached: %v", patchBytes, nodeName, attachedVolumes)
strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
if err != nil {
return fmt.Errorf(
"failed to CreateTwoWayMergePatch for node %q. %v",
nodeName,
err)
}
_, err = nsu.kubeClient.CoreV1().Nodes().PatchStatus(string(nodeName), patchBytes)
if err != nil {
return fmt.Errorf(
"failed to kubeClient.CoreV1().Nodes().Patch for node %q. %v",
nodeName,
err)
}
glog.V(4).Infof(
"Updating status for node %q succeeded. patchBytes: %q VolumesAttached: %v",
nodeName,
string(patchBytes),
node.Status.VolumesAttached)
return nil return nil
} }

View File

@ -132,8 +132,7 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode) requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
if requiresUpdate { if requiresUpdate {
if _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
originalNode, existingNode); err != nil {
glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err) glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err)
return false return false
} }
@ -142,8 +141,7 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
return true return true
} }
glog.Errorf( glog.Errorf("Previously node %q had externalID %q; now it is %q; will delete and recreate.",
"Previously node %q had externalID %q; now it is %q; will delete and recreate.",
kl.nodeName, node.Spec.ExternalID, existingNode.Spec.ExternalID, kl.nodeName, node.Spec.ExternalID, existingNode.Spec.ExternalID,
) )
if err := kl.kubeClient.CoreV1().Nodes().Delete(node.Name, nil); err != nil { if err := kl.kubeClient.CoreV1().Nodes().Delete(node.Name, nil); err != nil {
@ -415,7 +413,7 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
kl.setNodeStatus(node) kl.setNodeStatus(node)
// Patch the current status on the API server // Patch the current status on the API server
updatedNode, err := nodeutil.PatchNodeStatus(kl.heartbeatClient, types.NodeName(kl.nodeName), originalNode, node) updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient, types.NodeName(kl.nodeName), originalNode, node)
if err != nil { if err != nil {
return err return err
} }

View File

@ -151,10 +151,23 @@ func SetNodeCondition(c clientset.Interface, node types.NodeName, condition v1.N
} }
// PatchNodeStatus patches node status. // PatchNodeStatus patches node status.
func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, error) { func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, []byte, error) {
patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode)
if err != nil {
return nil, nil, err
}
updatedNode, err := c.Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
if err != nil {
return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
}
return updatedNode, patchBytes, nil
}
func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) ([]byte, error) {
oldData, err := json.Marshal(oldNode) oldData, err := json.Marshal(oldNode)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err) return nil, fmt.Errorf("failed to Marshal oldData for node %q: %v", nodeName, err)
} }
// Reset spec to make sure only patch for Status or ObjectMeta is generated. // Reset spec to make sure only patch for Status or ObjectMeta is generated.
@ -164,17 +177,12 @@ func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode
newNode.Spec = oldNode.Spec newNode.Spec = oldNode.Spec
newData, err := json.Marshal(newNode) newData, err := json.Marshal(newNode)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNode, nodeName, err) return nil, fmt.Errorf("failed to Marshal newData for node %q: %v", nodeName, err)
} }
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{}) patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create patch for node %q: %v", nodeName, err) return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for node %q: %v", nodeName, err)
} }
return patchBytes, nil
updatedNode, err := c.Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
if err != nil {
return nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
}
return updatedNode, nil
} }