diff --git a/pkg/kubelet/dockershim/BUILD b/pkg/kubelet/dockershim/BUILD index 5571efe085f..84d9298d55d 100644 --- a/pkg/kubelet/dockershim/BUILD +++ b/pkg/kubelet/dockershim/BUILD @@ -89,7 +89,7 @@ go_test( "//pkg/kubelet/dockertools:go_default_library", "//pkg/kubelet/dockertools/securitycontext:go_default_library", "//pkg/kubelet/network:go_default_library", - "//pkg/kubelet/network/mock_network:go_default_library", + "//pkg/kubelet/network/testing:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util/cache:go_default_library", "//pkg/security/apparmor:go_default_library", diff --git a/pkg/kubelet/dockershim/docker_sandbox.go b/pkg/kubelet/dockershim/docker_sandbox.go index 2a4ec8b7671..b1e0e1dfbb7 100644 --- a/pkg/kubelet/dockershim/docker_sandbox.go +++ b/pkg/kubelet/dockershim/docker_sandbox.go @@ -103,7 +103,7 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (str // on the host as well, to satisfy parts of the pod spec that aren't // recognized by the CNI standard yet. cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID) - err = ds.networkPlugin.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID) + err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID) // TODO: Do we need to teardown on failure or can we rely on a StopPodSandbox call with the given ID? return createResp.ID, err } @@ -162,8 +162,8 @@ func (ds *dockerService) StopPodSandbox(podSandboxID string) error { errList := []error{} if needNetworkTearDown { cID := kubecontainer.BuildContainerID(runtimeName, podSandboxID) - if err := ds.networkPlugin.TearDownPod(namespace, name, cID); err != nil { - errList = append(errList, fmt.Errorf("failed to teardown sandbox %q for pod %s/%s: %v", podSandboxID, namespace, name, err)) + if err := ds.network.TearDownPod(namespace, name, cID); err != nil { + errList = append(errList, err) } } if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil { @@ -199,12 +199,12 @@ func (ds *dockerService) getIPFromPlugin(sandbox *dockertypes.ContainerJSON) (st } msg := fmt.Sprintf("Couldn't find network status for %s/%s through plugin", metadata.Namespace, metadata.Name) cID := kubecontainer.BuildContainerID(runtimeName, sandbox.ID) - networkStatus, err := ds.networkPlugin.GetPodNetworkStatus(metadata.Namespace, metadata.Name, cID) + networkStatus, err := ds.network.GetPodNetworkStatus(metadata.Namespace, metadata.Name, cID) if err != nil { // This might be a sandbox that somehow ended up without a default // interface (eth0). We can't distinguish this from a more serious // error, so callers should probably treat it as non-fatal. - return "", fmt.Errorf("%v: %v", msg, err) + return "", err } if networkStatus == nil { return "", fmt.Errorf("%v: invalid network status for", msg) @@ -408,7 +408,7 @@ func (ds *dockerService) applySandboxLinuxOptions(hc *dockercontainer.HostConfig } hc.CgroupParent = cgroupParent // Apply security context. - applySandboxSecurityContext(lc, createConfig.Config, hc, ds.networkPlugin, separator) + applySandboxSecurityContext(lc, createConfig.Config, hc, ds.network, separator) return nil } diff --git a/pkg/kubelet/dockershim/docker_sandbox_test.go b/pkg/kubelet/dockershim/docker_sandbox_test.go index 085a7b7e720..855dd7454fc 100644 --- a/pkg/kubelet/dockershim/docker_sandbox_test.go +++ b/pkg/kubelet/dockershim/docker_sandbox_test.go @@ -26,6 +26,7 @@ import ( runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/types" ) @@ -146,7 +147,7 @@ func TestSandboxStatus(t *testing.T) { func TestNetworkPluginInvocation(t *testing.T) { ds, _, _ := newTestDockerService() mockPlugin := newTestNetworkPlugin(t) - ds.networkPlugin = mockPlugin + ds.network = network.NewPluginManager(mockPlugin) defer mockPlugin.Finish() name := "foo0" @@ -158,6 +159,7 @@ func TestNetworkPluginInvocation(t *testing.T) { ) cID := kubecontainer.ContainerID{Type: runtimeName, ID: fmt.Sprintf("/%v", makeSandboxName(c))} + mockPlugin.EXPECT().Name().Return("mockNetworkPlugin").AnyTimes() setup := mockPlugin.EXPECT().SetUpPod(ns, name, cID) // StopPodSandbox performs a lookup on status to figure out if the sandbox // is running with hostnetworking, as all its given is the ID. @@ -175,7 +177,7 @@ func TestNetworkPluginInvocation(t *testing.T) { func TestHostNetworkPluginInvocation(t *testing.T) { ds, _, _ := newTestDockerService() mockPlugin := newTestNetworkPlugin(t) - ds.networkPlugin = mockPlugin + ds.network = network.NewPluginManager(mockPlugin) defer mockPlugin.Finish() name := "foo0" diff --git a/pkg/kubelet/dockershim/docker_service.go b/pkg/kubelet/dockershim/docker_service.go index 1bb60131889..db7e8568eae 100644 --- a/pkg/kubelet/dockershim/docker_service.go +++ b/pkg/kubelet/dockershim/docker_service.go @@ -178,7 +178,7 @@ func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot str if err != nil { return nil, fmt.Errorf("didn't find compatible CNI plugin with given settings %+v: %v", pluginSettings, err) } - ds.networkPlugin = plug + ds.network = network.NewPluginManager(plug) glog.Infof("Docker cri networking managed by %v", plug.Name()) // NOTE: cgroup driver is only detectable in docker 1.11+ @@ -224,7 +224,7 @@ type dockerService struct { podSandboxImage string streamingRuntime *streamingRuntime streamingServer streaming.Server - networkPlugin network.NetworkPlugin + network *network.PluginManager containerManager cm.ContainerManager // cgroup driver used by Docker runtime. cgroupDriver string @@ -270,10 +270,10 @@ func (ds *dockerService) UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeCo return } glog.Infof("docker cri received runtime config %+v", runtimeConfig) - if ds.networkPlugin != nil && runtimeConfig.NetworkConfig.PodCidr != "" { + if ds.network != nil && runtimeConfig.NetworkConfig.PodCidr != "" { event := make(map[string]interface{}) event[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = runtimeConfig.NetworkConfig.PodCidr - ds.networkPlugin.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, event) + ds.network.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, event) } return } @@ -339,7 +339,7 @@ func (ds *dockerService) Status() (*runtimeapi.RuntimeStatus, error) { runtimeReady.Reason = "DockerDaemonNotReady" runtimeReady.Message = fmt.Sprintf("docker: failed to get docker version: %v", err) } - if err := ds.networkPlugin.Status(); err != nil { + if err := ds.network.Status(); err != nil { networkReady.Status = false networkReady.Reason = "NetworkPluginNotReady" networkReady.Message = fmt.Sprintf("docker: network plugin is not ready: %v", err) diff --git a/pkg/kubelet/dockershim/docker_service_test.go b/pkg/kubelet/dockershim/docker_service_test.go index 3e6d8c8bd8f..5bc92925d78 100644 --- a/pkg/kubelet/dockershim/docker_service_test.go +++ b/pkg/kubelet/dockershim/docker_service_test.go @@ -32,20 +32,21 @@ import ( containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/network" - "k8s.io/kubernetes/pkg/kubelet/network/mock_network" + nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" "k8s.io/kubernetes/pkg/kubelet/util/cache" ) // newTestNetworkPlugin returns a mock plugin that implements network.NetworkPlugin -func newTestNetworkPlugin(t *testing.T) *mock_network.MockNetworkPlugin { +func newTestNetworkPlugin(t *testing.T) *nettest.MockNetworkPlugin { ctrl := gomock.NewController(t) - return mock_network.NewMockNetworkPlugin(ctrl) + return nettest.NewMockNetworkPlugin(ctrl) } func newTestDockerService() (*dockerService, *dockertools.FakeDockerClient, *clock.FakeClock) { fakeClock := clock.NewFakeClock(time.Time{}) c := dockertools.NewFakeDockerClient().WithClock(fakeClock).WithVersion("1.11.2", "1.23") - return &dockerService{client: c, os: &containertest.FakeOS{}, networkPlugin: &network.NoopNetworkPlugin{}, + pm := network.NewPluginManager(&network.NoopNetworkPlugin{}) + return &dockerService{client: c, os: &containertest.FakeOS{}, network: pm, legacyCleanup: legacyCleanupFlag{done: 1}, checkpointHandler: NewTestPersistentCheckpointHandler()}, c, fakeClock } @@ -95,7 +96,7 @@ func TestStatus(t *testing.T) { // Should not report ready status is network plugin returns error. mockPlugin := newTestNetworkPlugin(t) - ds.networkPlugin = mockPlugin + ds.network = network.NewPluginManager(mockPlugin) defer mockPlugin.Finish() mockPlugin.EXPECT().Status().Return(errors.New("network error")) status, err = ds.Status() diff --git a/pkg/kubelet/dockershim/security_context.go b/pkg/kubelet/dockershim/security_context.go index 159a171a1b9..0b9f3392a5b 100644 --- a/pkg/kubelet/dockershim/security_context.go +++ b/pkg/kubelet/dockershim/security_context.go @@ -25,11 +25,11 @@ import ( "k8s.io/kubernetes/pkg/api/v1" runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/dockertools/securitycontext" - "k8s.io/kubernetes/pkg/kubelet/network" + knetwork "k8s.io/kubernetes/pkg/kubelet/network" ) // applySandboxSecurityContext updates docker sandbox options according to security context. -func applySandboxSecurityContext(lc *runtimeapi.LinuxPodSandboxConfig, config *dockercontainer.Config, hc *dockercontainer.HostConfig, networkPlugin network.NetworkPlugin, separator rune) { +func applySandboxSecurityContext(lc *runtimeapi.LinuxPodSandboxConfig, config *dockercontainer.Config, hc *dockercontainer.HostConfig, network *knetwork.PluginManager, separator rune) { if lc == nil { return } @@ -47,8 +47,7 @@ func applySandboxSecurityContext(lc *runtimeapi.LinuxPodSandboxConfig, config *d modifyContainerConfig(sc, config) modifyHostConfig(sc, hc, separator) - modifySandboxNamespaceOptions(sc.GetNamespaceOptions(), hc, networkPlugin) - + modifySandboxNamespaceOptions(sc.GetNamespaceOptions(), hc, network) } // applyContainerSecurityContext updates docker container options according to security context. @@ -109,9 +108,9 @@ func modifyHostConfig(sc *runtimeapi.LinuxContainerSecurityContext, hostConfig * } // modifySandboxNamespaceOptions apply namespace options for sandbox -func modifySandboxNamespaceOptions(nsOpts *runtimeapi.NamespaceOption, hostConfig *dockercontainer.HostConfig, networkPlugin network.NetworkPlugin) { +func modifySandboxNamespaceOptions(nsOpts *runtimeapi.NamespaceOption, hostConfig *dockercontainer.HostConfig, network *knetwork.PluginManager) { modifyCommonNamespaceOptions(nsOpts, hostConfig) - modifyHostNetworkOptionForSandbox(nsOpts.HostNetwork, networkPlugin, hostConfig) + modifyHostNetworkOptionForSandbox(nsOpts.HostNetwork, network, hostConfig) } // modifyContainerNamespaceOptions apply namespace options for container @@ -137,18 +136,18 @@ func modifyCommonNamespaceOptions(nsOpts *runtimeapi.NamespaceOption, hostConfig } // modifyHostNetworkOptionForSandbox applies NetworkMode/UTSMode to sandbox's dockercontainer.HostConfig. -func modifyHostNetworkOptionForSandbox(hostNetwork bool, networkPlugin network.NetworkPlugin, hc *dockercontainer.HostConfig) { +func modifyHostNetworkOptionForSandbox(hostNetwork bool, network *knetwork.PluginManager, hc *dockercontainer.HostConfig) { if hostNetwork { hc.NetworkMode = namespaceModeHost return } - if networkPlugin == nil { + if network == nil { hc.NetworkMode = "default" return } - switch networkPlugin.Name() { + switch network.PluginName() { case "cni": fallthrough case "kubenet": diff --git a/pkg/kubelet/dockertools/BUILD b/pkg/kubelet/dockertools/BUILD index 3bf5402ad5c..397edb53051 100644 --- a/pkg/kubelet/dockertools/BUILD +++ b/pkg/kubelet/dockertools/BUILD @@ -112,7 +112,6 @@ go_test( "//pkg/kubelet/events:go_default_library", "//pkg/kubelet/images:go_default_library", "//pkg/kubelet/network:go_default_library", - "//pkg/kubelet/network/mock_network:go_default_library", "//pkg/kubelet/network/testing:go_default_library", "//pkg/kubelet/prober/results:go_default_library", "//pkg/kubelet/types:go_default_library", diff --git a/pkg/kubelet/dockertools/container_gc.go b/pkg/kubelet/dockertools/container_gc.go index 65adddf378c..8d902a7be4b 100644 --- a/pkg/kubelet/dockertools/container_gc.go +++ b/pkg/kubelet/dockertools/container_gc.go @@ -28,18 +28,21 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + knetwork "k8s.io/kubernetes/pkg/kubelet/network" ) type containerGC struct { client DockerInterface podGetter podGetter + network *knetwork.PluginManager containerLogsDir string } -func NewContainerGC(client DockerInterface, podGetter podGetter, containerLogsDir string) *containerGC { +func NewContainerGC(client DockerInterface, podGetter podGetter, network *knetwork.PluginManager, containerLogsDir string) *containerGC { return &containerGC{ client: client, podGetter: podGetter, + network: network, containerLogsDir: containerLogsDir, } } @@ -50,7 +53,7 @@ type containerGCInfo struct { id string // Docker name of the container. - name string + dockerName string // Creation time for the container. createTime time.Time @@ -59,8 +62,14 @@ type containerGCInfo struct { // This comes from dockertools.ParseDockerName(...) podNameWithNamespace string + // Kubernetes pod UID + podUID types.UID + // Container name in pod containerName string + + // Container network mode + isHostNetwork bool } // Containers are considered for eviction as units of (UID, container name) pair. @@ -111,22 +120,45 @@ func (cgc *containerGC) removeOldestN(containers []containerGCInfo, toRemove int // Remove from oldest to newest (last to first). numToKeep := len(containers) - toRemove for i := numToKeep; i < len(containers); i++ { - cgc.removeContainer(containers[i].id, containers[i].podNameWithNamespace, containers[i].containerName) + cgc.removeContainer(containers[i]) } // Assume we removed the containers so that we're not too aggressive. return containers[:numToKeep] } +// Returns a full GC info structure on success, or a partial one on failure +func newContainerGCInfo(id string, inspectResult *dockertypes.ContainerJSON, created time.Time) (containerGCInfo, error) { + containerName, _, err := ParseDockerName(inspectResult.Name) + if err != nil { + return containerGCInfo{ + id: id, + dockerName: inspectResult.Name, + }, fmt.Errorf("failed to parse docker name %q: %v", inspectResult.Name, err) + } + + networkMode := getDockerNetworkMode(inspectResult) + return containerGCInfo{ + id: id, + dockerName: inspectResult.Name, + podNameWithNamespace: containerName.PodFullName, + podUID: containerName.PodUID, + containerName: containerName.ContainerName, + createTime: created, + isHostNetwork: networkMode == namespaceModeHost, + }, nil +} + // Get all containers that are evictable. Evictable containers are: not running // and created more than MinAge ago. -func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByEvictUnit, []containerGCInfo, error) { +func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByEvictUnit, []containerGCInfo, []containerGCInfo, error) { containers, err := GetKubeletDockerContainers(cgc.client, true) if err != nil { - return containersByEvictUnit{}, []containerGCInfo{}, err + return containersByEvictUnit{}, []containerGCInfo{}, []containerGCInfo{}, err } unidentifiedContainers := make([]containerGCInfo, 0) + netContainers := make([]containerGCInfo, 0) evictUnits := make(containersByEvictUnit) newestGCTime := time.Now().Add(-minAge) for _, container := range containers { @@ -147,23 +179,19 @@ func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByE continue } - containerInfo := containerGCInfo{ - id: container.ID, - name: container.Names[0], - createTime: created, - } - - containerName, _, err := ParseDockerName(container.Names[0]) - + containerInfo, err := newContainerGCInfo(container.ID, data, created) if err != nil { unidentifiedContainers = append(unidentifiedContainers, containerInfo) } else { - key := evictUnit{ - uid: containerName.PodUID, - name: containerName.ContainerName, + // Track net containers for special cleanup + if containerIsNetworked(containerInfo.containerName) { + netContainers = append(netContainers, containerInfo) + } + + key := evictUnit{ + uid: containerInfo.podUID, + name: containerInfo.containerName, } - containerInfo.podNameWithNamespace = containerName.PodFullName - containerInfo.containerName = containerName.ContainerName evictUnits[key] = append(evictUnits[key], containerInfo) } } @@ -173,26 +201,34 @@ func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByE sort.Sort(byCreated(evictUnits[uid])) } - return evictUnits, unidentifiedContainers, nil + return evictUnits, netContainers, unidentifiedContainers, nil } // GarbageCollect removes dead containers using the specified container gc policy func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool) error { // Separate containers by evict units. - evictUnits, unidentifiedContainers, err := cgc.evictableContainers(gcPolicy.MinAge) + evictUnits, netContainers, unidentifiedContainers, err := cgc.evictableContainers(gcPolicy.MinAge) if err != nil { return err } // Remove unidentified containers. for _, container := range unidentifiedContainers { - glog.Infof("Removing unidentified dead container %q with ID %q", container.name, container.id) + glog.Infof("Removing unidentified dead container %q", container.dockerName) err = cgc.client.RemoveContainer(container.id, dockertypes.ContainerRemoveOptions{RemoveVolumes: true}) if err != nil { - glog.Warningf("Failed to remove unidentified dead container %q: %v", container.name, err) + glog.Warningf("Failed to remove unidentified dead container %q: %v", container.dockerName, err) } } + // Always clean up net containers to ensure network resources are released + // TODO: this may tear down networking again if the container doesn't get + // removed in this GC cycle, but that already happens elsewhere... + for _, container := range netContainers { + glog.Infof("Cleaning up dead net container %q", container.dockerName) + cgc.netContainerCleanup(container) + } + // Remove deleted pod containers if all sources are ready. if allSourcesReady { for key, unit := range evictUnits { @@ -245,35 +281,56 @@ func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, return nil } -func (cgc *containerGC) removeContainer(id string, podNameWithNamespace string, containerName string) { - glog.V(4).Infof("Removing container %q name %q", id, containerName) - err := cgc.client.RemoveContainer(id, dockertypes.ContainerRemoveOptions{RemoveVolumes: true}) - if err != nil { - glog.Warningf("Failed to remove container %q: %v", id, err) +func (cgc *containerGC) netContainerCleanup(containerInfo containerGCInfo) { + if containerInfo.isHostNetwork { + return } - symlinkPath := LogSymlink(cgc.containerLogsDir, podNameWithNamespace, containerName, id) + + podName, podNamespace, err := kubecontainer.ParsePodFullName(containerInfo.podNameWithNamespace) + if err != nil { + glog.Warningf("failed to parse container %q pod full name: %v", containerInfo.dockerName, err) + return + } + + containerID := kubecontainer.DockerID(containerInfo.id).ContainerID() + if err := cgc.network.TearDownPod(podNamespace, podName, containerID); err != nil { + glog.Warningf("failed to tear down container %q network: %v", containerInfo.dockerName, err) + } +} + +func (cgc *containerGC) removeContainer(containerInfo containerGCInfo) { + glog.V(4).Infof("Removing container %q", containerInfo.dockerName) + err := cgc.client.RemoveContainer(containerInfo.id, dockertypes.ContainerRemoveOptions{RemoveVolumes: true}) + if err != nil { + glog.Warningf("Failed to remove container %q: %v", containerInfo.dockerName, err) + } + symlinkPath := LogSymlink(cgc.containerLogsDir, containerInfo.podNameWithNamespace, containerInfo.containerName, containerInfo.id) err = os.Remove(symlinkPath) if err != nil && !os.IsNotExist(err) { - glog.Warningf("Failed to remove container %q log symlink %q: %v", id, symlinkPath, err) + glog.Warningf("Failed to remove container %q log symlink %q: %v", containerInfo.dockerName, symlinkPath, err) } } func (cgc *containerGC) deleteContainer(id string) error { - containerInfo, err := cgc.client.InspectContainer(id) + data, err := cgc.client.InspectContainer(id) if err != nil { glog.Warningf("Failed to inspect container %q: %v", id, err) return err } - if containerInfo.State.Running { + if data.State.Running { return fmt.Errorf("container %q is still running", id) } - containerName, _, err := ParseDockerName(containerInfo.Name) + containerInfo, err := newContainerGCInfo(id, data, time.Now()) if err != nil { return err } - cgc.removeContainer(id, containerName.PodFullName, containerName.ContainerName) + if containerIsNetworked(containerInfo.containerName) { + cgc.netContainerCleanup(containerInfo) + } + + cgc.removeContainer(containerInfo) return nil } diff --git a/pkg/kubelet/dockertools/container_gc_test.go b/pkg/kubelet/dockertools/container_gc_test.go index 9393d794291..c0eeecb2296 100644 --- a/pkg/kubelet/dockertools/container_gc_test.go +++ b/pkg/kubelet/dockertools/container_gc_test.go @@ -23,18 +23,23 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/api/v1" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + knetwork "k8s.io/kubernetes/pkg/kubelet/network" + nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" ) -func newTestContainerGC(t *testing.T) (*containerGC, *FakeDockerClient) { +func newTestContainerGC(t *testing.T) (*containerGC, *FakeDockerClient, *nettest.MockNetworkPlugin) { fakeDocker := NewFakeDockerClient() fakePodGetter := newFakePodGetter() - gc := NewContainerGC(fakeDocker, fakePodGetter, "") - return gc, fakeDocker + fakePlugin := nettest.NewMockNetworkPlugin(gomock.NewController(t)) + fakePlugin.EXPECT().Name().Return("someNetworkPlugin").AnyTimes() + gc := NewContainerGC(fakeDocker, fakePodGetter, knetwork.NewPluginManager(fakePlugin), "") + return gc, fakeDocker, fakePlugin } // Makes a stable time object, lower id is earlier time. @@ -91,7 +96,7 @@ func verifyStringArrayEqualsAnyOrder(t *testing.T, actual, expected []string) { } func TestDeleteContainerSkipRunningContainer(t *testing.T) { - gc, fakeDocker := newTestContainerGC(t) + gc, fakeDocker, _ := newTestContainerGC(t) fakeDocker.SetFakeContainers([]*FakeContainer{ makeContainer("1876", "foo", "POD", true, makeTime(0)), }) @@ -102,29 +107,65 @@ func TestDeleteContainerSkipRunningContainer(t *testing.T) { } func TestDeleteContainerRemoveDeadContainer(t *testing.T) { - gc, fakeDocker := newTestContainerGC(t) + gc, fakeDocker, fakePlugin := newTestContainerGC(t) + defer fakePlugin.Finish() fakeDocker.SetFakeContainers([]*FakeContainer{ makeContainer("1876", "foo", "POD", false, makeTime(0)), }) addPods(gc.podGetter, "foo") + fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + assert.Nil(t, gc.deleteContainer("1876")) assert.Len(t, fakeDocker.Removed, 1) } +func TestGarbageCollectNetworkTeardown(t *testing.T) { + // Ensure infra container gets teardown called + gc, fakeDocker, fakePlugin := newTestContainerGC(t) + defer fakePlugin.Finish() + id := kubecontainer.DockerID("1867").ContainerID() + fakeDocker.SetFakeContainers([]*FakeContainer{ + makeContainer(id.ID, "foo", "POD", false, makeTime(0)), + }) + addPods(gc.podGetter, "foo") + + fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), id).Return(nil) + + assert.Nil(t, gc.deleteContainer(id.ID)) + assert.Len(t, fakeDocker.Removed, 1) + + // Ensure non-infra container does not have teardown called + gc, fakeDocker, fakePlugin = newTestContainerGC(t) + id = kubecontainer.DockerID("1877").ContainerID() + fakeDocker.SetFakeContainers([]*FakeContainer{ + makeContainer(id.ID, "foo", "adsfasdfasdf", false, makeTime(0)), + }) + fakePlugin.EXPECT().SetUpPod(gomock.Any(), gomock.Any(), id).Return(nil) + + addPods(gc.podGetter, "foo") + + assert.Nil(t, gc.deleteContainer(id.ID)) + assert.Len(t, fakeDocker.Removed, 1) +} + func TestGarbageCollectZeroMaxContainers(t *testing.T) { - gc, fakeDocker := newTestContainerGC(t) + gc, fakeDocker, fakePlugin := newTestContainerGC(t) + defer fakePlugin.Finish() fakeDocker.SetFakeContainers([]*FakeContainer{ makeContainer("1876", "foo", "POD", false, makeTime(0)), }) addPods(gc.podGetter, "foo") + fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: 1, MaxContainers: 0}, true)) assert.Len(t, fakeDocker.Removed, 1) } func TestGarbageCollectNoMaxPerPodContainerLimit(t *testing.T) { - gc, fakeDocker := newTestContainerGC(t) + gc, fakeDocker, fakePlugin := newTestContainerGC(t) + defer fakePlugin.Finish() fakeDocker.SetFakeContainers([]*FakeContainer{ makeContainer("1876", "foo", "POD", false, makeTime(0)), makeContainer("2876", "foo1", "POD", false, makeTime(1)), @@ -134,12 +175,15 @@ func TestGarbageCollectNoMaxPerPodContainerLimit(t *testing.T) { }) addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4") + fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(5) + assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: -1, MaxContainers: 4}, true)) assert.Len(t, fakeDocker.Removed, 1) } func TestGarbageCollectNoMaxLimit(t *testing.T) { - gc, fakeDocker := newTestContainerGC(t) + gc, fakeDocker, fakePlugin := newTestContainerGC(t) + defer fakePlugin.Finish() fakeDocker.SetFakeContainers([]*FakeContainer{ makeContainer("1876", "foo", "POD", false, makeTime(0)), makeContainer("2876", "foo1", "POD", false, makeTime(0)), @@ -149,6 +193,8 @@ func TestGarbageCollectNoMaxLimit(t *testing.T) { }) addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4") + fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(5) + assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: -1, MaxContainers: -1}, true)) assert.Len(t, fakeDocker.Removed, 0) } @@ -261,10 +307,12 @@ func TestGarbageCollect(t *testing.T) { } for i, test := range tests { t.Logf("Running test case with index %d", i) - gc, fakeDocker := newTestContainerGC(t) + gc, fakeDocker, fakePlugin := newTestContainerGC(t) fakeDocker.SetFakeContainers(test.containers) addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4", "foo5", "foo6", "foo7") + fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Hour, MaxPerPodContainer: 2, MaxContainers: 6}, true)) verifyStringArrayEqualsAnyOrder(t, fakeDocker.Removed, test.expectedRemoved) + fakePlugin.Finish() } } diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index 318293dd21e..e97f0777c11 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -59,7 +59,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/metrics" - "k8s.io/kubernetes/pkg/kubelet/network" + knetwork "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network/hairpin" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/qos" @@ -152,8 +152,8 @@ type DockerManager struct { // Directory of container logs. containerLogsDir string - // Network plugin. - networkPlugin network.NetworkPlugin + // Network plugin manager. + network *knetwork.PluginManager // Health check results. livenessManager proberesults.Manager @@ -226,7 +226,7 @@ func NewDockerManager( burst int, containerLogsDir string, osInterface kubecontainer.OSInterface, - networkPlugin network.NetworkPlugin, + networkPlugin knetwork.NetworkPlugin, runtimeHelper kubecontainer.RuntimeHelper, httpClient types.HttpGetter, execHandler ExecHandler, @@ -267,7 +267,7 @@ func NewDockerManager( dockerPuller: newDockerPuller(client), cgroupDriver: cgroupDriver, containerLogsDir: containerLogsDir, - networkPlugin: networkPlugin, + network: knetwork.NewPluginManager(networkPlugin), livenessManager: livenessManager, runtimeHelper: runtimeHelper, execHandler: execHandler, @@ -282,7 +282,7 @@ func NewDockerManager( cmdRunner := kubecontainer.DirectStreamingRunner(dm) dm.runner = lifecycle.NewHandlerRunner(httpClient, cmdRunner, dm) dm.imagePuller = images.NewImageManager(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff, serializeImagePulls, qps, burst) - dm.containerGC = NewContainerGC(client, podGetter, containerLogsDir) + dm.containerGC = NewContainerGC(client, podGetter, dm.network, containerLogsDir) dm.versionCache = cache.NewObjectCache( func() (interface{}, error) { @@ -357,10 +357,10 @@ func (dm *DockerManager) determineContainerIP(podNamespace, podName string, cont isHostNetwork := networkMode == namespaceModeHost // For host networking or default network plugin, GetPodNetworkStatus doesn't work - if !isHostNetwork && dm.networkPlugin.Name() != network.DefaultPluginName { - netStatus, err := dm.networkPlugin.GetPodNetworkStatus(podNamespace, podName, kubecontainer.DockerID(container.ID).ContainerID()) + if !isHostNetwork && dm.network.PluginName() != knetwork.DefaultPluginName { + netStatus, err := dm.network.GetPodNetworkStatus(podNamespace, podName, kubecontainer.DockerID(container.ID).ContainerID()) if err != nil { - glog.Errorf("NetworkPlugin %s failed on the status hook for pod '%s' - %v", dm.networkPlugin.Name(), podName, err) + glog.Error(err) return result, err } else if netStatus != nil { result = netStatus.IP.String() @@ -436,7 +436,7 @@ func (dm *DockerManager) inspectContainer(id string, podName, podNamespace strin // Container that are running, restarting and paused status.State = kubecontainer.ContainerStateRunning status.StartedAt = startedAt - if containerProvidesPodIP(dockerName) { + if containerProvidesPodIP(dockerName.ContainerName) { ip, err = dm.determineContainerIP(podNamespace, podName, iResult) // Kubelet doesn't handle the network error scenario if err != nil { @@ -1058,7 +1058,7 @@ func (dm *DockerManager) podInfraContainerChanged(pod *v1.Pod, podInfraContainer glog.V(4).Infof("host: %v, %v", pod.Spec.HostNetwork, networkMode) return true, nil } - } else if dm.networkPlugin.Name() != "cni" && dm.networkPlugin.Name() != "kubenet" { + } else if !dm.pluginDisablesDockerNetworking() { // Docker only exports ports from the pod infra container. Let's // collect all of the relevant ports and export them. for _, container := range pod.Spec.InitContainers { @@ -1091,6 +1091,10 @@ func getDockerNetworkMode(container *dockertypes.ContainerJSON) string { return "" } +func (dm *DockerManager) pluginDisablesDockerNetworking() bool { + return dm.network.PluginName() == "cni" || dm.network.PluginName() == "kubenet" +} + // newDockerVersion returns a semantically versioned docker version value func newDockerVersion(version string) (*utilversion.Version, error) { return utilversion.ParseSemantic(version) @@ -1436,21 +1440,22 @@ func (dm *DockerManager) KillPod(pod *v1.Pod, runningPod kubecontainer.Pod, grac } // NOTE(random-liu): The pod passed in could be *nil* when kubelet restarted. -func (dm *DockerManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) { +// runtimePod may contain either running or exited containers +func (dm *DockerManager) killPodWithSyncResult(pod *v1.Pod, runtimePod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) { // Short circuit if there's nothing to kill. - if len(runningPod.Containers) == 0 { + if len(runtimePod.Containers) == 0 { return } // Send the kills in parallel since they may take a long time. - // There may be len(runningPod.Containers) or len(runningPod.Containers)-1 of result in the channel - containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers)) + // There may be len(runtimePod.Containers) or len(runtimePod.Containers)-1 of result in the channel + containerResults := make(chan *kubecontainer.SyncResult, len(runtimePod.Containers)) wg := sync.WaitGroup{} var ( - networkContainer *kubecontainer.Container - networkSpec *v1.Container + networkContainers []*kubecontainer.Container + networkSpecs []*v1.Container ) - wg.Add(len(runningPod.Containers)) - for _, container := range runningPod.Containers { + wg.Add(len(runtimePod.Containers)) + for _, container := range runtimePod.Containers { go func(container *kubecontainer.Container) { defer utilruntime.HandleCrash() defer wg.Done() @@ -1475,21 +1480,20 @@ func (dm *DockerManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubeconta // TODO: Handle this without signaling the pod infra container to // adapt to the generic container runtime. - if container.Name == PodInfraContainerName { + if containerIsNetworked(container.Name) { // Store the container runtime for later deletion. // We do this so that PreStop handlers can run in the network namespace. - networkContainer = container - networkSpec = containerSpec - return + networkContainers = append(networkContainers, container) + networkSpecs = append(networkSpecs, containerSpec) + } else { + killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name) + err := dm.KillContainerInPod(container.ID, containerSpec, pod, "Need to kill pod.", gracePeriodOverride) + if err != nil { + killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error()) + glog.Errorf("Failed to delete container %v: %v; Skipping pod %q", container.ID.ID, err, runtimePod.ID) + } + containerResults <- killContainerResult } - - killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name) - err := dm.KillContainerInPod(container.ID, containerSpec, pod, "Need to kill pod.", gracePeriodOverride) - if err != nil { - killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error()) - glog.Errorf("Failed to delete container %v: %v; Skipping pod %q", container.ID.ID, err, runningPod.ID) - } - containerResults <- killContainerResult }(container) } wg.Wait() @@ -1497,29 +1501,37 @@ func (dm *DockerManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubeconta for containerResult := range containerResults { result.AddSyncResult(containerResult) } - if networkContainer != nil { + + // Tear down any dead or running network/infra containers, but only kill + // those that are still running. + for i := range networkContainers { + networkContainer := networkContainers[i] + networkSpec := networkSpecs[i] + + teardownNetworkResult := kubecontainer.NewSyncResult(kubecontainer.TeardownNetwork, kubecontainer.BuildPodFullName(runtimePod.Name, runtimePod.Namespace)) + result.AddSyncResult(teardownNetworkResult) + ins, err := dm.client.InspectContainer(networkContainer.ID.ID) if err != nil { err = fmt.Errorf("Error inspecting container %v: %v", networkContainer.ID.ID, err) glog.Error(err) - result.Fail(err) - return + teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, err.Error()) + continue } + if getDockerNetworkMode(ins) != namespaceModeHost { - teardownNetworkResult := kubecontainer.NewSyncResult(kubecontainer.TeardownNetwork, kubecontainer.BuildPodFullName(runningPod.Name, runningPod.Namespace)) - result.AddSyncResult(teardownNetworkResult) - glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", dm.networkPlugin.Name(), kubecontainer.BuildPodFullName(runningPod.Name, runningPod.Namespace)) - if err := dm.networkPlugin.TearDownPod(runningPod.Namespace, runningPod.Name, networkContainer.ID); err != nil { - message := fmt.Sprintf("Failed to teardown network for pod %q using network plugins %q: %v", runningPod.ID, dm.networkPlugin.Name(), err) - teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, message) - glog.Error(message) + if err := dm.network.TearDownPod(runtimePod.Namespace, runtimePod.Name, networkContainer.ID); err != nil { + teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, err.Error()) + glog.Error(err) } } - killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, networkContainer.Name) - result.AddSyncResult(killContainerResult) - if err := dm.KillContainerInPod(networkContainer.ID, networkSpec, pod, "Need to kill pod.", gracePeriodOverride); err != nil { - killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error()) - glog.Errorf("Failed to delete container %v: %v; Skipping pod %q", networkContainer.ID.ID, err, runningPod.ID) + if networkContainer.State == kubecontainer.ContainerStateRunning { + killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, networkContainer.Name) + result.AddSyncResult(killContainerResult) + if err := dm.KillContainerInPod(networkContainer.ID, networkSpec, pod, "Need to kill pod.", gracePeriodOverride); err != nil { + killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error()) + glog.Errorf("Failed to delete container %v: %v; Skipping pod %q", networkContainer.ID.ID, err, runtimePod.ID) + } } } return @@ -1915,7 +1927,7 @@ func (dm *DockerManager) createPodInfraContainer(pod *v1.Pod) (kubecontainer.Doc if kubecontainer.IsHostNetworkPod(pod) { netNamespace = namespaceModeHost - } else if dm.networkPlugin.Name() == "cni" || dm.networkPlugin.Name() == "kubenet" { + } else if dm.pluginDisablesDockerNetworking() { netNamespace = "none" } else { // Docker only exports ports from the pod infra container. Let's @@ -2148,9 +2160,29 @@ func (dm *DockerManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecon glog.V(4).Infof("Killing Infra Container for %q, will start new one", format.Pod(pod)) } + // Get list of running container(s) to kill + podToKill := kubecontainer.ConvertPodStatusToRunningPod(dm.Type(), podStatus) + + // If there are dead network containers, also kill them to ensure + // their network resources get released and are available to be + // re-used by new net containers + for _, containerStatus := range podStatus.ContainerStatuses { + if containerIsNetworked(containerStatus.Name) && containerStatus.State == kubecontainer.ContainerStateExited { + container := &kubecontainer.Container{ + ID: containerStatus.ID, + Name: containerStatus.Name, + Image: containerStatus.Image, + ImageID: containerStatus.ImageID, + Hash: containerStatus.Hash, + State: containerStatus.State, + } + podToKill.Containers = append(podToKill.Containers, container) + } + } + // Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container) // TODO(random-liu): We'll use pod status directly in the future - killResult := dm.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(dm.Type(), podStatus), nil) + killResult := dm.killPodWithSyncResult(pod, podToKill, nil) result.AddPodSyncResult(killResult) if killResult.Error() != nil { return @@ -2217,20 +2249,14 @@ func (dm *DockerManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecon setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, kubecontainer.GetPodFullName(pod)) result.AddSyncResult(setupNetworkResult) if !kubecontainer.IsHostNetworkPod(pod) { - glog.V(3).Infof("Calling network plugin %s to setup pod for %s", dm.networkPlugin.Name(), format.Pod(pod)) - err = dm.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID.ContainerID()) - if err != nil { - // TODO: (random-liu) There shouldn't be "Skipping pod" in sync result message - message := fmt.Sprintf("Failed to setup network for pod %q using network plugins %q: %v; Skipping pod", format.Pod(pod), dm.networkPlugin.Name(), err) - setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, message) - glog.Error(message) + if err := dm.network.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID.ContainerID()); err != nil { + setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, err.Error()) + glog.Error(err) // Delete infra container killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, PodInfraContainerName) result.AddSyncResult(killContainerResult) - if delErr := dm.KillContainerInPod(kubecontainer.ContainerID{ - ID: string(podInfraContainerID), - Type: "docker"}, nil, pod, message, nil); delErr != nil { + if delErr := dm.KillContainerInPod(podInfraContainerID.ContainerID(), nil, pod, err.Error(), nil); delErr != nil { killContainerResult.Fail(kubecontainer.ErrKillContainer, delErr.Error()) glog.Warningf("Clear infra container failed for pod %q: %v", format.Pod(pod), delErr) } @@ -2246,7 +2272,7 @@ func (dm *DockerManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecon } if dm.configureHairpinMode { - if err = hairpin.SetUpContainerPid(podInfraContainer.State.Pid, network.DefaultInterfaceName); err != nil { + if err = hairpin.SetUpContainerPid(podInfraContainer.State.Pid, knetwork.DefaultInterfaceName); err != nil { glog.Warningf("Hairpin setup failed for pod %q: %v", format.Pod(pod), err) } } @@ -2679,7 +2705,7 @@ func (dm *DockerManager) GetPodStatus(uid kubetypes.UID, name, namespace string) } } containerStatuses = append(containerStatuses, result) - if containerProvidesPodIP(dockerName) && ip != "" { + if containerProvidesPodIP(dockerName.ContainerName) && ip != "" { podStatus.IP = ip } } diff --git a/pkg/kubelet/dockertools/docker_manager_linux.go b/pkg/kubelet/dockertools/docker_manager_linux.go index 333ab4673b8..539ae0a2ca4 100644 --- a/pkg/kubelet/dockertools/docker_manager_linux.go +++ b/pkg/kubelet/dockertools/docker_manager_linux.go @@ -52,8 +52,13 @@ func getContainerIP(container *dockertypes.ContainerJSON) string { func getNetworkingMode() string { return "" } // Returns true if the container name matches the infrastructure's container name -func containerProvidesPodIP(name *KubeletContainerName) bool { - return name.ContainerName == PodInfraContainerName +func containerProvidesPodIP(containerName string) bool { + return containerName == PodInfraContainerName +} + +// Only the infrastructure container needs network setup/teardown +func containerIsNetworked(containerName string) bool { + return containerName == PodInfraContainerName } // Returns Seccomp and AppArmor Security options diff --git a/pkg/kubelet/dockertools/docker_manager_linux_test.go b/pkg/kubelet/dockertools/docker_manager_linux_test.go index 0c61496281c..b749d8f955f 100644 --- a/pkg/kubelet/dockertools/docker_manager_linux_test.go +++ b/pkg/kubelet/dockertools/docker_manager_linux_test.go @@ -33,7 +33,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/network" - "k8s.io/kubernetes/pkg/kubelet/network/mock_network" + nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" "k8s.io/kubernetes/pkg/security/apparmor" utilstrings "k8s.io/kubernetes/pkg/util/strings" ) @@ -455,8 +455,9 @@ func TestGetPodStatusFromNetworkPlugin(t *testing.T) { for _, test := range cases { dm, fakeDocker := newTestDockerManager() ctrl := gomock.NewController(t) - fnp := mock_network.NewMockNetworkPlugin(ctrl) - dm.networkPlugin = fnp + defer ctrl.Finish() + fnp := nettest.NewMockNetworkPlugin(ctrl) + dm.network = network.NewPluginManager(fnp) fakeDocker.SetFakeRunningContainers([]*FakeContainer{ { diff --git a/pkg/kubelet/dockertools/docker_manager_test.go b/pkg/kubelet/dockertools/docker_manager_test.go index f3f484bf291..da0ca6bf15c 100644 --- a/pkg/kubelet/dockertools/docker_manager_test.go +++ b/pkg/kubelet/dockertools/docker_manager_test.go @@ -54,7 +54,6 @@ import ( containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/network" - "k8s.io/kubernetes/pkg/kubelet/network/mock_network" nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/types" @@ -1829,6 +1828,55 @@ func TestGetPodStatusNoSuchContainer(t *testing.T) { // Verify that we will try to start new contrainers even if the inspections // failed. verifyCalls(t, fakeDocker, []string{ + // Inspect dead infra container for possible network teardown + "inspect_container", + // Start a new infra container. + "create", "start", "inspect_container", "inspect_container", + // Start a new container. + "create", "start", "inspect_container", + }) +} + +func TestSyncPodDeadInfraContainerTeardown(t *testing.T) { + const ( + noSuchContainerID = "nosuchcontainer" + infraContainerID = "9876" + ) + dm, fakeDocker := newTestDockerManager() + dm.podInfraContainerImage = "pod_infra_image" + ctrl := gomock.NewController(t) + defer ctrl.Finish() + fnp := nettest.NewMockNetworkPlugin(ctrl) + dm.network = network.NewPluginManager(fnp) + + pod := makePod("foo", &v1.PodSpec{ + Containers: []v1.Container{{Name: noSuchContainerID}}, + }) + + fakeDocker.SetFakeContainers([]*FakeContainer{ + { + ID: infraContainerID, + Name: "/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(pod), 16) + "_foo_new_12345678_42", + ExitCode: 0, + StartedAt: time.Now(), + FinishedAt: time.Now(), + Running: false, + }, + }) + + // Can be called multiple times due to GetPodStatus + fnp.EXPECT().Name().Return("someNetworkPlugin").AnyTimes() + fnp.EXPECT().TearDownPod("new", "foo", gomock.Any()).Return(nil) + fnp.EXPECT().GetPodNetworkStatus("new", "foo", gomock.Any()).Return(&network.PodNetworkStatus{IP: net.ParseIP("1.1.1.1")}, nil).AnyTimes() + fnp.EXPECT().SetUpPod("new", "foo", gomock.Any()).Return(nil) + + runSyncPod(t, dm, fakeDocker, pod, nil, false) + + // Verify that we will try to start new contrainers even if the inspections + // failed. + verifyCalls(t, fakeDocker, []string{ + // Inspect dead infra container for possible network teardown + "inspect_container", // Start a new infra container. "create", "start", "inspect_container", "inspect_container", // Start a new container. @@ -1878,8 +1926,8 @@ func TestSyncPodGetsPodIPFromNetworkPlugin(t *testing.T) { dm.podInfraContainerImage = "pod_infra_image" ctrl := gomock.NewController(t) defer ctrl.Finish() - fnp := mock_network.NewMockNetworkPlugin(ctrl) - dm.networkPlugin = fnp + fnp := nettest.NewMockNetworkPlugin(ctrl) + dm.network = network.NewPluginManager(fnp) pod := makePod("foo", &v1.PodSpec{ Containers: []v1.Container{ diff --git a/pkg/kubelet/dockertools/docker_manager_unsupported.go b/pkg/kubelet/dockertools/docker_manager_unsupported.go index ecfe5fce112..5bbcff7ac82 100644 --- a/pkg/kubelet/dockertools/docker_manager_unsupported.go +++ b/pkg/kubelet/dockertools/docker_manager_unsupported.go @@ -42,7 +42,11 @@ func getNetworkingMode() string { return "" } -func containerProvidesPodIP(name *KubeletContainerName) bool { +func containerProvidesPodIP(containerName string) bool { + return false +} + +func containerIsNetworked(containerName string) bool { return false } diff --git a/pkg/kubelet/dockertools/docker_manager_windows.go b/pkg/kubelet/dockertools/docker_manager_windows.go index 4a9ab80693e..8b29d964e19 100644 --- a/pkg/kubelet/dockertools/docker_manager_windows.go +++ b/pkg/kubelet/dockertools/docker_manager_windows.go @@ -65,8 +65,13 @@ func getNetworkingMode() string { // Infrastructure containers are not supported on Windows. For this reason, we // make sure to not grab the infra container's IP for the pod. -func containerProvidesPodIP(name *KubeletContainerName) bool { - return name.ContainerName != PodInfraContainerName +func containerProvidesPodIP(containerName string) bool { + return containerName != PodInfraContainerName +} + +// All containers in Windows need networking setup/teardown +func containerIsNetworked(containerName string) bool { + return true } // Returns nil as both Seccomp and AppArmor security options are not valid on Windows diff --git a/pkg/kubelet/network/BUILD b/pkg/kubelet/network/BUILD index 1e493f99dc4..36974c2601b 100644 --- a/pkg/kubelet/network/BUILD +++ b/pkg/kubelet/network/BUILD @@ -5,7 +5,6 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", - "go_test", ) go_library( @@ -31,17 +30,6 @@ go_library( ], ) -go_test( - name = "go_default_test", - srcs = ["plugins_test.go"], - library = ":go_default_library", - tags = ["automanaged"], - deps = [ - "//pkg/apis/componentconfig:go_default_library", - "//pkg/kubelet/network/testing:go_default_library", - ], -) - filegroup( name = "package-srcs", srcs = glob(["**"]), @@ -57,7 +45,6 @@ filegroup( "//pkg/kubelet/network/hairpin:all-srcs", "//pkg/kubelet/network/hostport:all-srcs", "//pkg/kubelet/network/kubenet:all-srcs", - "//pkg/kubelet/network/mock_network:all-srcs", "//pkg/kubelet/network/testing:all-srcs", ], tags = ["automanaged"], diff --git a/pkg/kubelet/network/mock_network/BUILD b/pkg/kubelet/network/mock_network/BUILD deleted file mode 100644 index 8f06ba40be0..00000000000 --- a/pkg/kubelet/network/mock_network/BUILD +++ /dev/null @@ -1,34 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -licenses(["notice"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", -) - -go_library( - name = "go_default_library", - srcs = ["network_plugins.go"], - tags = ["automanaged"], - deps = [ - "//pkg/apis/componentconfig:go_default_library", - "//pkg/kubelet/container:go_default_library", - "//pkg/kubelet/network:go_default_library", - "//vendor:github.com/golang/mock/gomock", - "//vendor:k8s.io/apimachinery/pkg/util/sets", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], -) diff --git a/pkg/kubelet/network/plugins.go b/pkg/kubelet/network/plugins.go index ed925562e1c..5b13ef86efb 100644 --- a/pkg/kubelet/network/plugins.go +++ b/pkg/kubelet/network/plugins.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "strings" + "sync" "github.com/golang/glog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -293,3 +294,123 @@ type NoopPortMappingGetter struct{} func (*NoopPortMappingGetter) GetPodPortMappings(containerID string) ([]*hostport.PortMapping, error) { return nil, nil } + +// The PluginManager wraps a kubelet network plugin and provides synchronization +// for a given pod's network operations. Each pod's setup/teardown/status operations +// are synchronized against each other, but network operations of other pods can +// proceed in parallel. +type PluginManager struct { + // Network plugin being wrapped + plugin NetworkPlugin + + // Pod list and lock + podsLock sync.Mutex + pods map[string]*podLock +} + +func NewPluginManager(plugin NetworkPlugin) *PluginManager { + return &PluginManager{ + plugin: plugin, + pods: make(map[string]*podLock), + } +} + +func (pm *PluginManager) PluginName() string { + return pm.plugin.Name() +} + +func (pm *PluginManager) Event(name string, details map[string]interface{}) { + pm.plugin.Event(name, details) +} + +func (pm *PluginManager) Status() error { + return pm.plugin.Status() +} + +type podLock struct { + // Count of in-flight operations for this pod; when this reaches zero + // the lock can be removed from the pod map + refcount uint + + // Lock to synchronize operations for this specific pod + mu sync.Mutex +} + +// Lock network operations for a specific pod. If that pod is not yet in +// the pod map, it will be added. The reference count for the pod will +// be increased. +func (pm *PluginManager) podLock(fullPodName string) *sync.Mutex { + pm.podsLock.Lock() + defer pm.podsLock.Unlock() + + lock, ok := pm.pods[fullPodName] + if !ok { + lock = &podLock{} + pm.pods[fullPodName] = lock + } + lock.refcount++ + return &lock.mu +} + +// Unlock network operations for a specific pod. The reference count for the +// pod will be decreased. If the reference count reaches zero, the pod will be +// removed from the pod map. +func (pm *PluginManager) podUnlock(fullPodName string) { + pm.podsLock.Lock() + defer pm.podsLock.Unlock() + + lock, ok := pm.pods[fullPodName] + if !ok { + glog.Warningf("Unbalanced pod lock unref for %s", fullPodName) + return + } else if lock.refcount == 0 { + // This should never ever happen, but handle it anyway + delete(pm.pods, fullPodName) + glog.Warningf("Pod lock for %s still in map with zero refcount", fullPodName) + return + } + lock.refcount-- + lock.mu.Unlock() + if lock.refcount == 0 { + delete(pm.pods, fullPodName) + } +} + +func (pm *PluginManager) GetPodNetworkStatus(podNamespace, podName string, id kubecontainer.ContainerID) (*PodNetworkStatus, error) { + fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace) + pm.podLock(fullPodName).Lock() + defer pm.podUnlock(fullPodName) + + netStatus, err := pm.plugin.GetPodNetworkStatus(podNamespace, podName, id) + if err != nil { + return nil, fmt.Errorf("NetworkPlugin %s failed on the status hook for pod %q: %v", pm.plugin.Name(), fullPodName, err) + } + + return netStatus, nil +} + +func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer.ContainerID) error { + fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace) + pm.podLock(fullPodName).Lock() + defer pm.podUnlock(fullPodName) + + glog.V(3).Infof("Calling network plugin %s to set up pod %q", pm.plugin.Name(), fullPodName) + if err := pm.plugin.SetUpPod(podNamespace, podName, id); err != nil { + return fmt.Errorf("NetworkPlugin %s failed to set up pod %q network: %v", pm.plugin.Name(), fullPodName, err) + } + + return nil +} + +func (pm *PluginManager) TearDownPod(podNamespace, podName string, id kubecontainer.ContainerID) error { + fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace) + pm.podLock(fullPodName).Lock() + defer pm.podUnlock(fullPodName) + + glog.V(3).Infof("Calling network plugin %s to tear down pod %q", pm.plugin.Name(), fullPodName) + if err := pm.plugin.TearDownPod(podNamespace, podName, id); err != nil { + return fmt.Errorf("NetworkPlugin %s failed to teardown pod %q network: %v", pm.plugin.Name(), fullPodName, err) + } + + return nil +} diff --git a/pkg/kubelet/network/plugins_test.go b/pkg/kubelet/network/plugins_test.go deleted file mode 100644 index bbb6afe51fb..00000000000 --- a/pkg/kubelet/network/plugins_test.go +++ /dev/null @@ -1,38 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package network - -import ( - "testing" - - "k8s.io/kubernetes/pkg/apis/componentconfig" - nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" -) - -func TestSelectDefaultPlugin(t *testing.T) { - all_plugins := []NetworkPlugin{} - plug, err := InitNetworkPlugin(all_plugins, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, "10.0.0.0/8", UseDefaultMTU) - if err != nil { - t.Fatalf("Unexpected error in selecting default plugin: %v", err) - } - if plug == nil { - t.Fatalf("Failed to select the default plugin.") - } - if plug.Name() != DefaultPluginName { - t.Errorf("Failed to select the default plugin. Expected %s. Got %s", DefaultPluginName, plug.Name()) - } -} diff --git a/pkg/kubelet/network/testing/BUILD b/pkg/kubelet/network/testing/BUILD index b4082c77ddf..30f594a6aee 100644 --- a/pkg/kubelet/network/testing/BUILD +++ b/pkg/kubelet/network/testing/BUILD @@ -5,18 +5,40 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( name = "go_default_library", - srcs = ["fake_host.go"], + srcs = [ + "fake_host.go", + "mock_network_plugin.go", + ], tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", + "//pkg/apis/componentconfig:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container/testing:go_default_library", + "//pkg/kubelet/network:go_default_library", "//pkg/kubelet/network/hostport:go_default_library", + "//vendor:github.com/golang/mock/gomock", + "//vendor:k8s.io/apimachinery/pkg/util/sets", + ], +) + +go_test( + name = "go_default_test", + srcs = ["plugins_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/apis/componentconfig:go_default_library", + "//pkg/kubelet/container:go_default_library", + "//pkg/kubelet/network:go_default_library", + "//vendor:github.com/golang/mock/gomock", + "//vendor:k8s.io/apimachinery/pkg/util/sets", ], ) diff --git a/pkg/kubelet/network/mock_network/network_plugins.go b/pkg/kubelet/network/testing/mock_network_plugin.go similarity index 98% rename from pkg/kubelet/network/mock_network/network_plugins.go rename to pkg/kubelet/network/testing/mock_network_plugin.go index fd7259d4a86..3eaa3fefe59 100644 --- a/pkg/kubelet/network/mock_network/network_plugins.go +++ b/pkg/kubelet/network/testing/mock_network_plugin.go @@ -14,11 +14,11 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Generated code, generated via: `mockgen k8s.io/kubernetes/pkg/kubelet/network NetworkPlugin > $GOPATH/src/k8s.io/kubernetes/pkg/kubelet/network/mock_network/network_plugins.go` +// Generated code, generated via: `mockgen k8s.io/kubernetes/pkg/kubelet/network NetworkPlugin > $GOPATH/src/k8s.io/kubernetes/pkg/kubelet/network/testing/mock_network_plugin.go` // Edited by hand for boilerplate and gofmt. // TODO, this should be autogenerated/autoupdated by scripts. -package mock_network +package testing import ( gomock "github.com/golang/mock/gomock" diff --git a/pkg/kubelet/network/testing/plugins_test.go b/pkg/kubelet/network/testing/plugins_test.go new file mode 100644 index 00000000000..96f46013ea4 --- /dev/null +++ b/pkg/kubelet/network/testing/plugins_test.go @@ -0,0 +1,218 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "fmt" + "net" + "sync" + "testing" + + utilsets "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/apis/componentconfig" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/network" + + "github.com/golang/mock/gomock" +) + +func TestSelectDefaultPlugin(t *testing.T) { + all_plugins := []network.NetworkPlugin{} + plug, err := network.InitNetworkPlugin(all_plugins, "", NewFakeHost(nil), componentconfig.HairpinNone, "10.0.0.0/8", network.UseDefaultMTU) + if err != nil { + t.Fatalf("Unexpected error in selecting default plugin: %v", err) + } + if plug == nil { + t.Fatalf("Failed to select the default plugin.") + } + if plug.Name() != network.DefaultPluginName { + t.Errorf("Failed to select the default plugin. Expected %s. Got %s", network.DefaultPluginName, plug.Name()) + } +} + +func TestPluginManager(t *testing.T) { + ctrl := gomock.NewController(t) + fnp := NewMockNetworkPlugin(ctrl) + defer fnp.Finish() + pm := network.NewPluginManager(fnp) + + fnp.EXPECT().Name().Return("someNetworkPlugin").AnyTimes() + + allCreatedWg := sync.WaitGroup{} + allCreatedWg.Add(1) + allDoneWg := sync.WaitGroup{} + + // 10 pods, 4 setup/status/teardown runs each. Ensure that network locking + // works and the pod map isn't concurrently accessed + for i := 0; i < 10; i++ { + podName := fmt.Sprintf("pod%d", i) + containerID := kubecontainer.ContainerID{ID: podName} + + fnp.EXPECT().SetUpPod("", podName, containerID).Return(nil).Times(4) + fnp.EXPECT().GetPodNetworkStatus("", podName, containerID).Return(&network.PodNetworkStatus{IP: net.ParseIP("1.2.3.4")}, nil).Times(4) + fnp.EXPECT().TearDownPod("", podName, containerID).Return(nil).Times(4) + + for x := 0; x < 4; x++ { + allDoneWg.Add(1) + go func(name string, id kubecontainer.ContainerID, num int) { + defer allDoneWg.Done() + + // Block all goroutines from running until all have + // been created and are ready. This ensures we + // have more pod network operations running + // concurrently. + allCreatedWg.Wait() + + if err := pm.SetUpPod("", name, id); err != nil { + t.Errorf("Failed to set up pod %q: %v", name, err) + return + } + + if _, err := pm.GetPodNetworkStatus("", name, id); err != nil { + t.Errorf("Failed to inspect pod %q: %v", name, err) + return + } + + if err := pm.TearDownPod("", name, id); err != nil { + t.Errorf("Failed to tear down pod %q: %v", name, err) + return + } + }(podName, containerID, x) + } + } + // Block all goroutines from running until all have been created and started + allCreatedWg.Done() + + // Wait for them all to finish + allDoneWg.Wait() +} + +type hookableFakeNetworkPluginSetupHook func(namespace, name string, id kubecontainer.ContainerID) + +type hookableFakeNetworkPlugin struct { + setupHook hookableFakeNetworkPluginSetupHook +} + +func newHookableFakeNetworkPlugin(setupHook hookableFakeNetworkPluginSetupHook) *hookableFakeNetworkPlugin { + return &hookableFakeNetworkPlugin{ + setupHook: setupHook, + } +} + +func (p *hookableFakeNetworkPlugin) Init(host network.Host, hairpinMode componentconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error { + return nil +} + +func (p *hookableFakeNetworkPlugin) Event(name string, details map[string]interface{}) { +} + +func (p *hookableFakeNetworkPlugin) Name() string { + return "fakeplugin" +} + +func (p *hookableFakeNetworkPlugin) Capabilities() utilsets.Int { + return utilsets.NewInt() +} + +func (p *hookableFakeNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { + if p.setupHook != nil { + p.setupHook(namespace, name, id) + } + return nil +} + +func (p *hookableFakeNetworkPlugin) TearDownPod(string, string, kubecontainer.ContainerID) error { + return nil +} + +func (p *hookableFakeNetworkPlugin) GetPodNetworkStatus(string, string, kubecontainer.ContainerID) (*network.PodNetworkStatus, error) { + return &network.PodNetworkStatus{IP: net.ParseIP("10.1.2.3")}, nil +} + +func (p *hookableFakeNetworkPlugin) Status() error { + return nil +} + +// Ensure that one pod's network operations don't block another's. If the +// test is successful (eg, first pod doesn't block on second) the test +// will complete. If unsuccessful, it will hang and get killed. +func TestMultiPodParallelNetworkOps(t *testing.T) { + podWg := sync.WaitGroup{} + podWg.Add(1) + + // Can't do this with MockNetworkPlugin because the gomock controller + // has its own locks which don't allow the parallel network operation + // to proceed. + didWait := false + fakePlugin := newHookableFakeNetworkPlugin(func(podNamespace, podName string, id kubecontainer.ContainerID) { + if podName == "waiter" { + podWg.Wait() + didWait = true + } + }) + pm := network.NewPluginManager(fakePlugin) + + opsWg := sync.WaitGroup{} + + // Start the pod that will wait for the other to complete + opsWg.Add(1) + go func() { + defer opsWg.Done() + + podName := "waiter" + containerID := kubecontainer.ContainerID{ID: podName} + + // Setup will block on the runner pod completing. If network + // operations locking isn't correct (eg pod network operations + // block other pods) setUpPod() will never return. + if err := pm.SetUpPod("", podName, containerID); err != nil { + t.Errorf("Failed to set up waiter pod: %v", err) + return + } + + if err := pm.TearDownPod("", podName, containerID); err != nil { + t.Errorf("Failed to tear down waiter pod: %v", err) + return + } + }() + + opsWg.Add(1) + go func() { + defer opsWg.Done() + // Let other pod proceed + defer podWg.Done() + + podName := "runner" + containerID := kubecontainer.ContainerID{ID: podName} + + if err := pm.SetUpPod("", podName, containerID); err != nil { + t.Errorf("Failed to set up runner pod: %v", err) + return + } + + if err := pm.TearDownPod("", podName, containerID); err != nil { + t.Errorf("Failed to tear down runner pod: %v", err) + return + } + }() + + opsWg.Wait() + + if !didWait { + t.Errorf("waiter pod didn't wait for runner pod!") + } +} diff --git a/pkg/kubelet/rkt/BUILD b/pkg/kubelet/rkt/BUILD index b2ce390c7af..679bbeb3467 100644 --- a/pkg/kubelet/rkt/BUILD +++ b/pkg/kubelet/rkt/BUILD @@ -76,7 +76,7 @@ go_test( "//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/network:go_default_library", "//pkg/kubelet/network/kubenet:go_default_library", - "//pkg/kubelet/network/mock_network:go_default_library", + "//pkg/kubelet/network/testing:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/util/exec:go_default_library", "//vendor:github.com/appc/spec/schema", diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index b06729cde52..9668c80be13 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -163,8 +163,8 @@ type Runtime struct { execer utilexec.Interface os kubecontainer.OSInterface - // Network plugin. - networkPlugin network.NetworkPlugin + // Network plugin manager. + network *network.PluginManager // If true, the "hairpin mode" flag is set on container interfaces. // A false value means the kubelet just backs off from setting it, @@ -266,7 +266,7 @@ func New( runtimeHelper: runtimeHelper, recorder: recorder, livenessManager: livenessManager, - networkPlugin: networkPlugin, + network: network.NewPluginManager(networkPlugin), execer: execer, touchPath: touchPath, nsenterPath: nsenterPath, @@ -946,7 +946,7 @@ func serviceFilePath(serviceName string) string { // The pod does not run in host network. And // The pod runs inside a netns created outside of rkt. func (r *Runtime) shouldCreateNetns(pod *v1.Pod) bool { - return !kubecontainer.IsHostNetworkPod(pod) && r.networkPlugin.Name() != network.DefaultPluginName + return !kubecontainer.IsHostNetworkPod(pod) && r.network.PluginName() != network.DefaultPluginName } // usesRktHostNetwork returns true if: @@ -1047,18 +1047,17 @@ func (r *Runtime) generateRunCommand(pod *v1.Pod, uuid, netnsName string) (strin } func (r *Runtime) cleanupPodNetwork(pod *v1.Pod) error { - glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", r.networkPlugin.Name(), format.Pod(pod)) + glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", r.network.PluginName(), format.Pod(pod)) // No-op if the pod is not running in a created netns. if !r.shouldCreateNetns(pod) { return nil } - var teardownErr error containerID := kubecontainer.ContainerID{ID: string(pod.UID)} - if err := r.networkPlugin.TearDownPod(pod.Namespace, pod.Name, containerID); err != nil { - teardownErr = fmt.Errorf("rkt: failed to tear down network for pod %s: %v", format.Pod(pod), err) - glog.Errorf("%v", teardownErr) + teardownErr := r.network.TearDownPod(pod.Namespace, pod.Name, containerID) + if teardownErr != nil { + glog.Error(teardownErr) } if _, err := r.execer.Command("ip", "netns", "del", makePodNetnsName(pod.UID)).Output(); err != nil { @@ -1265,7 +1264,7 @@ func netnsPathFromName(netnsName string) string { // // If the pod is running in host network or is running using the no-op plugin, then nothing will be done. func (r *Runtime) setupPodNetwork(pod *v1.Pod) (string, string, error) { - glog.V(3).Infof("Calling network plugin %s to set up pod for %s", r.networkPlugin.Name(), format.Pod(pod)) + glog.V(3).Infof("Calling network plugin %s to set up pod for %s", r.network.PluginName(), format.Pod(pod)) // No-op if the pod is not running in a created netns. if !r.shouldCreateNetns(pod) { @@ -1282,15 +1281,14 @@ func (r *Runtime) setupPodNetwork(pod *v1.Pod) (string, string, error) { } // Set up networking with the network plugin - glog.V(3).Infof("Calling network plugin %s to setup pod for %s", r.networkPlugin.Name(), format.Pod(pod)) containerID := kubecontainer.ContainerID{ID: string(pod.UID)} - err = r.networkPlugin.SetUpPod(pod.Namespace, pod.Name, containerID) + err = r.network.SetUpPod(pod.Namespace, pod.Name, containerID) if err != nil { - return "", "", fmt.Errorf("failed to set up pod network: %v", err) + return "", "", err } - status, err := r.networkPlugin.GetPodNetworkStatus(pod.Namespace, pod.Name, containerID) + status, err := r.network.GetPodNetworkStatus(pod.Namespace, pod.Name, containerID) if err != nil { - return "", "", fmt.Errorf("failed to get status of pod network: %v", err) + return "", "", err } if r.configureHairpinMode { @@ -2329,7 +2327,7 @@ func (r *Runtime) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kube } // If we are running no-op network plugin, then get the pod IP from the rkt pod status. - if r.networkPlugin.Name() == network.DefaultPluginName { + if r.network.PluginName() == network.DefaultPluginName { if latestPod != nil { for _, n := range latestPod.Networks { if n.Name == defaultNetworkName { @@ -2340,9 +2338,9 @@ func (r *Runtime) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kube } } else { containerID := kubecontainer.ContainerID{ID: string(uid)} - status, err := r.networkPlugin.GetPodNetworkStatus(namespace, name, containerID) + status, err := r.network.GetPodNetworkStatus(namespace, name, containerID) if err != nil { - glog.Warningf("rkt: Failed to get pod network status for pod (UID %q, name %q, namespace %q): %v", uid, name, namespace, err) + glog.Warningf("rkt: %v", err) } else if status != nil { // status can be nil when the pod is running on the host network, in which case the pod IP // will be populated by the upper layer. diff --git a/pkg/kubelet/rkt/rkt_test.go b/pkg/kubelet/rkt/rkt_test.go index e712aaba0bd..57a6e382ee7 100644 --- a/pkg/kubelet/rkt/rkt_test.go +++ b/pkg/kubelet/rkt/rkt_test.go @@ -43,7 +43,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network/kubenet" - "k8s.io/kubernetes/pkg/kubelet/network/mock_network" + nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" "k8s.io/kubernetes/pkg/kubelet/types" utilexec "k8s.io/kubernetes/pkg/util/exec" ) @@ -583,7 +583,7 @@ func TestGetPodStatus(t *testing.T) { defer ctrl.Finish() fr := newFakeRktInterface() fs := newFakeSystemd() - fnp := mock_network.NewMockNetworkPlugin(ctrl) + fnp := nettest.NewMockNetworkPlugin(ctrl) fos := &containertesting.FakeOS{} frh := &fakeRuntimeHelper{} r := &Runtime{ @@ -591,7 +591,7 @@ func TestGetPodStatus(t *testing.T) { systemd: fs, runtimeHelper: frh, os: fos, - networkPlugin: fnp, + network: network.NewPluginManager(fnp), } ns := func(seconds int64) int64 { @@ -826,6 +826,8 @@ func TestGetPodStatus(t *testing.T) { } else { fnp.EXPECT().GetPodNetworkStatus("default", "guestbook", kubecontainer.ContainerID{ID: "42"}). Return(nil, fmt.Errorf("no such network")) + // Plugin name only requested again in error case + fnp.EXPECT().Name().Return(tt.networkPluginName) } } @@ -1388,7 +1390,7 @@ func TestGenerateRunCommand(t *testing.T) { for i, tt := range tests { testCaseHint := fmt.Sprintf("test case #%d", i) - rkt.networkPlugin = tt.networkPlugin + rkt.network = network.NewPluginManager(tt.networkPlugin) rkt.runtimeHelper = &fakeRuntimeHelper{tt.dnsServers, tt.dnsSearches, tt.hostName, "", tt.err} rkt.execer = &utilexec.FakeExec{CommandScript: []utilexec.FakeCommandAction{func(cmd string, args ...string) utilexec.Cmd { return utilexec.InitFakeCmd(&utilexec.FakeCmd{}, cmd, args...)