Refactor rkt into one file.

This will make it easy to add build labels and stub out in non-Linux
builds.
This commit is contained in:
Victor Marmol 2015-05-04 16:51:31 -07:00
parent b0129089da
commit de2212983f
8 changed files with 440 additions and 556 deletions

View File

@ -16,34 +16,6 @@ limitations under the License.
package rkt
import (
"os/exec"
"github.com/golang/glog"
)
const (
// TODO(yifan): Merge with ContainerGCPolicy, i.e., derive
// the grace period from MinAge in ContainerGCPolicy.
//
// Duration to wait before discarding inactive pods from garbage
defaultGracePeriod = "1m"
// Duration to wait before expiring prepared pods.
defaultExpirePrepared = "1m"
)
// GarbageCollect collects the pods/containers. TODO(yifan): Enforce the gc policy.
func (r *Runtime) GarbageCollect() error {
if err := exec.Command("systemctl", "reset-failed").Run(); err != nil {
glog.Errorf("rkt: Failed to reset failed systemd services: %v", err)
}
if _, err := r.runCommand("gc", "--grace-period="+defaultGracePeriod, "--expire-prepared="+defaultExpirePrepared); err != nil {
glog.Errorf("rkt: Failed to gc: %v", err)
return err
}
return nil
}
// ImageManager manages and garbage collects the container images for rkt.
type ImageManager struct {
runtime *Runtime

View File

@ -1,50 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rkt
import (
"io"
"os/exec"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// GetContainerLogs uses journalctl to get the logs of the container.
// By default, it returns a snapshot of the container log. Set |follow| to true to
// stream the log. Set |follow| to false and specify the number of lines (e.g.
// "100" or "all") to tail the log.
// TODO(yifan): Currently, it fetches all the containers' log within a pod. We will
// be able to fetch individual container's log once https://github.com/coreos/rkt/pull/841
// landed.
func (r *Runtime) GetContainerLogs(pod *api.Pod, containerID string, tail string, follow bool, stdout, stderr io.Writer) error {
unitName := makePodServiceFileName(pod.UID)
cmd := exec.Command("journalctl", "-u", unitName)
if follow {
cmd.Args = append(cmd.Args, "-f")
}
if tail == "all" {
cmd.Args = append(cmd.Args, "-a")
} else {
_, err := strconv.Atoi(tail)
if err == nil {
cmd.Args = append(cmd.Args, "-n", tail)
}
}
cmd.Stdout, cmd.Stderr = stdout, stderr
return cmd.Start()
}

View File

@ -169,47 +169,3 @@ func splitLineByTab(line string) []string {
}
return result
}
// getPodInfos returns a map of [pod-uuid]:*podInfo
func (r *Runtime) getPodInfos() (map[string]*podInfo, error) {
result := make(map[string]*podInfo)
output, err := r.runCommand("list", "--no-legend", "--full")
if err != nil {
return result, err
}
if len(output) == 0 {
// No pods are running.
return result, nil
}
// Example output of current 'rkt list --full' (version == 0.4.2):
// UUID ACI STATE NETWORKS
// 2372bc17-47cb-43fb-8d78-20b31729feda foo running default:ip4=172.16.28.3
// bar
// 40e2813b-9d5d-4146-a817-0de92646da96 foo exited
// 40e2813b-9d5d-4146-a817-0de92646da96 bar exited
//
// With '--no-legend', the first line is eliminated.
for _, line := range output {
tuples := splitLineByTab(line)
if len(tuples) < 3 { // At least it should have 3 entries.
glog.Warningf("rkt: Unrecognized line: %q", line)
continue
}
id := tuples[0]
status, err := r.runCommand("status", id)
if err != nil {
glog.Errorf("rkt: Cannot get status for pod (uuid=%q): %v", id, err)
continue
}
info, err := parsePodInfo(status)
if err != nil {
glog.Errorf("rkt: Cannot parse status for pod (uuid=%q): %v", id, err)
continue
}
result[id] = info
}
return result, nil
}

View File

@ -1,119 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rkt
import (
"fmt"
"os"
"path"
"strings"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/docker/docker/pkg/parsers"
docker "github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
)
const (
authDir = "auth.d"
dockerAuthTemplate = `{"rktKind":"dockerAuth","rktVersion":"v1","registries":[%q],"credentials":{"user":%q,"password":%q}}`
)
// writeDockerAuthConfig writes the docker credentials to rkt auth config files.
// This enables rkt to pull docker images from docker registry with credentials.
func (r *Runtime) writeDockerAuthConfig(image string, creds docker.AuthConfiguration) error {
registry := "index.docker.io"
// Image spec: [<registry>/]<repository>/<image>[:<version]
explicitRegistry := (strings.Count(image, "/") == 2)
if explicitRegistry {
registry = strings.Split(image, "/")[0]
}
localConfigDir := rktLocalConfigDir
if r.config.LocalConfigDir != "" {
localConfigDir = r.config.LocalConfigDir
}
authDir := path.Join(localConfigDir, "auth.d")
if _, err := os.Stat(authDir); os.IsNotExist(err) {
if err := os.Mkdir(authDir, 0600); err != nil {
glog.Errorf("rkt: Cannot create auth dir: %v", err)
return err
}
}
f, err := os.Create(path.Join(localConfigDir, authDir, registry+".json"))
if err != nil {
glog.Errorf("rkt: Cannot create docker auth config file: %v", err)
return err
}
defer f.Close()
config := fmt.Sprintf(dockerAuthTemplate, registry, creds.Username, creds.Password)
if _, err := f.Write([]byte(config)); err != nil {
glog.Errorf("rkt: Cannot write docker auth config file: %v", err)
return err
}
return nil
}
// PullImage invokes 'rkt fetch' to download an aci.
func (r *Runtime) PullImage(img string) error {
// Use credentials for docker images. This string operation can be cleaned up
// once the format of image is landed, see:
// https://github.com/GoogleCloudPlatform/kubernetes/issues/7203
//
if strings.HasPrefix(img, dockerPrefix) {
repoToPull, tag := parsers.ParseRepositoryTag(img)
// If no tag was specified, use the default "latest".
if len(tag) == 0 {
tag = "latest"
}
creds, ok := r.dockerKeyring.Lookup(repoToPull)
if !ok {
glog.V(1).Infof("Pulling image %s without credentials", img)
}
// Let's update a json.
// TODO(yifan): Find a way to feed this to rkt.
if err := r.writeDockerAuthConfig(img, creds); err != nil {
return err
}
}
output, err := r.runCommand("fetch", img)
if err != nil {
return fmt.Errorf("rkt: Failed to fetch image: %v:", output)
}
return nil
}
// IsImagePresent returns true if the image is available on the machine.
// TODO(yifan): 'rkt image' is now landed on master, use that once we bump up
// the rkt version.
func (r *Runtime) IsImagePresent(img string) (bool, error) {
if _, err := r.runCommand("prepare", "--local=true", img); err != nil {
return false, nil
}
return true, nil
}
func (r *Runtime) ListImages() ([]kubecontainer.Image, error) {
return []kubecontainer.Image{}, fmt.Errorf("rkt: ListImages unimplemented")
}
func (r *Runtime) RemoveImage(image string) error {
return fmt.Errorf("rkt: RemoveImages unimplemented")
}

View File

@ -1,3 +1,5 @@
// +build linux
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
@ -25,6 +27,7 @@ import (
"os"
"os/exec"
"path"
"strconv"
"strings"
"syscall"
"time"
@ -34,6 +37,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
@ -42,7 +46,10 @@ import (
"github.com/coreos/go-systemd/dbus"
"github.com/coreos/go-systemd/unit"
"github.com/coreos/rkt/store"
"github.com/docker/docker/pkg/parsers"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
"github.com/kr/pty"
)
const (
@ -64,6 +71,17 @@ const (
unitRktID = "RktID"
dockerPrefix = "docker://"
authDir = "auth.d"
dockerAuthTemplate = `{"rktKind":"dockerAuth","rktVersion":"v1","registries":[%q],"credentials":{"user":%q,"password":%q}}`
// TODO(yifan): Merge with ContainerGCPolicy, i.e., derive
// the grace period from MinAge in ContainerGCPolicy.
//
// Duration to wait before discarding inactive pods from garbage
defaultGracePeriod = "1m"
// Duration to wait before expiring prepared pods.
defaultExpirePrepared = "1m"
)
// Runtime implements the ContainerRuntime for rkt. The implementation
@ -646,3 +664,425 @@ func (r *Runtime) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
}
return &p.Status, nil
}
// Version invokes 'rkt version' to get the version information of the rkt
// runtime on the machine.
// The return values are an int array containers the version number.
//
// Example:
// rkt:0.3.2+git --> []int{0, 3, 2}.
//
func (r *Runtime) Version() (kubecontainer.Version, error) {
output, err := r.runCommand("version")
if err != nil {
return nil, err
}
// Example output for 'rkt version':
// rkt version 0.3.2+git
// appc version 0.3.0+git
for _, line := range output {
tuples := strings.Split(strings.TrimSpace(line), " ")
if len(tuples) != 3 {
glog.Warningf("rkt: cannot parse the output: %q.", line)
continue
}
if tuples[0] == "rkt" {
return parseVersion(tuples[2])
}
}
return nil, fmt.Errorf("rkt: cannot determine the version")
}
// writeDockerAuthConfig writes the docker credentials to rkt auth config files.
// This enables rkt to pull docker images from docker registry with credentials.
func (r *Runtime) writeDockerAuthConfig(image string, creds docker.AuthConfiguration) error {
registry := "index.docker.io"
// Image spec: [<registry>/]<repository>/<image>[:<version]
explicitRegistry := (strings.Count(image, "/") == 2)
if explicitRegistry {
registry = strings.Split(image, "/")[0]
}
localConfigDir := rktLocalConfigDir
if r.config.LocalConfigDir != "" {
localConfigDir = r.config.LocalConfigDir
}
authDir := path.Join(localConfigDir, "auth.d")
if _, err := os.Stat(authDir); os.IsNotExist(err) {
if err := os.Mkdir(authDir, 0600); err != nil {
glog.Errorf("rkt: Cannot create auth dir: %v", err)
return err
}
}
f, err := os.Create(path.Join(localConfigDir, authDir, registry+".json"))
if err != nil {
glog.Errorf("rkt: Cannot create docker auth config file: %v", err)
return err
}
defer f.Close()
config := fmt.Sprintf(dockerAuthTemplate, registry, creds.Username, creds.Password)
if _, err := f.Write([]byte(config)); err != nil {
glog.Errorf("rkt: Cannot write docker auth config file: %v", err)
return err
}
return nil
}
// PullImage invokes 'rkt fetch' to download an aci.
func (r *Runtime) PullImage(img string) error {
// Use credentials for docker images. This string operation can be cleaned up
// once the format of image is landed, see:
// https://github.com/GoogleCloudPlatform/kubernetes/issues/7203
//
if strings.HasPrefix(img, dockerPrefix) {
repoToPull, tag := parsers.ParseRepositoryTag(img)
// If no tag was specified, use the default "latest".
if len(tag) == 0 {
tag = "latest"
}
creds, ok := r.dockerKeyring.Lookup(repoToPull)
if !ok {
glog.V(1).Infof("Pulling image %s without credentials", img)
}
// Let's update a json.
// TODO(yifan): Find a way to feed this to rkt.
if err := r.writeDockerAuthConfig(img, creds); err != nil {
return err
}
}
output, err := r.runCommand("fetch", img)
if err != nil {
return fmt.Errorf("rkt: Failed to fetch image: %v:", output)
}
return nil
}
// IsImagePresent returns true if the image is available on the machine.
// TODO(yifan): 'rkt image' is now landed on master, use that once we bump up
// the rkt version.
func (r *Runtime) IsImagePresent(img string) (bool, error) {
if _, err := r.runCommand("prepare", "--local=true", img); err != nil {
return false, nil
}
return true, nil
}
func (r *Runtime) ListImages() ([]kubecontainer.Image, error) {
return []kubecontainer.Image{}, fmt.Errorf("rkt: ListImages unimplemented")
}
func (r *Runtime) RemoveImage(image string) error {
return fmt.Errorf("rkt: RemoveImages unimplemented")
}
// SyncPod syncs the running pod to match the specified desired pod.
func (r *Runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) error {
podFullName := kubecontainer.GetPodFullName(pod)
if len(runningPod.Containers) == 0 {
glog.V(4).Infof("Pod %q is not running, will start it", podFullName)
// TODO(yifan): Use RunContainerOptionsGeneratior to get volumeMaps, etc.
return r.RunPod(pod, nil)
}
// Add references to all containers.
unidentifiedContainers := make(map[types.UID]*kubecontainer.Container)
for _, c := range runningPod.Containers {
unidentifiedContainers[c.ID] = c
}
restartPod := false
for _, container := range pod.Spec.Containers {
expectedHash := hashContainer(&container)
c := runningPod.FindContainerByName(container.Name)
if c == nil {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus, r.readinessManager) {
glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
// TODO(yifan): Containers in one pod are fate-sharing at this moment, see:
// https://github.com/appc/spec/issues/276.
restartPod = true
break
}
continue
}
// TODO(yifan): Take care of host network change.
containerChanged := c.Hash != 0 && c.Hash != expectedHash
if containerChanged {
glog.Infof("Pod %q container %q hash changed (%d vs %d), it will be killed and re-created.", podFullName, container.Name, c.Hash, expectedHash)
restartPod = true
break
}
result, err := r.prober.Probe(pod, podStatus, container, string(c.ID), c.Created)
// TODO(vmarmol): examine this logic.
if err == nil && result != probe.Success {
glog.Infof("Pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result)
restartPod = true
break
}
if err != nil {
glog.V(2).Infof("Probe container %q failed: %v", container.Name, err)
}
delete(unidentifiedContainers, c.ID)
}
// If there is any unidentified containers, restart the pod.
if len(unidentifiedContainers) > 0 {
restartPod = true
}
if restartPod {
// TODO(yifan): Handle network plugin.
if err := r.KillPod(runningPod); err != nil {
return err
}
if err := r.RunPod(pod, nil); err != nil {
return err
}
}
return nil
}
// GetContainerLogs uses journalctl to get the logs of the container.
// By default, it returns a snapshot of the container log. Set |follow| to true to
// stream the log. Set |follow| to false and specify the number of lines (e.g.
// "100" or "all") to tail the log.
// TODO(yifan): Currently, it fetches all the containers' log within a pod. We will
// be able to fetch individual container's log once https://github.com/coreos/rkt/pull/841
// landed.
func (r *Runtime) GetContainerLogs(pod *api.Pod, containerID string, tail string, follow bool, stdout, stderr io.Writer) error {
unitName := makePodServiceFileName(pod.UID)
cmd := exec.Command("journalctl", "-u", unitName)
if follow {
cmd.Args = append(cmd.Args, "-f")
}
if tail == "all" {
cmd.Args = append(cmd.Args, "-a")
} else {
_, err := strconv.Atoi(tail)
if err == nil {
cmd.Args = append(cmd.Args, "-n", tail)
}
}
cmd.Stdout, cmd.Stderr = stdout, stderr
return cmd.Start()
}
// GarbageCollect collects the pods/containers. TODO(yifan): Enforce the gc policy.
func (r *Runtime) GarbageCollect() error {
if err := exec.Command("systemctl", "reset-failed").Run(); err != nil {
glog.Errorf("rkt: Failed to reset failed systemd services: %v", err)
}
if _, err := r.runCommand("gc", "--grace-period="+defaultGracePeriod, "--expire-prepared="+defaultExpirePrepared); err != nil {
glog.Errorf("rkt: Failed to gc: %v", err)
return err
}
return nil
}
// Note: In rkt, the container ID is in the form of "UUID:appName:ImageID", where
// appName is the container name.
func (r *Runtime) RunInContainer(containerID string, cmd []string) ([]byte, error) {
glog.V(4).Infof("Rkt running in container.")
id, err := parseContainerID(containerID)
if err != nil {
return nil, err
}
// TODO(yifan): Use appName instead of imageID.
// see https://github.com/coreos/rkt/pull/640
args := append([]string{}, "enter", "--imageid", id.imageID, id.uuid)
args = append(args, cmd...)
result, err := r.runCommand(args...)
return []byte(strings.Join(result, "\n")), err
}
// Note: In rkt, the container ID is in the form of "UUID:appName:ImageID", where
// appName is the container name.
func (r *Runtime) ExecInContainer(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
glog.V(4).Infof("Rkt execing in container.")
id, err := parseContainerID(containerID)
if err != nil {
return err
}
// TODO(yifan): Use appName instead of imageID.
// see https://github.com/coreos/rkt/pull/640
args := append([]string{}, "enter", "--imageid", id.imageID, id.uuid)
args = append(args, cmd...)
command := r.buildCommand(args...)
if tty {
// TODO(yifan): Merge with dockertools.StartPty().
p, err := pty.Start(command)
if err != nil {
return err
}
defer p.Close()
// make sure to close the stdout stream
defer stdout.Close()
if stdin != nil {
go io.Copy(p, stdin)
}
if stdout != nil {
go io.Copy(stdout, p)
}
return command.Wait()
}
if stdin != nil {
// Use an os.Pipe here as it returns true *os.File objects.
// This way, if you run 'kubectl exec -p <pod> -i bash' (no tty) and type 'exit',
// the call below to command.Run() can unblock because its Stdin is the read half
// of the pipe.
r, w, err := os.Pipe()
if err != nil {
return err
}
go io.Copy(w, stdin)
command.Stdin = r
}
if stdout != nil {
command.Stdout = stdout
}
if stderr != nil {
command.Stderr = stderr
}
return command.Run()
}
// findRktID returns the rkt uuid for the pod.
// TODO(yifan): This is unefficient which require us to list
// all the unit files.
func (r *Runtime) findRktID(pod *kubecontainer.Pod) (string, error) {
units, err := r.systemd.ListUnits()
if err != nil {
return "", err
}
unitName := makePodServiceFileName(pod.ID)
for _, u := range units {
// u.Name contains file name ext such as .service, .socket, etc.
if u.Name != unitName {
continue
}
f, err := os.Open(path.Join(systemdServiceDir, u.Name))
if err != nil {
return "", err
}
defer f.Close()
opts, err := unit.Deserialize(f)
if err != nil {
return "", err
}
for _, opt := range opts {
if opt.Section == unitKubernetesSection && opt.Name == unitRktID {
return opt.Value, nil
}
}
}
return "", fmt.Errorf("rkt uuid not found for pod %v", pod)
}
// PortForward executes socat in the pod's network namespace and copies
// data between stream (representing the user's local connection on their
// computer) and the specified port in the container.
//
// TODO:
// - match cgroups of container
// - should we support nsenter + socat on the host? (current impl)
// - should we support nsenter + socat in a container, running with elevated privs and --pid=host?
//
// TODO(yifan): Merge with the same function in dockertools.
func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
glog.V(4).Infof("Rkt port forwarding in container.")
podInfos, err := r.getPodInfos()
if err != nil {
return err
}
rktID, err := r.findRktID(pod)
if err != nil {
return err
}
info, ok := podInfos[rktID]
if !ok {
return fmt.Errorf("cannot find the pod info for pod %v", pod)
}
if info.pid < 0 {
return fmt.Errorf("cannot get the pid for pod %v", pod)
}
_, lookupErr := exec.LookPath("socat")
if lookupErr != nil {
return fmt.Errorf("unable to do port forwarding: socat not found.")
}
args := []string{"-t", fmt.Sprintf("%d", info.pid), "-n", "socat", "-", fmt.Sprintf("TCP4:localhost:%d", port)}
_, lookupErr = exec.LookPath("nsenter")
if lookupErr != nil {
return fmt.Errorf("unable to do port forwarding: nsenter not found.")
}
command := exec.Command("nsenter", args...)
command.Stdin = stream
command.Stdout = stream
return command.Run()
}
// getPodInfos returns a map of [pod-uuid]:*podInfo
func (r *Runtime) getPodInfos() (map[string]*podInfo, error) {
result := make(map[string]*podInfo)
output, err := r.runCommand("list", "--no-legend", "--full")
if err != nil {
return result, err
}
if len(output) == 0 {
// No pods are running.
return result, nil
}
// Example output of current 'rkt list --full' (version == 0.4.2):
// UUID ACI STATE NETWORKS
// 2372bc17-47cb-43fb-8d78-20b31729feda foo running default:ip4=172.16.28.3
// bar
// 40e2813b-9d5d-4146-a817-0de92646da96 foo exited
// 40e2813b-9d5d-4146-a817-0de92646da96 bar exited
//
// With '--no-legend', the first line is eliminated.
for _, line := range output {
tuples := splitLineByTab(line)
if len(tuples) < 3 { // At least it should have 3 entries.
glog.Warningf("rkt: Unrecognized line: %q", line)
continue
}
id := tuples[0]
status, err := r.runCommand("status", id)
if err != nil {
glog.Errorf("rkt: Cannot get status for pod (uuid=%q): %v", id, err)
continue
}
info, err := parsePodInfo(status)
if err != nil {
glog.Errorf("rkt: Cannot parse status for pod (uuid=%q): %v", id, err)
continue
}
result[id] = info
}
return result, nil
}

View File

@ -1,188 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rkt
import (
"fmt"
"io"
"os"
"os/exec"
"path"
"strings"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/coreos/go-systemd/unit"
"github.com/golang/glog"
"github.com/kr/pty"
)
// Note: In rkt, the container ID is in the form of "UUID:appName:ImageID", where
// appName is the container name.
func (r *Runtime) RunInContainer(containerID string, cmd []string) ([]byte, error) {
glog.V(4).Infof("Rkt running in container.")
id, err := parseContainerID(containerID)
if err != nil {
return nil, err
}
// TODO(yifan): Use appName instead of imageID.
// see https://github.com/coreos/rkt/pull/640
args := append([]string{}, "enter", "--imageid", id.imageID, id.uuid)
args = append(args, cmd...)
result, err := r.runCommand(args...)
return []byte(strings.Join(result, "\n")), err
}
// Note: In rkt, the container ID is in the form of "UUID:appName:ImageID", where
// appName is the container name.
func (r *Runtime) ExecInContainer(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
glog.V(4).Infof("Rkt execing in container.")
id, err := parseContainerID(containerID)
if err != nil {
return err
}
// TODO(yifan): Use appName instead of imageID.
// see https://github.com/coreos/rkt/pull/640
args := append([]string{}, "enter", "--imageid", id.imageID, id.uuid)
args = append(args, cmd...)
command := r.buildCommand(args...)
if tty {
// TODO(yifan): Merge with dockertools.StartPty().
p, err := pty.Start(command)
if err != nil {
return err
}
defer p.Close()
// make sure to close the stdout stream
defer stdout.Close()
if stdin != nil {
go io.Copy(p, stdin)
}
if stdout != nil {
go io.Copy(stdout, p)
}
return command.Wait()
}
if stdin != nil {
// Use an os.Pipe here as it returns true *os.File objects.
// This way, if you run 'kubectl exec -p <pod> -i bash' (no tty) and type 'exit',
// the call below to command.Run() can unblock because its Stdin is the read half
// of the pipe.
r, w, err := os.Pipe()
if err != nil {
return err
}
go io.Copy(w, stdin)
command.Stdin = r
}
if stdout != nil {
command.Stdout = stdout
}
if stderr != nil {
command.Stderr = stderr
}
return command.Run()
}
// findRktID returns the rkt uuid for the pod.
// TODO(yifan): This is unefficient which require us to list
// all the unit files.
func (r *Runtime) findRktID(pod *kubecontainer.Pod) (string, error) {
units, err := r.systemd.ListUnits()
if err != nil {
return "", err
}
unitName := makePodServiceFileName(pod.ID)
for _, u := range units {
// u.Name contains file name ext such as .service, .socket, etc.
if u.Name != unitName {
continue
}
f, err := os.Open(path.Join(systemdServiceDir, u.Name))
if err != nil {
return "", err
}
defer f.Close()
opts, err := unit.Deserialize(f)
if err != nil {
return "", err
}
for _, opt := range opts {
if opt.Section == unitKubernetesSection && opt.Name == unitRktID {
return opt.Value, nil
}
}
}
return "", fmt.Errorf("rkt uuid not found for pod %v", pod)
}
// PortForward executes socat in the pod's network namespace and copies
// data between stream (representing the user's local connection on their
// computer) and the specified port in the container.
//
// TODO:
// - match cgroups of container
// - should we support nsenter + socat on the host? (current impl)
// - should we support nsenter + socat in a container, running with elevated privs and --pid=host?
//
// TODO(yifan): Merge with the same function in dockertools.
func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
glog.V(4).Infof("Rkt port forwarding in container.")
podInfos, err := r.getPodInfos()
if err != nil {
return err
}
rktID, err := r.findRktID(pod)
if err != nil {
return err
}
info, ok := podInfos[rktID]
if !ok {
return fmt.Errorf("cannot find the pod info for pod %v", pod)
}
if info.pid < 0 {
return fmt.Errorf("cannot get the pid for pod %v", pod)
}
_, lookupErr := exec.LookPath("socat")
if lookupErr != nil {
return fmt.Errorf("unable to do port forwarding: socat not found.")
}
args := []string{"-t", fmt.Sprintf("%d", info.pid), "-n", "socat", "-", fmt.Sprintf("TCP4:localhost:%d", port)}
_, lookupErr = exec.LookPath("nsenter")
if lookupErr != nil {
return fmt.Errorf("unable to do port forwarding: nsenter not found.")
}
command := exec.Command("nsenter", args...)
command.Stdin = stream
command.Stdout = stream
return command.Run()
}

View File

@ -1,95 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rkt
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/golang/glog"
)
// SyncPod syncs the running pod to match the specified desired pod.
func (r *Runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) error {
podFullName := kubecontainer.GetPodFullName(pod)
if len(runningPod.Containers) == 0 {
glog.V(4).Infof("Pod %q is not running, will start it", podFullName)
// TODO(yifan): Use RunContainerOptionsGeneratior to get volumeMaps, etc.
return r.RunPod(pod, nil)
}
// Add references to all containers.
unidentifiedContainers := make(map[types.UID]*kubecontainer.Container)
for _, c := range runningPod.Containers {
unidentifiedContainers[c.ID] = c
}
restartPod := false
for _, container := range pod.Spec.Containers {
expectedHash := hashContainer(&container)
c := runningPod.FindContainerByName(container.Name)
if c == nil {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus, r.readinessManager) {
glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
// TODO(yifan): Containers in one pod are fate-sharing at this moment, see:
// https://github.com/appc/spec/issues/276.
restartPod = true
break
}
continue
}
// TODO(yifan): Take care of host network change.
containerChanged := c.Hash != 0 && c.Hash != expectedHash
if containerChanged {
glog.Infof("Pod %q container %q hash changed (%d vs %d), it will be killed and re-created.", podFullName, container.Name, c.Hash, expectedHash)
restartPod = true
break
}
result, err := r.prober.Probe(pod, podStatus, container, string(c.ID), c.Created)
// TODO(vmarmol): examine this logic.
if err == nil && result != probe.Success {
glog.Infof("Pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result)
restartPod = true
break
}
if err != nil {
glog.V(2).Infof("Probe container %q failed: %v", container.Name, err)
}
delete(unidentifiedContainers, c.ID)
}
// If there is any unidentified containers, restart the pod.
if len(unidentifiedContainers) > 0 {
restartPod = true
}
if restartPod {
// TODO(yifan): Handle network plugin.
if err := r.KillPod(runningPod); err != nil {
return err
}
if err := r.RunPod(pod, nil); err != nil {
return err
}
}
return nil
}

View File

@ -21,9 +21,6 @@ import (
"os/exec"
"strconv"
"strings"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/golang/glog"
)
type rktVersion []int
@ -78,35 +75,6 @@ func (r rktVersion) String() string {
return strings.Join(version, ".")
}
// Version invokes 'rkt version' to get the version information of the rkt
// runtime on the machine.
// The return values are an int array containers the version number.
//
// Example:
// rkt:0.3.2+git --> []int{0, 3, 2}.
//
func (r *Runtime) Version() (kubecontainer.Version, error) {
output, err := r.runCommand("version")
if err != nil {
return nil, err
}
// Example output for 'rkt version':
// rkt version 0.3.2+git
// appc version 0.3.0+git
for _, line := range output {
tuples := strings.Split(strings.TrimSpace(line), " ")
if len(tuples) != 3 {
glog.Warningf("rkt: cannot parse the output: %q.", line)
continue
}
if tuples[0] == "rkt" {
return parseVersion(tuples[2])
}
}
return nil, fmt.Errorf("rkt: cannot determine the version")
}
type systemdVersion int
func (s systemdVersion) String() string {