rkt: Remove pod_info.go, clean up codes that not needed anymore.

This commit is contained in:
Yifan Gu 2015-12-21 11:25:38 -08:00
parent ee7251ed10
commit 644aa9536a
3 changed files with 67 additions and 364 deletions

View File

@ -1270,7 +1270,7 @@ func (kl *Kubelet) GenerateRunContainerOptions(pod *api.Pod, container *api.Cont
vol, ok := kl.volumeManager.GetVolumes(pod.UID)
if !ok {
return nil, fmt.Errorf("impossible: cannot find the mounted volumes for pod %q", kubecontainer.GetPodFullName(pod))
return nil, fmt.Errorf("impossible: cannot find the mounted volumes for pod %q", format.Pod(pod))
}
opts.PortMappings = makePortMappings(container)
@ -1458,7 +1458,7 @@ func (kl *Kubelet) getClusterDNS(pod *api.Pod) ([]string, []string, error) {
// clusterDNS is not known.
// pod with ClusterDNSFirst Policy cannot be created
kl.recorder.Eventf(pod, api.EventTypeWarning, "MissingClusterDNS", "kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy)
log := fmt.Sprintf("kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. pod:%q. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy, kubecontainer.GetPodFullName(pod))
log := fmt.Sprintf("kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. pod: %q. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy, format.Pod(pod))
kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, "MissingClusterDNS", log)
// fallback to DNSDefault
@ -2152,7 +2152,7 @@ func hasHostPortConflicts(pods []*api.Pod) bool {
ports := sets.String{}
for _, pod := range pods {
if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports, field.NewPath("spec", "containers")); len(errs) > 0 {
glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", kubecontainer.GetPodFullName(pod), errs)
glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", format.Pod(pod), errs)
return true
}
}

View File

@ -1,201 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rkt
import (
"fmt"
"reflect"
"strconv"
"strings"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
// rkt pod state.
// TODO(yifan): Use exported definition in rkt.
const (
Embryo = "embryo"
Preparing = "preparing"
AbortedPrepare = "aborted prepare"
Prepared = "prepared"
Running = "running"
Deleting = "deleting" // This covers pod.isExitedDeleting and pod.isDeleting.
Exited = "exited" // This covers pod.isExited and pod.isExitedGarbage.
Garbage = "garbage"
// The prefix before the app name for each app's exit code in the output of 'rkt status'.
exitCodePrefix = "app-"
)
// rktInfo represents the information of the rkt pod that stored in the
// systemd service file.
type rktInfo struct {
uuid string
restartCount int
}
func emptyRktInfo() *rktInfo {
return &rktInfo{restartCount: -1}
}
func (r *rktInfo) isEmpty() bool {
return reflect.DeepEqual(r, emptyRktInfo())
}
// podInfo is the internal type that represents the state of
// the rkt pod.
type podInfo struct {
// The state of the pod, e.g. Embryo, Preparing.
state string
// The ip of the pod. IPv4 for now.
ip string
// The pid of the init process in the pod.
pid int
// A map of [app name]:[exit code].
exitCodes map[string]int
// TODO(yifan): Expose [app name]:[image id].
}
// parsePodInfo parses the result of 'rkt status' into podInfo.
//
// Example output of 'rkt status':
//
// state=exited
// pid=-1
// exited=true
// app-etcd=0 # The exit code of the app "etcd" in the pod.
// app-redis=0 # The exit code of the app "redis" in the pod.
//
func parsePodInfo(status []string) (*podInfo, error) {
p := &podInfo{
pid: -1,
exitCodes: make(map[string]int),
}
for _, line := range status {
tuples := strings.SplitN(line, "=", 2)
if len(tuples) != 2 {
return nil, fmt.Errorf("invalid status line: %q", line)
}
switch tuples[0] {
case "state":
// TODO(yifan): Parse the status here. This requires more details in
// the rkt status, (e.g. started time, image name, etc).
p.state = tuples[1]
case "networks":
p.ip = getIPFromNetworkInfo(tuples[1])
case "pid":
pid, err := strconv.Atoi(tuples[1])
if err != nil {
return nil, fmt.Errorf("cannot parse pid from %s: %v", tuples[1], err)
}
p.pid = pid
}
if strings.HasPrefix(tuples[0], exitCodePrefix) {
exitcode, err := strconv.Atoi(tuples[1])
if err != nil {
return nil, fmt.Errorf("cannot parse exit code from %s : %v", tuples[1], err)
}
appName := strings.TrimPrefix(tuples[0], exitCodePrefix)
p.exitCodes[appName] = exitcode
}
}
return p, nil
}
// getIPFromNetworkInfo returns the IP of a pod by parsing the network info.
// The network info looks like this:
//
// default:ip4=172.16.28.3
// database:ip4=172.16.28.42
//
func getIPFromNetworkInfo(networkInfo string) string {
parts := strings.Split(networkInfo, ",")
for _, part := range parts {
tuples := strings.Split(part, "=")
if len(tuples) == 2 {
return tuples[1]
}
}
return ""
}
// makeContainerStatus creates the api.containerStatus of a container from the podInfo.
func makeContainerStatus(container *kubecontainer.Container, podInfo *podInfo) api.ContainerStatus {
var status api.ContainerStatus
status.Name = container.Name
status.Image = container.Image
status.ContainerID = container.ID.String()
// TODO(yifan): Add image ID info.
switch podInfo.state {
case Running:
// TODO(yifan): Get StartedAt.
status.State = api.ContainerState{
Running: &api.ContainerStateRunning{
StartedAt: unversioned.Unix(container.Created, 0),
},
}
case Embryo, Preparing, Prepared:
status.State = api.ContainerState{Waiting: &api.ContainerStateWaiting{}}
case AbortedPrepare, Deleting, Exited, Garbage:
exitCode, ok := podInfo.exitCodes[status.Name]
if !ok {
glog.Warningf("rkt: Cannot get exit code for container %v", container)
exitCode = -1
}
status.State = api.ContainerState{
Terminated: &api.ContainerStateTerminated{
ExitCode: exitCode,
StartedAt: unversioned.Unix(container.Created, 0),
},
}
default:
glog.Warningf("rkt: Unknown pod state: %q", podInfo.state)
}
return status
}
// makePodStatus constructs the pod status from the pod info and rkt info.
func makePodStatus(pod *kubecontainer.Pod, podInfo *podInfo, rktInfo *rktInfo) api.PodStatus {
var status api.PodStatus
status.PodIP = podInfo.ip
// For now just make every container's state the same as the pod.
for _, container := range pod.Containers {
containerStatus := makeContainerStatus(container, podInfo)
containerStatus.RestartCount = rktInfo.restartCount
status.ContainerStatuses = append(status.ContainerStatuses, containerStatus)
}
return status
}
// splitLineByTab breaks a line by tabs, and trims the leading and tailing spaces.
func splitLineByTab(line string) []string {
var result []string
tuples := strings.Split(strings.TrimSpace(line), "\t")
for _, t := range tuples {
if t != "" {
result = append(result, t)
}
}
return result
}

View File

@ -403,26 +403,42 @@ func setApp(app *appctypes.App, c *api.Container, opts *kubecontainer.RunContain
return setIsolators(app, c)
}
type sortByImportTime []*rktapi.Image
func (s sortByImportTime) Len() int { return len(s) }
func (s sortByImportTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s sortByImportTime) Less(i, j int) bool { return s[i].ImportTimestamp < s[j].ImportTimestamp }
// listImages lists the images that have the given name. If detail is true,
// then image manifest is also included in the result.
// Note that there could be more than one images that have the given name, we
// will return the result reversely sorted by the import time, so that the latest
// image comes first.
func (r *Runtime) listImages(image string, detail bool) ([]*rktapi.Image, error) {
repoToPull, tag := parsers.ParseImageName(image)
listResp, err := r.apisvc.ListImages(context.Background(), &rktapi.ListImagesRequest{
Detail: detail,
Filters: []*rktapi.ImageFilter{
{
BaseNames: []string{repoToPull},
Labels: []*rktapi.KeyValue{{Key: "version", Value: tag}},
// TODO(yifan): Add a field in the ImageFilter to match the whole name,
// not just keywords.
// https://github.com/coreos/rkt/issues/1872#issuecomment-166456938
Keywords: []string{repoToPull},
Labels: []*rktapi.KeyValue{{Key: "version", Value: tag}},
},
},
})
if err != nil {
return nil, fmt.Errorf("couldn't list images: %v", err)
}
// TODO(yifan): Let the API service to sort the result:
// See https://github.com/coreos/rkt/issues/1911.
sort.Sort(sort.Reverse(sortByImportTime(listResp.Images)))
return listResp.Images, nil
}
// getImageManifest retrives the image manifest for the given image.
// getImageManifest retrieves the image manifest for the given image.
func (r *Runtime) getImageManifest(image string) (*appcschema.ImageManifest, error) {
var manifest appcschema.ImageManifest
@ -573,6 +589,26 @@ func (r *Runtime) newAppcRuntimeApp(pod *api.Pod, c api.Container, pullSecrets [
}, opts.PortMappings, nil
}
func runningKubernetesPodFilters(uid types.UID) []*rktapi.PodFilter {
return []*rktapi.PodFilter{
{
States: []rktapi.PodState{
rktapi.PodState_POD_STATE_RUNNING,
},
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
{
Key: k8sRktUIDAnno,
Value: string(uid),
},
},
},
}
}
func kubernetesPodFilters(uid types.UID) []*rktapi.PodFilter {
return []*rktapi.PodFilter{
{
@ -595,8 +631,6 @@ func newUnitOption(section, name, value string) *unit.UnitOption {
}
// apiPodToruntimePod converts an api.Pod to kubelet/container.Pod.
// we save the this for later reconstruction of the kubelet/container.Pod
// such as in GetPods().
func apiPodToruntimePod(uuid string, pod *api.Pod) *kubecontainer.Pod {
p := &kubecontainer.Pod{
ID: pod.UID,
@ -671,12 +705,6 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k
glog.V(4).Infof("'rkt prepare' returns %q", uuid)
// Create systemd service file for the rkt pod.
runtimePod := apiPodToruntimePod(uuid, pod)
b, err := json.Marshal(runtimePod)
if err != nil {
return "", nil, err
}
var runPrepared string
if pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork {
runPrepared = fmt.Sprintf("%s run-prepared --mds-register=false --net=host %s", r.rktBinAbsPath, uuid)
@ -688,8 +716,6 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k
// TODO handle pod.Spec.HostIPC
units := []*unit.UnitOption{
newUnitOption(unitKubernetesSection, unitRktID, uuid),
newUnitOption(unitKubernetesSection, unitPodName, string(b)),
// This makes the service show up for 'systemctl list-units' even if it exits successfully.
newUnitOption("Service", "RemainAfterExit", "true"),
newUnitOption("Service", "ExecStart", runPrepared),
@ -698,40 +724,29 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k
}
// Check if there's old rkt pod corresponding to the same pod, if so, update the restart count.
var restartCount int
var needReload bool
serviceName := makePodServiceFileName(pod.UID)
if _, err := os.Stat(serviceFilePath(serviceName)); err == nil {
// Service file already exists, that means the pod is being restarted.
needReload = true
_, info, err := r.readServiceFile(serviceName)
if err != nil {
glog.Warningf("rkt: Cannot get old pod's info from service file %q: (%v), will ignore it", serviceName, err)
restartCount = 0
} else {
restartCount = info.restartCount + 1
}
}
units = append(units, newUnitOption(unitKubernetesSection, unitRestartCount, strconv.Itoa(restartCount)))
glog.V(4).Infof("rkt: Creating service file %q for pod %q", serviceName, format.Pod(pod))
serviceFile, err := os.Create(serviceFilePath(serviceName))
if err != nil {
return "", nil, err
}
defer serviceFile.Close()
_, err = io.Copy(serviceFile, unit.Serialize(units))
if err != nil {
if _, err := io.Copy(serviceFile, unit.Serialize(units)); err != nil {
return "", nil, err
}
serviceFile.Close()
if needReload {
if err := r.systemd.Reload(); err != nil {
return "", nil, err
}
}
return serviceName, runtimePod, nil
return serviceName, apiPodToruntimePod(uuid, pod), nil
}
// generateEvents is a helper function that generates some container
@ -881,50 +896,6 @@ func (r *Runtime) convertRktPod(rktpod *rktapi.Pod) (*kubecontainer.Pod, error)
return kubepod, nil
}
// readServiceFile reads the service file and constructs the runtime pod and the rkt info.
func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktInfo, error) {
f, err := os.Open(serviceFilePath(serviceName))
if err != nil {
return nil, nil, err
}
defer f.Close()
var pod kubecontainer.Pod
opts, err := unit.Deserialize(f)
if err != nil {
return nil, nil, err
}
info := emptyRktInfo()
for _, opt := range opts {
if opt.Section != unitKubernetesSection {
continue
}
switch opt.Name {
case unitPodName:
err = json.Unmarshal([]byte(opt.Value), &pod)
if err != nil {
return nil, nil, err
}
case unitRktID:
info.uuid = opt.Value
case unitRestartCount:
cnt, err := strconv.Atoi(opt.Value)
if err != nil {
return nil, nil, err
}
info.restartCount = cnt
default:
return nil, nil, fmt.Errorf("rkt: unexpected key: %q", opt.Name)
}
}
if info.isEmpty() {
return nil, nil, fmt.Errorf("rkt: cannot find rkt info of pod %v, unit file is broken", pod)
}
return &pod, info, nil
}
// GetPods runs 'systemctl list-unit' and 'rkt list' to get the list of rkt pods.
// Then it will use the result to construct a list of container runtime pods.
// If all is false, then only running pods will be returned, otherwise all pods will be
@ -1001,36 +972,14 @@ func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
return nil
}
// getAPIPodStatus reads the service file and invokes 'rkt status $UUID' to get the
// pod's status.
func (r *Runtime) getAPIPodStatus(serviceName string) (*api.PodStatus, error) {
var status api.PodStatus
// TODO(yifan): Get rkt uuid from the service file name.
pod, rktInfo, err := r.readServiceFile(serviceName)
if err != nil && !os.IsNotExist(err) {
return nil, err
}
if os.IsNotExist(err) {
// Pod does not exit, means it's not been created yet,
// return empty status for now.
// TODO(yifan): Maybe inspect the image and return waiting status.
return &status, nil
}
podInfo, err := r.getPodInfo(rktInfo.uuid)
// GetAPIPodStatus returns the status of the given pod.
func (r *Runtime) GetAPIPodStatus(pod *api.Pod) (*api.PodStatus, error) {
// Get the pod status.
podStatus, err := r.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
if err != nil {
return nil, err
}
status = makePodStatus(pod, podInfo, rktInfo)
return &status, nil
}
// GetAPIPodStatus returns the status of the given pod.
func (r *Runtime) GetAPIPodStatus(pod *api.Pod) (*api.PodStatus, error) {
serviceName := makePodServiceFileName(pod.UID)
return r.getAPIPodStatus(serviceName)
return r.ConvertPodStatusToAPIPodStatus(pod, podStatus)
}
func (r *Runtime) Type() string {
@ -1316,32 +1265,6 @@ func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []s
return command.Run()
}
// findRktID returns the rkt uuid for the pod.
func (r *Runtime) findRktID(pod *kubecontainer.Pod) (string, error) {
serviceName := makePodServiceFileName(pod.ID)
f, err := os.Open(serviceFilePath(serviceName))
if err != nil {
if os.IsNotExist(err) {
return "", fmt.Errorf("no service file %v for runtime pod %q, ID %q", serviceName, pod.Name, pod.ID)
}
return "", err
}
defer f.Close()
opts, err := unit.Deserialize(f)
if err != nil {
return "", err
}
for _, opt := range opts {
if opt.Section == unitKubernetesSection && opt.Name == unitRktID {
return opt.Value, nil
}
}
return "", fmt.Errorf("rkt uuid not found for pod %v", pod)
}
// PortForward executes socat in the pod's network namespace and copies
// data between stream (representing the user's local connection on their
// computer) and the specified port in the container.
@ -1356,14 +1279,20 @@ func (r *Runtime) findRktID(pod *kubecontainer.Pod) (string, error) {
func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
glog.V(4).Infof("Rkt port forwarding in container.")
rktID, err := r.findRktID(pod)
listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{
Detail: true,
Filters: runningKubernetesPodFilters(pod.ID),
})
if err != nil {
return err
return fmt.Errorf("couldn't list pods: %v", err)
}
info, err := r.getPodInfo(rktID)
if err != nil {
return err
if len(listResp.Pods) != 1 {
var podlist []string
for _, p := range listResp.Pods {
podlist = append(podlist, p.Id)
}
return fmt.Errorf("more than one running rkt pod for the kubernetes pod [%s]", strings.Join(podlist, ", "))
}
socatPath, lookupErr := exec.LookPath("socat")
@ -1371,7 +1300,7 @@ func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.Rea
return fmt.Errorf("unable to do port forwarding: socat not found.")
}
args := []string{"-t", fmt.Sprintf("%d", info.pid), "-n", socatPath, "-", fmt.Sprintf("TCP4:localhost:%d", port)}
args := []string{"-t", fmt.Sprintf("%d", listResp.Pods[0].Pid), "-n", socatPath, "-", fmt.Sprintf("TCP4:localhost:%d", port)}
nsenterPath, lookupErr := exec.LookPath("nsenter")
if lookupErr != nil {
@ -1402,29 +1331,6 @@ func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.Rea
return command.Run()
}
// isUUID returns true if the input is a valid rkt UUID,
// e.g. "2372bc17-47cb-43fb-8d78-20b31729feda".
func isUUID(input string) bool {
if _, err := appctypes.NewUUID(input); err != nil {
return false
}
return true
}
// getPodInfo returns the pod info of a single pod according
// to the uuid.
func (r *Runtime) getPodInfo(uuid string) (*podInfo, error) {
status, err := r.runCommand("status", uuid)
if err != nil {
return nil, err
}
info, err := parsePodInfo(status)
if err != nil {
return nil, err
}
return info, nil
}
// buildImageName constructs the image name for kubecontainer.Image.
func buildImageName(img *rktapi.Image) string {
return fmt.Sprintf("%s:%s", img.Name, img.Version)
@ -1486,9 +1392,8 @@ func appStateToContainerState(state rktapi.AppState) kubecontainer.ContainerStat
return kubecontainer.ContainerStateUnknown
}
// retrievePodInfo returns the pod manifest, creation time and restart count of the pod.
// TODO(yifan): Rename to getPodInfo when the old getPodInfo is removed.
func retrievePodInfo(pod *rktapi.Pod) (podManifest *appcschema.PodManifest, creationTime time.Time, restartCount int, err error) {
// getPodInfo returns the pod manifest, creation time and restart count of the pod.
func getPodInfo(pod *rktapi.Pod) (podManifest *appcschema.PodManifest, creationTime time.Time, restartCount int, err error) {
// TODO(yifan): The manifest is only used for getting the annotations.
// Consider to let the server to unmarshal the annotations.
var manifest appcschema.PodManifest
@ -1568,7 +1473,7 @@ func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecont
// In this loop, we group all containers from all pods together,
// also we try to find the latest pod, so we can fill other info of the pod below.
for _, pod := range listResp.Pods {
manifest, creationTime, restartCount, err := retrievePodInfo(pod)
manifest, creationTime, restartCount, err := getPodInfo(pod)
if err != nil {
glog.Warning("rkt: Couldn't get necessary info from the rkt pod, (uuid %q): %v", pod.Id, err)
continue
@ -1692,7 +1597,6 @@ func (r *Runtime) GetPodStatusAndAPIPodStatus(pod *api.Pod) (*kubecontainer.PodS
if err != nil {
return nil, nil, err
}
var apiPodStatus *api.PodStatus
apiPodStatus, err = r.ConvertPodStatusToAPIPodStatus(pod, podStatus)
apiPodStatus, err := r.ConvertPodStatusToAPIPodStatus(pod, podStatus)
return podStatus, apiPodStatus, err
}