Revert "Move and fix nodeManager related unit tests"

This reverts commit 865dc19957.
This commit is contained in:
Yu-Ju Hong
2015-09-21 10:58:54 -07:00
parent 843134885e
commit 7bdf9bfdfa
5 changed files with 382 additions and 501 deletions

View File

@@ -34,6 +34,7 @@ import (
cadvisorApi "github.com/google/cadvisor/info/v1"
cadvisorApiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
@@ -45,9 +46,11 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/bandwidth"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
_ "k8s.io/kubernetes/pkg/volume/host_path"
)
@@ -75,7 +78,6 @@ type TestKubelet struct {
fakeCadvisor *cadvisor.Mock
fakeKubeClient *testclient.Fake
fakeMirrorClient *fakeMirrorClient
fakeNodeManager *fakeNodeManager
}
func newTestKubelet(t *testing.T) *TestKubelet {
@@ -102,15 +104,14 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.sourcesReady = func() bool { return true }
kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{}
kubelet.nodeLister = testNodeLister{}
kubelet.readinessManager = kubecontainer.NewReadinessManager()
kubelet.recorder = fakeRecorder
kubelet.statusManager = status.NewManager(fakeKubeClient)
if err := kubelet.setupDataDirs(); err != nil {
t.Fatalf("can't initialize kubelet data dirs: %v", err)
}
fakeNodeManager := &fakeNodeManager{}
kubelet.nodeManager = fakeNodeManager
kubelet.daemonEndpoints = &api.NodeDaemonEndpoints{}
mockCadvisor := &cadvisor.Mock{}
kubelet.cadvisor = mockCadvisor
podManager, fakeMirrorClient := newFakePodManager()
@@ -136,7 +137,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.backOff = util.NewBackOff(time.Second, time.Minute)
kubelet.backOff.Clock = fakeClock
kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20)
return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeNodeManager}
return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
}
func newTestPods(count int) []*api.Pod {
@@ -962,6 +963,25 @@ func (ls testServiceLister) List() (api.ServiceList, error) {
}, nil
}
type testNodeLister struct {
nodes []api.Node
}
func (ls testNodeLister) GetNodeInfo(id string) (*api.Node, error) {
for _, node := range ls.nodes {
if node.Name == id {
return &node, nil
}
}
return nil, fmt.Errorf("Node with name: %s does not exist", id)
}
func (ls testNodeLister) List() (api.NodeList, error) {
return api.NodeList{
Items: ls.nodes,
}, nil
}
type envs []kubecontainer.EnvVar
func (e envs) Len() int {
@@ -2154,10 +2174,10 @@ func TestHandlePortConflicts(t *testing.T) {
// Tests that we handle not matching labels selector correctly by setting the failed status in status map.
func TestHandleNodeSelector(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeNodeManager.node = &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}}
kl := testKubelet.kubelet
kl.nodeLister = testNodeLister{nodes: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}},
}}
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorApiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorApiv2.FsInfo{}, nil)
@@ -2368,6 +2388,309 @@ func TestValidateContainerStatus(t *testing.T) {
}
}
func TestUpdateNewNodeStatus(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
machineInfo := &cadvisorApi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorApi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: util.Time{},
LastTransitionTime: util.Time{},
},
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OsImage: "Debian GNU/Linux 7 (wheezy)",
ContainerRuntimeVersion: "docker://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
},
}
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp")
}
updatedNode.Status.Conditions[0].LastHeartbeatTime = util.Time{}
updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{}
if !reflect.DeepEqual(expectedNode, updatedNode) {
t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode))
}
}
func TestUpdateExistingNodeStatus(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(2048, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
},
},
}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
machineInfo := &cadvisorApi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorApi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: util.Time{}, // placeholder
LastTransitionTime: util.Time{}, // placeholder
},
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OsImage: "Debian GNU/Linux 7 (wheezy)",
ContainerRuntimeVersion: "docker://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
},
}
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Errorf("unexpected actions: %v", actions)
}
updateAction, ok := actions[1].(testclient.UpdateAction)
if !ok {
t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1])
}
updatedNode, ok := updateAction.GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
// Expect LastProbeTime to be updated to Now, while LastTransitionTime to be the same.
if reflect.DeepEqual(updatedNode.Status.Conditions[0].LastHeartbeatTime.Rfc3339Copy().UTC(), util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time) {
t.Errorf("expected \n%v\n, got \n%v", util.Now(), util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC))
}
if !reflect.DeepEqual(updatedNode.Status.Conditions[0].LastTransitionTime.Rfc3339Copy().UTC(), util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time) {
t.Errorf("expected \n%#v\n, got \n%#v", updatedNode.Status.Conditions[0].LastTransitionTime.Rfc3339Copy(),
util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC))
}
updatedNode.Status.Conditions[0].LastHeartbeatTime = util.Time{}
updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{}
if !reflect.DeepEqual(expectedNode, updatedNode) {
t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode)
}
}
func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
fakeRuntime := testKubelet.fakeRuntime
// This causes returning an error from GetContainerRuntimeVersion() which
// simulates that container runtime is down.
fakeRuntime.VersionInfo = ""
kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
machineInfo := &cadvisorApi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorApi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFalse,
Reason: "KubeletNotReady",
Message: fmt.Sprintf("container runtime is down"),
LastHeartbeatTime: util.Time{},
LastTransitionTime: util.Time{},
},
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OsImage: "Debian GNU/Linux 7 (wheezy)",
ContainerRuntimeVersion: "docker://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
},
}
kubelet.runtimeUpThreshold = time.Duration(0)
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1])
}
if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp")
}
updatedNode.Status.Conditions[0].LastHeartbeatTime = util.Time{}
updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{}
if !reflect.DeepEqual(expectedNode, updatedNode) {
t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode))
}
}
func TestUpdateNodeStatusError(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
// No matching node for the kubelet
testKubelet.fakeKubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{}}).ReactionChain
if err := kubelet.updateNodeStatus(); err == nil {
t.Errorf("unexpected non error: %v", err)
}
if len(testKubelet.fakeKubeClient.Actions()) != nodeStatusUpdateRetry {
t.Errorf("unexpected actions: %v", testKubelet.fakeKubeClient.Actions())
}
}
func TestCreateMirrorPod(t *testing.T) {
for _, updateType := range []SyncPodType{SyncPodCreate, SyncPodUpdate} {
testKubelet := newTestKubelet(t)
@@ -2743,6 +3066,55 @@ func TestFilterOutTerminatedPods(t *testing.T) {
}
}
func TestRegisterExistingNodeWithApiserver(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.AddReactor("create", "nodes", func(action testclient.Action) (bool, runtime.Object, error) {
// Return an error on create.
return true, &api.Node{}, &apierrors.StatusError{
ErrStatus: api.Status{Reason: api.StatusReasonAlreadyExists},
}
})
kubeClient.AddReactor("get", "nodes", func(action testclient.Action) (bool, runtime.Object, error) {
// Return an existing (matching) node on get.
return true, &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{ExternalID: testKubeletHostname},
}, nil
})
kubeClient.AddReactor("*", "*", func(action testclient.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
})
machineInfo := &cadvisorApi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorApi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
done := make(chan struct{})
go func() {
kubelet.registerWithApiserver()
done <- struct{}{}
}()
select {
case <-time.After(5 * time.Second):
t.Errorf("timed out waiting for registration")
case <-done:
return
}
}
func TestMakePortMappings(t *testing.T) {
tests := []struct {
container *api.Container