From 644aa9536ae8ca0652a655a8bf6b2c3779dcc8b7 Mon Sep 17 00:00:00 2001 From: Yifan Gu Date: Mon, 21 Dec 2015 11:25:38 -0800 Subject: [PATCH] rkt: Remove pod_info.go, clean up codes that not needed anymore. --- pkg/kubelet/kubelet.go | 6 +- pkg/kubelet/rkt/pod_info.go | 201 -------------------------------- pkg/kubelet/rkt/rkt.go | 224 +++++++++++------------------------- 3 files changed, 67 insertions(+), 364 deletions(-) delete mode 100644 pkg/kubelet/rkt/pod_info.go diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 5db68fb323e..ae920762666 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1270,7 +1270,7 @@ func (kl *Kubelet) GenerateRunContainerOptions(pod *api.Pod, container *api.Cont vol, ok := kl.volumeManager.GetVolumes(pod.UID) if !ok { - return nil, fmt.Errorf("impossible: cannot find the mounted volumes for pod %q", kubecontainer.GetPodFullName(pod)) + return nil, fmt.Errorf("impossible: cannot find the mounted volumes for pod %q", format.Pod(pod)) } opts.PortMappings = makePortMappings(container) @@ -1458,7 +1458,7 @@ func (kl *Kubelet) getClusterDNS(pod *api.Pod) ([]string, []string, error) { // clusterDNS is not known. // pod with ClusterDNSFirst Policy cannot be created kl.recorder.Eventf(pod, api.EventTypeWarning, "MissingClusterDNS", "kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy) - log := fmt.Sprintf("kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. pod:%q. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy, kubecontainer.GetPodFullName(pod)) + log := fmt.Sprintf("kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. pod: %q. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy, format.Pod(pod)) kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, "MissingClusterDNS", log) // fallback to DNSDefault @@ -2152,7 +2152,7 @@ func hasHostPortConflicts(pods []*api.Pod) bool { ports := sets.String{} for _, pod := range pods { if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports, field.NewPath("spec", "containers")); len(errs) > 0 { - glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", kubecontainer.GetPodFullName(pod), errs) + glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", format.Pod(pod), errs) return true } } diff --git a/pkg/kubelet/rkt/pod_info.go b/pkg/kubelet/rkt/pod_info.go deleted file mode 100644 index 7c52ce7730d..00000000000 --- a/pkg/kubelet/rkt/pod_info.go +++ /dev/null @@ -1,201 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package rkt - -import ( - "fmt" - "reflect" - "strconv" - "strings" - - "github.com/golang/glog" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/unversioned" - kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" -) - -// rkt pod state. -// TODO(yifan): Use exported definition in rkt. -const ( - Embryo = "embryo" - Preparing = "preparing" - AbortedPrepare = "aborted prepare" - Prepared = "prepared" - Running = "running" - Deleting = "deleting" // This covers pod.isExitedDeleting and pod.isDeleting. - Exited = "exited" // This covers pod.isExited and pod.isExitedGarbage. - Garbage = "garbage" - - // The prefix before the app name for each app's exit code in the output of 'rkt status'. - exitCodePrefix = "app-" -) - -// rktInfo represents the information of the rkt pod that stored in the -// systemd service file. -type rktInfo struct { - uuid string - restartCount int -} - -func emptyRktInfo() *rktInfo { - return &rktInfo{restartCount: -1} -} - -func (r *rktInfo) isEmpty() bool { - return reflect.DeepEqual(r, emptyRktInfo()) -} - -// podInfo is the internal type that represents the state of -// the rkt pod. -type podInfo struct { - // The state of the pod, e.g. Embryo, Preparing. - state string - // The ip of the pod. IPv4 for now. - ip string - // The pid of the init process in the pod. - pid int - // A map of [app name]:[exit code]. - exitCodes map[string]int - // TODO(yifan): Expose [app name]:[image id]. -} - -// parsePodInfo parses the result of 'rkt status' into podInfo. -// -// Example output of 'rkt status': -// -// state=exited -// pid=-1 -// exited=true -// app-etcd=0 # The exit code of the app "etcd" in the pod. -// app-redis=0 # The exit code of the app "redis" in the pod. -// -func parsePodInfo(status []string) (*podInfo, error) { - p := &podInfo{ - pid: -1, - exitCodes: make(map[string]int), - } - - for _, line := range status { - tuples := strings.SplitN(line, "=", 2) - if len(tuples) != 2 { - return nil, fmt.Errorf("invalid status line: %q", line) - } - switch tuples[0] { - case "state": - // TODO(yifan): Parse the status here. This requires more details in - // the rkt status, (e.g. started time, image name, etc). - p.state = tuples[1] - case "networks": - p.ip = getIPFromNetworkInfo(tuples[1]) - case "pid": - pid, err := strconv.Atoi(tuples[1]) - if err != nil { - return nil, fmt.Errorf("cannot parse pid from %s: %v", tuples[1], err) - } - p.pid = pid - } - - if strings.HasPrefix(tuples[0], exitCodePrefix) { - exitcode, err := strconv.Atoi(tuples[1]) - if err != nil { - return nil, fmt.Errorf("cannot parse exit code from %s : %v", tuples[1], err) - } - appName := strings.TrimPrefix(tuples[0], exitCodePrefix) - p.exitCodes[appName] = exitcode - } - } - return p, nil -} - -// getIPFromNetworkInfo returns the IP of a pod by parsing the network info. -// The network info looks like this: -// -// default:ip4=172.16.28.3 -// database:ip4=172.16.28.42 -// -func getIPFromNetworkInfo(networkInfo string) string { - parts := strings.Split(networkInfo, ",") - for _, part := range parts { - tuples := strings.Split(part, "=") - if len(tuples) == 2 { - return tuples[1] - } - } - return "" -} - -// makeContainerStatus creates the api.containerStatus of a container from the podInfo. -func makeContainerStatus(container *kubecontainer.Container, podInfo *podInfo) api.ContainerStatus { - var status api.ContainerStatus - status.Name = container.Name - status.Image = container.Image - status.ContainerID = container.ID.String() - // TODO(yifan): Add image ID info. - - switch podInfo.state { - case Running: - // TODO(yifan): Get StartedAt. - status.State = api.ContainerState{ - Running: &api.ContainerStateRunning{ - StartedAt: unversioned.Unix(container.Created, 0), - }, - } - case Embryo, Preparing, Prepared: - status.State = api.ContainerState{Waiting: &api.ContainerStateWaiting{}} - case AbortedPrepare, Deleting, Exited, Garbage: - exitCode, ok := podInfo.exitCodes[status.Name] - if !ok { - glog.Warningf("rkt: Cannot get exit code for container %v", container) - exitCode = -1 - - } - status.State = api.ContainerState{ - Terminated: &api.ContainerStateTerminated{ - ExitCode: exitCode, - StartedAt: unversioned.Unix(container.Created, 0), - }, - } - default: - glog.Warningf("rkt: Unknown pod state: %q", podInfo.state) - } - return status -} - -// makePodStatus constructs the pod status from the pod info and rkt info. -func makePodStatus(pod *kubecontainer.Pod, podInfo *podInfo, rktInfo *rktInfo) api.PodStatus { - var status api.PodStatus - status.PodIP = podInfo.ip - // For now just make every container's state the same as the pod. - for _, container := range pod.Containers { - containerStatus := makeContainerStatus(container, podInfo) - containerStatus.RestartCount = rktInfo.restartCount - status.ContainerStatuses = append(status.ContainerStatuses, containerStatus) - } - return status -} - -// splitLineByTab breaks a line by tabs, and trims the leading and tailing spaces. -func splitLineByTab(line string) []string { - var result []string - tuples := strings.Split(strings.TrimSpace(line), "\t") - for _, t := range tuples { - if t != "" { - result = append(result, t) - } - } - return result -} diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 8fa89356678..6b403d7c27c 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -403,26 +403,42 @@ func setApp(app *appctypes.App, c *api.Container, opts *kubecontainer.RunContain return setIsolators(app, c) } +type sortByImportTime []*rktapi.Image + +func (s sortByImportTime) Len() int { return len(s) } +func (s sortByImportTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s sortByImportTime) Less(i, j int) bool { return s[i].ImportTimestamp < s[j].ImportTimestamp } + // listImages lists the images that have the given name. If detail is true, // then image manifest is also included in the result. +// Note that there could be more than one images that have the given name, we +// will return the result reversely sorted by the import time, so that the latest +// image comes first. func (r *Runtime) listImages(image string, detail bool) ([]*rktapi.Image, error) { repoToPull, tag := parsers.ParseImageName(image) listResp, err := r.apisvc.ListImages(context.Background(), &rktapi.ListImagesRequest{ Detail: detail, Filters: []*rktapi.ImageFilter{ { - BaseNames: []string{repoToPull}, - Labels: []*rktapi.KeyValue{{Key: "version", Value: tag}}, + // TODO(yifan): Add a field in the ImageFilter to match the whole name, + // not just keywords. + // https://github.com/coreos/rkt/issues/1872#issuecomment-166456938 + Keywords: []string{repoToPull}, + Labels: []*rktapi.KeyValue{{Key: "version", Value: tag}}, }, }, }) if err != nil { return nil, fmt.Errorf("couldn't list images: %v", err) } + + // TODO(yifan): Let the API service to sort the result: + // See https://github.com/coreos/rkt/issues/1911. + sort.Sort(sort.Reverse(sortByImportTime(listResp.Images))) return listResp.Images, nil } -// getImageManifest retrives the image manifest for the given image. +// getImageManifest retrieves the image manifest for the given image. func (r *Runtime) getImageManifest(image string) (*appcschema.ImageManifest, error) { var manifest appcschema.ImageManifest @@ -573,6 +589,26 @@ func (r *Runtime) newAppcRuntimeApp(pod *api.Pod, c api.Container, pullSecrets [ }, opts.PortMappings, nil } +func runningKubernetesPodFilters(uid types.UID) []*rktapi.PodFilter { + return []*rktapi.PodFilter{ + { + States: []rktapi.PodState{ + rktapi.PodState_POD_STATE_RUNNING, + }, + Annotations: []*rktapi.KeyValue{ + { + Key: k8sRktKubeletAnno, + Value: k8sRktKubeletAnnoValue, + }, + { + Key: k8sRktUIDAnno, + Value: string(uid), + }, + }, + }, + } +} + func kubernetesPodFilters(uid types.UID) []*rktapi.PodFilter { return []*rktapi.PodFilter{ { @@ -595,8 +631,6 @@ func newUnitOption(section, name, value string) *unit.UnitOption { } // apiPodToruntimePod converts an api.Pod to kubelet/container.Pod. -// we save the this for later reconstruction of the kubelet/container.Pod -// such as in GetPods(). func apiPodToruntimePod(uuid string, pod *api.Pod) *kubecontainer.Pod { p := &kubecontainer.Pod{ ID: pod.UID, @@ -671,12 +705,6 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k glog.V(4).Infof("'rkt prepare' returns %q", uuid) // Create systemd service file for the rkt pod. - runtimePod := apiPodToruntimePod(uuid, pod) - b, err := json.Marshal(runtimePod) - if err != nil { - return "", nil, err - } - var runPrepared string if pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork { runPrepared = fmt.Sprintf("%s run-prepared --mds-register=false --net=host %s", r.rktBinAbsPath, uuid) @@ -688,8 +716,6 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k // TODO handle pod.Spec.HostIPC units := []*unit.UnitOption{ - newUnitOption(unitKubernetesSection, unitRktID, uuid), - newUnitOption(unitKubernetesSection, unitPodName, string(b)), // This makes the service show up for 'systemctl list-units' even if it exits successfully. newUnitOption("Service", "RemainAfterExit", "true"), newUnitOption("Service", "ExecStart", runPrepared), @@ -698,40 +724,29 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k } // Check if there's old rkt pod corresponding to the same pod, if so, update the restart count. - var restartCount int var needReload bool serviceName := makePodServiceFileName(pod.UID) if _, err := os.Stat(serviceFilePath(serviceName)); err == nil { // Service file already exists, that means the pod is being restarted. needReload = true - _, info, err := r.readServiceFile(serviceName) - if err != nil { - glog.Warningf("rkt: Cannot get old pod's info from service file %q: (%v), will ignore it", serviceName, err) - restartCount = 0 - } else { - restartCount = info.restartCount + 1 - } } - units = append(units, newUnitOption(unitKubernetesSection, unitRestartCount, strconv.Itoa(restartCount))) glog.V(4).Infof("rkt: Creating service file %q for pod %q", serviceName, format.Pod(pod)) serviceFile, err := os.Create(serviceFilePath(serviceName)) if err != nil { return "", nil, err } - defer serviceFile.Close() - - _, err = io.Copy(serviceFile, unit.Serialize(units)) - if err != nil { + if _, err := io.Copy(serviceFile, unit.Serialize(units)); err != nil { return "", nil, err } - + serviceFile.Close() if needReload { if err := r.systemd.Reload(); err != nil { return "", nil, err } } - return serviceName, runtimePod, nil + + return serviceName, apiPodToruntimePod(uuid, pod), nil } // generateEvents is a helper function that generates some container @@ -881,50 +896,6 @@ func (r *Runtime) convertRktPod(rktpod *rktapi.Pod) (*kubecontainer.Pod, error) return kubepod, nil } -// readServiceFile reads the service file and constructs the runtime pod and the rkt info. -func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktInfo, error) { - f, err := os.Open(serviceFilePath(serviceName)) - if err != nil { - return nil, nil, err - } - defer f.Close() - - var pod kubecontainer.Pod - opts, err := unit.Deserialize(f) - if err != nil { - return nil, nil, err - } - - info := emptyRktInfo() - for _, opt := range opts { - if opt.Section != unitKubernetesSection { - continue - } - switch opt.Name { - case unitPodName: - err = json.Unmarshal([]byte(opt.Value), &pod) - if err != nil { - return nil, nil, err - } - case unitRktID: - info.uuid = opt.Value - case unitRestartCount: - cnt, err := strconv.Atoi(opt.Value) - if err != nil { - return nil, nil, err - } - info.restartCount = cnt - default: - return nil, nil, fmt.Errorf("rkt: unexpected key: %q", opt.Name) - } - } - - if info.isEmpty() { - return nil, nil, fmt.Errorf("rkt: cannot find rkt info of pod %v, unit file is broken", pod) - } - return &pod, info, nil -} - // GetPods runs 'systemctl list-unit' and 'rkt list' to get the list of rkt pods. // Then it will use the result to construct a list of container runtime pods. // If all is false, then only running pods will be returned, otherwise all pods will be @@ -1001,36 +972,14 @@ func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error { return nil } -// getAPIPodStatus reads the service file and invokes 'rkt status $UUID' to get the -// pod's status. -func (r *Runtime) getAPIPodStatus(serviceName string) (*api.PodStatus, error) { - var status api.PodStatus - - // TODO(yifan): Get rkt uuid from the service file name. - pod, rktInfo, err := r.readServiceFile(serviceName) - if err != nil && !os.IsNotExist(err) { - return nil, err - } - - if os.IsNotExist(err) { - // Pod does not exit, means it's not been created yet, - // return empty status for now. - // TODO(yifan): Maybe inspect the image and return waiting status. - return &status, nil - } - - podInfo, err := r.getPodInfo(rktInfo.uuid) +// GetAPIPodStatus returns the status of the given pod. +func (r *Runtime) GetAPIPodStatus(pod *api.Pod) (*api.PodStatus, error) { + // Get the pod status. + podStatus, err := r.GetPodStatus(pod.UID, pod.Name, pod.Namespace) if err != nil { return nil, err } - status = makePodStatus(pod, podInfo, rktInfo) - return &status, nil -} - -// GetAPIPodStatus returns the status of the given pod. -func (r *Runtime) GetAPIPodStatus(pod *api.Pod) (*api.PodStatus, error) { - serviceName := makePodServiceFileName(pod.UID) - return r.getAPIPodStatus(serviceName) + return r.ConvertPodStatusToAPIPodStatus(pod, podStatus) } func (r *Runtime) Type() string { @@ -1316,32 +1265,6 @@ func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []s return command.Run() } -// findRktID returns the rkt uuid for the pod. -func (r *Runtime) findRktID(pod *kubecontainer.Pod) (string, error) { - serviceName := makePodServiceFileName(pod.ID) - - f, err := os.Open(serviceFilePath(serviceName)) - if err != nil { - if os.IsNotExist(err) { - return "", fmt.Errorf("no service file %v for runtime pod %q, ID %q", serviceName, pod.Name, pod.ID) - } - return "", err - } - defer f.Close() - - opts, err := unit.Deserialize(f) - if err != nil { - return "", err - } - - for _, opt := range opts { - if opt.Section == unitKubernetesSection && opt.Name == unitRktID { - return opt.Value, nil - } - } - return "", fmt.Errorf("rkt uuid not found for pod %v", pod) -} - // PortForward executes socat in the pod's network namespace and copies // data between stream (representing the user's local connection on their // computer) and the specified port in the container. @@ -1356,14 +1279,20 @@ func (r *Runtime) findRktID(pod *kubecontainer.Pod) (string, error) { func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { glog.V(4).Infof("Rkt port forwarding in container.") - rktID, err := r.findRktID(pod) + listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{ + Detail: true, + Filters: runningKubernetesPodFilters(pod.ID), + }) if err != nil { - return err + return fmt.Errorf("couldn't list pods: %v", err) } - info, err := r.getPodInfo(rktID) - if err != nil { - return err + if len(listResp.Pods) != 1 { + var podlist []string + for _, p := range listResp.Pods { + podlist = append(podlist, p.Id) + } + return fmt.Errorf("more than one running rkt pod for the kubernetes pod [%s]", strings.Join(podlist, ", ")) } socatPath, lookupErr := exec.LookPath("socat") @@ -1371,7 +1300,7 @@ func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.Rea return fmt.Errorf("unable to do port forwarding: socat not found.") } - args := []string{"-t", fmt.Sprintf("%d", info.pid), "-n", socatPath, "-", fmt.Sprintf("TCP4:localhost:%d", port)} + args := []string{"-t", fmt.Sprintf("%d", listResp.Pods[0].Pid), "-n", socatPath, "-", fmt.Sprintf("TCP4:localhost:%d", port)} nsenterPath, lookupErr := exec.LookPath("nsenter") if lookupErr != nil { @@ -1402,29 +1331,6 @@ func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.Rea return command.Run() } -// isUUID returns true if the input is a valid rkt UUID, -// e.g. "2372bc17-47cb-43fb-8d78-20b31729feda". -func isUUID(input string) bool { - if _, err := appctypes.NewUUID(input); err != nil { - return false - } - return true -} - -// getPodInfo returns the pod info of a single pod according -// to the uuid. -func (r *Runtime) getPodInfo(uuid string) (*podInfo, error) { - status, err := r.runCommand("status", uuid) - if err != nil { - return nil, err - } - info, err := parsePodInfo(status) - if err != nil { - return nil, err - } - return info, nil -} - // buildImageName constructs the image name for kubecontainer.Image. func buildImageName(img *rktapi.Image) string { return fmt.Sprintf("%s:%s", img.Name, img.Version) @@ -1486,9 +1392,8 @@ func appStateToContainerState(state rktapi.AppState) kubecontainer.ContainerStat return kubecontainer.ContainerStateUnknown } -// retrievePodInfo returns the pod manifest, creation time and restart count of the pod. -// TODO(yifan): Rename to getPodInfo when the old getPodInfo is removed. -func retrievePodInfo(pod *rktapi.Pod) (podManifest *appcschema.PodManifest, creationTime time.Time, restartCount int, err error) { +// getPodInfo returns the pod manifest, creation time and restart count of the pod. +func getPodInfo(pod *rktapi.Pod) (podManifest *appcschema.PodManifest, creationTime time.Time, restartCount int, err error) { // TODO(yifan): The manifest is only used for getting the annotations. // Consider to let the server to unmarshal the annotations. var manifest appcschema.PodManifest @@ -1568,7 +1473,7 @@ func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecont // In this loop, we group all containers from all pods together, // also we try to find the latest pod, so we can fill other info of the pod below. for _, pod := range listResp.Pods { - manifest, creationTime, restartCount, err := retrievePodInfo(pod) + manifest, creationTime, restartCount, err := getPodInfo(pod) if err != nil { glog.Warning("rkt: Couldn't get necessary info from the rkt pod, (uuid %q): %v", pod.Id, err) continue @@ -1692,7 +1597,6 @@ func (r *Runtime) GetPodStatusAndAPIPodStatus(pod *api.Pod) (*kubecontainer.PodS if err != nil { return nil, nil, err } - var apiPodStatus *api.PodStatus - apiPodStatus, err = r.ConvertPodStatusToAPIPodStatus(pod, podStatus) + apiPodStatus, err := r.ConvertPodStatusToAPIPodStatus(pod, podStatus) return podStatus, apiPodStatus, err }