From 2d064083001a7a7f7dfc8a5f8f2fc50582449bf6 Mon Sep 17 00:00:00 2001 From: AdoHe Date: Sun, 14 Aug 2016 16:02:43 +0800 Subject: [PATCH] use Reader.ReadLine instead of bufio.Scanner to support bigger yaml --- pkg/util/yaml/decoder.go | 79 ++++++++++++++++++++++++++++++----- pkg/util/yaml/decoder_test.go | 73 +++++++++++++++++++++++++++++--- 2 files changed, 137 insertions(+), 15 deletions(-) diff --git a/pkg/util/yaml/decoder.go b/pkg/util/yaml/decoder.go index 6a9f05a5874..c65c2d6ba34 100644 --- a/pkg/util/yaml/decoder.go +++ b/pkg/util/yaml/decoder.go @@ -45,7 +45,7 @@ func ToJSON(data []byte) ([]byte, error) { // separating individual documents. It first converts the YAML // body to JSON, then unmarshals the JSON. type YAMLToJSONDecoder struct { - scanner *bufio.Scanner + reader Reader } // NewYAMLToJSONDecoder decodes YAML documents from the provided @@ -53,10 +53,9 @@ type YAMLToJSONDecoder struct { // the YAML spec) into its own chunk, converting it to JSON via // yaml.YAMLToJSON, and then passing it to json.Decoder. func NewYAMLToJSONDecoder(r io.Reader) *YAMLToJSONDecoder { - scanner := bufio.NewScanner(r) - scanner.Split(splitYAMLDocument) + reader := bufio.NewReader(r) return &YAMLToJSONDecoder{ - scanner: scanner, + reader: NewYAMLReader(reader), } } @@ -64,17 +63,18 @@ func NewYAMLToJSONDecoder(r io.Reader) *YAMLToJSONDecoder { // an error. The decoding rules match json.Unmarshal, not // yaml.Unmarshal. func (d *YAMLToJSONDecoder) Decode(into interface{}) error { - if d.scanner.Scan() { - data, err := yaml.YAMLToJSON(d.scanner.Bytes()) + bytes, err := d.reader.Read() + if err != nil && err != io.EOF { + return err + } + + if len(bytes) != 0 { + data, err := yaml.YAMLToJSON(bytes) if err != nil { return err } return json.Unmarshal(data, into) } - err := d.scanner.Err() - if err == nil { - err = io.EOF - } return err } @@ -137,6 +137,7 @@ func (d *YAMLDecoder) Close() error { } const yamlSeparator = "\n---" +const separator = "---\n" // splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into individual documents. func splitYAMLDocument(data []byte, atEOF bool) (advance int, token []byte, err error) { @@ -222,6 +223,64 @@ func (d *YAMLOrJSONDecoder) Decode(into interface{}) error { return err } +type Reader interface { + Read() ([]byte, error) +} + +type YAMLReader struct { + reader Reader +} + +func NewYAMLReader(r *bufio.Reader) *YAMLReader { + return &YAMLReader{ + reader: &LineReader{reader: r}, + } +} + +// Read returns a full YAML document. +func (r *YAMLReader) Read() ([]byte, error) { + var buffer bytes.Buffer + for { + line, err := r.reader.Read() + if err != nil && err != io.EOF { + return nil, err + } + + if string(line) == separator || err == io.EOF { + if buffer.Len() != 0 { + return buffer.Bytes(), nil + } + if err == io.EOF { + return nil, err + } + } else { + buffer.Write(line) + } + } +} + +type LineReader struct { + reader *bufio.Reader +} + +// Read returns a single line (with '\n' ended) from the underlying reader. +// An error is returned iff there is an error with the underlying reader. +func (r *LineReader) Read() ([]byte, error) { + var ( + isPrefix bool = true + err error = nil + line []byte + buffer bytes.Buffer + ) + + for isPrefix && err == nil { + line, isPrefix, err = r.reader.ReadLine() + buffer.Write(line) + } + buffer.WriteByte('\n') + return buffer.Bytes(), err +} + // GuessJSONStream scans the provided reader up to size, looking // for an open brace indicating this is JSON. It will return the // bufio.Reader it creates for the consumer. diff --git a/pkg/util/yaml/decoder_test.go b/pkg/util/yaml/decoder_test.go index 6d66d54e60d..bc8c7698c31 100644 --- a/pkg/util/yaml/decoder_test.go +++ b/pkg/util/yaml/decoder_test.go @@ -22,6 +22,8 @@ import ( "encoding/json" "fmt" "io" + "math/rand" + "reflect" "strings" "testing" ) @@ -81,7 +83,7 @@ func TestScanYAML(t *testing.T) { s := bufio.NewScanner(bytes.NewReader([]byte(`--- stuff: 1 ---- +--- `))) s.Split(splitYAMLDocument) if !s.Scan() { @@ -104,7 +106,7 @@ func TestDecodeYAML(t *testing.T) { s := NewYAMLToJSONDecoder(bytes.NewReader([]byte(`--- stuff: 1 ---- +--- `))) obj := generic{} if err := s.Decode(&obj); err != nil { @@ -131,15 +133,16 @@ func TestDecodeBrokenYAML(t *testing.T) { stuff: 1 test-foo: 1 ---- +--- `)), 100) obj := generic{} err := s.Decode(&obj) if err == nil { t.Fatal("expected error with yaml: prefix, got no error") } - if !strings.HasPrefix(err.Error(), "yaml: line 2:") { - t.Fatalf("expected %q to have 'yaml: line 2:' prefix", err.Error()) + fmt.Printf("err: %s", err.Error()) + if !strings.HasPrefix(err.Error(), "yaml: line 1:") { + t.Fatalf("expected %q to have 'yaml: line 1:' prefix", err.Error()) } } @@ -251,3 +254,63 @@ func TestYAMLOrJSONDecoder(t *testing.T) { } } } + +func TestReadSingleLongLine(t *testing.T) { + testReadLines(t, []int{128 * 1024}) +} + +func TestReadRandomLineLengths(t *testing.T) { + minLength := 100 + maxLength := 96 * 1024 + maxLines := 100 + + lineLengths := make([]int, maxLines) + for i := 0; i < maxLines; i++ { + lineLengths[i] = rand.Intn(maxLength-minLength) + minLength + } + + testReadLines(t, lineLengths) +} + +func testReadLines(t *testing.T, lineLengths []int) { + var ( + lines [][]byte + inputStream []byte + ) + for _, lineLength := range lineLengths { + inputLine := make([]byte, lineLength+1) + for i := 0; i < lineLength; i++ { + char := rand.Intn('z'-'A') + 'A' + inputLine[i] = byte(char) + } + inputLine[len(inputLine)-1] = '\n' + lines = append(lines, inputLine) + } + for _, line := range lines { + inputStream = append(inputStream, line...) + } + + // init Reader + reader := bufio.NewReader(bytes.NewReader(inputStream)) + lineReader := &LineReader{reader: reader} + + // read lines + var readLines [][]byte + for range lines { + bytes, err := lineReader.Read() + if err != nil && err != io.EOF { + t.Fatalf("failed to read lines: %v", err) + } + readLines = append(readLines, bytes) + } + + // validate + for i := range lines { + if len(lines[i]) != len(readLines[i]) { + t.Fatalf("expected line length: %d, but got %d", len(lines[i]), len(readLines[i])) + } + if !reflect.DeepEqual(lines[i], readLines[i]) { + t.Fatalf("expected line: %v, but got %v", lines[i], readLines[i]) + } + } +}