diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index 8f2d0c42b76..858e0bd441c 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -241,7 +241,7 @@ func TestWatchRead(t *testing.T) { } objectCodec := api.Codecs.DecoderToVersion(objectSerializer, testInternalGroupVersion) - var fr io.Reader = r + var fr io.ReadCloser = r if !protocol.selfFraming { fr = framer.NewFrameReader(r) } diff --git a/pkg/runtime/helper.go b/pkg/runtime/helper.go index 8bb33a91f4c..8cb4c207d56 100644 --- a/pkg/runtime/helper.go +++ b/pkg/runtime/helper.go @@ -187,5 +187,5 @@ var DefaultFramer = defaultFramer{} type defaultFramer struct{} -func (defaultFramer) NewFrameReader(r io.Reader) io.Reader { return r } -func (defaultFramer) NewFrameWriter(w io.Writer) io.Writer { return w } +func (defaultFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser { return r } +func (defaultFramer) NewFrameWriter(w io.Writer) io.Writer { return w } diff --git a/pkg/runtime/interfaces.go b/pkg/runtime/interfaces.go index be2fa71321b..9954238a9cd 100644 --- a/pkg/runtime/interfaces.go +++ b/pkg/runtime/interfaces.go @@ -82,7 +82,7 @@ type ParameterCodec interface { // Framer is a factory for creating readers and writers that obey a particular framing pattern. type Framer interface { - NewFrameReader(r io.Reader) io.Reader + NewFrameReader(r io.ReadCloser) io.ReadCloser NewFrameWriter(w io.Writer) io.Writer } diff --git a/pkg/runtime/serializer/json/json.go b/pkg/runtime/serializer/json/json.go index e91fe262042..61f01215e04 100644 --- a/pkg/runtime/serializer/json/json.go +++ b/pkg/runtime/serializer/json/json.go @@ -214,7 +214,7 @@ func (jsonFramer) NewFrameWriter(w io.Writer) io.Writer { } // NewFrameReader implements stream framing for this serializer -func (jsonFramer) NewFrameReader(r io.Reader) io.Reader { +func (jsonFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser { // we need to extract the JSON chunks of data to pass to Decode() return framer.NewJSONFramedReader(r) } @@ -230,7 +230,7 @@ func (yamlFramer) NewFrameWriter(w io.Writer) io.Writer { } // NewFrameReader implements stream framing for this serializer -func (yamlFramer) NewFrameReader(r io.Reader) io.Reader { +func (yamlFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser { // extract the YAML document chunks directly return utilyaml.NewDocumentDecoder(r) } diff --git a/pkg/runtime/serializer/protobuf/protobuf.go b/pkg/runtime/serializer/protobuf/protobuf.go index c55fa1cfd0a..25ac1cdaaf5 100644 --- a/pkg/runtime/serializer/protobuf/protobuf.go +++ b/pkg/runtime/serializer/protobuf/protobuf.go @@ -435,6 +435,6 @@ func (lengthDelimitedFramer) NewFrameWriter(w io.Writer) io.Writer { } // NewFrameReader implements stream framing for this serializer -func (lengthDelimitedFramer) NewFrameReader(r io.Reader) io.Reader { +func (lengthDelimitedFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser { return framer.NewLengthDelimitedFrameReader(r) } diff --git a/pkg/runtime/serializer/streaming/streaming.go b/pkg/runtime/serializer/streaming/streaming.go index 2268ebeb3e3..8bc39d66e6b 100644 --- a/pkg/runtime/serializer/streaming/streaming.go +++ b/pkg/runtime/serializer/streaming/streaming.go @@ -38,16 +38,18 @@ type Encoder interface { type Decoder interface { // Decode will return io.EOF when no more objects are available. Decode(defaults *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) + // Close closes the underlying stream. + Close() error } // Serializer is a factory for creating encoders and decoders that work over streams. type Serializer interface { NewEncoder(w io.Writer) Encoder - NewDecoder(r io.Reader) Decoder + NewDecoder(r io.ReadCloser) Decoder } type decoder struct { - reader io.Reader + reader io.ReadCloser decoder runtime.Decoder buf []byte maxBytes int @@ -57,7 +59,7 @@ type decoder struct { // NewDecoder creates a streaming decoder that reads object chunks from r and decodes them with d. // The reader is expected to return ErrShortRead if the provided buffer is not large enough to read // an entire object. -func NewDecoder(r io.Reader, d runtime.Decoder) Decoder { +func NewDecoder(r io.ReadCloser, d runtime.Decoder) Decoder { return &decoder{ reader: r, decoder: d, @@ -105,6 +107,10 @@ func (d *decoder) Decode(defaults *unversioned.GroupVersionKind, into runtime.Ob return d.decoder.Decode(d.buf[:base], defaults, into) } +func (d *decoder) Close() error { + return d.reader.Close() +} + type encoder struct { writer io.Writer encoder runtime.Encoder diff --git a/pkg/runtime/serializer/streaming/streaming_test.go b/pkg/runtime/serializer/streaming/streaming_test.go index a5a7f0d5c1b..b3d500c986e 100644 --- a/pkg/runtime/serializer/streaming/streaming_test.go +++ b/pkg/runtime/serializer/streaming/streaming_test.go @@ -19,6 +19,7 @@ package streaming import ( "bytes" "io" + "io/ioutil" "testing" "k8s.io/kubernetes/pkg/api/unversioned" @@ -40,7 +41,7 @@ func (d *fakeDecoder) Decode(data []byte, gvk *unversioned.GroupVersionKind, int func TestEmptyDecoder(t *testing.T) { buf := bytes.NewBuffer([]byte{}) d := &fakeDecoder{} - _, _, err := NewDecoder(buf, d).Decode(nil, nil) + _, _, err := NewDecoder(ioutil.NopCloser(buf), d).Decode(nil, nil) if err != io.EOF { t.Fatal(err) } diff --git a/pkg/util/framer/framer.go b/pkg/util/framer/framer.go index 254e8865737..b2a0269c505 100644 --- a/pkg/util/framer/framer.go +++ b/pkg/util/framer/framer.go @@ -47,7 +47,7 @@ func (w *lengthDelimitedFrameWriter) Write(data []byte) (int, error) { } type lengthDelimitedFrameReader struct { - r io.Reader + r io.ReadCloser remaining int } @@ -63,7 +63,7 @@ type lengthDelimitedFrameReader struct { // // If the buffer passed to Read is not long enough to contain an entire frame, io.ErrShortRead // will be returned along with the number of bytes read. -func NewLengthDelimitedFrameReader(r io.Reader) io.Reader { +func NewLengthDelimitedFrameReader(r io.ReadCloser) io.ReadCloser { return &lengthDelimitedFrameReader{r: r} } @@ -104,7 +104,12 @@ func (r *lengthDelimitedFrameReader) Read(data []byte) (int, error) { return n, nil } +func (r *lengthDelimitedFrameReader) Close() error { + return r.r.Close() +} + type jsonFrameReader struct { + r io.ReadCloser decoder *json.Decoder remaining []byte } @@ -114,8 +119,9 @@ type jsonFrameReader struct { // // The boundaries between each frame are valid JSON objects. A JSON parsing error will terminate // the read. -func NewJSONFramedReader(r io.Reader) io.Reader { +func NewJSONFramedReader(r io.ReadCloser) io.ReadCloser { return &jsonFrameReader{ + r: r, decoder: json.NewDecoder(r), } } @@ -154,3 +160,7 @@ func (r *jsonFrameReader) Read(data []byte) (int, error) { } return len(m), nil } + +func (r *jsonFrameReader) Close() error { + return r.r.Close() +} diff --git a/pkg/util/framer/framer_test.go b/pkg/util/framer/framer_test.go index 83f712a6efc..661cab43ea5 100644 --- a/pkg/util/framer/framer_test.go +++ b/pkg/util/framer/framer_test.go @@ -19,6 +19,7 @@ package framer import ( "bytes" "io" + "io/ioutil" "testing" ) @@ -33,7 +34,7 @@ func TestRead(t *testing.T) { 0x08, } b := bytes.NewBuffer(data) - r := NewLengthDelimitedFrameReader(b) + r := NewLengthDelimitedFrameReader(ioutil.NopCloser(b)) buf := make([]byte, 1) if n, err := r.Read(buf); err != io.ErrShortBuffer && n != 1 && bytes.Equal(buf, []byte{0x01}) { t.Fatalf("unexpected: %v %d %v", err, n, buf) @@ -78,7 +79,7 @@ func TestReadLarge(t *testing.T) { 0x08, } b := bytes.NewBuffer(data) - r := NewLengthDelimitedFrameReader(b) + r := NewLengthDelimitedFrameReader(ioutil.NopCloser(b)) buf := make([]byte, 40) if n, err := r.Read(buf); err != nil && n != 4 && bytes.Equal(buf, []byte{0x01, 0x02, 0x03, 0x04}) { t.Fatalf("unexpected: %v %d %v", err, n, buf) @@ -103,7 +104,7 @@ func TestReadInvalidFrame(t *testing.T) { 0x01, 0x02, } b := bytes.NewBuffer(data) - r := NewLengthDelimitedFrameReader(b) + r := NewLengthDelimitedFrameReader(ioutil.NopCloser(b)) buf := make([]byte, 1) if n, err := r.Read(buf); err != io.ErrShortBuffer && n != 1 && bytes.Equal(buf, []byte{0x01}) { t.Fatalf("unexpected: %v %d %v", err, n, buf) @@ -121,7 +122,7 @@ func TestReadInvalidFrame(t *testing.T) { func TestJSONFrameReader(t *testing.T) { b := bytes.NewBufferString("{\"test\":true}\n1\n[\"a\"]") - r := NewJSONFramedReader(b) + r := NewJSONFramedReader(ioutil.NopCloser(b)) buf := make([]byte, 20) if n, err := r.Read(buf); err != nil || n != 13 || string(buf[:n]) != `{"test":true}` { t.Fatalf("unexpected: %v %d %q", err, n, buf) @@ -139,7 +140,7 @@ func TestJSONFrameReader(t *testing.T) { func TestJSONFrameReaderShortBuffer(t *testing.T) { b := bytes.NewBufferString("{\"test\":true}\n1\n[\"a\"]") - r := NewJSONFramedReader(b) + r := NewJSONFramedReader(ioutil.NopCloser(b)) buf := make([]byte, 0, 3) if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `{"t` { diff --git a/pkg/util/yaml/decoder.go b/pkg/util/yaml/decoder.go index 519242a383a..7f06ba1bb38 100644 --- a/pkg/util/yaml/decoder.go +++ b/pkg/util/yaml/decoder.go @@ -78,6 +78,7 @@ func (d *YAMLToJSONDecoder) Decode(into interface{}) error { // YAMLDecoder reads chunks of objects and returns ErrShortBuffer if // the data is not sufficient. type YAMLDecoder struct { + r io.ReadCloser scanner *bufio.Scanner remaining []byte } @@ -87,10 +88,11 @@ type YAMLDecoder struct { // the YAML spec) into its own chunk. io.ErrShortBuffer will be // returned if the entire buffer could not be read to assist // the caller in framing the chunk. -func NewDocumentDecoder(r io.Reader) io.Reader { +func NewDocumentDecoder(r io.ReadCloser) io.ReadCloser { scanner := bufio.NewScanner(r) scanner.Split(splitYAMLDocument) return &YAMLDecoder{ + r: r, scanner: scanner, } } @@ -127,6 +129,10 @@ func (d *YAMLDecoder) Read(data []byte) (n int, err error) { return len(data), io.ErrShortBuffer } +func (d *YAMLDecoder) Close() error { + return d.r.Close() +} + const yamlSeparator = "\n---" // splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into individual documents.