Merge pull request #13041 from yifan-gu/rkt_patch_fix

kubelet/rkt: merge environments instead of overriding.
This commit is contained in:
Yu-Ju Hong 2015-08-25 16:58:00 -07:00
commit 655645eb9c
2 changed files with 154 additions and 218 deletions

View File

@ -18,6 +18,7 @@ package rkt
import (
"fmt"
"reflect"
"strconv"
"strings"
@ -43,6 +44,21 @@ const (
exitCodePrefix = "app-"
)
// rktInfo represents the information of the rkt pod that stored in the
// systemd service file.
type rktInfo struct {
uuid string
restartCount int
}
func emptyRktInfo() *rktInfo {
return &rktInfo{restartCount: -1}
}
func (r *rktInfo) isEmpty() bool {
return reflect.DeepEqual(r, emptyRktInfo())
}
// podInfo is the internal type that represents the state of
// the rkt pod.
type podInfo struct {
@ -122,15 +138,15 @@ func getIPFromNetworkInfo(networkInfo string) string {
return ""
}
// getContainerStatus creates the api.containerStatus of a container from the podInfo.
func (p *podInfo) getContainerStatus(container *kubecontainer.Container) api.ContainerStatus {
// makeContainerStatus creates the api.containerStatus of a container from the podInfo.
func makeContainerStatus(container *kubecontainer.Container, podInfo *podInfo) api.ContainerStatus {
var status api.ContainerStatus
status.Name = container.Name
status.Image = container.Image
status.ContainerID = string(container.ID)
// TODO(yifan): Add image ID info.
switch p.state {
switch podInfo.state {
case Running:
// TODO(yifan): Get StartedAt.
status.State = api.ContainerState{
@ -141,7 +157,7 @@ func (p *podInfo) getContainerStatus(container *kubecontainer.Container) api.Con
case Embryo, Preparing, Prepared:
status.State = api.ContainerState{Waiting: &api.ContainerStateWaiting{}}
case AbortedPrepare, Deleting, Exited, Garbage:
exitCode, ok := p.exitCodes[status.Name]
exitCode, ok := podInfo.exitCodes[status.Name]
if !ok {
glog.Warningf("rkt: Cannot get exit code for container %v", container)
exitCode = -1
@ -154,18 +170,20 @@ func (p *podInfo) getContainerStatus(container *kubecontainer.Container) api.Con
},
}
default:
glog.Warningf("rkt: Unknown pod state: %q", p.state)
glog.Warningf("rkt: Unknown pod state: %q", podInfo.state)
}
return status
}
// toPodStatus converts a podInfo type into an api.PodStatus type.
func (p *podInfo) toPodStatus(pod *kubecontainer.Pod) api.PodStatus {
// makePodStatus constructs the pod status from the pod info and rkt info.
func makePodStatus(pod *kubecontainer.Pod, podInfo *podInfo, rktInfo *rktInfo) api.PodStatus {
var status api.PodStatus
status.PodIP = p.ip
status.PodIP = podInfo.ip
// For now just make every container's state the same as the pod.
for _, container := range pod.Containers {
status.ContainerStatuses = append(status.ContainerStatuses, p.getContainerStatus(container))
containerStatus := makeContainerStatus(container, podInfo)
containerStatus.RestartCount = rktInfo.restartCount
status.ContainerStatuses = append(status.ContainerStatuses, containerStatus)
}
return status
}

View File

@ -24,7 +24,6 @@ import (
"os"
"os/exec"
"path"
"sort"
"strconv"
"strings"
"syscall"
@ -46,7 +45,6 @@ import (
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/errors"
)
const (
@ -62,6 +60,7 @@ const (
unitKubernetesSection = "X-Kubernetes"
unitPodName = "POD"
unitRktID = "RktID"
unitRestartCount = "RestartCount"
dockerPrefix = "docker://"
@ -301,13 +300,26 @@ func setIsolators(app *appctypes.App, c *api.Container) error {
return nil
}
// findEnvInList returns the index of environment variable in the environment whose Name equals env.Name.
func findEnvInList(envs appctypes.Environment, env kubecontainer.EnvVar) int {
for i, e := range envs {
if e.Name == env.Name {
return i
}
}
return -1
}
// setApp overrides the app's fields if any of them are specified in the
// container's spec.
func setApp(app *appctypes.App, c *api.Container, opts *kubecontainer.RunContainerOptions) error {
// Override the exec.
// TOOD(yifan): Revisit this for the overriding rule.
if len(c.Command) > 0 || len(c.Args) > 0 {
app.Exec = append(c.Command, c.Args...)
if len(c.Command) > 0 {
app.Exec = c.Command
}
if len(c.Args) > 0 {
app.Exec = append(app.Exec, c.Args...)
}
// TODO(yifan): Use non-root user in the future, see:
@ -319,11 +331,12 @@ func setApp(app *appctypes.App, c *api.Container, opts *kubecontainer.RunContain
app.WorkingDirectory = c.WorkingDir
}
// Override the environment.
if len(opts.Envs) > 0 {
app.Environment = []appctypes.EnvironmentVariable{}
}
for _, env := range c.Env {
// Merge the environment. Override the image with the ones defined in the spec if necessary.
for _, env := range opts.Envs {
if ix := findEnvInList(app.Environment, env); ix >= 0 {
app.Environment[ix].Value = env.Value
continue
}
app.Environment = append(app.Environment, appctypes.EnvironmentVariable{
Name: env.Name,
Value: env.Value,
@ -413,7 +426,7 @@ func (r *runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc
}
if imgManifest.App == nil {
return nil, fmt.Errorf("no app section in image manifest for image: %q", c.Image)
imgManifest.App = new(appctypes.App)
}
img, err := r.getImageByName(c.Image)
@ -509,25 +522,26 @@ func apiPodToruntimePod(uuid string, pod *api.Pod) *kubecontainer.Pod {
return p
}
// serviceFilePath returns the absolute path of the service file.
func serviceFilePath(serviceName string) string {
return path.Join(systemdServiceDir, serviceName)
}
// preparePod will:
//
// 1. Invoke 'rkt prepare' to prepare the pod, and get the rkt pod uuid.
// 2. Creates the unit file and save it under systemdUnitDir.
//
// On success, it will return a string that represents name of the unit file
// and a boolean that indicates if the unit file needs to be reloaded (whether
// the file is already existed).
func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, bool, error) {
cmds := []string{"prepare", "--quiet", "--pod-manifest"}
// On success, it will return a string that represents name of the unit file.
func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, error) {
// Generate the pod manifest from the pod spec.
manifest, err := r.makePodManifest(pod, pullSecrets)
if err != nil {
return "", false, err
return "", err
}
manifestFile, err := ioutil.TempFile("", fmt.Sprintf("manifest-%s-", pod.Name))
if err != nil {
return "", false, err
return "", err
}
defer func() {
manifestFile.Close()
@ -538,29 +552,31 @@ func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, bo
data, err := json.Marshal(manifest)
if err != nil {
return "", false, err
return "", err
}
// Since File.Write returns error if the written length is less than len(data),
// so check error is enough for us.
if _, err := manifestFile.Write(data); err != nil {
return "", false, err
return "", err
}
cmds = append(cmds, manifestFile.Name())
// Run 'rkt prepare' to get the rkt UUID.
cmds := []string{"prepare", "--quiet", "--pod-manifest", manifestFile.Name()}
output, err := r.runCommand(cmds...)
if err != nil {
return "", false, err
return "", err
}
if len(output) != 1 {
return "", false, fmt.Errorf("cannot get uuid from 'rkt prepare'")
return "", fmt.Errorf("invalid output from 'rkt prepare': %v", output)
}
uuid := output[0]
glog.V(4).Infof("'rkt prepare' returns %q", uuid)
// Create systemd service file for the rkt pod.
p := apiPodToruntimePod(uuid, pod)
b, err := json.Marshal(p)
if err != nil {
return "", false, err
return "", err
}
var runPrepared string
@ -578,69 +594,79 @@ func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, bo
newUnitOption("Service", "ExecStart", runPrepared),
}
// Save the unit file under systemd's service directory.
// TODO(yifan) Garbage collect 'dead' service files.
needReload := false
unitName := makePodServiceFileName(pod.UID)
if _, err := os.Stat(path.Join(systemdServiceDir, unitName)); err == nil {
// Check if there's old rkt pod corresponding to the same pod, if so, update the restart count.
var restartCount int
var needReload bool
serviceName := makePodServiceFileName(pod.UID)
if _, err := os.Stat(serviceFilePath(serviceName)); err == nil {
// Service file already exists, that means the pod is being restarted.
needReload = true
_, info, err := r.readServiceFile(serviceName)
if err != nil {
glog.Warningf("rkt: Cannot get old pod's info from service file %q: (%v), will ignore it", serviceName, err)
restartCount = 0
} else {
restartCount = info.restartCount + 1
}
}
unitFile, err := os.Create(path.Join(systemdServiceDir, unitName))
if err != nil {
return "", false, err
}
defer unitFile.Close()
units = append(units, newUnitOption(unitKubernetesSection, unitRestartCount, strconv.Itoa(restartCount)))
_, err = io.Copy(unitFile, unit.Serialize(units))
glog.V(4).Infof("rkt: Creating service file %q for pod %q", serviceName, pod.Name)
serviceFile, err := os.Create(serviceFilePath(serviceName))
if err != nil {
return "", false, err
return "", err
}
return unitName, needReload, nil
defer serviceFile.Close()
_, err = io.Copy(serviceFile, unit.Serialize(units))
if err != nil {
return "", err
}
if needReload {
if err := r.systemd.Reload(); err != nil {
return "", err
}
}
return serviceName, nil
}
// RunPod first creates the unit file for a pod, and then calls
// StartUnit over d-bus.
// RunPod first creates the unit file for a pod, and then
// starts the unit over d-bus.
func (r *runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error {
glog.V(4).Infof("Rkt starts to run pod: name %q.", pod.Name)
name, needReload, err := r.preparePod(pod, pullSecrets)
name, err := r.preparePod(pod, pullSecrets)
if err != nil {
return err
}
if needReload {
// TODO(yifan): More graceful stop. Replace with StopUnit and wait for a timeout.
r.systemd.KillUnit(name, int32(syscall.SIGKILL))
if err := r.systemd.Reload(); err != nil {
return err
}
}
// TODO(yifan): This is the old version of go-systemd. Should update when libcontainer updates
// its version of go-systemd.
_, err = r.systemd.StartUnit(name, "replace")
// RestartUnit has the same effect as StartUnit if the unit is not running, besides it can restart
// a unit if the unit file is changed and reloaded.
_, err = r.systemd.RestartUnit(name, "replace")
if err != nil {
return err
}
return nil
}
// makeRuntimePod constructs the container runtime pod. It will:
// 1, Construct the pod by the information stored in the unit file.
// 2, Return the rkt uuid.
func (r *runtime) makeRuntimePod(unitName string) (*kubecontainer.Pod, string, error) {
f, err := os.Open(path.Join(systemdServiceDir, unitName))
// readServiceFile reads the service file and constructs the runtime pod and the rkt info.
func (r *runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktInfo, error) {
f, err := os.Open(serviceFilePath(serviceName))
if err != nil {
return nil, "", err
return nil, nil, err
}
defer f.Close()
var pod kubecontainer.Pod
opts, err := unit.Deserialize(f)
if err != nil {
return nil, "", err
return nil, nil, err
}
var rktID string
info := emptyRktInfo()
for _, opt := range opts {
if opt.Section != unitKubernetesSection {
continue
@ -649,19 +675,25 @@ func (r *runtime) makeRuntimePod(unitName string) (*kubecontainer.Pod, string, e
case unitPodName:
err = json.Unmarshal([]byte(opt.Value), &pod)
if err != nil {
return nil, "", err
return nil, nil, err
}
case unitRktID:
rktID = opt.Value
info.uuid = opt.Value
case unitRestartCount:
cnt, err := strconv.Atoi(opt.Value)
if err != nil {
return nil, nil, err
}
info.restartCount = cnt
default:
return nil, "", fmt.Errorf("rkt: Unexpected key: %q", opt.Name)
return nil, nil, fmt.Errorf("rkt: unexpected key: %q", opt.Name)
}
}
if len(rktID) == 0 {
return nil, "", fmt.Errorf("rkt: cannot find rkt ID of pod %v, unit file is broken", pod)
if info.isEmpty() {
return nil, nil, fmt.Errorf("rkt: cannot find rkt info of pod %v, unit file is broken", pod)
}
return &pod, "", nil
return &pod, info, nil
}
// GetPods runs 'systemctl list-unit' and 'rkt list' to get the list of rkt pods.
@ -682,7 +714,7 @@ func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
if !all && u.SubState != "running" {
continue
}
pod, _, err := r.makeRuntimePod(u.Name)
pod, _, err := r.readServiceFile(u.Name)
if err != nil {
glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err)
continue
@ -696,87 +728,33 @@ func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
// KillPod invokes 'systemctl kill' to kill the unit that runs the pod.
func (r *runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
glog.V(4).Infof("Rkt is killing pod: name %q.", runningPod.Name)
serviceName := makePodServiceFileName(runningPod.ID)
// TODO(yifan): More graceful stop. Replace with StopUnit and wait for a timeout.
r.systemd.KillUnit(makePodServiceFileName(runningPod.ID), int32(syscall.SIGKILL))
return r.systemd.Reload()
}
type byModTime []os.FileInfo
func (b byModTime) Len() int { return len(b) }
func (b byModTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b byModTime) Less(i, j int) bool { return b[i].ModTime().After(b[j].ModTime()) }
// listUnitFiles reads the systemd directory and returns a list of rkt
// service file names, sorted by the modification date from newest to oldest.
// TODO(yifan): Listing all units under the directory is inefficent, consider to
// create the list during startup, and then record every unit creation to avoid
// reading the whole directory.
func listUnitFiles() ([]string, error) {
files, err := ioutil.ReadDir(systemdServiceDir)
if err != nil {
return nil, err
}
sort.Sort(byModTime(files))
var rktFiles []string
for _, f := range files {
if strings.HasPrefix(f.Name(), kubernetesUnitPrefix) {
rktFiles = append(rktFiles, f.Name())
}
}
return rktFiles, nil
r.systemd.KillUnit(serviceName, int32(syscall.SIGKILL))
// Remove the systemd service file as well.
return os.Remove(serviceFilePath(serviceName))
}
// getPodStatus reads the service file and invokes 'rkt status $UUID' to get the
// pod's status.
func (r *runtime) getPodStatus(serviceName string) (*api.PodStatus, error) {
// TODO(yifan): Get rkt uuid from the service file name.
pod, uuid, err := r.makeRuntimePod(serviceName)
pod, rktInfo, err := r.readServiceFile(serviceName)
if err != nil {
return nil, err
}
podInfo, err := r.getPodInfo(uuid)
podInfo, err := r.getPodInfo(rktInfo.uuid)
if err != nil {
return nil, err
}
status := podInfo.toPodStatus(pod)
status := makePodStatus(pod, podInfo, rktInfo)
return &status, nil
}
// GetPodStatus returns the status of the given pod.
func (r *runtime) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
unitNames, err := listUnitFiles()
if err != nil {
glog.Errorf("rkt: Cannot list unit files: %v", err)
return nil, err
}
var status *api.PodStatus
var errlist []error
for _, name := range unitNames {
if !strings.Contains(name, string(pod.UID)) {
continue
}
if status != nil {
// This means the pod has been restarted.
for _, c := range status.ContainerStatuses {
c.RestartCount++
}
continue
}
status, err = r.getPodStatus(name)
if err != nil {
glog.Errorf("rkt: Cannot get pod status for pod %q, service file %q: %v", pod.Name, name, err)
errlist = append(errlist, err)
continue
}
}
return status, errors.NewAggregate(errlist)
serviceName := makePodServiceFileName(pod.UID)
return r.getPodStatus(serviceName)
}
// Version invokes 'rkt version' to get the version information of the rkt
@ -913,6 +891,10 @@ func (r *runtime) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) {
// SyncPod syncs the running pod to match the specified desired pod.
func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
podFullName := kubecontainer.GetPodFullName(pod)
if len(runningPod.Containers) == 0 {
glog.V(4).Infof("Pod %q is not running, will start it", podFullName)
return r.RunPod(pod, pullSecrets)
}
// Add references to all containers.
unidentifiedContainers := make(map[types.UID]*kubecontainer.Container)
@ -1092,36 +1074,26 @@ func (r *runtime) ExecInContainer(containerID string, cmd []string, stdin io.Rea
}
// findRktID returns the rkt uuid for the pod.
// TODO(yifan): This is unefficient which require us to list
// all the unit files.
func (r *runtime) findRktID(pod *kubecontainer.Pod) (string, error) {
units, err := r.systemd.ListUnits()
serviceName := makePodServiceFileName(pod.ID)
f, err := os.Open(serviceFilePath(serviceName))
if err != nil {
if os.IsNotExist(err) {
return "", fmt.Errorf("no service file %v for pod %q, UID %q", serviceName, pod.Name, pod.ID)
}
return "", err
}
defer f.Close()
opts, err := unit.Deserialize(f)
if err != nil {
return "", err
}
unitName := makePodServiceFileName(pod.ID)
for _, u := range units {
// u.Name contains file name ext such as .service, .socket, etc.
if u.Name != unitName {
continue
}
f, err := os.Open(path.Join(systemdServiceDir, u.Name))
if err != nil {
return "", err
}
defer f.Close()
opts, err := unit.Deserialize(f)
if err != nil {
return "", err
}
for _, opt := range opts {
if opt.Section == unitKubernetesSection && opt.Name == unitRktID {
return opt.Value, nil
}
for _, opt := range opts {
if opt.Section == unitKubernetesSection && opt.Name == unitRktID {
return opt.Value, nil
}
}
return "", fmt.Errorf("rkt uuid not found for pod %v", pod)
@ -1140,22 +1112,14 @@ func (r *runtime) findRktID(pod *kubecontainer.Pod) (string, error) {
func (r *runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
glog.V(4).Infof("Rkt port forwarding in container.")
podInfos, err := r.getPodInfos()
if err != nil {
return err
}
rktID, err := r.findRktID(pod)
if err != nil {
return err
}
info, ok := podInfos[rktID]
if !ok {
return fmt.Errorf("cannot find the pod info for pod %v", pod)
}
if info.pid < 0 {
return fmt.Errorf("cannot get the pid for pod %v", pod)
info, err := r.getPodInfo(rktID)
if err != nil {
return err
}
_, lookupErr := exec.LookPath("socat")
@ -1197,52 +1161,6 @@ func (r *runtime) getPodInfo(uuid string) (*podInfo, error) {
return info, nil
}
// getPodInfos returns a map of [pod-uuid]:*podInfo
func (r *runtime) getPodInfos() (map[string]*podInfo, error) {
result := make(map[string]*podInfo)
output, err := r.runCommand("list", "--no-legend", "--full")
if err != nil {
return result, err
}
if len(output) == 0 {
// No pods are running.
return result, nil
}
// Example output of 'rkt list --full' (version == 0.7.0):
//
// UUID APP ACI STATE NETWORKS
// 2372bc17-47cb-43fb-8d78-20b31729feda foo coreos.com/etcd running default:ip4=172.16.28.3
// bar nginx running
// 40e2813b-9d5d-4146-a817-0de92646da96 foo redis exited
// 40e2813b-9d5d-4146-a817-0de92646da96 bar busybox exited
//
// With '--no-legend', the first line is eliminated.
for _, line := range output {
tuples := splitLineByTab(line)
if len(tuples) < 1 {
continue
}
if !isUUID(tuples[0]) {
continue
}
id := tuples[0]
status, err := r.runCommand("status", id)
if err != nil {
glog.Errorf("rkt: Cannot get status for pod (uuid=%q): %v", id, err)
continue
}
info, err := parsePodInfo(status)
if err != nil {
glog.Errorf("rkt: Cannot parse status for pod (uuid=%q): %v", id, err)
continue
}
result[id] = info
}
return result, nil
}
// getImageByName tries to find the image info with the given image name.
// TODO(yifan): Replace with 'rkt image cat-manifest'.
// imageName should be in the form of 'example.com/app:latest', which should matches