dockertools: clean up networking when garbage-collecting pods

The docker runtime doesn't tear down networking when GC-ing pods.
rkt already does, so make docker do it too. To ensure this happens,
networking is always torn down for the container even if the
container itself is not deleted.

This prevents IPAM from leaking when the pod gets killed for
some reason outside kubelet (like docker restart) or when pods
are killed while kubelet isn't running.

Fixes: https://github.com/kubernetes/kubernetes/issues/14940
Related: https://github.com/kubernetes/kubernetes/pull/35572
This commit is contained in:
Dan Williams
2016-11-17 16:06:44 -06:00
parent dc2fd511ab
commit 4d7d7faa81
6 changed files with 169 additions and 50 deletions

View File

@@ -28,18 +28,21 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
knetwork "k8s.io/kubernetes/pkg/kubelet/network"
)
type containerGC struct {
client DockerInterface
podGetter podGetter
network *knetwork.PluginManager
containerLogsDir string
}
func NewContainerGC(client DockerInterface, podGetter podGetter, containerLogsDir string) *containerGC {
func NewContainerGC(client DockerInterface, podGetter podGetter, network *knetwork.PluginManager, containerLogsDir string) *containerGC {
return &containerGC{
client: client,
podGetter: podGetter,
network: network,
containerLogsDir: containerLogsDir,
}
}
@@ -50,7 +53,7 @@ type containerGCInfo struct {
id string
// Docker name of the container.
name string
dockerName string
// Creation time for the container.
createTime time.Time
@@ -59,8 +62,14 @@ type containerGCInfo struct {
// This comes from dockertools.ParseDockerName(...)
podNameWithNamespace string
// Kubernetes pod UID
podUID types.UID
// Container name in pod
containerName string
// Container network mode
isHostNetwork bool
}
// Containers are considered for eviction as units of (UID, container name) pair.
@@ -111,22 +120,45 @@ func (cgc *containerGC) removeOldestN(containers []containerGCInfo, toRemove int
// Remove from oldest to newest (last to first).
numToKeep := len(containers) - toRemove
for i := numToKeep; i < len(containers); i++ {
cgc.removeContainer(containers[i].id, containers[i].podNameWithNamespace, containers[i].containerName)
cgc.removeContainer(containers[i])
}
// Assume we removed the containers so that we're not too aggressive.
return containers[:numToKeep]
}
// newContainerGCInfo builds a containerGCInfo from a docker inspect result.
// Returns a full GC info structure on success, or a partial one on failure:
// when the docker name cannot be parsed, only id and dockerName are populated
// so the caller can still log and remove the container.
func newContainerGCInfo(id string, inspectResult *dockertypes.ContainerJSON, created time.Time) (containerGCInfo, error) {
containerName, _, err := ParseDockerName(inspectResult.Name)
if err != nil {
// Partial info: enough to identify the container in logs and delete it.
return containerGCInfo{
id: id,
dockerName: inspectResult.Name,
}, fmt.Errorf("failed to parse docker name %q: %v", inspectResult.Name, err)
}
networkMode := getDockerNetworkMode(inspectResult)
return containerGCInfo{
id: id,
dockerName: inspectResult.Name,
podNameWithNamespace: containerName.PodFullName,
podUID: containerName.PodUID,
containerName: containerName.ContainerName,
createTime: created,
// Host-network containers have no plugin-managed network to tear down.
isHostNetwork: networkMode == namespaceModeHost,
}, nil
}
// Get all containers that are evictable. Evictable containers are: not running
// and created more than MinAge ago.
func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByEvictUnit, []containerGCInfo, error) {
func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByEvictUnit, []containerGCInfo, []containerGCInfo, error) {
containers, err := GetKubeletDockerContainers(cgc.client, true)
if err != nil {
return containersByEvictUnit{}, []containerGCInfo{}, err
return containersByEvictUnit{}, []containerGCInfo{}, []containerGCInfo{}, err
}
unidentifiedContainers := make([]containerGCInfo, 0)
netContainers := make([]containerGCInfo, 0)
evictUnits := make(containersByEvictUnit)
newestGCTime := time.Now().Add(-minAge)
for _, container := range containers {
@@ -147,23 +179,19 @@ func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByE
continue
}
containerInfo := containerGCInfo{
id: container.ID,
name: container.Names[0],
createTime: created,
}
containerName, _, err := ParseDockerName(container.Names[0])
containerInfo, err := newContainerGCInfo(container.ID, data, created)
if err != nil {
unidentifiedContainers = append(unidentifiedContainers, containerInfo)
} else {
key := evictUnit{
uid: containerName.PodUID,
name: containerName.ContainerName,
// Track net containers for special cleanup
if containerIsNetworked(containerInfo.containerName) {
netContainers = append(netContainers, containerInfo)
}
key := evictUnit{
uid: containerInfo.podUID,
name: containerInfo.containerName,
}
containerInfo.podNameWithNamespace = containerName.PodFullName
containerInfo.containerName = containerName.ContainerName
evictUnits[key] = append(evictUnits[key], containerInfo)
}
}
@@ -173,26 +201,34 @@ func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByE
sort.Sort(byCreated(evictUnits[uid]))
}
return evictUnits, unidentifiedContainers, nil
return evictUnits, netContainers, unidentifiedContainers, nil
}
// GarbageCollect removes dead containers using the specified container gc policy
func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool) error {
// Separate containers by evict units.
evictUnits, unidentifiedContainers, err := cgc.evictableContainers(gcPolicy.MinAge)
evictUnits, netContainers, unidentifiedContainers, err := cgc.evictableContainers(gcPolicy.MinAge)
if err != nil {
return err
}
// Remove unidentified containers.
for _, container := range unidentifiedContainers {
glog.Infof("Removing unidentified dead container %q with ID %q", container.name, container.id)
glog.Infof("Removing unidentified dead container %q", container.dockerName)
err = cgc.client.RemoveContainer(container.id, dockertypes.ContainerRemoveOptions{RemoveVolumes: true})
if err != nil {
glog.Warningf("Failed to remove unidentified dead container %q: %v", container.name, err)
glog.Warningf("Failed to remove unidentified dead container %q: %v", container.dockerName, err)
}
}
// Always clean up net containers to ensure network resources are released
// TODO: this may tear down networking again if the container doesn't get
// removed in this GC cycle, but that already happens elsewhere...
for _, container := range netContainers {
glog.Infof("Cleaning up dead net container %q", container.dockerName)
cgc.netContainerCleanup(container)
}
// Remove deleted pod containers if all sources are ready.
if allSourcesReady {
for key, unit := range evictUnits {
@@ -245,35 +281,56 @@ func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy,
return nil
}
func (cgc *containerGC) removeContainer(id string, podNameWithNamespace string, containerName string) {
glog.V(4).Infof("Removing container %q name %q", id, containerName)
err := cgc.client.RemoveContainer(id, dockertypes.ContainerRemoveOptions{RemoveVolumes: true})
if err != nil {
glog.Warningf("Failed to remove container %q: %v", id, err)
func (cgc *containerGC) netContainerCleanup(containerInfo containerGCInfo) {
if containerInfo.isHostNetwork {
return
}
symlinkPath := LogSymlink(cgc.containerLogsDir, podNameWithNamespace, containerName, id)
podName, podNamespace, err := kubecontainer.ParsePodFullName(containerInfo.podNameWithNamespace)
if err != nil {
glog.Warningf("failed to parse container %q pod full name: %v", containerInfo.dockerName, err)
return
}
containerID := kubecontainer.DockerID(containerInfo.id).ContainerID()
if err := cgc.network.TearDownPod(podNamespace, podName, containerID); err != nil {
glog.Warningf("failed to tear down container %q network: %v", containerInfo.dockerName, err)
}
}
func (cgc *containerGC) removeContainer(containerInfo containerGCInfo) {
glog.V(4).Infof("Removing container %q", containerInfo.dockerName)
err := cgc.client.RemoveContainer(containerInfo.id, dockertypes.ContainerRemoveOptions{RemoveVolumes: true})
if err != nil {
glog.Warningf("Failed to remove container %q: %v", containerInfo.dockerName, err)
}
symlinkPath := LogSymlink(cgc.containerLogsDir, containerInfo.podNameWithNamespace, containerInfo.containerName, containerInfo.id)
err = os.Remove(symlinkPath)
if err != nil && !os.IsNotExist(err) {
glog.Warningf("Failed to remove container %q log symlink %q: %v", id, symlinkPath, err)
glog.Warningf("Failed to remove container %q log symlink %q: %v", containerInfo.dockerName, symlinkPath, err)
}
}
func (cgc *containerGC) deleteContainer(id string) error {
containerInfo, err := cgc.client.InspectContainer(id)
data, err := cgc.client.InspectContainer(id)
if err != nil {
glog.Warningf("Failed to inspect container %q: %v", id, err)
return err
}
if containerInfo.State.Running {
if data.State.Running {
return fmt.Errorf("container %q is still running", id)
}
containerName, _, err := ParseDockerName(containerInfo.Name)
containerInfo, err := newContainerGCInfo(id, data, time.Now())
if err != nil {
return err
}
cgc.removeContainer(id, containerName.PodFullName, containerName.ContainerName)
if containerIsNetworked(containerInfo.containerName) {
cgc.netContainerCleanup(containerInfo)
}
cgc.removeContainer(containerInfo)
return nil
}

View File

@@ -23,18 +23,23 @@ import (
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
knetwork "k8s.io/kubernetes/pkg/kubelet/network"
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
)
func newTestContainerGC(t *testing.T) (*containerGC, *FakeDockerClient) {
func newTestContainerGC(t *testing.T) (*containerGC, *FakeDockerClient, *nettest.MockNetworkPlugin) {
fakeDocker := NewFakeDockerClient()
fakePodGetter := newFakePodGetter()
gc := NewContainerGC(fakeDocker, fakePodGetter, "")
return gc, fakeDocker
fakePlugin := nettest.NewMockNetworkPlugin(gomock.NewController(t))
fakePlugin.EXPECT().Name().Return("someNetworkPlugin").AnyTimes()
gc := NewContainerGC(fakeDocker, fakePodGetter, knetwork.NewPluginManager(fakePlugin), "")
return gc, fakeDocker, fakePlugin
}
// Makes a stable time object, lower id is earlier time.
@@ -91,7 +96,7 @@ func verifyStringArrayEqualsAnyOrder(t *testing.T, actual, expected []string) {
}
func TestDeleteContainerSkipRunningContainer(t *testing.T) {
gc, fakeDocker := newTestContainerGC(t)
gc, fakeDocker, _ := newTestContainerGC(t)
fakeDocker.SetFakeContainers([]*FakeContainer{
makeContainer("1876", "foo", "POD", true, makeTime(0)),
})
@@ -102,29 +107,65 @@ func TestDeleteContainerSkipRunningContainer(t *testing.T) {
}
func TestDeleteContainerRemoveDeadContainer(t *testing.T) {
gc, fakeDocker := newTestContainerGC(t)
gc, fakeDocker, fakePlugin := newTestContainerGC(t)
defer fakePlugin.Finish()
fakeDocker.SetFakeContainers([]*FakeContainer{
makeContainer("1876", "foo", "POD", false, makeTime(0)),
})
addPods(gc.podGetter, "foo")
fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
assert.Nil(t, gc.deleteContainer("1876"))
assert.Len(t, fakeDocker.Removed, 1)
}
// TestGarbageCollectNetworkTeardown verifies that deleting a dead infra
// container tears down pod networking, while deleting a non-infra container
// does not.
func TestGarbageCollectNetworkTeardown(t *testing.T) {
// Ensure infra container gets teardown called
gc, fakeDocker, fakePlugin := newTestContainerGC(t)
defer fakePlugin.Finish()
id := kubecontainer.DockerID("1867").ContainerID()
fakeDocker.SetFakeContainers([]*FakeContainer{
makeContainer(id.ID, "foo", "POD", false, makeTime(0)),
})
addPods(gc.podGetter, "foo")
// TearDownPod must be invoked exactly once, with this container's ID.
fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), id).Return(nil)
assert.Nil(t, gc.deleteContainer(id.ID))
assert.Len(t, fakeDocker.Removed, 1)
// Ensure non-infra container does not have teardown called
// NOTE(review): the second mock controller is never finished (no defer on
// the reassigned fakePlugin), so its expectations are not verified — confirm
// this is intentional.
gc, fakeDocker, fakePlugin = newTestContainerGC(t)
id = kubecontainer.DockerID("1877").ContainerID()
fakeDocker.SetFakeContainers([]*FakeContainer{
makeContainer(id.ID, "foo", "adsfasdfasdf", false, makeTime(0)),
})
// NOTE(review): expecting SetUpPod here looks odd — GC never calls SetUpPod;
// the expectation only documents that TearDownPod is NOT expected. Verify intent.
fakePlugin.EXPECT().SetUpPod(gomock.Any(), gomock.Any(), id).Return(nil)
addPods(gc.podGetter, "foo")
assert.Nil(t, gc.deleteContainer(id.ID))
assert.Len(t, fakeDocker.Removed, 1)
}
func TestGarbageCollectZeroMaxContainers(t *testing.T) {
gc, fakeDocker := newTestContainerGC(t)
gc, fakeDocker, fakePlugin := newTestContainerGC(t)
defer fakePlugin.Finish()
fakeDocker.SetFakeContainers([]*FakeContainer{
makeContainer("1876", "foo", "POD", false, makeTime(0)),
})
addPods(gc.podGetter, "foo")
fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: 1, MaxContainers: 0}, true))
assert.Len(t, fakeDocker.Removed, 1)
}
func TestGarbageCollectNoMaxPerPodContainerLimit(t *testing.T) {
gc, fakeDocker := newTestContainerGC(t)
gc, fakeDocker, fakePlugin := newTestContainerGC(t)
defer fakePlugin.Finish()
fakeDocker.SetFakeContainers([]*FakeContainer{
makeContainer("1876", "foo", "POD", false, makeTime(0)),
makeContainer("2876", "foo1", "POD", false, makeTime(1)),
@@ -134,12 +175,15 @@ func TestGarbageCollectNoMaxPerPodContainerLimit(t *testing.T) {
})
addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4")
fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(5)
assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: -1, MaxContainers: 4}, true))
assert.Len(t, fakeDocker.Removed, 1)
}
func TestGarbageCollectNoMaxLimit(t *testing.T) {
gc, fakeDocker := newTestContainerGC(t)
gc, fakeDocker, fakePlugin := newTestContainerGC(t)
defer fakePlugin.Finish()
fakeDocker.SetFakeContainers([]*FakeContainer{
makeContainer("1876", "foo", "POD", false, makeTime(0)),
makeContainer("2876", "foo1", "POD", false, makeTime(0)),
@@ -149,6 +193,8 @@ func TestGarbageCollectNoMaxLimit(t *testing.T) {
})
addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4")
fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(5)
assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: -1, MaxContainers: -1}, true))
assert.Len(t, fakeDocker.Removed, 0)
}
@@ -261,10 +307,12 @@ func TestGarbageCollect(t *testing.T) {
}
for i, test := range tests {
t.Logf("Running test case with index %d", i)
gc, fakeDocker := newTestContainerGC(t)
gc, fakeDocker, fakePlugin := newTestContainerGC(t)
fakeDocker.SetFakeContainers(test.containers)
addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4", "foo5", "foo6", "foo7")
fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Hour, MaxPerPodContainer: 2, MaxContainers: 6}, true))
verifyStringArrayEqualsAnyOrder(t, fakeDocker.Removed, test.expectedRemoved)
fakePlugin.Finish()
}
}

View File

@@ -282,7 +282,7 @@ func NewDockerManager(
cmdRunner := kubecontainer.DirectStreamingRunner(dm)
dm.runner = lifecycle.NewHandlerRunner(httpClient, cmdRunner, dm)
dm.imagePuller = images.NewImageManager(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff, serializeImagePulls, qps, burst)
dm.containerGC = NewContainerGC(client, podGetter, containerLogsDir)
dm.containerGC = NewContainerGC(client, podGetter, dm.network, containerLogsDir)
dm.versionCache = cache.NewObjectCache(
func() (interface{}, error) {
@@ -436,7 +436,7 @@ func (dm *DockerManager) inspectContainer(id string, podName, podNamespace strin
// Container that are running, restarting and paused
status.State = kubecontainer.ContainerStateRunning
status.StartedAt = startedAt
if containerProvidesPodIP(dockerName) {
if containerProvidesPodIP(dockerName.ContainerName) {
ip, err = dm.determineContainerIP(podNamespace, podName, iResult)
// Kubelet doesn't handle the network error scenario
if err != nil {
@@ -2675,7 +2675,7 @@ func (dm *DockerManager) GetPodStatus(uid kubetypes.UID, name, namespace string)
}
}
containerStatuses = append(containerStatuses, result)
if containerProvidesPodIP(dockerName) && ip != "" {
if containerProvidesPodIP(dockerName.ContainerName) && ip != "" {
podStatus.IP = ip
}
}

View File

@@ -52,8 +52,13 @@ func getContainerIP(container *dockertypes.ContainerJSON) string {
func getNetworkingMode() string { return "" }
// Returns true if the container name matches the infrastructure's container name
func containerProvidesPodIP(name *KubeletContainerName) bool {
return name.ContainerName == PodInfraContainerName
// containerProvidesPodIP returns true if the container name matches the
// infrastructure container's name; that container's network namespace
// supplies the pod IP.
func containerProvidesPodIP(containerName string) bool {
return containerName == PodInfraContainerName
}
// containerIsNetworked reports whether the named container owns pod network
// resources. Only the infrastructure container needs network setup/teardown.
func containerIsNetworked(containerName string) bool {
return containerName == PodInfraContainerName
}
// Returns Seccomp and AppArmor Security options

View File

@@ -42,7 +42,11 @@ func getNetworkingMode() string {
return ""
}
func containerProvidesPodIP(name *KubeletContainerName) bool {
// containerProvidesPodIP always returns false in this platform variant:
// no container supplies a pod IP here.
func containerProvidesPodIP(containerName string) bool {
return false
}
// containerIsNetworked always returns false in this platform variant:
// no container needs network setup/teardown here.
func containerIsNetworked(containerName string) bool {
return false
}

View File

@@ -65,8 +65,13 @@ func getNetworkingMode() string {
// Infrastructure containers are not supported on Windows. For this reason, we
// make sure to not grab the infra container's IP for the pod.
func containerProvidesPodIP(name *KubeletContainerName) bool {
return name.ContainerName != PodInfraContainerName
// containerProvidesPodIP: infrastructure containers are not supported on
// Windows, so any container other than the infra container may provide the
// pod IP.
func containerProvidesPodIP(containerName string) bool {
return containerName != PodInfraContainerName
}
// containerIsNetworked always returns true: all containers in Windows need
// networking setup/teardown.
func containerIsNetworked(containerName string) bool {
return true
}
// Returns nil as both Seccomp and AppArmor security options are not valid on Windows