diff --git a/pkg/master/master.go b/pkg/master/master.go index 0339856229b..a306c18f3ed 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -357,7 +357,7 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter) // init initializes master. func (m *Master) init(c *Config) { - podStorage := podetcd.NewStorage(c.EtcdHelper) + podStorage := podetcd.NewStorage(c.EtcdHelper, c.KubeletClient) podRegistry := pod.NewRegistry(podStorage.Pod) eventRegistry := event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())) @@ -387,6 +387,7 @@ func (m *Master) init(c *Config) { m.storage = map[string]rest.Storage{ "pods": podStorage.Pod, "pods/status": podStorage.Status, + "pods/log": podStorage.Log, "pods/binding": podStorage.Binding, "bindings": podStorage.Binding, diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 430e8f64897..2039d963503 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -44,7 +44,7 @@ func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry { helper := tools.NewEtcdHelper(client, latest.Codec) - podStorage := podetcd.NewStorage(helper) + podStorage := podetcd.NewStorage(helper, nil) endpointStorage := endpointetcd.NewStorage(helper) registry := NewRegistry(helper, pod.NewRegistry(podStorage.Pod), endpoint.NewRegistry(endpointStorage)) return registry diff --git a/pkg/registry/generic/rest/doc.go b/pkg/registry/generic/rest/doc.go new file mode 100644 index 00000000000..9bf2b7c78b7 --- /dev/null +++ b/pkg/registry/generic/rest/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package rest has generic implementations of resources used for +// REST responses +package rest diff --git a/pkg/registry/generic/rest/streamer.go b/pkg/registry/generic/rest/streamer.go new file mode 100644 index 00000000000..9cebbb95092 --- /dev/null +++ b/pkg/registry/generic/rest/streamer.go @@ -0,0 +1,69 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "io" + "net/http" + "net/url" + "strings" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" +) + +// LocationStreamer is a resource that streams the contents of a particular +// location URL +type LocationStreamer struct { + Location *url.URL + Transport http.RoundTripper + ContentType string + Flush bool +} + +// a LocationStreamer must implement a rest.ResourceStreamer +var _ rest.ResourceStreamer = &LocationStreamer{} + +// IsAnAPIObject marks this object as a runtime.Object +func (*LocationStreamer) IsAnAPIObject() {} + +// InputStream returns a stream with the contents of the URL location. If no location is provided, +// a null stream is returned. +func (s *LocationStreamer) InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, contentType string, err error) { + if s.Location == nil { + // If no location was provided, return a null stream + return nil, false, "", nil + } + transport := s.Transport + if transport == nil { + transport = http.DefaultTransport + } + client := &http.Client{Transport: transport} + resp, err := client.Get(s.Location.String()) + if err != nil { + return + } + contentType = s.ContentType + if len(contentType) == 0 { + contentType = resp.Header.Get("Content-Type") + if len(contentType) > 0 { + contentType = strings.TrimSpace(strings.SplitN(contentType, ";", 2)[0]) + } + } + flush = s.Flush + stream = resp.Body + return +} diff --git a/pkg/registry/generic/rest/streamer_test.go b/pkg/registry/generic/rest/streamer_test.go new file mode 100644 index 00000000000..53d2d9aaaa3 --- /dev/null +++ b/pkg/registry/generic/rest/streamer_test.go @@ -0,0 +1,118 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "bufio" + "bytes" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "testing" +) + +func TestInputStreamReader(t *testing.T) { + resultString := "Test output" + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Write([]byte(resultString)) + })) + defer s.Close() + u, err := url.Parse(s.URL) + if err != nil { + t.Errorf("Error parsing server URL: %v", err) + return + } + streamer := &LocationStreamer{ + Location: u, + } + readCloser, _, _, err := streamer.InputStream("v1beta1", "text/plain") + if err != nil { + t.Errorf("Unexpected error when getting stream: %v", err) + return + } + defer readCloser.Close() + result, err := ioutil.ReadAll(readCloser) + if string(result) != resultString { + t.Errorf("Stream content does not match. Got: %s. Expected: %s.", string(result), resultString) + } +} + +func TestInputStreamNullLocation(t *testing.T) { + streamer := &LocationStreamer{ + Location: nil, + } + readCloser, _, _, err := streamer.InputStream("v1beta1", "text/plain") + if err != nil { + t.Errorf("Unexpected error when getting stream with null location: %v", err) + } + if readCloser != nil { + t.Errorf("Expected stream to be nil. Got: %#v", readCloser) + } +} + +type testTransport struct { + body string + err error +} + +func (tt *testTransport) RoundTrip(req *http.Request) (*http.Response, error) { + r := bufio.NewReader(bytes.NewBufferString(tt.body)) + return http.ReadResponse(r, req) +} + +func fakeTransport(mime, message string) http.RoundTripper { + content := fmt.Sprintf("HTTP/1.1 200 OK\nContent-Type: %s\n\n%s", mime, message) + return &testTransport{body: content} +} + +func TestInputStreamContentType(t *testing.T) { + location, _ := url.Parse("http://www.example.com") + streamer := &LocationStreamer{ + Location: location, + Transport: fakeTransport("application/json", "hello world"), + } + readCloser, _, contentType, err := streamer.InputStream("v1beta1", "text/plain") + if err != nil { + t.Errorf("Unexpected error when getting stream: %v", err) + return + } + defer readCloser.Close() + if contentType != "application/json" { + t.Errorf("Unexpected content type. Got: %s. Expected: application/json", contentType) + } +} + +func TestInputStreamTransport(t *testing.T) { + message := "hello world" + location, _ := url.Parse("http://www.example.com") + streamer := &LocationStreamer{ + Location: location, + Transport: fakeTransport("text/plain", message), + } + readCloser, _, _, err := streamer.InputStream("v1beta1", "text/plain") + if err != nil { + t.Errorf("Unexpected error when getting stream: %v", err) + return + } + defer readCloser.Close() + result, err := ioutil.ReadAll(readCloser) + if string(result) != message { + t.Errorf("Stream content does not match. Got: %s. Expected: %s.", string(result), message) + } +} diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index 8729ea1e557..008d27a8b9e 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -25,10 +25,12 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" + genericrest "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" @@ -40,6 +42,7 @@ type PodStorage struct { Pod *REST Binding *BindingREST Status *StatusREST + Log *LogREST } // REST implements a RESTStorage for pods against etcd @@ -48,7 +51,7 @@ type REST struct { } // NewStorage returns a RESTStorage object that will work against pods. -func NewStorage(h tools.EtcdHelper) PodStorage { +func NewStorage(h tools.EtcdHelper, k client.ConnectionInfoGetter) PodStorage { prefix := "/registry/pods" store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.Pod{} }, @@ -85,6 +88,7 @@ func NewStorage(h tools.EtcdHelper) PodStorage { Pod: &REST{*store}, Binding: &BindingREST{store: store}, Status: &StatusREST{store: &statusStore}, + Log: &LogREST{store: store, kubeletConn: k}, } } @@ -186,3 +190,37 @@ func (r *StatusREST) New() runtime.Object { func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { return r.store.Update(ctx, obj) } + +// LogREST implements the log endpoint for a Pod +type LogREST struct { + store *etcdgeneric.Etcd + kubeletConn client.ConnectionInfoGetter +} + +// New creates a new Pod log options object +func (r *LogREST) New() runtime.Object { + return &api.PodLogOptions{} +} + +// Get retrieves a runtime.Object that will stream the contents of the pod log +func (r *LogREST) Get(ctx api.Context, name string, opts runtime.Object) (runtime.Object, error) { + logOpts, ok := opts.(*api.PodLogOptions) + if !ok { + return nil, fmt.Errorf("Invalid options object: %#v", opts) + } + location, transport, err := pod.LogLocation(r.store, r.kubeletConn, ctx, name, logOpts) + if err != nil { + return nil, err + } + return &genericrest.LocationStreamer{ + Location: location, + Transport: transport, + ContentType: "text/plain", + Flush: logOpts.Follow, + }, nil +} + +// NewGetOptions creates a new options object +func (r *LogREST) NewGetOptions() runtime.Object { + return &api.PodLogOptions{} +} diff --git a/pkg/registry/pod/etcd/etcd_test.go b/pkg/registry/pod/etcd/etcd_test.go index 9d0f4dad589..d682ddda0f0 100644 --- a/pkg/registry/pod/etcd/etcd_test.go +++ b/pkg/registry/pod/etcd/etcd_test.go @@ -47,7 +47,7 @@ func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) { func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient, tools.EtcdHelper) { fakeEtcdClient, h := newHelper(t) - storage := NewStorage(h) + storage := NewStorage(h, nil) return storage.Pod, storage.Binding, storage.Status, fakeEtcdClient, h } @@ -89,7 +89,7 @@ func TestStorage(t *testing.T) { func TestCreate(t *testing.T) { fakeEtcdClient, helper := newHelper(t) - storage := NewStorage(helper).Pod + storage := NewStorage(helper, nil).Pod test := resttest.New(t, storage, fakeEtcdClient.SetError) pod := validNewPod() pod.ObjectMeta = api.ObjectMeta{} @@ -107,7 +107,7 @@ func TestCreate(t *testing.T) { func TestDelete(t *testing.T) { fakeEtcdClient, helper := newHelper(t) - storage := NewStorage(helper).Pod + storage := NewStorage(helper, nil).Pod test := resttest.New(t, storage, fakeEtcdClient.SetError) createFn := func() runtime.Object { @@ -143,7 +143,7 @@ func expectPod(t *testing.T, out runtime.Object) (*api.Pod, bool) { func TestCreateRegistryError(t *testing.T) { fakeEtcdClient, helper := newHelper(t) fakeEtcdClient.Err = fmt.Errorf("test error") - storage := NewStorage(helper).Pod + storage := NewStorage(helper, nil).Pod pod := validNewPod() _, err := storage.Create(api.NewDefaultContext(), pod) @@ -154,7 +154,7 @@ func TestCreateRegistryError(t *testing.T) { func TestCreateSetsFields(t *testing.T) { fakeEtcdClient, helper := newHelper(t) - storage := NewStorage(helper).Pod + storage := NewStorage(helper, nil).Pod pod := validNewPod() _, err := storage.Create(api.NewDefaultContext(), pod) if err != fakeEtcdClient.Err { @@ -176,7 +176,7 @@ func TestCreateSetsFields(t *testing.T) { func TestListError(t *testing.T) { fakeEtcdClient, helper := newHelper(t) fakeEtcdClient.Err = fmt.Errorf("test error") - storage := NewStorage(helper).Pod + storage := NewStorage(helper, nil).Pod pods, err := storage.List(api.NewDefaultContext(), labels.Everything(), fields.Everything()) if err != fakeEtcdClient.Err { t.Fatalf("Expected %#v, Got %#v", fakeEtcdClient.Err, err) @@ -194,7 +194,7 @@ func TestListEmptyPodList(t *testing.T) { E: fakeEtcdClient.NewError(tools.EtcdErrorCodeNotFound), } - storage := NewStorage(helper).Pod + storage := NewStorage(helper, nil).Pod pods, err := storage.List(api.NewContext(), labels.Everything(), fields.Everything()) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -231,7 +231,7 @@ func TestListPodList(t *testing.T) { }, }, } - storage := NewStorage(helper).Pod + storage := NewStorage(helper, nil).Pod podsObj, err := storage.List(api.NewDefaultContext(), labels.Everything(), fields.Everything()) pods := podsObj.(*api.PodList) @@ -280,7 +280,7 @@ func TestListPodListSelection(t *testing.T) { }, }, } - storage := NewStorage(helper).Pod + storage := NewStorage(helper, nil).Pod ctx := api.NewDefaultContext() @@ -345,7 +345,7 @@ func TestListPodListSelection(t *testing.T) { } func TestPodDecode(t *testing.T) { - storage := NewStorage(tools.EtcdHelper{}).Pod + storage := NewStorage(tools.EtcdHelper{}, nil).Pod expected := validNewPod() body, err := latest.Codec.Encode(expected) if err != nil { @@ -375,7 +375,7 @@ func TestGet(t *testing.T) { }, }, } - storage := NewStorage(helper).Pod + storage := NewStorage(helper, nil).Pod obj, err := storage.Get(api.WithNamespace(api.NewContext(), "test"), "foo") pod := obj.(*api.Pod) @@ -392,7 +392,7 @@ func TestGet(t *testing.T) { func TestPodStorageValidatesCreate(t *testing.T) { fakeEtcdClient, helper := newHelper(t) fakeEtcdClient.Err = fmt.Errorf("test error") - storage := NewStorage(helper).Pod + storage := NewStorage(helper, nil).Pod pod := validNewPod() pod.Labels = map[string]string{ @@ -410,7 +410,7 @@ func TestPodStorageValidatesCreate(t *testing.T) { // TODO: remove, this is covered by RESTTest.TestCreate func TestCreatePod(t *testing.T) { _, helper := newHelper(t) - storage := NewStorage(helper).Pod + storage := NewStorage(helper, nil).Pod pod := validNewPod() obj, err := storage.Create(api.NewDefaultContext(), pod) @@ -432,7 +432,7 @@ func TestCreatePod(t *testing.T) { // TODO: remove, this is covered by RESTTest.TestCreate func TestCreateWithConflictingNamespace(t *testing.T) { _, helper := newHelper(t) - storage := NewStorage(helper).Pod + storage := NewStorage(helper, nil).Pod pod := validNewPod() pod.Namespace = "not-default" @@ -461,7 +461,7 @@ func TestUpdateWithConflictingNamespace(t *testing.T) { }, }, } - storage := NewStorage(helper).Pod + storage := NewStorage(helper, nil).Pod pod := validChangedPod() pod.Namespace = "not-default" @@ -578,7 +578,7 @@ func TestResourceLocation(t *testing.T) { }, }, } - storage := NewStorage(helper).Pod + storage := NewStorage(helper, nil).Pod redirector := rest.Redirector(storage) location, _, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query) @@ -616,7 +616,7 @@ func TestDeletePod(t *testing.T) { }, }, } - storage := NewStorage(helper).Pod + storage := NewStorage(helper, nil).Pod _, err := storage.Delete(api.NewDefaultContext(), "foo", nil) if err != nil { diff --git a/pkg/registry/pod/rest.go b/pkg/registry/pod/rest.go index 9c1867d853e..04aab008267 100644 --- a/pkg/registry/pod/rest.go +++ b/pkg/registry/pod/rest.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" @@ -133,6 +134,18 @@ type ResourceGetter interface { Get(api.Context, string) (runtime.Object, error) } +func getPod(getter ResourceGetter, ctx api.Context, name string) (*api.Pod, error) { + obj, err := getter.Get(ctx, name) + if err != nil { + return nil, err + } + pod := obj.(*api.Pod) + if pod == nil { + return nil, fmt.Errorf("Unexpected object type: %#v", pod) + } + return pod, nil +} + // ResourceLocation returns a URL to which one can send traffic for the specified pod. func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.URL, http.RoundTripper, error) { // Allow ID as "podname" or "podname:port". If port is not specified, @@ -148,14 +161,10 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.U port = parts[1] } - obj, err := getter.Get(ctx, name) + pod, err := getPod(getter, ctx, name) if err != nil { return nil, nil, err } - pod := obj.(*api.Pod) - if pod == nil { - return nil, nil, nil - } // Try to figure out a port. if port == "" { @@ -177,3 +186,43 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.U } return loc, nil, nil } + +// LogLocation returns a the log URL for a pod container. If opts.Container is blank +// and only one container is present in the pod, that container is used. +func LogLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts *api.PodLogOptions) (*url.URL, http.RoundTripper, error) { + + pod, err := getPod(getter, ctx, name) + if err != nil { + return nil, nil, err + } + + // Try to figure out a container + container := opts.Container + if container == "" { + if len(pod.Spec.Containers) == 1 { + container = pod.Spec.Containers[0].Name + } else { + return nil, nil, fmt.Errorf("a container name must be specified for pod %s", name) + } + } + nodeHost := pod.Status.HostIP + if len(nodeHost) == 0 { + // If pod has not been assigned a host, return an empty location + return nil, nil, nil + } + nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(nodeHost) + if err != nil { + return nil, nil, err + } + params := url.Values{} + if opts.Follow { + params.Add("follow", "true") + } + loc := &url.URL{ + Scheme: nodeScheme, + Host: fmt.Sprintf("%s:%d", nodeHost, nodePort), + Path: fmt.Sprintf("/containerLogs/%s/%s/%s", pod.Namespace, name, container), + RawQuery: params.Encode(), + } + return loc, nodeTransport, nil +}