diff --git a/pkg/kubelet/fake_node_manager.go b/pkg/kubelet/fake_node_manager.go new file mode 100644 index 00000000000..f24041377c4 --- /dev/null +++ b/pkg/kubelet/fake_node_manager.go @@ -0,0 +1,46 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "net" + + "k8s.io/kubernetes/pkg/api" +) + +type fakeNodeManager struct { + podCIDR string + node *api.Node + IP net.IP +} + +var _ nodeManager = &fakeNodeManager{} + +func (f *fakeNodeManager) Start() { +} + +func (f *fakeNodeManager) GetNode() (*api.Node, error) { + return f.node, nil +} + +func (f *fakeNodeManager) GetHostIP() (net.IP, error) { + return f.IP, nil +} + +func (f *fakeNodeManager) GetPodCIDR() string { + return f.podCIDR +} diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 339b055eed7..f10ae37c4b2 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -34,7 +34,6 @@ import ( cadvisorApi "github.com/google/cadvisor/info/v1" cadvisorApiv2 "github.com/google/cadvisor/info/v2" "k8s.io/kubernetes/pkg/api" - apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/capabilities" @@ -45,11 +44,9 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/status" - "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/bandwidth" - "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume" _ "k8s.io/kubernetes/pkg/volume/host_path" ) @@ -77,6 +74,7 @@ type TestKubelet struct { fakeCadvisor *cadvisor.Mock fakeKubeClient *testclient.Fake fakeMirrorClient *fakeMirrorClient + fakeNodeManager *fakeNodeManager } func newTestKubelet(t *testing.T) *TestKubelet { @@ -103,14 +101,15 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.sourcesReady = func() bool { return true } kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.serviceLister = testServiceLister{} - kubelet.nodeLister = testNodeLister{} + kubelet.readinessManager = kubecontainer.NewReadinessManager() kubelet.recorder = fakeRecorder kubelet.statusManager = status.NewManager(fakeKubeClient) if err := kubelet.setupDataDirs(); err != nil { t.Fatalf("can't initialize kubelet data dirs: %v", err) } - kubelet.daemonEndpoints = &api.NodeDaemonEndpoints{} + fakeNodeManager := &fakeNodeManager{} + kubelet.nodeManager = fakeNodeManager mockCadvisor := &cadvisor.Mock{} kubelet.cadvisor = mockCadvisor podManager, fakeMirrorClient := newFakePodManager() @@ -136,7 +135,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.backOff = util.NewBackOff(time.Second, time.Minute) kubelet.backOff.Clock = fakeClock kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20) - return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient} + return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeNodeManager} } func newTestPods(count int) []*api.Pod { @@ -962,25 +961,6 @@ func (ls testServiceLister) List() (api.ServiceList, error) { }, nil } -type testNodeLister struct { - nodes []api.Node -} - -func (ls testNodeLister) GetNodeInfo(id string) (*api.Node, error) { - for _, node := range ls.nodes { - if node.Name == id { - return &node, nil - } - } - return nil, fmt.Errorf("Node with name: %s does not exist", id) -} - -func (ls testNodeLister) List() (api.NodeList, error) { - return api.NodeList{ - Items: ls.nodes, - }, nil -} - type envs []kubecontainer.EnvVar func (e envs) Len() int { @@ -2142,10 +2122,10 @@ func TestHandlePortConflicts(t *testing.T) { // Tests that we handle not matching labels selector correctly by setting the failed status in status map. func TestHandleNodeSelector(t *testing.T) { testKubelet := newTestKubelet(t) + testKubelet.fakeNodeManager.node = &api.Node{ + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}} + kl := testKubelet.kubelet - kl.nodeLister = testNodeLister{nodes: []api.Node{ - {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}}, - }} testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorApiv2.FsInfo{}, nil) testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorApiv2.FsInfo{}, nil) @@ -2356,309 +2336,6 @@ func TestValidateContainerStatus(t *testing.T) { } } -func TestUpdateNewNodeStatus(t *testing.T) { - testKubelet := newTestKubelet(t) - kubelet := testKubelet.kubelet - kubeClient := testKubelet.fakeKubeClient - kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{ - {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, - }}).ReactionChain - machineInfo := &cadvisorApi.MachineInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - NumCores: 2, - MemoryCapacity: 1024, - } - mockCadvisor := testKubelet.fakeCadvisor - mockCadvisor.On("MachineInfo").Return(machineInfo, nil) - versionInfo := &cadvisorApi.VersionInfo{ - KernelVersion: "3.16.0-0.bpo.4-amd64", - ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", - DockerVersion: "1.5.0", - } - mockCadvisor.On("VersionInfo").Return(versionInfo, nil) - expectedNode := &api.Node{ - ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, - Spec: api.NodeSpec{}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeReady, - Status: api.ConditionTrue, - Reason: "KubeletReady", - Message: fmt.Sprintf("kubelet is posting ready status"), - LastHeartbeatTime: util.Time{}, - LastTransitionTime: util.Time{}, - }, - }, - NodeInfo: api.NodeSystemInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - KernelVersion: "3.16.0-0.bpo.4-amd64", - OsImage: "Debian GNU/Linux 7 (wheezy)", - ContainerRuntimeVersion: "docker://1.5.0", - KubeletVersion: version.Get().String(), - KubeProxyVersion: version.Get().String(), - }, - Capacity: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - }, - Addresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, - {Type: api.NodeInternalIP, Address: "127.0.0.1"}, - }, - }, - } - - kubelet.updateRuntimeUp() - if err := kubelet.updateNodeStatus(); err != nil { - t.Errorf("unexpected error: %v", err) - } - actions := kubeClient.Actions() - if len(actions) != 2 { - t.Fatalf("unexpected actions: %v", actions) - } - if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { - t.Fatalf("unexpected actions: %v", actions) - } - updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node) - if !ok { - t.Errorf("unexpected object type") - } - if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() { - t.Errorf("unexpected zero last probe timestamp") - } - if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() { - t.Errorf("unexpected zero last transition timestamp") - } - updatedNode.Status.Conditions[0].LastHeartbeatTime = util.Time{} - updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{} - if !reflect.DeepEqual(expectedNode, updatedNode) { - t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode)) - } -} - -func TestUpdateExistingNodeStatus(t *testing.T) { - testKubelet := newTestKubelet(t) - kubelet := testKubelet.kubelet - kubeClient := testKubelet.fakeKubeClient - kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{ - { - ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, - Spec: api.NodeSpec{}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeReady, - Status: api.ConditionTrue, - Reason: "KubeletReady", - Message: fmt.Sprintf("kubelet is posting ready status"), - LastHeartbeatTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - }, - Capacity: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(2048, resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - }, - }, - }, - }}).ReactionChain - mockCadvisor := testKubelet.fakeCadvisor - machineInfo := &cadvisorApi.MachineInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - NumCores: 2, - MemoryCapacity: 1024, - } - mockCadvisor.On("MachineInfo").Return(machineInfo, nil) - versionInfo := &cadvisorApi.VersionInfo{ - KernelVersion: "3.16.0-0.bpo.4-amd64", - ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", - DockerVersion: "1.5.0", - } - mockCadvisor.On("VersionInfo").Return(versionInfo, nil) - expectedNode := &api.Node{ - ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, - Spec: api.NodeSpec{}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeReady, - Status: api.ConditionTrue, - Reason: "KubeletReady", - Message: fmt.Sprintf("kubelet is posting ready status"), - LastHeartbeatTime: util.Time{}, // placeholder - LastTransitionTime: util.Time{}, // placeholder - }, - }, - NodeInfo: api.NodeSystemInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - KernelVersion: "3.16.0-0.bpo.4-amd64", - OsImage: "Debian GNU/Linux 7 (wheezy)", - ContainerRuntimeVersion: "docker://1.5.0", - KubeletVersion: version.Get().String(), - KubeProxyVersion: version.Get().String(), - }, - Capacity: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - }, - Addresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, - {Type: api.NodeInternalIP, Address: "127.0.0.1"}, - }, - }, - } - - kubelet.updateRuntimeUp() - if err := kubelet.updateNodeStatus(); err != nil { - t.Errorf("unexpected error: %v", err) - } - actions := kubeClient.Actions() - if len(actions) != 2 { - t.Errorf("unexpected actions: %v", actions) - } - updateAction, ok := actions[1].(testclient.UpdateAction) - if !ok { - t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1]) - } - updatedNode, ok := updateAction.GetObject().(*api.Node) - if !ok { - t.Errorf("unexpected object type") - } - // Expect LastProbeTime to be updated to Now, while LastTransitionTime to be the same. - if reflect.DeepEqual(updatedNode.Status.Conditions[0].LastHeartbeatTime.Rfc3339Copy().UTC(), util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time) { - t.Errorf("expected \n%v\n, got \n%v", util.Now(), util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) - } - if !reflect.DeepEqual(updatedNode.Status.Conditions[0].LastTransitionTime.Rfc3339Copy().UTC(), util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time) { - t.Errorf("expected \n%#v\n, got \n%#v", updatedNode.Status.Conditions[0].LastTransitionTime.Rfc3339Copy(), - util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) - } - updatedNode.Status.Conditions[0].LastHeartbeatTime = util.Time{} - updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{} - if !reflect.DeepEqual(expectedNode, updatedNode) { - t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode) - } -} - -func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { - testKubelet := newTestKubelet(t) - kubelet := testKubelet.kubelet - kubeClient := testKubelet.fakeKubeClient - fakeRuntime := testKubelet.fakeRuntime - // This causes returning an error from GetContainerRuntimeVersion() which - // simulates that container runtime is down. - fakeRuntime.VersionInfo = "" - - kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{ - {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, - }}).ReactionChain - mockCadvisor := testKubelet.fakeCadvisor - machineInfo := &cadvisorApi.MachineInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - NumCores: 2, - MemoryCapacity: 1024, - } - mockCadvisor.On("MachineInfo").Return(machineInfo, nil) - versionInfo := &cadvisorApi.VersionInfo{ - KernelVersion: "3.16.0-0.bpo.4-amd64", - ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", - DockerVersion: "1.5.0", - } - mockCadvisor.On("VersionInfo").Return(versionInfo, nil) - - expectedNode := &api.Node{ - ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, - Spec: api.NodeSpec{}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeReady, - Status: api.ConditionFalse, - Reason: "KubeletNotReady", - Message: fmt.Sprintf("container runtime is down"), - LastHeartbeatTime: util.Time{}, - LastTransitionTime: util.Time{}, - }, - }, - NodeInfo: api.NodeSystemInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - KernelVersion: "3.16.0-0.bpo.4-amd64", - OsImage: "Debian GNU/Linux 7 (wheezy)", - ContainerRuntimeVersion: "docker://1.5.0", - KubeletVersion: version.Get().String(), - KubeProxyVersion: version.Get().String(), - }, - Capacity: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - }, - Addresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, - {Type: api.NodeInternalIP, Address: "127.0.0.1"}, - }, - }, - } - - kubelet.runtimeUpThreshold = time.Duration(0) - kubelet.updateRuntimeUp() - if err := kubelet.updateNodeStatus(); err != nil { - t.Errorf("unexpected error: %v", err) - } - actions := kubeClient.Actions() - if len(actions) != 2 { - t.Fatalf("unexpected actions: %v", actions) - } - if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { - t.Fatalf("unexpected actions: %v", actions) - } - updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node) - if !ok { - t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1]) - } - - if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() { - t.Errorf("unexpected zero last probe timestamp") - } - if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() { - t.Errorf("unexpected zero last transition timestamp") - } - updatedNode.Status.Conditions[0].LastHeartbeatTime = util.Time{} - updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{} - if !reflect.DeepEqual(expectedNode, updatedNode) { - t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode)) - } -} - -func TestUpdateNodeStatusError(t *testing.T) { - testKubelet := newTestKubelet(t) - kubelet := testKubelet.kubelet - // No matching node for the kubelet - testKubelet.fakeKubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{}}).ReactionChain - - if err := kubelet.updateNodeStatus(); err == nil { - t.Errorf("unexpected non error: %v", err) - } - if len(testKubelet.fakeKubeClient.Actions()) != nodeStatusUpdateRetry { - t.Errorf("unexpected actions: %v", testKubelet.fakeKubeClient.Actions()) - } -} - func TestCreateMirrorPod(t *testing.T) { for _, updateType := range []SyncPodType{SyncPodCreate, SyncPodUpdate} { testKubelet := newTestKubelet(t) @@ -3034,55 +2711,6 @@ func TestFilterOutTerminatedPods(t *testing.T) { } } -func TestRegisterExistingNodeWithApiserver(t *testing.T) { - testKubelet := newTestKubelet(t) - kubelet := testKubelet.kubelet - kubeClient := testKubelet.fakeKubeClient - kubeClient.AddReactor("create", "nodes", func(action testclient.Action) (bool, runtime.Object, error) { - // Return an error on create. - return true, &api.Node{}, &apierrors.StatusError{ - ErrStatus: api.Status{Reason: api.StatusReasonAlreadyExists}, - } - }) - kubeClient.AddReactor("get", "nodes", func(action testclient.Action) (bool, runtime.Object, error) { - // Return an existing (matching) node on get. - return true, &api.Node{ - ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, - Spec: api.NodeSpec{ExternalID: testKubeletHostname}, - }, nil - }) - kubeClient.AddReactor("*", "*", func(action testclient.Action) (bool, runtime.Object, error) { - return true, nil, fmt.Errorf("no reaction implemented for %s", action) - }) - machineInfo := &cadvisorApi.MachineInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - NumCores: 2, - MemoryCapacity: 1024, - } - mockCadvisor := testKubelet.fakeCadvisor - mockCadvisor.On("MachineInfo").Return(machineInfo, nil) - versionInfo := &cadvisorApi.VersionInfo{ - KernelVersion: "3.16.0-0.bpo.4-amd64", - ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", - DockerVersion: "1.5.0", - } - mockCadvisor.On("VersionInfo").Return(versionInfo, nil) - - done := make(chan struct{}) - go func() { - kubelet.registerWithApiserver() - done <- struct{}{} - }() - select { - case <-time.After(5 * time.Second): - t.Errorf("timed out waiting for registration") - case <-done: - return - } -} - func TestMakePortMappings(t *testing.T) { tests := []struct { container *api.Container diff --git a/pkg/kubelet/node_manager_test.go b/pkg/kubelet/node_manager_test.go new file mode 100644 index 00000000000..146627c71c8 --- /dev/null +++ b/pkg/kubelet/node_manager_test.go @@ -0,0 +1,444 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "fmt" + "reflect" + "testing" + "time" + + cadvisorApi "github.com/google/cadvisor/info/v1" + "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/client/unversioned/testclient" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/version" +) + +type fakeInfoGetter struct { + machineInfo *cadvisorApi.MachineInfo + versionInfo *cadvisorApi.VersionInfo + runtimeUp bool + networkConfigured bool +} + +func (f *fakeInfoGetter) GetMachineInfo() (*cadvisorApi.MachineInfo, error) { + return f.machineInfo, nil +} + +func (f *fakeInfoGetter) GetVersionInfo() (*cadvisorApi.VersionInfo, error) { + return f.versionInfo, nil +} + +func (f *fakeInfoGetter) ContainerRuntimeUp() bool { + return f.runtimeUp +} + +func (f *fakeInfoGetter) NetworkConfigured() bool { + return f.networkConfigured +} + +var _ infoGetter = &fakeInfoGetter{} + +type testNodeLister struct { + nodes []api.Node +} + +func (ls testNodeLister) GetNodeInfo(id string) (*api.Node, error) { + for _, node := range ls.nodes { + if node.Name == id { + return &node, nil + } + } + return nil, fmt.Errorf("Node with name: %s does not exist", id) +} + +func (ls testNodeLister) List() (api.NodeList, error) { + return api.NodeList{ + Items: ls.nodes, + }, nil +} + +type testNodeManager struct { + fakeClient *testclient.Fake + fakeInfoGetter *fakeInfoGetter + nodeManager *realNodeManager +} + +func newTestNodeManager() *testNodeManager { + fakeRecorder := &record.FakeRecorder{} + fakeClient := &testclient.Fake{} + fakeInfoGetter := &fakeInfoGetter{} + nodeManager := newRealNodeManager(fakeClient, nil, true, time.Second, fakeRecorder, testKubeletHostname, + testKubeletHostname, "", 0, fakeInfoGetter, &api.NodeDaemonEndpoints{}, nil) + nodeManager.nodeLister = &testNodeLister{} + return &testNodeManager{fakeClient: fakeClient, fakeInfoGetter: fakeInfoGetter, nodeManager: nodeManager} +} + +func TestUpdateNewNodeStatus(t *testing.T) { + testNodeManager := newTestNodeManager() + nodeManager := testNodeManager.nodeManager + client := testNodeManager.fakeClient + fakeInfoGetter := testNodeManager.fakeInfoGetter + + client.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, + }}).ReactionChain + + fakeInfoGetter.machineInfo = &cadvisorApi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 1024, + } + fakeInfoGetter.versionInfo = &cadvisorApi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + DockerVersion: "1.5.0", + } + + expectedNode := &api.Node{ + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: "KubeletReady", + Message: fmt.Sprintf("kubelet is posting ready status"), + LastHeartbeatTime: util.Time{}, + LastTransitionTime: util.Time{}, + }, + }, + NodeInfo: api.NodeSystemInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + KernelVersion: "3.16.0-0.bpo.4-amd64", + OsImage: "Debian GNU/Linux 7 (wheezy)", + ContainerRuntimeVersion: "docker://1.5.0", + KubeletVersion: version.Get().String(), + KubeProxyVersion: version.Get().String(), + }, + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, + Addresses: []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, + {Type: api.NodeInternalIP, Address: "127.0.0.1"}, + }, + }, + } + fakeInfoGetter.runtimeUp = true + fakeInfoGetter.networkConfigured = true + + if err := nodeManager.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + actions := client.Actions() + if len(actions) != 2 { + t.Fatalf("unexpected actions: %v", actions) + } + if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { + t.Fatalf("unexpected actions: %v", actions) + } + updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node) + if !ok { + t.Errorf("unexpected object type") + } + if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() { + t.Errorf("unexpected zero last probe timestamp") + } + if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() { + t.Errorf("unexpected zero last transition timestamp") + } + updatedNode.Status.Conditions[0].LastHeartbeatTime = util.Time{} + updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{} + if !reflect.DeepEqual(expectedNode, updatedNode) { + t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode)) + } +} + +func TestUpdateExistingNodeStatus(t *testing.T) { + testNodeManager := newTestNodeManager() + nodeManager := testNodeManager.nodeManager + client := testNodeManager.fakeClient + fakeInfoGetter := testNodeManager.fakeInfoGetter + + client.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: "KubeletReady", + Message: fmt.Sprintf("kubelet is posting ready status"), + LastHeartbeatTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(2048, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, + }, + }, + }}).ReactionChain + fakeInfoGetter.machineInfo = &cadvisorApi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 1024, + } + fakeInfoGetter.versionInfo = &cadvisorApi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + DockerVersion: "1.5.0", + } + + expectedNode := &api.Node{ + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: "KubeletReady", + Message: fmt.Sprintf("kubelet is posting ready status"), + LastHeartbeatTime: util.Time{}, // placeholder + LastTransitionTime: util.Time{}, // placeholder + }, + }, + NodeInfo: api.NodeSystemInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + KernelVersion: "3.16.0-0.bpo.4-amd64", + OsImage: "Debian GNU/Linux 7 (wheezy)", + ContainerRuntimeVersion: "docker://1.5.0", + KubeletVersion: version.Get().String(), + KubeProxyVersion: version.Get().String(), + }, + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, + Addresses: []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, + {Type: api.NodeInternalIP, Address: "127.0.0.1"}, + }, + }, + } + fakeInfoGetter.runtimeUp = true + fakeInfoGetter.networkConfigured = true + + if err := nodeManager.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + actions := client.Actions() + if len(actions) != 2 { + t.Errorf("unexpected actions: %v", actions) + } + updateAction, ok := actions[1].(testclient.UpdateAction) + if !ok { + t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1]) + } + updatedNode, ok := updateAction.GetObject().(*api.Node) + if !ok { + t.Errorf("unexpected object type") + } + // Expect LastProbeTime to be updated to Now, while LastTransitionTime to be the same. + if reflect.DeepEqual(updatedNode.Status.Conditions[0].LastHeartbeatTime.Rfc3339Copy().UTC(), util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time) { + t.Errorf("expected \n%v\n, got \n%v", util.Now(), util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) + } + if !reflect.DeepEqual(updatedNode.Status.Conditions[0].LastTransitionTime.Rfc3339Copy().UTC(), util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time) { + t.Errorf("expected \n%#v\n, got \n%#v", updatedNode.Status.Conditions[0].LastTransitionTime.Rfc3339Copy(), + util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) + } + updatedNode.Status.Conditions[0].LastHeartbeatTime = util.Time{} + updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{} + if !reflect.DeepEqual(expectedNode, updatedNode) { + t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode) + } +} + +func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { + testNodeManager := newTestNodeManager() + nodeManager := testNodeManager.nodeManager + client := testNodeManager.fakeClient + fakeInfoGetter := testNodeManager.fakeInfoGetter + + client.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, + }}).ReactionChain + + fakeInfoGetter.machineInfo = &cadvisorApi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 1024, + } + fakeInfoGetter.versionInfo = &cadvisorApi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + DockerVersion: "1.5.0", + } + + expectedNode := &api.Node{ + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFalse, + Reason: "KubeletNotReady", + Message: fmt.Sprintf("container runtime is down"), + LastHeartbeatTime: util.Time{}, + LastTransitionTime: util.Time{}, + }, + }, + NodeInfo: api.NodeSystemInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + KernelVersion: "3.16.0-0.bpo.4-amd64", + OsImage: "Debian GNU/Linux 7 (wheezy)", + ContainerRuntimeVersion: "docker://1.5.0", + KubeletVersion: version.Get().String(), + KubeProxyVersion: version.Get().String(), + }, + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, + Addresses: []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, + {Type: api.NodeInternalIP, Address: "127.0.0.1"}, + }, + }, + } + // Pretend that container runtime is down. + fakeInfoGetter.runtimeUp = false + fakeInfoGetter.networkConfigured = true + + if err := nodeManager.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + actions := client.Actions() + if len(actions) != 2 { + t.Fatalf("unexpected actions: %v", actions) + } + if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { + t.Fatalf("unexpected actions: %v", actions) + } + updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node) + if !ok { + t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1]) + } + + if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() { + t.Errorf("unexpected zero last probe timestamp") + } + if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() { + t.Errorf("unexpected zero last transition timestamp") + } + updatedNode.Status.Conditions[0].LastHeartbeatTime = util.Time{} + updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{} + if !reflect.DeepEqual(expectedNode, updatedNode) { + t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode)) + } +} + +func TestUpdateNodeStatusError(t *testing.T) { + testNodeManager := newTestNodeManager() + nodeManager := testNodeManager.nodeManager + client := testNodeManager.fakeClient + // No matching node for the kubelet + client.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{}}).ReactionChain + + if err := nodeManager.updateNodeStatus(); err == nil { + t.Errorf("unexpected non error: %v", err) + } + if len(client.Actions()) != nodeStatusUpdateRetry { + t.Errorf("unexpected actions: %v", client.Actions()) + } +} + +func TestRegisterExistingNodeWithApiserver(t *testing.T) { + testNodeManager := newTestNodeManager() + nodeManager := testNodeManager.nodeManager + client := testNodeManager.fakeClient + fakeInfoGetter := testNodeManager.fakeInfoGetter + client.AddReactor("create", "nodes", func(action testclient.Action) (bool, runtime.Object, error) { + // Return an error on create. + return true, &api.Node{}, &apierrors.StatusError{ + ErrStatus: api.Status{Reason: api.StatusReasonAlreadyExists}, + } + }) + client.AddReactor("get", "nodes", func(action testclient.Action) (bool, runtime.Object, error) { + // Return an existing (matching) node on get. + return true, &api.Node{ + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{ExternalID: testKubeletHostname}, + }, nil + }) + client.AddReactor("*", "*", func(action testclient.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("no reaction implemented for %s", action) + }) + fakeInfoGetter.machineInfo = &cadvisorApi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 1024, + } + fakeInfoGetter.versionInfo = &cadvisorApi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + DockerVersion: "1.5.0", + } + + done := make(chan struct{}) + go func() { + nodeManager.registerWithApiserver() + done <- struct{}{} + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("timed out waiting for registration") + case <-done: + return + } +} diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index f236ebfa6cd..e5c25b10fff 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -83,11 +83,11 @@ func TestRunOnce(t *testing.T) { rootDirectory: "/tmp/kubelet", recorder: &record.FakeRecorder{}, cadvisor: cadvisor, - nodeLister: testNodeLister{}, statusManager: status.NewManager(nil), containerRefManager: kubecontainer.NewRefManager(), readinessManager: kubecontainer.NewReadinessManager(), podManager: podManager, + nodeManager: &fakeNodeManager{}, os: kubecontainer.FakeOS{}, volumeManager: newVolumeManager(), diskSpaceManager: diskSpaceManager, diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go index bd4fdc03f49..213b49cbda5 100644 --- a/pkg/kubelet/server_test.go +++ b/pkg/kubelet/server_test.go @@ -85,7 +85,7 @@ func (fk *fakeKubelet) GetContainerRuntimeVersion() (kubecontainer.Version, erro return fk.containerVersionFunc() } -func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) { +func (fk *fakeKubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) { return fk.machineInfoFunc() }