Merge pull request #24647 from yifan-gu/rkt_gc

Automatic merge from submit-queue

rkt: Refactor GarbageCollect to enforce GCPolicy.

Previously, we used `rkt gc` to garbage collect dead pods, which is very coarse and can cause dead pods to be removed too aggressively.

This PR improves the garbage collection. Now, after one GC iteration:
- Deleted pods will be removed.
- If the number of containers exceeds gcPolicy.MaxContainers,
  then containers that are older than gcPolicy.MinAge will be removed.

cc @kubernetes/sig-node @euank @sjpotter 

Pending on #23887 for the Godep updates.

<!-- Reviewable:start -->
---
This change is [<img src="http://reviewable.k8s.io/review_button.svg" height="35" align="absmiddle" alt="Reviewable"/>](http://reviewable.k8s.io/reviews/kubernetes/kubernetes/24647)
<!-- Reviewable:end -->
This commit is contained in:
k8s-merge-robot 2016-05-14 12:13:01 -07:00
commit 5df28fa18d
15 changed files with 555 additions and 68 deletions

View File

@ -230,7 +230,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
cadvisorInterface, cadvisorInterface,
configFilePath, configFilePath,
nil, nil,
containertest.FakeOS{}, &containertest.FakeOS{},
1*time.Second, /* FileCheckFrequency */ 1*time.Second, /* FileCheckFrequency */
1*time.Second, /* HTTPCheckFrequency */ 1*time.Second, /* HTTPCheckFrequency */
10*time.Second, /* MinimumGCAge */ 10*time.Second, /* MinimumGCAge */
@ -263,7 +263,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
cadvisorInterface, cadvisorInterface,
"", "",
nil, nil,
containertest.FakeOS{}, &containertest.FakeOS{},
1*time.Second, /* FileCheckFrequency */ 1*time.Second, /* FileCheckFrequency */
1*time.Second, /* HTTPCheckFrequency */ 1*time.Second, /* HTTPCheckFrequency */
10*time.Second, /* MinimumGCAge */ 10*time.Second, /* MinimumGCAge */

View File

@ -17,7 +17,9 @@ limitations under the License.
package container package container
import ( import (
"io/ioutil"
"os" "os"
"time"
) )
// OSInterface collects system level operations that need to be mocked out // OSInterface collects system level operations that need to be mocked out
@ -26,6 +28,12 @@ type OSInterface interface {
Mkdir(path string, perm os.FileMode) error Mkdir(path string, perm os.FileMode) error
Symlink(oldname string, newname string) error Symlink(oldname string, newname string) error
Stat(path string) (os.FileInfo, error) Stat(path string) (os.FileInfo, error)
Remove(path string) error
Create(path string) (*os.File, error)
Hostname() (name string, err error)
Chtimes(path string, atime time.Time, mtime time.Time) error
Pipe() (r *os.File, w *os.File, err error)
ReadDir(dirname string) ([]os.FileInfo, error)
} }
// RealOS is used to dispatch the real system level operations. // RealOS is used to dispatch the real system level operations.
@ -45,3 +53,34 @@ func (RealOS) Symlink(oldname string, newname string) error {
func (RealOS) Stat(path string) (os.FileInfo, error) { func (RealOS) Stat(path string) (os.FileInfo, error) {
return os.Stat(path) return os.Stat(path)
} }
// Remove calls os.Remove on path and returns the error, if any, that
// os.Remove reports.
func (RealOS) Remove(path string) error {
return os.Remove(path)
}
// Create calls os.Create on path and returns the resulting file together
// with any error from os.Create.
func (RealOS) Create(path string) (*os.File, error) {
return os.Create(path)
}
// Hostname calls os.Hostname and returns its hostname and error unchanged.
func (RealOS) Hostname() (name string, err error) {
return os.Hostname()
}
// Chtimes calls os.Chtimes to set the access time (atime) and modification
// time (mtime) of path, and returns the error from os.Chtimes.
func (RealOS) Chtimes(path string, atime time.Time, mtime time.Time) error {
return os.Chtimes(path, atime, mtime)
}
// Pipe calls os.Pipe and returns its connected pair of files (r for
// reading, w for writing) along with any error.
func (RealOS) Pipe() (r *os.File, w *os.File, err error) {
return os.Pipe()
}
// ReadDir calls ioutil.ReadDir on dirname and returns the directory entries
// and error it reports.
func (RealOS) ReadDir(dirname string) ([]os.FileInfo, error) {
return ioutil.ReadDir(dirname)
}

View File

@ -19,14 +19,25 @@ package testing
import ( import (
"errors" "errors"
"os" "os"
"time"
) )
// FakeOS mocks out certain OS calls to avoid perturbing the filesystem // FakeOS mocks out certain OS calls to avoid perturbing the filesystem
// on the test machine.
// If a member of the form `*Fn` is set, that function will be called in place // If a member of the form `*Fn` is set, that function will be called in place
// of the real call. // of the real call.
type FakeOS struct { type FakeOS struct {
StatFn func(string) (os.FileInfo, error) StatFn func(string) (os.FileInfo, error)
ReadDirFn func(string) ([]os.FileInfo, error)
HostName string
Removes []string
Files map[string][]*os.FileInfo
}
func NewFakeOS() *FakeOS {
return &FakeOS{
Removes: []string{},
Files: make(map[string][]*os.FileInfo),
}
} }
// Mkdir is a fake call that just returns nil. // Mkdir is a fake call that just returns nil.
@ -46,3 +57,37 @@ func (f FakeOS) Stat(path string) (os.FileInfo, error) {
} }
return nil, errors.New("unimplemented testing mock") return nil, errors.New("unimplemented testing mock")
} }
// Remove is a fake call that touches nothing on disk: it appends path to
// f.Removes so tests can inspect which paths were removed, and always
// returns nil.
func (f *FakeOS) Remove(path string) error {
f.Removes = append(f.Removes, path)
return nil
}
// Create is a fake call that creates nothing; it ignores path and returns
// a nil file together with a nil error.
func (FakeOS) Create(path string) (*os.File, error) {
return nil, nil
}
// Hostname is a fake call that returns the value of the HostName field
// and a nil error.
func (f *FakeOS) Hostname() (name string, err error) {
return f.HostName, nil
}
// Chtimes is a fake call that ignores its arguments and returns nil.
func (FakeOS) Chtimes(path string, atime time.Time, mtime time.Time) error {
return nil
}
// Pipe is a fake call that opens no pipe; it returns nil for both the read
// and write ends and a nil error.
func (FakeOS) Pipe() (r *os.File, w *os.File, err error) {
return nil, nil, nil
}
// ReadDir is a fake call. When the ReadDirFn hook is set, the call is
// forwarded to it and its results are returned as-is; without a hook an
// "unimplemented" error is returned.
func (f *FakeOS) ReadDir(dirname string) ([]os.FileInfo, error) {
	hook := f.ReadDirFn
	if hook == nil {
		return nil, errors.New("unimplemented testing mock")
	}
	return hook(dirname)
}

View File

@ -653,7 +653,7 @@ func TestFindContainersByPod(t *testing.T) {
fakeClient := NewFakeDockerClient() fakeClient := NewFakeDockerClient()
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone) np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone)
// image back-off is set to nil, this test should not pull images // image back-off is set to nil, this test should not pull images
containerManager := NewFakeDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, &cadvisorapi.MachineInfo{}, options.GetDefaultPodInfraContainerImage(), 0, 0, "", containertest.FakeOS{}, np, nil, nil, nil) containerManager := NewFakeDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, &cadvisorapi.MachineInfo{}, options.GetDefaultPodInfraContainerImage(), 0, 0, "", &containertest.FakeOS{}, np, nil, nil, nil)
for i, test := range tests { for i, test := range tests {
fakeClient.RunningContainerList = test.runningContainerList fakeClient.RunningContainerList = test.runningContainerList
fakeClient.ExitedContainerList = test.exitedContainerList fakeClient.ExitedContainerList = test.exitedContainerList

View File

@ -115,7 +115,7 @@ func createTestDockerManager(fakeHTTPClient *fakeHTTP, fakeDocker *FakeDockerCli
&cadvisorapi.MachineInfo{}, &cadvisorapi.MachineInfo{},
options.GetDefaultPodInfraContainerImage(), options.GetDefaultPodInfraContainerImage(),
0, 0, "", 0, 0, "",
containertest.FakeOS{}, &containertest.FakeOS{},
networkPlugin, networkPlugin,
&fakeRuntimeHelper{}, &fakeRuntimeHelper{},
fakeHTTPClient, fakeHTTPClient,

View File

@ -430,6 +430,7 @@ func NewMainKubelet(
klet, klet,
recorder, recorder,
containerRefManager, containerRefManager,
klet.podManager,
klet.livenessManager, klet.livenessManager,
klet.volumeManager, klet.volumeManager,
klet.httpClient, klet.httpClient,

View File

@ -123,7 +123,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
fakeKubeClient := &fake.Clientset{} fakeKubeClient := &fake.Clientset{}
kubelet := &Kubelet{} kubelet := &Kubelet{}
kubelet.kubeClient = fakeKubeClient kubelet.kubeClient = fakeKubeClient
kubelet.os = containertest.FakeOS{} kubelet.os = &containertest.FakeOS{}
kubelet.hostname = testKubeletHostname kubelet.hostname = testKubeletHostname
kubelet.nodeName = testKubeletHostname kubelet.nodeName = testKubeletHostname

View File

@ -154,7 +154,7 @@ func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDocker
&cadvisorapi.MachineInfo{}, &cadvisorapi.MachineInfo{},
options.GetDefaultPodInfraContainerImage(), options.GetDefaultPodInfraContainerImage(),
0, 0, "", 0, 0, "",
containertest.FakeOS{}, &containertest.FakeOS{},
networkPlugin, networkPlugin,
nil, nil,
nil, nil,

View File

@ -19,16 +19,16 @@ package rkt
import ( import (
"fmt" "fmt"
"strconv" "strconv"
"strings"
"sync" "sync"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types"
"github.com/coreos/go-systemd/dbus" "github.com/coreos/go-systemd/dbus"
rktapi "github.com/coreos/rkt/api/v1alpha" rktapi "github.com/coreos/rkt/api/v1alpha"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types"
) )
// fakeRktInterface mocks the rktapi.PublicAPIClient interface for testing purpose. // fakeRktInterface mocks the rktapi.PublicAPIClient interface for testing purpose.
@ -147,6 +147,14 @@ func (f *fakeSystemd) Reload() error {
return fmt.Errorf("Not implemented") return fmt.Errorf("Not implemented")
} }
// ResetFailed is the fake counterpart of the systemd "reset-failed"
// operation. While holding the lock it appends "ResetFailed" to f.called,
// then returns f.err.
func (f *fakeSystemd) ResetFailed() error {
f.Lock()
defer f.Unlock()
f.called = append(f.called, "ResetFailed")
return f.err
}
// fakeRuntimeHelper implements kubecontainer.RuntimeHelper interfaces for testing purpose. // fakeRuntimeHelper implements kubecontainer.RuntimeHelper interfaces for testing purpose.
type fakeRuntimeHelper struct { type fakeRuntimeHelper struct {
dnsServers []string dnsServers []string
@ -171,3 +179,44 @@ func (f *fakeRuntimeHelper) GeneratePodHostNameAndDomain(pod *api.Pod) (string,
func (f *fakeRuntimeHelper) GetPodDir(podUID types.UID) string { func (f *fakeRuntimeHelper) GetPodDir(podUID types.UID) string {
return "/poddir/" + string(podUID) return "/poddir/" + string(podUID)
} }
// fakeRktCli is a test double for the rkt command line. It records every
// command it is asked to run and replies with a canned result and error.
// The embedded mutex guards all fields.
type fakeRktCli struct {
	sync.Mutex
	// cmds holds each invocation rendered as "rkt <args...>", in call order.
	cmds []string
	// result is returned verbatim by every RunCommand call.
	result []string
	// err is returned verbatim by every RunCommand call.
	err error
}

// newFakeRktCli returns a fakeRktCli with empty (non-nil) command and
// result lists and no error configured.
func newFakeRktCli() *fakeRktCli {
	return &fakeRktCli{
		cmds:   []string{},
		result: []string{},
	}
}

// RunCommand records the invocation as a single space-joined string
// prefixed with "rkt" and returns the configured result and error.
func (f *fakeRktCli) RunCommand(args ...string) (result []string, err error) {
	f.Lock()
	defer f.Unlock()
	cmd := append([]string{"rkt"}, args...)
	f.cmds = append(f.cmds, strings.Join(cmd, " "))
	return f.result, f.err
}

// Reset clears the recorded commands and the canned result and error.
// It takes the same lock as RunCommand so that resetting while a command
// is being recorded on another goroutine is not a data race.
func (f *fakeRktCli) Reset() {
	f.Lock()
	defer f.Unlock()
	f.cmds = []string{}
	f.result = []string{}
	f.err = nil
}
// fakePodGetter is an in-memory pod lookup for tests, keyed by pod UID.
type fakePodGetter struct {
	pods map[types.UID]*api.Pod
}

// newFakePodGetter returns a fakePodGetter backed by an empty map.
func newFakePodGetter() *fakePodGetter {
	getter := new(fakePodGetter)
	getter.pods = map[types.UID]*api.Pod{}
	return getter
}

// GetPodByUID looks uid up in the pods map. It returns the stored pod and
// true on a hit, or nil and false when the UID is unknown.
func (f fakePodGetter) GetPodByUID(uid types.UID) (*api.Pod, bool) {
	if pod, ok := f.pods[uid]; ok {
		return pod, true
	}
	return nil, false
}

View File

@ -68,7 +68,7 @@ func (r *Runtime) PullImage(image kubecontainer.ImageSpec, pullSecrets []api.Sec
return err return err
} }
if _, err := r.runCommand("fetch", dockerPrefix+img); err != nil { if _, err := r.cli.RunCommand("fetch", dockerPrefix+img); err != nil {
glog.Errorf("Failed to fetch: %v", err) glog.Errorf("Failed to fetch: %v", err)
return err return err
} }
@ -104,7 +104,7 @@ func (r *Runtime) RemoveImage(image kubecontainer.ImageSpec) error {
if err != nil { if err != nil {
return err return err
} }
if _, err := r.runCommand("image", "rm", imageID); err != nil { if _, err := r.cli.RunCommand("image", "rm", imageID); err != nil {
return err return err
} }
return nil return nil

View File

@ -26,6 +26,7 @@ import (
"os/exec" "os/exec"
"path" "path"
"path/filepath" "path/filepath"
"sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -53,7 +54,6 @@ import (
"k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/errors"
utilexec "k8s.io/kubernetes/pkg/util/exec" utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/sets"
utilstrings "k8s.io/kubernetes/pkg/util/strings" utilstrings "k8s.io/kubernetes/pkg/util/strings"
utilwait "k8s.io/kubernetes/pkg/util/wait" utilwait "k8s.io/kubernetes/pkg/util/wait"
) )
@ -72,7 +72,7 @@ const (
rktDataDir = "/var/lib/rkt" rktDataDir = "/var/lib/rkt"
rktLocalConfigDir = "/etc/rkt" rktLocalConfigDir = "/etc/rkt"
kubernetesUnitPrefix = "k8s" kubernetesUnitPrefix = "k8s_"
unitKubernetesSection = "X-Kubernetes" unitKubernetesSection = "X-Kubernetes"
unitPodName = "POD" unitPodName = "POD"
unitRktID = "RktID" unitRktID = "RktID"
@ -113,6 +113,7 @@ const (
// uses systemd, so in order to run this runtime, systemd must be installed // uses systemd, so in order to run this runtime, systemd must be installed
// on the machine. // on the machine.
type Runtime struct { type Runtime struct {
cli cliInterface
systemd systemdInterface systemd systemdInterface
// The grpc client for rkt api-service. // The grpc client for rkt api-service.
apisvcConn *grpc.ClientConn apisvcConn *grpc.ClientConn
@ -122,6 +123,7 @@ type Runtime struct {
dockerKeyring credentialprovider.DockerKeyring dockerKeyring credentialprovider.DockerKeyring
containerRefManager *kubecontainer.RefManager containerRefManager *kubecontainer.RefManager
podGetter podGetter
runtimeHelper kubecontainer.RuntimeHelper runtimeHelper kubecontainer.RuntimeHelper
recorder record.EventRecorder recorder record.EventRecorder
livenessManager proberesults.Manager livenessManager proberesults.Manager
@ -144,6 +146,18 @@ type volumeGetter interface {
GetVolumes(podUID types.UID) (kubecontainer.VolumeMap, bool) GetVolumes(podUID types.UID) (kubecontainer.VolumeMap, bool)
} }
// podGetter looks up an API pod by its UID; the boolean result reports
// whether the pod was found.
// TODO(yifan): This duplicates the podGetter in dockertools.
type podGetter interface {
GetPodByUID(types.UID) (*api.Pod, bool)
}
// cliInterface wraps the command line calls to the 'rkt' binary so that
// they can be replaced by a fake in tests.
type cliInterface interface {
// RunCommand runs 'rkt' with the given arguments and returns the
// command's result as a list of strings.
// args are the arguments given to the 'rkt' command,
// e.g. args can be 'rm ${UUID}'.
RunCommand(args ...string) (result []string, err error)
}
// New creates the rkt container runtime which implements the container runtime interface. // New creates the rkt container runtime which implements the container runtime interface.
// It will test if the rkt binary is in the $PATH, and whether we can get the // It will test if the rkt binary is in the $PATH, and whether we can get the
// version of it. If so, creates the rkt container runtime, otherwise returns an error. // version of it. If so, creates the rkt container runtime, otherwise returns an error.
@ -153,6 +167,7 @@ func New(
runtimeHelper kubecontainer.RuntimeHelper, runtimeHelper kubecontainer.RuntimeHelper,
recorder record.EventRecorder, recorder record.EventRecorder,
containerRefManager *kubecontainer.RefManager, containerRefManager *kubecontainer.RefManager,
podGetter podGetter,
livenessManager proberesults.Manager, livenessManager proberesults.Manager,
volumeGetter volumeGetter, volumeGetter volumeGetter,
httpClient kubetypes.HttpGetter, httpClient kubetypes.HttpGetter,
@ -189,18 +204,19 @@ func New(
} }
rkt := &Runtime{ rkt := &Runtime{
os: kubecontainer.RealOS{},
systemd: systemd, systemd: systemd,
apisvcConn: apisvcConn, apisvcConn: apisvcConn,
apisvc: rktapi.NewPublicAPIClient(apisvcConn), apisvc: rktapi.NewPublicAPIClient(apisvcConn),
config: config, config: config,
dockerKeyring: credentialprovider.NewDockerKeyring(), dockerKeyring: credentialprovider.NewDockerKeyring(),
containerRefManager: containerRefManager, containerRefManager: containerRefManager,
podGetter: podGetter,
runtimeHelper: runtimeHelper, runtimeHelper: runtimeHelper,
recorder: recorder, recorder: recorder,
livenessManager: livenessManager, livenessManager: livenessManager,
volumeGetter: volumeGetter, volumeGetter: volumeGetter,
execer: execer, execer: execer,
os: os,
touchPath: touchPath, touchPath: touchPath,
} }
@ -221,6 +237,8 @@ func New(
return nil, fmt.Errorf("rkt: error getting version info: %v", err) return nil, fmt.Errorf("rkt: error getting version info: %v", err)
} }
rkt.cli = rkt
return rkt, nil return rkt, nil
} }
@ -239,9 +257,9 @@ func convertToACName(name string) appctypes.ACName {
return *appctypes.MustACName(acname) return *appctypes.MustACName(acname)
} }
// runCommand invokes rkt binary with arguments and returns the result // RunCommand invokes rkt binary with arguments and returns the result
// from stdout in a list of strings. Each string in the list is a line. // from stdout in a list of strings. Each string in the list is a line.
func (r *Runtime) runCommand(args ...string) ([]string, error) { func (r *Runtime) RunCommand(args ...string) ([]string, error) {
glog.V(4).Info("rkt: Run command:", args) glog.V(4).Info("rkt: Run command:", args)
var stdout, stderr bytes.Buffer var stdout, stderr bytes.Buffer
@ -257,7 +275,11 @@ func (r *Runtime) runCommand(args ...string) ([]string, error) {
func makePodServiceFileName(uuid string) string { func makePodServiceFileName(uuid string) string {
// TODO(yifan): Add name for readability? We need to consider the // TODO(yifan): Add name for readability? We need to consider the
// limit of the length. // limit of the length.
return fmt.Sprintf("%s_%s.service", kubernetesUnitPrefix, uuid) return fmt.Sprintf("%s%s.service", kubernetesUnitPrefix, uuid)
}
func getRktUUIDFromServiceFileName(filename string) string {
return strings.TrimPrefix(strings.TrimSuffix(filename, path.Ext(filename)), kubernetesUnitPrefix)
} }
// setIsolators sets the apps' isolators according to the security context and resource spec. // setIsolators sets the apps' isolators according to the security context and resource spec.
@ -629,7 +651,7 @@ func (r *Runtime) podFinishedAt(podUID types.UID, rktUID string) time.Time {
return stat.ModTime() return stat.ModTime()
} }
func makeContainerLogMount(opts *kubecontainer.RunContainerOptions, container *api.Container) (*kubecontainer.Mount, error) { func (r *Runtime) makeContainerLogMount(opts *kubecontainer.RunContainerOptions, container *api.Container) (*kubecontainer.Mount, error) {
if opts.PodContainerDir == "" || container.TerminationMessagePath == "" { if opts.PodContainerDir == "" || container.TerminationMessagePath == "" {
return nil, nil return nil, nil
} }
@ -641,7 +663,7 @@ func makeContainerLogMount(opts *kubecontainer.RunContainerOptions, container *a
// on the disk. // on the disk.
randomUID := util.NewUUID() randomUID := util.NewUUID()
containerLogPath := path.Join(opts.PodContainerDir, string(randomUID)) containerLogPath := path.Join(opts.PodContainerDir, string(randomUID))
fs, err := os.Create(containerLogPath) fs, err := r.os.Create(containerLogPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -693,7 +715,7 @@ func (r *Runtime) newAppcRuntimeApp(pod *api.Pod, c api.Container, pullSecrets [
} }
// create the container log file and make a mount pair. // create the container log file and make a mount pair.
mnt, err := makeContainerLogMount(opts, &c) mnt, err := r.makeContainerLogMount(opts, &c)
if err != nil { if err != nil {
return err return err
} }
@ -798,6 +820,19 @@ func kubernetesPodFilters(uid types.UID) []*rktapi.PodFilter {
} }
} }
// kubernetesPodsFilters builds the rkt API pod filter list containing a
// single filter that matches on the kubelet annotation
// (k8sRktKubeletAnno set to k8sRktKubeletAnnoValue).
func kubernetesPodsFilters() []*rktapi.PodFilter {
	kubeletAnno := &rktapi.KeyValue{
		Key:   k8sRktKubeletAnno,
		Value: k8sRktKubeletAnnoValue,
	}
	filter := &rktapi.PodFilter{
		Annotations: []*rktapi.KeyValue{kubeletAnno},
	}
	return []*rktapi.PodFilter{filter}
}
func newUnitOption(section, name, value string) *unit.UnitOption { func newUnitOption(section, name, value string) *unit.UnitOption {
return &unit.UnitOption{Section: section, Name: name, Value: value} return &unit.UnitOption{Section: section, Name: name, Value: value}
} }
@ -837,7 +872,7 @@ func (r *Runtime) generateRunCommand(pod *api.Pod, uuid string) (string, error)
runPrepared = append(runPrepared, "--net=host") runPrepared = append(runPrepared, "--net=host")
// TODO(yifan): Let runtimeHelper.GeneratePodHostNameAndDomain() to handle this. // TODO(yifan): Let runtimeHelper.GeneratePodHostNameAndDomain() to handle this.
hostname, err = os.Hostname() hostname, err = r.os.Hostname()
if err != nil { if err != nil {
return "", err return "", err
} }
@ -889,7 +924,7 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k
} }
defer func() { defer func() {
manifestFile.Close() manifestFile.Close()
if err := os.Remove(manifestFile.Name()); err != nil { if err := r.os.Remove(manifestFile.Name()); err != nil {
glog.Warningf("rkt: Cannot remove temp manifest file %q: %v", manifestFile.Name(), err) glog.Warningf("rkt: Cannot remove temp manifest file %q: %v", manifestFile.Name(), err)
} }
}() }()
@ -911,7 +946,7 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k
if r.config.Stage1Image != "" { if r.config.Stage1Image != "" {
cmds = append(cmds, "--stage1-path", r.config.Stage1Image) cmds = append(cmds, "--stage1-path", r.config.Stage1Image)
} }
output, err := r.runCommand(cmds...) output, err := r.cli.RunCommand(cmds...)
if err != nil { if err != nil {
return "", nil, err return "", nil, err
} }
@ -941,7 +976,7 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k
serviceName := makePodServiceFileName(uuid) serviceName := makePodServiceFileName(uuid)
glog.V(4).Infof("rkt: Creating service file %q for pod %q", serviceName, format.Pod(pod)) glog.V(4).Infof("rkt: Creating service file %q for pod %q", serviceName, format.Pod(pod))
serviceFile, err := os.Create(serviceFilePath(serviceName)) serviceFile, err := r.os.Create(serviceFilePath(serviceName))
if err != nil { if err != nil {
return "", nil, err return "", nil, err
} }
@ -1309,7 +1344,7 @@ func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod, gracePerio
// Touch the systemd service file to update the mod time so it will // Touch the systemd service file to update the mod time so it will
// not be garbage collected too soon. // not be garbage collected too soon.
if err := os.Chtimes(serviceFilePath(serviceName), time.Now(), time.Now()); err != nil { if err := r.os.Chtimes(serviceFilePath(serviceName), time.Now(), time.Now()); err != nil {
glog.Errorf("rkt: Failed to change the modification time of the service file %q: %v", serviceName, err) glog.Errorf("rkt: Failed to change the modification time of the service file %q: %v", serviceName, err)
return err return err
} }
@ -1426,45 +1461,142 @@ func (r *Runtime) SyncPod(pod *api.Pod, podStatus api.PodStatus, internalPodStat
return return
} }
// podsByCreatedAt implements sort.Interface, ordering rkt pods by
// ascending creation time.
type podsByCreatedAt []*rktapi.Pod

// Len returns the number of pods in the list.
func (pods podsByCreatedAt) Len() int {
	return len(pods)
}

// Swap exchanges the pods at positions a and b.
func (pods podsByCreatedAt) Swap(a, b int) {
	pods[b], pods[a] = pods[a], pods[b]
}

// Less reports whether the pod at a was created strictly before the pod at b.
func (pods podsByCreatedAt) Less(a, b int) bool {
	return pods[b].CreatedAt > pods[a].CreatedAt
}
// getPodUID extracts the pod's API UID from the value of the first
// annotation whose key is k8sRktUIDAnno. An empty UID is returned when no
// such annotation exists.
func getPodUID(pod *rktapi.Pod) types.UID {
	annotations := pod.Annotations
	for i := range annotations {
		if annotations[i].Key != k8sRktUIDAnno {
			continue
		}
		return types.UID(annotations[i].Value)
	}
	var unknown types.UID
	return unknown
}
// podIsActive reports whether the pod's state is embryo, preparing or
// running. A pod in the prepared state is deliberately not counted as
// active, since it is not guaranteed to become so (e.g. the systemd
// service might fail).
func podIsActive(pod *rktapi.Pod) bool {
	switch pod.State {
	case rktapi.PodState_POD_STATE_EMBRYO,
		rktapi.PodState_POD_STATE_PREPARING,
		rktapi.PodState_POD_STATE_RUNNING:
		return true
	default:
		return false
	}
}
// GarbageCollect collects the pods/containers. // GarbageCollect collects the pods/containers.
// TODO(yifan): Enforce the gc policy, also, it would be better if we can // After one GC iteration:
// just GC kubernetes pods. // - The deleted pods will be removed.
// - If the number of containers exceeds gcPolicy.MaxContainers,
// then containers whose ages are older than gcPolicy.minAge will
// be removed.
func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error { func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error {
if err := exec.Command("systemctl", "reset-failed").Run(); err != nil { var errlist []error
var totalInactiveContainers int
var inactivePods []*rktapi.Pod
var removeCandidates []*rktapi.Pod
var allPods = map[string]*rktapi.Pod{}
glog.V(4).Infof("rkt: Garbage collecting triggered with policy %v", gcPolicy)
if err := r.systemd.ResetFailed(); err != nil {
glog.Errorf("rkt: Failed to reset failed systemd services: %v, continue to gc anyway...", err) glog.Errorf("rkt: Failed to reset failed systemd services: %v, continue to gc anyway...", err)
} }
if _, err := r.runCommand("gc", "--grace-period="+gcPolicy.MinAge.String(), "--expire-prepared="+gcPolicy.MinAge.String()); err != nil { // GC all inactive systemd service files and pods.
glog.Errorf("rkt: Failed to gc: %v", err) files, err := r.os.ReadDir(systemdServiceDir)
}
// GC all inactive systemd service files.
units, err := r.systemd.ListUnits()
if err != nil {
glog.Errorf("rkt: Failed to list units: %v", err)
return err
}
runningKubernetesUnits := sets.NewString()
for _, u := range units {
if strings.HasPrefix(u.Name, kubernetesUnitPrefix) && u.SubState == "running" {
runningKubernetesUnits.Insert(u.Name)
}
}
files, err := ioutil.ReadDir(systemdServiceDir)
if err != nil { if err != nil {
glog.Errorf("rkt: Failed to read the systemd service directory: %v", err) glog.Errorf("rkt: Failed to read the systemd service directory: %v", err)
return err return err
} }
resp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{Filters: kubernetesPodsFilters()})
if err != nil {
glog.Errorf("rkt: Failed to list pods: %v", err)
return err
}
// Mark inactive pods.
for _, pod := range resp.Pods {
allPods[pod.Id] = pod
if !podIsActive(pod) {
uid := getPodUID(pod)
if uid == types.UID("") {
glog.Errorf("rkt: Cannot get the UID of pod %q, pod is broken, will remove it", pod.Id)
removeCandidates = append(removeCandidates, pod)
continue
}
_, found := r.podGetter.GetPodByUID(uid)
if !found {
removeCandidates = append(removeCandidates, pod)
continue
}
inactivePods = append(inactivePods, pod)
totalInactiveContainers = totalInactiveContainers + len(pod.Apps)
}
}
// Remove any orphan service files.
for _, f := range files { for _, f := range files {
if strings.HasPrefix(f.Name(), kubernetesUnitPrefix) && !runningKubernetesUnits.Has(f.Name()) && f.ModTime().Before(time.Now().Add(-gcPolicy.MinAge)) { serviceName := f.Name()
glog.V(4).Infof("rkt: Removing inactive systemd service file: %v", f.Name()) if strings.HasPrefix(serviceName, kubernetesUnitPrefix) {
if err := os.Remove(serviceFilePath(f.Name())); err != nil { rktUUID := getRktUUIDFromServiceFileName(serviceName)
glog.Warningf("rkt: Failed to remove inactive systemd service file %v: %v", f.Name(), err) if _, ok := allPods[rktUUID]; !ok {
glog.V(4).Infof("rkt: No rkt pod found for service file %q, will remove it", serviceName)
if err := r.os.Remove(serviceFilePath(serviceName)); err != nil {
errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q: %v", serviceName, err))
}
} }
} }
} }
return nil
sort.Sort(podsByCreatedAt(inactivePods))
// Enforce GCPolicy.MaxContainers.
for _, pod := range inactivePods {
if totalInactiveContainers <= gcPolicy.MaxContainers {
break
}
creationTime := time.Unix(0, pod.CreatedAt)
if creationTime.Add(gcPolicy.MinAge).Before(time.Now()) {
// The pod is old and we are exceeding the MaxContainers limit.
// Delete the pod.
removeCandidates = append(removeCandidates, pod)
totalInactiveContainers = totalInactiveContainers - len(pod.Apps)
}
}
// Remove pods and their service files.
for _, pod := range removeCandidates {
if err := r.removePod(pod.Id); err != nil {
errlist = append(errlist, fmt.Errorf("rkt: Failed to clean up rkt pod %q: %v", pod.Id, err))
}
}
return errors.NewAggregate(errlist)
}
// removePod calls 'rkt rm $UUID' to delete a rkt pod; it also removes the systemd service file
// related to the pod.
func (r *Runtime) removePod(uuid string) error {
var errlist []error
glog.V(4).Infof("rkt: GC is removing pod %q", uuid)
if _, err := r.cli.RunCommand("rm", uuid); err != nil {
errlist = append(errlist, fmt.Errorf("rkt: Failed to remove pod %q: %v", uuid, err))
}
// GC systemd service files as well.
serviceName := makePodServiceFileName(uuid)
if err := r.os.Remove(serviceFilePath(serviceName)); err != nil {
errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q for pod %q: %v", serviceName, uuid, err))
}
return errors.NewAggregate(errlist)
} }
// Note: In rkt, the container ID is in the form of "UUID:appName", where // Note: In rkt, the container ID is in the form of "UUID:appName", where
@ -1542,7 +1674,7 @@ func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []s
// This way, if you run 'kubectl exec <pod> -i bash' (no tty) and type 'exit', // This way, if you run 'kubectl exec <pod> -i bash' (no tty) and type 'exit',
// the call below to command.Run() can unblock because its Stdin is the read half // the call below to command.Run() can unblock because its Stdin is the read half
// of the pipe. // of the pipe.
r, w, err := os.Pipe() r, w, err := r.os.Pipe()
if err != nil { if err != nil {
return err return err
} }

View File

@ -33,8 +33,10 @@ import (
"k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/resource"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertesting "k8s.io/kubernetes/pkg/kubelet/container/testing" containertesting "k8s.io/kubernetes/pkg/kubelet/container/testing"
kubetesting "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/rkt/mock_os" "k8s.io/kubernetes/pkg/kubelet/rkt/mock_os"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/errors"
utiltesting "k8s.io/kubernetes/pkg/util/testing" utiltesting "k8s.io/kubernetes/pkg/util/testing"
) )
@ -1072,11 +1074,7 @@ func TestSetApp(t *testing.T) {
} }
func TestGenerateRunCommand(t *testing.T) { func TestGenerateRunCommand(t *testing.T) {
hostName, err := os.Hostname() hostName := "test-hostname"
if err != nil {
t.Fatalf("Cannot get the hostname: %v", err)
}
tests := []struct { tests := []struct {
pod *api.Pod pod *api.Pod
uuid string uuid string
@ -1177,6 +1175,7 @@ func TestGenerateRunCommand(t *testing.T) {
} }
rkt := &Runtime{ rkt := &Runtime{
os: &kubetesting.FakeOS{HostName: hostName},
config: &Config{ config: &Config{
Path: "/bin/rkt/rkt", Path: "/bin/rkt/rkt",
Stage1Image: "/bin/rkt/stage1-coreos.aci", Stage1Image: "/bin/rkt/stage1-coreos.aci",
@ -1397,3 +1396,218 @@ func TestImageStats(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, result, &kubecontainer.ImageStats{TotalStorageBytes: 600}) assert.Equal(t, result, &kubecontainer.ImageStats{TotalStorageBytes: 600})
} }
// TestGarbageCollect runs Runtime.GarbageCollect against fake rkt API, rkt
// CLI, systemd, OS and pod-getter implementations. For every table entry it
// verifies the commands recorded by the fake CLI and the paths recorded as
// removed by the fake OS.
func TestGarbageCollect(t *testing.T) {
	fr := newFakeRktInterface()
	fs := newFakeSystemd()
	cli := newFakeRktCli()
	fakeOS := kubetesting.NewFakeOS()
	getter := newFakePodGetter()

	rkt := &Runtime{
		os:                  fakeOS,
		cli:                 cli,
		apisvc:              fr,
		podGetter:           getter,
		systemd:             fs,
		containerRefManager: kubecontainer.NewRefManager(),
	}

	// Single app instance shared by every fake rkt pod.
	fakeApp := &rktapi.App{Name: "app-foo"}

	// apiPod builds an API pod that carries nothing but the given UID.
	apiPod := func(uid types.UID) *api.Pod {
		return &api.Pod{ObjectMeta: api.ObjectMeta{UID: uid}}
	}
	// uidAnnotation builds the rkt annotation list that ties a rkt pod to
	// the API pod with the given UID.
	uidAnnotation := func(uid string) []*rktapi.KeyValue {
		return []*rktapi.KeyValue{{Key: k8sRktUIDAnno, Value: uid}}
	}

	tests := []struct {
		gcPolicy             kubecontainer.ContainerGCPolicy
		apiPods              []*api.Pod
		pods                 []*rktapi.Pod
		serviceFilesOnDisk   []string
		expectedCommands     []string
		expectedServiceFiles []string
	}{
		// Running pods must not be gc'd.
		// Dead but new pods must not be gc'd.
		// Dead and old pods must be gc'd.
		// Deleted pods must be gc'd.
		// Service files without a corresponding pod must be removed.
		{
			gcPolicy: kubecontainer.ContainerGCPolicy{
				MinAge:        0,
				MaxContainers: 0,
			},
			apiPods: []*api.Pod{
				apiPod("pod-uid-1"),
				apiPod("pod-uid-2"),
				apiPod("pod-uid-3"),
				apiPod("pod-uid-4"),
			},
			pods: []*rktapi.Pod{
				{
					// "pod-uid-0" is not among apiPods above.
					Id:          "deleted-foo",
					State:       rktapi.PodState_POD_STATE_EXITED,
					CreatedAt:   time.Now().Add(time.Hour).UnixNano(),
					StartedAt:   time.Now().Add(time.Hour).UnixNano(),
					Apps:        []*rktapi.App{fakeApp},
					Annotations: uidAnnotation("pod-uid-0"),
				},
				{
					Id:          "running-foo",
					State:       rktapi.PodState_POD_STATE_RUNNING,
					CreatedAt:   0,
					StartedAt:   0,
					Apps:        []*rktapi.App{fakeApp},
					Annotations: uidAnnotation("pod-uid-1"),
				},
				{
					Id:          "running-bar",
					State:       rktapi.PodState_POD_STATE_RUNNING,
					CreatedAt:   0,
					StartedAt:   0,
					Apps:        []*rktapi.App{fakeApp},
					Annotations: uidAnnotation("pod-uid-2"),
				},
				{
					Id:          "dead-old",
					State:       rktapi.PodState_POD_STATE_EXITED,
					CreatedAt:   0,
					StartedAt:   0,
					Apps:        []*rktapi.App{fakeApp},
					Annotations: uidAnnotation("pod-uid-3"),
				},
				{
					// Timestamps lie one hour in the future.
					Id:          "dead-new",
					State:       rktapi.PodState_POD_STATE_EXITED,
					CreatedAt:   time.Now().Add(time.Hour).UnixNano(),
					StartedAt:   time.Now().Add(time.Hour).UnixNano(),
					Apps:        []*rktapi.App{fakeApp},
					Annotations: uidAnnotation("pod-uid-4"),
				},
			},
			serviceFilesOnDisk: []string{
				"k8s_dead-old.service",
				"k8s_deleted-foo.service",
				"k8s_non-existing-bar.service",
			},
			expectedCommands: []string{
				"rkt rm dead-old",
				"rkt rm deleted-foo",
			},
			expectedServiceFiles: []string{
				"/run/systemd/system/k8s_dead-old.service",
				"/run/systemd/system/k8s_deleted-foo.service",
				"/run/systemd/system/k8s_non-existing-bar.service",
			},
		},
		// gcPolicy.MaxContainers must be enforced.
		// The oldest pods are removed first.
		{
			gcPolicy: kubecontainer.ContainerGCPolicy{
				MinAge:        0,
				MaxContainers: 1,
			},
			apiPods: []*api.Pod{
				apiPod("pod-uid-0"),
				apiPod("pod-uid-1"),
				apiPod("pod-uid-2"),
			},
			pods: []*rktapi.Pod{
				{
					Id:          "dead-2",
					State:       rktapi.PodState_POD_STATE_EXITED,
					CreatedAt:   2,
					StartedAt:   2,
					Apps:        []*rktapi.App{fakeApp},
					Annotations: uidAnnotation("pod-uid-2"),
				},
				{
					Id:          "dead-1",
					State:       rktapi.PodState_POD_STATE_EXITED,
					CreatedAt:   1,
					StartedAt:   1,
					Apps:        []*rktapi.App{fakeApp},
					Annotations: uidAnnotation("pod-uid-1"),
				},
				{
					Id:          "dead-0",
					State:       rktapi.PodState_POD_STATE_EXITED,
					CreatedAt:   0,
					StartedAt:   0,
					Apps:        []*rktapi.App{fakeApp},
					Annotations: uidAnnotation("pod-uid-0"),
				},
			},
			serviceFilesOnDisk: []string{
				"k8s_dead-0.service",
				"k8s_dead-1.service",
				"k8s_dead-2.service",
			},
			expectedCommands: []string{
				"rkt rm dead-0",
				"rkt rm dead-1",
			},
			expectedServiceFiles: []string{
				"/run/systemd/system/k8s_dead-0.service",
				"/run/systemd/system/k8s_dead-1.service",
			},
		},
	}

	for idx, tc := range tests {
		hint := fmt.Sprintf("test case #%d", idx)
		ctrl := gomock.NewController(t)

		// Directory listings served by the fake OS are built from this
		// case's service file names; each mock entry expects one Name() call.
		fakeOS.ReadDirFn = func(dirname string) ([]os.FileInfo, error) {
			var entries []os.FileInfo
			names := tc.serviceFilesOnDisk
			for n := range names {
				entry := mock_os.NewMockFileInfo(ctrl)
				entry.EXPECT().Name().Return(names[n])
				entries = append(entries, entry)
			}
			return entries, nil
		}

		// Load this case's rkt pods and API pods into the fakes.
		fr.pods = tc.pods
		for _, pod := range tc.apiPods {
			getter.pods[pod.UID] = pod
		}

		gcErr := rkt.GarbageCollect(tc.gcPolicy)
		assert.NoError(t, gcErr, hint)

		// Both sides are sorted before comparing, so the order in which
		// commands and removals were recorded does not matter.
		sort.Sort(sortedStringList(tc.expectedCommands))
		sort.Sort(sortedStringList(cli.cmds))
		assert.Equal(t, tc.expectedCommands, cli.cmds, hint)

		sort.Sort(sortedStringList(tc.expectedServiceFiles))
		sort.Sort(sortedStringList(fakeOS.Removes))
		assert.Equal(t, tc.expectedServiceFiles, fakeOS.Removes, hint)

		// Reset the shared fakes so the next case starts clean.
		cli.Reset()
		ctrl.Finish()
		fakeOS.Removes = []string{}
		getter.pods = make(map[types.UID]*api.Pod)
	}
}

View File

@ -62,6 +62,8 @@ type systemdInterface interface {
RestartUnit(name string, mode string, ch chan<- string) (int, error) RestartUnit(name string, mode string, ch chan<- string) (int, error)
// Reload is equivalent to 'systemctl daemon-reload'. // Reload is equivalent to 'systemctl daemon-reload'.
Reload() error Reload() error
// ResetFailed is equivalent to 'systemctl reset-failed'.
ResetFailed() error
} }
// systemd implements the systemdInterface using dbus and systemctl. // systemd implements the systemdInterface using dbus and systemctl.
@ -101,3 +103,8 @@ func (s *systemd) Version() (systemdVersion, error) {
} }
return systemdVersion(result), nil return systemdVersion(result), nil
} }
// ResetFailed runs 'systemctl reset-failed' and returns the error, if any,
// reported by running that command.
func (s *systemd) ResetFailed() error {
	cmd := exec.Command("systemctl", "reset-failed")
	return cmd.Run()
}

View File

@ -69,7 +69,7 @@ func TestRunOnce(t *testing.T) {
statusManager: status.NewManager(nil, podManager), statusManager: status.NewManager(nil, podManager),
containerRefManager: kubecontainer.NewRefManager(), containerRefManager: kubecontainer.NewRefManager(),
podManager: podManager, podManager: podManager,
os: containertest.FakeOS{}, os: &containertest.FakeOS{},
volumeManager: newVolumeManager(), volumeManager: newVolumeManager(),
diskSpaceManager: diskSpaceManager, diskSpaceManager: diskSpaceManager,
containerRuntime: fakeRuntime, containerRuntime: fakeRuntime,

View File

@ -65,14 +65,14 @@ func NewHollowKubelet(
cadvisorInterface, cadvisorInterface,
manifestFilePath, manifestFilePath,
nil, /* cloud-provider */ nil, /* cloud-provider */
containertest.FakeOS{}, /* os-interface */ &containertest.FakeOS{}, /* os-interface */
20*time.Second, /* FileCheckFrequency */ 20*time.Second, /* FileCheckFrequency */
20*time.Second, /* HTTPCheckFrequency */ 20*time.Second, /* HTTPCheckFrequency */
1*time.Minute, /* MinimumGCAge */ 1*time.Minute, /* MinimumGCAge */
10*time.Second, /* NodeStatusUpdateFrequency */ 10*time.Second, /* NodeStatusUpdateFrequency */
10*time.Second, /* SyncFrequency */ 10*time.Second, /* SyncFrequency */
5*time.Minute, /* OutOfDiskTransitionFrequency */ 5*time.Minute, /* OutOfDiskTransitionFrequency */
5*time.Minute, /* EvictionPressureTransitionPeriod */ 5*time.Minute, /* EvictionPressureTransitionPeriod */
maxPods, maxPods,
containerManager, containerManager,
nil, nil,