diff --git a/pkg/kubelet/rkt/pod_info.go b/pkg/kubelet/rkt/pod_info.go index e30a13e327f..d76614813d4 100644 --- a/pkg/kubelet/rkt/pod_info.go +++ b/pkg/kubelet/rkt/pod_info.go @@ -18,6 +18,7 @@ package rkt import ( "fmt" + "reflect" "strconv" "strings" @@ -43,6 +44,21 @@ const ( exitCodePrefix = "app-" ) +// rktInfo represents the information of the rkt pod that stored in the +// systemd service file. +type rktInfo struct { + uuid string + restartCount int +} + +func emptyRktInfo() *rktInfo { + return &rktInfo{restartCount: -1} +} + +func (r *rktInfo) isEmpty() bool { + return reflect.DeepEqual(r, emptyRktInfo()) +} + // podInfo is the internal type that represents the state of // the rkt pod. type podInfo struct { @@ -122,15 +138,15 @@ func getIPFromNetworkInfo(networkInfo string) string { return "" } -// getContainerStatus creates the api.containerStatus of a container from the podInfo. -func (p *podInfo) getContainerStatus(container *kubecontainer.Container) api.ContainerStatus { +// makeContainerStatus creates the api.containerStatus of a container from the podInfo. +func makeContainerStatus(container *kubecontainer.Container, podInfo *podInfo) api.ContainerStatus { var status api.ContainerStatus status.Name = container.Name status.Image = container.Image status.ContainerID = string(container.ID) // TODO(yifan): Add image ID info. - switch p.state { + switch podInfo.state { case Running: // TODO(yifan): Get StartedAt. status.State = api.ContainerState{ @@ -141,7 +157,7 @@ func (p *podInfo) getContainerStatus(container *kubecontainer.Container) api.Con case Embryo, Preparing, Prepared: status.State = api.ContainerState{Waiting: &api.ContainerStateWaiting{}} case AbortedPrepare, Deleting, Exited, Garbage: - exitCode, ok := p.exitCodes[status.Name] + exitCode, ok := podInfo.exitCodes[status.Name] if !ok { glog.Warningf("rkt: Cannot get exit code for container %v", container) exitCode = -1 @@ -154,18 +170,20 @@ func (p *podInfo) getContainerStatus(container *kubecontainer.Container) api.Con }, } default: - glog.Warningf("rkt: Unknown pod state: %q", p.state) + glog.Warningf("rkt: Unknown pod state: %q", podInfo.state) } return status } -// toPodStatus converts a podInfo type into an api.PodStatus type. -func (p *podInfo) toPodStatus(pod *kubecontainer.Pod) api.PodStatus { +// makePodStatus constructs the pod status from the pod info and rkt info. +func makePodStatus(pod *kubecontainer.Pod, podInfo *podInfo, rktInfo *rktInfo) api.PodStatus { var status api.PodStatus - status.PodIP = p.ip + status.PodIP = podInfo.ip // For now just make every container's state the same as the pod. for _, container := range pod.Containers { - status.ContainerStatuses = append(status.ContainerStatuses, p.getContainerStatus(container)) + containerStatus := makeContainerStatus(container, podInfo) + containerStatus.RestartCount = rktInfo.restartCount + status.ContainerStatuses = append(status.ContainerStatuses, containerStatus) } return status } diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index d32d2a3cc0b..87255b66a47 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -24,7 +24,6 @@ import ( "os" "os/exec" "path" - "sort" "strconv" "strings" "syscall" @@ -46,7 +45,6 @@ import ( "k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" - "k8s.io/kubernetes/pkg/util/errors" ) const ( @@ -62,6 +60,7 @@ const ( unitKubernetesSection = "X-Kubernetes" unitPodName = "POD" unitRktID = "RktID" + unitRestartCount = "RestartCount" dockerPrefix = "docker://" @@ -301,13 +300,26 @@ func setIsolators(app *appctypes.App, c *api.Container) error { return nil } +// findEnvInList returns the index of environment variable in the environment whose Name equals env.Name. +func findEnvInList(envs appctypes.Environment, env kubecontainer.EnvVar) int { + for i, e := range envs { + if e.Name == env.Name { + return i + } + } + return -1 +} + // setApp overrides the app's fields if any of them are specified in the // container's spec. func setApp(app *appctypes.App, c *api.Container, opts *kubecontainer.RunContainerOptions) error { // Override the exec. - // TOOD(yifan): Revisit this for the overriding rule. - if len(c.Command) > 0 || len(c.Args) > 0 { - app.Exec = append(c.Command, c.Args...) + + if len(c.Command) > 0 { + app.Exec = c.Command + } + if len(c.Args) > 0 { + app.Exec = append(app.Exec, c.Args...) } // TODO(yifan): Use non-root user in the future, see: @@ -319,11 +331,12 @@ func setApp(app *appctypes.App, c *api.Container, opts *kubecontainer.RunContain app.WorkingDirectory = c.WorkingDir } - // Override the environment. - if len(opts.Envs) > 0 { - app.Environment = []appctypes.EnvironmentVariable{} - } - for _, env := range c.Env { + // Merge the environment. Override the image with the ones defined in the spec if necessary. + for _, env := range opts.Envs { + if ix := findEnvInList(app.Environment, env); ix >= 0 { + app.Environment[ix].Value = env.Value + continue + } app.Environment = append(app.Environment, appctypes.EnvironmentVariable{ Name: env.Name, Value: env.Value, @@ -413,7 +426,7 @@ func (r *runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc } if imgManifest.App == nil { - return nil, fmt.Errorf("no app section in image manifest for image: %q", c.Image) + imgManifest.App = new(appctypes.App) } img, err := r.getImageByName(c.Image) @@ -509,25 +522,26 @@ func apiPodToruntimePod(uuid string, pod *api.Pod) *kubecontainer.Pod { return p } +// serviceFilePath returns the absolute path of the service file. +func serviceFilePath(serviceName string) string { + return path.Join(systemdServiceDir, serviceName) +} + // preparePod will: // // 1. Invoke 'rkt prepare' to prepare the pod, and get the rkt pod uuid. // 2. Creates the unit file and save it under systemdUnitDir. // -// On success, it will return a string that represents name of the unit file -// and a boolean that indicates if the unit file needs to be reloaded (whether -// the file is already existed). -func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, bool, error) { - cmds := []string{"prepare", "--quiet", "--pod-manifest"} - +// On success, it will return a string that represents name of the unit file. +func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, error) { // Generate the pod manifest from the pod spec. manifest, err := r.makePodManifest(pod, pullSecrets) if err != nil { - return "", false, err + return "", err } manifestFile, err := ioutil.TempFile("", fmt.Sprintf("manifest-%s-", pod.Name)) if err != nil { - return "", false, err + return "", err } defer func() { manifestFile.Close() @@ -538,29 +552,31 @@ func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, bo data, err := json.Marshal(manifest) if err != nil { - return "", false, err + return "", err } // Since File.Write returns error if the written length is less than len(data), // so check error is enough for us. if _, err := manifestFile.Write(data); err != nil { - return "", false, err + return "", err } - cmds = append(cmds, manifestFile.Name()) + // Run 'rkt prepare' to get the rkt UUID. + cmds := []string{"prepare", "--quiet", "--pod-manifest", manifestFile.Name()} output, err := r.runCommand(cmds...) if err != nil { - return "", false, err + return "", err } if len(output) != 1 { - return "", false, fmt.Errorf("cannot get uuid from 'rkt prepare'") + return "", fmt.Errorf("invalid output from 'rkt prepare': %v", output) } uuid := output[0] glog.V(4).Infof("'rkt prepare' returns %q", uuid) + // Create systemd service file for the rkt pod. p := apiPodToruntimePod(uuid, pod) b, err := json.Marshal(p) if err != nil { - return "", false, err + return "", err } var runPrepared string @@ -578,69 +594,79 @@ func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, bo newUnitOption("Service", "ExecStart", runPrepared), } - // Save the unit file under systemd's service directory. - // TODO(yifan) Garbage collect 'dead' service files. - needReload := false - unitName := makePodServiceFileName(pod.UID) - if _, err := os.Stat(path.Join(systemdServiceDir, unitName)); err == nil { + // Check if there's old rkt pod corresponding to the same pod, if so, update the restart count. + var restartCount int + var needReload bool + serviceName := makePodServiceFileName(pod.UID) + if _, err := os.Stat(serviceFilePath(serviceName)); err == nil { + // Service file already exists, that means the pod is being restarted. needReload = true + _, info, err := r.readServiceFile(serviceName) + if err != nil { + glog.Warningf("rkt: Cannot get old pod's info from service file %q: (%v), will ignore it", serviceName, err) + restartCount = 0 + } else { + restartCount = info.restartCount + 1 + } } - unitFile, err := os.Create(path.Join(systemdServiceDir, unitName)) - if err != nil { - return "", false, err - } - defer unitFile.Close() + units = append(units, newUnitOption(unitKubernetesSection, unitRestartCount, strconv.Itoa(restartCount))) - _, err = io.Copy(unitFile, unit.Serialize(units)) + glog.V(4).Infof("rkt: Creating service file %q for pod %q", serviceName, pod.Name) + serviceFile, err := os.Create(serviceFilePath(serviceName)) if err != nil { - return "", false, err + return "", err } - return unitName, needReload, nil + defer serviceFile.Close() + + _, err = io.Copy(serviceFile, unit.Serialize(units)) + if err != nil { + return "", err + } + + if needReload { + if err := r.systemd.Reload(); err != nil { + return "", err + } + } + return serviceName, nil } -// RunPod first creates the unit file for a pod, and then calls -// StartUnit over d-bus. +// RunPod first creates the unit file for a pod, and then +// starts the unit over d-bus. func (r *runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { glog.V(4).Infof("Rkt starts to run pod: name %q.", pod.Name) - name, needReload, err := r.preparePod(pod, pullSecrets) + name, err := r.preparePod(pod, pullSecrets) if err != nil { return err } - if needReload { - // TODO(yifan): More graceful stop. Replace with StopUnit and wait for a timeout. - r.systemd.KillUnit(name, int32(syscall.SIGKILL)) - if err := r.systemd.Reload(); err != nil { - return err - } - } // TODO(yifan): This is the old version of go-systemd. Should update when libcontainer updates // its version of go-systemd. - _, err = r.systemd.StartUnit(name, "replace") + // RestartUnit has the same effect as StartUnit if the unit is not running, besides it can restart + // a unit if the unit file is changed and reloaded. + _, err = r.systemd.RestartUnit(name, "replace") if err != nil { return err } return nil } -// makeRuntimePod constructs the container runtime pod. It will: -// 1, Construct the pod by the information stored in the unit file. -// 2, Return the rkt uuid. -func (r *runtime) makeRuntimePod(unitName string) (*kubecontainer.Pod, string, error) { - f, err := os.Open(path.Join(systemdServiceDir, unitName)) +// readServiceFile reads the service file and constructs the runtime pod and the rkt info. +func (r *runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktInfo, error) { + f, err := os.Open(serviceFilePath(serviceName)) if err != nil { - return nil, "", err + return nil, nil, err } defer f.Close() var pod kubecontainer.Pod opts, err := unit.Deserialize(f) if err != nil { - return nil, "", err + return nil, nil, err } - var rktID string + info := emptyRktInfo() for _, opt := range opts { if opt.Section != unitKubernetesSection { continue @@ -649,19 +675,25 @@ func (r *runtime) makeRuntimePod(unitName string) (*kubecontainer.Pod, string, e case unitPodName: err = json.Unmarshal([]byte(opt.Value), &pod) if err != nil { - return nil, "", err + return nil, nil, err } case unitRktID: - rktID = opt.Value + info.uuid = opt.Value + case unitRestartCount: + cnt, err := strconv.Atoi(opt.Value) + if err != nil { + return nil, nil, err + } + info.restartCount = cnt default: - return nil, "", fmt.Errorf("rkt: Unexpected key: %q", opt.Name) + return nil, nil, fmt.Errorf("rkt: unexpected key: %q", opt.Name) } } - if len(rktID) == 0 { - return nil, "", fmt.Errorf("rkt: cannot find rkt ID of pod %v, unit file is broken", pod) + if info.isEmpty() { + return nil, nil, fmt.Errorf("rkt: cannot find rkt info of pod %v, unit file is broken", pod) } - return &pod, "", nil + return &pod, info, nil } // GetPods runs 'systemctl list-unit' and 'rkt list' to get the list of rkt pods. @@ -682,7 +714,7 @@ func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) { if !all && u.SubState != "running" { continue } - pod, _, err := r.makeRuntimePod(u.Name) + pod, _, err := r.readServiceFile(u.Name) if err != nil { glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err) continue @@ -696,87 +728,33 @@ func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) { // KillPod invokes 'systemctl kill' to kill the unit that runs the pod. func (r *runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error { glog.V(4).Infof("Rkt is killing pod: name %q.", runningPod.Name) - + serviceName := makePodServiceFileName(runningPod.ID) // TODO(yifan): More graceful stop. Replace with StopUnit and wait for a timeout. - r.systemd.KillUnit(makePodServiceFileName(runningPod.ID), int32(syscall.SIGKILL)) - return r.systemd.Reload() -} - -type byModTime []os.FileInfo - -func (b byModTime) Len() int { return len(b) } -func (b byModTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] } -func (b byModTime) Less(i, j int) bool { return b[i].ModTime().After(b[j].ModTime()) } - -// listUnitFiles reads the systemd directory and returns a list of rkt -// service file names, sorted by the modification date from newest to oldest. -// TODO(yifan): Listing all units under the directory is inefficent, consider to -// create the list during startup, and then record every unit creation to avoid -// reading the whole directory. -func listUnitFiles() ([]string, error) { - files, err := ioutil.ReadDir(systemdServiceDir) - if err != nil { - return nil, err - } - - sort.Sort(byModTime(files)) - - var rktFiles []string - for _, f := range files { - if strings.HasPrefix(f.Name(), kubernetesUnitPrefix) { - rktFiles = append(rktFiles, f.Name()) - } - } - return rktFiles, nil + r.systemd.KillUnit(serviceName, int32(syscall.SIGKILL)) + // Remove the systemd service file as well. + return os.Remove(serviceFilePath(serviceName)) } // getPodStatus reads the service file and invokes 'rkt status $UUID' to get the // pod's status. func (r *runtime) getPodStatus(serviceName string) (*api.PodStatus, error) { // TODO(yifan): Get rkt uuid from the service file name. - pod, uuid, err := r.makeRuntimePod(serviceName) + pod, rktInfo, err := r.readServiceFile(serviceName) if err != nil { return nil, err } - podInfo, err := r.getPodInfo(uuid) + podInfo, err := r.getPodInfo(rktInfo.uuid) if err != nil { return nil, err } - status := podInfo.toPodStatus(pod) + status := makePodStatus(pod, podInfo, rktInfo) return &status, nil } // GetPodStatus returns the status of the given pod. func (r *runtime) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) { - unitNames, err := listUnitFiles() - if err != nil { - glog.Errorf("rkt: Cannot list unit files: %v", err) - return nil, err - } - - var status *api.PodStatus - var errlist []error - for _, name := range unitNames { - if !strings.Contains(name, string(pod.UID)) { - continue - } - - if status != nil { - // This means the pod has been restarted. - for _, c := range status.ContainerStatuses { - c.RestartCount++ - } - continue - } - - status, err = r.getPodStatus(name) - if err != nil { - glog.Errorf("rkt: Cannot get pod status for pod %q, service file %q: %v", pod.Name, name, err) - errlist = append(errlist, err) - continue - } - } - return status, errors.NewAggregate(errlist) + serviceName := makePodServiceFileName(pod.UID) + return r.getPodStatus(serviceName) } // Version invokes 'rkt version' to get the version information of the rkt @@ -913,6 +891,10 @@ func (r *runtime) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) { // SyncPod syncs the running pod to match the specified desired pod. func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error { podFullName := kubecontainer.GetPodFullName(pod) + if len(runningPod.Containers) == 0 { + glog.V(4).Infof("Pod %q is not running, will start it", podFullName) + return r.RunPod(pod, pullSecrets) + } // Add references to all containers. unidentifiedContainers := make(map[types.UID]*kubecontainer.Container) @@ -1092,36 +1074,26 @@ func (r *runtime) ExecInContainer(containerID string, cmd []string, stdin io.Rea } // findRktID returns the rkt uuid for the pod. -// TODO(yifan): This is unefficient which require us to list -// all the unit files. func (r *runtime) findRktID(pod *kubecontainer.Pod) (string, error) { - units, err := r.systemd.ListUnits() + serviceName := makePodServiceFileName(pod.ID) + + f, err := os.Open(serviceFilePath(serviceName)) + if err != nil { + if os.IsNotExist(err) { + return "", fmt.Errorf("no service file %v for pod %q, UID %q", serviceName, pod.Name, pod.ID) + } + return "", err + } + defer f.Close() + + opts, err := unit.Deserialize(f) if err != nil { return "", err } - unitName := makePodServiceFileName(pod.ID) - for _, u := range units { - // u.Name contains file name ext such as .service, .socket, etc. - if u.Name != unitName { - continue - } - - f, err := os.Open(path.Join(systemdServiceDir, u.Name)) - if err != nil { - return "", err - } - defer f.Close() - - opts, err := unit.Deserialize(f) - if err != nil { - return "", err - } - - for _, opt := range opts { - if opt.Section == unitKubernetesSection && opt.Name == unitRktID { - return opt.Value, nil - } + for _, opt := range opts { + if opt.Section == unitKubernetesSection && opt.Name == unitRktID { + return opt.Value, nil } } return "", fmt.Errorf("rkt uuid not found for pod %v", pod) @@ -1140,22 +1112,14 @@ func (r *runtime) findRktID(pod *kubecontainer.Pod) (string, error) { func (r *runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { glog.V(4).Infof("Rkt port forwarding in container.") - podInfos, err := r.getPodInfos() - if err != nil { - return err - } - rktID, err := r.findRktID(pod) if err != nil { return err } - info, ok := podInfos[rktID] - if !ok { - return fmt.Errorf("cannot find the pod info for pod %v", pod) - } - if info.pid < 0 { - return fmt.Errorf("cannot get the pid for pod %v", pod) + info, err := r.getPodInfo(rktID) + if err != nil { + return err } _, lookupErr := exec.LookPath("socat") @@ -1197,52 +1161,6 @@ func (r *runtime) getPodInfo(uuid string) (*podInfo, error) { return info, nil } -// getPodInfos returns a map of [pod-uuid]:*podInfo -func (r *runtime) getPodInfos() (map[string]*podInfo, error) { - result := make(map[string]*podInfo) - - output, err := r.runCommand("list", "--no-legend", "--full") - if err != nil { - return result, err - } - if len(output) == 0 { - // No pods are running. - return result, nil - } - - // Example output of 'rkt list --full' (version == 0.7.0): - // - // UUID APP ACI STATE NETWORKS - // 2372bc17-47cb-43fb-8d78-20b31729feda foo coreos.com/etcd running default:ip4=172.16.28.3 - // bar nginx running - // 40e2813b-9d5d-4146-a817-0de92646da96 foo redis exited - // 40e2813b-9d5d-4146-a817-0de92646da96 bar busybox exited - // - // With '--no-legend', the first line is eliminated. - for _, line := range output { - tuples := splitLineByTab(line) - if len(tuples) < 1 { - continue - } - if !isUUID(tuples[0]) { - continue - } - id := tuples[0] - status, err := r.runCommand("status", id) - if err != nil { - glog.Errorf("rkt: Cannot get status for pod (uuid=%q): %v", id, err) - continue - } - info, err := parsePodInfo(status) - if err != nil { - glog.Errorf("rkt: Cannot parse status for pod (uuid=%q): %v", id, err) - continue - } - result[id] = info - } - return result, nil -} - // getImageByName tries to find the image info with the given image name. // TODO(yifan): Replace with 'rkt image cat-manifest'. // imageName should be in the form of 'example.com/app:latest', which should matches