diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go
index e7714dd62c5..62fc0e8ba62 100644
--- a/pkg/kubelet/rkt/rkt.go
+++ b/pkg/kubelet/rkt/rkt.go
@@ -25,6 +25,7 @@ import (
 	"os"
 	"os/exec"
 	"path"
+	"sort"
 	"strconv"
 	"strings"
 	"syscall"
@@ -40,6 +41,7 @@ import (
 	"github.com/golang/glog"
 	"golang.org/x/net/context"
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/credentialprovider"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -89,6 +91,7 @@ const (
 
 	defaultImageTag          = "latest"
 	defaultRktAPIServiceAddr = "localhost:15441"
+	defaultNetworkName       = "rkt.kubernetes.io"
 )
 
 // Runtime implements the Containerruntime for rkt. The implementation
@@ -680,7 +683,7 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k
 	if pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork {
 		runPrepared = fmt.Sprintf("%s run-prepared --mds-register=false --net=host %s", r.rktBinAbsPath, uuid)
 	} else {
-		runPrepared = fmt.Sprintf("%s run-prepared --mds-register=false %s", r.rktBinAbsPath, uuid)
+		runPrepared = fmt.Sprintf("%s run-prepared --mds-register=false --net=%s %s", r.rktBinAbsPath, defaultNetworkName, uuid)
 	}
 
 	// TODO handle pod.Spec.HostPID
@@ -1521,16 +1524,234 @@ func (r *Runtime) RemoveImage(image kubecontainer.ImageSpec) error {
 	return nil
 }
 
-func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
-	return nil, fmt.Errorf("Not implemented yet")
+// appStateToContainerState converts rktapi.AppState to kubecontainer.ContainerState.
+func appStateToContainerState(state rktapi.AppState) kubecontainer.ContainerState {
+	switch state {
+	case rktapi.AppState_APP_STATE_RUNNING:
+		return kubecontainer.ContainerStateRunning
+	case rktapi.AppState_APP_STATE_EXITED:
+		return kubecontainer.ContainerStateExited
+	}
+	return kubecontainer.ContainerStateUnknown
 }
 
-func (r *Runtime) ConvertPodStatusToAPIPodStatus(_ *api.Pod, _ *kubecontainer.PodStatus) (*api.PodStatus, error) {
-	return nil, fmt.Errorf("Not implemented yet")
+// TODO(yifan): Rename to getPodInfo when the old getPodInfo is removed.
+func retrievePodInfo(pod *rktapi.Pod) (podManifest *appcschema.PodManifest, creationTime time.Time, restartCount int, err error) {
+	var manifest appcschema.PodManifest
+	if err = json.Unmarshal(pod.Manifest, &manifest); err != nil {
+		return
+	}
+
+	creationTimeStr, ok := manifest.Annotations.Get(k8sRktCreationTimeAnno)
+	if !ok {
+		err = fmt.Errorf("no creation timestamp in pod manifest")
+		return
+	}
+	unixSec, err := strconv.ParseInt(creationTimeStr, 10, 64)
+	if err != nil {
+		return
+	}
+
+	if countString, ok := manifest.Annotations.Get(k8sRktRestartCountAnno); ok {
+		restartCount, err = strconv.Atoi(countString)
+		if err != nil {
+			return
+		}
+	}
+
+	return &manifest, time.Unix(unixSec, 0), restartCount, nil
+}
+
+func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
+	podStatus := &kubecontainer.PodStatus{
+		ID:        uid,
+		Name:      name,
+		Namespace: namespace,
+	}
+
+	// TODO(yifan): Use ListPods(detail=true) to avoid another InspectPod rpc here.
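+	// Only list rkt pods that carry the kubelet annotation and match this
+	// pod's UID; a restarted pod is backed by more than one rkt pod, so
+	// several entries can come back.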
+	listReq := &rktapi.ListPodsRequest{
+		Filter: &rktapi.PodFilter{
+			Annotations: []*rktapi.KeyValue{
+				{
+					Key:   k8sRktKubeletAnno,
+					Value: k8sRktKubeletAnnoValue,
+				},
+				{
+					Key:   k8sRktUIDAnno,
+					Value: string(uid),
+				},
+			},
+		},
+	}
+
+	listResp, err := r.apisvc.ListPods(context.Background(), listReq)
+	if err != nil {
+		return nil, fmt.Errorf("couldn't list pods: %v", err)
+	}
+
+	var latestPod *rktapi.Pod
+	var latestRestartCount int = -1
+
+	// Only use the latest pod to get the information (which has the
+	// largest restart count).
+	for _, rktpod := range listResp.Pods {
+		inspectReq := &rktapi.InspectPodRequest{Id: rktpod.Id}
+		inspectResp, err := r.apisvc.InspectPod(context.Background(), inspectReq)
+		if err != nil {
+			glog.Warningf("rkt: Couldn't inspect rkt pod, (uuid %q): %v", rktpod.Id, err)
+			continue
+		}
+
+		pod := inspectResp.Pod
+
+		manifest, creationTime, restartCount, err := retrievePodInfo(pod)
+		if err != nil {
+			glog.Warningf("rkt: Couldn't get necessary info from the rkt pod, (uuid %q): %v", pod.Id, err)
+			continue
+		}
+
+		if restartCount > latestRestartCount {
+			latestPod = pod
+			latestRestartCount = restartCount
+		}
+
+		for i, app := range pod.Apps {
+			// The order of the apps is determined by the rkt pod manifest.
+			// TODO(yifan): Let the server unmarshal the annotations?
+			hashStr, ok := manifest.Apps[i].Annotations.Get(k8sRktContainerHashAnno)
+			if !ok {
+				glog.Warningf("rkt: No container hash in pod manifest for rkt pod, (uuid %q, app %q)", pod.Id, app.Name)
+				continue
+			}
+
+			hashNum, err := strconv.ParseUint(hashStr, 10, 64)
+			if err != nil {
+				glog.Warningf("rkt: Invalid hash value %q for rkt pod, (uuid %q, app %q)", hashStr, pod.Id, app.Name)
+				continue
+			}
+
+			cs := kubecontainer.ContainerStatus{
+				ID:    buildContainerID(&containerID{uuid: pod.Id, appName: app.Name}),
+				Name:  app.Name,
+				State: appStateToContainerState(app.State),
+				// TODO(yifan): Use the creation/start/finished timestamp when it's implemented.
+				CreatedAt: creationTime,
+				StartedAt: creationTime,
+				ExitCode:  int(app.ExitCode),
+				Image:     app.Image.Name,
+				ImageID:   "rkt://" + app.Image.Id, // TODO(yifan): Add the prefix only in api.PodStatus.
+				Hash:      hashNum,
+				// TODO(yifan): Note that now all apps share the same restart count, this might
+				// change once apps don't share the same lifecycle.
+				// See https://github.com/appc/spec/pull/547.
+				RestartCount: restartCount,
+				// TODO(yifan): Add reason and message field.
+			}
+
+			podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, &cs)
+
+		}
+	}
+
+	if latestPod != nil {
+		// Try to fill the IP info.
+		for _, n := range latestPod.Networks {
+			if n.Name == defaultNetworkName {
+				podStatus.IP = n.Ipv4
+			}
+		}
+	}
+
+	return podStatus, nil
+}
+
+type sortByRestartCount []*kubecontainer.ContainerStatus
+
+func (s sortByRestartCount) Len() int           { return len(s) }
+func (s sortByRestartCount) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
+func (s sortByRestartCount) Less(i, j int) bool { return s[i].RestartCount > s[j].RestartCount }
+
+// TODO(yifan): Delete this function when the logic is moved to kubelet.
+func (r *Runtime) ConvertPodStatusToAPIPodStatus(pod *api.Pod, status *kubecontainer.PodStatus) (*api.PodStatus, error) {
+	apiPodStatus := &api.PodStatus{
+		// TODO(yifan): Add reason and message field.
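+		// The IP was resolved from the latest rkt pod's default network by GetPodStatus.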
+		PodIP: status.IP,
+	}
+
+	sort.Sort(sortByRestartCount(status.ContainerStatuses))
+
+	containerStatuses := make(map[string]*api.ContainerStatus)
+	for _, c := range status.ContainerStatuses {
+		var st api.ContainerState
+		switch c.State {
+		case kubecontainer.ContainerStateRunning:
+			st.Running = &api.ContainerStateRunning{
+				StartedAt: unversioned.NewTime(c.StartedAt),
+			}
+		case kubecontainer.ContainerStateExited:
+			if pod.Spec.RestartPolicy == api.RestartPolicyAlways ||
+				(pod.Spec.RestartPolicy == api.RestartPolicyOnFailure && c.ExitCode != 0) {
+				// TODO(yifan): Add reason and message.
+				st.Waiting = &api.ContainerStateWaiting{}
+				break
+			}
+			st.Terminated = &api.ContainerStateTerminated{
+				ExitCode:  c.ExitCode,
+				StartedAt: unversioned.NewTime(c.StartedAt),
+				// TODO(yifan): Add reason, message, finishedAt, signal.
+				ContainerID: c.ID.String(),
+			}
+		default:
+			// Unknown state.
+			// TODO(yifan): Add reason and message.
+			st.Waiting = &api.ContainerStateWaiting{}
+		}
+
+		status, ok := containerStatuses[c.Name]
+		if !ok {
+			containerStatuses[c.Name] = &api.ContainerStatus{
+				Name:         c.Name,
+				Image:        c.Image,
+				ImageID:      c.ImageID,
+				ContainerID:  c.ID.String(),
+				RestartCount: c.RestartCount,
+				State:        st,
+			}
+			continue
+		}
+
+		// Found multiple container statuses; if the last termination state is
+		// not set yet, fill it with this older status.
+		if status.LastTerminationState.Waiting == nil &&
+			status.LastTerminationState.Running == nil &&
+			status.LastTerminationState.Terminated == nil {
+			status.LastTerminationState = st
+		}
+	}
+
+	for _, c := range pod.Spec.Containers {
+		cs, ok := containerStatuses[c.Name]
+		if !ok {
+			cs = &api.ContainerStatus{
+				Name:  c.Name,
+				Image: c.Image,
+				// TODO(yifan): Add reason and message.
+				State: api.ContainerState{Waiting: &api.ContainerStateWaiting{}},
+			}
+		}
+		apiPodStatus.ContainerStatuses = append(apiPodStatus.ContainerStatuses, *cs)
+	}
+
+	return apiPodStatus, nil
 }
 
 func (r *Runtime) GetPodStatusAndAPIPodStatus(pod *api.Pod) (*kubecontainer.PodStatus, *api.PodStatus, error) {
-	podStatus, err := r.GetAPIPodStatus(pod)
-	return nil, podStatus, err
-
+	// Get the pod status.
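+	// If no rkt pods back this API pod, GetPodStatus returns an empty status
+	// rather than an error, so the conversion below still succeeds.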
+	podStatus, err := r.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
+	if err != nil {
+		return nil, nil, err
+	}
+	var apiPodStatus *api.PodStatus
+	apiPodStatus, err = r.ConvertPodStatusToAPIPodStatus(pod, podStatus)
+	return podStatus, apiPodStatus, err
 }
diff --git a/pkg/kubelet/rkt/rkt_test.go b/pkg/kubelet/rkt/rkt_test.go
index 7a71545de2c..eee9e927f56 100644
--- a/pkg/kubelet/rkt/rkt_test.go
+++ b/pkg/kubelet/rkt/rkt_test.go
@@ -20,11 +20,13 @@ import (
 	"encoding/json"
 	"fmt"
 	"testing"
+	"time"
 
 	appcschema "github.com/appc/spec/schema"
 	appctypes "github.com/appc/spec/schema/types"
 	rktapi "github.com/coreos/rkt/api/v1alpha"
 	"github.com/stretchr/testify/assert"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/types"
 )
 
@@ -459,3 +461,255 @@ func mustMarshalImageManifest(man *appcschema.ImageManifest) []byte {
 	}
 	return manblob
 }
+
+func mustRktHash(hash string) *appctypes.Hash {
+	h, err := appctypes.NewHash(hash)
+	if err != nil {
+		panic(err)
+	}
+	return h
+}
+
+func makeRktPod(rktPodState rktapi.PodState,
+	rktPodID, podUID, podName, podNamespace,
+	podIP, podCreationTs, podRestartCount string,
+	appNames, imgIDs, imgNames, containerHashes []string,
+	appStates []rktapi.AppState) *rktapi.Pod {
+
+	podManifest := &appcschema.PodManifest{
+		ACKind:    appcschema.PodManifestKind,
+		ACVersion: appcschema.AppContainerVersion,
+		Annotations: appctypes.Annotations{
+			appctypes.Annotation{
+				Name:  *appctypes.MustACIdentifier(k8sRktKubeletAnno),
+				Value: k8sRktKubeletAnnoValue,
+			},
+			appctypes.Annotation{
+				Name:  *appctypes.MustACIdentifier(k8sRktUIDAnno),
+				Value: podUID,
+			},
+			appctypes.Annotation{
+				Name:  *appctypes.MustACIdentifier(k8sRktNameAnno),
+				Value: podName,
+			},
+			appctypes.Annotation{
+				Name:  *appctypes.MustACIdentifier(k8sRktNamespaceAnno),
+				Value: podNamespace,
+			},
+			appctypes.Annotation{
+				Name:  *appctypes.MustACIdentifier(k8sRktCreationTimeAnno),
+				Value: podCreationTs,
+			},
+			appctypes.Annotation{
+				Name:  *appctypes.MustACIdentifier(k8sRktRestartCountAnno),
+				Value: podRestartCount,
+			},
+		},
+	}
+
+	appNum := len(appNames)
+	if appNum != len(imgNames) ||
+		appNum != len(imgIDs) ||
+		appNum != len(containerHashes) ||
+		appNum != len(appStates) {
+		panic("inconsistent app number")
+	}
+
+	apps := make([]*rktapi.App, appNum)
+	for i := range appNames {
+		apps[i] = &rktapi.App{
+			Name:  appNames[i],
+			State: appStates[i],
+			Image: &rktapi.Image{
+				Id:   imgIDs[i],
+				Name: imgNames[i],
+				Manifest: mustMarshalImageManifest(
+					&appcschema.ImageManifest{
+						ACKind:    appcschema.ImageManifestKind,
+						ACVersion: appcschema.AppContainerVersion,
+						Name:      *appctypes.MustACIdentifier(imgNames[i]),
+						Annotations: appctypes.Annotations{
+							appctypes.Annotation{
+								Name:  *appctypes.MustACIdentifier(k8sRktContainerHashAnno),
+								Value: containerHashes[i],
+							},
+						},
+					},
+				),
+			},
+		}
+		podManifest.Apps = append(podManifest.Apps, appcschema.RuntimeApp{
+			Name:  *appctypes.MustACName(appNames[i]),
+			Image: appcschema.RuntimeImage{ID: *mustRktHash("sha512-foo")},
+			Annotations: appctypes.Annotations{
+				appctypes.Annotation{
+					Name:  *appctypes.MustACIdentifier(k8sRktContainerHashAnno),
+					Value: containerHashes[i],
+				},
+			},
+		})
+	}
+
+	return &rktapi.Pod{
+		Id:       rktPodID,
+		State:    rktPodState,
+		Networks: []*rktapi.Network{{Name: defaultNetworkName, Ipv4: podIP}},
+		Apps:     apps,
+		Manifest: mustMarshalPodManifest(podManifest),
+	}
+}
+
+func TestGetPodStatus(t *testing.T) {
+	fr := newFakeRktInterface()
+	fs := newFakeSystemd()
+	r := &Runtime{apisvc: fr, systemd: fs}
+
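+	// Each case seeds the fake rkt API service with a list of rkt pods and
+	// checks the PodStatus assembled from the ListPods/InspectPod responses.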
+	tests := []struct {
+		pods   []*rktapi.Pod
+		result *kubecontainer.PodStatus
+	}{
+		// No pods.
+		{
+			nil,
+			&kubecontainer.PodStatus{ID: "42", Name: "guestbook", Namespace: "default"},
+		},
+		// One pod.
+		{
+			[]*rktapi.Pod{
+				makeRktPod(rktapi.PodState_POD_STATE_RUNNING,
+					"uuid-4002", "42", "guestbook", "default",
+					"10.10.10.42", "100000", "7",
+					[]string{"app-1", "app-2"},
+					[]string{"img-id-1", "img-id-2"},
+					[]string{"img-name-1", "img-name-2"},
+					[]string{"1001", "1002"},
+					[]rktapi.AppState{rktapi.AppState_APP_STATE_RUNNING, rktapi.AppState_APP_STATE_EXITED},
+				),
+			},
+			&kubecontainer.PodStatus{
+				ID:        "42",
+				Name:      "guestbook",
+				Namespace: "default",
+				IP:        "10.10.10.42",
+				ContainerStatuses: []*kubecontainer.ContainerStatus{
+					{
+						ID:           kubecontainer.BuildContainerID("rkt", "uuid-4002:app-1"),
+						Name:         "app-1",
+						State:        kubecontainer.ContainerStateRunning,
+						CreatedAt:    time.Unix(100000, 0),
+						StartedAt:    time.Unix(100000, 0),
+						Image:        "img-name-1",
+						ImageID:      "rkt://img-id-1",
+						Hash:         1001,
+						RestartCount: 7,
+					},
+					{
+						ID:           kubecontainer.BuildContainerID("rkt", "uuid-4002:app-2"),
+						Name:         "app-2",
+						State:        kubecontainer.ContainerStateExited,
+						CreatedAt:    time.Unix(100000, 0),
+						StartedAt:    time.Unix(100000, 0),
+						Image:        "img-name-2",
+						ImageID:      "rkt://img-id-2",
+						Hash:         1002,
+						RestartCount: 7,
+					},
+				},
+			},
+		},
+		// Multiple pods.
+		{
+			[]*rktapi.Pod{
+				makeRktPod(rktapi.PodState_POD_STATE_EXITED,
+					"uuid-4002", "42", "guestbook", "default",
+					"10.10.10.42", "90000", "7",
+					[]string{"app-1", "app-2"},
+					[]string{"img-id-1", "img-id-2"},
+					[]string{"img-name-1", "img-name-2"},
+					[]string{"1001", "1002"},
+					[]rktapi.AppState{rktapi.AppState_APP_STATE_RUNNING, rktapi.AppState_APP_STATE_EXITED},
+				),
+				makeRktPod(rktapi.PodState_POD_STATE_RUNNING, // The latest pod is running.
+					"uuid-4003", "42", "guestbook", "default",
+					"10.10.10.42", "100000", "10",
+					[]string{"app-1", "app-2"},
+					[]string{"img-id-1", "img-id-2"},
+					[]string{"img-name-1", "img-name-2"},
+					[]string{"1001", "1002"},
+					[]rktapi.AppState{rktapi.AppState_APP_STATE_RUNNING, rktapi.AppState_APP_STATE_EXITED},
+				),
+			},
+			&kubecontainer.PodStatus{
+				ID:        "42",
+				Name:      "guestbook",
+				Namespace: "default",
+				IP:        "10.10.10.42",
+				// Result should contain all containers.
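+				// Statuses from the older rkt pod (uuid-4002) keep restart count 7,
+				// while those from the newer pod (uuid-4003) report restart count 10.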
+				ContainerStatuses: []*kubecontainer.ContainerStatus{
+					{
+						ID:           kubecontainer.BuildContainerID("rkt", "uuid-4002:app-1"),
+						Name:         "app-1",
+						State:        kubecontainer.ContainerStateRunning,
+						CreatedAt:    time.Unix(90000, 0),
+						StartedAt:    time.Unix(90000, 0),
+						Image:        "img-name-1",
+						ImageID:      "rkt://img-id-1",
+						Hash:         1001,
+						RestartCount: 7,
+					},
+					{
+						ID:           kubecontainer.BuildContainerID("rkt", "uuid-4002:app-2"),
+						Name:         "app-2",
+						State:        kubecontainer.ContainerStateExited,
+						CreatedAt:    time.Unix(90000, 0),
+						StartedAt:    time.Unix(90000, 0),
+						Image:        "img-name-2",
+						ImageID:      "rkt://img-id-2",
+						Hash:         1002,
+						RestartCount: 7,
+					},
+					{
+						ID:           kubecontainer.BuildContainerID("rkt", "uuid-4003:app-1"),
+						Name:         "app-1",
+						State:        kubecontainer.ContainerStateRunning,
+						CreatedAt:    time.Unix(100000, 0),
+						StartedAt:    time.Unix(100000, 0),
+						Image:        "img-name-1",
+						ImageID:      "rkt://img-id-1",
+						Hash:         1001,
+						RestartCount: 10,
+					},
+					{
+						ID:           kubecontainer.BuildContainerID("rkt", "uuid-4003:app-2"),
+						Name:         "app-2",
+						State:        kubecontainer.ContainerStateExited,
+						CreatedAt:    time.Unix(100000, 0),
+						StartedAt:    time.Unix(100000, 0),
+						Image:        "img-name-2",
+						ImageID:      "rkt://img-id-2",
+						Hash:         1002,
+						RestartCount: 10,
+					},
+				},
+			},
+		},
+	}
+
+	for i, tt := range tests {
+		testCaseHint := fmt.Sprintf("test case #%d", i)
+		fr.pods = tt.pods
+
+		status, err := r.GetPodStatus("42", "guestbook", "default")
+		if err != nil {
+			t.Errorf("test case #%d: unexpected error: %v", i, err)
+		}
+		assert.Equal(t, tt.result, status, testCaseHint)
+
+		var inspectPodCalls []string
+		for range tt.pods {
+			inspectPodCalls = append(inspectPodCalls, "InspectPod")
+		}
+		assert.Equal(t, append([]string{"ListPods"}, inspectPodCalls...), fr.called, testCaseHint)
+		fr.CleanCalls()
+	}
+}