From e6ca651bedf27b2a77fd141a3612c6e676116dc2 Mon Sep 17 00:00:00 2001
From: Dan Winship
Date: Tue, 2 Jul 2019 17:53:54 -0400
Subject: [PATCH 1/2] Document the problem with Node.Status.Addresses and
 strategic merge patch

---
 api/openapi-spec/swagger.json                                 | 2 +-
 staging/src/k8s.io/api/core/v1/generated.proto                | 3 +++
 staging/src/k8s.io/api/core/v1/types.go                       | 3 +++
 staging/src/k8s.io/api/core/v1/types_swagger_doc_generated.go | 2 +-
 4 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json
index 83ccba35e34..b79ba5db67f 100644
--- a/api/openapi-spec/swagger.json
+++ b/api/openapi-spec/swagger.json
@@ -8430,7 +8430,7 @@
       "description": "NodeStatus is information about the current status of a node.",
       "properties": {
         "addresses": {
-          "description": "List of addresses reachable to the node. Queried from cloud provider, if available. More info: https://kubernetes.io/docs/concepts/nodes/node/#addresses",
+          "description": "List of addresses reachable to the node. Queried from cloud provider, if available. More info: https://kubernetes.io/docs/concepts/nodes/node/#addresses Note: This field is declared as mergeable, but the merge key is not sufficiently unique, which can cause data corruption when it is merged. Callers should instead use a full-replacement patch. See http://pr.k8s.io/79391 for an example.",
           "items": {
             "$ref": "#/definitions/io.k8s.api.core.v1.NodeAddress"
           },
diff --git a/staging/src/k8s.io/api/core/v1/generated.proto b/staging/src/k8s.io/api/core/v1/generated.proto
index 742627b094a..a363d2dbfb1 100644
--- a/staging/src/k8s.io/api/core/v1/generated.proto
+++ b/staging/src/k8s.io/api/core/v1/generated.proto
@@ -2136,6 +2136,9 @@ message NodeStatus {
   // List of addresses reachable to the node.
   // Queried from cloud provider, if available.
   // More info: https://kubernetes.io/docs/concepts/nodes/node/#addresses
+  // Note: This field is declared as mergeable, but the merge key is not sufficiently
+  // unique, which can cause data corruption when it is merged. Callers should instead
+  // use a full-replacement patch. See http://pr.k8s.io/79391 for an example.
   // +optional
   // +patchMergeKey=type
   // +patchStrategy=merge
diff --git a/staging/src/k8s.io/api/core/v1/types.go b/staging/src/k8s.io/api/core/v1/types.go
index 2279a4b7a53..a1ecce473fb 100644
--- a/staging/src/k8s.io/api/core/v1/types.go
+++ b/staging/src/k8s.io/api/core/v1/types.go
@@ -4082,6 +4082,9 @@ type NodeStatus struct {
 	// List of addresses reachable to the node.
 	// Queried from cloud provider, if available.
 	// More info: https://kubernetes.io/docs/concepts/nodes/node/#addresses
+	// Note: This field is declared as mergeable, but the merge key is not sufficiently
+	// unique, which can cause data corruption when it is merged. Callers should instead
+	// use a full-replacement patch. See http://pr.k8s.io/79391 for an example.
 	// +optional
 	// +patchMergeKey=type
 	// +patchStrategy=merge
diff --git a/staging/src/k8s.io/api/core/v1/types_swagger_doc_generated.go b/staging/src/k8s.io/api/core/v1/types_swagger_doc_generated.go
index 89723b82121..0760ab06ae7 100644
--- a/staging/src/k8s.io/api/core/v1/types_swagger_doc_generated.go
+++ b/staging/src/k8s.io/api/core/v1/types_swagger_doc_generated.go
@@ -1129,7 +1129,7 @@ var map_NodeStatus = map[string]string{
 	"allocatable":     "Allocatable represents the resources of a node that are available for scheduling. Defaults to Capacity.",
 	"phase":           "NodePhase is the recently observed lifecycle phase of the node. More info: https://kubernetes.io/docs/concepts/nodes/node/#phase The field is never populated, and now is deprecated.",
 	"conditions":      "Conditions is an array of current observed node conditions. More info: https://kubernetes.io/docs/concepts/nodes/node/#condition",
-	"addresses":       "List of addresses reachable to the node. Queried from cloud provider, if available. More info: https://kubernetes.io/docs/concepts/nodes/node/#addresses",
+	"addresses":       "List of addresses reachable to the node. Queried from cloud provider, if available. More info: https://kubernetes.io/docs/concepts/nodes/node/#addresses Note: This field is declared as mergeable, but the merge key is not sufficiently unique, which can cause data corruption when it is merged. Callers should instead use a full-replacement patch. See http://pr.k8s.io/79391 for an example.",
 	"daemonEndpoints": "Endpoints of daemons running on the Node.",
 	"nodeInfo":        "Set of ids/uuids to uniquely identify the node. More info: https://kubernetes.io/docs/concepts/nodes/node/#info",
 	"images":          "List of container images on this node",

From 05a9634fb32926b0e6192f9786ceec0b48eb83ed Mon Sep 17 00:00:00 2001
From: Dan Winship
Date: Mon, 1 Jul 2019 14:33:52 -0400
Subject: [PATCH 2/2] Hack PatchNodeStatus() to override the patch type on
 Status.Addresses

---
 pkg/kubelet/kubelet_node_status_test.go | 153 ++++++++++++++++++++++++
 pkg/util/node/BUILD                     |   1 +
 pkg/util/node/node.go                   |  72 ++++++++++-
 3 files changed, 224 insertions(+), 2 deletions(-)

diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go
index 665f4309d03..b363ea28e05 100644
--- a/pkg/kubelet/kubelet_node_status_test.go
+++ b/pkg/kubelet/kubelet_node_status_test.go
@@ -2166,3 +2166,156 @@ func TestNodeStatusHasChanged(t *testing.T) {
 		})
 	}
 }
+
+func TestUpdateNodeAddresses(t *testing.T) {
+	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
+	defer testKubelet.Cleanup()
+	kubelet := testKubelet.kubelet
+	kubeClient := testKubelet.fakeKubeClient
+
+	existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
+	kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
+
+	tests := []struct {
+		Name   string
+		Before []v1.NodeAddress
+		After  []v1.NodeAddress
+	}{
+		{
+			Name:   "nil to populated",
+			Before: nil,
+			After: []v1.NodeAddress{
+				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
+				{Type: v1.NodeHostName, Address: testKubeletHostname},
+			},
+		},
+		{
+			Name:   "empty to populated",
+			Before: []v1.NodeAddress{},
+			After: []v1.NodeAddress{
+				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
+				{Type: v1.NodeHostName, Address: testKubeletHostname},
+			},
+		},
+		{
+			Name: "populated to nil",
+			Before: []v1.NodeAddress{
+				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
+				{Type: v1.NodeHostName, Address: testKubeletHostname},
+			},
+			After: nil,
+		},
+		{
+			Name: "populated to empty",
+			Before: []v1.NodeAddress{
+				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
+				{Type: v1.NodeHostName, Address: testKubeletHostname},
+			},
+			After: []v1.NodeAddress{},
+		},
+		{
+			Name: "multiple addresses of same type, no change",
+			Before: []v1.NodeAddress{
+				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
+				{Type: v1.NodeInternalIP, Address: "127.0.0.2"},
+				{Type: v1.NodeInternalIP, Address: "127.0.0.3"},
+				{Type: v1.NodeHostName, Address: testKubeletHostname},
+			},
+			After: []v1.NodeAddress{
+				{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
+				{Type: v1.NodeInternalIP, Address: "127.0.0.2"},
+				{Type: v1.NodeInternalIP, Address: "127.0.0.3"},
{Type: v1.NodeInternalIP, Address: "127.0.0.3"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + }, + { + Name: "1 InternalIP to 2 InternalIP", + Before: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + After: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, + {Type: v1.NodeInternalIP, Address: "127.0.0.2"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + }, + { + Name: "2 InternalIP to 1 InternalIP", + Before: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, + {Type: v1.NodeInternalIP, Address: "127.0.0.2"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + After: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + }, + { + Name: "2 InternalIP to 2 different InternalIP", + Before: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, + {Type: v1.NodeInternalIP, Address: "127.0.0.2"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + After: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "127.0.0.3"}, + {Type: v1.NodeInternalIP, Address: "127.0.0.4"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + }, + { + Name: "2 InternalIP to reversed order", + Before: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, + {Type: v1.NodeInternalIP, Address: "127.0.0.2"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + After: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "127.0.0.2"}, + {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + }, + } + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + oldNode := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Addresses: test.Before, + }, + } + expectedNode := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Addresses: test.After, + }, + } + + _, err := kubeClient.CoreV1().Nodes().Update(oldNode) + assert.NoError(t, err) + kubelet.setNodeStatusFuncs = []func(*v1.Node) error{ + func(node *v1.Node) error { + node.Status.Addresses = expectedNode.Status.Addresses + return nil + }, + } + assert.NoError(t, kubelet.updateNodeStatus()) + + actions := kubeClient.Actions() + lastAction := actions[len(actions)-1] + assert.IsType(t, core.PatchActionImpl{}, lastAction) + patchAction := lastAction.(core.PatchActionImpl) + + updatedNode, err := applyNodeStatusPatch(oldNode, patchAction.GetPatch()) + require.NoError(t, err) + + assert.True(t, apiequality.Semantic.DeepEqual(updatedNode, expectedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode)) + }) + } +} diff --git a/pkg/util/node/BUILD b/pkg/util/node/BUILD index 08692c148a7..106bfa982b3 100644 --- a/pkg/util/node/BUILD +++ b/pkg/util/node/BUILD @@ -12,6 +12,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/util/node", deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", diff --git a/pkg/util/node/node.go b/pkg/util/node/node.go index 
087a0bc82dd..5c43ca1f3ef 100644 --- a/pkg/util/node/node.go +++ b/pkg/util/node/node.go @@ -27,6 +27,7 @@ import ( "k8s.io/klog" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" @@ -195,12 +196,21 @@ func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *v1.Node, n return nil, fmt.Errorf("failed to Marshal oldData for node %q: %v", nodeName, err) } + // NodeStatus.Addresses is incorrectly annotated as patchStrategy=merge, which + // will cause strategicpatch.CreateTwoWayMergePatch to create an incorrect patch + // if it changed. + manuallyPatchAddresses := (len(oldNode.Status.Addresses) > 0) && !equality.Semantic.DeepEqual(oldNode.Status.Addresses, newNode.Status.Addresses) + // Reset spec to make sure only patch for Status or ObjectMeta is generated. // Note that we don't reset ObjectMeta here, because: // 1. This aligns with Nodes().UpdateStatus(). // 2. Some component does use this to update node annotations. - newNode.Spec = oldNode.Spec - newData, err := json.Marshal(newNode) + diffNode := newNode.DeepCopy() + diffNode.Spec = oldNode.Spec + if manuallyPatchAddresses { + diffNode.Status.Addresses = oldNode.Status.Addresses + } + newData, err := json.Marshal(diffNode) if err != nil { return nil, fmt.Errorf("failed to Marshal newData for node %q: %v", nodeName, err) } @@ -209,5 +219,63 @@ func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *v1.Node, n if err != nil { return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for node %q: %v", nodeName, err) } + if manuallyPatchAddresses { + patchBytes, err = fixupPatchForNodeStatusAddresses(patchBytes, newNode.Status.Addresses) + if err != nil { + return nil, fmt.Errorf("failed to fix up NodeAddresses in patch for node %q: %v", nodeName, err) + } + } + return patchBytes, nil } + +// fixupPatchForNodeStatusAddresses adds a replace-strategy patch for Status.Addresses to +// the existing patch +func fixupPatchForNodeStatusAddresses(patchBytes []byte, addresses []v1.NodeAddress) ([]byte, error) { + // Given patchBytes='{"status": {"conditions": [ ... ], "phase": ...}}' and + // addresses=[{"type": "InternalIP", "address": "10.0.0.1"}], we need to generate: + // + // { + // "status": { + // "conditions": [ ... ], + // "phase": ..., + // "addresses": [ + // { + // "type": "InternalIP", + // "address": "10.0.0.1" + // }, + // { + // "$patch": "replace" + // } + // ] + // } + // } + + var patchMap map[string]interface{} + if err := json.Unmarshal(patchBytes, &patchMap); err != nil { + return nil, err + } + + addrBytes, err := json.Marshal(addresses) + if err != nil { + return nil, err + } + var addrArray []interface{} + if err := json.Unmarshal(addrBytes, &addrArray); err != nil { + return nil, err + } + addrArray = append(addrArray, map[string]interface{}{"$patch": "replace"}) + + status := patchMap["status"] + if status == nil { + status = map[string]interface{}{} + patchMap["status"] = status + } + statusMap, ok := status.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("unexpected data in patch") + } + statusMap["addresses"] = addrArray + + return json.Marshal(patchMap) +}
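
The failure mode these two patches work around can be reproduced outside the kubelet. Below is a minimal standalone sketch, not part of the PR: it assumes only the k8s.io/api and k8s.io/apimachinery modules, and the package/variable names are illustrative. It applies a conventional patchStrategy=merge patch, and then a full-replacement patch of the kind fixupPatchForNodeStatusAddresses emits, to a node status that has two addresses sharing the merge key type=InternalIP:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	// Two addresses share the merge key value type=InternalIP, so the key
	// cannot identify a single list entry.
	original := []byte(`{"status":{"addresses":[{"type":"InternalIP","address":"10.0.0.1"},{"type":"InternalIP","address":"10.0.0.2"}]}}`)

	// A patchStrategy=merge diff can only say "the InternalIP entry is now
	// 10.0.0.3", which is ambiguous against the list above.
	mergePatch := []byte(`{"status":{"addresses":[{"type":"InternalIP","address":"10.0.0.3"}]}}`)

	// The full-replacement form: the trailing {"$patch":"replace"} directive
	// tells the patcher to replace the whole list instead of merging it.
	replacePatch := []byte(`{"status":{"addresses":[{"type":"InternalIP","address":"10.0.0.3"},{"$patch":"replace"}]}}`)

	for _, patch := range [][]byte{mergePatch, replacePatch} {
		// v1.Node{} supplies the struct tags (patchMergeKey/patchStrategy)
		// that drive the strategic merge.
		result, err := strategicpatch.StrategicMergePatch(original, patch, v1.Node{})
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(string(result))
	}
}

Because the merge key matches more than one entry, the plain merge patch can update the wrong entry or leave a stale one behind, while the replace directive swaps in the new list wholesale. That is why preparePatchBytesforNodeStatus excludes the addresses from the generated two-way merge patch and splices in the replace-style patch whenever the address list has changed.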