Merge pull request #23265 from AdoHe/big_yaml

Automatic merge from submit-queue

use Reader.ReadLine instead of bufio.Scanner to support bigger yaml

@smarterclayton ptal. Also refer #19603 #23125 for more details.
This commit is contained in:
Kubernetes Submit Queue 2016-08-17 02:26:46 -07:00 committed by GitHub
commit 7b49d0c19d
2 changed files with 137 additions and 15 deletions

View File

@ -45,7 +45,7 @@ func ToJSON(data []byte) ([]byte, error) {
// separating individual documents. It first converts the YAML
// body to JSON, then unmarshals the JSON.
type YAMLToJSONDecoder struct {
scanner *bufio.Scanner
reader Reader
}
// NewYAMLToJSONDecoder decodes YAML documents from the provided
@ -53,10 +53,9 @@ type YAMLToJSONDecoder struct {
// the YAML spec) into its own chunk, converting it to JSON via
// yaml.YAMLToJSON, and then passing it to json.Decoder.
func NewYAMLToJSONDecoder(r io.Reader) *YAMLToJSONDecoder {
scanner := bufio.NewScanner(r)
scanner.Split(splitYAMLDocument)
reader := bufio.NewReader(r)
return &YAMLToJSONDecoder{
scanner: scanner,
reader: NewYAMLReader(reader),
}
}
@ -64,17 +63,18 @@ func NewYAMLToJSONDecoder(r io.Reader) *YAMLToJSONDecoder {
// an error. The decoding rules match json.Unmarshal, not
// yaml.Unmarshal.
func (d *YAMLToJSONDecoder) Decode(into interface{}) error {
if d.scanner.Scan() {
data, err := yaml.YAMLToJSON(d.scanner.Bytes())
bytes, err := d.reader.Read()
if err != nil && err != io.EOF {
return err
}
if len(bytes) != 0 {
data, err := yaml.YAMLToJSON(bytes)
if err != nil {
return err
}
return json.Unmarshal(data, into)
}
err := d.scanner.Err()
if err == nil {
err = io.EOF
}
return err
}
@ -137,6 +137,7 @@ func (d *YAMLDecoder) Close() error {
}
const yamlSeparator = "\n---"
const separator = "---\n"
// splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into individual documents.
func splitYAMLDocument(data []byte, atEOF bool) (advance int, token []byte, err error) {
@ -222,6 +223,64 @@ func (d *YAMLOrJSONDecoder) Decode(into interface{}) error {
return err
}
type Reader interface {
Read() ([]byte, error)
}
type YAMLReader struct {
reader Reader
}
func NewYAMLReader(r *bufio.Reader) *YAMLReader {
return &YAMLReader{
reader: &LineReader{reader: r},
}
}
// Read returns a full YAML document.
func (r *YAMLReader) Read() ([]byte, error) {
var buffer bytes.Buffer
for {
line, err := r.reader.Read()
if err != nil && err != io.EOF {
return nil, err
}
if string(line) == separator || err == io.EOF {
if buffer.Len() != 0 {
return buffer.Bytes(), nil
}
if err == io.EOF {
return nil, err
}
} else {
buffer.Write(line)
}
}
}
type LineReader struct {
reader *bufio.Reader
}
// Read returns a single line (with '\n' ended) from the underlying reader.
// An error is returned iff there is an error with the underlying reader.
func (r *LineReader) Read() ([]byte, error) {
var (
isPrefix bool = true
err error = nil
line []byte
buffer bytes.Buffer
)
for isPrefix && err == nil {
line, isPrefix, err = r.reader.ReadLine()
buffer.Write(line)
}
buffer.WriteByte('\n')
return buffer.Bytes(), err
}
// GuessJSONStream scans the provided reader up to size, looking
// for an open brace indicating this is JSON. It will return the
// bufio.Reader it creates for the consumer.

View File

@ -22,6 +22,8 @@ import (
"encoding/json"
"fmt"
"io"
"math/rand"
"reflect"
"strings"
"testing"
)
@ -81,7 +83,7 @@ func TestScanYAML(t *testing.T) {
s := bufio.NewScanner(bytes.NewReader([]byte(`---
stuff: 1
---
---
`)))
s.Split(splitYAMLDocument)
if !s.Scan() {
@ -104,7 +106,7 @@ func TestDecodeYAML(t *testing.T) {
s := NewYAMLToJSONDecoder(bytes.NewReader([]byte(`---
stuff: 1
---
---
`)))
obj := generic{}
if err := s.Decode(&obj); err != nil {
@ -131,15 +133,16 @@ func TestDecodeBrokenYAML(t *testing.T) {
stuff: 1
test-foo: 1
---
---
`)), 100)
obj := generic{}
err := s.Decode(&obj)
if err == nil {
t.Fatal("expected error with yaml: prefix, got no error")
}
if !strings.HasPrefix(err.Error(), "yaml: line 2:") {
t.Fatalf("expected %q to have 'yaml: line 2:' prefix", err.Error())
fmt.Printf("err: %s", err.Error())
if !strings.HasPrefix(err.Error(), "yaml: line 1:") {
t.Fatalf("expected %q to have 'yaml: line 1:' prefix", err.Error())
}
}
@ -251,3 +254,63 @@ func TestYAMLOrJSONDecoder(t *testing.T) {
}
}
}
func TestReadSingleLongLine(t *testing.T) {
testReadLines(t, []int{128 * 1024})
}
func TestReadRandomLineLengths(t *testing.T) {
minLength := 100
maxLength := 96 * 1024
maxLines := 100
lineLengths := make([]int, maxLines)
for i := 0; i < maxLines; i++ {
lineLengths[i] = rand.Intn(maxLength-minLength) + minLength
}
testReadLines(t, lineLengths)
}
func testReadLines(t *testing.T, lineLengths []int) {
var (
lines [][]byte
inputStream []byte
)
for _, lineLength := range lineLengths {
inputLine := make([]byte, lineLength+1)
for i := 0; i < lineLength; i++ {
char := rand.Intn('z'-'A') + 'A'
inputLine[i] = byte(char)
}
inputLine[len(inputLine)-1] = '\n'
lines = append(lines, inputLine)
}
for _, line := range lines {
inputStream = append(inputStream, line...)
}
// init Reader
reader := bufio.NewReader(bytes.NewReader(inputStream))
lineReader := &LineReader{reader: reader}
// read lines
var readLines [][]byte
for range lines {
bytes, err := lineReader.Read()
if err != nil && err != io.EOF {
t.Fatalf("failed to read lines: %v", err)
}
readLines = append(readLines, bytes)
}
// validate
for i := range lines {
if len(lines[i]) != len(readLines[i]) {
t.Fatalf("expected line length: %d, but got %d", len(lines[i]), len(readLines[i]))
}
if !reflect.DeepEqual(lines[i], readLines[i]) {
t.Fatalf("expected line: %v, but got %v", lines[i], readLines[i])
}
}
}