Use PatchStatus to update node status in kubelet.

This commit is contained in:
Random-Liu
2016-12-01 14:46:20 -08:00
parent 7fe882c479
commit beba1ebbf8
6 changed files with 196 additions and 118 deletions

View File

@@ -17,6 +17,7 @@ limitations under the License.
package kubelet
import (
"encoding/json"
"fmt"
"reflect"
goruntime "runtime"
@@ -39,6 +40,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/diff"
"k8s.io/kubernetes/pkg/util/rand"
"k8s.io/kubernetes/pkg/util/strategicpatch"
"k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
@@ -90,6 +92,23 @@ func generateImageTags() []string {
return tagList
}
func applyNodeStatusPatch(originalNode *v1.Node, patch []byte) (*v1.Node, error) {
original, err := json.Marshal(originalNode)
if err != nil {
return nil, fmt.Errorf("failed to marshal original node %#v: %v", originalNode, err)
}
updated, err := strategicpatch.StrategicMergePatch(original, patch, v1.Node{})
if err != nil {
return nil, fmt.Errorf("failed to apply strategic merge patch %q on node %#v: %v",
patch, originalNode, err)
}
updatedNode := &v1.Node{}
if err := json.Unmarshal(updated, updatedNode); err != nil {
return nil, fmt.Errorf("failed to unmarshal updated node %q: %v", updated, err)
}
return updatedNode, nil
}
func TestUpdateNewNodeStatus(t *testing.T) {
// generate one more than maxImagesInNodeStatus in inputImageList
inputImageList, expectedImageList := generateTestingImageList(maxImagesInNodeStatus + 1)
@@ -97,9 +116,8 @@ func TestUpdateNewNodeStatus(t *testing.T) {
t, inputImageList, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{
{ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
existingNode := v1.Node{ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
@@ -200,12 +218,12 @@ func TestUpdateNewNodeStatus(t *testing.T) {
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
if !actions[1].Matches("patch", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*v1.Node)
if !ok {
t.Errorf("unexpected object type")
updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch())
if err != nil {
t.Fatalf("can't apply node status patch: %v", err)
}
for i, cond := range updatedNode.Status.Conditions {
if cond.LastHeartbeatTime.IsZero() {
@@ -237,9 +255,8 @@ func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{
{ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
existingNode := v1.Node{ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
@@ -280,12 +297,13 @@ func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) {
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
// StrategicMergePatch(original, patch []byte, dataStruct interface{}) ([]byte, error)
if !actions[1].Matches("patch", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*v1.Node)
if !ok {
t.Errorf("unexpected object type")
updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch())
if err != nil {
t.Fatalf("can't apply node status patch: %v", err)
}
var oodCondition v1.NodeCondition
@@ -312,58 +330,57 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{
{
ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeOutOfDisk,
Status: v1.ConditionTrue,
Reason: "KubeletOutOfDisk",
Message: "out of disk space",
LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
{
Type: v1.NodeMemoryPressure,
Status: v1.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
{
Type: v1.NodeDiskPressure,
Status: v1.ConditionFalse,
Reason: "KubeletHasSufficientDisk",
Message: fmt.Sprintf("kubelet has sufficient disk space available"),
LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
existingNode := v1.Node{
ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeOutOfDisk,
Status: v1.ConditionTrue,
Reason: "KubeletOutOfDisk",
Message: "out of disk space",
LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
{
Type: v1.NodeMemoryPressure,
Status: v1.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2800, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
{
Type: v1.NodeDiskPressure,
Status: v1.ConditionFalse,
Reason: "KubeletHasSufficientDisk",
Message: fmt.Sprintf("kubelet has sufficient disk space available"),
LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2800, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
},
}}).ReactionChain
}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
machineInfo := &cadvisorapi.MachineInfo{
@@ -474,13 +491,13 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
if len(actions) != 2 {
t.Errorf("unexpected actions: %v", actions)
}
updateAction, ok := actions[1].(core.UpdateAction)
patchAction, ok := actions[1].(core.PatchActionImpl)
if !ok {
t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1])
t.Errorf("unexpected action type. expected PatchActionImpl, got %#v", actions[1])
}
updatedNode, ok := updateAction.GetObject().(*v1.Node)
updatedNode, err := applyNodeStatusPatch(&existingNode, patchAction.GetPatch())
if !ok {
t.Errorf("unexpected object type")
t.Fatalf("can't apply node status patch: %v", err)
}
for i, cond := range updatedNode.Status.Conditions {
// Expect LastProbeTime to be updated to Now, while LastTransitionTime to be the same.
@@ -508,33 +525,35 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
clock := testKubelet.fakeClock
// Do not set nano second, because apiserver function doesn't support nano second. (Only support
// RFC3339).
clock.SetTime(time.Unix(123456, 0))
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{
{
ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: metav1.NewTime(clock.Now()),
LastTransitionTime: metav1.NewTime(clock.Now()),
},
{
Type: v1.NodeOutOfDisk,
Status: v1.ConditionTrue,
Reason: "KubeletOutOfDisk",
Message: "out of disk space",
LastHeartbeatTime: metav1.NewTime(clock.Now()),
LastTransitionTime: metav1.NewTime(clock.Now()),
},
existingNode := v1.Node{
ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: metav1.NewTime(clock.Now()),
LastTransitionTime: metav1.NewTime(clock.Now()),
},
{
Type: v1.NodeOutOfDisk,
Status: v1.ConditionTrue,
Reason: "KubeletOutOfDisk",
Message: "out of disk space",
LastHeartbeatTime: metav1.NewTime(clock.Now()),
LastTransitionTime: metav1.NewTime(clock.Now()),
},
},
},
}}).ReactionChain
}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
@@ -637,13 +656,13 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T)
if len(actions) != 2 {
t.Errorf("%d. unexpected actions: %v", tcIdx, actions)
}
updateAction, ok := actions[1].(core.UpdateAction)
patchAction, ok := actions[1].(core.PatchActionImpl)
if !ok {
t.Errorf("%d. unexpected action type. expected UpdateAction, got %#v", tcIdx, actions[1])
t.Errorf("%d. unexpected action type. expected PatchActionImpl, got %#v", tcIdx, actions[1])
}
updatedNode, ok := updateAction.GetObject().(*v1.Node)
if !ok {
t.Errorf("%d. unexpected object type", tcIdx)
updatedNode, err := applyNodeStatusPatch(&existingNode, patchAction.GetPatch())
if err != nil {
t.Fatalf("can't apply node status patch: %v", err)
}
kubeClient.ClearActions()
@@ -656,7 +675,6 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T)
if !reflect.DeepEqual(tc.expected, oodCondition) {
t.Errorf("%d.\nunexpected objects: %s", tcIdx, diff.ObjectDiff(tc.expected, oodCondition))
}
}
}
@@ -666,9 +684,8 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
kubelet := testKubelet.kubelet
clock := testKubelet.fakeClock
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{
{ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
existingNode := v1.Node{ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
machineInfo := &cadvisorapi.MachineInfo{
@@ -772,12 +789,12 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
if !actions[1].Matches("patch", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*v1.Node)
if !ok {
t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1])
updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch())
if err != nil {
t.Fatalf("can't apply node status patch: %v", err)
}
for i, cond := range updatedNode.Status.Conditions {
@@ -984,7 +1001,7 @@ func TestTryRegisterWithApiServer(t *testing.T) {
existingNode *v1.Node
createError error
getError error
updateError error
patchError error
deleteError error
expectedResult bool
expectedActions int
@@ -1056,7 +1073,7 @@ func TestTryRegisterWithApiServer(t *testing.T) {
newNode: newNode(false, "a"),
createError: alreadyExists,
existingNode: newNode(true, "a"),
updateError: conflict,
patchError: conflict,
expectedResult: false,
expectedActions: 3,
},
@@ -1087,9 +1104,9 @@ func TestTryRegisterWithApiServer(t *testing.T) {
// Return an existing (matching) node on get.
return true, tc.existingNode, tc.getError
})
kubeClient.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) {
kubeClient.AddReactor("patch", "nodes", func(action core.Action) (bool, runtime.Object, error) {
if action.GetSubresource() == "status" {
return true, nil, tc.updateError
return true, nil, tc.patchError
}
return notImplemented(action)
})
@@ -1124,11 +1141,12 @@ func TestTryRegisterWithApiServer(t *testing.T) {
t.Errorf("%v: unexpected type; couldn't convert to *v1.Node: %+v", tc.name, createAction.GetObject())
continue
}
} else if action.GetVerb() == "update" {
updateAction := action.(core.UpdateAction)
savedNode, ok = updateAction.GetObject().(*v1.Node)
if !ok {
t.Errorf("%v: unexpected type; couldn't convert to *v1.Node: %+v", tc.name, updateAction.GetObject())
} else if action.GetVerb() == "patch" {
patchAction := action.(core.PatchActionImpl)
var err error
savedNode, err = applyNodeStatusPatch(tc.existingNode, patchAction.GetPatch())
if err != nil {
t.Errorf("can't apply node status patch: %v", err)
continue
}
}