diff --git a/pkg/api/serialization_test.go b/pkg/api/serialization_test.go index 676a0ea62f9..8cdbf664d83 100644 --- a/pkg/api/serialization_test.go +++ b/pkg/api/serialization_test.go @@ -17,8 +17,10 @@ limitations under the License. package api_test import ( + "bytes" "encoding/hex" "encoding/json" + "io/ioutil" "math/rand" "reflect" "strings" @@ -36,8 +38,11 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/streaming" "k8s.io/kubernetes/pkg/util/diff" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/watch" + "k8s.io/kubernetes/pkg/watch/versioned" ) var fuzzIters = flag.Int("fuzz-iters", 20, "How many fuzzing iterations to do.") @@ -269,6 +274,86 @@ func TestUnversionedTypes(t *testing.T) { } } +func TestObjectWatchFraming(t *testing.T) { + f := apitesting.FuzzerFor(nil, api.SchemeGroupVersion, rand.NewSource(benchmarkSeed)) + secret := &api.Secret{} + f.Fuzz(secret) + secret.Data["binary"] = []byte{0x00, 0x10, 0x30, 0x55, 0xff, 0x00} + secret.Data["utf8"] = []byte("a string with \u0345 characters") + secret.Data["long"] = bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x00}, 1000) + converted, _ := api.Scheme.ConvertToVersion(secret, "v1") + v1secret := converted.(*v1.Secret) + for _, streamingMediaType := range api.Codecs.SupportedStreamingMediaTypes() { + s, framer, mediaType, _ := api.Codecs.StreamingSerializerForMediaType(streamingMediaType, nil) + // TODO: remove this when the runtime.SerializerInfo PR lands + if mediaType == "application/vnd.kubernetes.protobuf;stream=watch" { + mediaType = "application/vnd.kubernetes.protobuf" + } + embedded, ok := api.Codecs.SerializerForMediaType(mediaType, nil) + if !ok { + t.Logf("no embedded serializer for %s", mediaType) + embedded = s + } + innerDecode := api.Codecs.DecoderToVersion(embedded, api.SchemeGroupVersion) + //innerEncode := api.Codecs.EncoderForVersion(embedded, api.SchemeGroupVersion) + + // write a single object through the framer and back out + obj := &bytes.Buffer{} + if err := s.EncodeToStream(v1secret, obj); err != nil { + t.Fatal(err) + } + out := &bytes.Buffer{} + w := framer.NewFrameWriter(out) + if n, err := w.Write(obj.Bytes()); err != nil || n != len(obj.Bytes()) { + t.Fatal(err) + } + sr := streaming.NewDecoder(framer.NewFrameReader(ioutil.NopCloser(out)), s) + resultSecret := &v1.Secret{} + res, _, err := sr.Decode(nil, resultSecret) + if err != nil { + t.Fatalf("%v:\n%s", err, hex.Dump(obj.Bytes())) + } + resultSecret.Kind = "Secret" + resultSecret.APIVersion = "v1" + if !api.Semantic.DeepEqual(v1secret, res) { + t.Fatalf("objects did not match: %s", diff.ObjectGoPrintDiff(v1secret, res)) + } + + // write a watch event through and back out + obj = &bytes.Buffer{} + if err := embedded.EncodeToStream(v1secret, obj); err != nil { + t.Fatal(err) + } + event := &versioned.Event{Type: string(watch.Added)} + event.Object.Raw = obj.Bytes() + obj = &bytes.Buffer{} + if err := s.EncodeToStream(event, obj); err != nil { + t.Fatal(err) + } + out = &bytes.Buffer{} + w = framer.NewFrameWriter(out) + if n, err := w.Write(obj.Bytes()); err != nil || n != len(obj.Bytes()) { + t.Fatal(err) + } + sr = streaming.NewDecoder(framer.NewFrameReader(ioutil.NopCloser(out)), s) + outEvent := &versioned.Event{} + res, _, err = sr.Decode(nil, outEvent) + if err != nil || outEvent.Type != string(watch.Added) { + t.Fatalf("%v: %#v", err, outEvent) + } + if outEvent.Object.Object == nil && outEvent.Object.Raw != nil { + outEvent.Object.Object, err = runtime.Decode(innerDecode, outEvent.Object.Raw) + if err != nil { + t.Fatalf("%v:\n%s", err, hex.Dump(outEvent.Object.Raw)) + } + } + + if !api.Semantic.DeepEqual(secret, outEvent.Object.Object) { + t.Fatalf("%s: did not match after frame decoding: %s", streamingMediaType, diff.ObjectGoPrintDiff(secret, outEvent.Object.Object)) + } + } +} + const benchmarkSeed = 100 func benchmarkItems() []v1.Pod { diff --git a/pkg/runtime/serializer/streaming/streaming.go b/pkg/runtime/serializer/streaming/streaming.go index 8bc39d66e6b..6c5ff056326 100644 --- a/pkg/runtime/serializer/streaming/streaming.go +++ b/pkg/runtime/serializer/streaming/streaming.go @@ -83,9 +83,9 @@ func (d *decoder) Decode(defaults *unversioned.GroupVersionKind, into runtime.Ob continue } // double the buffer size up to maxBytes - if cap(d.buf) < d.maxBytes { + if len(d.buf) < d.maxBytes { base += n - d.buf = append(d.buf, make([]byte, cap(d.buf))...) + d.buf = append(d.buf, make([]byte, len(d.buf))...) continue } // must read the rest of the frame (until we stop getting ErrShortBuffer) diff --git a/pkg/util/framer/framer.go b/pkg/util/framer/framer.go index b2a0269c505..1e886e818f6 100644 --- a/pkg/util/framer/framer.go +++ b/pkg/util/framer/framer.go @@ -131,13 +131,13 @@ func NewJSONFramedReader(r io.ReadCloser) io.ReadCloser { func (r *jsonFrameReader) Read(data []byte) (int, error) { // Return whatever remaining data exists from an in progress frame if n := len(r.remaining); n > 0 { - if n <= cap(data) { + if n <= len(data) { data = append(data[0:0], r.remaining...) r.remaining = nil return n, nil } - n = cap(data) + n = len(data) data = append(data[0:0], r.remaining[:n]...) r.remaining = r.remaining[n:] return n, io.ErrShortBuffer diff --git a/pkg/util/framer/framer_test.go b/pkg/util/framer/framer_test.go index 661cab43ea5..c5e4ba874ee 100644 --- a/pkg/util/framer/framer_test.go +++ b/pkg/util/framer/framer_test.go @@ -141,7 +141,7 @@ func TestJSONFrameReader(t *testing.T) { func TestJSONFrameReaderShortBuffer(t *testing.T) { b := bytes.NewBufferString("{\"test\":true}\n1\n[\"a\"]") r := NewJSONFramedReader(ioutil.NopCloser(b)) - buf := make([]byte, 0, 3) + buf := make([]byte, 3) if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `{"t` { t.Fatalf("unexpected: %v %d %q", err, n, buf)