Merge pull request #14148 from brendandburns/perf

Add a method for encoding directly to a io.Writer and use it for HTTP
This commit is contained in:
Brian Grant 2015-09-25 13:19:58 -07:00
commit 313918f561
10 changed files with 112 additions and 49 deletions

View File

@ -28,16 +28,18 @@ cleanup() {
kube::log::status "Benchmark cleanup complete" kube::log::status "Benchmark cleanup complete"
} }
ARGS="-bench-pods 3000 -bench-tasks 100 -bench-tasks 10"
runTests() { runTests() {
kube::etcd::start kube::etcd::start
kube::log::status "Running benchmarks" kube::log::status "Running benchmarks"
KUBE_GOFLAGS="-tags 'benchmark no-docker' -bench . -benchtime 1s -cpu 4" \ KUBE_GOFLAGS="-tags 'benchmark no-docker' -bench . -benchmem -benchtime 1s -cpu 4" \
KUBE_RACE="-race" \ KUBE_RACE="-race" \
KUBE_TEST_API_VERSIONS="v1" \ KUBE_TEST_API_VERSIONS="v1" \
KUBE_TIMEOUT="-timeout 10m" \ KUBE_TIMEOUT="-timeout 10m" \
KUBE_TEST_ETCD_PREFIXES="registry"\ KUBE_TEST_ETCD_PREFIXES="registry"\
ETCD_CUSTOM_PREFIX="None" \ ETCD_CUSTOM_PREFIX="None" \
KUBE_TEST_ARGS="-bench-quiet 0 -bench-pods 30 -bench-tasks 1"\ KUBE_TEST_ARGS="${ARGS}" \
"${KUBE_ROOT}/hack/test-go.sh" test/integration "${KUBE_ROOT}/hack/test-go.sh" test/integration
cleanup cleanup
} }

View File

@ -18,6 +18,7 @@ package meta
import ( import (
"errors" "errors"
"io"
"testing" "testing"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
@ -29,6 +30,10 @@ func (fakeCodec) Encode(runtime.Object) ([]byte, error) {
return []byte{}, nil return []byte{}, nil
} }
func (fakeCodec) EncodeToStream(runtime.Object, io.Writer) error {
return nil
}
func (fakeCodec) Decode([]byte) (runtime.Object, error) { func (fakeCodec) Decode([]byte) (runtime.Object, error) {
return nil, nil return nil, nil
} }

View File

@ -380,24 +380,32 @@ func isPrettyPrint(req *http.Request) bool {
// writeJSON renders an object as JSON to the response. // writeJSON renders an object as JSON to the response.
func writeJSON(statusCode int, codec runtime.Codec, object runtime.Object, w http.ResponseWriter, pretty bool) { func writeJSON(statusCode int, codec runtime.Codec, object runtime.Object, w http.ResponseWriter, pretty bool) {
w.Header().Set("Content-Type", "application/json")
// We send the status code before we encode the object, so if we error, the status code stays but there will
// still be an error object. This seems ok, the alternative is to validate the object before
// encoding, but this really should never happen, so it's wasted compute for every API request.
w.WriteHeader(statusCode)
if pretty {
prettyJSON(codec, object, w)
return
}
err := codec.EncodeToStream(object, w)
if err != nil {
errorJSONFatal(err, codec, w)
}
}
func prettyJSON(codec runtime.Codec, object runtime.Object, w http.ResponseWriter) {
formatted := &bytes.Buffer{}
output, err := codec.Encode(object) output, err := codec.Encode(object)
if err != nil { if err != nil {
errorJSONFatal(err, codec, w) errorJSONFatal(err, codec, w)
return
} }
if pretty { if err := json.Indent(formatted, output, "", " "); err != nil {
// PR #2243: Pretty-print JSON by default.
formatted := &bytes.Buffer{}
err = json.Indent(formatted, output, "", " ")
if err != nil {
errorJSONFatal(err, codec, w) errorJSONFatal(err, codec, w)
return return
} }
output = formatted.Bytes() w.Write(formatted.Bytes())
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
w.Write(output)
} }
// errorJSON renders an error to the response. Returns the HTTP status code of the error. // errorJSON renders an error to the response. Returns the HTTP status code of the error.

View File

@ -2432,9 +2432,8 @@ func expectApiStatus(t *testing.T, method, url string, data []byte, code int) *u
return nil return nil
} }
var status unversioned.Status var status unversioned.Status
_, err = extractBody(response, &status) if body, err := extractBody(response, &status); err != nil {
if err != nil { t.Fatalf("unexpected error on %s %s: %v\nbody:\n%s", method, url, err, body)
t.Fatalf("unexpected error on %s %s: %v", method, url, err)
return nil return nil
} }
if code != response.StatusCode { if code != response.StatusCode {
@ -2470,7 +2469,10 @@ func TestWriteJSONDecodeError(t *testing.T) {
writeJSON(http.StatusOK, codec, &UnregisteredAPIObject{"Undecodable"}, w, false) writeJSON(http.StatusOK, codec, &UnregisteredAPIObject{"Undecodable"}, w, false)
})) }))
defer server.Close() defer server.Close()
status := expectApiStatus(t, "GET", server.URL, nil, http.StatusInternalServerError) // We send a 200 status code before we encode the object, so we expect OK, but there will
// still be an error object. This seems ok, the alternative is to validate the object before
// encoding, but this really should never happen, so it's wasted compute for every API request.
status := expectApiStatus(t, "GET", server.URL, nil, http.StatusOK)
if status.Reason != unversioned.StatusReasonUnknown { if status.Reason != unversioned.StatusReasonUnknown {
t.Errorf("unexpected reason %#v", status) t.Errorf("unexpected reason %#v", status)
} }

View File

@ -17,8 +17,10 @@ limitations under the License.
package conversion package conversion
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"path" "path"
) )
@ -51,6 +53,14 @@ import (
// config files. // config files.
// //
func (s *Scheme) EncodeToVersion(obj interface{}, destVersion string) (data []byte, err error) { func (s *Scheme) EncodeToVersion(obj interface{}, destVersion string) (data []byte, err error) {
buff := &bytes.Buffer{}
if err := s.EncodeToVersionStream(obj, destVersion, buff); err != nil {
return nil, err
}
return buff.Bytes(), nil
}
func (s *Scheme) EncodeToVersionStream(obj interface{}, destVersion string, stream io.Writer) error {
obj = maybeCopy(obj) obj = maybeCopy(obj)
v, _ := EnforcePtr(obj) // maybeCopy guarantees a pointer v, _ := EnforcePtr(obj) // maybeCopy guarantees a pointer
@ -58,28 +68,34 @@ func (s *Scheme) EncodeToVersion(obj interface{}, destVersion string) (data []by
// destVersion is v1, encode it to v1 for backward compatibility. // destVersion is v1, encode it to v1 for backward compatibility.
pkg := path.Base(v.Type().PkgPath()) pkg := path.Base(v.Type().PkgPath())
if pkg == "unversioned" && destVersion != "v1" { if pkg == "unversioned" && destVersion != "v1" {
return s.encodeUnversionedObject(obj) // TODO: convert this to streaming too
data, err := s.encodeUnversionedObject(obj)
if err != nil {
return err
}
_, err = stream.Write(data)
return err
} }
if _, registered := s.typeToVersion[v.Type()]; !registered { if _, registered := s.typeToVersion[v.Type()]; !registered {
return nil, fmt.Errorf("type %v is not registered for %q and it will be impossible to Decode it, therefore Encode will refuse to encode it.", v.Type(), destVersion) return fmt.Errorf("type %v is not registered for %q and it will be impossible to Decode it, therefore Encode will refuse to encode it.", v.Type(), destVersion)
} }
objVersion, objKind, err := s.ObjectVersionAndKind(obj) objVersion, objKind, err := s.ObjectVersionAndKind(obj)
if err != nil { if err != nil {
return nil, err return err
} }
// Perform a conversion if necessary. // Perform a conversion if necessary.
if objVersion != destVersion { if objVersion != destVersion {
objOut, err := s.NewObject(destVersion, objKind) objOut, err := s.NewObject(destVersion, objKind)
if err != nil { if err != nil {
return nil, err return err
} }
flags, meta := s.generateConvertMeta(objVersion, destVersion, obj) flags, meta := s.generateConvertMeta(objVersion, destVersion, obj)
err = s.converter.Convert(obj, objOut, flags, meta) err = s.converter.Convert(obj, objOut, flags, meta)
if err != nil { if err != nil {
return nil, err return err
} }
obj = objOut obj = objOut
} }
@ -87,29 +103,29 @@ func (s *Scheme) EncodeToVersion(obj interface{}, destVersion string) (data []by
// ensure the output object name comes from the destination type // ensure the output object name comes from the destination type
_, objKind, err = s.ObjectVersionAndKind(obj) _, objKind, err = s.ObjectVersionAndKind(obj)
if err != nil { if err != nil {
return nil, err return err
} }
// Version and Kind should be set on the wire. // Version and Kind should be set on the wire.
err = s.SetVersionAndKind(destVersion, objKind, obj) err = s.SetVersionAndKind(destVersion, objKind, obj)
if err != nil { if err != nil {
return nil, err return err
} }
// To add metadata, do some simple surgery on the JSON. // To add metadata, do some simple surgery on the JSON.
data, err = json.Marshal(obj) encoder := json.NewEncoder(stream)
if err != nil { if err := encoder.Encode(obj); err != nil {
return nil, err return err
} }
// Version and Kind should be blank in memory. Reset them, since it's // Version and Kind should be blank in memory. Reset them, since it's
// possible that we modified a user object and not a copy above. // possible that we modified a user object and not a copy above.
err = s.SetVersionAndKind("", "", obj) err = s.SetVersionAndKind("", "", obj)
if err != nil { if err != nil {
return nil, err return err
} }
return data, nil return nil
} }
func (s *Scheme) encodeUnversionedObject(obj interface{}) (data []byte, err error) { func (s *Scheme) encodeUnversionedObject(obj interface{}) (data []byte, err error) {

View File

@ -20,6 +20,7 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"strings" "strings"
"k8s.io/kubernetes/pkg/api/latest" "k8s.io/kubernetes/pkg/api/latest"
@ -219,40 +220,49 @@ const template = `{
"items": [ %s ] "items": [ %s ]
}` }`
func encodeToJSON(obj *experimental.ThirdPartyResourceData) ([]byte, error) { func encodeToJSON(obj *experimental.ThirdPartyResourceData, stream io.Writer) error {
var objOut interface{} var objOut interface{}
if err := json.Unmarshal(obj.Data, &objOut); err != nil { if err := json.Unmarshal(obj.Data, &objOut); err != nil {
return nil, err return err
} }
objMap, ok := objOut.(map[string]interface{}) objMap, ok := objOut.(map[string]interface{})
if !ok { if !ok {
return nil, fmt.Errorf("unexpected type: %v", objOut) return fmt.Errorf("unexpected type: %v", objOut)
} }
objMap["metadata"] = obj.ObjectMeta objMap["metadata"] = obj.ObjectMeta
return json.Marshal(objMap) encoder := json.NewEncoder(stream)
return encoder.Encode(objMap)
} }
func (t *thirdPartyResourceDataCodec) Encode(obj runtime.Object) (data []byte, err error) { func (t *thirdPartyResourceDataCodec) Encode(obj runtime.Object) ([]byte, error) {
switch obj := obj.(type) {
case *experimental.ThirdPartyResourceData:
return encodeToJSON(obj)
case *experimental.ThirdPartyResourceDataList:
// TODO: There must be a better way to do this...
buff := &bytes.Buffer{} buff := &bytes.Buffer{}
dataStrings := make([]string, len(obj.Items)) if err := t.EncodeToStream(obj, buff); err != nil {
for ix := range obj.Items {
data, err := encodeToJSON(&obj.Items[ix])
if err != nil {
return nil, err return nil, err
} }
dataStrings[ix] = string(data)
}
fmt.Fprintf(buff, template, t.kind+"List", strings.Join(dataStrings, ","))
return buff.Bytes(), nil return buff.Bytes(), nil
}
func (t *thirdPartyResourceDataCodec) EncodeToStream(obj runtime.Object, stream io.Writer) (err error) {
switch obj := obj.(type) {
case *experimental.ThirdPartyResourceData:
return encodeToJSON(obj, stream)
case *experimental.ThirdPartyResourceDataList:
// TODO: There must be a better way to do this...
dataStrings := make([]string, len(obj.Items))
for ix := range obj.Items {
buff := &bytes.Buffer{}
err := encodeToJSON(&obj.Items[ix], buff)
if err != nil {
return err
}
dataStrings[ix] = buff.String()
}
fmt.Fprintf(stream, template, t.kind+"List", strings.Join(dataStrings, ","))
return nil
case *unversioned.Status: case *unversioned.Status:
return t.delegate.Encode(obj) return t.delegate.EncodeToStream(obj, stream)
default: default:
return nil, fmt.Errorf("unexpected object to encode: %#v", obj) return fmt.Errorf("unexpected object to encode: %#v", obj)
} }
} }

View File

@ -17,6 +17,8 @@ limitations under the License.
package runtime package runtime
import ( import (
"io"
"k8s.io/kubernetes/pkg/util/yaml" "k8s.io/kubernetes/pkg/util/yaml"
) )
@ -78,6 +80,10 @@ func (c *codecWrapper) Encode(obj Object) ([]byte, error) {
return c.EncodeToVersion(obj, c.version) return c.EncodeToVersion(obj, c.version)
} }
func (c *codecWrapper) EncodeToStream(obj Object, stream io.Writer) error {
return c.EncodeToVersionStream(obj, c.version, stream)
}
// TODO: Make this behaviour default when we move everyone away from // TODO: Make this behaviour default when we move everyone away from
// the unversioned types. // the unversioned types.
// //

View File

@ -16,6 +16,10 @@ limitations under the License.
package runtime package runtime
import (
"io"
)
// ObjectScheme represents common conversions between formal external API versions // ObjectScheme represents common conversions between formal external API versions
// and the internal Go structs. ObjectScheme is typically used with ObjectCodec to // and the internal Go structs. ObjectScheme is typically used with ObjectCodec to
// transform internal Go structs into serialized versions. There may be many valid // transform internal Go structs into serialized versions. There may be many valid
@ -45,6 +49,7 @@ type Decoder interface {
// Encoder defines methods for serializing API objects into bytes // Encoder defines methods for serializing API objects into bytes
type Encoder interface { type Encoder interface {
Encode(obj Object) (data []byte, err error) Encode(obj Object) (data []byte, err error)
EncodeToStream(obj Object, stream io.Writer) error
} }
// Codec defines methods for serializing and deserializing API objects. // Codec defines methods for serializing and deserializing API objects.
@ -67,6 +72,7 @@ type ObjectEncoder interface {
// to a specified output version. An error is returned if the object // to a specified output version. An error is returned if the object
// cannot be converted for any reason. // cannot be converted for any reason.
EncodeToVersion(obj Object, outVersion string) ([]byte, error) EncodeToVersion(obj Object, outVersion string) ([]byte, error)
EncodeToVersionStream(obj Object, outVersion string, stream io.Writer) error
} }
// ObjectConvertor converts an object to a different version. // ObjectConvertor converts an object to a different version.

View File

@ -19,6 +19,7 @@ package runtime
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"net/url" "net/url"
"reflect" "reflect"
@ -434,6 +435,10 @@ func (s *Scheme) EncodeToVersion(obj Object, destVersion string) (data []byte, e
return s.raw.EncodeToVersion(obj, destVersion) return s.raw.EncodeToVersion(obj, destVersion)
} }
func (s *Scheme) EncodeToVersionStream(obj Object, destVersion string, stream io.Writer) error {
return s.raw.EncodeToVersionStream(obj, destVersion, stream)
}
// Decode converts a YAML or JSON string back into a pointer to an api object. // Decode converts a YAML or JSON string back into a pointer to an api object.
// Deduces the type based upon the APIVersion and Kind fields, which are set // Deduces the type based upon the APIVersion and Kind fields, which are set
// by Encode. Only versioned objects (APIVersion != "") are accepted. The object // by Encode. Only versioned objects (APIVersion != "") are accepted. The object

View File

@ -246,13 +246,16 @@ func TestExtensionMapping(t *testing.T) {
}{ }{
{ {
&InternalExtensionType{Extension: runtime.EmbeddedObject{Object: &ExtensionA{TestString: "foo"}}}, &InternalExtensionType{Extension: runtime.EmbeddedObject{Object: &ExtensionA{TestString: "foo"}}},
`{"kind":"ExtensionType","apiVersion":"testExternal","extension":{"kind":"A","testString":"foo"}}`, `{"kind":"ExtensionType","apiVersion":"testExternal","extension":{"kind":"A","testString":"foo"}}
`,
}, { }, {
&InternalExtensionType{Extension: runtime.EmbeddedObject{Object: &ExtensionB{TestString: "bar"}}}, &InternalExtensionType{Extension: runtime.EmbeddedObject{Object: &ExtensionB{TestString: "bar"}}},
`{"kind":"ExtensionType","apiVersion":"testExternal","extension":{"kind":"B","testString":"bar"}}`, `{"kind":"ExtensionType","apiVersion":"testExternal","extension":{"kind":"B","testString":"bar"}}
`,
}, { }, {
&InternalExtensionType{Extension: runtime.EmbeddedObject{Object: nil}}, &InternalExtensionType{Extension: runtime.EmbeddedObject{Object: nil}},
`{"kind":"ExtensionType","apiVersion":"testExternal","extension":null}`, `{"kind":"ExtensionType","apiVersion":"testExternal","extension":null}
`,
}, },
} }
@ -261,7 +264,7 @@ func TestExtensionMapping(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unexpected error '%v' (%#v)", err, item.obj) t.Errorf("unexpected error '%v' (%#v)", err, item.obj)
} else if e, a := item.encoded, string(gotEncoded); e != a { } else if e, a := item.encoded, string(gotEncoded); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected\n%#v\ngot\n%#v\n", e, a)
} }
gotDecoded, err := scheme.Decode([]byte(item.encoded)) gotDecoded, err := scheme.Decode([]byte(item.encoded))