diff --git a/hack/benchmark-integration.sh b/hack/benchmark-integration.sh index 2cd1db0b893..0954d50aae4 100755 --- a/hack/benchmark-integration.sh +++ b/hack/benchmark-integration.sh @@ -28,16 +28,18 @@ cleanup() { kube::log::status "Benchmark cleanup complete" } +ARGS="-bench-pods 3000 -bench-tasks 100 -bench-tasks 10" + runTests() { kube::etcd::start kube::log::status "Running benchmarks" - KUBE_GOFLAGS="-tags 'benchmark no-docker' -bench . -benchtime 1s -cpu 4" \ + KUBE_GOFLAGS="-tags 'benchmark no-docker' -bench . -benchmem -benchtime 1s -cpu 4" \ KUBE_RACE="-race" \ KUBE_TEST_API_VERSIONS="v1" \ KUBE_TIMEOUT="-timeout 10m" \ KUBE_TEST_ETCD_PREFIXES="registry"\ ETCD_CUSTOM_PREFIX="None" \ - KUBE_TEST_ARGS="-bench-quiet 0 -bench-pods 30 -bench-tasks 1"\ + KUBE_TEST_ARGS="${ARGS}" \ "${KUBE_ROOT}/hack/test-go.sh" test/integration cleanup } diff --git a/pkg/api/meta/restmapper_test.go b/pkg/api/meta/restmapper_test.go index 3d2145590f2..40a52b5cefa 100644 --- a/pkg/api/meta/restmapper_test.go +++ b/pkg/api/meta/restmapper_test.go @@ -18,6 +18,7 @@ package meta import ( "errors" + "io" "testing" "k8s.io/kubernetes/pkg/runtime" @@ -29,6 +30,10 @@ func (fakeCodec) Encode(runtime.Object) ([]byte, error) { return []byte{}, nil } +func (fakeCodec) EncodeToStream(runtime.Object, io.Writer) error { + return nil +} + func (fakeCodec) Decode([]byte) (runtime.Object, error) { return nil, nil } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 6d3f92ebe4b..7ef887e1dda 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -380,24 +380,32 @@ func isPrettyPrint(req *http.Request) bool { // writeJSON renders an object as JSON to the response. func writeJSON(statusCode int, codec runtime.Codec, object runtime.Object, w http.ResponseWriter, pretty bool) { + w.Header().Set("Content-Type", "application/json") + // We send the status code before we encode the object, so if we error, the status code stays but there will + // still be an error object. This seems ok, the alternative is to validate the object before + // encoding, but this really should never happen, so it's wasted compute for every API request. + w.WriteHeader(statusCode) + if pretty { + prettyJSON(codec, object, w) + return + } + err := codec.EncodeToStream(object, w) + if err != nil { + errorJSONFatal(err, codec, w) + } +} + +func prettyJSON(codec runtime.Codec, object runtime.Object, w http.ResponseWriter) { + formatted := &bytes.Buffer{} output, err := codec.Encode(object) if err != nil { errorJSONFatal(err, codec, w) + } + if err := json.Indent(formatted, output, "", " "); err != nil { + errorJSONFatal(err, codec, w) return } - if pretty { - // PR #2243: Pretty-print JSON by default. - formatted := &bytes.Buffer{} - err = json.Indent(formatted, output, "", " ") - if err != nil { - errorJSONFatal(err, codec, w) - return - } - output = formatted.Bytes() - } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(statusCode) - w.Write(output) + w.Write(formatted.Bytes()) } // errorJSON renders an error to the response. Returns the HTTP status code of the error. diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 687d730cfad..82a2c292175 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -2432,9 +2432,8 @@ func expectApiStatus(t *testing.T, method, url string, data []byte, code int) *u return nil } var status unversioned.Status - _, err = extractBody(response, &status) - if err != nil { - t.Fatalf("unexpected error on %s %s: %v", method, url, err) + if body, err := extractBody(response, &status); err != nil { + t.Fatalf("unexpected error on %s %s: %v\nbody:\n%s", method, url, err, body) return nil } if code != response.StatusCode { @@ -2470,7 +2469,10 @@ func TestWriteJSONDecodeError(t *testing.T) { writeJSON(http.StatusOK, codec, &UnregisteredAPIObject{"Undecodable"}, w, false) })) defer server.Close() - status := expectApiStatus(t, "GET", server.URL, nil, http.StatusInternalServerError) + // We send a 200 status code before we encode the object, so we expect OK, but there will + // still be an error object. This seems ok, the alternative is to validate the object before + // encoding, but this really should never happen, so it's wasted compute for every API request. + status := expectApiStatus(t, "GET", server.URL, nil, http.StatusOK) if status.Reason != unversioned.StatusReasonUnknown { t.Errorf("unexpected reason %#v", status) } diff --git a/pkg/conversion/encode.go b/pkg/conversion/encode.go index 41a6561d6c8..7e1b82445bf 100644 --- a/pkg/conversion/encode.go +++ b/pkg/conversion/encode.go @@ -17,8 +17,10 @@ limitations under the License. package conversion import ( + "bytes" "encoding/json" "fmt" + "io" "path" ) @@ -51,6 +53,14 @@ import ( // config files. // func (s *Scheme) EncodeToVersion(obj interface{}, destVersion string) (data []byte, err error) { + buff := &bytes.Buffer{} + if err := s.EncodeToVersionStream(obj, destVersion, buff); err != nil { + return nil, err + } + return buff.Bytes(), nil +} + +func (s *Scheme) EncodeToVersionStream(obj interface{}, destVersion string, stream io.Writer) error { obj = maybeCopy(obj) v, _ := EnforcePtr(obj) // maybeCopy guarantees a pointer @@ -58,28 +68,34 @@ func (s *Scheme) EncodeToVersion(obj interface{}, destVersion string) (data []by // destVersion is v1, encode it to v1 for backward compatibility. pkg := path.Base(v.Type().PkgPath()) if pkg == "unversioned" && destVersion != "v1" { - return s.encodeUnversionedObject(obj) + // TODO: convert this to streaming too + data, err := s.encodeUnversionedObject(obj) + if err != nil { + return err + } + _, err = stream.Write(data) + return err } if _, registered := s.typeToVersion[v.Type()]; !registered { - return nil, fmt.Errorf("type %v is not registered for %q and it will be impossible to Decode it, therefore Encode will refuse to encode it.", v.Type(), destVersion) + return fmt.Errorf("type %v is not registered for %q and it will be impossible to Decode it, therefore Encode will refuse to encode it.", v.Type(), destVersion) } objVersion, objKind, err := s.ObjectVersionAndKind(obj) if err != nil { - return nil, err + return err } // Perform a conversion if necessary. if objVersion != destVersion { objOut, err := s.NewObject(destVersion, objKind) if err != nil { - return nil, err + return err } flags, meta := s.generateConvertMeta(objVersion, destVersion, obj) err = s.converter.Convert(obj, objOut, flags, meta) if err != nil { - return nil, err + return err } obj = objOut } @@ -87,29 +103,29 @@ func (s *Scheme) EncodeToVersion(obj interface{}, destVersion string) (data []by // ensure the output object name comes from the destination type _, objKind, err = s.ObjectVersionAndKind(obj) if err != nil { - return nil, err + return err } // Version and Kind should be set on the wire. err = s.SetVersionAndKind(destVersion, objKind, obj) if err != nil { - return nil, err + return err } // To add metadata, do some simple surgery on the JSON. - data, err = json.Marshal(obj) - if err != nil { - return nil, err + encoder := json.NewEncoder(stream) + if err := encoder.Encode(obj); err != nil { + return err } // Version and Kind should be blank in memory. Reset them, since it's // possible that we modified a user object and not a copy above. err = s.SetVersionAndKind("", "", obj) if err != nil { - return nil, err + return err } - return data, nil + return nil } func (s *Scheme) encodeUnversionedObject(obj interface{}) (data []byte, err error) { diff --git a/pkg/registry/thirdpartyresourcedata/codec.go b/pkg/registry/thirdpartyresourcedata/codec.go index c3e70948b94..c11b4bc91e2 100644 --- a/pkg/registry/thirdpartyresourcedata/codec.go +++ b/pkg/registry/thirdpartyresourcedata/codec.go @@ -20,6 +20,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "strings" "k8s.io/kubernetes/pkg/api/latest" @@ -219,40 +220,49 @@ const template = `{ "items": [ %s ] }` -func encodeToJSON(obj *experimental.ThirdPartyResourceData) ([]byte, error) { +func encodeToJSON(obj *experimental.ThirdPartyResourceData, stream io.Writer) error { var objOut interface{} if err := json.Unmarshal(obj.Data, &objOut); err != nil { - return nil, err + return err } objMap, ok := objOut.(map[string]interface{}) if !ok { - return nil, fmt.Errorf("unexpected type: %v", objOut) + return fmt.Errorf("unexpected type: %v", objOut) } objMap["metadata"] = obj.ObjectMeta - return json.Marshal(objMap) + encoder := json.NewEncoder(stream) + return encoder.Encode(objMap) } -func (t *thirdPartyResourceDataCodec) Encode(obj runtime.Object) (data []byte, err error) { +func (t *thirdPartyResourceDataCodec) Encode(obj runtime.Object) ([]byte, error) { + buff := &bytes.Buffer{} + if err := t.EncodeToStream(obj, buff); err != nil { + return nil, err + } + return buff.Bytes(), nil +} + +func (t *thirdPartyResourceDataCodec) EncodeToStream(obj runtime.Object, stream io.Writer) (err error) { switch obj := obj.(type) { case *experimental.ThirdPartyResourceData: - return encodeToJSON(obj) + return encodeToJSON(obj, stream) case *experimental.ThirdPartyResourceDataList: // TODO: There must be a better way to do this... - buff := &bytes.Buffer{} dataStrings := make([]string, len(obj.Items)) for ix := range obj.Items { - data, err := encodeToJSON(&obj.Items[ix]) + buff := &bytes.Buffer{} + err := encodeToJSON(&obj.Items[ix], buff) if err != nil { - return nil, err + return err } - dataStrings[ix] = string(data) + dataStrings[ix] = buff.String() } - fmt.Fprintf(buff, template, t.kind+"List", strings.Join(dataStrings, ",")) - return buff.Bytes(), nil + fmt.Fprintf(stream, template, t.kind+"List", strings.Join(dataStrings, ",")) + return nil case *unversioned.Status: - return t.delegate.Encode(obj) + return t.delegate.EncodeToStream(obj, stream) default: - return nil, fmt.Errorf("unexpected object to encode: %#v", obj) + return fmt.Errorf("unexpected object to encode: %#v", obj) } } diff --git a/pkg/runtime/codec.go b/pkg/runtime/codec.go index 93b7102eb69..5d06482ea54 100644 --- a/pkg/runtime/codec.go +++ b/pkg/runtime/codec.go @@ -17,6 +17,8 @@ limitations under the License. package runtime import ( + "io" + "k8s.io/kubernetes/pkg/util/yaml" ) @@ -78,6 +80,10 @@ func (c *codecWrapper) Encode(obj Object) ([]byte, error) { return c.EncodeToVersion(obj, c.version) } +func (c *codecWrapper) EncodeToStream(obj Object, stream io.Writer) error { + return c.EncodeToVersionStream(obj, c.version, stream) +} + // TODO: Make this behaviour default when we move everyone away from // the unversioned types. // diff --git a/pkg/runtime/interfaces.go b/pkg/runtime/interfaces.go index d21274e402f..12f65f8c897 100644 --- a/pkg/runtime/interfaces.go +++ b/pkg/runtime/interfaces.go @@ -16,6 +16,10 @@ limitations under the License. package runtime +import ( + "io" +) + // ObjectScheme represents common conversions between formal external API versions // and the internal Go structs. ObjectScheme is typically used with ObjectCodec to // transform internal Go structs into serialized versions. There may be many valid @@ -45,6 +49,7 @@ type Decoder interface { // Encoder defines methods for serializing API objects into bytes type Encoder interface { Encode(obj Object) (data []byte, err error) + EncodeToStream(obj Object, stream io.Writer) error } // Codec defines methods for serializing and deserializing API objects. @@ -67,6 +72,7 @@ type ObjectEncoder interface { // to a specified output version. An error is returned if the object // cannot be converted for any reason. EncodeToVersion(obj Object, outVersion string) ([]byte, error) + EncodeToVersionStream(obj Object, outVersion string, stream io.Writer) error } // ObjectConvertor converts an object to a different version. diff --git a/pkg/runtime/scheme.go b/pkg/runtime/scheme.go index 4819a9322b1..0a2bf47c3e6 100644 --- a/pkg/runtime/scheme.go +++ b/pkg/runtime/scheme.go @@ -19,6 +19,7 @@ package runtime import ( "encoding/json" "fmt" + "io" "net/url" "reflect" @@ -434,6 +435,10 @@ func (s *Scheme) EncodeToVersion(obj Object, destVersion string) (data []byte, e return s.raw.EncodeToVersion(obj, destVersion) } +func (s *Scheme) EncodeToVersionStream(obj Object, destVersion string, stream io.Writer) error { + return s.raw.EncodeToVersionStream(obj, destVersion, stream) +} + // Decode converts a YAML or JSON string back into a pointer to an api object. // Deduces the type based upon the APIVersion and Kind fields, which are set // by Encode. Only versioned objects (APIVersion != "") are accepted. The object diff --git a/pkg/runtime/scheme_test.go b/pkg/runtime/scheme_test.go index 270ac42e9f3..83f8d4c6e25 100644 --- a/pkg/runtime/scheme_test.go +++ b/pkg/runtime/scheme_test.go @@ -246,13 +246,16 @@ func TestExtensionMapping(t *testing.T) { }{ { &InternalExtensionType{Extension: runtime.EmbeddedObject{Object: &ExtensionA{TestString: "foo"}}}, - `{"kind":"ExtensionType","apiVersion":"testExternal","extension":{"kind":"A","testString":"foo"}}`, + `{"kind":"ExtensionType","apiVersion":"testExternal","extension":{"kind":"A","testString":"foo"}} +`, }, { &InternalExtensionType{Extension: runtime.EmbeddedObject{Object: &ExtensionB{TestString: "bar"}}}, - `{"kind":"ExtensionType","apiVersion":"testExternal","extension":{"kind":"B","testString":"bar"}}`, + `{"kind":"ExtensionType","apiVersion":"testExternal","extension":{"kind":"B","testString":"bar"}} +`, }, { &InternalExtensionType{Extension: runtime.EmbeddedObject{Object: nil}}, - `{"kind":"ExtensionType","apiVersion":"testExternal","extension":null}`, + `{"kind":"ExtensionType","apiVersion":"testExternal","extension":null} +`, }, } @@ -261,7 +264,7 @@ func TestExtensionMapping(t *testing.T) { if err != nil { t.Errorf("unexpected error '%v' (%#v)", err, item.obj) } else if e, a := item.encoded, string(gotEncoded); e != a { - t.Errorf("expected %v, got %v", e, a) + t.Errorf("expected\n%#v\ngot\n%#v\n", e, a) } gotDecoded, err := scheme.Decode([]byte(item.encoded))