rkt: Refactor GarbageCollect() to enforce GCPolicy.MaxContainers.

This commit is contained in:
Yifan Gu 2016-04-20 17:49:08 -07:00
parent 3e83de8eeb
commit 06b1955c4a
2 changed files with 158 additions and 29 deletions

View File

@ -430,6 +430,7 @@ func NewMainKubelet(
klet, klet,
recorder, recorder,
containerRefManager, containerRefManager,
klet.podManager,
klet.livenessManager, klet.livenessManager,
klet.volumeManager, klet.volumeManager,
klet.httpClient, klet.httpClient,

View File

@ -26,6 +26,7 @@ import (
"os/exec" "os/exec"
"path" "path"
"path/filepath" "path/filepath"
"sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -53,7 +54,6 @@ import (
"k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/errors"
utilexec "k8s.io/kubernetes/pkg/util/exec" utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/sets"
utilstrings "k8s.io/kubernetes/pkg/util/strings" utilstrings "k8s.io/kubernetes/pkg/util/strings"
utilwait "k8s.io/kubernetes/pkg/util/wait" utilwait "k8s.io/kubernetes/pkg/util/wait"
) )
@ -72,7 +72,7 @@ const (
rktDataDir = "/var/lib/rkt" rktDataDir = "/var/lib/rkt"
rktLocalConfigDir = "/etc/rkt" rktLocalConfigDir = "/etc/rkt"
kubernetesUnitPrefix = "k8s" kubernetesUnitPrefix = "k8s_"
unitKubernetesSection = "X-Kubernetes" unitKubernetesSection = "X-Kubernetes"
unitPodName = "POD" unitPodName = "POD"
unitRktID = "RktID" unitRktID = "RktID"
@ -122,6 +122,7 @@ type Runtime struct {
dockerKeyring credentialprovider.DockerKeyring dockerKeyring credentialprovider.DockerKeyring
containerRefManager *kubecontainer.RefManager containerRefManager *kubecontainer.RefManager
podGetter podGetter
runtimeHelper kubecontainer.RuntimeHelper runtimeHelper kubecontainer.RuntimeHelper
recorder record.EventRecorder recorder record.EventRecorder
livenessManager proberesults.Manager livenessManager proberesults.Manager
@ -144,6 +145,18 @@ type volumeGetter interface {
GetVolumes(podUID types.UID) (kubecontainer.VolumeMap, bool) GetVolumes(podUID types.UID) (kubecontainer.VolumeMap, bool)
} }
// TODO(yifan): This duplicates the podGetter in dockertools.
type podGetter interface {
GetPodByUID(types.UID) (*api.Pod, bool)
}
// cliInterface wrapps the command line calls for testing purpose.
type cliInterface interface {
// args are the arguments given to the 'rkt' command,
// e.g. args can be 'rm ${UUID}'.
RunCommand(args ...string) (result []string, err error)
}
// New creates the rkt container runtime which implements the container runtime interface. // New creates the rkt container runtime which implements the container runtime interface.
// It will test if the rkt binary is in the $PATH, and whether we can get the // It will test if the rkt binary is in the $PATH, and whether we can get the
// version of it. If so, creates the rkt container runtime, otherwise returns an error. // version of it. If so, creates the rkt container runtime, otherwise returns an error.
@ -153,6 +166,7 @@ func New(
runtimeHelper kubecontainer.RuntimeHelper, runtimeHelper kubecontainer.RuntimeHelper,
recorder record.EventRecorder, recorder record.EventRecorder,
containerRefManager *kubecontainer.RefManager, containerRefManager *kubecontainer.RefManager,
podGetter podGetter,
livenessManager proberesults.Manager, livenessManager proberesults.Manager,
volumeGetter volumeGetter, volumeGetter volumeGetter,
httpClient kubetypes.HttpGetter, httpClient kubetypes.HttpGetter,
@ -195,12 +209,12 @@ func New(
config: config, config: config,
dockerKeyring: credentialprovider.NewDockerKeyring(), dockerKeyring: credentialprovider.NewDockerKeyring(),
containerRefManager: containerRefManager, containerRefManager: containerRefManager,
podGetter: podGetter,
runtimeHelper: runtimeHelper, runtimeHelper: runtimeHelper,
recorder: recorder, recorder: recorder,
livenessManager: livenessManager, livenessManager: livenessManager,
volumeGetter: volumeGetter, volumeGetter: volumeGetter,
execer: execer, execer: execer,
os: os,
touchPath: touchPath, touchPath: touchPath,
} }
@ -257,7 +271,11 @@ func (r *Runtime) runCommand(args ...string) ([]string, error) {
func makePodServiceFileName(uuid string) string { func makePodServiceFileName(uuid string) string {
// TODO(yifan): Add name for readability? We need to consider the // TODO(yifan): Add name for readability? We need to consider the
// limit of the length. // limit of the length.
return fmt.Sprintf("%s_%s.service", kubernetesUnitPrefix, uuid) return fmt.Sprintf("%s%s.service", kubernetesUnitPrefix, uuid)
}
func getRktUUIDFromServiceFileName(filename string) string {
return strings.TrimPrefix(strings.TrimSuffix(filename, path.Ext(filename)), kubernetesUnitPrefix)
} }
// setIsolators sets the apps' isolators according to the security context and resource spec. // setIsolators sets the apps' isolators according to the security context and resource spec.
@ -798,6 +816,19 @@ func kubernetesPodFilters(uid types.UID) []*rktapi.PodFilter {
} }
} }
func kubernetesPodsFilters() []*rktapi.PodFilter {
return []*rktapi.PodFilter{
{
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
},
},
}
}
func newUnitOption(section, name, value string) *unit.UnitOption { func newUnitOption(section, name, value string) *unit.UnitOption {
return &unit.UnitOption{Section: section, Name: name, Value: value} return &unit.UnitOption{Section: section, Name: name, Value: value}
} }
@ -1426,45 +1457,142 @@ func (r *Runtime) SyncPod(pod *api.Pod, podStatus api.PodStatus, internalPodStat
return return
} }
// Sort rkt pods by creation time.
type podsByCreatedAt []*rktapi.Pod
func (s podsByCreatedAt) Len() int { return len(s) }
func (s podsByCreatedAt) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s podsByCreatedAt) Less(i, j int) bool { return s[i].CreatedAt < s[j].CreatedAt }
// getPodUID returns the pod's API UID, it returns
// empty UID if the UID cannot be determined.
func getPodUID(pod *rktapi.Pod) types.UID {
for _, anno := range pod.Annotations {
if anno.Key == k8sRktUIDAnno {
return types.UID(anno.Value)
}
}
return types.UID("")
}
// podIsActive returns true if the pod is embryo, preparing or running.
// If a pod is prepared, it is not guaranteed to be active (e.g. the systemd
// service might fail).
func podIsActive(pod *rktapi.Pod) bool {
return pod.State == rktapi.PodState_POD_STATE_EMBRYO ||
pod.State == rktapi.PodState_POD_STATE_PREPARING ||
pod.State == rktapi.PodState_POD_STATE_RUNNING
}
// GarbageCollect collects the pods/containers. // GarbageCollect collects the pods/containers.
// TODO(yifan): Enforce the gc policy, also, it would be better if we can // After one GC iteration:
// just GC kubernetes pods. // - The deleted pods will be removed.
// - If the number of containers exceeds gcPolicy.MaxContainers,
// then containers whose ages are older than gcPolicy.minAge will
// be removed.
func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error { func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error {
var errlist []error
var totalInactiveContainers int
var inactivePods []*rktapi.Pod
var removeCandidates []*rktapi.Pod
var allPods = map[string]*rktapi.Pod{}
glog.V(4).Infof("rkt: Garbage collecting triggered with policy %v", gcPolicy)
if err := exec.Command("systemctl", "reset-failed").Run(); err != nil { if err := exec.Command("systemctl", "reset-failed").Run(); err != nil {
glog.Errorf("rkt: Failed to reset failed systemd services: %v, continue to gc anyway...", err) glog.Errorf("rkt: Failed to reset failed systemd services: %v, continue to gc anyway...", err)
} }
if _, err := r.runCommand("gc", "--grace-period="+gcPolicy.MinAge.String(), "--expire-prepared="+gcPolicy.MinAge.String()); err != nil { // GC all inactive systemd service files and pods.
glog.Errorf("rkt: Failed to gc: %v", err) files, err := r.os.ReadDir(systemdServiceDir)
}
// GC all inactive systemd service files.
units, err := r.systemd.ListUnits()
if err != nil {
glog.Errorf("rkt: Failed to list units: %v", err)
return err
}
runningKubernetesUnits := sets.NewString()
for _, u := range units {
if strings.HasPrefix(u.Name, kubernetesUnitPrefix) && u.SubState == "running" {
runningKubernetesUnits.Insert(u.Name)
}
}
files, err := ioutil.ReadDir(systemdServiceDir)
if err != nil { if err != nil {
glog.Errorf("rkt: Failed to read the systemd service directory: %v", err) glog.Errorf("rkt: Failed to read the systemd service directory: %v", err)
return err return err
} }
resp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{Filters: kubernetesPodsFilters()})
if err != nil {
glog.Errorf("rkt: Failed to list pods: %v", err)
return err
}
// Mark inactive pods.
for _, pod := range resp.Pods {
allPods[pod.Id] = pod
if !podIsActive(pod) {
uid := getPodUID(pod)
if uid == types.UID("") {
glog.Errorf("rkt: Cannot get the UID of pod %q, pod is broken, will remove it", pod.Id)
removeCandidates = append(removeCandidates, pod)
continue
}
_, found := r.podGetter.GetPodByUID(uid)
if !found {
removeCandidates = append(removeCandidates, pod)
continue
}
inactivePods = append(inactivePods, pod)
totalInactiveContainers = totalInactiveContainers + len(pod.Apps)
}
}
// Remove any orphan service files.
for _, f := range files { for _, f := range files {
if strings.HasPrefix(f.Name(), kubernetesUnitPrefix) && !runningKubernetesUnits.Has(f.Name()) && f.ModTime().Before(time.Now().Add(-gcPolicy.MinAge)) { serviceName := f.Name()
glog.V(4).Infof("rkt: Removing inactive systemd service file: %v", f.Name()) if strings.HasPrefix(serviceName, kubernetesUnitPrefix) {
if err := os.Remove(serviceFilePath(f.Name())); err != nil { rktUUID := getRktUUIDFromServiceFileName(serviceName)
glog.Warningf("rkt: Failed to remove inactive systemd service file %v: %v", f.Name(), err) if _, ok := allPods[rktUUID]; !ok {
glog.V(4).Infof("rkt: No rkt pod found for service file %q, will remove it", serviceName)
if err := r.os.Remove(serviceFilePath(serviceName)); err != nil {
errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q: %v", serviceName, err))
}
} }
} }
} }
return nil
sort.Sort(podsByCreatedAt(inactivePods))
// Enforce GCPolicy.MaxContainers.
for _, pod := range inactivePods {
if totalInactiveContainers <= gcPolicy.MaxContainers {
break
}
creationTime := time.Unix(0, pod.CreatedAt)
if creationTime.Add(gcPolicy.MinAge).Before(time.Now()) {
// The pod is old and we are exceeding the MaxContainers limit.
// Delete the pod.
removeCandidates = append(removeCandidates, pod)
totalInactiveContainers = totalInactiveContainers - len(pod.Apps)
}
}
// Remove pods and their servie files.
for _, pod := range removeCandidates {
if err := r.removePod(pod.Id); err != nil {
errlist = append(errlist, fmt.Errorf("rkt: Failed to clean up rkt pod %q: %v", pod.Id, err))
}
}
return errors.NewAggregate(errlist)
}
// removePod calls 'rkt rm $UUID' to delete a rkt pod, it also remove the systemd service file
// related to the pod.
func (r *Runtime) removePod(uuid string) error {
var errlist []error
glog.V(4).Infof("rkt: GC is removing pod %q", uuid)
if _, err := r.cli.RunCommand("rm", uuid); err != nil {
errlist = append(errlist, fmt.Errorf("rkt: Failed to remove pod %q: %v", uuid, err))
}
// GC systemd service files as well.
serviceName := makePodServiceFileName(uuid)
if err := r.os.Remove(serviceFilePath(serviceName)); err != nil {
errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q for pod %q: %v", serviceName, uuid, err))
}
return errors.NewAggregate(errlist)
} }
// Note: In rkt, the container ID is in the form of "UUID:appName", where // Note: In rkt, the container ID is in the form of "UUID:appName", where