From 787d42d50b5af68d4cad535746ff73b08f045068 Mon Sep 17 00:00:00 2001 From: Victor Marmol Date: Tue, 28 Apr 2015 11:02:29 -0700 Subject: [PATCH] Move network plugin teardown to DockerManager. This teardown is Docker-specific and will let us also do the setup in the manager in future cleanups. --- pkg/kubelet/dockertools/docker_test.go | 7 +++- pkg/kubelet/dockertools/manager.go | 23 ++++++++-- pkg/kubelet/kubelet.go | 58 +++++++++----------------- pkg/kubelet/kubelet_test.go | 2 +- pkg/kubelet/pod_workers_test.go | 4 +- pkg/kubelet/runonce_test.go | 3 +- 6 files changed, 51 insertions(+), 46 deletions(-) diff --git a/pkg/kubelet/dockertools/docker_test.go b/pkg/kubelet/dockertools/docker_test.go index da2d7b3b660..be9d1ca610e 100644 --- a/pkg/kubelet/dockertools/docker_test.go +++ b/pkg/kubelet/dockertools/docker_test.go @@ -27,6 +27,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" docker "github.com/fsouza/go-dockerclient" @@ -392,7 +393,8 @@ func TestIsImagePresent(t *testing.T) { func TestGetRunningContainers(t *testing.T) { fakeDocker := &FakeDockerClient{Errors: make(map[string]error)} fakeRecorder := &record.FakeRecorder{} - containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}) + np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) + containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np) tests := []struct { containers map[string]*docker.Container inputIDs []string @@ -657,7 +659,8 @@ func TestFindContainersByPod(t *testing.T) { }, } fakeClient := &FakeDockerClient{} - containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}) + np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) + containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np) for i, test := range tests { fakeClient.ContainerList = test.containerList fakeClient.ExitedContainerList = test.exitedContainerList diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 440806db5ac..040883859f0 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -34,6 +34,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -82,6 +83,9 @@ type DockerManager struct { // Directory of container logs. containerLogsDir string + + // Network plugin. + networkPlugin network.NetworkPlugin } func NewDockerManager( @@ -93,7 +97,8 @@ func NewDockerManager( qps float32, burst int, containerLogsDir string, - osInterface kubecontainer.OSInterface) *DockerManager { + osInterface kubecontainer.OSInterface, + networkPlugin network.NetworkPlugin) *DockerManager { // Work out the location of the Docker runtime, defaulting to /var/lib/docker // if there are any problems. dockerRoot := "/var/lib/docker" @@ -136,6 +141,7 @@ func NewDockerManager( Puller: newDockerPuller(client, qps, burst), dockerRoot: dockerRoot, containerLogsDir: containerLogsDir, + networkPlugin: networkPlugin, } } @@ -942,13 +948,24 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream // Kills all containers in the specified pod func (dm *DockerManager) KillPod(pod kubecontainer.Pod) error { - // Send the kills in parallel since they may take a long time. - errs := make(chan error, len(pod.Containers)) + // Send the kills in parallel since they may take a long time. Len + 1 since there + // can be Len errors + the networkPlugin teardown error. + errs := make(chan error, len(pod.Containers)+1) wg := sync.WaitGroup{} for _, container := range pod.Containers { wg.Add(1) go func(container *kubecontainer.Container) { defer util.HandleCrash() + + // TODO: Handle this without signaling the pod infra container to + // adapt to the generic container runtime. + if container.Name == PodInfraContainerName { + err := dm.networkPlugin.TearDownPod(pod.Namespace, pod.Name, kubeletTypes.DockerID(container.ID)) + if err != nil { + glog.Errorf("Failed tearing down the infra container: %v", err) + errs <- err + } + } err := dm.KillContainer(container.ID) if err != nil { glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, pod.ID) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c05ce6f8d22..639faa4fe53 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -203,16 +203,6 @@ func NewMainKubelet( statusManager := newStatusManager(kubeClient) readinessManager := kubecontainer.NewReadinessManager() containerRefManager := kubecontainer.NewRefManager() - containerManager := dockertools.NewDockerManager( - dockerClient, - recorder, - readinessManager, - containerRefManager, - podInfraContainerImage, - pullQPS, - pullBurst, - containerLogsDir, - osInterface) volumeManager := newVolumeManager() @@ -224,7 +214,6 @@ func NewMainKubelet( resyncInterval: resyncInterval, containerRefManager: containerRefManager, readinessManager: readinessManager, - runner: containerManager, httpClient: &http.Client{}, sourcesReady: sourcesReady, clusterDomain: clusterDomain, @@ -241,12 +230,30 @@ func NewMainKubelet( volumeManager: volumeManager, cloud: cloud, nodeRef: nodeRef, - containerManager: containerManager, nodeStatusUpdateFrequency: nodeStatusUpdateFrequency, resourceContainer: resourceContainer, os: osInterface, } + if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil { + return nil, err + } else { + klet.networkPlugin = plug + } + containerManager := dockertools.NewDockerManager( + dockerClient, + recorder, + readinessManager, + containerRefManager, + podInfraContainerImage, + pullQPS, + pullBurst, + containerLogsDir, + osInterface, + klet.networkPlugin) + klet.runner = containerManager + klet.containerManager = containerManager + klet.podManager = newBasicPodManager(klet.kubeClient) klet.prober = newProber(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder) klet.handlerRunner = newHandlerRunner(klet.httpClient, klet.runner, klet.containerManager) @@ -267,11 +274,6 @@ func NewMainKubelet( return nil, err } - if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil { - return nil, err - } else { - klet.networkPlugin = plug - } // If the container logs directory does not exist, create it. if _, err := os.Stat(containerLogsDir); err != nil { if err := osInterface.Mkdir(containerLogsDir, 0755); err != nil { @@ -888,27 +890,7 @@ func (kl *Kubelet) pullImage(pod *api.Pod, container *api.Container) error { // Kill all running containers in a pod (includes the pod infra container). func (kl *Kubelet) killPod(pod kubecontainer.Pod) error { - // TODO(vmarmol): Consider handling non-Docker runtimes, the plugins are not friendly to it today. - container, err := kl.containerManager.GetPodInfraContainer(pod) - errList := []error{} - if err == nil { - // Call the networking plugin for teardown. - err = kl.networkPlugin.TearDownPod(pod.Namespace, pod.Name, kubeletTypes.DockerID(container.ID)) - if err != nil { - glog.Errorf("Failed tearing down the network plugin for pod %q: %v", pod.ID, err) - errList = append(errList, err) - } - } - - err = kl.containerManager.KillPod(pod) - if err != nil { - errList = append(errList, err) - } - - if len(errList) > 0 { - return utilErrors.NewAggregate(errList) - } - return nil + return kl.containerManager.KillPod(pod) } type empty struct{} diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 8f911963064..5fd1b21712f 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -105,7 +105,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { podManager, fakeMirrorClient := newFakePodManager() kubelet.podManager = podManager kubelet.containerRefManager = kubecontainer.NewRefManager() - kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os) + kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin) kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager) kubelet.podWorkers = newPodWorkers( kubelet.runtimeCache, diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 24ac7ac8678..d540ff38394 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -25,6 +25,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" ) @@ -40,7 +41,8 @@ func newPod(uid, name string) *api.Pod { func createPodWorkers() (*podWorkers, map[types.UID][]string) { fakeDocker := &dockertools.FakeDockerClient{} fakeRecorder := &record.FakeRecorder{} - dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}) + np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) + dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np) fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager) lock := sync.Mutex{} diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 544885d2ef4..ce799610244 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -156,7 +156,8 @@ func TestRunOnce(t *testing.T) { 0, 0, "", - kubecontainer.FakeOS{}) + kubecontainer.FakeOS{}, + kb.networkPlugin) kb.containerManager.Puller = &dockertools.FakeDockerPuller{} pods := []*api.Pod{