rkt: Add GetPodStatus(), GetPodStatusAndAPIPodStatus().

Also add ConvertPodStatusToAPIPodStatus().
Add tests for GetPodStatus().
This commit is contained in:
Yifan Gu 2015-12-11 17:09:21 -08:00
parent d3243b8778
commit 5f4e6d0908
2 changed files with 483 additions and 8 deletions

View File

@ -25,6 +25,7 @@ import (
"os"
"os/exec"
"path"
"sort"
"strconv"
"strings"
"syscall"
@ -40,6 +41,7 @@ import (
"github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/credentialprovider"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -89,6 +91,7 @@ const (
defaultImageTag = "latest"
defaultRktAPIServiceAddr = "localhost:15441"
defaultNetworkName = "rkt.kubernetes.io"
)
// Runtime implements the Containerruntime for rkt. The implementation
@ -680,7 +683,7 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k
if pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork {
runPrepared = fmt.Sprintf("%s run-prepared --mds-register=false --net=host %s", r.rktBinAbsPath, uuid)
} else {
runPrepared = fmt.Sprintf("%s run-prepared --mds-register=false %s", r.rktBinAbsPath, uuid)
runPrepared = fmt.Sprintf("%s run-prepared --mds-register=false --net=%s %s", r.rktBinAbsPath, defaultNetworkName, uuid)
}
// TODO handle pod.Spec.HostPID
@ -1521,16 +1524,234 @@ func (r *Runtime) RemoveImage(image kubecontainer.ImageSpec) error {
return nil
}
func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
return nil, fmt.Errorf("Not implemented yet")
// appStateToContainerState converts rktapi.AppState to kubecontainer.ContainerState.
func appStateToContainerState(state rktapi.AppState) kubecontainer.ContainerState {
switch state {
case rktapi.AppState_APP_STATE_RUNNING:
return kubecontainer.ContainerStateRunning
case rktapi.AppState_APP_STATE_EXITED:
return kubecontainer.ContainerStateExited
}
return kubecontainer.ContainerStateUnknown
}
func (r *Runtime) ConvertPodStatusToAPIPodStatus(_ *api.Pod, _ *kubecontainer.PodStatus) (*api.PodStatus, error) {
return nil, fmt.Errorf("Not implemented yet")
// TODO(yifan): Rename to getPodInfo when the old getPodInfo is removed.
func retrievePodInfo(pod *rktapi.Pod) (podManifest *appcschema.PodManifest, creationTime time.Time, restartCount int, err error) {
var manifest appcschema.PodManifest
if err = json.Unmarshal(pod.Manifest, &manifest); err != nil {
return
}
creationTimeStr, ok := manifest.Annotations.Get(k8sRktCreationTimeAnno)
if !ok {
err = fmt.Errorf("no creation timestamp in pod manifest")
return
}
unixSec, err := strconv.ParseInt(creationTimeStr, 10, 64)
if err != nil {
return
}
if countString, ok := manifest.Annotations.Get(k8sRktRestartCountAnno); ok {
restartCount, err = strconv.Atoi(countString)
if err != nil {
return
}
}
return &manifest, time.Unix(unixSec, 0), restartCount, nil
}
func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
podStatus := &kubecontainer.PodStatus{
ID: uid,
Name: name,
Namespace: namespace,
}
// TODO(yifan): Use ListPods(detail=true) to avoid another InspectPod rpc here.
listReq := &rktapi.ListPodsRequest{
Filter: &rktapi.PodFilter{
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
{
Key: k8sRktUIDAnno,
Value: string(uid),
},
},
},
}
listResp, err := r.apisvc.ListPods(context.Background(), listReq)
if err != nil {
return nil, fmt.Errorf("couldn't list pods: %v", err)
}
var latestPod *rktapi.Pod
var latestRestartCount int = -1
// Only use the latest pod to get the information, (which has the
// largest restart count).
for _, rktpod := range listResp.Pods {
inspectReq := &rktapi.InspectPodRequest{rktpod.Id}
inspectResp, err := r.apisvc.InspectPod(context.Background(), inspectReq)
if err != nil {
glog.Warningf("rkt: Couldn't inspect rkt pod, (uuid %q): %v", rktpod.Id, err)
continue
}
pod := inspectResp.Pod
manifest, creationTime, restartCount, err := retrievePodInfo(pod)
if err != nil {
glog.Warning("rkt: Couldn't get necessary info from the rkt pod, (uuid %q): %v", pod.Id, err)
continue
}
if restartCount > latestRestartCount {
latestPod = pod
latestRestartCount = restartCount
}
for i, app := range pod.Apps {
// The order of the apps is determined by the rkt pod manifest.
// TODO(yifan): Let the server to unmarshal the annotations?
hashStr, ok := manifest.Apps[i].Annotations.Get(k8sRktContainerHashAnno)
if !ok {
glog.Warningf("rkt: No container hash in pod manifest for rkt pod, (uuid %q, app %q)", pod.Id, app.Name)
continue
}
hashNum, err := strconv.ParseUint(hashStr, 10, 64)
if err != nil {
glog.Warningf("rkt: Invalid hash value %q for rkt pod, (uuid %q, app %q)", pod.Id, app.Name)
continue
}
cs := kubecontainer.ContainerStatus{
ID: buildContainerID(&containerID{uuid: pod.Id, appName: app.Name}),
Name: app.Name,
State: appStateToContainerState(app.State),
// TODO(yifan): Use the creation/start/finished timestamp when it's implemented.
CreatedAt: creationTime,
StartedAt: creationTime,
ExitCode: int(app.ExitCode),
Image: app.Image.Name,
ImageID: "rkt://" + app.Image.Id, // TODO(yifan): Add the prefix only in api.PodStatus.
Hash: hashNum,
// TODO(yifan): Note that now all apps share the same restart count, this might
// change once apps don't share the same lifecycle.
// See https://github.com/appc/spec/pull/547.
RestartCount: restartCount,
// TODO(yifan): Add reason and message field.
}
podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, &cs)
}
}
if latestPod != nil {
// Try to fill the IP info.
for _, n := range latestPod.Networks {
if n.Name == defaultNetworkName {
podStatus.IP = n.Ipv4
}
}
}
return podStatus, nil
}
type sortByRestartCount []*kubecontainer.ContainerStatus
func (s sortByRestartCount) Len() int { return len(s) }
func (s sortByRestartCount) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s sortByRestartCount) Less(i, j int) bool { return s[i].RestartCount > s[j].RestartCount }
// TODO(yifan): Delete this function when the logic is moved to kubelet.
func (r *Runtime) ConvertPodStatusToAPIPodStatus(pod *api.Pod, status *kubecontainer.PodStatus) (*api.PodStatus, error) {
apiPodStatus := &api.PodStatus{
// TODO(yifan): Add reason and message field.
PodIP: status.IP,
}
sort.Sort(sortByRestartCount(status.ContainerStatuses))
containerStatuses := make(map[string]*api.ContainerStatus)
for _, c := range status.ContainerStatuses {
var st api.ContainerState
switch c.State {
case kubecontainer.ContainerStateRunning:
st.Running = &api.ContainerStateRunning{
StartedAt: unversioned.NewTime(c.StartedAt),
}
case kubecontainer.ContainerStateExited:
if pod.Spec.RestartPolicy == api.RestartPolicyAlways ||
pod.Spec.RestartPolicy == api.RestartPolicyOnFailure && c.ExitCode != 0 {
// TODO(yifan): Add reason and message.
st.Waiting = &api.ContainerStateWaiting{}
break
}
st.Terminated = &api.ContainerStateTerminated{
ExitCode: c.ExitCode,
StartedAt: unversioned.NewTime(c.StartedAt),
// TODO(yifan): Add reason, message, finishedAt, signal.
ContainerID: c.ID.String(),
}
default:
// Unknown state.
// TODO(yifan): Add reason and message.
st.Waiting = &api.ContainerStateWaiting{}
}
status, ok := containerStatuses[c.Name]
if !ok {
containerStatuses[c.Name] = &api.ContainerStatus{
Name: c.Name,
Image: c.Image,
ImageID: c.ImageID,
ContainerID: c.ID.String(),
RestartCount: c.RestartCount,
State: st,
}
continue
}
// Found multiple container statuses, fill that as last termination state.
if status.LastTerminationState.Waiting == nil &&
status.LastTerminationState.Running == nil &&
status.LastTerminationState.Terminated == nil {
status.LastTerminationState = st
}
}
for _, c := range pod.Spec.Containers {
cs, ok := containerStatuses[c.Name]
if !ok {
cs = &api.ContainerStatus{
Name: c.Name,
Image: c.Image,
// TODO(yifan): Add reason and message.
State: api.ContainerState{Waiting: &api.ContainerStateWaiting{}},
}
}
apiPodStatus.ContainerStatuses = append(apiPodStatus.ContainerStatuses, *cs)
}
return apiPodStatus, nil
}
func (r *Runtime) GetPodStatusAndAPIPodStatus(pod *api.Pod) (*kubecontainer.PodStatus, *api.PodStatus, error) {
podStatus, err := r.GetAPIPodStatus(pod)
return nil, podStatus, err
// Get the pod status.
podStatus, err := r.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
if err != nil {
return nil, nil, err
}
var apiPodStatus *api.PodStatus
apiPodStatus, err = r.ConvertPodStatusToAPIPodStatus(pod, podStatus)
return podStatus, apiPodStatus, err
}

View File

@ -20,11 +20,13 @@ import (
"encoding/json"
"fmt"
"testing"
"time"
appcschema "github.com/appc/spec/schema"
appctypes "github.com/appc/spec/schema/types"
rktapi "github.com/coreos/rkt/api/v1alpha"
"github.com/stretchr/testify/assert"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types"
)
@ -459,3 +461,255 @@ func mustMarshalImageManifest(man *appcschema.ImageManifest) []byte {
}
return manblob
}
func mustRktHash(hash string) *appctypes.Hash {
h, err := appctypes.NewHash(hash)
if err != nil {
panic(err)
}
return h
}
func makeRktPod(rktPodState rktapi.PodState,
rktPodID, podUID, podName, podNamespace,
podIP, podCreationTs, podRestartCount string,
appNames, imgIDs, imgNames, containerHashes []string,
appStates []rktapi.AppState) *rktapi.Pod {
podManifest := &appcschema.PodManifest{
ACKind: appcschema.PodManifestKind,
ACVersion: appcschema.AppContainerVersion,
Annotations: appctypes.Annotations{
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktKubeletAnno),
Value: k8sRktKubeletAnnoValue,
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktUIDAnno),
Value: podUID,
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktNameAnno),
Value: podName,
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktNamespaceAnno),
Value: podNamespace,
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktCreationTimeAnno),
Value: podCreationTs,
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktRestartCountAnno),
Value: podRestartCount,
},
},
}
appNum := len(appNames)
if appNum != len(imgNames) ||
appNum != len(imgIDs) ||
appNum != len(containerHashes) ||
appNum != len(appStates) {
panic("inconsistent app number")
}
apps := make([]*rktapi.App, appNum)
for i := range appNames {
apps[i] = &rktapi.App{
Name: appNames[i],
State: appStates[i],
Image: &rktapi.Image{
Id: imgIDs[i],
Name: imgNames[i],
Manifest: mustMarshalImageManifest(
&appcschema.ImageManifest{
ACKind: appcschema.ImageManifestKind,
ACVersion: appcschema.AppContainerVersion,
Name: *appctypes.MustACIdentifier(imgNames[i]),
Annotations: appctypes.Annotations{
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno),
Value: containerHashes[i],
},
},
},
),
},
}
podManifest.Apps = append(podManifest.Apps, appcschema.RuntimeApp{
Name: *appctypes.MustACName(appNames[i]),
Image: appcschema.RuntimeImage{ID: *mustRktHash("sha512-foo")},
Annotations: appctypes.Annotations{
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno),
Value: containerHashes[i],
},
},
})
}
return &rktapi.Pod{
Id: rktPodID,
State: rktPodState,
Networks: []*rktapi.Network{{Name: defaultNetworkName, Ipv4: podIP}},
Apps: apps,
Manifest: mustMarshalPodManifest(podManifest),
}
}
func TestGetPodStatus(t *testing.T) {
fr := newFakeRktInterface()
fs := newFakeSystemd()
r := &Runtime{apisvc: fr, systemd: fs}
tests := []struct {
pods []*rktapi.Pod
result *kubecontainer.PodStatus
}{
// No pods.
{
nil,
&kubecontainer.PodStatus{ID: "42", Name: "guestbook", Namespace: "default"},
},
// One pod.
{
[]*rktapi.Pod{
makeRktPod(rktapi.PodState_POD_STATE_RUNNING,
"uuid-4002", "42", "guestbook", "default",
"10.10.10.42", "100000", "7",
[]string{"app-1", "app-2"},
[]string{"img-id-1", "img-id-2"},
[]string{"img-name-1", "img-name-2"},
[]string{"1001", "1002"},
[]rktapi.AppState{rktapi.AppState_APP_STATE_RUNNING, rktapi.AppState_APP_STATE_EXITED},
),
},
&kubecontainer.PodStatus{
ID: "42",
Name: "guestbook",
Namespace: "default",
IP: "10.10.10.42",
ContainerStatuses: []*kubecontainer.ContainerStatus{
{
ID: kubecontainer.BuildContainerID("rkt", "uuid-4002:app-1"),
Name: "app-1",
State: kubecontainer.ContainerStateRunning,
CreatedAt: time.Unix(100000, 0),
StartedAt: time.Unix(100000, 0),
Image: "img-name-1",
ImageID: "rkt://img-id-1",
Hash: 1001,
RestartCount: 7,
},
{
ID: kubecontainer.BuildContainerID("rkt", "uuid-4002:app-2"),
Name: "app-2",
State: kubecontainer.ContainerStateExited,
CreatedAt: time.Unix(100000, 0),
StartedAt: time.Unix(100000, 0),
Image: "img-name-2",
ImageID: "rkt://img-id-2",
Hash: 1002,
RestartCount: 7,
},
},
},
},
// Multiple pods.
{
[]*rktapi.Pod{
makeRktPod(rktapi.PodState_POD_STATE_EXITED,
"uuid-4002", "42", "guestbook", "default",
"10.10.10.42", "90000", "7",
[]string{"app-1", "app-2"},
[]string{"img-id-1", "img-id-2"},
[]string{"img-name-1", "img-name-2"},
[]string{"1001", "1002"},
[]rktapi.AppState{rktapi.AppState_APP_STATE_RUNNING, rktapi.AppState_APP_STATE_EXITED},
),
makeRktPod(rktapi.PodState_POD_STATE_RUNNING, // The latest pod is running.
"uuid-4003", "42", "guestbook", "default",
"10.10.10.42", "100000", "10",
[]string{"app-1", "app-2"},
[]string{"img-id-1", "img-id-2"},
[]string{"img-name-1", "img-name-2"},
[]string{"1001", "1002"},
[]rktapi.AppState{rktapi.AppState_APP_STATE_RUNNING, rktapi.AppState_APP_STATE_EXITED},
),
},
&kubecontainer.PodStatus{
ID: "42",
Name: "guestbook",
Namespace: "default",
IP: "10.10.10.42",
// Result should contain all contianers.
ContainerStatuses: []*kubecontainer.ContainerStatus{
{
ID: kubecontainer.BuildContainerID("rkt", "uuid-4002:app-1"),
Name: "app-1",
State: kubecontainer.ContainerStateRunning,
CreatedAt: time.Unix(90000, 0),
StartedAt: time.Unix(90000, 0),
Image: "img-name-1",
ImageID: "rkt://img-id-1",
Hash: 1001,
RestartCount: 7,
},
{
ID: kubecontainer.BuildContainerID("rkt", "uuid-4002:app-2"),
Name: "app-2",
State: kubecontainer.ContainerStateExited,
CreatedAt: time.Unix(90000, 0),
StartedAt: time.Unix(90000, 0),
Image: "img-name-2",
ImageID: "rkt://img-id-2",
Hash: 1002,
RestartCount: 7,
},
{
ID: kubecontainer.BuildContainerID("rkt", "uuid-4003:app-1"),
Name: "app-1",
State: kubecontainer.ContainerStateRunning,
CreatedAt: time.Unix(100000, 0),
StartedAt: time.Unix(100000, 0),
Image: "img-name-1",
ImageID: "rkt://img-id-1",
Hash: 1001,
RestartCount: 10,
},
{
ID: kubecontainer.BuildContainerID("rkt", "uuid-4003:app-2"),
Name: "app-2",
State: kubecontainer.ContainerStateExited,
CreatedAt: time.Unix(100000, 0),
StartedAt: time.Unix(100000, 0),
Image: "img-name-2",
ImageID: "rkt://img-id-2",
Hash: 1002,
RestartCount: 10,
},
},
},
},
}
for i, tt := range tests {
testCaseHint := fmt.Sprintf("test case #%d", i)
fr.pods = tt.pods
status, err := r.GetPodStatus("42", "guestbook", "default")
if err != nil {
t.Errorf("test case #%d: unexpected error: %v", i, err)
}
assert.Equal(t, tt.result, status, testCaseHint)
var inspectPodCalls []string
for range tt.pods {
inspectPodCalls = append(inspectPodCalls, "InspectPod")
}
assert.Equal(t, append([]string{"ListPods"}, inspectPodCalls...), fr.called, testCaseHint)
fr.CleanCalls()
}
}