Merge pull request #7449 from vmarmol/runtime-network-plugins

Move network plugin TearDown to DockerManager
This commit is contained in:
Yu-Ju Hong 2015-04-29 10:09:58 -07:00
commit 33b8f487f6
11 changed files with 114 additions and 71 deletions

View File

@ -29,6 +29,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/leaky"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/docker/docker/pkg/parsers"
@ -68,9 +69,6 @@ type DockerInterface interface {
StartExec(string, docker.StartExecOptions) error
}
// DockerID is an ID of docker container. It is a type to make it clear when we're working with docker container Ids
type DockerID string
// KubeletContainerName encapsulates a pod name and a Kubernetes container name.
type KubeletContainerName struct {
PodFullName string
@ -174,7 +172,7 @@ func (p throttledDockerPuller) IsImagePresent(name string) (bool, error) {
}
// DockerContainers is a map of containers
type DockerContainers map[DockerID]*docker.APIContainers
type DockerContainers map[kubeletTypes.DockerID]*docker.APIContainers
func (c DockerContainers) FindPodContainer(podFullName string, uid types.UID, containerName string) (*docker.APIContainers, bool, uint64) {
for _, dockerContainer := range c {
@ -319,7 +317,7 @@ func GetKubeletDockerContainers(client DockerInterface, allContainers bool) (Doc
glog.V(3).Infof("Docker Container: %s is not managed by kubelet.", container.Names[0])
continue
}
result[DockerID(container.ID)] = container
result[kubeletTypes.DockerID(container.ID)] = container
}
return result, nil
}

View File

@ -27,6 +27,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
docker "github.com/fsouza/go-dockerclient"
@ -392,7 +393,8 @@ func TestIsImagePresent(t *testing.T) {
func TestGetRunningContainers(t *testing.T) {
fakeDocker := &FakeDockerClient{Errors: make(map[string]error)}
fakeRecorder := &record.FakeRecorder{}
containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{})
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np)
tests := []struct {
containers map[string]*docker.Container
inputIDs []string
@ -657,7 +659,8 @@ func TestFindContainersByPod(t *testing.T) {
},
}
fakeClient := &FakeDockerClient{}
containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{})
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np)
for i, test := range tests {
fakeClient.ContainerList = test.containerList
fakeClient.ExitedContainerList = test.exitedContainerList

View File

@ -34,6 +34,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/fsouza/go-dockerclient"
@ -81,6 +83,9 @@ type DockerManager struct {
// Directory of container logs.
containerLogsDir string
// Network plugin.
networkPlugin network.NetworkPlugin
}
func NewDockerManager(
@ -92,7 +97,8 @@ func NewDockerManager(
qps float32,
burst int,
containerLogsDir string,
osInterface kubecontainer.OSInterface) *DockerManager {
osInterface kubecontainer.OSInterface,
networkPlugin network.NetworkPlugin) *DockerManager {
// Work out the location of the Docker runtime, defaulting to /var/lib/docker
// if there are any problems.
dockerRoot := "/var/lib/docker"
@ -135,6 +141,7 @@ func NewDockerManager(
Puller: newDockerPuller(client, qps, burst),
dockerRoot: dockerRoot,
containerLogsDir: containerLogsDir,
networkPlugin: networkPlugin,
}
}
@ -941,13 +948,24 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream
// Kills all containers in the specified pod
func (dm *DockerManager) KillPod(pod kubecontainer.Pod) error {
// Send the kills in parallel since they may take a long time.
errs := make(chan error, len(pod.Containers))
// Send the kills in parallel since they may take a long time. Len + 1 since there
// can be Len errors + the networkPlugin teardown error.
errs := make(chan error, len(pod.Containers)+1)
wg := sync.WaitGroup{}
for _, container := range pod.Containers {
wg.Add(1)
go func(container *kubecontainer.Container) {
defer util.HandleCrash()
// TODO: Handle this without signaling the pod infra container to
// adapt to the generic container runtime.
if container.Name == PodInfraContainerName {
err := dm.networkPlugin.TearDownPod(pod.Namespace, pod.Name, kubeletTypes.DockerID(container.ID))
if err != nil {
glog.Errorf("Failed tearing down the infra container: %v", err)
errs <- err
}
}
err := dm.KillContainer(container.ID)
if err != nil {
glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, pod.ID)
@ -988,7 +1006,7 @@ func (dm *DockerManager) KillContainer(containerID types.UID) error {
}
// Run a single container from a pod. Returns the docker container ID
func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, generator kubecontainer.RunContainerOptionsGenerator, runner kubecontainer.HandlerRunner, netMode, ipcMode string) (DockerID, error) {
func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, generator kubecontainer.RunContainerOptionsGenerator, runner kubecontainer.HandlerRunner, netMode, ipcMode string) (kubeletTypes.DockerID, error) {
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
@ -1013,7 +1031,7 @@ func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, ge
handlerErr := runner.Run(id, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
dm.KillContainer(types.UID(id))
return DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
return kubeletTypes.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
}
}
@ -1027,11 +1045,11 @@ func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, ge
if err = dm.os.Symlink(containerLogFile, symlinkFile); err != nil {
glog.Errorf("Failed to create symbolic link to the log file of pod %q container %q: %v", podFullName, container.Name, err)
}
return DockerID(id), err
return kubeletTypes.DockerID(id), err
}
// CreatePodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container.
func (dm *DockerManager) CreatePodInfraContainer(pod *api.Pod, generator kubecontainer.RunContainerOptionsGenerator, runner kubecontainer.HandlerRunner) (DockerID, error) {
func (dm *DockerManager) CreatePodInfraContainer(pod *api.Pod, generator kubecontainer.RunContainerOptionsGenerator, runner kubecontainer.HandlerRunner) (kubeletTypes.DockerID, error) {
// Use host networking if specified.
netNamespace := ""
var ports []api.ContainerPort

View File

@ -43,6 +43,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
@ -202,16 +203,6 @@ func NewMainKubelet(
statusManager := newStatusManager(kubeClient)
readinessManager := kubecontainer.NewReadinessManager()
containerRefManager := kubecontainer.NewRefManager()
containerManager := dockertools.NewDockerManager(
dockerClient,
recorder,
readinessManager,
containerRefManager,
podInfraContainerImage,
pullQPS,
pullBurst,
containerLogsDir,
osInterface)
volumeManager := newVolumeManager()
@ -223,7 +214,6 @@ func NewMainKubelet(
resyncInterval: resyncInterval,
containerRefManager: containerRefManager,
readinessManager: readinessManager,
runner: containerManager,
httpClient: &http.Client{},
sourcesReady: sourcesReady,
clusterDomain: clusterDomain,
@ -240,12 +230,30 @@ func NewMainKubelet(
volumeManager: volumeManager,
cloud: cloud,
nodeRef: nodeRef,
containerManager: containerManager,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
resourceContainer: resourceContainer,
os: osInterface,
}
if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
return nil, err
} else {
klet.networkPlugin = plug
}
containerManager := dockertools.NewDockerManager(
dockerClient,
recorder,
readinessManager,
containerRefManager,
podInfraContainerImage,
pullQPS,
pullBurst,
containerLogsDir,
osInterface,
klet.networkPlugin)
klet.runner = containerManager
klet.containerManager = containerManager
klet.podManager = newBasicPodManager(klet.kubeClient)
klet.prober = newProber(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder)
klet.handlerRunner = newHandlerRunner(klet.httpClient, klet.runner, klet.containerManager)
@ -266,11 +274,6 @@ func NewMainKubelet(
return nil, err
}
if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
return nil, err
} else {
klet.networkPlugin = plug
}
// If the container logs directory does not exist, create it.
if _, err := os.Stat(containerLogsDir); err != nil {
if err := osInterface.Mkdir(containerLogsDir, 0755); err != nil {
@ -887,27 +890,7 @@ func (kl *Kubelet) pullImage(pod *api.Pod, container *api.Container) error {
// Kill all running containers in a pod (includes the pod infra container).
func (kl *Kubelet) killPod(pod kubecontainer.Pod) error {
// TODO(vmarmol): Consider handling non-Docker runtimes, the plugins are not friendly to it today.
container, err := kl.containerManager.GetPodInfraContainer(pod)
errList := []error{}
if err == nil {
// Call the networking plugin for teardown.
err = kl.networkPlugin.TearDownPod(pod.Namespace, pod.Name, dockertools.DockerID(container.ID))
if err != nil {
glog.Errorf("Failed tearing down the network plugin for pod %q: %v", pod.ID, err)
errList = append(errList, err)
}
}
err = kl.containerManager.KillPod(pod)
if err != nil {
errList = append(errList, err)
}
if len(errList) > 0 {
return utilErrors.NewAggregate(errList)
}
return nil
return kl.containerManager.KillPod(pod)
}
type empty struct{}
@ -973,9 +956,9 @@ func shouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatu
// - all running containers which are NOT contained in containersToKeep should be killed.
type podContainerChangesSpec struct {
startInfraContainer bool
infraContainerId dockertools.DockerID
infraContainerId kubeletTypes.DockerID
containersToStart map[int]empty
containersToKeep map[dockertools.DockerID]int
containersToKeep map[kubeletTypes.DockerID]int
}
func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (podContainerChangesSpec, error) {
@ -984,11 +967,11 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubeconta
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
containersToStart := make(map[int]empty)
containersToKeep := make(map[dockertools.DockerID]int)
containersToKeep := make(map[kubeletTypes.DockerID]int)
createPodInfraContainer := false
var err error
var podInfraContainerID dockertools.DockerID
var podInfraContainerID kubeletTypes.DockerID
var changed bool
podInfraContainer := runningPod.FindContainerByName(dockertools.PodInfraContainerName)
if podInfraContainer != nil {
@ -1007,7 +990,7 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubeconta
} else {
glog.V(4).Infof("Pod infra container looks good, keep it %q", podFullName)
createPodInfraContainer = false
podInfraContainerID = dockertools.DockerID(podInfraContainer.ID)
podInfraContainerID = kubeletTypes.DockerID(podInfraContainer.ID)
containersToKeep[podInfraContainerID] = -1
}
@ -1026,7 +1009,7 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubeconta
continue
}
containerID := dockertools.DockerID(c.ID)
containerID := kubeletTypes.DockerID(c.ID)
hash := c.Hash
glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID)
@ -1074,7 +1057,7 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubeconta
// If Infra container is the last running one, we don't want to keep it.
if !createPodInfraContainer && len(containersToStart) == 0 && len(containersToKeep) == 1 {
containersToKeep = make(map[dockertools.DockerID]int)
containersToKeep = make(map[kubeletTypes.DockerID]int)
}
return podContainerChangesSpec{
@ -1147,7 +1130,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
} else {
// Otherwise kill any containers in this pod which are not specified as ones to keep.
for _, container := range runningPod.Containers {
_, keep := containerChanges.containersToKeep[dockertools.DockerID(container.ID)]
_, keep := containerChanges.containersToKeep[kubeletTypes.DockerID(container.ID)]
if !keep {
glog.V(3).Infof("Killing unwanted container %+v", container)
err = kl.containerManager.KillContainer(container.ID)

View File

@ -105,7 +105,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
podManager, fakeMirrorClient := newFakePodManager()
kubelet.podManager = podManager
kubelet.containerRefManager = kubecontainer.NewRefManager()
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os)
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin)
kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager)
kubelet.podWorkers = newPodWorkers(
kubelet.runtimeCache,

View File

@ -54,8 +54,8 @@ import (
"path"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
utilexec "github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
"github.com/golang/glog"
)
@ -125,13 +125,13 @@ func (plugin *execNetworkPlugin) validate() error {
return nil
}
func (plugin *execNetworkPlugin) SetUpPod(namespace string, name string, id dockertools.DockerID) error {
func (plugin *execNetworkPlugin) SetUpPod(namespace string, name string, id kubeletTypes.DockerID) error {
out, err := utilexec.New().Command(plugin.getExecutable(), setUpCmd, namespace, name, string(id)).CombinedOutput()
glog.V(5).Infof("SetUpPod 'exec' network plugin output: %s, %v", string(out), err)
return err
}
func (plugin *execNetworkPlugin) TearDownPod(namespace string, name string, id dockertools.DockerID) error {
func (plugin *execNetworkPlugin) TearDownPod(namespace string, name string, id kubeletTypes.DockerID) error {
out, err := utilexec.New().Command(plugin.getExecutable(), tearDownCmd, namespace, name, string(id)).CombinedOutput()
glog.V(5).Infof("TearDownPod 'exec' network plugin output: %s, %v", string(out), err)
return err

View File

@ -22,7 +22,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
"github.com/golang/glog"
@ -43,10 +43,10 @@ type NetworkPlugin interface {
// SetUpPod is the method called after the infra container of
// the pod has been created but before the other containers of the
// pod are launched.
SetUpPod(namespace string, name string, podInfraContainerID dockertools.DockerID) error
SetUpPod(namespace string, name string, podInfraContainerID kubeletTypes.DockerID) error
// TearDownPod is the method called before a pod's infra container will be deleted
TearDownPod(namespace string, name string, podInfraContainerID dockertools.DockerID) error
TearDownPod(namespace string, name string, podInfraContainerID kubeletTypes.DockerID) error
}
// Host is an interface that plugins can use to access the kubelet.
@ -113,10 +113,10 @@ func (plugin *noopNetworkPlugin) Name() string {
return DefaultPluginName
}
func (plugin *noopNetworkPlugin) SetUpPod(namespace string, name string, id dockertools.DockerID) error {
func (plugin *noopNetworkPlugin) SetUpPod(namespace string, name string, id kubeletTypes.DockerID) error {
return nil
}
func (plugin *noopNetworkPlugin) TearDownPod(namespace string, name string, id dockertools.DockerID) error {
func (plugin *noopNetworkPlugin) TearDownPod(namespace string, name string, id kubeletTypes.DockerID) error {
return nil
}

View File

@ -25,6 +25,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
)
@ -40,7 +41,8 @@ func newPod(uid, name string) *api.Pod {
func createPodWorkers() (*podWorkers, map[types.UID][]string) {
fakeDocker := &dockertools.FakeDockerClient{}
fakeRecorder := &record.FakeRecorder{}
dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{})
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np)
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager)
lock := sync.Mutex{}

View File

@ -157,7 +157,8 @@ func TestRunOnce(t *testing.T) {
0,
0,
"",
kubecontainer.FakeOS{})
kubecontainer.FakeOS{},
kb.networkPlugin)
kb.containerManager.Puller = &dockertools.FakeDockerPuller{}
pods := []*api.Pod{

18
pkg/kubelet/types/doc.go Normal file
View File

@ -0,0 +1,18 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Common types in the Kubelet.
package types

View File

@ -0,0 +1,20 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
// DockerID is an ID of docker container. It is a type to make it clear when we're working with docker container Ids
type DockerID string