Merge pull request #27878 from dcbw/cni-cleanup

Automatic merge from submit-queue

Make kubelet CNI network plugin runtime-agnostic

cni.go has a couple docker-isms in it still, so let's remove those and make the plugin runtime-agnostic.  Also fixes some docker-isms in kubenet that snuck in with the HostPort changes.
This commit is contained in:
k8s-merge-robot 2016-06-22 13:43:04 -07:00 committed by GitHub
commit a505958f2b
14 changed files with 336 additions and 266 deletions

View File

@ -108,6 +108,11 @@ type Runtime interface {
// TODO: Change ContainerID to a Pod ID since the namespace is shared
// by all containers in the pod.
GetNetNS(containerID ContainerID) (string, error)
// Returns the container ID that represents the Pod, as passed to network
// plugins. For example if the runtime uses an infra container, returns
// the infra container's ContainerID.
// TODO: Change ContainerID to a Pod ID, see GetNetNS()
GetPodContainerID(*Pod) (ContainerID, error)
// TODO(vmarmol): Unify pod and containerID args.
// GetContainerLogs returns logs of a specific container. By
// default, it returns a snapshot of the container log. Set 'follow' to true to

View File

@ -25,18 +25,28 @@ import (
ctest "k8s.io/kubernetes/pkg/kubelet/container/testing"
)
func comparePods(t *testing.T, expected []*ctest.FakePod, actual []*Pod) {
if len(expected) != len(actual) {
t.Errorf("expected %d pods, got %d instead", len(expected), len(actual))
}
for i := range expected {
if !reflect.DeepEqual(expected[i].Pod, actual[i]) {
t.Errorf("expected %#v, got %#v", expected[i].Pod, actual[i])
}
}
}
func TestGetPods(t *testing.T) {
runtime := &ctest.FakeRuntime{}
expected := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}}
expected := []*ctest.FakePod{{Pod: &Pod{ID: "1111"}}, {Pod: &Pod{ID: "2222"}}, {Pod: &Pod{ID: "3333"}}}
runtime.PodList = expected
cache := NewTestRuntimeCache(runtime)
actual, err := cache.GetPods()
if err != nil {
t.Errorf("unexpected error %v", err)
}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual)
}
comparePods(t, expected, actual)
}
func TestForceUpdateIfOlder(t *testing.T) {
@ -44,25 +54,21 @@ func TestForceUpdateIfOlder(t *testing.T) {
cache := NewTestRuntimeCache(runtime)
// Cache old pods.
oldpods := []*Pod{{ID: "1111"}}
oldpods := []*ctest.FakePod{{Pod: &Pod{ID: "1111"}}}
runtime.PodList = oldpods
cache.UpdateCacheWithLock()
// Update the runtime to new pods.
newpods := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}}
newpods := []*ctest.FakePod{{Pod: &Pod{ID: "1111"}}, {Pod: &Pod{ID: "2222"}}, {Pod: &Pod{ID: "3333"}}}
runtime.PodList = newpods
// An older timestamp should not force an update.
cache.ForceUpdateIfOlder(time.Now().Add(-20 * time.Minute))
actual := cache.GetCachedPods()
if !reflect.DeepEqual(oldpods, actual) {
t.Errorf("expected %#v, got %#v", oldpods, actual)
}
comparePods(t, oldpods, actual)
// A newer timestamp should force an update.
cache.ForceUpdateIfOlder(time.Now().Add(20 * time.Second))
actual = cache.GetCachedPods()
if !reflect.DeepEqual(newpods, actual) {
t.Errorf("expected %#v, got %#v", newpods, actual)
}
comparePods(t, newpods, actual)
}

View File

@ -30,12 +30,17 @@ import (
"k8s.io/kubernetes/pkg/volume"
)
type FakePod struct {
Pod *Pod
NetnsPath string
}
// FakeRuntime is a fake container runtime for testing.
type FakeRuntime struct {
sync.Mutex
CalledFunctions []string
PodList []*Pod
AllPodList []*Pod
PodList []*FakePod
AllPodList []*FakePod
ImageList []Image
APIPodStatus api.PodStatus
PodStatus PodStatus
@ -98,8 +103,8 @@ func (f *FakeRuntime) ClearCalls() {
defer f.Unlock()
f.CalledFunctions = []string{}
f.PodList = []*Pod{}
f.AllPodList = []*Pod{}
f.PodList = []*FakePod{}
f.AllPodList = []*FakePod{}
f.APIPodStatus = api.PodStatus{}
f.StartedPods = []string{}
f.KilledPods = []string{}
@ -182,11 +187,19 @@ func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) {
f.Lock()
defer f.Unlock()
var pods []*Pod
f.CalledFunctions = append(f.CalledFunctions, "GetPods")
if all {
return f.AllPodList, f.Err
for _, fakePod := range f.AllPodList {
pods = append(pods, fakePod.Pod)
}
return f.PodList, f.Err
} else {
for _, fakePod := range f.PodList {
pods = append(pods, fakePod.Pod)
}
}
return pods, f.Err
}
func (f *FakeRuntime) SyncPod(pod *api.Pod, _ api.PodStatus, _ *PodStatus, _ []api.Secret, backOff *flowcontrol.Backoff) (result PodSyncResult) {
@ -343,9 +356,26 @@ func (f *FakeRuntime) GetNetNS(containerID ContainerID) (string, error) {
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "GetNetNS")
for _, fp := range f.AllPodList {
for _, c := range fp.Pod.Containers {
if c.ID == containerID {
return fp.NetnsPath, nil
}
}
}
return "", f.Err
}
func (f *FakeRuntime) GetPodContainerID(pod *Pod) (ContainerID, error) {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "GetPodContainerID")
return ContainerID{}, f.Err
}
func (f *FakeRuntime) GarbageCollect(gcPolicy ContainerGCPolicy, ready bool) error {
f.Lock()
defer f.Unlock()

View File

@ -133,6 +133,11 @@ func (r *Mock) GetNetNS(containerID ContainerID) (string, error) {
return "", args.Error(0)
}
func (r *Mock) GetPodContainerID(pod *Pod) (ContainerID, error) {
args := r.Called(pod)
return ContainerID{}, args.Error(0)
}
func (r *Mock) GarbageCollect(gcPolicy ContainerGCPolicy, ready bool) error {
args := r.Called(gcPolicy, ready)
return args.Error(0)

View File

@ -1157,41 +1157,6 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream
return nil
}
// Get the IP address of a container's interface using nsenter
func (dm *DockerManager) GetContainerIP(containerID, interfaceName string) (string, error) {
_, lookupErr := exec.LookPath("nsenter")
if lookupErr != nil {
return "", fmt.Errorf("Unable to obtain IP address of container: missing nsenter.")
}
container, err := dm.client.InspectContainer(containerID)
if err != nil {
return "", err
}
if !container.State.Running {
return "", fmt.Errorf("container not running (%s)", container.ID)
}
containerPid := container.State.Pid
extractIPCmd := fmt.Sprintf("ip -4 addr show %s | grep inet | awk -F\" \" '{print $2}'", interfaceName)
args := []string{"-t", fmt.Sprintf("%d", containerPid), "-n", "--", "bash", "-c", extractIPCmd}
command := exec.Command("nsenter", args...)
out, err := command.CombinedOutput()
// Fall back to IPv6 address if no IPv4 address is present
if err == nil && string(out) == "" {
extractIPCmd = fmt.Sprintf("ip -6 addr show %s scope global | grep inet6 | awk -F\" \" '{print $2}'", interfaceName)
args = []string{"-t", fmt.Sprintf("%d", containerPid), "-n", "--", "bash", "-c", extractIPCmd}
command = exec.Command("nsenter", args...)
out, err = command.CombinedOutput()
}
if err != nil {
return "", err
}
return string(out), nil
}
// TODO(random-liu): Change running pod to pod status in the future. We can't do it now, because kubelet also uses this function without pod status.
// We can only deprecate this after refactoring kubelet.
// TODO(random-liu): After using pod status for KillPod(), we can also remove the kubernetesPodLabel, because all the needed information should have
@ -2353,6 +2318,16 @@ func (dm *DockerManager) GetNetNS(containerID kubecontainer.ContainerID) (string
return netnsPath, nil
}
func (dm *DockerManager) GetPodContainerID(pod *kubecontainer.Pod) (kubecontainer.ContainerID, error) {
for _, c := range pod.Containers {
if c.Name == PodInfraContainerName {
return c.ID, nil
}
}
return kubecontainer.ContainerID{}, fmt.Errorf("Pod %s unknown to docker.", kubecontainer.BuildPodFullName(pod.Name, pod.Namespace))
}
// Garbage collection of dead containers
func (dm *DockerManager) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool) error {
return dm.containerGC.GarbageCollect(gcPolicy, allSourcesReady)

View File

@ -86,12 +86,12 @@ func TestDetectImagesInitialDetect(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(1),
},
},
}},
}
startTime := time.Now().Add(-time.Millisecond)
@ -116,12 +116,12 @@ func TestDetectImagesWithNewImage(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(1),
},
},
}},
}
err := manager.detectImages(zero)
@ -161,12 +161,12 @@ func TestDetectImagesContainerStopped(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(1),
},
},
}},
}
err := manager.detectImages(zero)
@ -177,7 +177,7 @@ func TestDetectImagesContainerStopped(t *testing.T) {
require.True(t, ok)
// Simulate container being stopped.
fakeRuntime.AllPodList = []*container.Pod{}
fakeRuntime.AllPodList = []*containertest.FakePod{}
err = manager.detectImages(time.Now())
require.NoError(t, err)
assert.Equal(manager.imageRecordsLen(), 2)
@ -197,12 +197,12 @@ func TestDetectImagesWithRemovedImages(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(1),
},
},
}},
}
err := manager.detectImages(zero)
@ -223,12 +223,12 @@ func TestFreeSpaceImagesInUseContainersAreIgnored(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(1),
},
},
}},
}
spaceFreed, err := manager.freeSpace(2048, time.Now())
@ -244,29 +244,29 @@ func TestFreeSpaceRemoveByLeastRecentlyUsed(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(0),
makeContainer(1),
},
},
}},
}
// Make 1 be more recently used than 0.
require.NoError(t, manager.detectImages(zero))
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(1),
},
},
}},
}
require.NoError(t, manager.detectImages(time.Now()))
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{},
},
}},
}
require.NoError(t, manager.detectImages(time.Now()))
require.Equal(t, manager.imageRecordsLen(), 2)
@ -283,12 +283,12 @@ func TestFreeSpaceTiesBrokenByDetectedTime(t *testing.T) {
fakeRuntime.ImageList = []container.Image{
makeImage(0, 1024),
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(0),
},
},
}},
}
// Make 1 more recently detected but used at the same time as 0.
@ -298,7 +298,7 @@ func TestFreeSpaceTiesBrokenByDetectedTime(t *testing.T) {
makeImage(1, 2048),
}
require.NoError(t, manager.detectImages(time.Now()))
fakeRuntime.AllPodList = []*container.Pod{}
fakeRuntime.AllPodList = []*containertest.FakePod{}
require.NoError(t, manager.detectImages(time.Now()))
require.Equal(t, manager.imageRecordsLen(), 2)
@ -319,15 +319,15 @@ func TestFreeSpaceImagesAlsoDoesLookupByRepoTags(t *testing.T) {
Size: 2048,
},
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
{
ID: container.ContainerID{Type: "test", ID: "c5678"},
Image: "salad",
},
},
},
}},
}
spaceFreed, err := manager.freeSpace(1024, time.Now())
@ -347,15 +347,15 @@ func TestFreeSpaceImagesAlsoDoesLookupByRepoDigests(t *testing.T) {
Size: 2048,
},
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
{
ID: container.ContainerID{Type: "test", ID: "c5678"},
Image: "salad",
},
},
},
}},
}
spaceFreed, err := manager.freeSpace(1024, time.Now())
@ -451,12 +451,12 @@ func TestGarbageCollectImageNotOldEnough(t *testing.T) {
makeImage(1, 2048),
}
// 1 image is in use, and another one is not old enough
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(1),
},
},
}},
}
fakeClock := util.NewFakeClock(time.Now())

View File

@ -22,6 +22,7 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubecontainertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
)
func TestGetContainerInfo(t *testing.T) {
@ -39,8 +40,8 @@ func TestGetContainerInfo(t *testing.T) {
cadvisorReq := &cadvisorapi.ContainerInfoRequest{}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil)
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*kubecontainertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: "qux",
Namespace: "ns",
@ -50,7 +51,7 @@ func TestGetContainerInfo(t *testing.T) {
ID: kubecontainer.ContainerID{Type: "test", ID: containerID},
},
},
},
}},
}
stats, err := kubelet.GetContainerInfo("qux_ns", "", "foo", cadvisorReq)
if err != nil {
@ -122,8 +123,8 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) {
containerInfo := cadvisorapi.ContainerInfo{}
cadvisorReq := &cadvisorapi.ContainerInfoRequest{}
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, cadvisorApiFailure)
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*kubecontainertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "uuid",
Name: "qux",
Namespace: "ns",
@ -132,7 +133,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) {
ID: kubecontainer.ContainerID{Type: "test", ID: containerID},
},
},
},
}},
}
stats, err := kubelet.GetContainerInfo("qux_ns", "uuid", "foo", cadvisorReq)
if stats != nil {
@ -153,7 +154,7 @@ func TestGetContainerInfoOnNonExistContainer(t *testing.T) {
kubelet := testKubelet.kubelet
mockCadvisor := testKubelet.fakeCadvisor
fakeRuntime := testKubelet.fakeRuntime
fakeRuntime.PodList = []*kubecontainer.Pod{}
fakeRuntime.PodList = []*kubecontainertest.FakePod{}
stats, _ := kubelet.GetContainerInfo("qux", "", "foo", nil)
if stats != nil {
@ -206,8 +207,8 @@ func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) {
fakeRuntime := testKubelet.fakeRuntime
kubelet := testKubelet.kubelet
mockCadvisor := testKubelet.fakeCadvisor
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*kubecontainertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: "qux",
Namespace: "ns",
@ -216,6 +217,7 @@ func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) {
ID: kubecontainer.ContainerID{Type: "test", ID: "fakeID"},
},
}},
},
}
stats, err := kubelet.GetContainerInfo("qux_ns", "", "foo", nil)

View File

@ -408,15 +408,15 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
kubelet := testKubelet.kubelet
kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.String) bool { return ready })
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: "foo",
Namespace: "new",
Containers: []*kubecontainer.Container{
{Name: "bar"},
},
},
}},
}
kubelet.HandlePodCleanups()
// Sources are not ready yet. Don't remove any pods.
@ -1087,7 +1087,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
fakeRuntime.PodList = []*kubecontainer.Pod{}
fakeRuntime.PodList = []*containertest.FakePod{}
podName := "podFoo"
podNamespace := "nsFoo"
@ -1113,8 +1113,8 @@ func TestRunInContainer(t *testing.T) {
kubelet.runner = &fakeCommandRunner
containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"}
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: "podFoo",
Namespace: "nsFoo",
@ -1123,7 +1123,7 @@ func TestRunInContainer(t *testing.T) {
ID: containerID,
},
},
},
}},
}
cmd := []string{"ls"}
_, err := kubelet.RunInContainer("podFoo_nsFoo", "", "containerFoo", cmd)
@ -2069,7 +2069,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) {
fakeRuntime := testKubelet.fakeRuntime
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
fakeRuntime.PodList = []*kubecontainer.Pod{}
fakeRuntime.PodList = []*containertest.FakePod{}
podName := "podFoo"
podNamespace := "nsFoo"
@ -2102,8 +2102,8 @@ func TestExecInContainerNoSuchContainer(t *testing.T) {
podName := "podFoo"
podNamespace := "nsFoo"
containerID := "containerFoo"
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: podName,
Namespace: podNamespace,
@ -2111,7 +2111,7 @@ func TestExecInContainerNoSuchContainer(t *testing.T) {
{Name: "bar",
ID: kubecontainer.ContainerID{Type: "test", ID: "barID"}},
},
},
}},
}
err := kubelet.ExecInContainer(
@ -2165,8 +2165,8 @@ func TestExecInContainer(t *testing.T) {
stdout := &fakeReadWriteCloser{}
stderr := &fakeReadWriteCloser{}
tty := true
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: podName,
Namespace: podNamespace,
@ -2175,7 +2175,7 @@ func TestExecInContainer(t *testing.T) {
ID: kubecontainer.ContainerID{Type: "test", ID: containerID},
},
},
},
}},
}
err := kubelet.ExecInContainer(
@ -2215,7 +2215,7 @@ func TestPortForwardNoSuchPod(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
fakeRuntime.PodList = []*kubecontainer.Pod{}
fakeRuntime.PodList = []*containertest.FakePod{}
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
@ -2245,8 +2245,8 @@ func TestPortForward(t *testing.T) {
podName := "podFoo"
podNamespace := "nsFoo"
podID := types.UID("12345678")
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: podID,
Name: podName,
Namespace: podNamespace,
@ -2256,7 +2256,7 @@ func TestPortForward(t *testing.T) {
ID: kubecontainer.ContainerID{Type: "test", ID: "containerFoo"},
},
},
},
}},
}
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
@ -3594,8 +3594,8 @@ func TestGetContainerInfoForMirrorPods(t *testing.T) {
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil)
kubelet := testKubelet.kubelet
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "1234",
Name: "qux",
Namespace: "ns",
@ -3605,7 +3605,7 @@ func TestGetContainerInfoForMirrorPods(t *testing.T) {
ID: kubecontainer.ContainerID{Type: "test", ID: containerID},
},
},
},
}},
}
kubelet.podManager.SetPods(pods)
@ -3930,15 +3930,15 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
},
}
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: "bar",
Namespace: "new",
Containers: []*kubecontainer.Container{
{Name: "foo"},
},
},
}},
}
// Let the pod worker sets the status to fail after this sync.
@ -3986,15 +3986,15 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
},
}
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: "bar",
Namespace: "new",
Containers: []*kubecontainer.Container{
{Name: "foo"},
},
},
}},
}
kubelet.podManager.SetPods(pods)
@ -4118,13 +4118,13 @@ func TestDoesNotDeletePodDirsIfContainerIsRunning(t *testing.T) {
// Pretend the pod is deleted from apiserver, but is still active on the node.
// The pod directory should not be removed.
pods = []*api.Pod{}
testKubelet.fakeRuntime.PodList = []*kubecontainer.Pod{runningPod}
testKubelet.fakeRuntime.PodList = []*containertest.FakePod{{runningPod, ""}}
syncAndVerifyPodDir(t, testKubelet, pods, []*api.Pod{apiPod}, true)
// The pod is deleted and also not active on the node. The pod directory
// should be removed.
pods = []*api.Pod{}
testKubelet.fakeRuntime.PodList = []*kubecontainer.Pod{}
testKubelet.fakeRuntime.PodList = []*containertest.FakePod{}
syncAndVerifyPodDir(t, testKubelet, pods, []*api.Pod{apiPod}, false)
}

View File

@ -18,17 +18,15 @@ package cni
import (
"fmt"
"net"
"sort"
"strings"
"github.com/appc/cni/libcni"
cnitypes "github.com/appc/cni/pkg/types"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/apis/componentconfig"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
utilexec "k8s.io/kubernetes/pkg/util/exec"
)
const (
@ -43,6 +41,8 @@ type cniNetworkPlugin struct {
defaultNetwork *cniNetwork
host network.Host
execer utilexec.Interface
nsenterPath string
}
type cniNetwork struct {
@ -57,7 +57,10 @@ func probeNetworkPluginsWithVendorCNIDirPrefix(pluginDir, vendorCNIDirPrefix str
if err != nil {
return configList
}
return append(configList, &cniNetworkPlugin{defaultNetwork: network})
return append(configList, &cniNetworkPlugin{
defaultNetwork: network,
execer: utilexec.New(),
})
}
func ProbeNetworkPlugins(pluginDir string) []network.NetworkPlugin {
@ -95,6 +98,12 @@ func getDefaultCNINetwork(pluginDir, vendorCNIDirPrefix string) (*cniNetwork, er
}
func (plugin *cniNetworkPlugin) Init(host network.Host, hairpinMode componentconfig.HairpinMode, nonMasqueradeCIDR string) error {
var err error
plugin.nsenterPath, err = plugin.execer.LookPath("nsenter")
if err != nil {
return err
}
plugin.host = host
return nil
}
@ -104,16 +113,12 @@ func (plugin *cniNetworkPlugin) Name() string {
}
func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager)
if !ok {
return fmt.Errorf("CNI execution called on non-docker runtime")
}
netns, err := runtime.GetNetNS(id)
netnsPath, err := plugin.host.GetRuntime().GetNetNS(id)
if err != nil {
return err
return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
}
_, err = plugin.defaultNetwork.addToNetwork(name, namespace, id, netns)
_, err = plugin.defaultNetwork.addToNetwork(name, namespace, id, netnsPath)
if err != nil {
glog.Errorf("Error while adding to cni network: %s", err)
return err
@ -123,33 +128,27 @@ func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubec
}
func (plugin *cniNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.ContainerID) error {
runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager)
if !ok {
return fmt.Errorf("CNI execution called on non-docker runtime")
}
netns, err := runtime.GetNetNS(id)
netnsPath, err := plugin.host.GetRuntime().GetNetNS(id)
if err != nil {
return err
return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
}
return plugin.defaultNetwork.deleteFromNetwork(name, namespace, id, netns)
return plugin.defaultNetwork.deleteFromNetwork(name, namespace, id, netnsPath)
}
// TODO: Use the addToNetwork function to obtain the IP of the Pod. That will assume idempotent ADD call to the plugin.
// Also fix the runtime's call to Status function to be done only in the case that the IP is lost, no need to do periodic calls
func (plugin *cniNetworkPlugin) GetPodNetworkStatus(namespace string, name string, id kubecontainer.ContainerID) (*network.PodNetworkStatus, error) {
runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager)
if !ok {
return nil, fmt.Errorf("CNI execution called on non-docker runtime")
netnsPath, err := plugin.host.GetRuntime().GetNetNS(id)
if err != nil {
return nil, fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
}
ipStr, err := runtime.GetContainerIP(id.ID, network.DefaultInterfaceName)
if err != nil {
return nil, err
}
ip, _, err := net.ParseCIDR(strings.Trim(ipStr, "\n"))
ip, err := network.GetPodIP(plugin.execer, plugin.nsenterPath, netnsPath, network.DefaultInterfaceName)
if err != nil {
return nil, err
}
return &network.PodNetworkStatus{IP: ip}, nil
}

View File

@ -30,18 +30,12 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/componentconfig"
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
)
@ -115,10 +109,16 @@ func tearDownPlugin(tmpDir string) {
type fakeNetworkHost struct {
kubeClient clientset.Interface
runtime kubecontainer.Runtime
}
func NewFakeHost(kubeClient clientset.Interface) *fakeNetworkHost {
host := &fakeNetworkHost{kubeClient: kubeClient}
func NewFakeHost(kubeClient clientset.Interface, pods []*containertest.FakePod) *fakeNetworkHost {
host := &fakeNetworkHost{
kubeClient: kubeClient,
runtime: &containertest.FakeRuntime{
AllPodList: pods,
},
}
return host
}
@ -127,40 +127,11 @@ func (fnh *fakeNetworkHost) GetPodByName(name, namespace string) (*api.Pod, bool
}
func (fnh *fakeNetworkHost) GetKubeClient() clientset.Interface {
return nil
return fnh.kubeClient
}
func (nh *fakeNetworkHost) GetRuntime() kubecontainer.Runtime {
dm, fakeDockerClient := newTestDockerManager()
fakeDockerClient.SetFakeRunningContainers([]*dockertools.FakeContainer{
{
ID: "test_infra_container",
Pid: 12345,
},
})
return dm
}
func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDockerClient) {
fakeDocker := dockertools.NewFakeDockerClient()
fakeRecorder := &record.FakeRecorder{}
containerRefManager := kubecontainer.NewRefManager()
networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, "10.0.0.0/8")
dockerManager := dockertools.NewFakeDockerManager(
fakeDocker,
fakeRecorder,
proberesults.NewManager(),
containerRefManager,
&cadvisorapi.MachineInfo{},
options.GetDefaultPodInfraContainerImage(),
0, 0, "",
&containertest.FakeOS{},
networkPlugin,
nil,
nil,
nil)
return dockerManager, fakeDocker
func (fnh *fakeNetworkHost) GetRuntime() kubecontainer.Runtime {
return fnh.runtime
}
func TestCNIPlugin(t *testing.T) {
@ -168,19 +139,64 @@ func TestCNIPlugin(t *testing.T) {
pluginName := fmt.Sprintf("test%d", rand.Intn(1000))
vendorName := fmt.Sprintf("test_vendor%d", rand.Intn(1000))
podIP := "10.0.0.2"
podIPOutput := fmt.Sprintf("4: eth0 inet %s/24 scope global dynamic eth0\\ valid_lft forever preferred_lft forever", podIP)
fakeCmds := []utilexec.FakeCommandAction{
func(cmd string, args ...string) utilexec.Cmd {
return utilexec.InitFakeCmd(&utilexec.FakeCmd{
CombinedOutputScript: []utilexec.FakeCombinedOutputAction{
func() ([]byte, error) {
return []byte(podIPOutput), nil
},
},
}, cmd, args...)
},
}
fexec := &utilexec.FakeExec{
CommandScript: fakeCmds,
LookPathFunc: func(file string) (string, error) {
return fmt.Sprintf("/fake-bin/%s", file), nil
},
}
tmpDir := utiltesting.MkTmpdirOrDie("cni-test")
testNetworkConfigPath := path.Join(tmpDir, "plugins", "net", "cni")
testVendorCNIDirPrefix := tmpDir
defer tearDownPlugin(tmpDir)
installPluginUnderTest(t, testVendorCNIDirPrefix, testNetworkConfigPath, vendorName, pluginName)
np := probeNetworkPluginsWithVendorCNIDirPrefix(path.Join(testNetworkConfigPath, pluginName), testVendorCNIDirPrefix)
plug, err := network.InitNetworkPlugin(np, "cni", NewFakeHost(nil), componentconfig.HairpinNone, "10.0.0.0/8")
containerID := kubecontainer.ContainerID{Type: "test", ID: "test_infra_container"}
pods := []*containertest.FakePod{{
Pod: &kubecontainer.Pod{
Containers: []*kubecontainer.Container{
{ID: containerID},
},
},
NetnsPath: "/proc/12345/ns/net",
}}
plugins := probeNetworkPluginsWithVendorCNIDirPrefix(path.Join(testNetworkConfigPath, pluginName), testVendorCNIDirPrefix)
if len(plugins) != 1 {
t.Fatalf("Expected only one network plugin, got %d", len(plugins))
}
if plugins[0].Name() != "cni" {
t.Fatalf("Expected CNI network plugin, got %q", plugins[0].Name())
}
cniPlugin, ok := plugins[0].(*cniNetworkPlugin)
if !ok {
t.Fatalf("Not a CNI network plugin!")
}
cniPlugin.execer = fexec
plug, err := network.InitNetworkPlugin(plugins, "cni", NewFakeHost(nil, pods), componentconfig.HairpinNone, "10.0.0.0/8")
if err != nil {
t.Fatalf("Failed to select the desired plugin: %v", err)
}
err = plug.SetUpPod("podNamespace", "podName", kubecontainer.ContainerID{Type: "docker", ID: "test_infra_container"})
// Set up the pod
err = plug.SetUpPod("podNamespace", "podName", containerID)
if err != nil {
t.Errorf("Expected nil: %v", err)
}
@ -195,7 +211,18 @@ func TestCNIPlugin(t *testing.T) {
if string(output) != expectedOutput {
t.Errorf("Mismatch in expected output for setup hook. Expected '%s', got '%s'", expectedOutput, string(output))
}
err = plug.TearDownPod("podNamespace", "podName", kubecontainer.ContainerID{Type: "docker", ID: "test_infra_container"})
// Get its IP address
status, err := plug.GetPodNetworkStatus("podNamespace", "podName", containerID)
if err != nil {
t.Errorf("Failed to read pod network status: %v", err)
}
if status.IP.String() != podIP {
t.Errorf("Expected pod IP %q but got %q", podIP, status.IP.String())
}
// Tear it down
err = plug.TearDownPod("podNamespace", "podName", containerID)
if err != nil {
t.Errorf("Expected nil: %v", err)
}

View File

@ -34,7 +34,6 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/componentconfig"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/util/bandwidth"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
@ -96,6 +95,7 @@ func NewPlugin(networkPluginDir string) network.NetworkPlugin {
func (plugin *kubenetNetworkPlugin) Init(host network.Host, hairpinMode componentconfig.HairpinMode, nonMasqueradeCIDR string) error {
plugin.host = host
plugin.hairpinMode = hairpinMode
plugin.nonMasqueradeCIDR = nonMasqueradeCIDR
plugin.cniConfig = &libcni.CNIConfig{
Path: []string{DefaultCNIDir, plugin.vendorDir},
}
@ -128,7 +128,11 @@ func (plugin *kubenetNetworkPlugin) Init(host network.Host, hairpinMode componen
return fmt.Errorf("Failed to generate loopback config: %v", err)
}
plugin.nonMasqueradeCIDR = nonMasqueradeCIDR
plugin.nsenterPath, err = plugin.execer.LookPath("nsenter")
if err != nil {
return fmt.Errorf("Failed to find nsenter binary: %v", err)
}
// Need to SNAT outbound traffic from cluster
if err = plugin.ensureMasqRule(); err != nil {
return err
@ -464,24 +468,11 @@ func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name s
if err != nil {
return nil, fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err)
}
nsenterPath, err := plugin.getNsenterPath()
ip, err := network.GetPodIP(plugin.execer, plugin.nsenterPath, netnsPath, network.DefaultInterfaceName)
if err != nil {
return nil, err
}
// Try to retrieve ip inside container network namespace
output, err := plugin.execer.Command(nsenterPath, fmt.Sprintf("--net=%s", netnsPath), "-F", "--",
"ip", "-o", "-4", "addr", "show", "dev", network.DefaultInterfaceName).CombinedOutput()
if err != nil {
return nil, fmt.Errorf("Unexpected command output %s with error: %v", output, err)
}
fields := strings.Fields(string(output))
if len(fields) < 4 {
return nil, fmt.Errorf("Unexpected command output %s ", output)
}
ip, _, err := net.ParseCIDR(fields[3])
if err != nil {
return nil, fmt.Errorf("Kubenet failed to parse ip from output %s due to %v", output, err)
}
plugin.podIPs[id] = ip.String()
return &network.PodNetworkStatus{IP: ip}, nil
}
@ -504,11 +495,11 @@ func (plugin *kubenetNetworkPlugin) getRunningPods() ([]*hostport.RunningPod, er
}
runningPods := make([]*hostport.RunningPod, 0)
for _, p := range pods {
for _, c := range p.Containers {
if c.Name != dockertools.PodInfraContainerName {
containerID, err := plugin.host.GetRuntime().GetPodContainerID(p)
if err != nil {
continue
}
ipString, ok := plugin.podIPs[c.ID]
ipString, ok := plugin.podIPs[containerID]
if !ok {
continue
}
@ -523,7 +514,6 @@ func (plugin *kubenetNetworkPlugin) getRunningPods() ([]*hostport.RunningPod, er
})
}
}
}
return runningPods, nil
}
@ -567,17 +557,6 @@ func (plugin *kubenetNetworkPlugin) delContainerFromNetwork(config *libcni.Netwo
return nil
}
func (plugin *kubenetNetworkPlugin) getNsenterPath() (string, error) {
if plugin.nsenterPath == "" {
nsenterPath, err := plugin.execer.LookPath("nsenter")
if err != nil {
return "", err
}
plugin.nsenterPath = nsenterPath
}
return plugin.nsenterPath, nil
}
// shaper retrieves the bandwidth shaper and, if it hasn't been fetched before,
// initializes it and ensures the bridge is appropriately configured
// This function should only be called while holding the `plugin.mu` lock

View File

@ -199,3 +199,41 @@ func (plugin *NoopNetworkPlugin) GetPodNetworkStatus(namespace string, name stri
func (plugin *NoopNetworkPlugin) Status() error {
return nil
}
func getOnePodIP(execer utilexec.Interface, nsenterPath, netnsPath, interfaceName, addrType string) (net.IP, error) {
// Try to retrieve ip inside container network namespace
output, err := execer.Command(nsenterPath, fmt.Sprintf("--net=%s", netnsPath), "-F", "--",
"ip", "-o", addrType, "addr", "show", "dev", interfaceName, "scope", "global").CombinedOutput()
if err != nil {
return nil, fmt.Errorf("Unexpected command output %s with error: %v", output, err)
}
lines := strings.Split(string(output), "\n")
if len(lines) < 1 {
return nil, fmt.Errorf("Unexpected command output %s", output)
}
fields := strings.Fields(lines[0])
if len(fields) < 4 {
return nil, fmt.Errorf("Unexpected address output %s ", lines[0])
}
ip, _, err := net.ParseCIDR(fields[3])
if err != nil {
return nil, fmt.Errorf("CNI failed to parse ip from output %s due to %v", output, err)
}
return ip, nil
}
// GetPodIP gets the IP of the pod by inspecting the network info inside the pod's network namespace.
func GetPodIP(execer utilexec.Interface, nsenterPath, netnsPath, interfaceName string) (net.IP, error) {
ip, err := getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-4")
if err != nil {
// Fall back to IPv6 address if no IPv4 address is present
ip, err = getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-6")
}
if err != nil {
return nil, err
}
return ip, nil
}

View File

@ -97,21 +97,21 @@ func TestRelisting(t *testing.T) {
pleg, runtime := testPleg.pleg, testPleg.runtime
ch := pleg.Watch()
// The first relist should send a PodSync event to each pod.
runtime.AllPodList = []*kubecontainer.Pod{
{
runtime.AllPodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c1", kubecontainer.ContainerStateExited),
createTestContainer("c2", kubecontainer.ContainerStateRunning),
createTestContainer("c3", kubecontainer.ContainerStateUnknown),
},
},
{
}},
{Pod: &kubecontainer.Pod{
ID: "4567",
Containers: []*kubecontainer.Container{
createTestContainer("c1", kubecontainer.ContainerStateExited),
},
},
}},
}
pleg.relist()
// Report every running/exited container if we see them for the first time.
@ -128,20 +128,20 @@ func TestRelisting(t *testing.T) {
pleg.relist()
verifyEvents(t, expected, actual)
runtime.AllPodList = []*kubecontainer.Pod{
{
runtime.AllPodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c2", kubecontainer.ContainerStateExited),
createTestContainer("c3", kubecontainer.ContainerStateRunning),
},
},
{
}},
{Pod: &kubecontainer.Pod{
ID: "4567",
Containers: []*kubecontainer.Container{
createTestContainer("c4", kubecontainer.ContainerStateRunning),
},
},
}},
}
pleg.relist()
// Only report containers that transitioned to running or exited status.
@ -169,15 +169,15 @@ func testReportMissingContainers(t *testing.T, numRelists int) {
testPleg := newTestGenericPLEG()
pleg, runtime := testPleg.pleg, testPleg.runtime
ch := pleg.Watch()
runtime.AllPodList = []*kubecontainer.Pod{
{
runtime.AllPodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c1", kubecontainer.ContainerStateRunning),
createTestContainer("c2", kubecontainer.ContainerStateRunning),
createTestContainer("c3", kubecontainer.ContainerStateExited),
},
},
}},
}
// Relist and drain the events from the channel.
for i := 0; i < numRelists; i++ {
@ -188,13 +188,13 @@ func testReportMissingContainers(t *testing.T, numRelists int) {
// Container c2 was stopped and removed between relists. We should report
// the event. The exited container c3 was garbage collected (i.e., removed)
// between relists. We should ignore that event.
runtime.AllPodList = []*kubecontainer.Pod{
{
runtime.AllPodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c1", kubecontainer.ContainerStateRunning),
},
},
}},
}
pleg.relist()
expected := []*PodLifecycleEvent{
@ -208,13 +208,13 @@ func testReportMissingPods(t *testing.T, numRelists int) {
testPleg := newTestGenericPLEG()
pleg, runtime := testPleg.pleg, testPleg.runtime
ch := pleg.Watch()
runtime.AllPodList = []*kubecontainer.Pod{
{
runtime.AllPodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c2", kubecontainer.ContainerStateRunning),
},
},
}},
}
// Relist and drain the events from the channel.
for i := 0; i < numRelists; i++ {
@ -224,7 +224,7 @@ func testReportMissingPods(t *testing.T, numRelists int) {
// Container c2 was stopped and removed between relists. We should report
// the event.
runtime.AllPodList = []*kubecontainer.Pod{}
runtime.AllPodList = []*containertest.FakePod{}
pleg.relist()
expected := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerDied, Data: "c2"},

View File

@ -1785,6 +1785,10 @@ func (r *Runtime) GetNetNS(containerID kubecontainer.ContainerID) (string, error
return netnsPathFromName(makePodNetnsName(kubetypes.UID(containerID.ID))), nil
}
func (r *Runtime) GetPodContainerID(pod *kubecontainer.Pod) (kubecontainer.ContainerID, error) {
return kubecontainer.ContainerID{ID: string(pod.ID)}, nil
}
func podDetailsFromServiceFile(serviceFilePath string) (string, string, string, bool, error) {
f, err := os.Open(serviceFilePath)
if err != nil {