diff --git a/pkg/runtime/serializer/streaming/streaming.go b/pkg/runtime/serializer/streaming/streaming.go index 1598f660211..b7daf774d26 100644 --- a/pkg/runtime/serializer/streaming/streaming.go +++ b/pkg/runtime/serializer/streaming/streaming.go @@ -20,6 +20,7 @@ package streaming import ( "bytes" + "fmt" "io" "k8s.io/kubernetes/pkg/api/unversioned" @@ -52,9 +53,11 @@ type Serializer interface { } type decoder struct { - reader io.Reader - decoder runtime.Decoder - buf []byte + reader io.Reader + decoder runtime.Decoder + buf []byte + maxBytes int + resetRead bool } // NewDecoder creates a streaming decoder that reads object chunks from r and decodes them with d. @@ -62,22 +65,50 @@ type decoder struct { // an entire object. func NewDecoder(r io.Reader, d runtime.Decoder) Decoder { return &decoder{ - reader: r, - decoder: d, - buf: make([]byte, 1024*1024), + reader: r, + decoder: d, + buf: make([]byte, 1024), + maxBytes: 1024 * 1024, } } +var ErrObjectTooLarge = fmt.Errorf("object to decode was longer than maximum allowed size") + // Decode reads the next object from the stream and decodes it. func (d *decoder) Decode(defaults *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) { - // TODO: instead of depending on a fixed sized buffer, we should handle ErrShortRead specially and - // grow the buffer capacity up to a maximum amount. Requires the framer to allow repeated reads to - // the stream until the frame is finished. - n, err := d.reader.Read(d.buf) - if err != nil { - return nil, nil, err + base := 0 + for { + n, err := d.reader.Read(d.buf[base:]) + if err == io.ErrShortBuffer { + if n == 0 { + return nil, nil, fmt.Errorf("got short buffer with n=0, base=%d, cap=%d", base, cap(d.buf)) + } + if d.resetRead { + continue + } + // double the buffer size up to maxBytes + if cap(d.buf) < d.maxBytes { + base += n + d.buf = append(d.buf, make([]byte, cap(d.buf))...) + continue + } + // must read the rest of the frame (until we stop getting ErrShortBuffer) + d.resetRead = true + base = 0 + return nil, nil, ErrObjectTooLarge + } + if err != nil { + return nil, nil, err + } + if d.resetRead { + // now that we have drained the large read, continue + d.resetRead = false + continue + } + base += n + break } - return d.decoder.Decode(d.buf[:n], defaults, into) + return d.decoder.Decode(d.buf[:base], defaults, into) } type encoder struct { diff --git a/pkg/runtime/serializer/streaming/streaming_test.go b/pkg/runtime/serializer/streaming/streaming_test.go new file mode 100644 index 00000000000..a5a7f0d5c1b --- /dev/null +++ b/pkg/runtime/serializer/streaming/streaming_test.go @@ -0,0 +1,83 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package streaming + +import ( + "bytes" + "io" + "testing" + + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/framer" +) + +type fakeDecoder struct { + got []byte + obj runtime.Object + err error +} + +func (d *fakeDecoder) Decode(data []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) { + d.got = data + return d.obj, nil, d.err +} + +func TestEmptyDecoder(t *testing.T) { + buf := bytes.NewBuffer([]byte{}) + d := &fakeDecoder{} + _, _, err := NewDecoder(buf, d).Decode(nil, nil) + if err != io.EOF { + t.Fatal(err) + } +} + +func TestDecoder(t *testing.T) { + frames := [][]byte{ + make([]byte, 1025), + make([]byte, 1024*5), + make([]byte, 1024*1024*5), + make([]byte, 1025), + } + pr, pw := io.Pipe() + fw := framer.NewLengthDelimitedFrameWriter(pw) + go func() { + for i := range frames { + fw.Write(frames[i]) + } + pw.Close() + }() + + r := framer.NewLengthDelimitedFrameReader(pr) + d := &fakeDecoder{} + dec := NewDecoder(r, d) + if _, _, err := dec.Decode(nil, nil); err != nil || !bytes.Equal(d.got, frames[0]) { + t.Fatalf("unexpected %v %v", err, len(d.got)) + } + if _, _, err := dec.Decode(nil, nil); err != nil || !bytes.Equal(d.got, frames[1]) { + t.Fatalf("unexpected %v %v", err, len(d.got)) + } + if _, _, err := dec.Decode(nil, nil); err != ErrObjectTooLarge || !bytes.Equal(d.got, frames[1]) { + t.Fatalf("unexpected %v %v", err, len(d.got)) + } + if _, _, err := dec.Decode(nil, nil); err != nil || !bytes.Equal(d.got, frames[3]) { + t.Fatalf("unexpected %v %v", err, len(d.got)) + } + if _, _, err := dec.Decode(nil, nil); err != io.EOF { + t.Fatalf("unexpected %v %v", err, len(d.got)) + } +} diff --git a/pkg/util/framer/framer.go b/pkg/util/framer/framer.go new file mode 100644 index 00000000000..254e8865737 --- /dev/null +++ b/pkg/util/framer/framer.go @@ -0,0 +1,156 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package framer implements simple frame decoding techniques for an io.ReadCloser +package framer + +import ( + "encoding/binary" + "encoding/json" + "io" +) + +type lengthDelimitedFrameWriter struct { + w io.Writer +} + +func NewLengthDelimitedFrameWriter(w io.Writer) io.Writer { + return &lengthDelimitedFrameWriter{w: w} +} + +// Write writes a single frame to the nested writer, prepending it with the length in +// in bytes of data (as a 4 byte, bigendian uint32). +func (w *lengthDelimitedFrameWriter) Write(data []byte) (int, error) { + header := [4]byte{} + binary.BigEndian.PutUint32(header[:], uint32(len(data))) + n, err := w.w.Write(header[:]) + if err != nil { + return 0, err + } + if n != len(header) { + return 0, io.ErrShortWrite + } + return w.w.Write(data) +} + +type lengthDelimitedFrameReader struct { + r io.Reader + remaining int +} + +// NewLengthDelimitedFrameReader returns an io.Reader that will decode length-prefixed +// frames off of a stream. +// +// The protocol is: +// +// stream: message ... +// message: prefix body +// prefix: 4 byte uint32 in BigEndian order, denotes length of body +// body: bytes (0..prefix) +// +// If the buffer passed to Read is not long enough to contain an entire frame, io.ErrShortRead +// will be returned along with the number of bytes read. +func NewLengthDelimitedFrameReader(r io.Reader) io.Reader { + return &lengthDelimitedFrameReader{r: r} +} + +// Read attempts to read an entire frame into data. If that is not possible, io.ErrShortBuffer +// is returned and subsequent calls will attempt to read the last frame. A frame is complete when +// err is nil. +func (r *lengthDelimitedFrameReader) Read(data []byte) (int, error) { + if r.remaining <= 0 { + header := [4]byte{} + n, err := io.ReadAtLeast(r.r, header[:4], 4) + if err != nil { + return 0, err + } + if n != 4 { + return 0, io.ErrUnexpectedEOF + } + frameLength := int(binary.BigEndian.Uint32(header[:])) + r.remaining = frameLength + } + + expect := r.remaining + max := expect + if max > len(data) { + max = len(data) + } + n, err := io.ReadAtLeast(r.r, data[:max], int(max)) + r.remaining -= n + if err == io.ErrShortBuffer || r.remaining > 0 { + return n, io.ErrShortBuffer + } + if err != nil { + return n, err + } + if n != expect { + return n, io.ErrUnexpectedEOF + } + + return n, nil +} + +type jsonFrameReader struct { + decoder *json.Decoder + remaining []byte +} + +// NewJSONFramedReader returns an io.Reader that will decode individual JSON objects off +// of a wire. +// +// The boundaries between each frame are valid JSON objects. A JSON parsing error will terminate +// the read. +func NewJSONFramedReader(r io.Reader) io.Reader { + return &jsonFrameReader{ + decoder: json.NewDecoder(r), + } +} + +// ReadFrame decodes the next JSON object in the stream, or returns an error. The returned +// byte slice will be modified the next time ReadFrame is invoked and should not be altered. +func (r *jsonFrameReader) Read(data []byte) (int, error) { + // Return whatever remaining data exists from an in progress frame + if n := len(r.remaining); n > 0 { + if n <= cap(data) { + data = append(data[0:0], r.remaining...) + r.remaining = nil + return n, nil + } + + n = cap(data) + data = append(data[0:0], r.remaining[:n]...) + r.remaining = r.remaining[n:] + return n, io.ErrShortBuffer + } + + // RawMessage#Unmarshal appends to data - we reset the slice down to 0 and will either see + // data written to data, or be larger than data and a different array. + m := json.RawMessage(data[:0]) + if err := r.decoder.Decode(&m); err != nil { + return 0, err + } + + // If capacity of data is less than length of the message, decoder will allocate a new slice + // and set m to it, which means we need to copy the partial result back into data and preserve + // the remaining result for subsequent reads. + if n := cap(data); len(m) > n { + data = append(data[0:0], m[:n]...) + r.remaining = m[n:] + return n, io.ErrShortBuffer + } + return len(m), nil +} diff --git a/pkg/util/framer/framer_test.go b/pkg/util/framer/framer_test.go new file mode 100644 index 00000000000..83f712a6efc --- /dev/null +++ b/pkg/util/framer/framer_test.go @@ -0,0 +1,175 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framer + +import ( + "bytes" + "io" + "testing" +) + +func TestRead(t *testing.T) { + data := []byte{ + 0x00, 0x00, 0x00, 0x04, + 0x01, 0x02, 0x03, 0x04, + 0x00, 0x00, 0x00, 0x03, + 0x05, 0x06, 0x07, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + 0x08, + } + b := bytes.NewBuffer(data) + r := NewLengthDelimitedFrameReader(b) + buf := make([]byte, 1) + if n, err := r.Read(buf); err != io.ErrShortBuffer && n != 1 && bytes.Equal(buf, []byte{0x01}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + if n, err := r.Read(buf); err != io.ErrShortBuffer && n != 1 && bytes.Equal(buf, []byte{0x02}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + // read the remaining frame + buf = make([]byte, 2) + if n, err := r.Read(buf); err != nil && n != 2 && bytes.Equal(buf, []byte{0x03, 0x04}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + // read with buffer equal to frame + buf = make([]byte, 3) + if n, err := r.Read(buf); err != nil && n != 3 && bytes.Equal(buf, []byte{0x05, 0x06, 0x07}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + // read empty frame + buf = make([]byte, 3) + if n, err := r.Read(buf); err != nil && n != 0 && bytes.Equal(buf, []byte{}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + // read with larger buffer than frame + buf = make([]byte, 3) + if n, err := r.Read(buf); err != nil && n != 1 && bytes.Equal(buf, []byte{0x08}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + // read EOF + if n, err := r.Read(buf); err != io.EOF && n != 0 { + t.Fatalf("unexpected: %v %d", err, n) + } +} + +func TestReadLarge(t *testing.T) { + data := []byte{ + 0x00, 0x00, 0x00, 0x04, + 0x01, 0x02, 0x03, 0x04, + 0x00, 0x00, 0x00, 0x03, + 0x05, 0x06, 0x07, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + 0x08, + } + b := bytes.NewBuffer(data) + r := NewLengthDelimitedFrameReader(b) + buf := make([]byte, 40) + if n, err := r.Read(buf); err != nil && n != 4 && bytes.Equal(buf, []byte{0x01, 0x02, 0x03, 0x04}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + if n, err := r.Read(buf); err != nil && n != 3 && bytes.Equal(buf, []byte{0x05, 0x06, 0x7}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + if n, err := r.Read(buf); err != nil && n != 0 && bytes.Equal(buf, []byte{}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + if n, err := r.Read(buf); err != nil && n != 1 && bytes.Equal(buf, []byte{0x08}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + // read EOF + if n, err := r.Read(buf); err != io.EOF && n != 0 { + t.Fatalf("unexpected: %v %d", err, n) + } +} +func TestReadInvalidFrame(t *testing.T) { + data := []byte{ + 0x00, 0x00, 0x00, 0x04, + 0x01, 0x02, + } + b := bytes.NewBuffer(data) + r := NewLengthDelimitedFrameReader(b) + buf := make([]byte, 1) + if n, err := r.Read(buf); err != io.ErrShortBuffer && n != 1 && bytes.Equal(buf, []byte{0x01}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + // read the remaining frame + buf = make([]byte, 3) + if n, err := r.Read(buf); err != io.ErrUnexpectedEOF && n != 1 && bytes.Equal(buf, []byte{0x02}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + // read EOF + if n, err := r.Read(buf); err != io.EOF && n != 0 { + t.Fatalf("unexpected: %v %d", err, n) + } +} + +func TestJSONFrameReader(t *testing.T) { + b := bytes.NewBufferString("{\"test\":true}\n1\n[\"a\"]") + r := NewJSONFramedReader(b) + buf := make([]byte, 20) + if n, err := r.Read(buf); err != nil || n != 13 || string(buf[:n]) != `{"test":true}` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != nil || n != 1 || string(buf[:n]) != `1` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != nil || n != 5 || string(buf[:n]) != `["a"]` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != io.EOF || n != 0 { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } +} + +func TestJSONFrameReaderShortBuffer(t *testing.T) { + b := bytes.NewBufferString("{\"test\":true}\n1\n[\"a\"]") + r := NewJSONFramedReader(b) + buf := make([]byte, 0, 3) + + if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `{"t` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `est` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `":t` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `rue` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != nil || n != 1 || string(buf[:n]) != `}` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + + if n, err := r.Read(buf); err != nil || n != 1 || string(buf[:n]) != `1` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + + if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `["a` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != nil || n != 2 || string(buf[:n]) != `"]` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + + if n, err := r.Read(buf); err != io.EOF || n != 0 { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } +}