mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 05:27:21 +00:00
Move network plugin teardown to DockerManager.
This teardown is Docker-specific and will let us also do the setup in the manager in future cleanups.
This commit is contained in:
parent
de0957ccb1
commit
787d42d50b
@ -27,6 +27,7 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
|
||||||
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
docker "github.com/fsouza/go-dockerclient"
|
docker "github.com/fsouza/go-dockerclient"
|
||||||
@ -392,7 +393,8 @@ func TestIsImagePresent(t *testing.T) {
|
|||||||
func TestGetRunningContainers(t *testing.T) {
|
func TestGetRunningContainers(t *testing.T) {
|
||||||
fakeDocker := &FakeDockerClient{Errors: make(map[string]error)}
|
fakeDocker := &FakeDockerClient{Errors: make(map[string]error)}
|
||||||
fakeRecorder := &record.FakeRecorder{}
|
fakeRecorder := &record.FakeRecorder{}
|
||||||
containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{})
|
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
|
||||||
|
containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np)
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
containers map[string]*docker.Container
|
containers map[string]*docker.Container
|
||||||
inputIDs []string
|
inputIDs []string
|
||||||
@ -657,7 +659,8 @@ func TestFindContainersByPod(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
fakeClient := &FakeDockerClient{}
|
fakeClient := &FakeDockerClient{}
|
||||||
containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{})
|
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
|
||||||
|
containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np)
|
||||||
for i, test := range tests {
|
for i, test := range tests {
|
||||||
fakeClient.ContainerList = test.containerList
|
fakeClient.ContainerList = test.containerList
|
||||||
fakeClient.ExitedContainerList = test.exitedContainerList
|
fakeClient.ExitedContainerList = test.exitedContainerList
|
||||||
|
@ -34,6 +34,7 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
||||||
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
|
||||||
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
|
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
@ -82,6 +83,9 @@ type DockerManager struct {
|
|||||||
|
|
||||||
// Directory of container logs.
|
// Directory of container logs.
|
||||||
containerLogsDir string
|
containerLogsDir string
|
||||||
|
|
||||||
|
// Network plugin.
|
||||||
|
networkPlugin network.NetworkPlugin
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDockerManager(
|
func NewDockerManager(
|
||||||
@ -93,7 +97,8 @@ func NewDockerManager(
|
|||||||
qps float32,
|
qps float32,
|
||||||
burst int,
|
burst int,
|
||||||
containerLogsDir string,
|
containerLogsDir string,
|
||||||
osInterface kubecontainer.OSInterface) *DockerManager {
|
osInterface kubecontainer.OSInterface,
|
||||||
|
networkPlugin network.NetworkPlugin) *DockerManager {
|
||||||
// Work out the location of the Docker runtime, defaulting to /var/lib/docker
|
// Work out the location of the Docker runtime, defaulting to /var/lib/docker
|
||||||
// if there are any problems.
|
// if there are any problems.
|
||||||
dockerRoot := "/var/lib/docker"
|
dockerRoot := "/var/lib/docker"
|
||||||
@ -136,6 +141,7 @@ func NewDockerManager(
|
|||||||
Puller: newDockerPuller(client, qps, burst),
|
Puller: newDockerPuller(client, qps, burst),
|
||||||
dockerRoot: dockerRoot,
|
dockerRoot: dockerRoot,
|
||||||
containerLogsDir: containerLogsDir,
|
containerLogsDir: containerLogsDir,
|
||||||
|
networkPlugin: networkPlugin,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -942,13 +948,24 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream
|
|||||||
|
|
||||||
// Kills all containers in the specified pod
|
// Kills all containers in the specified pod
|
||||||
func (dm *DockerManager) KillPod(pod kubecontainer.Pod) error {
|
func (dm *DockerManager) KillPod(pod kubecontainer.Pod) error {
|
||||||
// Send the kills in parallel since they may take a long time.
|
// Send the kills in parallel since they may take a long time. Len + 1 since there
|
||||||
errs := make(chan error, len(pod.Containers))
|
// can be Len errors + the networkPlugin teardown error.
|
||||||
|
errs := make(chan error, len(pod.Containers)+1)
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for _, container := range pod.Containers {
|
for _, container := range pod.Containers {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(container *kubecontainer.Container) {
|
go func(container *kubecontainer.Container) {
|
||||||
defer util.HandleCrash()
|
defer util.HandleCrash()
|
||||||
|
|
||||||
|
// TODO: Handle this without signaling the pod infra container to
|
||||||
|
// adapt to the generic container runtime.
|
||||||
|
if container.Name == PodInfraContainerName {
|
||||||
|
err := dm.networkPlugin.TearDownPod(pod.Namespace, pod.Name, kubeletTypes.DockerID(container.ID))
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed tearing down the infra container: %v", err)
|
||||||
|
errs <- err
|
||||||
|
}
|
||||||
|
}
|
||||||
err := dm.KillContainer(container.ID)
|
err := dm.KillContainer(container.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, pod.ID)
|
glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, pod.ID)
|
||||||
|
@ -203,16 +203,6 @@ func NewMainKubelet(
|
|||||||
statusManager := newStatusManager(kubeClient)
|
statusManager := newStatusManager(kubeClient)
|
||||||
readinessManager := kubecontainer.NewReadinessManager()
|
readinessManager := kubecontainer.NewReadinessManager()
|
||||||
containerRefManager := kubecontainer.NewRefManager()
|
containerRefManager := kubecontainer.NewRefManager()
|
||||||
containerManager := dockertools.NewDockerManager(
|
|
||||||
dockerClient,
|
|
||||||
recorder,
|
|
||||||
readinessManager,
|
|
||||||
containerRefManager,
|
|
||||||
podInfraContainerImage,
|
|
||||||
pullQPS,
|
|
||||||
pullBurst,
|
|
||||||
containerLogsDir,
|
|
||||||
osInterface)
|
|
||||||
|
|
||||||
volumeManager := newVolumeManager()
|
volumeManager := newVolumeManager()
|
||||||
|
|
||||||
@ -224,7 +214,6 @@ func NewMainKubelet(
|
|||||||
resyncInterval: resyncInterval,
|
resyncInterval: resyncInterval,
|
||||||
containerRefManager: containerRefManager,
|
containerRefManager: containerRefManager,
|
||||||
readinessManager: readinessManager,
|
readinessManager: readinessManager,
|
||||||
runner: containerManager,
|
|
||||||
httpClient: &http.Client{},
|
httpClient: &http.Client{},
|
||||||
sourcesReady: sourcesReady,
|
sourcesReady: sourcesReady,
|
||||||
clusterDomain: clusterDomain,
|
clusterDomain: clusterDomain,
|
||||||
@ -241,12 +230,30 @@ func NewMainKubelet(
|
|||||||
volumeManager: volumeManager,
|
volumeManager: volumeManager,
|
||||||
cloud: cloud,
|
cloud: cloud,
|
||||||
nodeRef: nodeRef,
|
nodeRef: nodeRef,
|
||||||
containerManager: containerManager,
|
|
||||||
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
|
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
|
||||||
resourceContainer: resourceContainer,
|
resourceContainer: resourceContainer,
|
||||||
os: osInterface,
|
os: osInterface,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else {
|
||||||
|
klet.networkPlugin = plug
|
||||||
|
}
|
||||||
|
containerManager := dockertools.NewDockerManager(
|
||||||
|
dockerClient,
|
||||||
|
recorder,
|
||||||
|
readinessManager,
|
||||||
|
containerRefManager,
|
||||||
|
podInfraContainerImage,
|
||||||
|
pullQPS,
|
||||||
|
pullBurst,
|
||||||
|
containerLogsDir,
|
||||||
|
osInterface,
|
||||||
|
klet.networkPlugin)
|
||||||
|
klet.runner = containerManager
|
||||||
|
klet.containerManager = containerManager
|
||||||
|
|
||||||
klet.podManager = newBasicPodManager(klet.kubeClient)
|
klet.podManager = newBasicPodManager(klet.kubeClient)
|
||||||
klet.prober = newProber(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder)
|
klet.prober = newProber(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder)
|
||||||
klet.handlerRunner = newHandlerRunner(klet.httpClient, klet.runner, klet.containerManager)
|
klet.handlerRunner = newHandlerRunner(klet.httpClient, klet.runner, klet.containerManager)
|
||||||
@ -267,11 +274,6 @@ func NewMainKubelet(
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
} else {
|
|
||||||
klet.networkPlugin = plug
|
|
||||||
}
|
|
||||||
// If the container logs directory does not exist, create it.
|
// If the container logs directory does not exist, create it.
|
||||||
if _, err := os.Stat(containerLogsDir); err != nil {
|
if _, err := os.Stat(containerLogsDir); err != nil {
|
||||||
if err := osInterface.Mkdir(containerLogsDir, 0755); err != nil {
|
if err := osInterface.Mkdir(containerLogsDir, 0755); err != nil {
|
||||||
@ -888,27 +890,7 @@ func (kl *Kubelet) pullImage(pod *api.Pod, container *api.Container) error {
|
|||||||
|
|
||||||
// Kill all running containers in a pod (includes the pod infra container).
|
// Kill all running containers in a pod (includes the pod infra container).
|
||||||
func (kl *Kubelet) killPod(pod kubecontainer.Pod) error {
|
func (kl *Kubelet) killPod(pod kubecontainer.Pod) error {
|
||||||
// TODO(vmarmol): Consider handling non-Docker runtimes, the plugins are not friendly to it today.
|
return kl.containerManager.KillPod(pod)
|
||||||
container, err := kl.containerManager.GetPodInfraContainer(pod)
|
|
||||||
errList := []error{}
|
|
||||||
if err == nil {
|
|
||||||
// Call the networking plugin for teardown.
|
|
||||||
err = kl.networkPlugin.TearDownPod(pod.Namespace, pod.Name, kubeletTypes.DockerID(container.ID))
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Failed tearing down the network plugin for pod %q: %v", pod.ID, err)
|
|
||||||
errList = append(errList, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err = kl.containerManager.KillPod(pod)
|
|
||||||
if err != nil {
|
|
||||||
errList = append(errList, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(errList) > 0 {
|
|
||||||
return utilErrors.NewAggregate(errList)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type empty struct{}
|
type empty struct{}
|
||||||
|
@ -105,7 +105,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
|
|||||||
podManager, fakeMirrorClient := newFakePodManager()
|
podManager, fakeMirrorClient := newFakePodManager()
|
||||||
kubelet.podManager = podManager
|
kubelet.podManager = podManager
|
||||||
kubelet.containerRefManager = kubecontainer.NewRefManager()
|
kubelet.containerRefManager = kubecontainer.NewRefManager()
|
||||||
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os)
|
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin)
|
||||||
kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager)
|
kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager)
|
||||||
kubelet.podWorkers = newPodWorkers(
|
kubelet.podWorkers = newPodWorkers(
|
||||||
kubelet.runtimeCache,
|
kubelet.runtimeCache,
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
||||||
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -40,7 +41,8 @@ func newPod(uid, name string) *api.Pod {
|
|||||||
func createPodWorkers() (*podWorkers, map[types.UID][]string) {
|
func createPodWorkers() (*podWorkers, map[types.UID][]string) {
|
||||||
fakeDocker := &dockertools.FakeDockerClient{}
|
fakeDocker := &dockertools.FakeDockerClient{}
|
||||||
fakeRecorder := &record.FakeRecorder{}
|
fakeRecorder := &record.FakeRecorder{}
|
||||||
dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{})
|
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
|
||||||
|
dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np)
|
||||||
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager)
|
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager)
|
||||||
|
|
||||||
lock := sync.Mutex{}
|
lock := sync.Mutex{}
|
||||||
|
@ -156,7 +156,8 @@ func TestRunOnce(t *testing.T) {
|
|||||||
0,
|
0,
|
||||||
0,
|
0,
|
||||||
"",
|
"",
|
||||||
kubecontainer.FakeOS{})
|
kubecontainer.FakeOS{},
|
||||||
|
kb.networkPlugin)
|
||||||
kb.containerManager.Puller = &dockertools.FakeDockerPuller{}
|
kb.containerManager.Puller = &dockertools.FakeDockerPuller{}
|
||||||
|
|
||||||
pods := []*api.Pod{
|
pods := []*api.Pod{
|
||||||
|
Loading…
Reference in New Issue
Block a user