kubelet/cni: make cni plugin runtime agnostic

Use the generic runtime method to get the netns path. Also move
reading the container IP address into the CNI plugin (based on the
kubenet approach) instead of keeping it in the Docker manager code.
Both the old and new methods use nsenter and /sbin/ip and should be
functionally equivalent.
Dan Williams 2016-06-20 17:14:08 -05:00
parent 4e2433cfab
commit 9865ac325c
9 changed files with 269 additions and 218 deletions
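
The shape of the change, distilled from the cni.go hunks below (a sketch
rather than the literal committed code; error handling trimmed):

// Before: CNI required the Docker runtime. The plugin downcast the generic
// runtime to *dockertools.DockerManager and asked it for the container IP,
// which shelled out to roughly:
//   nsenter -t <pid> -n -- bash -c "ip -4 addr show eth0 | grep inet | awk '{print $2}'"
//
// After: any runtime that can report a netns path works, and the plugin
// reads the address itself using the single-line `ip -o` output format:
netnsPath, err := plugin.host.GetRuntime().GetNetNS(id)
if err != nil {
	return nil, fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
}
// runs: nsenter --net=<netnsPath> -F -- ip -o -4 addr show dev eth0 scope global
ip, err := plugin.getContainerIPAddress(netnsPath, "-4")
if err != nil {
	// Fall back to IPv6 if no global IPv4 address is present
	ip, err = plugin.getContainerIPAddress(netnsPath, "-6")
}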


@@ -25,18 +25,28 @@ import (
ctest "k8s.io/kubernetes/pkg/kubelet/container/testing"
)
func comparePods(t *testing.T, expected []*ctest.FakePod, actual []*Pod) {
if len(expected) != len(actual) {
t.Errorf("expected %d pods, got %d instead", len(expected), len(actual))
}
for i := range expected {
if !reflect.DeepEqual(expected[i].Pod, actual[i]) {
t.Errorf("expected %#v, got %#v", expected[i].Pod, actual[i])
}
}
}
func TestGetPods(t *testing.T) {
runtime := &ctest.FakeRuntime{}
expected := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}}
expected := []*ctest.FakePod{{Pod: &Pod{ID: "1111"}}, {Pod: &Pod{ID: "2222"}}, {Pod: &Pod{ID: "3333"}}}
runtime.PodList = expected
cache := NewTestRuntimeCache(runtime)
actual, err := cache.GetPods()
if err != nil {
t.Errorf("unexpected error %v", err)
}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual)
}
comparePods(t, expected, actual)
}
func TestForceUpdateIfOlder(t *testing.T) {
@@ -44,25 +54,21 @@ func TestForceUpdateIfOlder(t *testing.T) {
cache := NewTestRuntimeCache(runtime)
// Cache old pods.
oldpods := []*Pod{{ID: "1111"}}
oldpods := []*ctest.FakePod{{Pod: &Pod{ID: "1111"}}}
runtime.PodList = oldpods
cache.UpdateCacheWithLock()
// Update the runtime to new pods.
newpods := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}}
newpods := []*ctest.FakePod{{Pod: &Pod{ID: "1111"}}, {Pod: &Pod{ID: "2222"}}, {Pod: &Pod{ID: "3333"}}}
runtime.PodList = newpods
// An older timestamp should not force an update.
cache.ForceUpdateIfOlder(time.Now().Add(-20 * time.Minute))
actual := cache.GetCachedPods()
if !reflect.DeepEqual(oldpods, actual) {
t.Errorf("expected %#v, got %#v", oldpods, actual)
}
comparePods(t, oldpods, actual)
// A newer timestamp should force an update.
cache.ForceUpdateIfOlder(time.Now().Add(20 * time.Second))
actual = cache.GetCachedPods()
if !reflect.DeepEqual(newpods, actual) {
t.Errorf("expected %#v, got %#v", newpods, actual)
}
comparePods(t, newpods, actual)
}


@@ -30,12 +30,17 @@ import (
"k8s.io/kubernetes/pkg/volume"
)
type FakePod struct {
Pod *Pod
NetnsPath string
}
// FakeRuntime is a fake container runtime for testing.
type FakeRuntime struct {
sync.Mutex
CalledFunctions []string
PodList []*Pod
AllPodList []*Pod
PodList []*FakePod
AllPodList []*FakePod
ImageList []Image
APIPodStatus api.PodStatus
PodStatus PodStatus
@@ -98,8 +103,8 @@ func (f *FakeRuntime) ClearCalls() {
defer f.Unlock()
f.CalledFunctions = []string{}
f.PodList = []*Pod{}
f.AllPodList = []*Pod{}
f.PodList = []*FakePod{}
f.AllPodList = []*FakePod{}
f.APIPodStatus = api.PodStatus{}
f.StartedPods = []string{}
f.KilledPods = []string{}
@@ -182,11 +187,19 @@ func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) {
f.Lock()
defer f.Unlock()
var pods []*Pod
f.CalledFunctions = append(f.CalledFunctions, "GetPods")
if all {
return f.AllPodList, f.Err
for _, fakePod := range f.AllPodList {
pods = append(pods, fakePod.Pod)
}
} else {
for _, fakePod := range f.PodList {
pods = append(pods, fakePod.Pod)
}
}
return f.PodList, f.Err
return pods, f.Err
}
func (f *FakeRuntime) SyncPod(pod *api.Pod, _ api.PodStatus, _ *PodStatus, _ []api.Secret, backOff *flowcontrol.Backoff) (result PodSyncResult) {
@@ -343,6 +356,15 @@ func (f *FakeRuntime) GetNetNS(containerID ContainerID) (string, error) {
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "GetNetNS")
for _, fp := range f.AllPodList {
for _, c := range fp.Pod.Containers {
if c.ID == containerID {
return fp.NetnsPath, nil
}
}
}
return "", f.Err
}
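
With the NetnsPath field and the GetNetNS lookup above, a test can seed the
fake runtime with a netns path per pod and resolve it through the generic
Runtime interface. A minimal usage sketch (the container ID and path are
invented for illustration):

fakeRuntime := &FakeRuntime{
	AllPodList: []*FakePod{{
		Pod: &Pod{
			ID:         "12345678",
			Containers: []*Container{{ID: ContainerID{Type: "test", ID: "abc"}}},
		},
		NetnsPath: "/proc/12345/ns/net",
	}},
}
netns, _ := fakeRuntime.GetNetNS(ContainerID{Type: "test", ID: "abc"})
// netns is now "/proc/12345/ns/net"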


@@ -1157,41 +1157,6 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream
return nil
}
// Get the IP address of a container's interface using nsenter
func (dm *DockerManager) GetContainerIP(containerID, interfaceName string) (string, error) {
_, lookupErr := exec.LookPath("nsenter")
if lookupErr != nil {
return "", fmt.Errorf("Unable to obtain IP address of container: missing nsenter.")
}
container, err := dm.client.InspectContainer(containerID)
if err != nil {
return "", err
}
if !container.State.Running {
return "", fmt.Errorf("container not running (%s)", container.ID)
}
containerPid := container.State.Pid
extractIPCmd := fmt.Sprintf("ip -4 addr show %s | grep inet | awk -F\" \" '{print $2}'", interfaceName)
args := []string{"-t", fmt.Sprintf("%d", containerPid), "-n", "--", "bash", "-c", extractIPCmd}
command := exec.Command("nsenter", args...)
out, err := command.CombinedOutput()
// Fall back to IPv6 address if no IPv4 address is present
if err == nil && string(out) == "" {
extractIPCmd = fmt.Sprintf("ip -6 addr show %s scope global | grep inet6 | awk -F\" \" '{print $2}'", interfaceName)
args = []string{"-t", fmt.Sprintf("%d", containerPid), "-n", "--", "bash", "-c", extractIPCmd}
command = exec.Command("nsenter", args...)
out, err = command.CombinedOutput()
}
if err != nil {
return "", err
}
return string(out), nil
}
// TODO(random-liu): Change running pod to pod status in the future. We can't do it now, because kubelet also uses this function without pod status.
// We can only deprecate this after refactoring kubelet.
// TODO(random-liu): After using pod status for KillPod(), we can also remove the kubernetesPodLabel, because all the needed information should have


@@ -86,12 +86,12 @@ func TestDetectImagesInitialDetect(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(1),
},
},
}},
}
startTime := time.Now().Add(-time.Millisecond)
@@ -116,12 +116,12 @@ func TestDetectImagesWithNewImage(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(1),
},
},
}},
}
err := manager.detectImages(zero)
@@ -161,12 +161,12 @@ func TestDetectImagesContainerStopped(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(1),
},
},
}},
}
err := manager.detectImages(zero)
@@ -177,7 +177,7 @@ func TestDetectImagesContainerStopped(t *testing.T) {
require.True(t, ok)
// Simulate container being stopped.
fakeRuntime.AllPodList = []*container.Pod{}
fakeRuntime.AllPodList = []*containertest.FakePod{}
err = manager.detectImages(time.Now())
require.NoError(t, err)
assert.Equal(manager.imageRecordsLen(), 2)
@@ -197,12 +197,12 @@ func TestDetectImagesWithRemovedImages(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(1),
},
},
}},
}
err := manager.detectImages(zero)
@@ -223,12 +223,12 @@ func TestFreeSpaceImagesInUseContainersAreIgnored(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(1),
},
},
}},
}
spaceFreed, err := manager.freeSpace(2048, time.Now())
@@ -244,29 +244,29 @@ func TestFreeSpaceRemoveByLeastRecentlyUsed(t *testing.T) {
makeImage(0, 1024),
makeImage(1, 2048),
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(0),
makeContainer(1),
},
},
}},
}
// Make 1 be more recently used than 0.
require.NoError(t, manager.detectImages(zero))
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(1),
},
},
}},
}
require.NoError(t, manager.detectImages(time.Now()))
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{},
},
}},
}
require.NoError(t, manager.detectImages(time.Now()))
require.Equal(t, manager.imageRecordsLen(), 2)
@@ -283,12 +283,12 @@ func TestFreeSpaceTiesBrokenByDetectedTime(t *testing.T) {
fakeRuntime.ImageList = []container.Image{
makeImage(0, 1024),
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(0),
},
},
}},
}
// Make 1 more recently detected but used at the same time as 0.
@@ -298,7 +298,7 @@ func TestFreeSpaceTiesBrokenByDetectedTime(t *testing.T) {
makeImage(1, 2048),
}
require.NoError(t, manager.detectImages(time.Now()))
fakeRuntime.AllPodList = []*container.Pod{}
fakeRuntime.AllPodList = []*containertest.FakePod{}
require.NoError(t, manager.detectImages(time.Now()))
require.Equal(t, manager.imageRecordsLen(), 2)
@@ -319,15 +319,15 @@ func TestFreeSpaceImagesAlsoDoesLookupByRepoTags(t *testing.T) {
Size: 2048,
},
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
{
ID: container.ContainerID{Type: "test", ID: "c5678"},
Image: "salad",
},
},
},
}},
}
spaceFreed, err := manager.freeSpace(1024, time.Now())
@@ -347,15 +347,15 @@ func TestFreeSpaceImagesAlsoDoesLookupByRepoDigests(t *testing.T) {
Size: 2048,
},
}
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
{
ID: container.ContainerID{Type: "test", ID: "c5678"},
Image: "salad",
},
},
},
}},
}
spaceFreed, err := manager.freeSpace(1024, time.Now())
@@ -451,12 +451,12 @@ func TestGarbageCollectImageNotOldEnough(t *testing.T) {
makeImage(1, 2048),
}
// 1 image is in use, and another one is not old enough
fakeRuntime.AllPodList = []*container.Pod{
{
fakeRuntime.AllPodList = []*containertest.FakePod{
{Pod: &container.Pod{
Containers: []*container.Container{
makeContainer(1),
},
},
}},
}
fakeClock := util.NewFakeClock(time.Now())


@@ -22,6 +22,7 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubecontainertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
)
func TestGetContainerInfo(t *testing.T) {
@@ -39,8 +40,8 @@ func TestGetContainerInfo(t *testing.T) {
cadvisorReq := &cadvisorapi.ContainerInfoRequest{}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil)
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*kubecontainertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: "qux",
Namespace: "ns",
@@ -50,7 +51,7 @@
ID: kubecontainer.ContainerID{Type: "test", ID: containerID},
},
},
},
}},
}
stats, err := kubelet.GetContainerInfo("qux_ns", "", "foo", cadvisorReq)
if err != nil {
@@ -122,8 +123,8 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) {
containerInfo := cadvisorapi.ContainerInfo{}
cadvisorReq := &cadvisorapi.ContainerInfoRequest{}
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, cadvisorApiFailure)
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*kubecontainertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "uuid",
Name: "qux",
Namespace: "ns",
@@ -132,7 +133,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) {
ID: kubecontainer.ContainerID{Type: "test", ID: containerID},
},
},
},
}},
}
stats, err := kubelet.GetContainerInfo("qux_ns", "uuid", "foo", cadvisorReq)
if stats != nil {
@@ -153,7 +154,7 @@ func TestGetContainerInfoOnNonExistContainer(t *testing.T) {
kubelet := testKubelet.kubelet
mockCadvisor := testKubelet.fakeCadvisor
fakeRuntime := testKubelet.fakeRuntime
fakeRuntime.PodList = []*kubecontainer.Pod{}
fakeRuntime.PodList = []*kubecontainertest.FakePod{}
stats, _ := kubelet.GetContainerInfo("qux", "", "foo", nil)
if stats != nil {
@@ -206,8 +207,8 @@ func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) {
fakeRuntime := testKubelet.fakeRuntime
kubelet := testKubelet.kubelet
mockCadvisor := testKubelet.fakeCadvisor
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*kubecontainertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: "qux",
Namespace: "ns",
@@ -216,6 +217,7 @@ func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) {
ID: kubecontainer.ContainerID{Type: "test", ID: "fakeID"},
},
}},
},
}
stats, err := kubelet.GetContainerInfo("qux_ns", "", "foo", nil)


@@ -408,15 +408,15 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
kubelet := testKubelet.kubelet
kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.String) bool { return ready })
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: "foo",
Namespace: "new",
Containers: []*kubecontainer.Container{
{Name: "bar"},
},
},
}},
}
kubelet.HandlePodCleanups()
// Sources are not ready yet. Don't remove any pods.
@@ -1087,7 +1087,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
fakeRuntime.PodList = []*kubecontainer.Pod{}
fakeRuntime.PodList = []*containertest.FakePod{}
podName := "podFoo"
podNamespace := "nsFoo"
@@ -1113,8 +1113,8 @@ func TestRunInContainer(t *testing.T) {
kubelet.runner = &fakeCommandRunner
containerID := kubecontainer.ContainerID{Type: "test", ID: "abc1234"}
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: "podFoo",
Namespace: "nsFoo",
@@ -1123,7 +1123,7 @@ func TestRunInContainer(t *testing.T) {
ID: containerID,
},
},
},
}},
}
cmd := []string{"ls"}
_, err := kubelet.RunInContainer("podFoo_nsFoo", "", "containerFoo", cmd)
@@ -2069,7 +2069,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) {
fakeRuntime := testKubelet.fakeRuntime
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
fakeRuntime.PodList = []*kubecontainer.Pod{}
fakeRuntime.PodList = []*containertest.FakePod{}
podName := "podFoo"
podNamespace := "nsFoo"
@@ -2102,8 +2102,8 @@ func TestExecInContainerNoSuchContainer(t *testing.T) {
podName := "podFoo"
podNamespace := "nsFoo"
containerID := "containerFoo"
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: podName,
Namespace: podNamespace,
@@ -2111,7 +2111,7 @@ func TestExecInContainerNoSuchContainer(t *testing.T) {
{Name: "bar",
ID: kubecontainer.ContainerID{Type: "test", ID: "barID"}},
},
},
}},
}
err := kubelet.ExecInContainer(
@@ -2165,8 +2165,8 @@ func TestExecInContainer(t *testing.T) {
stdout := &fakeReadWriteCloser{}
stderr := &fakeReadWriteCloser{}
tty := true
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: podName,
Namespace: podNamespace,
@@ -2175,7 +2175,7 @@ func TestExecInContainer(t *testing.T) {
ID: kubecontainer.ContainerID{Type: "test", ID: containerID},
},
},
},
}},
}
err := kubelet.ExecInContainer(
@@ -2215,7 +2215,7 @@ func TestPortForwardNoSuchPod(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
fakeRuntime.PodList = []*kubecontainer.Pod{}
fakeRuntime.PodList = []*containertest.FakePod{}
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
@@ -2245,8 +2245,8 @@ func TestPortForward(t *testing.T) {
podName := "podFoo"
podNamespace := "nsFoo"
podID := types.UID("12345678")
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: podID,
Name: podName,
Namespace: podNamespace,
@@ -2256,7 +2256,7 @@ func TestPortForward(t *testing.T) {
ID: kubecontainer.ContainerID{Type: "test", ID: "containerFoo"},
},
},
},
}},
}
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
@@ -3594,8 +3594,8 @@ func TestGetContainerInfoForMirrorPods(t *testing.T) {
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil)
kubelet := testKubelet.kubelet
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "1234",
Name: "qux",
Namespace: "ns",
@@ -3605,7 +3605,7 @@ func TestGetContainerInfoForMirrorPods(t *testing.T) {
ID: kubecontainer.ContainerID{Type: "test", ID: containerID},
},
},
},
}},
}
kubelet.podManager.SetPods(pods)
@@ -3930,15 +3930,15 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
},
}
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: "bar",
Namespace: "new",
Containers: []*kubecontainer.Container{
{Name: "foo"},
},
},
}},
}
// Let the pod worker sets the status to fail after this sync.
@@ -3986,15 +3986,15 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
},
}
fakeRuntime.PodList = []*kubecontainer.Pod{
{
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: "bar",
Namespace: "new",
Containers: []*kubecontainer.Container{
{Name: "foo"},
},
},
}},
}
kubelet.podManager.SetPods(pods)
@@ -4118,13 +4118,13 @@ func TestDoesNotDeletePodDirsIfContainerIsRunning(t *testing.T) {
// Pretend the pod is deleted from apiserver, but is still active on the node.
// The pod directory should not be removed.
pods = []*api.Pod{}
testKubelet.fakeRuntime.PodList = []*kubecontainer.Pod{runningPod}
testKubelet.fakeRuntime.PodList = []*containertest.FakePod{{runningPod, ""}}
syncAndVerifyPodDir(t, testKubelet, pods, []*api.Pod{apiPod}, true)
// The pod is deleted and also not active on the node. The pod directory
// should be removed.
pods = []*api.Pod{}
testKubelet.fakeRuntime.PodList = []*kubecontainer.Pod{}
testKubelet.fakeRuntime.PodList = []*containertest.FakePod{}
syncAndVerifyPodDir(t, testKubelet, pods, []*api.Pod{apiPod}, false)
}


@@ -27,8 +27,8 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/apis/componentconfig"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
utilexec "k8s.io/kubernetes/pkg/util/exec"
)
const (
@@ -43,6 +43,8 @@ type cniNetworkPlugin struct {
defaultNetwork *cniNetwork
host network.Host
execer utilexec.Interface
nsenterPath string
}
type cniNetwork struct {
@@ -57,7 +59,10 @@ func probeNetworkPluginsWithVendorCNIDirPrefix(pluginDir, vendorCNIDirPrefix str
if err != nil {
return configList
}
return append(configList, &cniNetworkPlugin{defaultNetwork: network})
return append(configList, &cniNetworkPlugin{
defaultNetwork: network,
execer: utilexec.New(),
})
}
func ProbeNetworkPlugins(pluginDir string) []network.NetworkPlugin {
@@ -95,6 +100,12 @@ func getDefaultCNINetwork(pluginDir, vendorCNIDirPrefix string) (*cniNetwork, er
}
func (plugin *cniNetworkPlugin) Init(host network.Host, hairpinMode componentconfig.HairpinMode, nonMasqueradeCIDR string) error {
var err error
plugin.nsenterPath, err = plugin.execer.LookPath("nsenter")
if err != nil {
return err
}
plugin.host = host
return nil
}
@@ -104,16 +115,12 @@ func (plugin *cniNetworkPlugin) Name() string {
}
func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager)
if !ok {
return fmt.Errorf("CNI execution called on non-docker runtime")
}
netns, err := runtime.GetNetNS(id)
netnsPath, err := plugin.host.GetRuntime().GetNetNS(id)
if err != nil {
return err
return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
}
_, err = plugin.defaultNetwork.addToNetwork(name, namespace, id, netns)
_, err = plugin.defaultNetwork.addToNetwork(name, namespace, id, netnsPath)
if err != nil {
glog.Errorf("Error while adding to cni network: %s", err)
return err
@@ -123,33 +130,55 @@ func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubec
}
func (plugin *cniNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.ContainerID) error {
runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager)
if !ok {
return fmt.Errorf("CNI execution called on non-docker runtime")
}
netns, err := runtime.GetNetNS(id)
netnsPath, err := plugin.host.GetRuntime().GetNetNS(id)
if err != nil {
return err
return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
}
return plugin.defaultNetwork.deleteFromNetwork(name, namespace, id, netns)
return plugin.defaultNetwork.deleteFromNetwork(name, namespace, id, netnsPath)
}
func (plugin *cniNetworkPlugin) getContainerIPAddress(netnsPath, addrType string) (net.IP, error) {
// Try to retrieve ip inside container network namespace
output, err := plugin.execer.Command(plugin.nsenterPath, fmt.Sprintf("--net=%s", netnsPath), "-F", "--",
"ip", "-o", addrType, "addr", "show", "dev", network.DefaultInterfaceName, "scope", "global").CombinedOutput()
if err != nil {
return nil, fmt.Errorf("Unexpected command output %s with error: %v", output, err)
}
lines := strings.Split(string(output), "\n")
if len(lines) < 1 {
return nil, fmt.Errorf("Unexpected command output %s", output)
}
fields := strings.Fields(lines[0])
if len(fields) < 4 {
return nil, fmt.Errorf("Unexpected address output %s ", lines[0])
}
ip, _, err := net.ParseCIDR(fields[3])
if err != nil {
return nil, fmt.Errorf("CNI failed to parse ip from output %s due to %v", output, err)
}
return ip, nil
}
// TODO: Use the addToNetwork function to obtain the IP of the Pod. That will assume idempotent ADD call to the plugin.
// Also fix the runtime's call to Status function to be done only in the case that the IP is lost, no need to do periodic calls
func (plugin *cniNetworkPlugin) GetPodNetworkStatus(namespace string, name string, id kubecontainer.ContainerID) (*network.PodNetworkStatus, error) {
runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager)
if !ok {
return nil, fmt.Errorf("CNI execution called on non-docker runtime")
netnsPath, err := plugin.host.GetRuntime().GetNetNS(id)
if err != nil {
return nil, fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
}
ip, err := plugin.getContainerIPAddress(netnsPath, "-4")
if err != nil {
// Fall back to IPv6 address if no IPv4 address is present
ip, err = plugin.getContainerIPAddress(netnsPath, "-6")
}
ipStr, err := runtime.GetContainerIP(id.ID, network.DefaultInterfaceName)
if err != nil {
return nil, err
}
ip, _, err := net.ParseCIDR(strings.Trim(ipStr, "\n"))
if err != nil {
return nil, err
}
return &network.PodNetworkStatus{IP: ip}, nil
}
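
getContainerIPAddress above depends on the fixed field layout of `ip -o`
output. Tracing a sample line (the same format the test below scripts
through a fake exec) shows how the address falls out:

// One output line of `ip -o -4 addr show dev eth0 scope global`:
//   4: eth0    inet 10.0.0.2/24 scope global dynamic eth0\  valid_lft forever preferred_lft forever
// strings.Fields(line) -> ["4:", "eth0", "inet", "10.0.0.2/24", "scope", ...]
// so fields[3] holds the CIDR, and net.ParseCIDR recovers the pod IP:
ip, _, err := net.ParseCIDR("10.0.0.2/24")
// ip.String() == "10.0.0.2", err == nil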


@@ -30,18 +30,12 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/componentconfig"
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
)
@@ -115,10 +109,16 @@ func tearDownPlugin(tmpDir string) {
type fakeNetworkHost struct {
kubeClient clientset.Interface
runtime kubecontainer.Runtime
}
func NewFakeHost(kubeClient clientset.Interface) *fakeNetworkHost {
host := &fakeNetworkHost{kubeClient: kubeClient}
func NewFakeHost(kubeClient clientset.Interface, pods []*containertest.FakePod) *fakeNetworkHost {
host := &fakeNetworkHost{
kubeClient: kubeClient,
runtime: &containertest.FakeRuntime{
AllPodList: pods,
},
}
return host
}
@@ -127,40 +127,11 @@ func (fnh *fakeNetworkHost) GetPodByName(name, namespace string) (*api.Pod, bool
}
func (fnh *fakeNetworkHost) GetKubeClient() clientset.Interface {
return nil
return fnh.kubeClient
}
func (nh *fakeNetworkHost) GetRuntime() kubecontainer.Runtime {
dm, fakeDockerClient := newTestDockerManager()
fakeDockerClient.SetFakeRunningContainers([]*dockertools.FakeContainer{
{
ID: "test_infra_container",
Pid: 12345,
},
})
return dm
}
func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDockerClient) {
fakeDocker := dockertools.NewFakeDockerClient()
fakeRecorder := &record.FakeRecorder{}
containerRefManager := kubecontainer.NewRefManager()
networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, "10.0.0.0/8")
dockerManager := dockertools.NewFakeDockerManager(
fakeDocker,
fakeRecorder,
proberesults.NewManager(),
containerRefManager,
&cadvisorapi.MachineInfo{},
options.GetDefaultPodInfraContainerImage(),
0, 0, "",
&containertest.FakeOS{},
networkPlugin,
nil,
nil,
nil)
return dockerManager, fakeDocker
func (fnh *fakeNetworkHost) GetRuntime() kubecontainer.Runtime {
return fnh.runtime
}
func TestCNIPlugin(t *testing.T) {
@@ -168,19 +139,64 @@ func TestCNIPlugin(t *testing.T) {
pluginName := fmt.Sprintf("test%d", rand.Intn(1000))
vendorName := fmt.Sprintf("test_vendor%d", rand.Intn(1000))
podIP := "10.0.0.2"
podIPOutput := fmt.Sprintf("4: eth0 inet %s/24 scope global dynamic eth0\\ valid_lft forever preferred_lft forever", podIP)
fakeCmds := []utilexec.FakeCommandAction{
func(cmd string, args ...string) utilexec.Cmd {
return utilexec.InitFakeCmd(&utilexec.FakeCmd{
CombinedOutputScript: []utilexec.FakeCombinedOutputAction{
func() ([]byte, error) {
return []byte(podIPOutput), nil
},
},
}, cmd, args...)
},
}
fexec := &utilexec.FakeExec{
CommandScript: fakeCmds,
LookPathFunc: func(file string) (string, error) {
return fmt.Sprintf("/fake-bin/%s", file), nil
},
}
tmpDir := utiltesting.MkTmpdirOrDie("cni-test")
testNetworkConfigPath := path.Join(tmpDir, "plugins", "net", "cni")
testVendorCNIDirPrefix := tmpDir
defer tearDownPlugin(tmpDir)
installPluginUnderTest(t, testVendorCNIDirPrefix, testNetworkConfigPath, vendorName, pluginName)
np := probeNetworkPluginsWithVendorCNIDirPrefix(path.Join(testNetworkConfigPath, pluginName), testVendorCNIDirPrefix)
plug, err := network.InitNetworkPlugin(np, "cni", NewFakeHost(nil), componentconfig.HairpinNone, "10.0.0.0/8")
containerID := kubecontainer.ContainerID{Type: "test", ID: "test_infra_container"}
pods := []*containertest.FakePod{{
Pod: &kubecontainer.Pod{
Containers: []*kubecontainer.Container{
{ID: containerID},
},
},
NetnsPath: "/proc/12345/ns/net",
}}
plugins := probeNetworkPluginsWithVendorCNIDirPrefix(path.Join(testNetworkConfigPath, pluginName), testVendorCNIDirPrefix)
if len(plugins) != 1 {
t.Fatalf("Expected only one network plugin, got %d", len(plugins))
}
if plugins[0].Name() != "cni" {
t.Fatalf("Expected CNI network plugin, got %q", plugins[0].Name())
}
cniPlugin, ok := plugins[0].(*cniNetworkPlugin)
if !ok {
t.Fatalf("Not a CNI network plugin!")
}
cniPlugin.execer = fexec
plug, err := network.InitNetworkPlugin(plugins, "cni", NewFakeHost(nil, pods), componentconfig.HairpinNone, "10.0.0.0/8")
if err != nil {
t.Fatalf("Failed to select the desired plugin: %v", err)
}
err = plug.SetUpPod("podNamespace", "podName", kubecontainer.ContainerID{Type: "docker", ID: "test_infra_container"})
// Set up the pod
err = plug.SetUpPod("podNamespace", "podName", containerID)
if err != nil {
t.Errorf("Expected nil: %v", err)
}
@@ -195,7 +211,18 @@ func TestCNIPlugin(t *testing.T) {
if string(output) != expectedOutput {
t.Errorf("Mismatch in expected output for setup hook. Expected '%s', got '%s'", expectedOutput, string(output))
}
err = plug.TearDownPod("podNamespace", "podName", kubecontainer.ContainerID{Type: "docker", ID: "test_infra_container"})
// Get its IP address
status, err := plug.GetPodNetworkStatus("podNamespace", "podName", containerID)
if err != nil {
t.Errorf("Failed to read pod network status: %v", err)
}
if status.IP.String() != podIP {
t.Errorf("Expected pod IP %q but got %q", podIP, status.IP.String())
}
// Tear it down
err = plug.TearDownPod("podNamespace", "podName", containerID)
if err != nil {
t.Errorf("Expected nil: %v", err)
}


@@ -97,21 +97,21 @@ func TestRelisting(t *testing.T) {
pleg, runtime := testPleg.pleg, testPleg.runtime
ch := pleg.Watch()
// The first relist should send a PodSync event to each pod.
runtime.AllPodList = []*kubecontainer.Pod{
{
runtime.AllPodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c1", kubecontainer.ContainerStateExited),
createTestContainer("c2", kubecontainer.ContainerStateRunning),
createTestContainer("c3", kubecontainer.ContainerStateUnknown),
},
},
{
}},
{Pod: &kubecontainer.Pod{
ID: "4567",
Containers: []*kubecontainer.Container{
createTestContainer("c1", kubecontainer.ContainerStateExited),
},
},
}},
}
pleg.relist()
// Report every running/exited container if we see them for the first time.
@@ -128,20 +128,20 @@ func TestRelisting(t *testing.T) {
pleg.relist()
verifyEvents(t, expected, actual)
runtime.AllPodList = []*kubecontainer.Pod{
{
runtime.AllPodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c2", kubecontainer.ContainerStateExited),
createTestContainer("c3", kubecontainer.ContainerStateRunning),
},
},
{
}},
{Pod: &kubecontainer.Pod{
ID: "4567",
Containers: []*kubecontainer.Container{
createTestContainer("c4", kubecontainer.ContainerStateRunning),
},
},
}},
}
pleg.relist()
// Only report containers that transitioned to running or exited status.
@@ -169,15 +169,15 @@ func testReportMissingContainers(t *testing.T, numRelists int) {
testPleg := newTestGenericPLEG()
pleg, runtime := testPleg.pleg, testPleg.runtime
ch := pleg.Watch()
runtime.AllPodList = []*kubecontainer.Pod{
{
runtime.AllPodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c1", kubecontainer.ContainerStateRunning),
createTestContainer("c2", kubecontainer.ContainerStateRunning),
createTestContainer("c3", kubecontainer.ContainerStateExited),
},
},
}},
}
// Relist and drain the events from the channel.
for i := 0; i < numRelists; i++ {
@@ -188,13 +188,13 @@ func testReportMissingContainers(t *testing.T, numRelists int) {
// Container c2 was stopped and removed between relists. We should report
// the event. The exited container c3 was garbage collected (i.e., removed)
// between relists. We should ignore that event.
runtime.AllPodList = []*kubecontainer.Pod{
{
runtime.AllPodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c1", kubecontainer.ContainerStateRunning),
},
},
}},
}
pleg.relist()
expected := []*PodLifecycleEvent{
@@ -208,13 +208,13 @@ func testReportMissingPods(t *testing.T, numRelists int) {
testPleg := newTestGenericPLEG()
pleg, runtime := testPleg.pleg, testPleg.runtime
ch := pleg.Watch()
runtime.AllPodList = []*kubecontainer.Pod{
{
runtime.AllPodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c2", kubecontainer.ContainerStateRunning),
},
},
}},
}
// Relist and drain the events from the channel.
for i := 0; i < numRelists; i++ {
@@ -224,7 +224,7 @@ func testReportMissingPods(t *testing.T, numRelists int) {
// Container c2 was stopped and removed between relists. We should report
// the event.
runtime.AllPodList = []*kubecontainer.Pod{}
runtime.AllPodList = []*containertest.FakePod{}
pleg.relist()
expected := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerDied, Data: "c2"},