From 22cf8b94e6afc506f4a0bf1901883d83f19aa9b3 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Wed, 14 Jan 2015 00:03:22 -0500 Subject: [PATCH] Add a package for doing YAML-to-JSON streaming, one document at a time Will be moved upstream soon --- pkg/util/yaml/decoder.go | 154 ++++++++++++++++++++++++++ pkg/util/yaml/decoder_test.go | 199 ++++++++++++++++++++++++++++++++++ 2 files changed, 353 insertions(+) create mode 100644 pkg/util/yaml/decoder.go create mode 100644 pkg/util/yaml/decoder_test.go diff --git a/pkg/util/yaml/decoder.go b/pkg/util/yaml/decoder.go new file mode 100644 index 00000000000..ccab145759e --- /dev/null +++ b/pkg/util/yaml/decoder.go @@ -0,0 +1,154 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package yaml + +import ( + "bufio" + "bytes" + "encoding/json" + "io" + "unicode" + + "github.com/ghodss/yaml" + "github.com/golang/glog" +) + +// YAMLToJSONDecoder decodes YAML documents from an io.Reader by +// separating individual documents. It first converts the YAML +// body to JSON, then unmarshals the JSON. +type YAMLToJSONDecoder struct { + scanner *bufio.Scanner +} + +// NewYAMLToJSONDecoder decodes YAML documents from the provided +// stream in chunks by converting each document (as defined by +// the YAML spec) into its own chunk, converting it to JSON via +// yaml.YAMLToJSON, and then passing it to json.Decoder. +func NewYAMLToJSONDecoder(r io.Reader) *YAMLToJSONDecoder { + scanner := bufio.NewScanner(r) + scanner.Split(splitYAMLDocument) + return &YAMLToJSONDecoder{ + scanner: scanner, + } +} + +// Decode reads a YAML document as JSON from the stream or returns +// an error. The decoding rules match json.Unmarshal, not +// yaml.Unmarshal. +func (d *YAMLToJSONDecoder) Decode(into interface{}) error { + if d.scanner.Scan() { + data, err := yaml.YAMLToJSON(d.scanner.Bytes()) + if err != nil { + return err + } + return json.Unmarshal(data, into) + } + err := d.scanner.Err() + if err == nil { + err = io.EOF + } + return err +} + +const yamlSeparator = "\n---" + +// splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into individual documents. +func splitYAMLDocument(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + sep := len([]byte(yamlSeparator)) + if i := bytes.Index(data, []byte(yamlSeparator)); i >= 0 { + // We have a potential document terminator + i += sep + after := data[i:] + if len(after) == 0 { + // we can't read any more characters + if atEOF { + return len(data), data[:len(data)-sep], nil + } + return 0, nil, nil + } + if j := bytes.IndexByte(after, '\n'); j >= 0 { + return i + j + 1, data[0 : i-sep], nil + } + return 0, nil, nil + } + // If we're at EOF, we have a final, non-terminated line. Return it. + if atEOF { + return len(data), data, nil + } + // Request more data. + return 0, nil, nil +} + +// decoder is a convenience interface for Decode. +type decoder interface { + Decode(into interface{}) error +} + +// YAMLOrJSONDecoder attempts to decode a stream of JSON documents or +// YAML documents by sniffing for a leading { character. +type YAMLOrJSONDecoder struct { + r io.Reader + bufferSize int + + decoder decoder +} + +// NewYAMLOrJSONDecoder returns a decoder that will process YAML documents +// or JSON documents from the given reader as a stream. bufferSize determines +// how far into the stream the decoder will look to figure out whether this +// is a JSON stream (has whitespace followed by an open brace). +func NewYAMLOrJSONDecoder(r io.Reader, bufferSize int) *YAMLOrJSONDecoder { + return &YAMLOrJSONDecoder{ + r: r, + bufferSize: bufferSize, + } +} + +// Decode unmarshals the next object from the underlying stream into the +// provide object, or returns an error. +func (d *YAMLOrJSONDecoder) Decode(into interface{}) error { + if d.decoder == nil { + buffer, isJSON := guessJSONStream(d.r, d.bufferSize) + if isJSON { + glog.V(4).Infof("decoding stream as JSON") + d.decoder = json.NewDecoder(buffer) + } else { + glog.V(4).Infof("decoding stream as YAML") + d.decoder = NewYAMLToJSONDecoder(buffer) + } + } + return d.decoder.Decode(into) +} + +// guessJSONStream scans the provided reader up to size, looking +// for an open brace indicating this is JSON. It will return the +// bufio.Reader it creates for the consumer. +func guessJSONStream(r io.Reader, size int) (io.Reader, bool) { + buffer := bufio.NewReaderSize(r, size) + b, _ := buffer.Peek(size) + return buffer, hasPrefix(b, []byte("{")) +} + +// Return true if the first non-whitespace bytes in buf is +// prefix +func hasPrefix(buf []byte, prefix []byte) bool { + buf = bytes.TrimLeftFunc(buf, unicode.IsSpace) + return bytes.HasPrefix(buf, prefix) +} diff --git a/pkg/util/yaml/decoder_test.go b/pkg/util/yaml/decoder_test.go new file mode 100644 index 00000000000..c590b5452f8 --- /dev/null +++ b/pkg/util/yaml/decoder_test.go @@ -0,0 +1,199 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package yaml + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" + "testing" +) + +func TestSplitYAMLDocument(t *testing.T) { + testCases := []struct { + input string + atEOF bool + expect string + adv int + }{ + {"foo", true, "foo", 3}, + {"fo", false, "", 0}, + + {"---", true, "---", 3}, + {"---\n", true, "---\n", 4}, + {"---\n", false, "", 0}, + + {"\n---\n", false, "", 5}, + {"\n---\n", true, "", 5}, + + {"abc\n---\ndef", true, "abc", 8}, + {"def", true, "def", 3}, + {"", true, "", 0}, + } + for i, testCase := range testCases { + adv, token, err := splitYAMLDocument([]byte(testCase.input), testCase.atEOF) + if err != nil { + t.Errorf("%d: unexpected error: %v", i, err) + continue + } + if adv != testCase.adv { + t.Errorf("%d: advance did not match: %d %d", i, testCase.adv, adv) + } + if testCase.expect != string(token) { + t.Errorf("%d: token did not match: %q %q", i, testCase.expect, string(token)) + } + } +} + +func TestScanYAML(t *testing.T) { + s := bufio.NewScanner(bytes.NewReader([]byte(`--- +stuff: 1 + +--- + `))) + s.Split(splitYAMLDocument) + if !s.Scan() { + t.Fatalf("should have been able to scan") + } + t.Logf("scan: %s", s.Text()) + if !s.Scan() { + t.Fatalf("should have been able to scan") + } + t.Logf("scan: %s", s.Text()) + if s.Scan() { + t.Fatalf("scan should have been done") + } + if s.Err() != nil { + t.Fatalf("err should have been nil: %v", s.Err()) + } +} + +func TestDecodeYAML(t *testing.T) { + s := NewYAMLToJSONDecoder(bytes.NewReader([]byte(`--- +stuff: 1 + +--- + `))) + obj := generic{} + if err := s.Decode(&obj); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if fmt.Sprintf("%#v", obj) != `yaml.generic{"stuff":1}` { + t.Errorf("unexpected object: %#v", obj) + } + obj = generic{} + if err := s.Decode(&obj); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(obj) != 0 { + t.Fatalf("unexpected object: %#v", obj) + } + obj = generic{} + if err := s.Decode(&obj); err != io.EOF { + t.Fatalf("unexpected error: %v", err) + } +} + +type generic map[string]interface{} + +func TestYAMLOrJSONDecoder(t *testing.T) { + testCases := []struct { + input string + buffer int + isJSON bool + err bool + out []generic + }{ + {` {"1":2}{"3":4}`, 2, true, false, []generic{ + {"1": 2}, + {"3": 4}, + }}, + {" \n{}", 3, true, false, []generic{ + {}, + }}, + {" \na: b", 2, false, false, []generic{ + {"a": "b"}, + }}, + {` \n{"a": "b"}`, 2, false, true, []generic{ + {"a": "b"}, + }}, + {` {"a":"b"}`, 100, true, false, []generic{ + {"a": "b"}, + }}, + {"", 1, false, false, []generic{}}, + {"foo: bar\n---\nbaz: biz", 100, false, false, []generic{ + {"foo": "bar"}, + {"baz": "biz"}, + }}, + {"foo: bar\n---\n", 100, false, false, []generic{ + {"foo": "bar"}, + }}, + {"foo: bar\n---", 100, false, false, []generic{ + {"foo": "bar"}, + }}, + {"foo: bar\n--", 100, false, true, []generic{ + {"foo": "bar"}, + }}, + {"foo: bar\n-", 100, false, true, []generic{ + {"foo": "bar"}, + }}, + {"foo: bar\n", 100, false, false, []generic{ + {"foo": "bar"}, + }}, + } + for i, testCase := range testCases { + decoder := NewYAMLOrJSONDecoder(bytes.NewReader([]byte(testCase.input)), testCase.buffer) + objs := []generic{} + + var err error + for { + out := make(generic) + err = decoder.Decode(&out) + if err != nil { + break + } + objs = append(objs, out) + } + if err != io.EOF { + switch { + case testCase.err && err == nil: + t.Errorf("%d: unexpected non-error", i) + continue + case !testCase.err && err != nil: + t.Errorf("%d: unexpected error: %v", i, err) + continue + case err != nil: + continue + } + } + switch decoder.decoder.(type) { + case *YAMLToJSONDecoder: + if testCase.isJSON { + t.Errorf("%d: expected JSON decoder, got YAML", i) + } + case *json.Decoder: + if !testCase.isJSON { + t.Errorf("%d: expected YAML decoder, got JSON", i) + } + } + if fmt.Sprintf("%#v", testCase.out) != fmt.Sprintf("%#v", objs) { + t.Errorf("%d: objects were not equal: \n%#v\n%#v", i, testCase.out, objs) + } + } +}