diff --git a/pkg/kubelet/handlers.go b/pkg/kubelet/handlers.go index 2e8181b3c08..0f37709e70a 100644 --- a/pkg/kubelet/handlers.go +++ b/pkg/kubelet/handlers.go @@ -18,9 +18,7 @@ package kubelet import ( "fmt" - "io" "net" - "net/http" "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -95,21 +93,3 @@ func (h *httpActionHandler) Run(podFullName string, uid types.UID, container *ap _, err := h.client.Get(url) return err } - -// FlushWriter provides wrapper for responseWriter with HTTP streaming capabilities -type FlushWriter struct { - flusher http.Flusher - writer io.Writer -} - -// Write is a FlushWriter implementation of the io.Writer that sends any buffered data to the client. -func (fw *FlushWriter) Write(p []byte) (n int, err error) { - n, err = fw.writer.Write(p) - if err != nil { - return - } - if fw.flusher != nil { - fw.flusher.Flush() - } - return -} diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index 3eca3ce1e20..a63a40c2873 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -38,6 +38,7 @@ import ( kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/flushwriter" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy" "github.com/golang/glog" @@ -262,15 +263,14 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) { return } - fw := FlushWriter{writer: w} - if flusher, ok := fw.writer.(http.Flusher); ok { - fw.flusher = flusher - } else { - s.error(w, fmt.Errorf("unable to convert %v into http.Flusher", fw)) + if _, ok := w.(http.Flusher); !ok { + s.error(w, fmt.Errorf("unable to convert %v into http.Flusher", w)) + return } + fw := flushwriter.Wrap(w) w.Header().Set("Transfer-Encoding", "chunked") w.WriteHeader(http.StatusOK) - err = s.host.GetKubeletContainerLogs(kubecontainer.GetPodFullName(pod), containerName, tail, follow, &fw, &fw) + err = s.host.GetKubeletContainerLogs(kubecontainer.GetPodFullName(pod), containerName, tail, follow, fw, fw) if err != nil { s.error(w, err) return diff --git a/pkg/util/flushwriter/doc.go b/pkg/util/flushwriter/doc.go new file mode 100644 index 00000000000..5a81e510732 --- /dev/null +++ b/pkg/util/flushwriter/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package flushwriter implements a wrapper for a writer that flushes on every +// write if that writer implements the io.Flusher interface +package flushwriter diff --git a/pkg/util/flushwriter/writer.go b/pkg/util/flushwriter/writer.go new file mode 100644 index 00000000000..01dffb44a37 --- /dev/null +++ b/pkg/util/flushwriter/writer.go @@ -0,0 +1,53 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flushwriter + +import ( + "io" + "net/http" +) + +// Wrap wraps an io.Writer into a writer that flushes after every write if +// the writer implements the Flusher interface. +func Wrap(w io.Writer) io.Writer { + fw := &flushWriter{ + writer: w, + } + if flusher, ok := w.(http.Flusher); ok { + fw.flusher = flusher + } + return fw +} + +// flushWriter provides wrapper for responseWriter with HTTP streaming capabilities +type flushWriter struct { + flusher http.Flusher + writer io.Writer +} + +// Write is a FlushWriter implementation of the io.Writer that sends any buffered +// data to the client. +func (fw *flushWriter) Write(p []byte) (n int, err error) { + n, err = fw.writer.Write(p) + if err != nil { + return + } + if fw.flusher != nil { + fw.flusher.Flush() + } + return +} diff --git a/pkg/util/flushwriter/writer_test.go b/pkg/util/flushwriter/writer_test.go new file mode 100644 index 00000000000..61ba9795416 --- /dev/null +++ b/pkg/util/flushwriter/writer_test.go @@ -0,0 +1,86 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flushwriter + +import ( + "fmt" + "testing" +) + +type writerWithFlush struct { + writeCount, flushCount int + err error +} + +func (w *writerWithFlush) Flush() { + w.flushCount++ +} + +func (w *writerWithFlush) Write(p []byte) (n int, err error) { + w.writeCount++ + return len(p), w.err +} + +type writerWithNoFlush struct { + writeCount int +} + +func (w *writerWithNoFlush) Write(p []byte) (n int, err error) { + w.writeCount++ + return len(p), nil +} + +func TestWriteWithFlush(t *testing.T) { + w := &writerWithFlush{} + fw := Wrap(w) + for i := 0; i < 10; i++ { + _, err := fw.Write([]byte("Test write")) + if err != nil { + t.Errorf("Unexpected error while writing with flush writer: %v", err) + } + } + if w.flushCount != 10 { + t.Errorf("Flush not called the expected number of times. Actual: %d", w.flushCount) + } + if w.writeCount != 10 { + t.Errorf("Write not called the expected number of times. Actual: %d", w.writeCount) + } +} + +func TestWriteWithoutFlush(t *testing.T) { + w := &writerWithNoFlush{} + fw := Wrap(w) + for i := 0; i < 10; i++ { + _, err := fw.Write([]byte("Test write")) + if err != nil { + t.Errorf("Unexpected error while writing with flush writer: %v", err) + } + } + if w.writeCount != 10 { + t.Errorf("Write not called the expected number of times. Actual: %d", w.writeCount) + } +} + +func TestWriteError(t *testing.T) { + e := fmt.Errorf("Error") + w := &writerWithFlush{err: e} + fw := Wrap(w) + _, err := fw.Write([]byte("Test write")) + if err != e { + t.Errorf("Did not get expected error. Got: %#v", err) + } +}