From 9865ac325c7af23b88fdd363dffe9a32f65b242f Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Mon, 20 Jun 2016 17:14:08 -0500 Subject: [PATCH] kubelet/cni: make cni plugin runtime agnostic Use the generic runtime method to get the netns path. Also move reading the container IP address into cni (based off kubenet) instead of having it in the Docker manager code. Both old and new methods use nsenter and /sbin/ip and should be functionally equivalent. --- pkg/kubelet/container/runtime_cache_test.go | 30 +++-- pkg/kubelet/container/testing/fake_runtime.go | 34 ++++- pkg/kubelet/dockertools/docker_manager.go | 35 ------ pkg/kubelet/image_manager_test.go | 76 ++++++------ pkg/kubelet/kubelet_cadvisor_test.go | 20 +-- pkg/kubelet/kubelet_test.go | 58 ++++----- pkg/kubelet/network/cni/cni.go | 77 ++++++++---- pkg/kubelet/network/cni/cni_test.go | 117 +++++++++++------- pkg/kubelet/pleg/generic_test.go | 40 +++--- 9 files changed, 269 insertions(+), 218 deletions(-) diff --git a/pkg/kubelet/container/runtime_cache_test.go b/pkg/kubelet/container/runtime_cache_test.go index d66e07bfc58..e98b8f7ca3e 100644 --- a/pkg/kubelet/container/runtime_cache_test.go +++ b/pkg/kubelet/container/runtime_cache_test.go @@ -25,18 +25,28 @@ import ( ctest "k8s.io/kubernetes/pkg/kubelet/container/testing" ) +func comparePods(t *testing.T, expected []*ctest.FakePod, actual []*Pod) { + if len(expected) != len(actual) { + t.Errorf("expected %d pods, got %d instead", len(expected), len(actual)) + } + for i := range expected { + if !reflect.DeepEqual(expected[i].Pod, actual[i]) { + t.Errorf("expected %#v, got %#v", expected[i].Pod, actual[i]) + } + } +} + func TestGetPods(t *testing.T) { runtime := &ctest.FakeRuntime{} - expected := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}} + expected := []*ctest.FakePod{{Pod: &Pod{ID: "1111"}}, {Pod: &Pod{ID: "2222"}}, {Pod: &Pod{ID: "3333"}}} runtime.PodList = expected cache := NewTestRuntimeCache(runtime) actual, err := cache.GetPods() if err != nil { t.Errorf("unexpected error %v", err) } - if !reflect.DeepEqual(expected, actual) { - t.Errorf("expected %#v, got %#v", expected, actual) - } + + comparePods(t, expected, actual) } func TestForceUpdateIfOlder(t *testing.T) { @@ -44,25 +54,21 @@ func TestForceUpdateIfOlder(t *testing.T) { cache := NewTestRuntimeCache(runtime) // Cache old pods. - oldpods := []*Pod{{ID: "1111"}} + oldpods := []*ctest.FakePod{{Pod: &Pod{ID: "1111"}}} runtime.PodList = oldpods cache.UpdateCacheWithLock() // Update the runtime to new pods. - newpods := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}} + newpods := []*ctest.FakePod{{Pod: &Pod{ID: "1111"}}, {Pod: &Pod{ID: "2222"}}, {Pod: &Pod{ID: "3333"}}} runtime.PodList = newpods // An older timestamp should not force an update. cache.ForceUpdateIfOlder(time.Now().Add(-20 * time.Minute)) actual := cache.GetCachedPods() - if !reflect.DeepEqual(oldpods, actual) { - t.Errorf("expected %#v, got %#v", oldpods, actual) - } + comparePods(t, oldpods, actual) // A newer timestamp should force an update. cache.ForceUpdateIfOlder(time.Now().Add(20 * time.Second)) actual = cache.GetCachedPods() - if !reflect.DeepEqual(newpods, actual) { - t.Errorf("expected %#v, got %#v", newpods, actual) - } + comparePods(t, newpods, actual) } diff --git a/pkg/kubelet/container/testing/fake_runtime.go b/pkg/kubelet/container/testing/fake_runtime.go index fece66685da..9431d61b900 100644 --- a/pkg/kubelet/container/testing/fake_runtime.go +++ b/pkg/kubelet/container/testing/fake_runtime.go @@ -30,12 +30,17 @@ import ( "k8s.io/kubernetes/pkg/volume" ) +type FakePod struct { + Pod *Pod + NetnsPath string +} + // FakeRuntime is a fake container runtime for testing. type FakeRuntime struct { sync.Mutex CalledFunctions []string - PodList []*Pod - AllPodList []*Pod + PodList []*FakePod + AllPodList []*FakePod ImageList []Image APIPodStatus api.PodStatus PodStatus PodStatus @@ -98,8 +103,8 @@ func (f *FakeRuntime) ClearCalls() { defer f.Unlock() f.CalledFunctions = []string{} - f.PodList = []*Pod{} - f.AllPodList = []*Pod{} + f.PodList = []*FakePod{} + f.AllPodList = []*FakePod{} f.APIPodStatus = api.PodStatus{} f.StartedPods = []string{} f.KilledPods = []string{} @@ -182,11 +187,19 @@ func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) { f.Lock() defer f.Unlock() + var pods []*Pod + f.CalledFunctions = append(f.CalledFunctions, "GetPods") if all { - return f.AllPodList, f.Err + for _, fakePod := range f.AllPodList { + pods = append(pods, fakePod.Pod) + } + } else { + for _, fakePod := range f.PodList { + pods = append(pods, fakePod.Pod) + } } - return f.PodList, f.Err + return pods, f.Err } func (f *FakeRuntime) SyncPod(pod *api.Pod, _ api.PodStatus, _ *PodStatus, _ []api.Secret, backOff *flowcontrol.Backoff) (result PodSyncResult) { @@ -343,6 +356,15 @@ func (f *FakeRuntime) GetNetNS(containerID ContainerID) (string, error) { defer f.Unlock() f.CalledFunctions = append(f.CalledFunctions, "GetNetNS") + + for _, fp := range f.AllPodList { + for _, c := range fp.Pod.Containers { + if c.ID == containerID { + return fp.NetnsPath, nil + } + } + } + return "", f.Err } diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index 122acf6ccc1..7712bf37df3 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -1157,41 +1157,6 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream return nil } -// Get the IP address of a container's interface using nsenter -func (dm *DockerManager) GetContainerIP(containerID, interfaceName string) (string, error) { - _, lookupErr := exec.LookPath("nsenter") - if lookupErr != nil { - return "", fmt.Errorf("Unable to obtain IP address of container: missing nsenter.") - } - container, err := dm.client.InspectContainer(containerID) - if err != nil { - return "", err - } - - if !container.State.Running { - return "", fmt.Errorf("container not running (%s)", container.ID) - } - - containerPid := container.State.Pid - extractIPCmd := fmt.Sprintf("ip -4 addr show %s | grep inet | awk -F\" \" '{print $2}'", interfaceName) - args := []string{"-t", fmt.Sprintf("%d", containerPid), "-n", "--", "bash", "-c", extractIPCmd} - command := exec.Command("nsenter", args...) - out, err := command.CombinedOutput() - - // Fall back to IPv6 address if no IPv4 address is present - if err == nil && string(out) == "" { - extractIPCmd = fmt.Sprintf("ip -6 addr show %s scope global | grep inet6 | awk -F\" \" '{print $2}'", interfaceName) - args = []string{"-t", fmt.Sprintf("%d", containerPid), "-n", "--", "bash", "-c", extractIPCmd} - command = exec.Command("nsenter", args...) - out, err = command.CombinedOutput() - } - - if err != nil { - return "", err - } - return string(out), nil -} - // TODO(random-liu): Change running pod to pod status in the future. We can't do it now, because kubelet also uses this function without pod status. // We can only deprecate this after refactoring kubelet. // TODO(random-liu): After using pod status for KillPod(), we can also remove the kubernetesPodLabel, because all the needed information should have diff --git a/pkg/kubelet/image_manager_test.go b/pkg/kubelet/image_manager_test.go index cee3fb1bf0e..30d7614e8f4 100644 --- a/pkg/kubelet/image_manager_test.go +++ b/pkg/kubelet/image_manager_test.go @@ -86,12 +86,12 @@ func TestDetectImagesInitialDetect(t *testing.T) { makeImage(0, 1024), makeImage(1, 2048), } - fakeRuntime.AllPodList = []*container.Pod{ - { + fakeRuntime.AllPodList = []*containertest.FakePod{ + {Pod: &container.Pod{ Containers: []*container.Container{ makeContainer(1), }, - }, + }}, } startTime := time.Now().Add(-time.Millisecond) @@ -116,12 +116,12 @@ func TestDetectImagesWithNewImage(t *testing.T) { makeImage(0, 1024), makeImage(1, 2048), } - fakeRuntime.AllPodList = []*container.Pod{ - { + fakeRuntime.AllPodList = []*containertest.FakePod{ + {Pod: &container.Pod{ Containers: []*container.Container{ makeContainer(1), }, - }, + }}, } err := manager.detectImages(zero) @@ -161,12 +161,12 @@ func TestDetectImagesContainerStopped(t *testing.T) { makeImage(0, 1024), makeImage(1, 2048), } - fakeRuntime.AllPodList = []*container.Pod{ - { + fakeRuntime.AllPodList = []*containertest.FakePod{ + {Pod: &container.Pod{ Containers: []*container.Container{ makeContainer(1), }, - }, + }}, } err := manager.detectImages(zero) @@ -177,7 +177,7 @@ func TestDetectImagesContainerStopped(t *testing.T) { require.True(t, ok) // Simulate container being stopped. - fakeRuntime.AllPodList = []*container.Pod{} + fakeRuntime.AllPodList = []*containertest.FakePod{} err = manager.detectImages(time.Now()) require.NoError(t, err) assert.Equal(manager.imageRecordsLen(), 2) @@ -197,12 +197,12 @@ func TestDetectImagesWithRemovedImages(t *testing.T) { makeImage(0, 1024), makeImage(1, 2048), } - fakeRuntime.AllPodList = []*container.Pod{ - { + fakeRuntime.AllPodList = []*containertest.FakePod{ + {Pod: &container.Pod{ Containers: []*container.Container{ makeContainer(1), }, - }, + }}, } err := manager.detectImages(zero) @@ -223,12 +223,12 @@ func TestFreeSpaceImagesInUseContainersAreIgnored(t *testing.T) { makeImage(0, 1024), makeImage(1, 2048), } - fakeRuntime.AllPodList = []*container.Pod{ - { + fakeRuntime.AllPodList = []*containertest.FakePod{ + {Pod: &container.Pod{ Containers: []*container.Container{ makeContainer(1), }, - }, + }}, } spaceFreed, err := manager.freeSpace(2048, time.Now()) @@ -244,29 +244,29 @@ func TestFreeSpaceRemoveByLeastRecentlyUsed(t *testing.T) { makeImage(0, 1024), makeImage(1, 2048), } - fakeRuntime.AllPodList = []*container.Pod{ - { + fakeRuntime.AllPodList = []*containertest.FakePod{ + {Pod: &container.Pod{ Containers: []*container.Container{ makeContainer(0), makeContainer(1), }, - }, + }}, } // Make 1 be more recently used than 0. require.NoError(t, manager.detectImages(zero)) - fakeRuntime.AllPodList = []*container.Pod{ - { + fakeRuntime.AllPodList = []*containertest.FakePod{ + {Pod: &container.Pod{ Containers: []*container.Container{ makeContainer(1), }, - }, + }}, } require.NoError(t, manager.detectImages(time.Now())) - fakeRuntime.AllPodList = []*container.Pod{ - { + fakeRuntime.AllPodList = []*containertest.FakePod{ + {Pod: &container.Pod{ Containers: []*container.Container{}, - }, + }}, } require.NoError(t, manager.detectImages(time.Now())) require.Equal(t, manager.imageRecordsLen(), 2) @@ -283,12 +283,12 @@ func TestFreeSpaceTiesBrokenByDetectedTime(t *testing.T) { fakeRuntime.ImageList = []container.Image{ makeImage(0, 1024), } - fakeRuntime.AllPodList = []*container.Pod{ - { + fakeRuntime.AllPodList = []*containertest.FakePod{ + {Pod: &container.Pod{ Containers: []*container.Container{ makeContainer(0), }, - }, + }}, } // Make 1 more recently detected but used at the same time as 0. @@ -298,7 +298,7 @@ func TestFreeSpaceTiesBrokenByDetectedTime(t *testing.T) { makeImage(1, 2048), } require.NoError(t, manager.detectImages(time.Now())) - fakeRuntime.AllPodList = []*container.Pod{} + fakeRuntime.AllPodList = []*containertest.FakePod{} require.NoError(t, manager.detectImages(time.Now())) require.Equal(t, manager.imageRecordsLen(), 2) @@ -319,15 +319,15 @@ func TestFreeSpaceImagesAlsoDoesLookupByRepoTags(t *testing.T) { Size: 2048, }, } - fakeRuntime.AllPodList = []*container.Pod{ - { + fakeRuntime.AllPodList = []*containertest.FakePod{ + {Pod: &container.Pod{ Containers: []*container.Container{ { ID: container.ContainerID{Type: "test", ID: "c5678"}, Image: "salad", }, }, - }, + }}, } spaceFreed, err := manager.freeSpace(1024, time.Now()) @@ -347,15 +347,15 @@ func TestFreeSpaceImagesAlsoDoesLookupByRepoDigests(t *testing.T) { Size: 2048, }, } - fakeRuntime.AllPodList = []*container.Pod{ - { + fakeRuntime.AllPodList = []*containertest.FakePod{ + {Pod: &container.Pod{ Containers: []*container.Container{ { ID: container.ContainerID{Type: "test", ID: "c5678"}, Image: "salad", }, }, - }, + }}, } spaceFreed, err := manager.freeSpace(1024, time.Now()) @@ -451,12 +451,12 @@ func TestGarbageCollectImageNotOldEnough(t *testing.T) { makeImage(1, 2048), } // 1 image is in use, and another one is not old enough - fakeRuntime.AllPodList = []*container.Pod{ - { + fakeRuntime.AllPodList = []*containertest.FakePod{ + {Pod: &container.Pod{ Containers: []*container.Container{ makeContainer(1), }, - }, + }}, } fakeClock := util.NewFakeClock(time.Now()) diff --git a/pkg/kubelet/kubelet_cadvisor_test.go b/pkg/kubelet/kubelet_cadvisor_test.go index ab26d489802..1c0f7c8e966 100644 --- a/pkg/kubelet/kubelet_cadvisor_test.go +++ b/pkg/kubelet/kubelet_cadvisor_test.go @@ -22,6 +22,7 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + kubecontainertest "k8s.io/kubernetes/pkg/kubelet/container/testing" ) func TestGetContainerInfo(t *testing.T) { @@ -39,8 +40,8 @@ func TestGetContainerInfo(t *testing.T) { cadvisorReq := &cadvisorapi.ContainerInfoRequest{} mockCadvisor := testKubelet.fakeCadvisor mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil) - fakeRuntime.PodList = []*kubecontainer.Pod{ - { + fakeRuntime.PodList = []*kubecontainertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: "12345678", Name: "qux", Namespace: "ns", @@ -50,7 +51,7 @@ func TestGetContainerInfo(t *testing.T) { ID: kubecontainer.ContainerID{Type: "test", ID: containerID}, }, }, - }, + }}, } stats, err := kubelet.GetContainerInfo("qux_ns", "", "foo", cadvisorReq) if err != nil { @@ -122,8 +123,8 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) { containerInfo := cadvisorapi.ContainerInfo{} cadvisorReq := &cadvisorapi.ContainerInfoRequest{} mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, cadvisorApiFailure) - fakeRuntime.PodList = []*kubecontainer.Pod{ - { + fakeRuntime.PodList = []*kubecontainertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: "uuid", Name: "qux", Namespace: "ns", @@ -132,7 +133,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) { ID: kubecontainer.ContainerID{Type: "test", ID: containerID}, }, }, - }, + }}, } stats, err := kubelet.GetContainerInfo("qux_ns", "uuid", "foo", cadvisorReq) if stats != nil { @@ -153,7 +154,7 @@ func TestGetContainerInfoOnNonExistContainer(t *testing.T) { kubelet := testKubelet.kubelet mockCadvisor := testKubelet.fakeCadvisor fakeRuntime := testKubelet.fakeRuntime - fakeRuntime.PodList = []*kubecontainer.Pod{} + fakeRuntime.PodList = []*kubecontainertest.FakePod{} stats, _ := kubelet.GetContainerInfo("qux", "", "foo", nil) if stats != nil { @@ -206,8 +207,8 @@ func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) { fakeRuntime := testKubelet.fakeRuntime kubelet := testKubelet.kubelet mockCadvisor := testKubelet.fakeCadvisor - fakeRuntime.PodList = []*kubecontainer.Pod{ - { + fakeRuntime.PodList = []*kubecontainertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: "12345678", Name: "qux", Namespace: "ns", @@ -216,6 +217,7 @@ func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) { ID: kubecontainer.ContainerID{Type: "test", ID: "fakeID"}, }, }}, + }, } stats, err := kubelet.GetContainerInfo("qux_ns", "", "foo", nil) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 60b4860e483..bbbcf20ec71 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -408,15 +408,15 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { kubelet := testKubelet.kubelet kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.String) bool { return ready }) - fakeRuntime.PodList = []*kubecontainer.Pod{ - { + fakeRuntime.PodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: "12345678", Name: "foo", Namespace: "new", Containers: []*kubecontainer.Container{ {Name: "bar"}, }, - }, + }}, } kubelet.HandlePodCleanups() // Sources are not ready yet. Don't remove any pods. @@ -1087,7 +1087,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) kubelet := testKubelet.kubelet fakeRuntime := testKubelet.fakeRuntime - fakeRuntime.PodList = []*kubecontainer.Pod{} + fakeRuntime.PodList = []*containertest.FakePod{} podName := "podFoo" podNamespace := "nsFoo" @@ -1113,8 +1113,8 @@ func TestRunInContainer(t *testing.T) { kubelet.runner = &fakeCommandRunner containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"} - fakeRuntime.PodList = []*kubecontainer.Pod{ - { + fakeRuntime.PodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: "12345678", Name: "podFoo", Namespace: "nsFoo", @@ -1123,7 +1123,7 @@ func TestRunInContainer(t *testing.T) { ID: containerID, }, }, - }, + }}, } cmd := []string{"ls"} _, err := kubelet.RunInContainer("podFoo_nsFoo", "", "containerFoo", cmd) @@ -2069,7 +2069,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) { fakeRuntime := testKubelet.fakeRuntime fakeCommandRunner := fakeContainerCommandRunner{} kubelet.runner = &fakeCommandRunner - fakeRuntime.PodList = []*kubecontainer.Pod{} + fakeRuntime.PodList = []*containertest.FakePod{} podName := "podFoo" podNamespace := "nsFoo" @@ -2102,8 +2102,8 @@ func TestExecInContainerNoSuchContainer(t *testing.T) { podName := "podFoo" podNamespace := "nsFoo" containerID := "containerFoo" - fakeRuntime.PodList = []*kubecontainer.Pod{ - { + fakeRuntime.PodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: "12345678", Name: podName, Namespace: podNamespace, @@ -2111,7 +2111,7 @@ func TestExecInContainerNoSuchContainer(t *testing.T) { {Name: "bar", ID: kubecontainer.ContainerID{Type: "test", ID: "barID"}}, }, - }, + }}, } err := kubelet.ExecInContainer( @@ -2165,8 +2165,8 @@ func TestExecInContainer(t *testing.T) { stdout := &fakeReadWriteCloser{} stderr := &fakeReadWriteCloser{} tty := true - fakeRuntime.PodList = []*kubecontainer.Pod{ - { + fakeRuntime.PodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: "12345678", Name: podName, Namespace: podNamespace, @@ -2175,7 +2175,7 @@ func TestExecInContainer(t *testing.T) { ID: kubecontainer.ContainerID{Type: "test", ID: containerID}, }, }, - }, + }}, } err := kubelet.ExecInContainer( @@ -2215,7 +2215,7 @@ func TestPortForwardNoSuchPod(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) kubelet := testKubelet.kubelet fakeRuntime := testKubelet.fakeRuntime - fakeRuntime.PodList = []*kubecontainer.Pod{} + fakeRuntime.PodList = []*containertest.FakePod{} fakeCommandRunner := fakeContainerCommandRunner{} kubelet.runner = &fakeCommandRunner @@ -2245,8 +2245,8 @@ func TestPortForward(t *testing.T) { podName := "podFoo" podNamespace := "nsFoo" podID := types.UID("12345678") - fakeRuntime.PodList = []*kubecontainer.Pod{ - { + fakeRuntime.PodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: podID, Name: podName, Namespace: podNamespace, @@ -2256,7 +2256,7 @@ func TestPortForward(t *testing.T) { ID: kubecontainer.ContainerID{Type: "test", ID: "containerFoo"}, }, }, - }, + }}, } fakeCommandRunner := fakeContainerCommandRunner{} kubelet.runner = &fakeCommandRunner @@ -3594,8 +3594,8 @@ func TestGetContainerInfoForMirrorPods(t *testing.T) { mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil) kubelet := testKubelet.kubelet - fakeRuntime.PodList = []*kubecontainer.Pod{ - { + fakeRuntime.PodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: "1234", Name: "qux", Namespace: "ns", @@ -3605,7 +3605,7 @@ func TestGetContainerInfoForMirrorPods(t *testing.T) { ID: kubecontainer.ContainerID{Type: "test", ID: containerID}, }, }, - }, + }}, } kubelet.podManager.SetPods(pods) @@ -3930,15 +3930,15 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) { }, } - fakeRuntime.PodList = []*kubecontainer.Pod{ - { + fakeRuntime.PodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: "12345678", Name: "bar", Namespace: "new", Containers: []*kubecontainer.Container{ {Name: "foo"}, }, - }, + }}, } // Let the pod worker sets the status to fail after this sync. @@ -3986,15 +3986,15 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) { }, } - fakeRuntime.PodList = []*kubecontainer.Pod{ - { + fakeRuntime.PodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: "12345678", Name: "bar", Namespace: "new", Containers: []*kubecontainer.Container{ {Name: "foo"}, }, - }, + }}, } kubelet.podManager.SetPods(pods) @@ -4118,13 +4118,13 @@ func TestDoesNotDeletePodDirsIfContainerIsRunning(t *testing.T) { // Pretend the pod is deleted from apiserver, but is still active on the node. // The pod directory should not be removed. pods = []*api.Pod{} - testKubelet.fakeRuntime.PodList = []*kubecontainer.Pod{runningPod} + testKubelet.fakeRuntime.PodList = []*containertest.FakePod{{runningPod, ""}} syncAndVerifyPodDir(t, testKubelet, pods, []*api.Pod{apiPod}, true) // The pod is deleted and also not active on the node. The pod directory // should be removed. pods = []*api.Pod{} - testKubelet.fakeRuntime.PodList = []*kubecontainer.Pod{} + testKubelet.fakeRuntime.PodList = []*containertest.FakePod{} syncAndVerifyPodDir(t, testKubelet, pods, []*api.Pod{apiPod}, false) } diff --git a/pkg/kubelet/network/cni/cni.go b/pkg/kubelet/network/cni/cni.go index 591ad38c202..06d7fcabf04 100644 --- a/pkg/kubelet/network/cni/cni.go +++ b/pkg/kubelet/network/cni/cni.go @@ -27,8 +27,8 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/apis/componentconfig" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/network" + utilexec "k8s.io/kubernetes/pkg/util/exec" ) const ( @@ -43,6 +43,8 @@ type cniNetworkPlugin struct { defaultNetwork *cniNetwork host network.Host + execer utilexec.Interface + nsenterPath string } type cniNetwork struct { @@ -57,7 +59,10 @@ func probeNetworkPluginsWithVendorCNIDirPrefix(pluginDir, vendorCNIDirPrefix str if err != nil { return configList } - return append(configList, &cniNetworkPlugin{defaultNetwork: network}) + return append(configList, &cniNetworkPlugin{ + defaultNetwork: network, + execer: utilexec.New(), + }) } func ProbeNetworkPlugins(pluginDir string) []network.NetworkPlugin { @@ -95,6 +100,12 @@ func getDefaultCNINetwork(pluginDir, vendorCNIDirPrefix string) (*cniNetwork, er } func (plugin *cniNetworkPlugin) Init(host network.Host, hairpinMode componentconfig.HairpinMode, nonMasqueradeCIDR string) error { + var err error + plugin.nsenterPath, err = plugin.execer.LookPath("nsenter") + if err != nil { + return err + } + plugin.host = host return nil } @@ -104,16 +115,12 @@ func (plugin *cniNetworkPlugin) Name() string { } func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { - runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager) - if !ok { - return fmt.Errorf("CNI execution called on non-docker runtime") - } - netns, err := runtime.GetNetNS(id) + netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) if err != nil { - return err + return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err) } - _, err = plugin.defaultNetwork.addToNetwork(name, namespace, id, netns) + _, err = plugin.defaultNetwork.addToNetwork(name, namespace, id, netnsPath) if err != nil { glog.Errorf("Error while adding to cni network: %s", err) return err @@ -123,33 +130,55 @@ func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubec } func (plugin *cniNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.ContainerID) error { - runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager) - if !ok { - return fmt.Errorf("CNI execution called on non-docker runtime") - } - netns, err := runtime.GetNetNS(id) + netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) if err != nil { - return err + return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err) } - return plugin.defaultNetwork.deleteFromNetwork(name, namespace, id, netns) + return plugin.defaultNetwork.deleteFromNetwork(name, namespace, id, netnsPath) +} + +func (plugin *cniNetworkPlugin) getContainerIPAddress(netnsPath, addrType string) (net.IP, error) { + // Try to retrieve ip inside container network namespace + output, err := plugin.execer.Command(plugin.nsenterPath, fmt.Sprintf("--net=%s", netnsPath), "-F", "--", + "ip", "-o", addrType, "addr", "show", "dev", network.DefaultInterfaceName, "scope", "global").CombinedOutput() + if err != nil { + return nil, fmt.Errorf("Unexpected command output %s with error: %v", output, err) + } + + lines := strings.Split(string(output), "\n") + if len(lines) < 1 { + return nil, fmt.Errorf("Unexpected command output %s", output) + } + fields := strings.Fields(lines[0]) + if len(fields) < 4 { + return nil, fmt.Errorf("Unexpected address output %s ", lines[0]) + } + ip, _, err := net.ParseCIDR(fields[3]) + if err != nil { + return nil, fmt.Errorf("CNI failed to parse ip from output %s due to %v", output, err) + } + + return ip, nil } // TODO: Use the addToNetwork function to obtain the IP of the Pod. That will assume idempotent ADD call to the plugin. // Also fix the runtime's call to Status function to be done only in the case that the IP is lost, no need to do periodic calls func (plugin *cniNetworkPlugin) GetPodNetworkStatus(namespace string, name string, id kubecontainer.ContainerID) (*network.PodNetworkStatus, error) { - runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager) - if !ok { - return nil, fmt.Errorf("CNI execution called on non-docker runtime") + netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) + if err != nil { + return nil, fmt.Errorf("CNI failed to retrieve network namespace path: %v", err) + } + + ip, err := plugin.getContainerIPAddress(netnsPath, "-4") + if err != nil { + // Fall back to IPv6 address if no IPv4 address is present + ip, err = plugin.getContainerIPAddress(netnsPath, "-6") } - ipStr, err := runtime.GetContainerIP(id.ID, network.DefaultInterfaceName) - if err != nil { - return nil, err - } - ip, _, err := net.ParseCIDR(strings.Trim(ipStr, "\n")) if err != nil { return nil, err } + return &network.PodNetworkStatus{IP: ip}, nil } diff --git a/pkg/kubelet/network/cni/cni_test.go b/pkg/kubelet/network/cni/cni_test.go index fe4aedee846..3ad6aaeb944 100644 --- a/pkg/kubelet/network/cni/cni_test.go +++ b/pkg/kubelet/network/cni/cni_test.go @@ -30,18 +30,12 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - cadvisorapi "github.com/google/cadvisor/info/v1" - - "k8s.io/kubernetes/cmd/kubelet/app/options" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/componentconfig" - "k8s.io/kubernetes/pkg/client/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" - "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/network" - nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" - proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" + utilexec "k8s.io/kubernetes/pkg/util/exec" utiltesting "k8s.io/kubernetes/pkg/util/testing" ) @@ -115,10 +109,16 @@ func tearDownPlugin(tmpDir string) { type fakeNetworkHost struct { kubeClient clientset.Interface + runtime kubecontainer.Runtime } -func NewFakeHost(kubeClient clientset.Interface) *fakeNetworkHost { - host := &fakeNetworkHost{kubeClient: kubeClient} +func NewFakeHost(kubeClient clientset.Interface, pods []*containertest.FakePod) *fakeNetworkHost { + host := &fakeNetworkHost{ + kubeClient: kubeClient, + runtime: &containertest.FakeRuntime{ + AllPodList: pods, + }, + } return host } @@ -127,40 +127,11 @@ func (fnh *fakeNetworkHost) GetPodByName(name, namespace string) (*api.Pod, bool } func (fnh *fakeNetworkHost) GetKubeClient() clientset.Interface { - return nil + return fnh.kubeClient } -func (nh *fakeNetworkHost) GetRuntime() kubecontainer.Runtime { - dm, fakeDockerClient := newTestDockerManager() - fakeDockerClient.SetFakeRunningContainers([]*dockertools.FakeContainer{ - { - ID: "test_infra_container", - Pid: 12345, - }, - }) - return dm -} - -func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDockerClient) { - fakeDocker := dockertools.NewFakeDockerClient() - fakeRecorder := &record.FakeRecorder{} - containerRefManager := kubecontainer.NewRefManager() - networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, "10.0.0.0/8") - dockerManager := dockertools.NewFakeDockerManager( - fakeDocker, - fakeRecorder, - proberesults.NewManager(), - containerRefManager, - &cadvisorapi.MachineInfo{}, - options.GetDefaultPodInfraContainerImage(), - 0, 0, "", - &containertest.FakeOS{}, - networkPlugin, - nil, - nil, - nil) - - return dockerManager, fakeDocker +func (fnh *fakeNetworkHost) GetRuntime() kubecontainer.Runtime { + return fnh.runtime } func TestCNIPlugin(t *testing.T) { @@ -168,19 +139,64 @@ func TestCNIPlugin(t *testing.T) { pluginName := fmt.Sprintf("test%d", rand.Intn(1000)) vendorName := fmt.Sprintf("test_vendor%d", rand.Intn(1000)) + podIP := "10.0.0.2" + podIPOutput := fmt.Sprintf("4: eth0 inet %s/24 scope global dynamic eth0\\ valid_lft forever preferred_lft forever", podIP) + fakeCmds := []utilexec.FakeCommandAction{ + func(cmd string, args ...string) utilexec.Cmd { + return utilexec.InitFakeCmd(&utilexec.FakeCmd{ + CombinedOutputScript: []utilexec.FakeCombinedOutputAction{ + func() ([]byte, error) { + return []byte(podIPOutput), nil + }, + }, + }, cmd, args...) + }, + } + + fexec := &utilexec.FakeExec{ + CommandScript: fakeCmds, + LookPathFunc: func(file string) (string, error) { + return fmt.Sprintf("/fake-bin/%s", file), nil + }, + } + tmpDir := utiltesting.MkTmpdirOrDie("cni-test") testNetworkConfigPath := path.Join(tmpDir, "plugins", "net", "cni") testVendorCNIDirPrefix := tmpDir defer tearDownPlugin(tmpDir) installPluginUnderTest(t, testVendorCNIDirPrefix, testNetworkConfigPath, vendorName, pluginName) - np := probeNetworkPluginsWithVendorCNIDirPrefix(path.Join(testNetworkConfigPath, pluginName), testVendorCNIDirPrefix) - plug, err := network.InitNetworkPlugin(np, "cni", NewFakeHost(nil), componentconfig.HairpinNone, "10.0.0.0/8") + containerID := kubecontainer.ContainerID{Type: "test", ID: "test_infra_container"} + pods := []*containertest.FakePod{{ + Pod: &kubecontainer.Pod{ + Containers: []*kubecontainer.Container{ + {ID: containerID}, + }, + }, + NetnsPath: "/proc/12345/ns/net", + }} + + plugins := probeNetworkPluginsWithVendorCNIDirPrefix(path.Join(testNetworkConfigPath, pluginName), testVendorCNIDirPrefix) + if len(plugins) != 1 { + t.Fatalf("Expected only one network plugin, got %d", len(plugins)) + } + if plugins[0].Name() != "cni" { + t.Fatalf("Expected CNI network plugin, got %q", plugins[0].Name()) + } + + cniPlugin, ok := plugins[0].(*cniNetworkPlugin) + if !ok { + t.Fatalf("Not a CNI network plugin!") + } + cniPlugin.execer = fexec + + plug, err := network.InitNetworkPlugin(plugins, "cni", NewFakeHost(nil, pods), componentconfig.HairpinNone, "10.0.0.0/8") if err != nil { t.Fatalf("Failed to select the desired plugin: %v", err) } - err = plug.SetUpPod("podNamespace", "podName", kubecontainer.ContainerID{Type: "docker", ID: "test_infra_container"}) + // Set up the pod + err = plug.SetUpPod("podNamespace", "podName", containerID) if err != nil { t.Errorf("Expected nil: %v", err) } @@ -195,7 +211,18 @@ func TestCNIPlugin(t *testing.T) { if string(output) != expectedOutput { t.Errorf("Mismatch in expected output for setup hook. Expected '%s', got '%s'", expectedOutput, string(output)) } - err = plug.TearDownPod("podNamespace", "podName", kubecontainer.ContainerID{Type: "docker", ID: "test_infra_container"}) + + // Get its IP address + status, err := plug.GetPodNetworkStatus("podNamespace", "podName", containerID) + if err != nil { + t.Errorf("Failed to read pod network status: %v", err) + } + if status.IP.String() != podIP { + t.Errorf("Expected pod IP %q but got %q", podIP, status.IP.String()) + } + + // Tear it down + err = plug.TearDownPod("podNamespace", "podName", containerID) if err != nil { t.Errorf("Expected nil: %v", err) } diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go index 460547d85a1..0f92e57448a 100644 --- a/pkg/kubelet/pleg/generic_test.go +++ b/pkg/kubelet/pleg/generic_test.go @@ -97,21 +97,21 @@ func TestRelisting(t *testing.T) { pleg, runtime := testPleg.pleg, testPleg.runtime ch := pleg.Watch() // The first relist should send a PodSync event to each pod. - runtime.AllPodList = []*kubecontainer.Pod{ - { + runtime.AllPodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: "1234", Containers: []*kubecontainer.Container{ createTestContainer("c1", kubecontainer.ContainerStateExited), createTestContainer("c2", kubecontainer.ContainerStateRunning), createTestContainer("c3", kubecontainer.ContainerStateUnknown), }, - }, - { + }}, + {Pod: &kubecontainer.Pod{ ID: "4567", Containers: []*kubecontainer.Container{ createTestContainer("c1", kubecontainer.ContainerStateExited), }, - }, + }}, } pleg.relist() // Report every running/exited container if we see them for the first time. @@ -128,20 +128,20 @@ func TestRelisting(t *testing.T) { pleg.relist() verifyEvents(t, expected, actual) - runtime.AllPodList = []*kubecontainer.Pod{ - { + runtime.AllPodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: "1234", Containers: []*kubecontainer.Container{ createTestContainer("c2", kubecontainer.ContainerStateExited), createTestContainer("c3", kubecontainer.ContainerStateRunning), }, - }, - { + }}, + {Pod: &kubecontainer.Pod{ ID: "4567", Containers: []*kubecontainer.Container{ createTestContainer("c4", kubecontainer.ContainerStateRunning), }, - }, + }}, } pleg.relist() // Only report containers that transitioned to running or exited status. @@ -169,15 +169,15 @@ func testReportMissingContainers(t *testing.T, numRelists int) { testPleg := newTestGenericPLEG() pleg, runtime := testPleg.pleg, testPleg.runtime ch := pleg.Watch() - runtime.AllPodList = []*kubecontainer.Pod{ - { + runtime.AllPodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: "1234", Containers: []*kubecontainer.Container{ createTestContainer("c1", kubecontainer.ContainerStateRunning), createTestContainer("c2", kubecontainer.ContainerStateRunning), createTestContainer("c3", kubecontainer.ContainerStateExited), }, - }, + }}, } // Relist and drain the events from the channel. for i := 0; i < numRelists; i++ { @@ -188,13 +188,13 @@ func testReportMissingContainers(t *testing.T, numRelists int) { // Container c2 was stopped and removed between relists. We should report // the event. The exited container c3 was garbage collected (i.e., removed) // between relists. We should ignore that event. - runtime.AllPodList = []*kubecontainer.Pod{ - { + runtime.AllPodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: "1234", Containers: []*kubecontainer.Container{ createTestContainer("c1", kubecontainer.ContainerStateRunning), }, - }, + }}, } pleg.relist() expected := []*PodLifecycleEvent{ @@ -208,13 +208,13 @@ func testReportMissingPods(t *testing.T, numRelists int) { testPleg := newTestGenericPLEG() pleg, runtime := testPleg.pleg, testPleg.runtime ch := pleg.Watch() - runtime.AllPodList = []*kubecontainer.Pod{ - { + runtime.AllPodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ ID: "1234", Containers: []*kubecontainer.Container{ createTestContainer("c2", kubecontainer.ContainerStateRunning), }, - }, + }}, } // Relist and drain the events from the channel. for i := 0; i < numRelists; i++ { @@ -224,7 +224,7 @@ func testReportMissingPods(t *testing.T, numRelists int) { // Container c2 was stopped and removed between relists. We should report // the event. - runtime.AllPodList = []*kubecontainer.Pod{} + runtime.AllPodList = []*containertest.FakePod{} pleg.relist() expected := []*PodLifecycleEvent{ {ID: "1234", Type: ContainerDied, Data: "c2"},