diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 595e0650a8a..5000e1ce29c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -430,6 +430,7 @@ func NewMainKubelet( klet, recorder, containerRefManager, + klet.podManager, klet.livenessManager, klet.volumeManager, klet.httpClient, diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 940b7d0c18a..8f8624e952c 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -26,6 +26,7 @@ import ( "os/exec" "path" "path/filepath" + "sort" "strconv" "strings" "sync" @@ -53,7 +54,6 @@ import ( "k8s.io/kubernetes/pkg/util/errors" utilexec "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/flowcontrol" - "k8s.io/kubernetes/pkg/util/sets" utilstrings "k8s.io/kubernetes/pkg/util/strings" utilwait "k8s.io/kubernetes/pkg/util/wait" ) @@ -72,7 +72,7 @@ const ( rktDataDir = "/var/lib/rkt" rktLocalConfigDir = "/etc/rkt" - kubernetesUnitPrefix = "k8s" + kubernetesUnitPrefix = "k8s_" unitKubernetesSection = "X-Kubernetes" unitPodName = "POD" unitRktID = "RktID" @@ -122,6 +122,7 @@ type Runtime struct { dockerKeyring credentialprovider.DockerKeyring containerRefManager *kubecontainer.RefManager + podGetter podGetter runtimeHelper kubecontainer.RuntimeHelper recorder record.EventRecorder livenessManager proberesults.Manager @@ -144,6 +145,18 @@ type volumeGetter interface { GetVolumes(podUID types.UID) (kubecontainer.VolumeMap, bool) } +// TODO(yifan): This duplicates the podGetter in dockertools. +type podGetter interface { + GetPodByUID(types.UID) (*api.Pod, bool) +} + +// cliInterface wrapps the command line calls for testing purpose. +type cliInterface interface { + // args are the arguments given to the 'rkt' command, + // e.g. args can be 'rm ${UUID}'. + RunCommand(args ...string) (result []string, err error) +} + // New creates the rkt container runtime which implements the container runtime interface. // It will test if the rkt binary is in the $PATH, and whether we can get the // version of it. If so, creates the rkt container runtime, otherwise returns an error. @@ -153,6 +166,7 @@ func New( runtimeHelper kubecontainer.RuntimeHelper, recorder record.EventRecorder, containerRefManager *kubecontainer.RefManager, + podGetter podGetter, livenessManager proberesults.Manager, volumeGetter volumeGetter, httpClient kubetypes.HttpGetter, @@ -195,12 +209,12 @@ func New( config: config, dockerKeyring: credentialprovider.NewDockerKeyring(), containerRefManager: containerRefManager, + podGetter: podGetter, runtimeHelper: runtimeHelper, recorder: recorder, livenessManager: livenessManager, volumeGetter: volumeGetter, execer: execer, - os: os, touchPath: touchPath, } @@ -257,7 +271,11 @@ func (r *Runtime) runCommand(args ...string) ([]string, error) { func makePodServiceFileName(uuid string) string { // TODO(yifan): Add name for readability? We need to consider the // limit of the length. - return fmt.Sprintf("%s_%s.service", kubernetesUnitPrefix, uuid) + return fmt.Sprintf("%s%s.service", kubernetesUnitPrefix, uuid) +} + +func getRktUUIDFromServiceFileName(filename string) string { + return strings.TrimPrefix(strings.TrimSuffix(filename, path.Ext(filename)), kubernetesUnitPrefix) } // setIsolators sets the apps' isolators according to the security context and resource spec. @@ -798,6 +816,19 @@ func kubernetesPodFilters(uid types.UID) []*rktapi.PodFilter { } } +func kubernetesPodsFilters() []*rktapi.PodFilter { + return []*rktapi.PodFilter{ + { + Annotations: []*rktapi.KeyValue{ + { + Key: k8sRktKubeletAnno, + Value: k8sRktKubeletAnnoValue, + }, + }, + }, + } +} + func newUnitOption(section, name, value string) *unit.UnitOption { return &unit.UnitOption{Section: section, Name: name, Value: value} } @@ -1426,45 +1457,142 @@ func (r *Runtime) SyncPod(pod *api.Pod, podStatus api.PodStatus, internalPodStat return } +// Sort rkt pods by creation time. +type podsByCreatedAt []*rktapi.Pod + +func (s podsByCreatedAt) Len() int { return len(s) } +func (s podsByCreatedAt) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s podsByCreatedAt) Less(i, j int) bool { return s[i].CreatedAt < s[j].CreatedAt } + +// getPodUID returns the pod's API UID, it returns +// empty UID if the UID cannot be determined. +func getPodUID(pod *rktapi.Pod) types.UID { + for _, anno := range pod.Annotations { + if anno.Key == k8sRktUIDAnno { + return types.UID(anno.Value) + } + } + return types.UID("") +} + +// podIsActive returns true if the pod is embryo, preparing or running. +// If a pod is prepared, it is not guaranteed to be active (e.g. the systemd +// service might fail). +func podIsActive(pod *rktapi.Pod) bool { + return pod.State == rktapi.PodState_POD_STATE_EMBRYO || + pod.State == rktapi.PodState_POD_STATE_PREPARING || + pod.State == rktapi.PodState_POD_STATE_RUNNING +} + // GarbageCollect collects the pods/containers. -// TODO(yifan): Enforce the gc policy, also, it would be better if we can -// just GC kubernetes pods. +// After one GC iteration: +// - The deleted pods will be removed. +// - If the number of containers exceeds gcPolicy.MaxContainers, +// then containers whose ages are older than gcPolicy.minAge will +// be removed. func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error { + var errlist []error + var totalInactiveContainers int + var inactivePods []*rktapi.Pod + var removeCandidates []*rktapi.Pod + var allPods = map[string]*rktapi.Pod{} + + glog.V(4).Infof("rkt: Garbage collecting triggered with policy %v", gcPolicy) + if err := exec.Command("systemctl", "reset-failed").Run(); err != nil { glog.Errorf("rkt: Failed to reset failed systemd services: %v, continue to gc anyway...", err) } - if _, err := r.runCommand("gc", "--grace-period="+gcPolicy.MinAge.String(), "--expire-prepared="+gcPolicy.MinAge.String()); err != nil { - glog.Errorf("rkt: Failed to gc: %v", err) - } - - // GC all inactive systemd service files. - units, err := r.systemd.ListUnits() - if err != nil { - glog.Errorf("rkt: Failed to list units: %v", err) - return err - } - runningKubernetesUnits := sets.NewString() - for _, u := range units { - if strings.HasPrefix(u.Name, kubernetesUnitPrefix) && u.SubState == "running" { - runningKubernetesUnits.Insert(u.Name) - } - } - - files, err := ioutil.ReadDir(systemdServiceDir) + // GC all inactive systemd service files and pods. + files, err := r.os.ReadDir(systemdServiceDir) if err != nil { glog.Errorf("rkt: Failed to read the systemd service directory: %v", err) return err } + + resp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{Filters: kubernetesPodsFilters()}) + if err != nil { + glog.Errorf("rkt: Failed to list pods: %v", err) + return err + } + + // Mark inactive pods. + for _, pod := range resp.Pods { + allPods[pod.Id] = pod + if !podIsActive(pod) { + uid := getPodUID(pod) + if uid == types.UID("") { + glog.Errorf("rkt: Cannot get the UID of pod %q, pod is broken, will remove it", pod.Id) + removeCandidates = append(removeCandidates, pod) + continue + } + _, found := r.podGetter.GetPodByUID(uid) + if !found { + removeCandidates = append(removeCandidates, pod) + continue + } + + inactivePods = append(inactivePods, pod) + totalInactiveContainers = totalInactiveContainers + len(pod.Apps) + } + } + + // Remove any orphan service files. for _, f := range files { - if strings.HasPrefix(f.Name(), kubernetesUnitPrefix) && !runningKubernetesUnits.Has(f.Name()) && f.ModTime().Before(time.Now().Add(-gcPolicy.MinAge)) { - glog.V(4).Infof("rkt: Removing inactive systemd service file: %v", f.Name()) - if err := os.Remove(serviceFilePath(f.Name())); err != nil { - glog.Warningf("rkt: Failed to remove inactive systemd service file %v: %v", f.Name(), err) + serviceName := f.Name() + if strings.HasPrefix(serviceName, kubernetesUnitPrefix) { + rktUUID := getRktUUIDFromServiceFileName(serviceName) + if _, ok := allPods[rktUUID]; !ok { + glog.V(4).Infof("rkt: No rkt pod found for service file %q, will remove it", serviceName) + if err := r.os.Remove(serviceFilePath(serviceName)); err != nil { + errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q: %v", serviceName, err)) + } } } } - return nil + + sort.Sort(podsByCreatedAt(inactivePods)) + + // Enforce GCPolicy.MaxContainers. + for _, pod := range inactivePods { + if totalInactiveContainers <= gcPolicy.MaxContainers { + break + } + creationTime := time.Unix(0, pod.CreatedAt) + if creationTime.Add(gcPolicy.MinAge).Before(time.Now()) { + // The pod is old and we are exceeding the MaxContainers limit. + // Delete the pod. + removeCandidates = append(removeCandidates, pod) + totalInactiveContainers = totalInactiveContainers - len(pod.Apps) + } + } + + // Remove pods and their servie files. + for _, pod := range removeCandidates { + if err := r.removePod(pod.Id); err != nil { + errlist = append(errlist, fmt.Errorf("rkt: Failed to clean up rkt pod %q: %v", pod.Id, err)) + } + } + + return errors.NewAggregate(errlist) +} + +// removePod calls 'rkt rm $UUID' to delete a rkt pod, it also remove the systemd service file +// related to the pod. +func (r *Runtime) removePod(uuid string) error { + var errlist []error + glog.V(4).Infof("rkt: GC is removing pod %q", uuid) + if _, err := r.cli.RunCommand("rm", uuid); err != nil { + errlist = append(errlist, fmt.Errorf("rkt: Failed to remove pod %q: %v", uuid, err)) + } + + // GC systemd service files as well. + serviceName := makePodServiceFileName(uuid) + if err := r.os.Remove(serviceFilePath(serviceName)); err != nil { + errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q for pod %q: %v", serviceName, uuid, err)) + } + + return errors.NewAggregate(errlist) } // Note: In rkt, the container ID is in the form of "UUID:appName", where