diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e6db9884d49..da8a450732d 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -349,6 +349,8 @@ func NewMainKubelet( return nil, err } klet.containerRuntime = rktRuntime + klet.containerGC = rktRuntime + klet.imageManager = rkt.NewImageManager(rktRuntime) // No Docker daemon to put in a container. dockerDaemonContainer = "" diff --git a/pkg/kubelet/rkt/gc.go b/pkg/kubelet/rkt/gc.go index ff3bfda959a..555774de474 100644 --- a/pkg/kubelet/rkt/gc.go +++ b/pkg/kubelet/rkt/gc.go @@ -16,16 +16,28 @@ limitations under the License. package rkt +import "github.com/golang/glog" + // ImageManager manages and garbage collects the container images for rkt. type ImageManager struct { - runtime *runtime + runtime *Runtime } -func NewImageManager(r *runtime) *ImageManager { +func NewImageManager(r *Runtime) *ImageManager { return &ImageManager{runtime: r} } -// GarbageCollect collects the images. It is not implemented by rkt yet. +// GarbageCollect collects the images. +// TODO(yifan): Enforce ImageGCPolicy. func (im *ImageManager) GarbageCollect() error { + if _, err := im.runtime.runCommand("image", "gc"); err != nil { + glog.Errorf("rkt: Failed to gc image: %v", err) + return err + } + return nil +} + +// Start is a no-op for rkt as we don't need to mark unused images in kubelet. +func (im *ImageManager) Start() error { return nil } diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 1f9c9a13b13..80614684e75 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -79,10 +79,10 @@ const ( defaultImageTag = "latest" ) -// runtime implements the Containerruntime for rkt. The implementation +// Runtime implements the Containerruntime for rkt. The implementation // uses systemd, so in order to run this runtime, systemd must be installed // on the machine. -type runtime struct { +type Runtime struct { systemd *dbus.Conn // The absolute path to rkt binary. rktBinAbsPath string @@ -98,7 +98,7 @@ type runtime struct { imagePuller kubecontainer.ImagePuller } -var _ kubecontainer.Runtime = &runtime{} +var _ kubecontainer.Runtime = &Runtime{} // TODO(yifan): Remove this when volumeManager is moved to separate package. type volumeGetter interface { @@ -113,7 +113,7 @@ func New(config *Config, recorder record.EventRecorder, containerRefManager *kubecontainer.RefManager, prober prober.Prober, - volumeGetter volumeGetter) (kubecontainer.Runtime, error) { + volumeGetter volumeGetter) (*Runtime, error) { systemdVersion, err := getSystemdVersion() if err != nil { @@ -142,7 +142,7 @@ func New(config *Config, } } - rkt := &runtime{ + rkt := &Runtime{ systemd: systemd, rktBinAbsPath: rktBinAbsPath, config: config, @@ -170,7 +170,7 @@ func New(config *Config, return rkt, nil } -func (r *runtime) buildCommand(args ...string) *exec.Cmd { +func (r *Runtime) buildCommand(args ...string) *exec.Cmd { cmd := exec.Command(r.rktBinAbsPath) cmd.Args = append(cmd.Args, r.config.buildGlobalOptions()...) cmd.Args = append(cmd.Args, args...) @@ -179,7 +179,7 @@ func (r *runtime) buildCommand(args ...string) *exec.Cmd { // runCommand invokes rkt binary with arguments and returns the result // from stdout in a list of strings. Each string in the list is a line. -func (r *runtime) runCommand(args ...string) ([]string, error) { +func (r *Runtime) runCommand(args ...string) ([]string, error) { glog.V(4).Info("rkt: Run command:", args) var stdout, stderr bytes.Buffer @@ -394,7 +394,7 @@ func parseImageName(image string) (string, string) { // getImageManifest invokes 'rkt image cat-manifest' to retrive the image manifest // for the image. -func (r *runtime) getImageManifest(image string) (*appcschema.ImageManifest, error) { +func (r *Runtime) getImageManifest(image string) (*appcschema.ImageManifest, error) { var manifest appcschema.ImageManifest repoToPull, tag := parseImageName(image) @@ -413,7 +413,7 @@ func (r *runtime) getImageManifest(image string) (*appcschema.ImageManifest, err } // makePodManifest transforms a kubelet pod spec to the rkt pod manifest. -func (r *runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appcschema.PodManifest, error) { +func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appcschema.PodManifest, error) { var globalPortMappings []kubecontainer.PortMapping manifest := appcschema.BlankPodManifest() @@ -535,7 +535,7 @@ func serviceFilePath(serviceName string) string { // // On success, it will return a string that represents name of the unit file // and the runtime pod. -func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *kubecontainer.Pod, error) { +func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *kubecontainer.Pod, error) { // Generate the pod manifest from the pod spec. manifest, err := r.makePodManifest(pod, pullSecrets) if err != nil { @@ -643,7 +643,7 @@ func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k // generateEvents is a helper function that generates some container // life cycle events for containers in a pod. -func (r *runtime) generateEvents(runtimePod *kubecontainer.Pod, reason string, failure error) { +func (r *Runtime) generateEvents(runtimePod *kubecontainer.Pod, reason string, failure error) { // Set up container references. for _, c := range runtimePod.Containers { containerID := string(c.ID) @@ -679,7 +679,7 @@ func (r *runtime) generateEvents(runtimePod *kubecontainer.Pod, reason string, f // RunPod first creates the unit file for a pod, and then // starts the unit over d-bus. -func (r *runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { +func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { glog.V(4).Infof("Rkt starts to run pod: name %q.", kubeletUtil.FormatPodName(pod)) name, runtimePod, prepareErr := r.preparePod(pod, pullSecrets) @@ -722,7 +722,7 @@ func (r *runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { } // readServiceFile reads the service file and constructs the runtime pod and the rkt info. -func (r *runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktInfo, error) { +func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktInfo, error) { f, err := os.Open(serviceFilePath(serviceName)) if err != nil { return nil, nil, err @@ -769,7 +769,7 @@ func (r *runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktI // Then it will use the result to construct a list of container runtime pods. // If all is false, then only running pods will be returned, otherwise all pods will be // returned. -func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) { +func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) { glog.V(4).Infof("Rkt getting pods") units, err := r.systemd.ListUnits() @@ -796,7 +796,7 @@ func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) { // KillPod invokes 'systemctl kill' to kill the unit that runs the pod. // TODO(yifan): Handle network plugin. -func (r *runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error { +func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error { glog.V(4).Infof("Rkt is killing pod: name %q.", runningPod.Name) serviceName := makePodServiceFileName(runningPod.ID) @@ -817,7 +817,7 @@ func (r *runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error { // getPodStatus reads the service file and invokes 'rkt status $UUID' to get the // pod's status. -func (r *runtime) getPodStatus(serviceName string) (*api.PodStatus, error) { +func (r *Runtime) getPodStatus(serviceName string) (*api.PodStatus, error) { var status api.PodStatus // TODO(yifan): Get rkt uuid from the service file name. @@ -842,7 +842,7 @@ func (r *runtime) getPodStatus(serviceName string) (*api.PodStatus, error) { } // GetPodStatus returns the status of the given pod. -func (r *runtime) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) { +func (r *Runtime) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) { serviceName := makePodServiceFileName(pod.UID) return r.getPodStatus(serviceName) } @@ -854,7 +854,7 @@ func (r *runtime) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) { // Example: // rkt:0.3.2+git --> []int{0, 3, 2}. // -func (r *runtime) Version() (kubecontainer.Version, error) { +func (r *Runtime) Version() (kubecontainer.Version, error) { output, err := r.runCommand("version") if err != nil { return nil, err @@ -878,7 +878,7 @@ func (r *runtime) Version() (kubecontainer.Version, error) { // TODO(yifan): This is very racy, unefficient, and unsafe, we need to provide // different namespaces. See: https://github.com/coreos/rkt/issues/836. -func (r *runtime) writeDockerAuthConfig(image string, credsSlice []docker.AuthConfiguration) error { +func (r *Runtime) writeDockerAuthConfig(image string, credsSlice []docker.AuthConfiguration) error { if len(credsSlice) == 0 { return nil } @@ -922,7 +922,7 @@ func (r *runtime) writeDockerAuthConfig(image string, credsSlice []docker.AuthCo // // http://issue.k8s.io/7203 // -func (r *runtime) PullImage(image kubecontainer.ImageSpec, pullSecrets []api.Secret) error { +func (r *Runtime) PullImage(image kubecontainer.ImageSpec, pullSecrets []api.Secret) error { img := image.Image // TODO(yifan): The credential operation is a copy from dockertools package, // Need to resolve the code duplication. @@ -951,7 +951,7 @@ func (r *runtime) PullImage(image kubecontainer.ImageSpec, pullSecrets []api.Sec } // TODO(yifan): Searching the image via 'rkt images' might not be the most efficient way. -func (r *runtime) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) { +func (r *Runtime) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) { repoToPull, tag := parseImageName(image.Image) // Example output of 'rkt image list --fields=name': // @@ -985,7 +985,7 @@ func (r *runtime) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) { } // SyncPod syncs the running pod to match the specified desired pod. -func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error { +func (r *Runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error { podFullName := kubeletUtil.FormatPodName(pod) // Add references to all containers. @@ -1062,7 +1062,7 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus // See https://github.com/coreos/rkt/blob/master/Documentation/commands.md#logging for more details. // // TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail. -func (r *runtime) GetContainerLogs(pod *api.Pod, containerID string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error { +func (r *Runtime) GetContainerLogs(pod *api.Pod, containerID string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error { id, err := parseContainerID(containerID) if err != nil { return err @@ -1081,8 +1081,10 @@ func (r *runtime) GetContainerLogs(pod *api.Pod, containerID string, logOptions return cmd.Run() } -// GarbageCollect collects the pods/containers. TODO(yifan): Enforce the gc policy. -func (r *runtime) GarbageCollect() error { +// GarbageCollect collects the pods/containers. +// TODO(yifan): Enforce the gc policy, also, it would be better if we can +// just GC kubernetes pods. +func (r *Runtime) GarbageCollect() error { if err := exec.Command("systemctl", "reset-failed").Run(); err != nil { glog.Errorf("rkt: Failed to reset failed systemd services: %v", err) } @@ -1096,7 +1098,7 @@ func (r *runtime) GarbageCollect() error { // Note: In rkt, the container ID is in the form of "UUID:appName", where // appName is the container name. // TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail. -func (r *runtime) RunInContainer(containerID string, cmd []string) ([]byte, error) { +func (r *Runtime) RunInContainer(containerID string, cmd []string) ([]byte, error) { glog.V(4).Infof("Rkt running in container.") id, err := parseContainerID(containerID) @@ -1110,14 +1112,14 @@ func (r *runtime) RunInContainer(containerID string, cmd []string) ([]byte, erro return []byte(strings.Join(result, "\n")), err } -func (r *runtime) AttachContainer(containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (r *Runtime) AttachContainer(containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { return fmt.Errorf("unimplemented") } // Note: In rkt, the container ID is in the form of "UUID:appName", where UUID is // the rkt UUID, and appName is the container name. // TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail. -func (r *runtime) ExecInContainer(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (r *Runtime) ExecInContainer(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { glog.V(4).Infof("Rkt execing in container.") id, err := parseContainerID(containerID) @@ -1169,7 +1171,7 @@ func (r *runtime) ExecInContainer(containerID string, cmd []string, stdin io.Rea } // findRktID returns the rkt uuid for the pod. -func (r *runtime) findRktID(pod *kubecontainer.Pod) (string, error) { +func (r *Runtime) findRktID(pod *kubecontainer.Pod) (string, error) { serviceName := makePodServiceFileName(pod.ID) f, err := os.Open(serviceFilePath(serviceName)) @@ -1205,7 +1207,7 @@ func (r *runtime) findRktID(pod *kubecontainer.Pod) (string, error) { // // TODO(yifan): Merge with the same function in dockertools. // TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail. -func (r *runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { +func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { glog.V(4).Infof("Rkt port forwarding in container.") rktID, err := r.findRktID(pod) @@ -1265,7 +1267,7 @@ func isUUID(input string) bool { // getPodInfo returns the pod info of a single pod according // to the uuid. -func (r *runtime) getPodInfo(uuid string) (*podInfo, error) { +func (r *Runtime) getPodInfo(uuid string) (*podInfo, error) { status, err := r.runCommand("status", uuid) if err != nil { return nil, err @@ -1281,7 +1283,7 @@ func (r *runtime) getPodInfo(uuid string) (*podInfo, error) { // TODO(yifan): Replace with 'rkt image cat-manifest'. // imageName should be in the form of 'example.com/app:latest', which should matches // the result of 'rkt image list'. If the version is empty, then 'latest' is assumed. -func (r *runtime) getImageByName(imageName string) (*kubecontainer.Image, error) { +func (r *Runtime) getImageByName(imageName string) (*kubecontainer.Image, error) { // TODO(yifan): Print hash in 'rkt image cat-manifest'? images, err := r.ListImages() if err != nil { @@ -1309,7 +1311,7 @@ func (r *runtime) getImageByName(imageName string) (*kubecontainer.Image, error) } // ListImages lists all the available appc images on the machine by invoking 'rkt image list'. -func (r *runtime) ListImages() ([]kubecontainer.Image, error) { +func (r *Runtime) ListImages() ([]kubecontainer.Image, error) { // Example output of 'rkt image list --fields=key,name': // // KEY NAME @@ -1355,7 +1357,7 @@ func parseImageInfo(input string) (*kubecontainer.Image, error) { // RemoveImage removes an on-disk image using 'rkt image rm'. // TODO(yifan): Use image ID to reference image. -func (r *runtime) RemoveImage(image kubecontainer.ImageSpec) error { +func (r *Runtime) RemoveImage(image kubecontainer.ImageSpec) error { img, err := r.getImageByName(image.Image) if err != nil { return err