From 06b1955c4aeade07c5bea359ab10c69136bea355 Mon Sep 17 00:00:00 2001 From: Yifan Gu Date: Wed, 20 Apr 2016 17:49:08 -0700 Subject: [PATCH 1/2] rkt: Refactor GarbageCollect() to enforce GCPolicy.MaxContainers. --- pkg/kubelet/kubelet.go | 1 + pkg/kubelet/rkt/rkt.go | 186 ++++++++++++++++++++++++++++++++++------- 2 files changed, 158 insertions(+), 29 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 595e0650a8a..5000e1ce29c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -430,6 +430,7 @@ func NewMainKubelet( klet, recorder, containerRefManager, + klet.podManager, klet.livenessManager, klet.volumeManager, klet.httpClient, diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 940b7d0c18a..8f8624e952c 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -26,6 +26,7 @@ import ( "os/exec" "path" "path/filepath" + "sort" "strconv" "strings" "sync" @@ -53,7 +54,6 @@ import ( "k8s.io/kubernetes/pkg/util/errors" utilexec "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/flowcontrol" - "k8s.io/kubernetes/pkg/util/sets" utilstrings "k8s.io/kubernetes/pkg/util/strings" utilwait "k8s.io/kubernetes/pkg/util/wait" ) @@ -72,7 +72,7 @@ const ( rktDataDir = "/var/lib/rkt" rktLocalConfigDir = "/etc/rkt" - kubernetesUnitPrefix = "k8s" + kubernetesUnitPrefix = "k8s_" unitKubernetesSection = "X-Kubernetes" unitPodName = "POD" unitRktID = "RktID" @@ -122,6 +122,7 @@ type Runtime struct { dockerKeyring credentialprovider.DockerKeyring containerRefManager *kubecontainer.RefManager + podGetter podGetter runtimeHelper kubecontainer.RuntimeHelper recorder record.EventRecorder livenessManager proberesults.Manager @@ -144,6 +145,18 @@ type volumeGetter interface { GetVolumes(podUID types.UID) (kubecontainer.VolumeMap, bool) } +// TODO(yifan): This duplicates the podGetter in dockertools. 
+type podGetter interface { + GetPodByUID(types.UID) (*api.Pod, bool) +} + +// cliInterface wraps the command line calls for testing purposes. +type cliInterface interface { + // args are the arguments given to the 'rkt' command, + // e.g. args can be 'rm ${UUID}'. + RunCommand(args ...string) (result []string, err error) +} + // New creates the rkt container runtime which implements the container runtime interface. // It will test if the rkt binary is in the $PATH, and whether we can get the // version of it. If so, creates the rkt container runtime, otherwise returns an error. @@ -153,6 +166,7 @@ func New( runtimeHelper kubecontainer.RuntimeHelper, recorder record.EventRecorder, containerRefManager *kubecontainer.RefManager, + podGetter podGetter, livenessManager proberesults.Manager, volumeGetter volumeGetter, httpClient kubetypes.HttpGetter, @@ -195,12 +209,12 @@ func New( config: config, dockerKeyring: credentialprovider.NewDockerKeyring(), containerRefManager: containerRefManager, + podGetter: podGetter, runtimeHelper: runtimeHelper, recorder: recorder, livenessManager: livenessManager, volumeGetter: volumeGetter, execer: execer, - os: os, touchPath: touchPath, } @@ -257,7 +271,11 @@ func (r *Runtime) runCommand(args ...string) ([]string, error) { func makePodServiceFileName(uuid string) string { // TODO(yifan): Add name for readability? We need to consider the // limit of the length. - return fmt.Sprintf("%s_%s.service", kubernetesUnitPrefix, uuid) + return fmt.Sprintf("%s%s.service", kubernetesUnitPrefix, uuid) +} + +func getRktUUIDFromServiceFileName(filename string) string { + return strings.TrimPrefix(strings.TrimSuffix(filename, path.Ext(filename)), kubernetesUnitPrefix) } // setIsolators sets the apps' isolators according to the security context and resource spec. 
@@ -798,6 +816,19 @@ func kubernetesPodFilters(uid types.UID) []*rktapi.PodFilter { } } +func kubernetesPodsFilters() []*rktapi.PodFilter { + return []*rktapi.PodFilter{ + { + Annotations: []*rktapi.KeyValue{ + { + Key: k8sRktKubeletAnno, + Value: k8sRktKubeletAnnoValue, + }, + }, + }, + } +} + func newUnitOption(section, name, value string) *unit.UnitOption { return &unit.UnitOption{Section: section, Name: name, Value: value} } @@ -1426,45 +1457,142 @@ func (r *Runtime) SyncPod(pod *api.Pod, podStatus api.PodStatus, internalPodStat return } +// Sort rkt pods by creation time. +type podsByCreatedAt []*rktapi.Pod + +func (s podsByCreatedAt) Len() int { return len(s) } +func (s podsByCreatedAt) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s podsByCreatedAt) Less(i, j int) bool { return s[i].CreatedAt < s[j].CreatedAt } + +// getPodUID returns the pod's API UID, it returns +// empty UID if the UID cannot be determined. +func getPodUID(pod *rktapi.Pod) types.UID { + for _, anno := range pod.Annotations { + if anno.Key == k8sRktUIDAnno { + return types.UID(anno.Value) + } + } + return types.UID("") +} + +// podIsActive returns true if the pod is embryo, preparing or running. +// If a pod is prepared, it is not guaranteed to be active (e.g. the systemd +// service might fail). +func podIsActive(pod *rktapi.Pod) bool { + return pod.State == rktapi.PodState_POD_STATE_EMBRYO || + pod.State == rktapi.PodState_POD_STATE_PREPARING || + pod.State == rktapi.PodState_POD_STATE_RUNNING +} + // GarbageCollect collects the pods/containers. -// TODO(yifan): Enforce the gc policy, also, it would be better if we can -// just GC kubernetes pods. +// After one GC iteration: +// - The deleted pods will be removed. +// - If the number of containers exceeds gcPolicy.MaxContainers, +// then containers whose ages are older than gcPolicy.minAge will +// be removed. 
func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error { + var errlist []error + var totalInactiveContainers int + var inactivePods []*rktapi.Pod + var removeCandidates []*rktapi.Pod + var allPods = map[string]*rktapi.Pod{} + + glog.V(4).Infof("rkt: Garbage collecting triggered with policy %v", gcPolicy) + if err := exec.Command("systemctl", "reset-failed").Run(); err != nil { glog.Errorf("rkt: Failed to reset failed systemd services: %v, continue to gc anyway...", err) } - if _, err := r.runCommand("gc", "--grace-period="+gcPolicy.MinAge.String(), "--expire-prepared="+gcPolicy.MinAge.String()); err != nil { - glog.Errorf("rkt: Failed to gc: %v", err) - } - - // GC all inactive systemd service files. - units, err := r.systemd.ListUnits() - if err != nil { - glog.Errorf("rkt: Failed to list units: %v", err) - return err - } - runningKubernetesUnits := sets.NewString() - for _, u := range units { - if strings.HasPrefix(u.Name, kubernetesUnitPrefix) && u.SubState == "running" { - runningKubernetesUnits.Insert(u.Name) - } - } - - files, err := ioutil.ReadDir(systemdServiceDir) + // GC all inactive systemd service files and pods. + files, err := r.os.ReadDir(systemdServiceDir) if err != nil { glog.Errorf("rkt: Failed to read the systemd service directory: %v", err) return err } + + resp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{Filters: kubernetesPodsFilters()}) + if err != nil { + glog.Errorf("rkt: Failed to list pods: %v", err) + return err + } + + // Mark inactive pods. 
+ for _, pod := range resp.Pods { + allPods[pod.Id] = pod + if !podIsActive(pod) { + uid := getPodUID(pod) + if uid == types.UID("") { + glog.Errorf("rkt: Cannot get the UID of pod %q, pod is broken, will remove it", pod.Id) + removeCandidates = append(removeCandidates, pod) + continue + } + _, found := r.podGetter.GetPodByUID(uid) + if !found { + removeCandidates = append(removeCandidates, pod) + continue + } + + inactivePods = append(inactivePods, pod) + totalInactiveContainers = totalInactiveContainers + len(pod.Apps) + } + } + + // Remove any orphan service files. for _, f := range files { - if strings.HasPrefix(f.Name(), kubernetesUnitPrefix) && !runningKubernetesUnits.Has(f.Name()) && f.ModTime().Before(time.Now().Add(-gcPolicy.MinAge)) { - glog.V(4).Infof("rkt: Removing inactive systemd service file: %v", f.Name()) - if err := os.Remove(serviceFilePath(f.Name())); err != nil { - glog.Warningf("rkt: Failed to remove inactive systemd service file %v: %v", f.Name(), err) + serviceName := f.Name() + if strings.HasPrefix(serviceName, kubernetesUnitPrefix) { + rktUUID := getRktUUIDFromServiceFileName(serviceName) + if _, ok := allPods[rktUUID]; !ok { + glog.V(4).Infof("rkt: No rkt pod found for service file %q, will remove it", serviceName) + if err := r.os.Remove(serviceFilePath(serviceName)); err != nil { + errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q: %v", serviceName, err)) + } } } } - return nil + + sort.Sort(podsByCreatedAt(inactivePods)) + + // Enforce GCPolicy.MaxContainers. + for _, pod := range inactivePods { + if totalInactiveContainers <= gcPolicy.MaxContainers { + break + } + creationTime := time.Unix(0, pod.CreatedAt) + if creationTime.Add(gcPolicy.MinAge).Before(time.Now()) { + // The pod is old and we are exceeding the MaxContainers limit. + // Delete the pod. 
+ removeCandidates = append(removeCandidates, pod) + totalInactiveContainers = totalInactiveContainers - len(pod.Apps) + } + } + + // Remove pods and their service files. + for _, pod := range removeCandidates { + if err := r.removePod(pod.Id); err != nil { + errlist = append(errlist, fmt.Errorf("rkt: Failed to clean up rkt pod %q: %v", pod.Id, err)) + } + } + + return errors.NewAggregate(errlist) +} + +// removePod calls 'rkt rm $UUID' to delete a rkt pod, it also removes the systemd service file +// related to the pod. +func (r *Runtime) removePod(uuid string) error { + var errlist []error + glog.V(4).Infof("rkt: GC is removing pod %q", uuid) + if _, err := r.cli.RunCommand("rm", uuid); err != nil { + errlist = append(errlist, fmt.Errorf("rkt: Failed to remove pod %q: %v", uuid, err)) + } + + // GC systemd service files as well. + serviceName := makePodServiceFileName(uuid) + if err := r.os.Remove(serviceFilePath(serviceName)); err != nil { + errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q for pod %q: %v", serviceName, uuid, err)) + } + + return errors.NewAggregate(errlist) } // Note: In rkt, the container ID is in the form of "UUID:appName", where From 9d5bcf4251f7c18735b841cb455ae91188fc157c Mon Sep 17 00:00:00 2001 From: Yifan Gu Date: Wed, 20 Apr 2016 18:21:41 -0700 Subject: [PATCH 2/2] rkt: Add tests for GarbageCollect(). 
--- cmd/integration/integration.go | 4 +- pkg/kubelet/container/os.go | 39 ++++ pkg/kubelet/container/testing/os.go | 49 ++++- pkg/kubelet/dockertools/docker_test.go | 2 +- pkg/kubelet/dockertools/manager_test.go | 2 +- pkg/kubelet/kubelet_test.go | 2 +- pkg/kubelet/network/cni/cni_test.go | 2 +- pkg/kubelet/rkt/fake_rkt_interface_test.go | 55 ++++- pkg/kubelet/rkt/image.go | 4 +- pkg/kubelet/rkt/rkt.go | 28 +-- pkg/kubelet/rkt/rkt_test.go | 224 ++++++++++++++++++++- pkg/kubelet/rkt/systemd.go | 7 + pkg/kubelet/runonce_test.go | 2 +- pkg/kubemark/hollow_kubelet.go | 16 +- 14 files changed, 397 insertions(+), 39 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index a26264a8886..af2239c96c3 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -230,7 +230,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string cadvisorInterface, configFilePath, nil, - containertest.FakeOS{}, + &containertest.FakeOS{}, 1*time.Second, /* FileCheckFrequency */ 1*time.Second, /* HTTPCheckFrequency */ 10*time.Second, /* MinimumGCAge */ @@ -263,7 +263,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string cadvisorInterface, "", nil, - containertest.FakeOS{}, + &containertest.FakeOS{}, 1*time.Second, /* FileCheckFrequency */ 1*time.Second, /* HTTPCheckFrequency */ 10*time.Second, /* MinimumGCAge */ diff --git a/pkg/kubelet/container/os.go b/pkg/kubelet/container/os.go index 7fedd5703f2..b9f7e0e6632 100644 --- a/pkg/kubelet/container/os.go +++ b/pkg/kubelet/container/os.go @@ -17,7 +17,9 @@ limitations under the License. 
package container import ( + "io/ioutil" "os" + "time" ) // OSInterface collects system level operations that need to be mocked out @@ -26,6 +28,12 @@ type OSInterface interface { Mkdir(path string, perm os.FileMode) error Symlink(oldname string, newname string) error Stat(path string) (os.FileInfo, error) + Remove(path string) error + Create(path string) (*os.File, error) + Hostname() (name string, err error) + Chtimes(path string, atime time.Time, mtime time.Time) error + Pipe() (r *os.File, w *os.File, err error) + ReadDir(dirname string) ([]os.FileInfo, error) } // RealOS is used to dispatch the real system level operaitons. @@ -45,3 +53,34 @@ func (RealOS) Symlink(oldname string, newname string) error { func (RealOS) Stat(path string) (os.FileInfo, error) { return os.Stat(path) } + +// Remove will call os.Remove to remove the path. +func (RealOS) Remove(path string) error { + return os.Remove(path) +} + +// Create will call os.Create to create and return a file +// at path. +func (RealOS) Create(path string) (*os.File, error) { + return os.Create(path) +} + +// Hostname will call os.Hostname to return the hostname. +func (RealOS) Hostname() (name string, err error) { + return os.Hostname() +} + +// Chtimes will call os.Chtimes to change the atime and mtime of the path +func (RealOS) Chtimes(path string, atime time.Time, mtime time.Time) error { + return os.Chtimes(path, atime, mtime) +} + +// Pipe will call os.Pipe to return a connected pair of pipe. +func (RealOS) Pipe() (r *os.File, w *os.File, err error) { + return os.Pipe() +} + +// ReadDir will call ioutil.ReadDir to return the files under the directory. 
+func (RealOS) ReadDir(dirname string) ([]os.FileInfo, error) { + return ioutil.ReadDir(dirname) +} diff --git a/pkg/kubelet/container/testing/os.go b/pkg/kubelet/container/testing/os.go index 97dd73305c9..0a366a9f790 100644 --- a/pkg/kubelet/container/testing/os.go +++ b/pkg/kubelet/container/testing/os.go @@ -19,14 +19,25 @@ package testing import ( "errors" "os" + "time" ) // FakeOS mocks out certain OS calls to avoid perturbing the filesystem -// on the test machine. // If a member of the form `*Fn` is set, that function will be called in place // of the real call. type FakeOS struct { - StatFn func(string) (os.FileInfo, error) + StatFn func(string) (os.FileInfo, error) + ReadDirFn func(string) ([]os.FileInfo, error) + HostName string + Removes []string + Files map[string][]*os.FileInfo +} + +func NewFakeOS() *FakeOS { + return &FakeOS{ + Removes: []string{}, + Files: make(map[string][]*os.FileInfo), + } } // Mkdir is a fake call that just returns nil. @@ -46,3 +57,37 @@ func (f FakeOS) Stat(path string) (os.FileInfo, error) { } return nil, errors.New("unimplemented testing mock") } + +// Remove is a fake call that records the path in Removes and returns nil. +func (f *FakeOS) Remove(path string) error { + f.Removes = append(f.Removes, path) + return nil +} + +// Create is a fake call that returns nil. +func (FakeOS) Create(path string) (*os.File, error) { + return nil, nil +} + +// Hostname is a fake call that returns the configured HostName. +func (f *FakeOS) Hostname() (name string, err error) { + return f.HostName, nil +} + +// Chtimes is a fake call that returns nil. +func (FakeOS) Chtimes(path string, atime time.Time, mtime time.Time) error { + return nil +} + +// Pipe is a fake call that returns nil. +func (FakeOS) Pipe() (r *os.File, w *os.File, err error) { + return nil, nil, nil +} + +// ReadDir is a fake call that returns the files under the directory. 
+func (f *FakeOS) ReadDir(dirname string) ([]os.FileInfo, error) { + if f.ReadDirFn != nil { + return f.ReadDirFn(dirname) + } + return nil, errors.New("unimplemented testing mock") +} diff --git a/pkg/kubelet/dockertools/docker_test.go b/pkg/kubelet/dockertools/docker_test.go index bbdde7c7b63..25825b52291 100644 --- a/pkg/kubelet/dockertools/docker_test.go +++ b/pkg/kubelet/dockertools/docker_test.go @@ -653,7 +653,7 @@ func TestFindContainersByPod(t *testing.T) { fakeClient := NewFakeDockerClient() np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone) // image back-off is set to nil, this test should not pull images - containerManager := NewFakeDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, &cadvisorapi.MachineInfo{}, options.GetDefaultPodInfraContainerImage(), 0, 0, "", containertest.FakeOS{}, np, nil, nil, nil) + containerManager := NewFakeDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, &cadvisorapi.MachineInfo{}, options.GetDefaultPodInfraContainerImage(), 0, 0, "", &containertest.FakeOS{}, np, nil, nil, nil) for i, test := range tests { fakeClient.RunningContainerList = test.runningContainerList fakeClient.ExitedContainerList = test.exitedContainerList diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go index c64bf9bb029..8f60c3d898f 100644 --- a/pkg/kubelet/dockertools/manager_test.go +++ b/pkg/kubelet/dockertools/manager_test.go @@ -115,7 +115,7 @@ func createTestDockerManager(fakeHTTPClient *fakeHTTP, fakeDocker *FakeDockerCli &cadvisorapi.MachineInfo{}, options.GetDefaultPodInfraContainerImage(), 0, 0, "", - containertest.FakeOS{}, + &containertest.FakeOS{}, networkPlugin, &fakeRuntimeHelper{}, fakeHTTPClient, diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 2c3cc7b15ce..6ad215cd298 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -123,7 +123,7 @@ func 
newTestKubelet(t *testing.T) *TestKubelet { fakeKubeClient := &fake.Clientset{} kubelet := &Kubelet{} kubelet.kubeClient = fakeKubeClient - kubelet.os = containertest.FakeOS{} + kubelet.os = &containertest.FakeOS{} kubelet.hostname = testKubeletHostname kubelet.nodeName = testKubeletHostname diff --git a/pkg/kubelet/network/cni/cni_test.go b/pkg/kubelet/network/cni/cni_test.go index edc2f029d38..32beb984898 100644 --- a/pkg/kubelet/network/cni/cni_test.go +++ b/pkg/kubelet/network/cni/cni_test.go @@ -154,7 +154,7 @@ func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDocker &cadvisorapi.MachineInfo{}, options.GetDefaultPodInfraContainerImage(), 0, 0, "", - containertest.FakeOS{}, + &containertest.FakeOS{}, networkPlugin, nil, nil, diff --git a/pkg/kubelet/rkt/fake_rkt_interface_test.go b/pkg/kubelet/rkt/fake_rkt_interface_test.go index 71906ea16e7..740701a331b 100644 --- a/pkg/kubelet/rkt/fake_rkt_interface_test.go +++ b/pkg/kubelet/rkt/fake_rkt_interface_test.go @@ -19,16 +19,16 @@ package rkt import ( "fmt" "strconv" + "strings" "sync" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/types" - "github.com/coreos/go-systemd/dbus" rktapi "github.com/coreos/rkt/api/v1alpha" "golang.org/x/net/context" "google.golang.org/grpc" + "k8s.io/kubernetes/pkg/api" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/types" ) // fakeRktInterface mocks the rktapi.PublicAPIClient interface for testing purpose. @@ -147,6 +147,14 @@ func (f *fakeSystemd) Reload() error { return fmt.Errorf("Not implemented") } +func (f *fakeSystemd) ResetFailed() error { + f.Lock() + defer f.Unlock() + + f.called = append(f.called, "ResetFailed") + return f.err +} + // fakeRuntimeHelper implementes kubecontainer.RuntimeHelper interfaces for testing purpose. 
type fakeRuntimeHelper struct { dnsServers []string @@ -171,3 +179,44 @@ func (f *fakeRuntimeHelper) GeneratePodHostNameAndDomain(pod *api.Pod) (string, func (f *fakeRuntimeHelper) GetPodDir(podUID types.UID) string { return "/poddir/" + string(podUID) } + +type fakeRktCli struct { + sync.Mutex + cmds []string + result []string + err error +} + +func newFakeRktCli() *fakeRktCli { + return &fakeRktCli{ + cmds: []string{}, + result: []string{}, + } +} + +func (f *fakeRktCli) RunCommand(args ...string) (result []string, err error) { + f.Lock() + defer f.Unlock() + cmd := append([]string{"rkt"}, args...) + f.cmds = append(f.cmds, strings.Join(cmd, " ")) + return f.result, f.err +} + +func (f *fakeRktCli) Reset() { + f.cmds = []string{} + f.result = []string{} + f.err = nil +} + +type fakePodGetter struct { + pods map[types.UID]*api.Pod +} + +func newFakePodGetter() *fakePodGetter { + return &fakePodGetter{pods: make(map[types.UID]*api.Pod)} +} + +func (f fakePodGetter) GetPodByUID(uid types.UID) (*api.Pod, bool) { + p, found := f.pods[uid] + return p, found +} diff --git a/pkg/kubelet/rkt/image.go b/pkg/kubelet/rkt/image.go index aad4eb2f489..3e4166aff74 100644 --- a/pkg/kubelet/rkt/image.go +++ b/pkg/kubelet/rkt/image.go @@ -68,7 +68,7 @@ func (r *Runtime) PullImage(image kubecontainer.ImageSpec, pullSecrets []api.Sec return err } - if _, err := r.runCommand("fetch", dockerPrefix+img); err != nil { + if _, err := r.cli.RunCommand("fetch", dockerPrefix+img); err != nil { glog.Errorf("Failed to fetch: %v", err) return err } @@ -104,7 +104,7 @@ func (r *Runtime) RemoveImage(image kubecontainer.ImageSpec) error { if err != nil { return err } - if _, err := r.runCommand("image", "rm", imageID); err != nil { + if _, err := r.cli.RunCommand("image", "rm", imageID); err != nil { return err } return nil diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 8f8624e952c..eadac9593f5 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -113,6 +113,7 @@ 
const ( // uses systemd, so in order to run this runtime, systemd must be installed // on the machine. type Runtime struct { + cli cliInterface systemd systemdInterface // The grpc client for rkt api-service. apisvcConn *grpc.ClientConn @@ -203,6 +204,7 @@ func New( } rkt := &Runtime{ + os: kubecontainer.RealOS{}, systemd: systemd, apisvcConn: apisvcConn, apisvc: rktapi.NewPublicAPIClient(apisvcConn), @@ -235,6 +237,8 @@ func New( return nil, fmt.Errorf("rkt: error getting version info: %v", err) } + rkt.cli = rkt + return rkt, nil } @@ -253,9 +257,9 @@ func convertToACName(name string) appctypes.ACName { return *appctypes.MustACName(acname) } -// runCommand invokes rkt binary with arguments and returns the result +// RunCommand invokes rkt binary with arguments and returns the result // from stdout in a list of strings. Each string in the list is a line. -func (r *Runtime) runCommand(args ...string) ([]string, error) { +func (r *Runtime) RunCommand(args ...string) ([]string, error) { glog.V(4).Info("rkt: Run command:", args) var stdout, stderr bytes.Buffer @@ -647,7 +651,7 @@ func (r *Runtime) podFinishedAt(podUID types.UID, rktUID string) time.Time { return stat.ModTime() } -func makeContainerLogMount(opts *kubecontainer.RunContainerOptions, container *api.Container) (*kubecontainer.Mount, error) { +func (r *Runtime) makeContainerLogMount(opts *kubecontainer.RunContainerOptions, container *api.Container) (*kubecontainer.Mount, error) { if opts.PodContainerDir == "" || container.TerminationMessagePath == "" { return nil, nil } @@ -659,7 +663,7 @@ func makeContainerLogMount(opts *kubecontainer.RunContainerOptions, container *a // on the disk. 
randomUID := util.NewUUID() containerLogPath := path.Join(opts.PodContainerDir, string(randomUID)) - fs, err := os.Create(containerLogPath) + fs, err := r.os.Create(containerLogPath) if err != nil { return nil, err } @@ -711,7 +715,7 @@ func (r *Runtime) newAppcRuntimeApp(pod *api.Pod, c api.Container, pullSecrets [ } // create the container log file and make a mount pair. - mnt, err := makeContainerLogMount(opts, &c) + mnt, err := r.makeContainerLogMount(opts, &c) if err != nil { return err } @@ -868,7 +872,7 @@ func (r *Runtime) generateRunCommand(pod *api.Pod, uuid string) (string, error) runPrepared = append(runPrepared, "--net=host") // TODO(yifan): Let runtimeHelper.GeneratePodHostNameAndDomain() to handle this. - hostname, err = os.Hostname() + hostname, err = r.os.Hostname() if err != nil { return "", err } @@ -920,7 +924,7 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k } defer func() { manifestFile.Close() - if err := os.Remove(manifestFile.Name()); err != nil { + if err := r.os.Remove(manifestFile.Name()); err != nil { glog.Warningf("rkt: Cannot remove temp manifest file %q: %v", manifestFile.Name(), err) } }() @@ -942,7 +946,7 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k if r.config.Stage1Image != "" { cmds = append(cmds, "--stage1-path", r.config.Stage1Image) } - output, err := r.runCommand(cmds...) + output, err := r.cli.RunCommand(cmds...) 
if err != nil { return "", nil, err } @@ -972,7 +976,7 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k serviceName := makePodServiceFileName(uuid) glog.V(4).Infof("rkt: Creating service file %q for pod %q", serviceName, format.Pod(pod)) - serviceFile, err := os.Create(serviceFilePath(serviceName)) + serviceFile, err := r.os.Create(serviceFilePath(serviceName)) if err != nil { return "", nil, err } @@ -1340,7 +1344,7 @@ func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod, gracePerio // Touch the systemd service file to update the mod time so it will // not be garbage collected too soon. - if err := os.Chtimes(serviceFilePath(serviceName), time.Now(), time.Now()); err != nil { + if err := r.os.Chtimes(serviceFilePath(serviceName), time.Now(), time.Now()); err != nil { glog.Errorf("rkt: Failed to change the modification time of the service file %q: %v", serviceName, err) return err } @@ -1499,7 +1503,7 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error glog.V(4).Infof("rkt: Garbage collecting triggered with policy %v", gcPolicy) - if err := exec.Command("systemctl", "reset-failed").Run(); err != nil { + if err := r.systemd.ResetFailed(); err != nil { glog.Errorf("rkt: Failed to reset failed systemd services: %v, continue to gc anyway...", err) } @@ -1670,7 +1674,7 @@ func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []s // This way, if you run 'kubectl exec -i bash' (no tty) and type 'exit', // the call below to command.Run() can unblock because its Stdin is the read half // of the pipe. 
- r, w, err := os.Pipe() + r, w, err := r.os.Pipe() if err != nil { return err } diff --git a/pkg/kubelet/rkt/rkt_test.go b/pkg/kubelet/rkt/rkt_test.go index 1438dae8d94..944829be224 100644 --- a/pkg/kubelet/rkt/rkt_test.go +++ b/pkg/kubelet/rkt/rkt_test.go @@ -33,8 +33,10 @@ import ( "k8s.io/kubernetes/pkg/api/resource" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertesting "k8s.io/kubernetes/pkg/kubelet/container/testing" + kubetesting "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/rkt/mock_os" + "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/errors" utiltesting "k8s.io/kubernetes/pkg/util/testing" ) @@ -1072,11 +1074,7 @@ func TestSetApp(t *testing.T) { } func TestGenerateRunCommand(t *testing.T) { - hostName, err := os.Hostname() - if err != nil { - t.Fatalf("Cannot get the hostname: %v", err) - } - + hostName := "test-hostname" tests := []struct { pod *api.Pod uuid string @@ -1177,6 +1175,7 @@ func TestGenerateRunCommand(t *testing.T) { } rkt := &Runtime{ + os: &kubetesting.FakeOS{HostName: hostName}, config: &Config{ Path: "/bin/rkt/rkt", Stage1Image: "/bin/rkt/stage1-coreos.aci", @@ -1397,3 +1396,218 @@ func TestImageStats(t *testing.T) { assert.NoError(t, err) assert.Equal(t, result, &kubecontainer.ImageStats{TotalStorageBytes: 600}) } + +func TestGarbageCollect(t *testing.T) { + fr := newFakeRktInterface() + fs := newFakeSystemd() + cli := newFakeRktCli() + fakeOS := kubetesting.NewFakeOS() + getter := newFakePodGetter() + + rkt := &Runtime{ + os: fakeOS, + cli: cli, + apisvc: fr, + podGetter: getter, + systemd: fs, + containerRefManager: kubecontainer.NewRefManager(), + } + + fakeApp := &rktapi.App{Name: "app-foo"} + + tests := []struct { + gcPolicy kubecontainer.ContainerGCPolicy + apiPods []*api.Pod + pods []*rktapi.Pod + serviceFilesOnDisk []string + expectedCommands []string + expectedServiceFiles []string + }{ + // All running pods, should not be 
gc'd. + // Dead, new pods should not be gc'd. + // Dead, old pods should be gc'd. + // Deleted pods should be gc'd. + // Service files without corresponded pods should be removed. + { + kubecontainer.ContainerGCPolicy{ + MinAge: 0, + MaxContainers: 0, + }, + []*api.Pod{ + {ObjectMeta: api.ObjectMeta{UID: "pod-uid-1"}}, + {ObjectMeta: api.ObjectMeta{UID: "pod-uid-2"}}, + {ObjectMeta: api.ObjectMeta{UID: "pod-uid-3"}}, + {ObjectMeta: api.ObjectMeta{UID: "pod-uid-4"}}, + }, + []*rktapi.Pod{ + { + Id: "deleted-foo", + State: rktapi.PodState_POD_STATE_EXITED, + CreatedAt: time.Now().Add(time.Hour).UnixNano(), + StartedAt: time.Now().Add(time.Hour).UnixNano(), + Apps: []*rktapi.App{fakeApp}, + Annotations: []*rktapi.KeyValue{ + { + Key: k8sRktUIDAnno, + Value: "pod-uid-0", + }, + }, + }, + { + Id: "running-foo", + State: rktapi.PodState_POD_STATE_RUNNING, + CreatedAt: 0, + StartedAt: 0, + Apps: []*rktapi.App{fakeApp}, + Annotations: []*rktapi.KeyValue{ + { + Key: k8sRktUIDAnno, + Value: "pod-uid-1", + }, + }, + }, + { + Id: "running-bar", + State: rktapi.PodState_POD_STATE_RUNNING, + CreatedAt: 0, + StartedAt: 0, + Apps: []*rktapi.App{fakeApp}, + Annotations: []*rktapi.KeyValue{ + { + Key: k8sRktUIDAnno, + Value: "pod-uid-2", + }, + }, + }, + { + Id: "dead-old", + State: rktapi.PodState_POD_STATE_EXITED, + CreatedAt: 0, + StartedAt: 0, + Apps: []*rktapi.App{fakeApp}, + Annotations: []*rktapi.KeyValue{ + { + Key: k8sRktUIDAnno, + Value: "pod-uid-3", + }, + }, + }, + { + Id: "dead-new", + State: rktapi.PodState_POD_STATE_EXITED, + CreatedAt: time.Now().Add(time.Hour).UnixNano(), + StartedAt: time.Now().Add(time.Hour).UnixNano(), + Apps: []*rktapi.App{fakeApp}, + Annotations: []*rktapi.KeyValue{ + { + Key: k8sRktUIDAnno, + Value: "pod-uid-4", + }, + }, + }, + }, + []string{"k8s_dead-old.service", "k8s_deleted-foo.service", "k8s_non-existing-bar.service"}, + []string{"rkt rm dead-old", "rkt rm deleted-foo"}, + []string{"/run/systemd/system/k8s_dead-old.service", 
"/run/systemd/system/k8s_deleted-foo.service", "/run/systemd/system/k8s_non-existing-bar.service"}, + }, + // gcPolicy.MaxContainers should be enforced. + // Oldest ones are removed first. + { + kubecontainer.ContainerGCPolicy{ + MinAge: 0, + MaxContainers: 1, + }, + []*api.Pod{ + {ObjectMeta: api.ObjectMeta{UID: "pod-uid-0"}}, + {ObjectMeta: api.ObjectMeta{UID: "pod-uid-1"}}, + {ObjectMeta: api.ObjectMeta{UID: "pod-uid-2"}}, + }, + []*rktapi.Pod{ + { + Id: "dead-2", + State: rktapi.PodState_POD_STATE_EXITED, + CreatedAt: 2, + StartedAt: 2, + Apps: []*rktapi.App{fakeApp}, + Annotations: []*rktapi.KeyValue{ + { + Key: k8sRktUIDAnno, + Value: "pod-uid-2", + }, + }, + }, + { + Id: "dead-1", + State: rktapi.PodState_POD_STATE_EXITED, + CreatedAt: 1, + StartedAt: 1, + Apps: []*rktapi.App{fakeApp}, + Annotations: []*rktapi.KeyValue{ + { + Key: k8sRktUIDAnno, + Value: "pod-uid-1", + }, + }, + }, + { + Id: "dead-0", + State: rktapi.PodState_POD_STATE_EXITED, + CreatedAt: 0, + StartedAt: 0, + Apps: []*rktapi.App{fakeApp}, + Annotations: []*rktapi.KeyValue{ + { + Key: k8sRktUIDAnno, + Value: "pod-uid-0", + }, + }, + }, + }, + []string{"k8s_dead-0.service", "k8s_dead-1.service", "k8s_dead-2.service"}, + []string{"rkt rm dead-0", "rkt rm dead-1"}, + []string{"/run/systemd/system/k8s_dead-0.service", "/run/systemd/system/k8s_dead-1.service"}, + }, + } + + for i, tt := range tests { + testCaseHint := fmt.Sprintf("test case #%d", i) + + ctrl := gomock.NewController(t) + + fakeOS.ReadDirFn = func(dirname string) ([]os.FileInfo, error) { + serviceFileNames := tt.serviceFilesOnDisk + var fileInfos []os.FileInfo + + for _, name := range serviceFileNames { + mockFI := mock_os.NewMockFileInfo(ctrl) + mockFI.EXPECT().Name().Return(name) + fileInfos = append(fileInfos, mockFI) + } + return fileInfos, nil + } + + fr.pods = tt.pods + for _, p := range tt.apiPods { + getter.pods[p.UID] = p + } + + err := rkt.GarbageCollect(tt.gcPolicy) + assert.NoError(t, err, testCaseHint) + + 
sort.Sort(sortedStringList(tt.expectedCommands)) + sort.Sort(sortedStringList(cli.cmds)) + + assert.Equal(t, tt.expectedCommands, cli.cmds, testCaseHint) + + sort.Sort(sortedStringList(tt.expectedServiceFiles)) + sort.Sort(sortedStringList(fakeOS.Removes)) + + assert.Equal(t, tt.expectedServiceFiles, fakeOS.Removes, testCaseHint) + + // Cleanup after each test. + cli.Reset() + ctrl.Finish() + fakeOS.Removes = []string{} + getter.pods = make(map[types.UID]*api.Pod) + } +} diff --git a/pkg/kubelet/rkt/systemd.go b/pkg/kubelet/rkt/systemd.go index 14b2dc9db0d..7151cfdce60 100644 --- a/pkg/kubelet/rkt/systemd.go +++ b/pkg/kubelet/rkt/systemd.go @@ -62,6 +62,8 @@ type systemdInterface interface { RestartUnit(name string, mode string, ch chan<- string) (int, error) // Reload is equivalent to 'systemctl daemon-reload'. Reload() error + // ResetFailed is equivalent to 'systemctl reset-failed'. + ResetFailed() error } // systemd implements the systemdInterface using dbus and systemctl. @@ -101,3 +103,8 @@ func (s *systemd) Version() (systemdVersion, error) { } return systemdVersion(result), nil } + +// ResetFailed calls 'systemctl reset-failed'. +func (s *systemd) ResetFailed() error { + return exec.Command("systemctl", "reset-failed").Run() +} diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 85bbe7f1ba2..b4b11e957e7 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -69,7 +69,7 @@ func TestRunOnce(t *testing.T) { statusManager: status.NewManager(nil, podManager), containerRefManager: kubecontainer.NewRefManager(), podManager: podManager, - os: containertest.FakeOS{}, + os: &containertest.FakeOS{}, volumeManager: newVolumeManager(), diskSpaceManager: diskSpaceManager, containerRuntime: fakeRuntime, diff --git a/pkg/kubemark/hollow_kubelet.go b/pkg/kubemark/hollow_kubelet.go index 18115d92152..0a43583214f 100644 --- a/pkg/kubemark/hollow_kubelet.go +++ b/pkg/kubemark/hollow_kubelet.go @@ -65,14 +65,14 @@ func 
NewHollowKubelet( cadvisorInterface, manifestFilePath, nil, /* cloud-provider */ - containertest.FakeOS{}, /* os-interface */ - 20*time.Second, /* FileCheckFrequency */ - 20*time.Second, /* HTTPCheckFrequency */ - 1*time.Minute, /* MinimumGCAge */ - 10*time.Second, /* NodeStatusUpdateFrequency */ - 10*time.Second, /* SyncFrequency */ - 5*time.Minute, /* OutOfDiskTransitionFrequency */ - 5*time.Minute, /* EvictionPressureTransitionPeriod */ + &containertest.FakeOS{}, /* os-interface */ + 20*time.Second, /* FileCheckFrequency */ + 20*time.Second, /* HTTPCheckFrequency */ + 1*time.Minute, /* MinimumGCAge */ + 10*time.Second, /* NodeStatusUpdateFrequency */ + 10*time.Second, /* SyncFrequency */ + 5*time.Minute, /* OutOfDiskTransitionFrequency */ + 5*time.Minute, /* EvictionPressureTransitionPeriod */ maxPods, containerManager, nil,