Merge pull request #15051 from feiskyer/kubelet/garbage-collection

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-10-09 02:11:47 -07:00
commit 538cf72208
10 changed files with 188 additions and 150 deletions

View File

@ -861,7 +861,7 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
kubeClient = kc.KubeClient
}
gcPolicy := kubelet.ContainerGCPolicy{
gcPolicy := kubecontainer.ContainerGCPolicy{
MinAge: kc.MinimumGCAge,
MaxPerPodContainer: kc.MaxPerPodContainerCount,
MaxContainers: kc.MaxContainerCount,

View File

@ -298,7 +298,7 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
kubeClient = kc.KubeClient
}
gcPolicy := kubelet.ContainerGCPolicy{
gcPolicy := kubecontainer.ContainerGCPolicy{
MinAge: kc.MinimumGCAge,
MaxPerPodContainer: kc.MaxPerPodContainerCount,
MaxContainers: kc.MaxContainerCount,

View File

@ -0,0 +1,68 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"fmt"
"time"
)
// Specified a policy for garbage collecting containers.
type ContainerGCPolicy struct {
// Minimum age at which a container can be garbage collected, zero for no limit.
MinAge time.Duration
// Max number of dead containers any single pod (UID, container name) pair is
// allowed to have, less than zero for no limit.
MaxPerPodContainer int
// Max number of total dead containers, less than zero for no limit.
MaxContainers int
}
// Manages garbage collection of dead containers.
//
// Implementation is thread-compatible.
type ContainerGC interface {
// Garbage collect containers.
GarbageCollect() error
}
// TODO(vmarmol): Preferentially remove pod infra containers.
type realContainerGC struct {
// Container runtime
runtime Runtime
// Policy for garbage collection.
policy ContainerGCPolicy
}
// New ContainerGC instance with the specified policy.
func NewContainerGC(runtime Runtime, policy ContainerGCPolicy) (ContainerGC, error) {
if policy.MinAge < 0 {
return nil, fmt.Errorf("invalid minimum garbage collection age: %v", policy.MinAge)
}
return &realContainerGC{
runtime: runtime,
policy: policy,
}, nil
}
func (cgc *realContainerGC) GarbageCollect() error {
return cgc.runtime.GarbageCollect(cgc.policy)
}

View File

@ -304,3 +304,11 @@ func (f *FakeRuntime) PortForward(pod *Pod, port uint16, stream io.ReadWriteClos
f.CalledFunctions = append(f.CalledFunctions, "PortForward")
return f.Err
}
func (f *FakeRuntime) GarbageCollect(gcPolicy ContainerGCPolicy) error {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "GarbageCollect")
return f.Err
}

View File

@ -74,6 +74,8 @@ type Runtime interface {
// specifies whether the runtime returns all containers including those already
// exited and dead containers (used for garbage collection).
GetPods(all bool) ([]*Pod, error)
// GarbageCollect removes dead containers using the specified container gc policy
GarbageCollect(gcPolicy ContainerGCPolicy) error
// Syncs the running pod into the desired pod.
SyncPod(pod *api.Pod, runningPod Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error
// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package dockertools
import (
"fmt"
@ -26,54 +26,17 @@ import (
docker "github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types"
)
// Specified a policy for garbage collecting containers.
type ContainerGCPolicy struct {
// Minimum age at which a container can be garbage collected, zero for no limit.
MinAge time.Duration
// Max number of dead containers any single pod (UID, container name) pair is
// allowed to have, less than zero for no limit.
MaxPerPodContainer int
// Max number of total dead containers, less than zero for no limit.
MaxContainers int
}
// Manages garbage collection of dead containers.
//
// Implementation is thread-compatible.
type containerGC interface {
// Garbage collect containers.
GarbageCollect() error
}
// TODO(vmarmol): Preferentially remove pod infra containers.
type realContainerGC struct {
// Docker client to use.
dockerClient dockertools.DockerInterface
// Policy for garbage collection.
policy ContainerGCPolicy
// The path to the symlinked docker logs
type containerGC struct {
client DockerInterface
containerLogsDir string
}
// New containerGC instance with the specified policy.
func newContainerGC(dockerClient dockertools.DockerInterface, policy ContainerGCPolicy) (containerGC, error) {
if policy.MinAge < 0 {
return nil, fmt.Errorf("invalid minimum garbage collection age: %v", policy.MinAge)
}
return &realContainerGC{
dockerClient: dockerClient,
policy: policy,
containerLogsDir: containerLogsDir,
}, nil
func NewContainerGC(client DockerInterface, containerLogsDir string) *containerGC {
return &containerGC{client: client, containerLogsDir: containerLogsDir}
}
// Internal information kept for containers being considered for GC.
@ -128,65 +91,7 @@ func (a byCreated) Len() int { return len(a) }
func (a byCreated) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byCreated) Less(i, j int) bool { return a[i].createTime.After(a[j].createTime) }
func (cgc *realContainerGC) GarbageCollect() error {
// Separate containers by evict units.
evictUnits, unidentifiedContainers, err := cgc.evictableContainers()
if err != nil {
return err
}
// Remove unidentified containers.
for _, container := range unidentifiedContainers {
glog.Infof("Removing unidentified dead container %q with ID %q", container.name, container.id)
err = cgc.dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: container.id, RemoveVolumes: true})
if err != nil {
glog.Warningf("Failed to remove unidentified dead container %q: %v", container.name, err)
}
}
// Enforce max containers per evict unit.
if cgc.policy.MaxPerPodContainer >= 0 {
cgc.enforceMaxContainersPerEvictUnit(evictUnits, cgc.policy.MaxPerPodContainer)
}
// Enforce max total number of containers.
if cgc.policy.MaxContainers >= 0 && evictUnits.NumContainers() > cgc.policy.MaxContainers {
// Leave an equal number of containers per evict unit (min: 1).
numContainersPerEvictUnit := cgc.policy.MaxContainers / evictUnits.NumEvictUnits()
if numContainersPerEvictUnit < 1 {
numContainersPerEvictUnit = 1
}
cgc.enforceMaxContainersPerEvictUnit(evictUnits, numContainersPerEvictUnit)
// If we still need to evict, evict oldest first.
numContainers := evictUnits.NumContainers()
if numContainers > cgc.policy.MaxContainers {
flattened := make([]containerGCInfo, 0, numContainers)
for uid := range evictUnits {
flattened = append(flattened, evictUnits[uid]...)
}
sort.Sort(byCreated(flattened))
cgc.removeOldestN(flattened, numContainers-cgc.policy.MaxContainers)
}
}
// Remove dead symlinks - should only happen on upgrade
// from a k8s version without proper log symlink cleanup
logSymlinks, _ := filepath.Glob(path.Join(cgc.containerLogsDir, fmt.Sprintf("*.%s", dockertools.LogSuffix)))
for _, logSymlink := range logSymlinks {
if _, err = os.Stat(logSymlink); os.IsNotExist(err) {
err = os.Remove(logSymlink)
if err != nil {
glog.Warningf("Failed to remove container log dead symlink %q: %v", logSymlink, err)
}
}
}
return nil
}
func (cgc *realContainerGC) enforceMaxContainersPerEvictUnit(evictUnits containersByEvictUnit, MaxContainers int) {
func (cgc *containerGC) enforceMaxContainersPerEvictUnit(evictUnits containersByEvictUnit, MaxContainers int) {
for uid := range evictUnits {
toRemove := len(evictUnits[uid]) - MaxContainers
@ -197,15 +102,15 @@ func (cgc *realContainerGC) enforceMaxContainersPerEvictUnit(evictUnits containe
}
// Removes the oldest toRemove containers and returns the resulting slice.
func (cgc *realContainerGC) removeOldestN(containers []containerGCInfo, toRemove int) []containerGCInfo {
func (cgc *containerGC) removeOldestN(containers []containerGCInfo, toRemove int) []containerGCInfo {
// Remove from oldest to newest (last to first).
numToKeep := len(containers) - toRemove
for i := numToKeep; i < len(containers); i++ {
err := cgc.dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: containers[i].id, RemoveVolumes: true})
err := cgc.client.RemoveContainer(docker.RemoveContainerOptions{ID: containers[i].id, RemoveVolumes: true})
if err != nil {
glog.Warningf("Failed to remove dead container %q: %v", containers[i].name, err)
}
symlinkPath := dockertools.LogSymlink(cgc.containerLogsDir, containers[i].podNameWithNamespace, containers[i].containerName, containers[i].id)
symlinkPath := LogSymlink(cgc.containerLogsDir, containers[i].podNameWithNamespace, containers[i].containerName, containers[i].id)
err = os.Remove(symlinkPath)
if err != nil && !os.IsNotExist(err) {
glog.Warningf("Failed to remove container %q log symlink %q: %v", containers[i].name, symlinkPath, err)
@ -218,18 +123,18 @@ func (cgc *realContainerGC) removeOldestN(containers []containerGCInfo, toRemove
// Get all containers that are evictable. Evictable containers are: not running
// and created more than MinAge ago.
func (cgc *realContainerGC) evictableContainers() (containersByEvictUnit, []containerGCInfo, error) {
containers, err := dockertools.GetKubeletDockerContainers(cgc.dockerClient, true)
func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByEvictUnit, []containerGCInfo, error) {
containers, err := GetKubeletDockerContainers(cgc.client, true)
if err != nil {
return containersByEvictUnit{}, []containerGCInfo{}, err
}
unidentifiedContainers := make([]containerGCInfo, 0)
evictUnits := make(containersByEvictUnit)
newestGCTime := time.Now().Add(-cgc.policy.MinAge)
newestGCTime := time.Now().Add(-minAge)
for _, container := range containers {
// Prune out running containers.
data, err := cgc.dockerClient.InspectContainer(container.ID)
data, err := cgc.client.InspectContainer(container.ID)
if err != nil {
// Container may have been removed already, skip.
continue
@ -245,7 +150,7 @@ func (cgc *realContainerGC) evictableContainers() (containersByEvictUnit, []cont
createTime: data.Created,
}
containerName, _, err := dockertools.ParseDockerName(container.Names[0])
containerName, _, err := ParseDockerName(container.Names[0])
if err != nil {
unidentifiedContainers = append(unidentifiedContainers, containerInfo)
@ -267,3 +172,62 @@ func (cgc *realContainerGC) evictableContainers() (containersByEvictUnit, []cont
return evictUnits, unidentifiedContainers, nil
}
// GarbageCollect removes dead containers using the specified container gc policy
func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error {
// Separate containers by evict units.
evictUnits, unidentifiedContainers, err := cgc.evictableContainers(gcPolicy.MinAge)
if err != nil {
return err
}
// Remove unidentified containers.
for _, container := range unidentifiedContainers {
glog.Infof("Removing unidentified dead container %q with ID %q", container.name, container.id)
err = cgc.client.RemoveContainer(docker.RemoveContainerOptions{ID: container.id, RemoveVolumes: true})
if err != nil {
glog.Warningf("Failed to remove unidentified dead container %q: %v", container.name, err)
}
}
// Enforce max containers per evict unit.
if gcPolicy.MaxPerPodContainer >= 0 {
cgc.enforceMaxContainersPerEvictUnit(evictUnits, gcPolicy.MaxPerPodContainer)
}
// Enforce max total number of containers.
if gcPolicy.MaxContainers >= 0 && evictUnits.NumContainers() > gcPolicy.MaxContainers {
// Leave an equal number of containers per evict unit (min: 1).
numContainersPerEvictUnit := gcPolicy.MaxContainers / evictUnits.NumEvictUnits()
if numContainersPerEvictUnit < 1 {
numContainersPerEvictUnit = 1
}
cgc.enforceMaxContainersPerEvictUnit(evictUnits, numContainersPerEvictUnit)
// If we still need to evict, evict oldest first.
numContainers := evictUnits.NumContainers()
if numContainers > gcPolicy.MaxContainers {
flattened := make([]containerGCInfo, 0, numContainers)
for uid := range evictUnits {
flattened = append(flattened, evictUnits[uid]...)
}
sort.Sort(byCreated(flattened))
cgc.removeOldestN(flattened, numContainers-gcPolicy.MaxContainers)
}
}
// Remove dead symlinks - should only happen on upgrade
// from a k8s version without proper log symlink cleanup
logSymlinks, _ := filepath.Glob(path.Join(cgc.containerLogsDir, fmt.Sprintf("*.%s", LogSuffix)))
for _, logSymlink := range logSymlinks {
if _, err = os.Stat(logSymlink); os.IsNotExist(err) {
err = os.Remove(logSymlink)
if err != nil {
glog.Warningf("Failed to remove container log dead symlink %q: %v", logSymlink, err)
}
}
}
return nil
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package dockertools
import (
"fmt"
@ -25,23 +25,18 @@ import (
docker "github.com/fsouza/go-dockerclient"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
func newTestContainerGC(t *testing.T, MinAge time.Duration, MaxPerPodContainer, MaxContainers int) (containerGC, *dockertools.FakeDockerClient) {
fakeDocker := new(dockertools.FakeDockerClient)
gc, err := newContainerGC(fakeDocker, ContainerGCPolicy{
MinAge: MinAge,
MaxPerPodContainer: MaxPerPodContainer,
MaxContainers: MaxContainers,
})
require.Nil(t, err)
func newTestContainerGC(t *testing.T) (*containerGC, *FakeDockerClient) {
fakeDocker := new(FakeDockerClient)
gc := NewContainerGC(fakeDocker, "")
return gc, fakeDocker
}
// Makes a stable time object, lower id is earlier time.
func makeTime(id int) time.Time {
var zero time.Time
return zero.Add(time.Duration(id) * time.Second)
}
@ -90,7 +85,7 @@ func verifyStringArrayEqualsAnyOrder(t *testing.T, actual, expected []string) {
}
func TestGarbageCollectZeroMaxContainers(t *testing.T) {
gc, fakeDocker := newTestContainerGC(t, time.Minute, 1, 0)
gc, fakeDocker := newTestContainerGC(t)
fakeDocker.ContainerList = []docker.APIContainers{
makeAPIContainer("foo", "POD", "1876"),
}
@ -98,12 +93,12 @@ func TestGarbageCollectZeroMaxContainers(t *testing.T) {
makeContainerDetail("1876", false, makeTime(0)),
)
assert.Nil(t, gc.GarbageCollect())
assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{time.Minute, 1, 0}))
assert.Len(t, fakeDocker.Removed, 1)
}
func TestGarbageCollectNoMaxPerPodContainerLimit(t *testing.T) {
gc, fakeDocker := newTestContainerGC(t, time.Minute, -1, 4)
gc, fakeDocker := newTestContainerGC(t)
fakeDocker.ContainerList = []docker.APIContainers{
makeAPIContainer("foo", "POD", "1876"),
makeAPIContainer("foo1", "POD", "2876"),
@ -119,12 +114,12 @@ func TestGarbageCollectNoMaxPerPodContainerLimit(t *testing.T) {
makeContainerDetail("5876", false, makeTime(4)),
)
assert.Nil(t, gc.GarbageCollect())
assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{time.Minute, -1, 4}))
assert.Len(t, fakeDocker.Removed, 1)
}
func TestGarbageCollectNoMaxLimit(t *testing.T) {
gc, fakeDocker := newTestContainerGC(t, time.Minute, 1, -1)
gc, fakeDocker := newTestContainerGC(t)
fakeDocker.ContainerList = []docker.APIContainers{
makeAPIContainer("foo", "POD", "1876"),
makeAPIContainer("foo1", "POD", "2876"),
@ -140,7 +135,7 @@ func TestGarbageCollectNoMaxLimit(t *testing.T) {
makeContainerDetail("5876", false, makeTime(0)),
)
assert.Nil(t, gc.GarbageCollect())
assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{time.Minute, 1, -1}))
assert.Len(t, fakeDocker.Removed, 0)
}
@ -309,10 +304,10 @@ func TestGarbageCollect(t *testing.T) {
}
for i, test := range tests {
t.Logf("Running test case with index %d", i)
gc, fakeDocker := newTestContainerGC(t, time.Hour, 2, 6)
gc, fakeDocker := newTestContainerGC(t)
fakeDocker.ContainerList = test.containers
fakeDocker.ContainerMap = test.containerDetails
assert.Nil(t, gc.GarbageCollect())
assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{time.Hour, 2, 6}))
verifyStringArrayEqualsAnyOrder(t, fakeDocker.Removed, test.expectedRemoved)
}
}

View File

@ -139,6 +139,9 @@ type DockerManager struct {
// If true, enforce container cpu limits with CFS quota support
cpuCFSQuota bool
// Container GC manager
containerGC *containerGC
}
func NewDockerManager(
@ -214,6 +217,7 @@ func NewDockerManager(
}
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm, imageBackOff)
dm.containerGC = NewContainerGC(client, containerLogsDir)
return dm
}
@ -2019,3 +2023,8 @@ func (dm *DockerManager) GetNetNs(containerID kubecontainer.ContainerID) (string
netnsPath := fmt.Sprintf(DockerNetnsFmt, inspectResult.State.Pid)
return netnsPath, nil
}
// Garbage collection of dead containers
func (dm *DockerManager) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error {
return dm.containerGC.GarbageCollect(gcPolicy)
}

View File

@ -147,7 +147,7 @@ func NewMainKubelet(
pullBurst int,
eventQPS float32,
eventBurst int,
containerGCPolicy ContainerGCPolicy,
containerGCPolicy kubecontainer.ContainerGCPolicy,
sourcesReady SourcesReadyFn,
registerNode bool,
standaloneMode bool,
@ -236,11 +236,6 @@ func NewMainKubelet(
Namespace: "",
}
containerGC, err := newContainerGC(dockerClient, containerGCPolicy)
if err != nil {
return nil, err
}
diskSpaceManager, err := newDiskSpaceManager(cadvisorInterface, diskSpacePolicy)
if err != nil {
return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
@ -275,7 +270,6 @@ func NewMainKubelet(
streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
recorder: recorder,
cadvisor: cadvisorInterface,
containerGC: containerGC,
diskSpaceManager: diskSpaceManager,
statusManager: statusManager,
volumeManager: volumeManager,
@ -352,7 +346,6 @@ func NewMainKubelet(
return nil, err
}
klet.containerRuntime = rktRuntime
klet.containerGC = rktRuntime
klet.imageManager = rkt.NewImageManager(rktRuntime)
// No Docker daemon to put in a container.
@ -361,6 +354,13 @@ func NewMainKubelet(
return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
}
// setup containerGC
containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
if err != nil {
return nil, err
}
klet.containerGC = containerGC
// setup imageManager
imageManager, err := newImageManager(klet.containerRuntime, cadvisorInterface, recorder, nodeRef, imageGCPolicy)
if err != nil {
@ -510,7 +510,7 @@ type Kubelet struct {
recorder record.EventRecorder
// Policy for handling garbage collection of dead containers.
containerGC containerGC
containerGC kubecontainer.ContainerGC
// Manager for images.
imageManager imageManager

View File

@ -68,14 +68,6 @@ const (
authDir = "auth.d"
dockerAuthTemplate = `{"rktKind":"dockerAuth","rktVersion":"v1","registries":[%q],"credentials":{"user":%q,"password":%q}}`
// TODO(yifan): Merge with ContainerGCPolicy, i.e., derive
// the grace period from MinAge in ContainerGCPolicy.
//
// Duration to wait before discarding inactive pods from garbage
defaultGracePeriod = "1m"
// Duration to wait before expiring prepared pods.
defaultExpirePrepared = "1m"
defaultImageTag = "latest"
)
@ -1083,11 +1075,11 @@ func (r *Runtime) GetContainerLogs(pod *api.Pod, containerID kubecontainer.Conta
// GarbageCollect collects the pods/containers.
// TODO(yifan): Enforce the gc policy, also, it would be better if we can
// just GC kubernetes pods.
func (r *Runtime) GarbageCollect() error {
func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error {
if err := exec.Command("systemctl", "reset-failed").Run(); err != nil {
glog.Errorf("rkt: Failed to reset failed systemd services: %v", err)
}
if _, err := r.runCommand("gc", "--grace-period="+defaultGracePeriod, "--expire-prepared="+defaultExpirePrepared); err != nil {
if _, err := r.runCommand("gc", "--grace-period="+gcPolicy.MinAge.String(), "--expire-prepared="+gcPolicy.MinAge.String()); err != nil {
glog.Errorf("rkt: Failed to gc: %v", err)
return err
}