Support Close() in streaming decoder

This commit is contained in:
Wojciech Tyczynski 2016-04-28 08:40:12 +02:00
parent 4a0e0826e5
commit 3175d18b14
10 changed files with 44 additions and 20 deletions

View File

@ -241,7 +241,7 @@ func TestWatchRead(t *testing.T) {
} }
objectCodec := api.Codecs.DecoderToVersion(objectSerializer, testInternalGroupVersion) objectCodec := api.Codecs.DecoderToVersion(objectSerializer, testInternalGroupVersion)
var fr io.Reader = r var fr io.ReadCloser = r
if !protocol.selfFraming { if !protocol.selfFraming {
fr = framer.NewFrameReader(r) fr = framer.NewFrameReader(r)
} }

View File

@ -187,5 +187,5 @@ var DefaultFramer = defaultFramer{}
type defaultFramer struct{} type defaultFramer struct{}
func (defaultFramer) NewFrameReader(r io.Reader) io.Reader { return r } func (defaultFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser { return r }
func (defaultFramer) NewFrameWriter(w io.Writer) io.Writer { return w } func (defaultFramer) NewFrameWriter(w io.Writer) io.Writer { return w }

View File

@ -82,7 +82,7 @@ type ParameterCodec interface {
// Framer is a factory for creating readers and writers that obey a particular framing pattern. // Framer is a factory for creating readers and writers that obey a particular framing pattern.
type Framer interface { type Framer interface {
NewFrameReader(r io.Reader) io.Reader NewFrameReader(r io.ReadCloser) io.ReadCloser
NewFrameWriter(w io.Writer) io.Writer NewFrameWriter(w io.Writer) io.Writer
} }

View File

@ -214,7 +214,7 @@ func (jsonFramer) NewFrameWriter(w io.Writer) io.Writer {
} }
// NewFrameReader implements stream framing for this serializer // NewFrameReader implements stream framing for this serializer
func (jsonFramer) NewFrameReader(r io.Reader) io.Reader { func (jsonFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser {
// we need to extract the JSON chunks of data to pass to Decode() // we need to extract the JSON chunks of data to pass to Decode()
return framer.NewJSONFramedReader(r) return framer.NewJSONFramedReader(r)
} }
@ -230,7 +230,7 @@ func (yamlFramer) NewFrameWriter(w io.Writer) io.Writer {
} }
// NewFrameReader implements stream framing for this serializer // NewFrameReader implements stream framing for this serializer
func (yamlFramer) NewFrameReader(r io.Reader) io.Reader { func (yamlFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser {
// extract the YAML document chunks directly // extract the YAML document chunks directly
return utilyaml.NewDocumentDecoder(r) return utilyaml.NewDocumentDecoder(r)
} }

View File

@ -435,6 +435,6 @@ func (lengthDelimitedFramer) NewFrameWriter(w io.Writer) io.Writer {
} }
// NewFrameReader implements stream framing for this serializer // NewFrameReader implements stream framing for this serializer
func (lengthDelimitedFramer) NewFrameReader(r io.Reader) io.Reader { func (lengthDelimitedFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser {
return framer.NewLengthDelimitedFrameReader(r) return framer.NewLengthDelimitedFrameReader(r)
} }

View File

@ -38,16 +38,18 @@ type Encoder interface {
type Decoder interface { type Decoder interface {
// Decode will return io.EOF when no more objects are available. // Decode will return io.EOF when no more objects are available.
Decode(defaults *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) Decode(defaults *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error)
// Close closes the underlying stream.
Close() error
} }
// Serializer is a factory for creating encoders and decoders that work over streams. // Serializer is a factory for creating encoders and decoders that work over streams.
type Serializer interface { type Serializer interface {
NewEncoder(w io.Writer) Encoder NewEncoder(w io.Writer) Encoder
NewDecoder(r io.Reader) Decoder NewDecoder(r io.ReadCloser) Decoder
} }
type decoder struct { type decoder struct {
reader io.Reader reader io.ReadCloser
decoder runtime.Decoder decoder runtime.Decoder
buf []byte buf []byte
maxBytes int maxBytes int
@ -57,7 +59,7 @@ type decoder struct {
// NewDecoder creates a streaming decoder that reads object chunks from r and decodes them with d. // NewDecoder creates a streaming decoder that reads object chunks from r and decodes them with d.
// The reader is expected to return ErrShortRead if the provided buffer is not large enough to read // The reader is expected to return ErrShortRead if the provided buffer is not large enough to read
// an entire object. // an entire object.
func NewDecoder(r io.Reader, d runtime.Decoder) Decoder { func NewDecoder(r io.ReadCloser, d runtime.Decoder) Decoder {
return &decoder{ return &decoder{
reader: r, reader: r,
decoder: d, decoder: d,
@ -105,6 +107,10 @@ func (d *decoder) Decode(defaults *unversioned.GroupVersionKind, into runtime.Ob
return d.decoder.Decode(d.buf[:base], defaults, into) return d.decoder.Decode(d.buf[:base], defaults, into)
} }
func (d *decoder) Close() error {
return d.reader.Close()
}
type encoder struct { type encoder struct {
writer io.Writer writer io.Writer
encoder runtime.Encoder encoder runtime.Encoder

View File

@ -19,6 +19,7 @@ package streaming
import ( import (
"bytes" "bytes"
"io" "io"
"io/ioutil"
"testing" "testing"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
@ -40,7 +41,7 @@ func (d *fakeDecoder) Decode(data []byte, gvk *unversioned.GroupVersionKind, int
func TestEmptyDecoder(t *testing.T) { func TestEmptyDecoder(t *testing.T) {
buf := bytes.NewBuffer([]byte{}) buf := bytes.NewBuffer([]byte{})
d := &fakeDecoder{} d := &fakeDecoder{}
_, _, err := NewDecoder(buf, d).Decode(nil, nil) _, _, err := NewDecoder(ioutil.NopCloser(buf), d).Decode(nil, nil)
if err != io.EOF { if err != io.EOF {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -47,7 +47,7 @@ func (w *lengthDelimitedFrameWriter) Write(data []byte) (int, error) {
} }
type lengthDelimitedFrameReader struct { type lengthDelimitedFrameReader struct {
r io.Reader r io.ReadCloser
remaining int remaining int
} }
@ -63,7 +63,7 @@ type lengthDelimitedFrameReader struct {
// //
// If the buffer passed to Read is not long enough to contain an entire frame, io.ErrShortRead // If the buffer passed to Read is not long enough to contain an entire frame, io.ErrShortRead
// will be returned along with the number of bytes read. // will be returned along with the number of bytes read.
func NewLengthDelimitedFrameReader(r io.Reader) io.Reader { func NewLengthDelimitedFrameReader(r io.ReadCloser) io.ReadCloser {
return &lengthDelimitedFrameReader{r: r} return &lengthDelimitedFrameReader{r: r}
} }
@ -104,7 +104,12 @@ func (r *lengthDelimitedFrameReader) Read(data []byte) (int, error) {
return n, nil return n, nil
} }
func (r *lengthDelimitedFrameReader) Close() error {
return r.r.Close()
}
type jsonFrameReader struct { type jsonFrameReader struct {
r io.ReadCloser
decoder *json.Decoder decoder *json.Decoder
remaining []byte remaining []byte
} }
@ -114,8 +119,9 @@ type jsonFrameReader struct {
// //
// The boundaries between each frame are valid JSON objects. A JSON parsing error will terminate // The boundaries between each frame are valid JSON objects. A JSON parsing error will terminate
// the read. // the read.
func NewJSONFramedReader(r io.Reader) io.Reader { func NewJSONFramedReader(r io.ReadCloser) io.ReadCloser {
return &jsonFrameReader{ return &jsonFrameReader{
r: r,
decoder: json.NewDecoder(r), decoder: json.NewDecoder(r),
} }
} }
@ -154,3 +160,7 @@ func (r *jsonFrameReader) Read(data []byte) (int, error) {
} }
return len(m), nil return len(m), nil
} }
func (r *jsonFrameReader) Close() error {
return r.r.Close()
}

View File

@ -19,6 +19,7 @@ package framer
import ( import (
"bytes" "bytes"
"io" "io"
"io/ioutil"
"testing" "testing"
) )
@ -33,7 +34,7 @@ func TestRead(t *testing.T) {
0x08, 0x08,
} }
b := bytes.NewBuffer(data) b := bytes.NewBuffer(data)
r := NewLengthDelimitedFrameReader(b) r := NewLengthDelimitedFrameReader(ioutil.NopCloser(b))
buf := make([]byte, 1) buf := make([]byte, 1)
if n, err := r.Read(buf); err != io.ErrShortBuffer && n != 1 && bytes.Equal(buf, []byte{0x01}) { if n, err := r.Read(buf); err != io.ErrShortBuffer && n != 1 && bytes.Equal(buf, []byte{0x01}) {
t.Fatalf("unexpected: %v %d %v", err, n, buf) t.Fatalf("unexpected: %v %d %v", err, n, buf)
@ -78,7 +79,7 @@ func TestReadLarge(t *testing.T) {
0x08, 0x08,
} }
b := bytes.NewBuffer(data) b := bytes.NewBuffer(data)
r := NewLengthDelimitedFrameReader(b) r := NewLengthDelimitedFrameReader(ioutil.NopCloser(b))
buf := make([]byte, 40) buf := make([]byte, 40)
if n, err := r.Read(buf); err != nil && n != 4 && bytes.Equal(buf, []byte{0x01, 0x02, 0x03, 0x04}) { if n, err := r.Read(buf); err != nil && n != 4 && bytes.Equal(buf, []byte{0x01, 0x02, 0x03, 0x04}) {
t.Fatalf("unexpected: %v %d %v", err, n, buf) t.Fatalf("unexpected: %v %d %v", err, n, buf)
@ -103,7 +104,7 @@ func TestReadInvalidFrame(t *testing.T) {
0x01, 0x02, 0x01, 0x02,
} }
b := bytes.NewBuffer(data) b := bytes.NewBuffer(data)
r := NewLengthDelimitedFrameReader(b) r := NewLengthDelimitedFrameReader(ioutil.NopCloser(b))
buf := make([]byte, 1) buf := make([]byte, 1)
if n, err := r.Read(buf); err != io.ErrShortBuffer && n != 1 && bytes.Equal(buf, []byte{0x01}) { if n, err := r.Read(buf); err != io.ErrShortBuffer && n != 1 && bytes.Equal(buf, []byte{0x01}) {
t.Fatalf("unexpected: %v %d %v", err, n, buf) t.Fatalf("unexpected: %v %d %v", err, n, buf)
@ -121,7 +122,7 @@ func TestReadInvalidFrame(t *testing.T) {
func TestJSONFrameReader(t *testing.T) { func TestJSONFrameReader(t *testing.T) {
b := bytes.NewBufferString("{\"test\":true}\n1\n[\"a\"]") b := bytes.NewBufferString("{\"test\":true}\n1\n[\"a\"]")
r := NewJSONFramedReader(b) r := NewJSONFramedReader(ioutil.NopCloser(b))
buf := make([]byte, 20) buf := make([]byte, 20)
if n, err := r.Read(buf); err != nil || n != 13 || string(buf[:n]) != `{"test":true}` { if n, err := r.Read(buf); err != nil || n != 13 || string(buf[:n]) != `{"test":true}` {
t.Fatalf("unexpected: %v %d %q", err, n, buf) t.Fatalf("unexpected: %v %d %q", err, n, buf)
@ -139,7 +140,7 @@ func TestJSONFrameReader(t *testing.T) {
func TestJSONFrameReaderShortBuffer(t *testing.T) { func TestJSONFrameReaderShortBuffer(t *testing.T) {
b := bytes.NewBufferString("{\"test\":true}\n1\n[\"a\"]") b := bytes.NewBufferString("{\"test\":true}\n1\n[\"a\"]")
r := NewJSONFramedReader(b) r := NewJSONFramedReader(ioutil.NopCloser(b))
buf := make([]byte, 0, 3) buf := make([]byte, 0, 3)
if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `{"t` { if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `{"t` {

View File

@ -78,6 +78,7 @@ func (d *YAMLToJSONDecoder) Decode(into interface{}) error {
// YAMLDecoder reads chunks of objects and returns ErrShortBuffer if // YAMLDecoder reads chunks of objects and returns ErrShortBuffer if
// the data is not sufficient. // the data is not sufficient.
type YAMLDecoder struct { type YAMLDecoder struct {
r io.ReadCloser
scanner *bufio.Scanner scanner *bufio.Scanner
remaining []byte remaining []byte
} }
@ -87,10 +88,11 @@ type YAMLDecoder struct {
// the YAML spec) into its own chunk. io.ErrShortBuffer will be // the YAML spec) into its own chunk. io.ErrShortBuffer will be
// returned if the entire buffer could not be read to assist // returned if the entire buffer could not be read to assist
// the caller in framing the chunk. // the caller in framing the chunk.
func NewDocumentDecoder(r io.Reader) io.Reader { func NewDocumentDecoder(r io.ReadCloser) io.ReadCloser {
scanner := bufio.NewScanner(r) scanner := bufio.NewScanner(r)
scanner.Split(splitYAMLDocument) scanner.Split(splitYAMLDocument)
return &YAMLDecoder{ return &YAMLDecoder{
r: r,
scanner: scanner, scanner: scanner,
} }
} }
@ -127,6 +129,10 @@ func (d *YAMLDecoder) Read(data []byte) (n int, err error) {
return len(data), io.ErrShortBuffer return len(data), io.ErrShortBuffer
} }
func (d *YAMLDecoder) Close() error {
return d.r.Close()
}
const yamlSeparator = "\n---" const yamlSeparator = "\n---"
// splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into individual documents. // splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into individual documents.