rkt: Update for newest api version.

This commit is contained in:
Yifan Gu 2015-12-16 16:52:39 -08:00
parent 1693b74006
commit ee7251ed10
3 changed files with 172 additions and 203 deletions

View File

@ -30,12 +30,12 @@ import (
// fakeRktInterface mocks the rktapi.PublicAPIClient interface for testing purpose.
type fakeRktInterface struct {
sync.Mutex
info rktapi.Info
images []*rktapi.Image
podFilter *rktapi.PodFilter
pods []*rktapi.Pod
called []string
err error
info rktapi.Info
images []*rktapi.Image
podFilters []*rktapi.PodFilter
pods []*rktapi.Pod
called []string
err error
}
func newFakeRktInterface() *fakeRktInterface {
@ -61,7 +61,7 @@ func (f *fakeRktInterface) ListPods(ctx context.Context, in *rktapi.ListPodsRequ
defer f.Unlock()
f.called = append(f.called, "ListPods")
f.podFilter = in.Filter
f.podFilters = in.Filters
return &rktapi.ListPodsResponse{f.pods}, f.err
}

View File

@ -58,9 +58,9 @@ import (
const (
RktType = "rkt"
minimumAppcVersion = "0.7.1"
minimumRktBinVersion = "0.9.0"
recommendedRktBinVersion = "0.9.0"
minimumAppcVersion = "0.7.4"
minimumRktBinVersion = "0.13.0"
recommendedRktBinVersion = "0.13.0"
minimumRktApiVersion = "1.0.0-alpha"
minimumSystemdVersion = "219"
@ -403,24 +403,38 @@ func setApp(app *appctypes.App, c *api.Container, opts *kubecontainer.RunContain
return setIsolators(app, c)
}
// getImageManifest invokes 'rkt image cat-manifest' to retrive the image manifest
// for the image.
// listImages lists the images that have the given name. If detail is true,
// then image manifest is also included in the result.
func (r *Runtime) listImages(image string, detail bool) ([]*rktapi.Image, error) {
repoToPull, tag := parsers.ParseImageName(image)
listResp, err := r.apisvc.ListImages(context.Background(), &rktapi.ListImagesRequest{
Detail: detail,
Filters: []*rktapi.ImageFilter{
{
BaseNames: []string{repoToPull},
Labels: []*rktapi.KeyValue{{Key: "version", Value: tag}},
},
},
})
if err != nil {
return nil, fmt.Errorf("couldn't list images: %v", err)
}
return listResp.Images, nil
}
// getImageManifest retrives the image manifest for the given image.
func (r *Runtime) getImageManifest(image string) (*appcschema.ImageManifest, error) {
var manifest appcschema.ImageManifest
repoToPull, tag := parsers.ParseImageName(image)
imgName, err := appctypes.SanitizeACIdentifier(repoToPull)
images, err := r.listImages(image, true)
if err != nil {
return nil, err
}
output, err := r.runCommand("image", "cat-manifest", fmt.Sprintf("%s:%s", imgName, tag))
if err != nil {
return nil, err
if len(images) == 0 {
return nil, fmt.Errorf("cannot find the image %q", image)
}
if len(output) != 1 {
return nil, fmt.Errorf("invalid output: %v", output)
}
return &manifest, json.Unmarshal([]byte(output[0]), &manifest)
return &manifest, json.Unmarshal(images[0].Manifest, &manifest)
}
// makePodManifest transforms a kubelet pod spec to the rkt pod manifest.
@ -429,28 +443,17 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc
manifest := appcschema.BlankPodManifest()
listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{
Filter: kubernetesPodFilter(pod),
Detail: true,
Filters: kubernetesPodFilters(pod.UID),
})
if err != nil {
return nil, fmt.Errorf("couldn't list pods: %v", err)
}
restartCount := 0
for _, rktpod := range listResp.Pods {
//TODO: get the manifest from listresp.Pods when this gets merged: https://github.com/coreos/rkt/pull/1786
inspectResp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{rktpod.Id})
if err != nil {
glog.Warningf("rkt: error while inspecting pod %s", rktpod.Id)
continue
}
if inspectResp.Pod == nil {
glog.Warningf("rkt: pod %s vanished?!", rktpod.Id)
continue
}
for _, pod := range listResp.Pods {
manifest := &appcschema.PodManifest{}
err = json.Unmarshal(inspectResp.Pod.Manifest, manifest)
err = json.Unmarshal(pod.Manifest, manifest)
if err != nil {
glog.Warningf("rkt: error unmatshaling pod manifest: %v", err)
continue
@ -531,11 +534,11 @@ func (r *Runtime) newAppcRuntimeApp(pod *api.Pod, c api.Container, pullSecrets [
imgManifest.App = new(appctypes.App)
}
img, err := r.getImageByName(c.Image)
imageID, err := r.getImageID(c.Image)
if err != nil {
return nil, nil, err
}
hash, err := appctypes.NewHash(img.ID)
hash, err := appctypes.NewHash(imageID)
if err != nil {
return nil, nil, err
}
@ -570,23 +573,18 @@ func (r *Runtime) newAppcRuntimeApp(pod *api.Pod, c api.Container, pullSecrets [
}, opts.PortMappings, nil
}
func kubernetesPodFilter(pod *api.Pod) *rktapi.PodFilter {
return &rktapi.PodFilter{
States: []rktapi.PodState{
//TODO: In the future some pods can remain running after some apps exit: https://github.com/appc/spec/pull/500
rktapi.PodState_POD_STATE_RUNNING,
rktapi.PodState_POD_STATE_EXITED,
rktapi.PodState_POD_STATE_DELETING,
rktapi.PodState_POD_STATE_GARBAGE,
},
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
{
Key: k8sRktUIDAnno,
Value: string(pod.UID),
func kubernetesPodFilters(uid types.UID) []*rktapi.PodFilter {
return []*rktapi.PodFilter{
{
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
{
Key: k8sRktUIDAnno,
Value: string(uid),
},
},
},
}
@ -824,7 +822,7 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error {
}
// convertRktPod will convert a rktapi.Pod to a kubecontainer.Pod
func (r *Runtime) convertRktPod(rktpod rktapi.Pod) (*kubecontainer.Pod, error) {
func (r *Runtime) convertRktPod(rktpod *rktapi.Pod) (*kubecontainer.Pod, error) {
manifest := &appcschema.PodManifest{}
err := json.Unmarshal(rktpod.Manifest, manifest)
if err != nil {
@ -935,17 +933,20 @@ func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
glog.V(4).Infof("Rkt getting pods")
listReq := &rktapi.ListPodsRequest{
Filter: &rktapi.PodFilter{
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
Detail: true,
Filters: []*rktapi.PodFilter{
{
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
},
},
},
}
if !all {
listReq.Filter.States = []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING}
listReq.Filters[0].States = []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING}
}
listResp, err := r.apisvc.ListPods(context.Background(), listReq)
if err != nil {
@ -953,18 +954,8 @@ func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
}
var pods []*kubecontainer.Pod
for _, rktpod := range listResp.Pods {
//TODO: get the manifest from listresp.Pods when this gets merged: https://github.com/coreos/rkt/pull/1786
inspectResp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{rktpod.Id})
if err != nil {
return nil, err
}
if inspectResp.Pod == nil {
return nil, fmt.Errorf("pod %s vanished?!", rktpod.Id)
}
pod, err := r.convertRktPod(*inspectResp.Pod)
for _, pod := range listResp.Pods {
pod, err := r.convertRktPod(pod)
if err != nil {
glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err)
continue
@ -1125,17 +1116,8 @@ func (r *Runtime) PullImage(image kubecontainer.ImageSpec, pullSecrets []api.Sec
}
func (r *Runtime) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) {
repoToPull, tag := parsers.ParseImageName(image.Image)
listResp, err := r.apisvc.ListImages(context.Background(), &rktapi.ListImagesRequest{
Filter: &rktapi.ImageFilter{
BaseNames: []string{repoToPull},
Labels: []*rktapi.KeyValue{{Key: "version", Value: tag}},
},
})
if err != nil {
return false, fmt.Errorf("couldn't list images: %v", err)
}
return len(listResp.Images) > 0, nil
images, err := r.listImages(image.Image, false)
return len(images) > 0, err
}
// SyncPod syncs the running pod to match the specified desired pod.
@ -1448,27 +1430,19 @@ func buildImageName(img *rktapi.Image) string {
return fmt.Sprintf("%s:%s", img.Name, img.Version)
}
// getImageByName tries to find the image info with the given image name.
// imageName should be in the form of 'example.com/app:latest', which should matches
// the result of 'rkt image list'. If the version is empty, then 'latest' is assumed.
func (r *Runtime) getImageByName(imageName string) (*kubecontainer.Image, error) {
repoToPull, tag := parsers.ParseImageName(imageName)
listResp, err := r.apisvc.ListImages(context.Background(), &rktapi.ListImagesRequest{
Filter: &rktapi.ImageFilter{
BaseNames: []string{repoToPull},
Labels: []*rktapi.KeyValue{{Key: "version", Value: tag}},
},
})
// getImageID tries to find the image ID for the given image name.
// imageName should be in the form of 'name[:version]', e.g., 'example.com/app:latest'.
// The name should matches the result of 'rkt image list'. If the version is empty,
// then 'latest' is assumed.
func (r *Runtime) getImageID(imageName string) (string, error) {
images, err := r.listImages(imageName, false)
if err != nil {
return nil, fmt.Errorf("couldn't list images: %v", err)
return "", err
}
if len(listResp.Images) == 0 {
return nil, fmt.Errorf("cannot find the image %q", imageName)
if len(images) == 0 {
return "", fmt.Errorf("cannot find the image %q", imageName)
}
return &kubecontainer.Image{
ID: listResp.Images[0].Id,
Tags: []string{buildImageName(listResp.Images[0])},
}, nil
return images[0].Id, nil
}
// ListImages lists all the available appc images on the machine by invoking 'rkt image list'.
@ -1482,7 +1456,7 @@ func (r *Runtime) ListImages() ([]kubecontainer.Image, error) {
for i, image := range listResp.Images {
images[i] = kubecontainer.Image{
ID: image.Id,
Tags: []string{buildImageName(listResp.Images[0])},
Tags: []string{buildImageName(image)},
//TODO: fill in the size of the image
}
}
@ -1491,12 +1465,11 @@ func (r *Runtime) ListImages() ([]kubecontainer.Image, error) {
// RemoveImage removes an on-disk image using 'rkt image rm'.
func (r *Runtime) RemoveImage(image kubecontainer.ImageSpec) error {
img, err := r.getImageByName(image.Image)
imageID, err := r.getImageID(image.Image)
if err != nil {
return err
}
if _, err := r.runCommand("image", "rm", img.ID); err != nil {
if _, err := r.runCommand("image", "rm", imageID); err != nil {
return err
}
return nil
@ -1513,8 +1486,11 @@ func appStateToContainerState(state rktapi.AppState) kubecontainer.ContainerStat
return kubecontainer.ContainerStateUnknown
}
// retrievePodInfo returns the pod manifest, creation time and restart count of the pod.
// TODO(yifan): Rename to getPodInfo when the old getPodInfo is removed.
func retrievePodInfo(pod *rktapi.Pod) (podManifest *appcschema.PodManifest, creationTime time.Time, restartCount int, err error) {
// TODO(yifan): The manifest is only used for getting the annotations.
// Consider to let the server to unmarshal the annotations.
var manifest appcschema.PodManifest
if err = json.Unmarshal(pod.Manifest, &manifest); err != nil {
return
@ -1540,6 +1516,37 @@ func retrievePodInfo(pod *rktapi.Pod) (podManifest *appcschema.PodManifest, crea
return &manifest, time.Unix(unixSec, 0), restartCount, nil
}
// populateContainerStatus fills the container status according to the app's information.
func populateContainerStatus(pod rktapi.Pod, app rktapi.App, runtimeApp appcschema.RuntimeApp, restartCount int, creationTime time.Time) (*kubecontainer.ContainerStatus, error) {
hashStr, ok := runtimeApp.Annotations.Get(k8sRktContainerHashAnno)
if !ok {
return nil, fmt.Errorf("No container hash in pod manifest")
}
hashNum, err := strconv.ParseUint(hashStr, 10, 64)
if err != nil {
return nil, err
}
return &kubecontainer.ContainerStatus{
ID: buildContainerID(&containerID{uuid: pod.Id, appName: app.Name}),
Name: app.Name,
State: appStateToContainerState(app.State),
// TODO(yifan): Use the creation/start/finished timestamp when it's implemented.
CreatedAt: creationTime,
StartedAt: creationTime,
ExitCode: int(app.ExitCode),
Image: app.Image.Name,
ImageID: "rkt://" + app.Image.Id, // TODO(yifan): Add the prefix only in api.PodStatus.
Hash: hashNum,
// TODO(yifan): Note that now all apps share the same restart count, this might
// change once apps don't share the same lifecycle.
// See https://github.com/appc/spec/pull/547.
RestartCount: restartCount,
// TODO(yifan): Add reason and message field.
}, nil
}
func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
podStatus := &kubecontainer.PodStatus{
ID: uid,
@ -1547,23 +1554,10 @@ func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecont
Namespace: namespace,
}
// TODO(yifan): Use ListPods(detail=true) to avoid another InspectPod rpc here.
listReq := &rktapi.ListPodsRequest{
Filter: &rktapi.PodFilter{
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
{
Key: k8sRktUIDAnno,
Value: string(uid),
},
},
},
}
listResp, err := r.apisvc.ListPods(context.Background(), listReq)
listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{
Detail: true,
Filters: kubernetesPodFilters(uid),
})
if err != nil {
return nil, fmt.Errorf("couldn't list pods: %v", err)
}
@ -1571,18 +1565,9 @@ func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecont
var latestPod *rktapi.Pod
var latestRestartCount int = -1
// Only use the latest pod to get the information, (which has the
// largest restart count).
for _, rktpod := range listResp.Pods {
inspectReq := &rktapi.InspectPodRequest{rktpod.Id}
inspectResp, err := r.apisvc.InspectPod(context.Background(), inspectReq)
if err != nil {
glog.Warningf("rkt: Couldn't inspect rkt pod, (uuid %q): %v", rktpod.Id, err)
continue
}
pod := inspectResp.Pod
// In this loop, we group all containers from all pods together,
// also we try to find the latest pod, so we can fill other info of the pod below.
for _, pod := range listResp.Pods {
manifest, creationTime, restartCount, err := retrievePodInfo(pod)
if err != nil {
glog.Warning("rkt: Couldn't get necessary info from the rkt pod, (uuid %q): %v", pod.Id, err)
@ -1596,39 +1581,14 @@ func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecont
for i, app := range pod.Apps {
// The order of the apps is determined by the rkt pod manifest.
// TODO(yifan): Let the server to unmarshal the annotations?
hashStr, ok := manifest.Apps[i].Annotations.Get(k8sRktContainerHashAnno)
if !ok {
glog.Warningf("rkt: No container hash in pod manifest for rkt pod, (uuid %q, app %q)", pod.Id, app.Name)
continue
}
hashNum, err := strconv.ParseUint(hashStr, 10, 64)
// TODO(yifan): Save creationTime, restartCount in each app's annotation,
// so we don't need to pass them.
cs, err := populateContainerStatus(*pod, *app, manifest.Apps[i], restartCount, creationTime)
if err != nil {
glog.Warningf("rkt: Invalid hash value %q for rkt pod, (uuid %q, app %q)", pod.Id, app.Name)
glog.Warningf("rkt: Failed to populate container status(uuid %q, app %q): %v", pod.Id, app.Name, err)
continue
}
cs := kubecontainer.ContainerStatus{
ID: buildContainerID(&containerID{uuid: pod.Id, appName: app.Name}),
Name: app.Name,
State: appStateToContainerState(app.State),
// TODO(yifan): Use the creation/start/finished timestamp when it's implemented.
CreatedAt: creationTime,
StartedAt: creationTime,
ExitCode: int(app.ExitCode),
Image: app.Image.Name,
ImageID: "rkt://" + app.Image.Id, // TODO(yifan): Add the prefix only in api.PodStatus.
Hash: hashNum,
// TODO(yifan): Note that now all apps share the same restart count, this might
// change once apps don't share the same lifecycle.
// See https://github.com/appc/spec/pull/547.
RestartCount: restartCount,
// TODO(yifan): Add reason and message field.
}
podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, &cs)
podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, cs)
}
}
@ -1648,7 +1608,7 @@ type sortByRestartCount []*kubecontainer.ContainerStatus
func (s sortByRestartCount) Len() int { return len(s) }
func (s sortByRestartCount) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s sortByRestartCount) Less(i, j int) bool { return s[i].RestartCount > s[j].RestartCount }
func (s sortByRestartCount) Less(i, j int) bool { return s[i].RestartCount < s[j].RestartCount }
// TODO(yifan): Delete this function when the logic is moved to kubelet.
func (r *Runtime) ConvertPodStatusToAPIPodStatus(pod *api.Pod, status *kubecontainer.PodStatus) (*api.PodStatus, error) {
@ -1657,7 +1617,9 @@ func (r *Runtime) ConvertPodStatusToAPIPodStatus(pod *api.Pod, status *kubeconta
PodIP: status.IP,
}
sort.Sort(sortByRestartCount(status.ContainerStatuses))
// Sort in the reverse order of the restart count because the
// lastest one will have the largest restart count.
sort.Sort(sort.Reverse(sortByRestartCount(status.ContainerStatuses)))
containerStatuses := make(map[string]*api.ContainerStatus)
for _, c := range status.ContainerStatuses {
@ -1723,6 +1685,7 @@ func (r *Runtime) ConvertPodStatusToAPIPodStatus(pod *api.Pod, status *kubeconta
return apiPodStatus, nil
}
// TODO(yifan): Delete this function when the logic is moved to kubelet.
func (r *Runtime) GetPodStatusAndAPIPodStatus(pod *api.Pod) (*kubecontainer.PodStatus, *api.PodStatus, error) {
// Get the pod status.
podStatus, err := r.GetPodStatus(pod.UID, pod.Name, pod.Namespace)

View File

@ -269,9 +269,10 @@ func TestListImages(t *testing.T) {
r := &Runtime{apisvc: fr, systemd: fs}
tests := []struct {
images []*rktapi.Image
images []*rktapi.Image
expected []kubecontainer.Image
}{
{},
{nil, []kubecontainer.Image{}},
{
[]*rktapi.Image{
{
@ -280,6 +281,12 @@ func TestListImages(t *testing.T) {
Version: "latest",
},
},
[]kubecontainer.Image{
{
ID: "sha512-a2fb8f390702",
Tags: []string{"quay.io/coreos/alpine-sh:latest"},
},
},
},
{
[]*rktapi.Image{
@ -294,6 +301,16 @@ func TestListImages(t *testing.T) {
Version: "0.10.0",
},
},
[]kubecontainer.Image{
{
ID: "sha512-a2fb8f390702",
Tags: []string{"quay.io/coreos/alpine-sh:latest"},
},
{
ID: "sha512-c6b597f42816",
Tags: []string{"coreos.com/rkt/stage1-coreos:0.10.0"},
},
},
},
}
@ -304,12 +321,7 @@ func TestListImages(t *testing.T) {
if err != nil {
t.Errorf("%v", err)
}
assert.Equal(t, len(images), len(tt.images), fmt.Sprintf("test case %d: mismatched number of images", i))
for i, image := range images {
assert.Equal(t, image.ID, tt.images[i].Id, fmt.Sprintf("test case %d: mismatched image IDs", i))
assert.Equal(t, []string{tt.images[i].Name}, image.Tags, fmt.Sprintf("test case %d: mismatched image tags", i))
}
assert.Equal(t, tt.expected, images)
assert.Equal(t, fr.called, []string{"ListImages"}, fmt.Sprintf("test case %d: unexpected called list", i))
fr.CleanCalls()
@ -449,45 +461,44 @@ func TestGetPods(t *testing.T) {
}
assert.Equal(t, tt.result, pods, testCaseHint)
var inspectPodCalls []string
for range pods {
inspectPodCalls = append(inspectPodCalls, "InspectPod")
}
assert.Equal(t, append([]string{"ListPods"}, inspectPodCalls...), fr.called, fmt.Sprintf("test case %d: unexpected called list", i))
assert.Equal(t, []string{"ListPods"}, fr.called, fmt.Sprintf("test case %d: unexpected called list", i))
fr.CleanCalls()
}
}
func TestGetPodsFilter(t *testing.T) {
func TestGetPodsFilters(t *testing.T) {
fr := newFakeRktInterface()
fs := newFakeSystemd()
r := &Runtime{apisvc: fr, systemd: fs}
for _, test := range []struct {
All bool
ExpectedFilter *rktapi.PodFilter
All bool
ExpectedFilters []*rktapi.PodFilter
}{
{
true,
&rktapi.PodFilter{
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
[]*rktapi.PodFilter{
{
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
},
},
},
},
{
false,
&rktapi.PodFilter{
States: []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING},
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
[]*rktapi.PodFilter{
{
States: []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING},
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
},
},
},
@ -497,7 +508,7 @@ func TestGetPodsFilter(t *testing.T) {
if err != nil {
t.Errorf("%v", err)
}
assert.Equal(t, test.ExpectedFilter, fr.podFilter, "filters didn't match when all=%b", test.All)
assert.Equal(t, test.ExpectedFilters, fr.podFilters, "filters didn't match when all=%b", test.All)
}
}
@ -647,12 +658,7 @@ func TestGetPodStatus(t *testing.T) {
}
assert.Equal(t, tt.result, status, testCaseHint)
var inspectPodCalls []string
for range tt.pods {
inspectPodCalls = append(inspectPodCalls, "InspectPod")
}
assert.Equal(t, append([]string{"ListPods"}, inspectPodCalls...), fr.called, testCaseHint)
assert.Equal(t, []string{"ListPods"}, fr.called, testCaseHint)
fr.CleanCalls()
}
}