rkt: rewrote GetPods to use rkt's api service

This involved adding annotations to the rkt pod's manifest that contain
information about the kubernetes pod, which is later read by the
kubelet.
This commit is contained in:
Derek Gonyeo
2015-11-21 00:56:35 +00:00
parent b7dc1175ba
commit 5a16b4751b
3 changed files with 522 additions and 69 deletions

View File

@@ -30,10 +30,12 @@ import (
// fakeRktInterface mocks the rktapi.PublicAPIClient interface for testing purpose. // fakeRktInterface mocks the rktapi.PublicAPIClient interface for testing purpose.
type fakeRktInterface struct { type fakeRktInterface struct {
sync.Mutex sync.Mutex
info rktapi.Info info rktapi.Info
images []*rktapi.Image images []*rktapi.Image
called []string podFilter *rktapi.PodFilter
err error pods []*rktapi.Pod
called []string
err error
} }
func newFakeRktInterface() *fakeRktInterface { func newFakeRktInterface() *fakeRktInterface {
@@ -55,11 +57,25 @@ func (f *fakeRktInterface) GetInfo(ctx context.Context, in *rktapi.GetInfoReques
} }
func (f *fakeRktInterface) ListPods(ctx context.Context, in *rktapi.ListPodsRequest, opts ...grpc.CallOption) (*rktapi.ListPodsResponse, error) { func (f *fakeRktInterface) ListPods(ctx context.Context, in *rktapi.ListPodsRequest, opts ...grpc.CallOption) (*rktapi.ListPodsResponse, error) {
return nil, fmt.Errorf("Not implemented") f.Lock()
defer f.Unlock()
f.called = append(f.called, "ListPods")
f.podFilter = in.Filter
return &rktapi.ListPodsResponse{f.pods}, f.err
} }
func (f *fakeRktInterface) InspectPod(ctx context.Context, in *rktapi.InspectPodRequest, opts ...grpc.CallOption) (*rktapi.InspectPodResponse, error) { func (f *fakeRktInterface) InspectPod(ctx context.Context, in *rktapi.InspectPodRequest, opts ...grpc.CallOption) (*rktapi.InspectPodResponse, error) {
return nil, fmt.Errorf("Not implemented") f.Lock()
defer f.Unlock()
f.called = append(f.called, "InspectPod")
for _, pod := range f.pods {
if pod.Id == in.Id {
return &rktapi.InspectPodResponse{pod}, f.err
}
}
return &rktapi.InspectPodResponse{nil}, f.err
} }
func (f *fakeRktInterface) ListImages(ctx context.Context, in *rktapi.ListImagesRequest, opts ...grpc.CallOption) (*rktapi.ListImagesResponse, error) { func (f *fakeRktInterface) ListImages(ctx context.Context, in *rktapi.ListImagesRequest, opts ...grpc.CallOption) (*rktapi.ListImagesResponse, error) {

View File

@@ -72,6 +72,16 @@ const (
unitRktID = "RktID" unitRktID = "RktID"
unitRestartCount = "RestartCount" unitRestartCount = "RestartCount"
k8sRktKubeletAnno = "rkt.kubernetes.io/managed-by-kubelet"
k8sRktKubeletAnnoValue = "true"
k8sRktUIDAnno = "rkt.kubernetes.io/uid"
k8sRktNameAnno = "rkt.kubernetes.io/name"
k8sRktNamespaceAnno = "rkt.kubernetes.io/namespace"
//TODO: remove the creation time annotation once this is closed: https://github.com/coreos/rkt/issues/1789
k8sRktCreationTimeAnno = "rkt.kubernetes.io/created"
k8sRktContainerHashAnno = "rkt.kubernetes.io/containerhash"
k8sRktRestartCountAnno = "rkt.kubernetes.io/restartcount"
dockerPrefix = "docker://" dockerPrefix = "docker://"
authDir = "auth.d" authDir = "auth.d"
@@ -415,50 +425,60 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc
var globalPortMappings []kubecontainer.PortMapping var globalPortMappings []kubecontainer.PortMapping
manifest := appcschema.BlankPodManifest() manifest := appcschema.BlankPodManifest()
listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{
Filter: kubernetesPodFilter(pod),
})
if err != nil {
return nil, fmt.Errorf("couldn't list pods: %v", err)
}
restartCount := 0
for _, rktpod := range listResp.Pods {
//TODO: get the manifest from listresp.Pods when this gets merged: https://github.com/coreos/rkt/pull/1786
inspectResp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{rktpod.Id})
if err != nil {
glog.Warningf("rkt: error while inspecting pod %s", rktpod.Id)
continue
}
if inspectResp.Pod == nil {
glog.Warningf("rkt: pod %s vanished?!", rktpod.Id)
continue
}
manifest := &appcschema.PodManifest{}
err = json.Unmarshal(inspectResp.Pod.Manifest, manifest)
if err != nil {
glog.Warningf("rkt: error unmatshaling pod manifest: %v", err)
continue
}
if countString, ok := manifest.Annotations.Get(k8sRktRestartCountAnno); ok {
num, err := strconv.Atoi(countString)
if err != nil {
glog.Warningf("rkt: error reading restart count on pod: %v", err)
continue
}
if num+1 > restartCount {
restartCount = num + 1
}
}
}
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktKubeletAnno), k8sRktKubeletAnnoValue)
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktUIDAnno), string(pod.UID))
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktNameAnno), pod.Name)
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktNamespaceAnno), pod.Namespace)
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktCreationTimeAnno), strconv.FormatInt(time.Now().Unix(), 10))
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktRestartCountAnno), strconv.Itoa(restartCount))
for _, c := range pod.Spec.Containers { for _, c := range pod.Spec.Containers {
if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil { app, portMappings, err := r.newAppcRuntimeApp(pod, c, pullSecrets)
return nil, err
}
imgManifest, err := r.getImageManifest(c.Image)
if err != nil { if err != nil {
return nil, err return nil, err
} }
manifest.Apps = append(manifest.Apps, *app)
if imgManifest.App == nil { globalPortMappings = append(globalPortMappings, portMappings...)
imgManifest.App = new(appctypes.App)
}
img, err := r.getImageByName(c.Image)
if err != nil {
return nil, err
}
hash, err := appctypes.NewHash(img.ID)
if err != nil {
return nil, err
}
opts, err := r.generator.GenerateRunContainerOptions(pod, &c)
if err != nil {
return nil, err
}
globalPortMappings = append(globalPortMappings, opts.PortMappings...)
if err := setApp(imgManifest.App, &c, opts); err != nil {
return nil, err
}
name, err := appctypes.SanitizeACName(c.Name)
if err != nil {
return nil, err
}
appName := appctypes.MustACName(name)
manifest.Apps = append(manifest.Apps, appcschema.RuntimeApp{
Name: *appName,
Image: appcschema.RuntimeImage{ID: *hash},
App: imgManifest.App,
})
} }
volumeMap, ok := r.volumeGetter.GetVolumes(pod.UID) volumeMap, ok := r.volumeGetter.GetVolumes(pod.UID)
@@ -495,6 +515,80 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc
return manifest, nil return manifest, nil
} }
func (r *Runtime) newAppcRuntimeApp(pod *api.Pod, c api.Container, pullSecrets []api.Secret) (*appcschema.RuntimeApp, []kubecontainer.PortMapping, error) {
if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil {
return nil, nil, err
}
imgManifest, err := r.getImageManifest(c.Image)
if err != nil {
return nil, nil, err
}
if imgManifest.App == nil {
imgManifest.App = new(appctypes.App)
}
img, err := r.getImageByName(c.Image)
if err != nil {
return nil, nil, err
}
hash, err := appctypes.NewHash(img.ID)
if err != nil {
return nil, nil, err
}
opts, err := r.generator.GenerateRunContainerOptions(pod, &c)
if err != nil {
return nil, nil, err
}
if err := setApp(imgManifest.App, &c, opts); err != nil {
return nil, nil, err
}
name, err := appctypes.SanitizeACName(c.Name)
if err != nil {
return nil, nil, err
}
appName := appctypes.MustACName(name)
kubehash := kubecontainer.HashContainer(&c)
return &appcschema.RuntimeApp{
Name: *appName,
Image: appcschema.RuntimeImage{ID: *hash},
App: imgManifest.App,
Annotations: []appctypes.Annotation{
{
Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno),
Value: strconv.FormatUint(kubehash, 10),
},
},
}, opts.PortMappings, nil
}
func kubernetesPodFilter(pod *api.Pod) *rktapi.PodFilter {
return &rktapi.PodFilter{
States: []rktapi.PodState{
//TODO: In the future some pods can remain running after some apps exit: https://github.com/appc/spec/pull/500
rktapi.PodState_POD_STATE_RUNNING,
rktapi.PodState_POD_STATE_EXITED,
rktapi.PodState_POD_STATE_DELETING,
rktapi.PodState_POD_STATE_GARBAGE,
},
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
{
Key: k8sRktUIDAnno,
Value: string(pod.UID),
},
},
}
}
func newUnitOption(section, name, value string) *unit.UnitOption { func newUnitOption(section, name, value string) *unit.UnitOption {
return &unit.UnitOption{Section: section, Name: name, Value: value} return &unit.UnitOption{Section: section, Name: name, Value: value}
} }
@@ -719,6 +813,79 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error {
return nil return nil
} }
// convertRktPod will convert a rktapi.Pod to a kubecontainer.Pod
func (r *Runtime) convertRktPod(rktpod rktapi.Pod) (*kubecontainer.Pod, error) {
manifest := &appcschema.PodManifest{}
err := json.Unmarshal(rktpod.Manifest, manifest)
if err != nil {
return nil, err
}
podUID, ok := manifest.Annotations.Get(k8sRktUIDAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktUIDAnno)
}
podName, ok := manifest.Annotations.Get(k8sRktNameAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktNameAnno)
}
podNamespace, ok := manifest.Annotations.Get(k8sRktNamespaceAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktNamespaceAnno)
}
podCreatedString, ok := manifest.Annotations.Get(k8sRktCreationTimeAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktCreationTimeAnno)
}
podCreated, err := strconv.ParseInt(podCreatedString, 10, 64)
if err != nil {
return nil, fmt.Errorf("couldn't parse pod creation timestamp: %v", err)
}
var state kubecontainer.ContainerState
switch rktpod.State {
case rktapi.PodState_POD_STATE_RUNNING:
state = kubecontainer.ContainerStateRunning
case rktapi.PodState_POD_STATE_ABORTED_PREPARE, rktapi.PodState_POD_STATE_EXITED,
rktapi.PodState_POD_STATE_DELETING, rktapi.PodState_POD_STATE_GARBAGE:
state = kubecontainer.ContainerStateExited
default:
state = kubecontainer.ContainerStateUnknown
}
kubepod := &kubecontainer.Pod{
ID: types.UID(podUID),
Name: podName,
Namespace: podNamespace,
}
for _, app := range rktpod.Apps {
manifest := &appcschema.ImageManifest{}
err := json.Unmarshal(app.Image.Manifest, manifest)
if err != nil {
return nil, err
}
containerHashString, ok := manifest.Annotations.Get(k8sRktContainerHashAnno)
if !ok {
return nil, fmt.Errorf("app is missing annotation %s", k8sRktContainerHashAnno)
}
containerHash, err := strconv.ParseUint(containerHashString, 10, 64)
if err != nil {
return nil, fmt.Errorf("couldn't parse container's hash: %v", err)
}
kubepod.Containers = append(kubepod.Containers, &kubecontainer.Container{
ID: buildContainerID(&containerID{rktpod.Id, app.Name}),
Name: app.Name,
Image: app.Image.Name,
Hash: containerHash,
Created: podCreated,
State: state,
})
}
return kubepod, nil
}
// readServiceFile reads the service file and constructs the runtime pod and the rkt info. // readServiceFile reads the service file and constructs the runtime pod and the rkt info.
func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktInfo, error) { func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktInfo, error) {
f, err := os.Open(serviceFilePath(serviceName)) f, err := os.Open(serviceFilePath(serviceName))
@@ -770,34 +937,42 @@ func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktI
func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) { func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
glog.V(4).Infof("Rkt getting pods") glog.V(4).Infof("Rkt getting pods")
units, err := r.systemd.ListUnits() listReq := &rktapi.ListPodsRequest{
Filter: &rktapi.PodFilter{
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
},
},
}
if !all {
listReq.Filter.States = []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING}
}
listResp, err := r.apisvc.ListPods(context.Background(), listReq)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("couldn't list pods: %v", err)
} }
var pods []*kubecontainer.Pod var pods []*kubecontainer.Pod
for _, u := range units { for _, rktpod := range listResp.Pods {
if strings.HasPrefix(u.Name, kubernetesUnitPrefix) { //TODO: get the manifest from listresp.Pods when this gets merged: https://github.com/coreos/rkt/pull/1786
var state kubecontainer.ContainerState inspectResp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{rktpod.Id})
switch { if err != nil {
case u.SubState == "running": return nil, err
state = kubecontainer.ContainerStateRunning
default:
state = kubecontainer.ContainerStateExited
}
if !all && state != kubecontainer.ContainerStateRunning {
continue
}
pod, _, err := r.readServiceFile(u.Name)
if err != nil {
glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err)
continue
}
for _, c := range pod.Containers {
c.State = state
}
pods = append(pods, pod)
} }
if inspectResp.Pod == nil {
return nil, fmt.Errorf("pod %s vanished?!", rktpod.Id)
}
pod, err := r.convertRktPod(*inspectResp.Pod)
if err != nil {
glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err)
continue
}
pods = append(pods, pod)
} }
return pods, nil return pods, nil
} }

View File

@@ -17,11 +17,15 @@ limitations under the License.
package rkt package rkt
import ( import (
"encoding/json"
"fmt" "fmt"
"testing" "testing"
appcschema "github.com/appc/spec/schema"
appctypes "github.com/appc/spec/schema/types"
rktapi "github.com/coreos/rkt/api/v1alpha" rktapi "github.com/coreos/rkt/api/v1alpha"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/types"
) )
func TestCheckVersion(t *testing.T) { func TestCheckVersion(t *testing.T) {
@@ -197,3 +201,261 @@ func TestListImages(t *testing.T) {
fr.CleanCalls() fr.CleanCalls()
} }
} }
func TestGetPods(t *testing.T) {
fr := newFakeRktInterface()
fs := newFakeSystemd()
r := &Runtime{apisvc: fr, systemd: fs}
tests := []struct {
k8sUID types.UID
k8sName string
k8sNamespace string
k8sCreation int64
k8sRestart int
k8sContHashes []uint64
rktPodState rktapi.PodState
pods []*rktapi.Pod
}{
{},
{
k8sUID: types.UID("0"),
k8sName: "guestbook",
k8sNamespace: "default",
k8sCreation: 10000000000,
k8sRestart: 1,
k8sContHashes: []uint64{2353434678},
rktPodState: rktapi.PodState_POD_STATE_RUNNING,
pods: []*rktapi.Pod{
{
State: rktapi.PodState_POD_STATE_RUNNING,
Apps: []*rktapi.App{
{
Name: "test",
Image: &rktapi.Image{
Name: "test",
Manifest: mustMarshalImageManifest(
&appcschema.ImageManifest{
ACKind: appcschema.ImageManifestKind,
ACVersion: appcschema.AppContainerVersion,
Name: *appctypes.MustACIdentifier("test"),
Annotations: appctypes.Annotations{
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno),
Value: "2353434678",
},
},
},
),
},
},
},
Manifest: mustMarshalPodManifest(
&appcschema.PodManifest{
ACKind: appcschema.PodManifestKind,
ACVersion: appcschema.AppContainerVersion,
Annotations: appctypes.Annotations{
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktKubeletAnno),
Value: k8sRktKubeletAnnoValue,
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktUIDAnno),
Value: "0",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktNameAnno),
Value: "guestbook",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktNamespaceAnno),
Value: "default",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktCreationTimeAnno),
Value: "10000000000",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktRestartCountAnno),
Value: "1",
},
},
},
),
},
},
},
{
k8sUID: types.UID("1"),
k8sName: "test-pod",
k8sNamespace: "default",
k8sCreation: 10000000001,
k8sRestart: 3,
k8sContHashes: []uint64{2353434682, 8732645},
rktPodState: rktapi.PodState_POD_STATE_EXITED,
pods: []*rktapi.Pod{
{
State: rktapi.PodState_POD_STATE_EXITED,
Apps: []*rktapi.App{
{
Name: "test",
Image: &rktapi.Image{
Name: "test",
Manifest: mustMarshalImageManifest(
&appcschema.ImageManifest{
ACKind: appcschema.ImageManifestKind,
ACVersion: appcschema.AppContainerVersion,
Name: *appctypes.MustACIdentifier("test"),
Annotations: appctypes.Annotations{
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno),
Value: "2353434682",
},
},
},
),
},
},
{
Name: "test2",
Image: &rktapi.Image{
Name: "test2",
Manifest: mustMarshalImageManifest(
&appcschema.ImageManifest{
ACKind: appcschema.ImageManifestKind,
ACVersion: appcschema.AppContainerVersion,
Name: *appctypes.MustACIdentifier("test2"),
Annotations: appctypes.Annotations{
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno),
Value: "8732645",
},
},
},
),
},
},
},
Manifest: mustMarshalPodManifest(
&appcschema.PodManifest{
ACKind: appcschema.PodManifestKind,
ACVersion: appcschema.AppContainerVersion,
Annotations: appctypes.Annotations{
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktKubeletAnno),
Value: k8sRktKubeletAnnoValue,
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktUIDAnno),
Value: "1",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktNameAnno),
Value: "test-pod",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktNamespaceAnno),
Value: "default",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktCreationTimeAnno),
Value: "10000000001",
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktRestartCountAnno),
Value: "3",
},
},
},
),
},
},
},
}
for i, tt := range tests {
fr.pods = tt.pods
pods, err := r.GetPods(true)
if err != nil {
t.Errorf("%v", err)
}
assert.Equal(t, len(pods), len(tt.pods), fmt.Sprintf("test case %d: mismatched number of pods", i))
for j, pod := range pods {
assert.Equal(t, pod.ID, tt.k8sUID, fmt.Sprintf("test case %d: mismatched UIDs", i))
assert.Equal(t, pod.Name, tt.k8sName, fmt.Sprintf("test case %d: mismatched Names", i))
assert.Equal(t, pod.Namespace, tt.k8sNamespace, fmt.Sprintf("test case %d: mismatched Namespaces", i))
assert.Equal(t, len(pod.Containers), len(tt.pods[j].Apps), fmt.Sprintf("test case %d: mismatched number of containers", i))
for k, cont := range pod.Containers {
assert.Equal(t, cont.Created, tt.k8sCreation, fmt.Sprintf("test case %d: mismatched creation times", i))
assert.Equal(t, cont.Hash, tt.k8sContHashes[k], fmt.Sprintf("test case %d: mismatched container hashes", i))
}
}
var inspectPodCalls []string
for range pods {
inspectPodCalls = append(inspectPodCalls, "InspectPod")
}
assert.Equal(t, append([]string{"ListPods"}, inspectPodCalls...), fr.called, fmt.Sprintf("test case %d: unexpected called list", i))
fr.CleanCalls()
}
}
func TestGetPodsFilter(t *testing.T) {
fr := newFakeRktInterface()
fs := newFakeSystemd()
r := &Runtime{apisvc: fr, systemd: fs}
for _, test := range []struct {
All bool
ExpectedFilter *rktapi.PodFilter
}{
{
true,
&rktapi.PodFilter{
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
},
},
},
{
false,
&rktapi.PodFilter{
States: []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING},
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
},
},
},
} {
_, err := r.GetPods(test.All)
if err != nil {
t.Errorf("%v", err)
}
assert.Equal(t, test.ExpectedFilter, fr.podFilter, "filters didn't match when all=%b", test.All)
}
}
func mustMarshalPodManifest(man *appcschema.PodManifest) []byte {
manblob, err := json.Marshal(man)
if err != nil {
panic(err)
}
return manblob
}
func mustMarshalImageManifest(man *appcschema.ImageManifest) []byte {
manblob, err := json.Marshal(man)
if err != nil {
panic(err)
}
return manblob
}