Merge pull request #7752 from vmarmol/rkt-compile

Add build labels to rkt
This commit is contained in:
Yu-Ju Hong
2015-05-05 09:26:59 -07:00
9 changed files with 548 additions and 577 deletions

View File

@@ -16,40 +16,12 @@ limitations under the License.
package rkt
import (
"os/exec"
"github.com/golang/glog"
)
const (
// TODO(yifan): Merge with ContainerGCPolicy, i.e., derive
// the grace period from MinAge in ContainerGCPolicy.
//
// Duration to wait before discarding inactive pods from garbage
defaultGracePeriod = "1m"
// Duration to wait before expiring prepared pods.
defaultExpirePrepared = "1m"
)
// GarbageCollect collects the pods/containers. TODO(yifan): Enforce the gc policy.
func (r *Runtime) GarbageCollect() error {
if err := exec.Command("systemctl", "reset-failed").Run(); err != nil {
glog.Errorf("rkt: Failed to reset failed systemd services: %v", err)
}
if _, err := r.runCommand("gc", "--grace-period="+defaultGracePeriod, "--expire-prepared="+defaultExpirePrepared); err != nil {
glog.Errorf("rkt: Failed to gc: %v", err)
return err
}
return nil
}
// ImageManager manages and garbage collects the container images for rkt.
type ImageManager struct {
runtime *Runtime
runtime *runtime
}
func NewImageManager(r *Runtime) *ImageManager {
func NewImageManager(r *runtime) *ImageManager {
return &ImageManager{runtime: r}
}

View File

@@ -1,50 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rkt
import (
"io"
"os/exec"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// GetContainerLogs uses journalctl to get the logs of the container.
// By default, it returns a snapshot of the container log. Set |follow| to true to
// stream the log. Set |follow| to false and specify the number of lines (e.g.
// "100" or "all") to tail the log.
// TODO(yifan): Currently, it fetches all the containers' log within a pod. We will
// be able to fetch individual container's log once https://github.com/coreos/rkt/pull/841
// landed.
func (r *Runtime) GetContainerLogs(pod *api.Pod, containerID string, tail string, follow bool, stdout, stderr io.Writer) error {
unitName := makePodServiceFileName(pod.UID)
cmd := exec.Command("journalctl", "-u", unitName)
if follow {
cmd.Args = append(cmd.Args, "-f")
}
if tail == "all" {
cmd.Args = append(cmd.Args, "-a")
} else {
_, err := strconv.Atoi(tail)
if err == nil {
cmd.Args = append(cmd.Args, "-n", tail)
}
}
cmd.Stdout, cmd.Stderr = stdout, stderr
return cmd.Start()
}

View File

@@ -169,47 +169,3 @@ func splitLineByTab(line string) []string {
}
return result
}
// getPodInfos returns a map of [pod-uuid]:*podInfo
func (r *Runtime) getPodInfos() (map[string]*podInfo, error) {
result := make(map[string]*podInfo)
output, err := r.runCommand("list", "--no-legend", "--full")
if err != nil {
return result, err
}
if len(output) == 0 {
// No pods are running.
return result, nil
}
// Example output of current 'rkt list --full' (version == 0.4.2):
// UUID ACI STATE NETWORKS
// 2372bc17-47cb-43fb-8d78-20b31729feda foo running default:ip4=172.16.28.3
// bar
// 40e2813b-9d5d-4146-a817-0de92646da96 foo exited
// 40e2813b-9d5d-4146-a817-0de92646da96 bar exited
//
// With '--no-legend', the first line is eliminated.
for _, line := range output {
tuples := splitLineByTab(line)
if len(tuples) < 3 { // At least it should have 3 entries.
glog.Warningf("rkt: Unrecognized line: %q", line)
continue
}
id := tuples[0]
status, err := r.runCommand("status", id)
if err != nil {
glog.Errorf("rkt: Cannot get status for pod (uuid=%q): %v", id, err)
continue
}
info, err := parsePodInfo(status)
if err != nil {
glog.Errorf("rkt: Cannot parse status for pod (uuid=%q): %v", id, err)
continue
}
result[id] = info
}
return result, nil
}

View File

@@ -1,119 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rkt
import (
"fmt"
"os"
"path"
"strings"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/docker/docker/pkg/parsers"
docker "github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
)
const (
authDir = "auth.d"
dockerAuthTemplate = `{"rktKind":"dockerAuth","rktVersion":"v1","registries":[%q],"credentials":{"user":%q,"password":%q}}`
)
// writeDockerAuthConfig writes the docker credentials to rkt auth config files.
// This enables rkt to pull docker images from docker registry with credentials.
func (r *Runtime) writeDockerAuthConfig(image string, creds docker.AuthConfiguration) error {
registry := "index.docker.io"
// Image spec: [<registry>/]<repository>/<image>[:<version]
explicitRegistry := (strings.Count(image, "/") == 2)
if explicitRegistry {
registry = strings.Split(image, "/")[0]
}
localConfigDir := rktLocalConfigDir
if r.config.LocalConfigDir != "" {
localConfigDir = r.config.LocalConfigDir
}
authDir := path.Join(localConfigDir, "auth.d")
if _, err := os.Stat(authDir); os.IsNotExist(err) {
if err := os.Mkdir(authDir, 0600); err != nil {
glog.Errorf("rkt: Cannot create auth dir: %v", err)
return err
}
}
f, err := os.Create(path.Join(localConfigDir, authDir, registry+".json"))
if err != nil {
glog.Errorf("rkt: Cannot create docker auth config file: %v", err)
return err
}
defer f.Close()
config := fmt.Sprintf(dockerAuthTemplate, registry, creds.Username, creds.Password)
if _, err := f.Write([]byte(config)); err != nil {
glog.Errorf("rkt: Cannot write docker auth config file: %v", err)
return err
}
return nil
}
// PullImage invokes 'rkt fetch' to download an aci.
func (r *Runtime) PullImage(img string) error {
// Use credentials for docker images. This string operation can be cleaned up
// once the format of image is landed, see:
// https://github.com/GoogleCloudPlatform/kubernetes/issues/7203
//
if strings.HasPrefix(img, dockerPrefix) {
repoToPull, tag := parsers.ParseRepositoryTag(img)
// If no tag was specified, use the default "latest".
if len(tag) == 0 {
tag = "latest"
}
creds, ok := r.dockerKeyring.Lookup(repoToPull)
if !ok {
glog.V(1).Infof("Pulling image %s without credentials", img)
}
// Let's update a json.
// TODO(yifan): Find a way to feed this to rkt.
if err := r.writeDockerAuthConfig(img, creds); err != nil {
return err
}
}
output, err := r.runCommand("fetch", img)
if err != nil {
return fmt.Errorf("rkt: Failed to fetch image: %v:", output)
}
return nil
}
// IsImagePresent returns true if the image is available on the machine.
// TODO(yifan): 'rkt image' is now landed on master, use that once we bump up
// the rkt version.
func (r *Runtime) IsImagePresent(img string) (bool, error) {
if _, err := r.runCommand("prepare", "--local=true", img); err != nil {
return false, nil
}
return true, nil
}
func (r *Runtime) ListImages() ([]kubecontainer.Image, error) {
return []kubecontainer.Image{}, fmt.Errorf("rkt: ListImages unimplemented")
}
func (r *Runtime) RemoveImage(image string) error {
return fmt.Errorf("rkt: RemoveImages unimplemented")
}

View File

@@ -1,3 +1,5 @@
// +build linux
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
@@ -25,6 +27,7 @@ import (
"os"
"os/exec"
"path"
"strconv"
"strings"
"syscall"
"time"
@@ -34,6 +37,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
@@ -42,7 +46,10 @@ import (
"github.com/coreos/go-systemd/dbus"
"github.com/coreos/go-systemd/unit"
"github.com/coreos/rkt/store"
"github.com/docker/docker/pkg/parsers"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
"github.com/kr/pty"
)
const (
@@ -64,12 +71,23 @@ const (
unitRktID = "RktID"
dockerPrefix = "docker://"
authDir = "auth.d"
dockerAuthTemplate = `{"rktKind":"dockerAuth","rktVersion":"v1","registries":[%q],"credentials":{"user":%q,"password":%q}}`
// TODO(yifan): Merge with ContainerGCPolicy, i.e., derive
// the grace period from MinAge in ContainerGCPolicy.
//
// Duration to wait before discarding inactive pods from garbage
defaultGracePeriod = "1m"
// Duration to wait before expiring prepared pods.
defaultExpirePrepared = "1m"
)
// Runtime implements the ContainerRuntime for rkt. The implementation
// runtime implements the Containerruntime for rkt. The implementation
// uses systemd, so in order to run this runtime, systemd must be installed
// on the machine.
type Runtime struct {
type runtime struct {
generator kubecontainer.RunContainerOptionsGenerator
readinessManager *kubecontainer.ReadinessManager
prober prober.Prober
@@ -81,12 +99,12 @@ type Runtime struct {
dockerKeyring credentialprovider.DockerKeyring
}
var _ kubecontainer.Runtime = &Runtime{}
var _ kubecontainer.Runtime = &runtime{}
// New creates the rkt container runtime which implements the container runtime interface.
// It will test if the rkt binary is in the $PATH, and whether we can get the
// version of it. If so, creates the rkt container runtime, otherwise returns an error.
func New(config *Config) (*Runtime, error) {
func New(config *Config) (kubecontainer.Runtime, error) {
systemdVersion, err := getSystemdVersion()
if err != nil {
return nil, err
@@ -110,7 +128,7 @@ func New(config *Config) (*Runtime, error) {
return nil, fmt.Errorf("cannot find rkt binary: %v", err)
}
rkt := &Runtime{
rkt := &runtime{
systemd: systemd,
rktBinAbsPath: rktBinAbsPath,
config: config,
@@ -132,7 +150,7 @@ func New(config *Config) (*Runtime, error) {
return rkt, nil
}
func (r *Runtime) buildCommand(args ...string) *exec.Cmd {
func (r *runtime) buildCommand(args ...string) *exec.Cmd {
cmd := exec.Command(rktBinName)
cmd.Args = append(cmd.Args, r.config.buildGlobalOptions()...)
cmd.Args = append(cmd.Args, args...)
@@ -141,7 +159,7 @@ func (r *Runtime) buildCommand(args ...string) *exec.Cmd {
// runCommand invokes rkt binary with arguments and returns the result
// from stdout in a list of strings.
func (r *Runtime) runCommand(args ...string) ([]string, error) {
func (r *runtime) runCommand(args ...string) ([]string, error) {
glog.V(4).Info("rkt: Run command:", args)
output, err := r.buildCommand(args...).Output()
@@ -311,7 +329,7 @@ func setApp(app *appctypes.App, c *api.Container) error {
// makePodManifest transforms a kubelet pod spec to the rkt pod manifest.
// TODO(yifan): Use the RunContainerOptions generated by GenerateRunContainerOptions().
func (r *Runtime) makePodManifest(pod *api.Pod, volumeMap map[string]volume.Volume) (*appcschema.PodManifest, error) {
func (r *runtime) makePodManifest(pod *api.Pod, volumeMap map[string]volume.Volume) (*appcschema.PodManifest, error) {
manifest := appcschema.BlankPodManifest()
// Get the image manifests, assume they are already in the cas,
@@ -385,7 +403,7 @@ func (r *Runtime) makePodManifest(pod *api.Pod, volumeMap map[string]volume.Volu
}
// TODO(yifan): Replace with 'rkt images'.
func (r *Runtime) getImageID(imageName string) (string, error) {
func (r *runtime) getImageID(imageName string) (string, error) {
output, err := r.runCommand("fetch", imageName)
if err != nil {
return "", err
@@ -413,7 +431,7 @@ func hashContainer(container *api.Container) uint64 {
}
// TODO(yifan): Remove the receiver once we can solve the appName->imageID problem.
func (r *Runtime) apiPodToRuntimePod(uuid string, pod *api.Pod) *kubecontainer.Pod {
func (r *runtime) apiPodToruntimePod(uuid string, pod *api.Pod) *kubecontainer.Pod {
p := &kubecontainer.Pod{
ID: pod.UID,
Name: pod.Name,
@@ -444,7 +462,7 @@ func (r *Runtime) apiPodToRuntimePod(uuid string, pod *api.Pod) *kubecontainer.P
// On success, it will return a string that represents name of the unit file
// and a boolean that indicates if the unit file needs to be reloaded (whether
// the file is already existed).
func (r *Runtime) preparePod(pod *api.Pod, volumeMap map[string]volume.Volume) (string, bool, error) {
func (r *runtime) preparePod(pod *api.Pod, volumeMap map[string]volume.Volume) (string, bool, error) {
cmds := []string{"prepare", "--quiet", "--pod-manifest"}
// Generate the pod manifest from the pod spec.
@@ -484,7 +502,7 @@ func (r *Runtime) preparePod(pod *api.Pod, volumeMap map[string]volume.Volume) (
uuid := output[0]
glog.V(4).Infof("'rkt prepare' returns %q.", uuid)
p := r.apiPodToRuntimePod(uuid, pod)
p := r.apiPodToruntimePod(uuid, pod)
b, err := json.Marshal(p)
if err != nil {
return "", false, err
@@ -519,7 +537,7 @@ func (r *Runtime) preparePod(pod *api.Pod, volumeMap map[string]volume.Volume) (
// RunPod first creates the unit file for a pod, and then calls
// StartUnit over d-bus.
func (r *Runtime) RunPod(pod *api.Pod, volumeMap map[string]volume.Volume) error {
func (r *runtime) RunPod(pod *api.Pod, volumeMap map[string]volume.Volume) error {
glog.V(4).Infof("Rkt starts to run pod: name %q.", pod.Name)
name, needReload, err := r.preparePod(pod, volumeMap)
@@ -543,10 +561,10 @@ func (r *Runtime) RunPod(pod *api.Pod, volumeMap map[string]volume.Volume) error
return nil
}
// makeRuntimePod constructs the container runtime pod. It will:
// makeruntimePod constructs the container runtime pod. It will:
// 1, Construct the pod by the information stored in the unit file.
// 2, Construct the pod status from pod info.
func (r *Runtime) makeRuntimePod(unitName string, podInfos map[string]*podInfo) (*kubecontainer.Pod, error) {
func (r *runtime) makeruntimePod(unitName string, podInfos map[string]*podInfo) (*kubecontainer.Pod, error) {
f, err := os.Open(path.Join(systemdServiceDir, unitName))
if err != nil {
return nil, err
@@ -592,7 +610,7 @@ func (r *Runtime) makeRuntimePod(unitName string, podInfos map[string]*podInfo)
// Then it will use the result to contruct a list of container runtime pods.
// If all is false, then only running pods will be returned, otherwise all pods will be
// returned.
func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
glog.V(4).Infof("Rkt getting pods")
units, err := r.systemd.ListUnits()
@@ -613,7 +631,7 @@ func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
if !all && u.SubState != "running" {
continue
}
pod, err := r.makeRuntimePod(u.Name, podInfos)
pod, err := r.makeruntimePod(u.Name, podInfos)
if err != nil {
glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err)
continue
@@ -625,7 +643,7 @@ func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
}
// KillPod invokes 'systemctl kill' to kill the unit that runs the pod.
func (r *Runtime) KillPod(pod kubecontainer.Pod) error {
func (r *runtime) KillPod(pod kubecontainer.Pod) error {
glog.V(4).Infof("Rkt is killing pod: name %q.", pod.Name)
// TODO(yifan): More graceful stop. Replace with StopUnit and wait for a timeout.
@@ -635,7 +653,7 @@ func (r *Runtime) KillPod(pod kubecontainer.Pod) error {
// GetPodStatus currently invokes GetPods() to return the status.
// TODO(yifan): Split the get status logic from GetPods().
func (r *Runtime) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
func (r *runtime) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
pods, err := r.GetPods(true)
if err != nil {
return nil, err
@@ -646,3 +664,425 @@ func (r *Runtime) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
}
return &p.Status, nil
}
// Version invokes 'rkt version' to get the version information of the rkt
// runtime on the machine.
// The return values are an int array containers the version number.
//
// Example:
// rkt:0.3.2+git --> []int{0, 3, 2}.
//
func (r *runtime) Version() (kubecontainer.Version, error) {
output, err := r.runCommand("version")
if err != nil {
return nil, err
}
// Example output for 'rkt version':
// rkt version 0.3.2+git
// appc version 0.3.0+git
for _, line := range output {
tuples := strings.Split(strings.TrimSpace(line), " ")
if len(tuples) != 3 {
glog.Warningf("rkt: cannot parse the output: %q.", line)
continue
}
if tuples[0] == "rkt" {
return parseVersion(tuples[2])
}
}
return nil, fmt.Errorf("rkt: cannot determine the version")
}
// writeDockerAuthConfig writes the docker credentials to rkt auth config files.
// This enables rkt to pull docker images from docker registry with credentials.
func (r *runtime) writeDockerAuthConfig(image string, creds docker.AuthConfiguration) error {
registry := "index.docker.io"
// Image spec: [<registry>/]<repository>/<image>[:<version]
explicitRegistry := (strings.Count(image, "/") == 2)
if explicitRegistry {
registry = strings.Split(image, "/")[0]
}
localConfigDir := rktLocalConfigDir
if r.config.LocalConfigDir != "" {
localConfigDir = r.config.LocalConfigDir
}
authDir := path.Join(localConfigDir, "auth.d")
if _, err := os.Stat(authDir); os.IsNotExist(err) {
if err := os.Mkdir(authDir, 0600); err != nil {
glog.Errorf("rkt: Cannot create auth dir: %v", err)
return err
}
}
f, err := os.Create(path.Join(localConfigDir, authDir, registry+".json"))
if err != nil {
glog.Errorf("rkt: Cannot create docker auth config file: %v", err)
return err
}
defer f.Close()
config := fmt.Sprintf(dockerAuthTemplate, registry, creds.Username, creds.Password)
if _, err := f.Write([]byte(config)); err != nil {
glog.Errorf("rkt: Cannot write docker auth config file: %v", err)
return err
}
return nil
}
// PullImage invokes 'rkt fetch' to download an aci.
func (r *runtime) PullImage(img string) error {
// Use credentials for docker images. This string operation can be cleaned up
// once the format of image is landed, see:
// https://github.com/GoogleCloudPlatform/kubernetes/issues/7203
//
if strings.HasPrefix(img, dockerPrefix) {
repoToPull, tag := parsers.ParseRepositoryTag(img)
// If no tag was specified, use the default "latest".
if len(tag) == 0 {
tag = "latest"
}
creds, ok := r.dockerKeyring.Lookup(repoToPull)
if !ok {
glog.V(1).Infof("Pulling image %s without credentials", img)
}
// Let's update a json.
// TODO(yifan): Find a way to feed this to rkt.
if err := r.writeDockerAuthConfig(img, creds); err != nil {
return err
}
}
output, err := r.runCommand("fetch", img)
if err != nil {
return fmt.Errorf("rkt: Failed to fetch image: %v:", output)
}
return nil
}
// IsImagePresent returns true if the image is available on the machine.
// TODO(yifan): 'rkt image' is now landed on master, use that once we bump up
// the rkt version.
func (r *runtime) IsImagePresent(img string) (bool, error) {
if _, err := r.runCommand("prepare", "--local=true", img); err != nil {
return false, nil
}
return true, nil
}
func (r *runtime) ListImages() ([]kubecontainer.Image, error) {
return []kubecontainer.Image{}, fmt.Errorf("rkt: ListImages unimplemented")
}
func (r *runtime) RemoveImage(image string) error {
return fmt.Errorf("rkt: RemoveImages unimplemented")
}
// SyncPod syncs the running pod to match the specified desired pod.
func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) error {
podFullName := kubecontainer.GetPodFullName(pod)
if len(runningPod.Containers) == 0 {
glog.V(4).Infof("Pod %q is not running, will start it", podFullName)
// TODO(yifan): Use RunContainerOptionsGeneratior to get volumeMaps, etc.
return r.RunPod(pod, nil)
}
// Add references to all containers.
unidentifiedContainers := make(map[types.UID]*kubecontainer.Container)
for _, c := range runningPod.Containers {
unidentifiedContainers[c.ID] = c
}
restartPod := false
for _, container := range pod.Spec.Containers {
expectedHash := hashContainer(&container)
c := runningPod.FindContainerByName(container.Name)
if c == nil {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus, r.readinessManager) {
glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
// TODO(yifan): Containers in one pod are fate-sharing at this moment, see:
// https://github.com/appc/spec/issues/276.
restartPod = true
break
}
continue
}
// TODO(yifan): Take care of host network change.
containerChanged := c.Hash != 0 && c.Hash != expectedHash
if containerChanged {
glog.Infof("Pod %q container %q hash changed (%d vs %d), it will be killed and re-created.", podFullName, container.Name, c.Hash, expectedHash)
restartPod = true
break
}
result, err := r.prober.Probe(pod, podStatus, container, string(c.ID), c.Created)
// TODO(vmarmol): examine this logic.
if err == nil && result != probe.Success {
glog.Infof("Pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result)
restartPod = true
break
}
if err != nil {
glog.V(2).Infof("Probe container %q failed: %v", container.Name, err)
}
delete(unidentifiedContainers, c.ID)
}
// If there is any unidentified containers, restart the pod.
if len(unidentifiedContainers) > 0 {
restartPod = true
}
if restartPod {
// TODO(yifan): Handle network plugin.
if err := r.KillPod(runningPod); err != nil {
return err
}
if err := r.RunPod(pod, nil); err != nil {
return err
}
}
return nil
}
// GetContainerLogs uses journalctl to get the logs of the container.
// By default, it returns a snapshot of the container log. Set |follow| to true to
// stream the log. Set |follow| to false and specify the number of lines (e.g.
// "100" or "all") to tail the log.
// TODO(yifan): Currently, it fetches all the containers' log within a pod. We will
// be able to fetch individual container's log once https://github.com/coreos/rkt/pull/841
// landed.
func (r *runtime) GetContainerLogs(pod *api.Pod, containerID string, tail string, follow bool, stdout, stderr io.Writer) error {
unitName := makePodServiceFileName(pod.UID)
cmd := exec.Command("journalctl", "-u", unitName)
if follow {
cmd.Args = append(cmd.Args, "-f")
}
if tail == "all" {
cmd.Args = append(cmd.Args, "-a")
} else {
_, err := strconv.Atoi(tail)
if err == nil {
cmd.Args = append(cmd.Args, "-n", tail)
}
}
cmd.Stdout, cmd.Stderr = stdout, stderr
return cmd.Start()
}
// GarbageCollect collects the pods/containers. TODO(yifan): Enforce the gc policy.
func (r *runtime) GarbageCollect() error {
if err := exec.Command("systemctl", "reset-failed").Run(); err != nil {
glog.Errorf("rkt: Failed to reset failed systemd services: %v", err)
}
if _, err := r.runCommand("gc", "--grace-period="+defaultGracePeriod, "--expire-prepared="+defaultExpirePrepared); err != nil {
glog.Errorf("rkt: Failed to gc: %v", err)
return err
}
return nil
}
// Note: In rkt, the container ID is in the form of "UUID:appName:ImageID", where
// appName is the container name.
func (r *runtime) RunInContainer(containerID string, cmd []string) ([]byte, error) {
glog.V(4).Infof("Rkt running in container.")
id, err := parseContainerID(containerID)
if err != nil {
return nil, err
}
// TODO(yifan): Use appName instead of imageID.
// see https://github.com/coreos/rkt/pull/640
args := append([]string{}, "enter", "--imageid", id.imageID, id.uuid)
args = append(args, cmd...)
result, err := r.runCommand(args...)
return []byte(strings.Join(result, "\n")), err
}
// Note: In rkt, the container ID is in the form of "UUID:appName:ImageID", where
// appName is the container name.
func (r *runtime) ExecInContainer(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
glog.V(4).Infof("Rkt execing in container.")
id, err := parseContainerID(containerID)
if err != nil {
return err
}
// TODO(yifan): Use appName instead of imageID.
// see https://github.com/coreos/rkt/pull/640
args := append([]string{}, "enter", "--imageid", id.imageID, id.uuid)
args = append(args, cmd...)
command := r.buildCommand(args...)
if tty {
// TODO(yifan): Merge with dockertools.StartPty().
p, err := pty.Start(command)
if err != nil {
return err
}
defer p.Close()
// make sure to close the stdout stream
defer stdout.Close()
if stdin != nil {
go io.Copy(p, stdin)
}
if stdout != nil {
go io.Copy(stdout, p)
}
return command.Wait()
}
if stdin != nil {
// Use an os.Pipe here as it returns true *os.File objects.
// This way, if you run 'kubectl exec -p <pod> -i bash' (no tty) and type 'exit',
// the call below to command.Run() can unblock because its Stdin is the read half
// of the pipe.
r, w, err := os.Pipe()
if err != nil {
return err
}
go io.Copy(w, stdin)
command.Stdin = r
}
if stdout != nil {
command.Stdout = stdout
}
if stderr != nil {
command.Stderr = stderr
}
return command.Run()
}
// findRktID returns the rkt uuid for the pod.
// TODO(yifan): This is unefficient which require us to list
// all the unit files.
func (r *runtime) findRktID(pod *kubecontainer.Pod) (string, error) {
units, err := r.systemd.ListUnits()
if err != nil {
return "", err
}
unitName := makePodServiceFileName(pod.ID)
for _, u := range units {
// u.Name contains file name ext such as .service, .socket, etc.
if u.Name != unitName {
continue
}
f, err := os.Open(path.Join(systemdServiceDir, u.Name))
if err != nil {
return "", err
}
defer f.Close()
opts, err := unit.Deserialize(f)
if err != nil {
return "", err
}
for _, opt := range opts {
if opt.Section == unitKubernetesSection && opt.Name == unitRktID {
return opt.Value, nil
}
}
}
return "", fmt.Errorf("rkt uuid not found for pod %v", pod)
}
// PortForward executes socat in the pod's network namespace and copies
// data between stream (representing the user's local connection on their
// computer) and the specified port in the container.
//
// TODO:
// - match cgroups of container
// - should we support nsenter + socat on the host? (current impl)
// - should we support nsenter + socat in a container, running with elevated privs and --pid=host?
//
// TODO(yifan): Merge with the same function in dockertools.
func (r *runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
glog.V(4).Infof("Rkt port forwarding in container.")
podInfos, err := r.getPodInfos()
if err != nil {
return err
}
rktID, err := r.findRktID(pod)
if err != nil {
return err
}
info, ok := podInfos[rktID]
if !ok {
return fmt.Errorf("cannot find the pod info for pod %v", pod)
}
if info.pid < 0 {
return fmt.Errorf("cannot get the pid for pod %v", pod)
}
_, lookupErr := exec.LookPath("socat")
if lookupErr != nil {
return fmt.Errorf("unable to do port forwarding: socat not found.")
}
args := []string{"-t", fmt.Sprintf("%d", info.pid), "-n", "socat", "-", fmt.Sprintf("TCP4:localhost:%d", port)}
_, lookupErr = exec.LookPath("nsenter")
if lookupErr != nil {
return fmt.Errorf("unable to do port forwarding: nsenter not found.")
}
command := exec.Command("nsenter", args...)
command.Stdin = stream
command.Stdout = stream
return command.Run()
}
// getPodInfos returns a map of [pod-uuid]:*podInfo
func (r *runtime) getPodInfos() (map[string]*podInfo, error) {
result := make(map[string]*podInfo)
output, err := r.runCommand("list", "--no-legend", "--full")
if err != nil {
return result, err
}
if len(output) == 0 {
// No pods are running.
return result, nil
}
// Example output of current 'rkt list --full' (version == 0.4.2):
// UUID ACI STATE NETWORKS
// 2372bc17-47cb-43fb-8d78-20b31729feda foo running default:ip4=172.16.28.3
// bar
// 40e2813b-9d5d-4146-a817-0de92646da96 foo exited
// 40e2813b-9d5d-4146-a817-0de92646da96 bar exited
//
// With '--no-legend', the first line is eliminated.
for _, line := range output {
tuples := splitLineByTab(line)
if len(tuples) < 3 { // At least it should have 3 entries.
glog.Warningf("rkt: Unrecognized line: %q", line)
continue
}
id := tuples[0]
status, err := r.runCommand("status", id)
if err != nil {
glog.Errorf("rkt: Cannot get status for pod (uuid=%q): %v", id, err)
continue
}
info, err := parsePodInfo(status)
if err != nil {
glog.Errorf("rkt: Cannot parse status for pod (uuid=%q): %v", id, err)
continue
}
result[id] = info
}
return result, nil
}

View File

@@ -0,0 +1,87 @@
// +build !linux
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rkt
import (
"fmt"
"io"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
)
// rkt is unsupported in non-Linux builds.
type unsupportedRuntime struct {
}
var _ kubecontainer.Runtime = &unsupportedRuntime{}
var unsupportedError = fmt.Errorf("rkt runtime is unsupported in this platform")
func (ur *unsupportedRuntime) Version() (kubecontainer.Version, error) {
return nil, unsupportedError
}
func (ur *unsupportedRuntime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
return []*kubecontainer.Pod{}, unsupportedError
}
func (ur *unsupportedRuntime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) error {
return unsupportedError
}
func (ur *unsupportedRuntime) KillPod(pod kubecontainer.Pod) error {
return unsupportedError
}
func (ur *unsupportedRuntime) GetPodStatus(*api.Pod) (*api.PodStatus, error) {
return nil, unsupportedError
}
func (ur *unsupportedRuntime) RunInContainer(containerID string, cmd []string) ([]byte, error) {
return []byte{}, unsupportedError
}
func (ur *unsupportedRuntime) ExecInContainer(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
return unsupportedError
}
func (ur *unsupportedRuntime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
return unsupportedError
}
func (ur *unsupportedRuntime) PullImage(image string) error {
return unsupportedError
}
func (ur *unsupportedRuntime) IsImagePresent(image string) (bool, error) {
return false, unsupportedError
}
func (ur *unsupportedRuntime) ListImages() ([]kubecontainer.Image, error) {
return []kubecontainer.Image{}, unsupportedError
}
func (ur *unsupportedRuntime) RemoveImage(image string) error {
return unsupportedError
}
func (ur *unsupportedRuntime) GetContainerLogs(pod *api.Pod, containerID, tail string, follow bool, stdout, stderr io.Writer) error {
return unsupportedError
}

View File

@@ -1,188 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rkt
import (
"fmt"
"io"
"os"
"os/exec"
"path"
"strings"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/coreos/go-systemd/unit"
"github.com/golang/glog"
"github.com/kr/pty"
)
// Note: In rkt, the container ID is in the form of "UUID:appName:ImageID", where
// appName is the container name.
func (r *Runtime) RunInContainer(containerID string, cmd []string) ([]byte, error) {
glog.V(4).Infof("Rkt running in container.")
id, err := parseContainerID(containerID)
if err != nil {
return nil, err
}
// TODO(yifan): Use appName instead of imageID.
// see https://github.com/coreos/rkt/pull/640
args := append([]string{}, "enter", "--imageid", id.imageID, id.uuid)
args = append(args, cmd...)
result, err := r.runCommand(args...)
return []byte(strings.Join(result, "\n")), err
}
// Note: In rkt, the container ID is in the form of "UUID:appName:ImageID", where
// appName is the container name.
func (r *Runtime) ExecInContainer(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
glog.V(4).Infof("Rkt execing in container.")
id, err := parseContainerID(containerID)
if err != nil {
return err
}
// TODO(yifan): Use appName instead of imageID.
// see https://github.com/coreos/rkt/pull/640
args := append([]string{}, "enter", "--imageid", id.imageID, id.uuid)
args = append(args, cmd...)
command := r.buildCommand(args...)
if tty {
// TODO(yifan): Merge with dockertools.StartPty().
p, err := pty.Start(command)
if err != nil {
return err
}
defer p.Close()
// make sure to close the stdout stream
defer stdout.Close()
if stdin != nil {
go io.Copy(p, stdin)
}
if stdout != nil {
go io.Copy(stdout, p)
}
return command.Wait()
}
if stdin != nil {
// Use an os.Pipe here as it returns true *os.File objects.
// This way, if you run 'kubectl exec -p <pod> -i bash' (no tty) and type 'exit',
// the call below to command.Run() can unblock because its Stdin is the read half
// of the pipe.
r, w, err := os.Pipe()
if err != nil {
return err
}
go io.Copy(w, stdin)
command.Stdin = r
}
if stdout != nil {
command.Stdout = stdout
}
if stderr != nil {
command.Stderr = stderr
}
return command.Run()
}
// findRktID returns the rkt uuid for the pod.
// TODO(yifan): This is unefficient which require us to list
// all the unit files.
func (r *Runtime) findRktID(pod *kubecontainer.Pod) (string, error) {
units, err := r.systemd.ListUnits()
if err != nil {
return "", err
}
unitName := makePodServiceFileName(pod.ID)
for _, u := range units {
// u.Name contains file name ext such as .service, .socket, etc.
if u.Name != unitName {
continue
}
f, err := os.Open(path.Join(systemdServiceDir, u.Name))
if err != nil {
return "", err
}
defer f.Close()
opts, err := unit.Deserialize(f)
if err != nil {
return "", err
}
for _, opt := range opts {
if opt.Section == unitKubernetesSection && opt.Name == unitRktID {
return opt.Value, nil
}
}
}
return "", fmt.Errorf("rkt uuid not found for pod %v", pod)
}
// PortForward executes socat in the pod's network namespace and copies
// data between stream (representing the user's local connection on their
// computer) and the specified port in the container.
//
// TODO:
// - match cgroups of container
// - should we support nsenter + socat on the host? (current impl)
// - should we support nsenter + socat in a container, running with elevated privs and --pid=host?
//
// TODO(yifan): Merge with the same function in dockertools.
func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
glog.V(4).Infof("Rkt port forwarding in container.")
podInfos, err := r.getPodInfos()
if err != nil {
return err
}
rktID, err := r.findRktID(pod)
if err != nil {
return err
}
info, ok := podInfos[rktID]
if !ok {
return fmt.Errorf("cannot find the pod info for pod %v", pod)
}
if info.pid < 0 {
return fmt.Errorf("cannot get the pid for pod %v", pod)
}
_, lookupErr := exec.LookPath("socat")
if lookupErr != nil {
return fmt.Errorf("unable to do port forwarding: socat not found.")
}
args := []string{"-t", fmt.Sprintf("%d", info.pid), "-n", "socat", "-", fmt.Sprintf("TCP4:localhost:%d", port)}
_, lookupErr = exec.LookPath("nsenter")
if lookupErr != nil {
return fmt.Errorf("unable to do port forwarding: nsenter not found.")
}
command := exec.Command("nsenter", args...)
command.Stdin = stream
command.Stdout = stream
return command.Run()
}

View File

@@ -1,95 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rkt
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/golang/glog"
)
// SyncPod syncs the running pod to match the specified desired pod.
func (r *Runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) error {
podFullName := kubecontainer.GetPodFullName(pod)
if len(runningPod.Containers) == 0 {
glog.V(4).Infof("Pod %q is not running, will start it", podFullName)
// TODO(yifan): Use RunContainerOptionsGeneratior to get volumeMaps, etc.
return r.RunPod(pod, nil)
}
// Add references to all containers.
unidentifiedContainers := make(map[types.UID]*kubecontainer.Container)
for _, c := range runningPod.Containers {
unidentifiedContainers[c.ID] = c
}
restartPod := false
for _, container := range pod.Spec.Containers {
expectedHash := hashContainer(&container)
c := runningPod.FindContainerByName(container.Name)
if c == nil {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus, r.readinessManager) {
glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
// TODO(yifan): Containers in one pod are fate-sharing at this moment, see:
// https://github.com/appc/spec/issues/276.
restartPod = true
break
}
continue
}
// TODO(yifan): Take care of host network change.
containerChanged := c.Hash != 0 && c.Hash != expectedHash
if containerChanged {
glog.Infof("Pod %q container %q hash changed (%d vs %d), it will be killed and re-created.", podFullName, container.Name, c.Hash, expectedHash)
restartPod = true
break
}
result, err := r.prober.Probe(pod, podStatus, container, string(c.ID), c.Created)
// TODO(vmarmol): examine this logic.
if err == nil && result != probe.Success {
glog.Infof("Pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result)
restartPod = true
break
}
if err != nil {
glog.V(2).Infof("Probe container %q failed: %v", container.Name, err)
}
delete(unidentifiedContainers, c.ID)
}
// If there is any unidentified containers, restart the pod.
if len(unidentifiedContainers) > 0 {
restartPod = true
}
if restartPod {
// TODO(yifan): Handle network plugin.
if err := r.KillPod(runningPod); err != nil {
return err
}
if err := r.RunPod(pod, nil); err != nil {
return err
}
}
return nil
}

View File

@@ -21,9 +21,6 @@ import (
"os/exec"
"strconv"
"strings"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/golang/glog"
)
type rktVersion []int
@@ -78,35 +75,6 @@ func (r rktVersion) String() string {
return strings.Join(version, ".")
}
// Version invokes 'rkt version' to get the version information of the rkt
// runtime on the machine.
// The return values are an int array containers the version number.
//
// Example:
// rkt:0.3.2+git --> []int{0, 3, 2}.
//
func (r *Runtime) Version() (kubecontainer.Version, error) {
output, err := r.runCommand("version")
if err != nil {
return nil, err
}
// Example output for 'rkt version':
// rkt version 0.3.2+git
// appc version 0.3.0+git
for _, line := range output {
tuples := strings.Split(strings.TrimSpace(line), " ")
if len(tuples) != 3 {
glog.Warningf("rkt: cannot parse the output: %q.", line)
continue
}
if tuples[0] == "rkt" {
return parseVersion(tuples[2])
}
}
return nil, fmt.Errorf("rkt: cannot determine the version")
}
type systemdVersion int
func (s systemdVersion) String() string {