Pod log subresource

Adds a Log subresource to Pod storage. The Log subresource implements
rest.GetterWithOptions and produces a ResourceStreamer resource that
will stream the log output from the pod's host node.
This commit is contained in:
Cesar Wong 2015-04-06 14:57:06 -04:00
parent efc7f86baf
commit 8df4758ee9
8 changed files with 319 additions and 25 deletions

View File

@ -357,7 +357,7 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter)
// init initializes master.
func (m *Master) init(c *Config) {
podStorage := podetcd.NewStorage(c.EtcdHelper)
podStorage := podetcd.NewStorage(c.EtcdHelper, c.KubeletClient)
podRegistry := pod.NewRegistry(podStorage.Pod)
eventRegistry := event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds()))
@ -387,6 +387,7 @@ func (m *Master) init(c *Config) {
m.storage = map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/binding": podStorage.Binding,
"bindings": podStorage.Binding,

View File

@ -44,7 +44,7 @@ func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry {
helper := tools.NewEtcdHelper(client, latest.Codec)
podStorage := podetcd.NewStorage(helper)
podStorage := podetcd.NewStorage(helper, nil)
endpointStorage := endpointetcd.NewStorage(helper)
registry := NewRegistry(helper, pod.NewRegistry(podStorage.Pod), endpoint.NewRegistry(endpointStorage))
return registry

View File

@ -0,0 +1,19 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package rest has generic implementations of resources used for
// REST responses
package rest

View File

@ -0,0 +1,69 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"io"
"net/http"
"net/url"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
)
// LocationStreamer is a resource that streams the contents of a particular
// location URL
type LocationStreamer struct {
Location *url.URL
Transport http.RoundTripper
ContentType string
Flush bool
}
// a LocationStreamer must implement a rest.ResourceStreamer
var _ rest.ResourceStreamer = &LocationStreamer{}
// IsAnAPIObject marks this object as a runtime.Object
func (*LocationStreamer) IsAnAPIObject() {}
// InputStream returns a stream with the contents of the URL location. If no location is provided,
// a null stream is returned.
func (s *LocationStreamer) InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, contentType string, err error) {
if s.Location == nil {
// If no location was provided, return a null stream
return nil, false, "", nil
}
transport := s.Transport
if transport == nil {
transport = http.DefaultTransport
}
client := &http.Client{Transport: transport}
resp, err := client.Get(s.Location.String())
if err != nil {
return
}
contentType = s.ContentType
if len(contentType) == 0 {
contentType = resp.Header.Get("Content-Type")
if len(contentType) > 0 {
contentType = strings.TrimSpace(strings.SplitN(contentType, ";", 2)[0])
}
}
flush = s.Flush
stream = resp.Body
return
}

View File

@ -0,0 +1,118 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"bufio"
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"testing"
)
func TestInputStreamReader(t *testing.T) {
resultString := "Test output"
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte(resultString))
}))
defer s.Close()
u, err := url.Parse(s.URL)
if err != nil {
t.Errorf("Error parsing server URL: %v", err)
return
}
streamer := &LocationStreamer{
Location: u,
}
readCloser, _, _, err := streamer.InputStream("v1beta1", "text/plain")
if err != nil {
t.Errorf("Unexpected error when getting stream: %v", err)
return
}
defer readCloser.Close()
result, err := ioutil.ReadAll(readCloser)
if string(result) != resultString {
t.Errorf("Stream content does not match. Got: %s. Expected: %s.", string(result), resultString)
}
}
func TestInputStreamNullLocation(t *testing.T) {
streamer := &LocationStreamer{
Location: nil,
}
readCloser, _, _, err := streamer.InputStream("v1beta1", "text/plain")
if err != nil {
t.Errorf("Unexpected error when getting stream with null location: %v", err)
}
if readCloser != nil {
t.Errorf("Expected stream to be nil. Got: %#v", readCloser)
}
}
type testTransport struct {
body string
err error
}
func (tt *testTransport) RoundTrip(req *http.Request) (*http.Response, error) {
r := bufio.NewReader(bytes.NewBufferString(tt.body))
return http.ReadResponse(r, req)
}
func fakeTransport(mime, message string) http.RoundTripper {
content := fmt.Sprintf("HTTP/1.1 200 OK\nContent-Type: %s\n\n%s", mime, message)
return &testTransport{body: content}
}
func TestInputStreamContentType(t *testing.T) {
location, _ := url.Parse("http://www.example.com")
streamer := &LocationStreamer{
Location: location,
Transport: fakeTransport("application/json", "hello world"),
}
readCloser, _, contentType, err := streamer.InputStream("v1beta1", "text/plain")
if err != nil {
t.Errorf("Unexpected error when getting stream: %v", err)
return
}
defer readCloser.Close()
if contentType != "application/json" {
t.Errorf("Unexpected content type. Got: %s. Expected: application/json", contentType)
}
}
func TestInputStreamTransport(t *testing.T) {
message := "hello world"
location, _ := url.Parse("http://www.example.com")
streamer := &LocationStreamer{
Location: location,
Transport: fakeTransport("text/plain", message),
}
readCloser, _, _, err := streamer.InputStream("v1beta1", "text/plain")
if err != nil {
t.Errorf("Unexpected error when getting stream: %v", err)
return
}
defer readCloser.Close()
result, err := ioutil.ReadAll(readCloser)
if string(result) != message {
t.Errorf("Stream content does not match. Got: %s. Expected: %s.", string(result), message)
}
}

View File

@ -25,10 +25,12 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd"
genericrest "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
@ -40,6 +42,7 @@ type PodStorage struct {
Pod *REST
Binding *BindingREST
Status *StatusREST
Log *LogREST
}
// REST implements a RESTStorage for pods against etcd
@ -48,7 +51,7 @@ type REST struct {
}
// NewStorage returns a RESTStorage object that will work against pods.
func NewStorage(h tools.EtcdHelper) PodStorage {
func NewStorage(h tools.EtcdHelper, k client.ConnectionInfoGetter) PodStorage {
prefix := "/registry/pods"
store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Pod{} },
@ -85,6 +88,7 @@ func NewStorage(h tools.EtcdHelper) PodStorage {
Pod: &REST{*store},
Binding: &BindingREST{store: store},
Status: &StatusREST{store: &statusStore},
Log: &LogREST{store: store, kubeletConn: k},
}
}
@ -186,3 +190,37 @@ func (r *StatusREST) New() runtime.Object {
func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
return r.store.Update(ctx, obj)
}
// LogREST implements the log endpoint for a Pod
type LogREST struct {
store *etcdgeneric.Etcd
kubeletConn client.ConnectionInfoGetter
}
// New creates a new Pod log options object
func (r *LogREST) New() runtime.Object {
return &api.PodLogOptions{}
}
// Get retrieves a runtime.Object that will stream the contents of the pod log
func (r *LogREST) Get(ctx api.Context, name string, opts runtime.Object) (runtime.Object, error) {
logOpts, ok := opts.(*api.PodLogOptions)
if !ok {
return nil, fmt.Errorf("Invalid options object: %#v", opts)
}
location, transport, err := pod.LogLocation(r.store, r.kubeletConn, ctx, name, logOpts)
if err != nil {
return nil, err
}
return &genericrest.LocationStreamer{
Location: location,
Transport: transport,
ContentType: "text/plain",
Flush: logOpts.Follow,
}, nil
}
// NewGetOptions creates a new options object
func (r *LogREST) NewGetOptions() runtime.Object {
return &api.PodLogOptions{}
}

View File

@ -47,7 +47,7 @@ func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient, tools.EtcdHelper) {
fakeEtcdClient, h := newHelper(t)
storage := NewStorage(h)
storage := NewStorage(h, nil)
return storage.Pod, storage.Binding, storage.Status, fakeEtcdClient, h
}
@ -89,7 +89,7 @@ func TestStorage(t *testing.T) {
func TestCreate(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
storage := NewStorage(helper).Pod
storage := NewStorage(helper, nil).Pod
test := resttest.New(t, storage, fakeEtcdClient.SetError)
pod := validNewPod()
pod.ObjectMeta = api.ObjectMeta{}
@ -107,7 +107,7 @@ func TestCreate(t *testing.T) {
func TestDelete(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
storage := NewStorage(helper).Pod
storage := NewStorage(helper, nil).Pod
test := resttest.New(t, storage, fakeEtcdClient.SetError)
createFn := func() runtime.Object {
@ -143,7 +143,7 @@ func expectPod(t *testing.T, out runtime.Object) (*api.Pod, bool) {
func TestCreateRegistryError(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.Err = fmt.Errorf("test error")
storage := NewStorage(helper).Pod
storage := NewStorage(helper, nil).Pod
pod := validNewPod()
_, err := storage.Create(api.NewDefaultContext(), pod)
@ -154,7 +154,7 @@ func TestCreateRegistryError(t *testing.T) {
func TestCreateSetsFields(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
storage := NewStorage(helper).Pod
storage := NewStorage(helper, nil).Pod
pod := validNewPod()
_, err := storage.Create(api.NewDefaultContext(), pod)
if err != fakeEtcdClient.Err {
@ -176,7 +176,7 @@ func TestCreateSetsFields(t *testing.T) {
func TestListError(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.Err = fmt.Errorf("test error")
storage := NewStorage(helper).Pod
storage := NewStorage(helper, nil).Pod
pods, err := storage.List(api.NewDefaultContext(), labels.Everything(), fields.Everything())
if err != fakeEtcdClient.Err {
t.Fatalf("Expected %#v, Got %#v", fakeEtcdClient.Err, err)
@ -194,7 +194,7 @@ func TestListEmptyPodList(t *testing.T) {
E: fakeEtcdClient.NewError(tools.EtcdErrorCodeNotFound),
}
storage := NewStorage(helper).Pod
storage := NewStorage(helper, nil).Pod
pods, err := storage.List(api.NewContext(), labels.Everything(), fields.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -231,7 +231,7 @@ func TestListPodList(t *testing.T) {
},
},
}
storage := NewStorage(helper).Pod
storage := NewStorage(helper, nil).Pod
podsObj, err := storage.List(api.NewDefaultContext(), labels.Everything(), fields.Everything())
pods := podsObj.(*api.PodList)
@ -280,7 +280,7 @@ func TestListPodListSelection(t *testing.T) {
},
},
}
storage := NewStorage(helper).Pod
storage := NewStorage(helper, nil).Pod
ctx := api.NewDefaultContext()
@ -345,7 +345,7 @@ func TestListPodListSelection(t *testing.T) {
}
func TestPodDecode(t *testing.T) {
storage := NewStorage(tools.EtcdHelper{}).Pod
storage := NewStorage(tools.EtcdHelper{}, nil).Pod
expected := validNewPod()
body, err := latest.Codec.Encode(expected)
if err != nil {
@ -375,7 +375,7 @@ func TestGet(t *testing.T) {
},
},
}
storage := NewStorage(helper).Pod
storage := NewStorage(helper, nil).Pod
obj, err := storage.Get(api.WithNamespace(api.NewContext(), "test"), "foo")
pod := obj.(*api.Pod)
@ -392,7 +392,7 @@ func TestGet(t *testing.T) {
func TestPodStorageValidatesCreate(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.Err = fmt.Errorf("test error")
storage := NewStorage(helper).Pod
storage := NewStorage(helper, nil).Pod
pod := validNewPod()
pod.Labels = map[string]string{
@ -410,7 +410,7 @@ func TestPodStorageValidatesCreate(t *testing.T) {
// TODO: remove, this is covered by RESTTest.TestCreate
func TestCreatePod(t *testing.T) {
_, helper := newHelper(t)
storage := NewStorage(helper).Pod
storage := NewStorage(helper, nil).Pod
pod := validNewPod()
obj, err := storage.Create(api.NewDefaultContext(), pod)
@ -432,7 +432,7 @@ func TestCreatePod(t *testing.T) {
// TODO: remove, this is covered by RESTTest.TestCreate
func TestCreateWithConflictingNamespace(t *testing.T) {
_, helper := newHelper(t)
storage := NewStorage(helper).Pod
storage := NewStorage(helper, nil).Pod
pod := validNewPod()
pod.Namespace = "not-default"
@ -461,7 +461,7 @@ func TestUpdateWithConflictingNamespace(t *testing.T) {
},
},
}
storage := NewStorage(helper).Pod
storage := NewStorage(helper, nil).Pod
pod := validChangedPod()
pod.Namespace = "not-default"
@ -578,7 +578,7 @@ func TestResourceLocation(t *testing.T) {
},
},
}
storage := NewStorage(helper).Pod
storage := NewStorage(helper, nil).Pod
redirector := rest.Redirector(storage)
location, _, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query)
@ -616,7 +616,7 @@ func TestDeletePod(t *testing.T) {
},
},
}
storage := NewStorage(helper).Pod
storage := NewStorage(helper, nil).Pod
_, err := storage.Delete(api.NewDefaultContext(), "foo", nil)
if err != nil {

View File

@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
@ -133,6 +134,18 @@ type ResourceGetter interface {
Get(api.Context, string) (runtime.Object, error)
}
func getPod(getter ResourceGetter, ctx api.Context, name string) (*api.Pod, error) {
obj, err := getter.Get(ctx, name)
if err != nil {
return nil, err
}
pod := obj.(*api.Pod)
if pod == nil {
return nil, fmt.Errorf("Unexpected object type: %#v", pod)
}
return pod, nil
}
// ResourceLocation returns a URL to which one can send traffic for the specified pod.
func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
// Allow ID as "podname" or "podname:port". If port is not specified,
@ -148,14 +161,10 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.U
port = parts[1]
}
obj, err := getter.Get(ctx, name)
pod, err := getPod(getter, ctx, name)
if err != nil {
return nil, nil, err
}
pod := obj.(*api.Pod)
if pod == nil {
return nil, nil, nil
}
// Try to figure out a port.
if port == "" {
@ -177,3 +186,43 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.U
}
return loc, nil, nil
}
// LogLocation returns a the log URL for a pod container. If opts.Container is blank
// and only one container is present in the pod, that container is used.
func LogLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts *api.PodLogOptions) (*url.URL, http.RoundTripper, error) {
pod, err := getPod(getter, ctx, name)
if err != nil {
return nil, nil, err
}
// Try to figure out a container
container := opts.Container
if container == "" {
if len(pod.Spec.Containers) == 1 {
container = pod.Spec.Containers[0].Name
} else {
return nil, nil, fmt.Errorf("a container name must be specified for pod %s", name)
}
}
nodeHost := pod.Status.HostIP
if len(nodeHost) == 0 {
// If pod has not been assigned a host, return an empty location
return nil, nil, nil
}
nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(nodeHost)
if err != nil {
return nil, nil, err
}
params := url.Values{}
if opts.Follow {
params.Add("follow", "true")
}
loc := &url.URL{
Scheme: nodeScheme,
Host: fmt.Sprintf("%s:%d", nodeHost, nodePort),
Path: fmt.Sprintf("/containerLogs/%s/%s/%s", pod.Namespace, name, container),
RawQuery: params.Encode(),
}
return loc, nodeTransport, nil
}