Add a streaming and "raw" abstraction to codec factory

This commit is contained in:
Clayton Coleman 2016-03-21 12:40:41 -04:00
parent 2bb6f74bf9
commit 54eaa56b92
10 changed files with 299 additions and 87 deletions

View File

@ -59,8 +59,6 @@ func fuzzInternalObject(t *testing.T, forVersion unversioned.GroupVersion, item
}
func roundTrip(t *testing.T, codec runtime.Codec, item runtime.Object) {
//t.Logf("codec: %#v", codec)
printer := spew.ConfigState{DisableMethods: true}
name := reflect.TypeOf(item).Elem().Name()
@ -118,9 +116,6 @@ func roundTripSame(t *testing.T, group testapi.TestGroup, item runtime.Object, e
// For debugging problems
func TestSpecificKind(t *testing.T) {
// api.Scheme.Log(t)
// defer api.Scheme.Log(nil)
kind := "DaemonSet"
for i := 0; i < *fuzzIters; i++ {
doRoundTripTest(testapi.Groups["extensions"], kind, t)
@ -131,9 +126,6 @@ func TestSpecificKind(t *testing.T) {
}
func TestList(t *testing.T) {
// api.Scheme.Log(t)
// defer api.Scheme.Log(nil)
kind := "List"
item, err := api.Scheme.New(api.SchemeGroupVersion.WithKind(kind))
if err != nil {
@ -149,9 +141,6 @@ var nonInternalRoundTrippableTypes = sets.NewString("List", "ListOptions", "Expo
var nonRoundTrippableTypesByVersion = map[string][]string{}
func TestRoundTripTypes(t *testing.T) {
// api.Scheme.Log(t)
// defer api.Scheme.Log(nil)
for groupKey, group := range testapi.Groups {
for kind := range group.InternalTypes() {
t.Logf("working on %v in %v", kind, groupKey)
@ -286,6 +275,26 @@ func BenchmarkEncodeCodec(b *testing.B) {
b.StopTimer()
}
// BenchmarkEncodeCodecFromInternal measures the cost of performing a codec encode,
// including conversions.
func BenchmarkEncodeCodecFromInternal(b *testing.B) {
items := benchmarkItems()
width := len(items)
encodable := make([]api.Pod, width)
for i := range items {
if err := api.Scheme.Convert(&items[i], &encodable[i]); err != nil {
b.Fatal(err)
}
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
if _, err := runtime.Encode(testapi.Default.Codec(), &encodable[i%width]); err != nil {
b.Fatal(err)
}
}
b.StopTimer()
}
// BenchmarkEncodeJSONMarshal provides a baseline for regular JSON encode performance
func BenchmarkEncodeJSONMarshal(b *testing.B) {
items := benchmarkItems()

View File

@ -235,6 +235,16 @@ func (g TestGroup) RESTMapper() meta.RESTMapper {
return registered.RESTMapper()
}
// ExternalGroupVersions returns all external group versions allowed for the server.
func ExternalGroupVersions() []unversioned.GroupVersion {
versions := []unversioned.GroupVersion{}
for _, g := range Groups {
gv := g.GroupVersion()
versions = append(versions, *gv)
}
return versions
}
// Get codec based on runtime.Object
func GetCodecForObject(obj runtime.Object) (runtime.Codec, error) {
kind, err := api.Scheme.ObjectKind(obj)

View File

@ -40,9 +40,15 @@ const OldestVersion = "v1"
// with a set of versions to choose.
var Versions = []string{"v1"}
var Codec = versioning.NewCodecForScheme(
api.Scheme,
json.NewYAMLSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme)),
[]unversioned.GroupVersion{{Version: Version}},
[]unversioned.GroupVersion{{Version: runtime.APIVersionInternal}},
)
var Codec runtime.Codec
func init() {
yamlSerializer := json.NewYAMLSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme))
Codec = versioning.NewCodecForScheme(
api.Scheme,
yamlSerializer,
yamlSerializer,
[]unversioned.GroupVersion{{Version: Version}},
[]unversioned.GroupVersion{{Version: runtime.APIVersionInternal}},
)
}

View File

@ -17,36 +17,44 @@ limitations under the License.
package serializer
import (
"io/ioutil"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/json"
"k8s.io/kubernetes/pkg/runtime/serializer/recognizer"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
)
// serializerExtensions are for serializers that are conditionally compiled in
var serializerExtensions = []func(*runtime.Scheme) (serializerType, bool){}
type serializerType struct {
AcceptContentTypes []string
ContentType string
FileExtensions []string
Serializer runtime.Serializer
PrettySerializer runtime.Serializer
Serializer runtime.Serializer
PrettySerializer runtime.Serializer
// RawSerializer serializes an object without adding a type wrapper. Some serializers, like JSON
// automatically include identifying type information with the JSON. Others, like Protobuf, need
// a wrapper object that includes type information. This serializer should be set if the serializer
// can serialize / deserialize objects without type info. Note that this serializer will always
// be expected to pass into or a gvk to Decode, since no type information will be available on
// the object itself.
RawSerializer runtime.Serializer
// Specialize gives the type the opportunity to return a different serializer implementation if
// the content type contains alternate operations. Here it is used to implement "pretty" as an
// option to application/json, but could also be used to allow serializers to perform type
// defaulting or alter output.
Specialize func(map[string]string) (runtime.Serializer, bool)
}
// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
// and conversion wrappers to define preferred internal and external versions. In the future,
// as the internal version is used less, callers may instead use a defaulting serializer and
// only convert objects which are shared internally (Status, common API machinery).
// TODO: allow other codecs to be compiled in?
// TODO: accept a scheme interface
func NewCodecFactory(scheme *runtime.Scheme) CodecFactory {
return newCodecFactory(scheme, json.DefaultMetaFactory)
}
// newCodecFactory is a helper for testing that allows a different metafactory to be specified.
func newCodecFactory(scheme *runtime.Scheme, mf json.MetaFactory) CodecFactory {
func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []serializerType {
jsonSerializer := json.NewSerializer(mf, scheme, runtime.ObjectTyperToTyper(scheme), false)
jsonPrettySerializer := json.NewSerializer(mf, scheme, runtime.ObjectTyperToTyper(scheme), true)
yamlSerializer := json.NewYAMLSerializer(mf, scheme, runtime.ObjectTyperToTyper(scheme))
serializers := []serializerType{
{
AcceptContentTypes: []string{"application/json"},
@ -55,34 +63,21 @@ func newCodecFactory(scheme *runtime.Scheme, mf json.MetaFactory) CodecFactory {
Serializer: jsonSerializer,
PrettySerializer: jsonPrettySerializer,
},
{
AcceptContentTypes: []string{"application/yaml"},
ContentType: "application/yaml",
FileExtensions: []string{"yaml"},
Serializer: yamlSerializer,
},
}
decoders := make([]runtime.Decoder, 0, len(serializers))
accepts := []string{}
alreadyAccepted := make(map[string]struct{})
for _, d := range serializers {
decoders = append(decoders, d.Serializer)
for _, mediaType := range d.AcceptContentTypes {
if _, ok := alreadyAccepted[mediaType]; ok {
continue
}
alreadyAccepted[mediaType] = struct{}{}
accepts = append(accepts, mediaType)
yamlSerializer := json.NewYAMLSerializer(mf, scheme, runtime.ObjectTyperToTyper(scheme))
serializers = append(serializers, serializerType{
AcceptContentTypes: []string{"application/yaml"},
ContentType: "application/yaml",
FileExtensions: []string{"yaml"},
Serializer: yamlSerializer,
})
for _, fn := range serializerExtensions {
if serializer, ok := fn(scheme); ok {
serializers = append(serializers, serializer)
}
}
return CodecFactory{
scheme: scheme,
serializers: serializers,
universal: recognizer.NewDecoder(decoders...),
accepts: accepts,
legacySerializer: jsonSerializer,
}
return serializers
}
// CodecFactory provides methods for retrieving codecs and serializers for specific
@ -96,6 +91,78 @@ type CodecFactory struct {
legacySerializer runtime.Serializer
}
// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
// and conversion wrappers to define preferred internal and external versions. In the future,
// as the internal version is used less, callers may instead use a defaulting serializer and
// only convert objects which are shared internally (Status, common API machinery).
// TODO: allow other codecs to be compiled in?
// TODO: accept a scheme interface
func NewCodecFactory(scheme *runtime.Scheme) CodecFactory {
serializers := newSerializersForScheme(scheme, json.DefaultMetaFactory)
return newCodecFactory(scheme, serializers)
}
// NewStreamingCodecFactory returns serializers that support the streaming.Serializer interface.
// TODO: determine whether this returns a streaming.Serializer AND runtime.Serializer, or whether
// streaming should be added to the CodecFactory interface.
func NewStreamingCodecFactory(scheme *runtime.Scheme) CodecFactory {
return newStreamingCodecFactory(scheme, json.DefaultMetaFactory)
}
// newStreamingCodecFactory handles providing streaming codecs
func newStreamingCodecFactory(scheme *runtime.Scheme, mf json.MetaFactory) CodecFactory {
serializers := newSerializersForScheme(scheme, mf)
streamers := []serializerType{}
for i := range serializers {
if serializers[i].RawSerializer != nil {
serializers[i].Serializer = serializers[i].RawSerializer
}
if s, ok := serializers[i].Serializer.(streaming.Framer); ok {
// TODO: more elegant option?
// TODO: add tests and assertions for which serializers should
// have framers. We need to answer whether all Serializers
// are streaming serializers or not.
if s.NewFrameWriter(ioutil.Discard) == nil {
continue
}
streamers = append(streamers, serializers[i])
}
}
return newCodecFactory(scheme, streamers)
}
// newCodecFactory is a helper for testing that allows a different metafactory to be specified.
func newCodecFactory(scheme *runtime.Scheme, serializers []serializerType) CodecFactory {
decoders := make([]runtime.Decoder, 0, len(serializers))
accepts := []string{}
alreadyAccepted := make(map[string]struct{})
var legacySerializer runtime.Serializer
for _, d := range serializers {
decoders = append(decoders, d.Serializer)
for _, mediaType := range d.AcceptContentTypes {
if _, ok := alreadyAccepted[mediaType]; ok {
continue
}
alreadyAccepted[mediaType] = struct{}{}
accepts = append(accepts, mediaType)
if mediaType == "application/json" {
legacySerializer = d.Serializer
}
}
}
if legacySerializer == nil {
legacySerializer = serializers[0].Serializer
}
return CodecFactory{
scheme: scheme,
serializers: serializers,
universal: recognizer.NewDecoder(decoders...),
accepts: accepts,
legacySerializer: legacySerializer,
}
}
var _ runtime.NegotiatedSerializer = &CodecFactory{}
// SupportedMediaTypes returns the RFC2046 media types that this factory has serializers for.
@ -109,7 +176,7 @@ func (f CodecFactory) SupportedMediaTypes() []string {
// This method is deprecated - clients and servers should negotiate a serializer by mime-type and
// invoke CodecForVersions. Callers that need only to read data should use UniversalDecoder().
func (f CodecFactory) LegacyCodec(version ...unversioned.GroupVersion) runtime.Codec {
return f.CodecForVersions(runtime.NewCodec(f.legacySerializer, f.universal), version, nil)
return versioning.NewCodecForScheme(f.scheme, f.legacySerializer, f.universal, version, nil)
}
// UniversalDeserializer can convert any stored data recognized by this factory into a Go object that satisfies
@ -134,7 +201,7 @@ func (f CodecFactory) UniversalDecoder(versions ...unversioned.GroupVersion) run
// it will default to runtime.APIVersionInternal. If encode is not specified for an object's group, the object is not
// converted. If encode or decode are nil, no conversion is performed.
func (f CodecFactory) CodecForVersions(serializer runtime.Serializer, encode []unversioned.GroupVersion, decode []unversioned.GroupVersion) runtime.Codec {
return versioning.NewCodecForScheme(f.scheme, serializer, encode, decode)
return versioning.NewCodecForScheme(f.scheme, serializer, serializer, encode, decode)
}
// DecoderToVersion returns a decoder that targets the provided group version.
@ -153,6 +220,10 @@ func (f CodecFactory) SerializerForMediaType(mediaType string, options map[strin
for _, s := range f.serializers {
for _, accepted := range s.AcceptContentTypes {
if accepted == mediaType {
if s.Specialize != nil && len(options) > 0 {
serializer, ok := s.Specialize(options)
return serializer, ok
}
if v, ok := options["pretty"]; ok && v == "1" && s.PrettySerializer != nil {
return s.PrettySerializer, true
}

View File

@ -173,7 +173,7 @@ func GetTestScheme() (*runtime.Scheme, runtime.Codec) {
s.AddUnversionedTypes(externalGV, &unversioned.Status{})
cf := newCodecFactory(s, testMetaFactory{})
cf := newCodecFactory(s, newSerializersForScheme(s, testMetaFactory{}))
codec := cf.LegacyCodec(unversioned.GroupVersion{Version: "v1"})
return s, codec
}
@ -263,7 +263,7 @@ func TestVersionedEncoding(t *testing.T) {
t.Fatal(err)
}
cf := newCodecFactory(s, testMetaFactory{})
cf := newCodecFactory(s, newSerializersForScheme(s, testMetaFactory{}))
encoder, _ := cf.SerializerForFileExtension("json")
// codec that is unversioned uses the target version
@ -326,7 +326,7 @@ func TestConvertTypesWhenDefaultNamesMatch(t *testing.T) {
}
expect := &TestType1{A: "test"}
codec := newCodecFactory(s, testMetaFactory{}).LegacyCodec(unversioned.GroupVersion{Version: "v1"})
codec := newCodecFactory(s, newSerializersForScheme(s, testMetaFactory{})).LegacyCodec(unversioned.GroupVersion{Version: "v1"})
obj, err := runtime.Decode(codec, data)
if err != nil {

View File

@ -0,0 +1,106 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package streaming implements encoder and decoder for streams
// of runtime.Objects over io.Writer/Readers.
package streaming
import (
"bytes"
"io"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
)
// Framer is a factory for creating readers and writers that obey a particular framing pattern.
type Framer interface {
NewFrameReader(r io.Reader) io.Reader
NewFrameWriter(w io.Writer) io.Writer
}
// Encoder is a runtime.Encoder on a stream.
type Encoder interface {
// Encode will write the provided object to the stream or return an error. It obeys the same
// contract as runtime.Encoder.
Encode(obj runtime.Object, overrides ...unversioned.GroupVersion) error
}
// Decoder is a runtime.Decoder from a stream.
type Decoder interface {
// Decode will return io.EOF when no more objects are available.
Decode(defaults *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error)
}
// Serializer is a factory for creating encoders and decoders that work over streams.
type Serializer interface {
NewEncoder(w io.Writer) Encoder
NewDecoder(r io.Reader) Decoder
}
type decoder struct {
reader io.Reader
decoder runtime.Decoder
buf []byte
}
// NewDecoder creates a streaming decoder that reads object chunks from r and decodes them with d.
// The reader is expected to return ErrShortRead if the provided buffer is not large enough to read
// an entire object.
func NewDecoder(r io.Reader, d runtime.Decoder) Decoder {
return &decoder{
reader: r,
decoder: d,
buf: make([]byte, 1024*1024),
}
}
// Decode reads the next object from the stream and decodes it.
func (d *decoder) Decode(defaults *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
// TODO: instead of depending on a fixed sized buffer, we should handle ErrShortRead specially and
// grow the buffer capacity up to a maximum amount. Requires the framer to allow repeated reads to
// the stream until the frame is finished.
n, err := d.reader.Read(d.buf)
if err != nil {
return nil, nil, err
}
return d.decoder.Decode(d.buf[:n], defaults, into)
}
type encoder struct {
writer io.Writer
encoder runtime.Encoder
buf *bytes.Buffer
}
// NewEncoder returns a new streaming encoder.
func NewEncoder(w io.Writer, e runtime.Encoder) Encoder {
return &encoder{
writer: w,
encoder: e,
buf: &bytes.Buffer{},
}
}
// Encode writes the provided object to the nested writer.
func (e *encoder) Encode(obj runtime.Object, overrides ...unversioned.GroupVersion) error {
if err := e.encoder.EncodeToStream(obj, e.buf, overrides...); err != nil {
return err
}
_, err := e.writer.Write(e.buf.Bytes())
e.buf.Reset()
return err
}

View File

@ -64,18 +64,20 @@ func EnableCrossGroupEncoding(e runtime.Encoder, sourceGroup, destGroup string)
func NewCodecForScheme(
// TODO: I should be a scheme interface?
scheme *runtime.Scheme,
serializer runtime.Serializer,
encoder runtime.Encoder,
decoder runtime.Decoder,
encodeVersion []unversioned.GroupVersion,
decodeVersion []unversioned.GroupVersion,
) runtime.Codec {
return NewCodec(serializer, scheme, scheme, scheme, runtime.ObjectTyperToTyper(scheme), encodeVersion, decodeVersion)
return NewCodec(encoder, decoder, scheme, scheme, scheme, runtime.ObjectTyperToTyper(scheme), encodeVersion, decodeVersion)
}
// NewCodec takes objects in their internal versions and converts them to external versions before
// serializing them. It assumes the serializer provided to it only deals with external versions.
// This class is also a serializer, but is generally used with a specific version.
func NewCodec(
serializer runtime.Serializer,
encoder runtime.Encoder,
decoder runtime.Decoder,
convertor runtime.ObjectConvertor,
creater runtime.ObjectCreater,
copier runtime.ObjectCopier,
@ -84,11 +86,12 @@ func NewCodec(
decodeVersion []unversioned.GroupVersion,
) runtime.Codec {
internal := &codec{
serializer: serializer,
convertor: convertor,
creater: creater,
copier: copier,
typer: typer,
encoder: encoder,
decoder: decoder,
convertor: convertor,
creater: creater,
copier: copier,
typer: typer,
}
if encodeVersion != nil {
internal.encodeVersion = make(map[string]unversioned.GroupVersion)
@ -115,11 +118,12 @@ func NewCodec(
}
type codec struct {
serializer runtime.Serializer
convertor runtime.ObjectConvertor
creater runtime.ObjectCreater
copier runtime.ObjectCopier
typer runtime.Typer
encoder runtime.Encoder
decoder runtime.Decoder
convertor runtime.ObjectConvertor
creater runtime.ObjectCreater
copier runtime.ObjectCopier
typer runtime.Typer
encodeVersion map[string]unversioned.GroupVersion
decodeVersion map[string]unversioned.GroupVersion
@ -134,7 +138,7 @@ func (c *codec) Decode(data []byte, defaultGVK *unversioned.GroupVersionKind, in
into = versioned.Last()
}
obj, gvk, err := c.serializer.Decode(data, defaultGVK, into)
obj, gvk, err := c.decoder.Decode(data, defaultGVK, into)
if err != nil {
return nil, gvk, err
}
@ -213,7 +217,7 @@ func (c *codec) Decode(data []byte, defaultGVK *unversioned.GroupVersionKind, in
// encoding the object the first override that matches the object's group is used. Other overrides are ignored.
func (c *codec) EncodeToStream(obj runtime.Object, w io.Writer, overrides ...unversioned.GroupVersion) error {
if _, ok := obj.(*runtime.Unknown); ok {
return c.serializer.EncodeToStream(obj, w, overrides...)
return c.encoder.EncodeToStream(obj, w, overrides...)
}
gvk, isUnversioned, err := c.typer.ObjectKind(obj)
if err != nil {
@ -224,7 +228,7 @@ func (c *codec) EncodeToStream(obj runtime.Object, w io.Writer, overrides ...unv
old := obj.GetObjectKind().GroupVersionKind()
obj.GetObjectKind().SetGroupVersionKind(gvk)
defer obj.GetObjectKind().SetGroupVersionKind(old)
return c.serializer.EncodeToStream(obj, w, overrides...)
return c.encoder.EncodeToStream(obj, w, overrides...)
}
targetGV, ok := c.encodeVersion[gvk.Group]
@ -270,7 +274,7 @@ func (c *codec) EncodeToStream(obj runtime.Object, w io.Writer, overrides ...unv
obj.GetObjectKind().SetGroupVersionKind(&unversioned.GroupVersionKind{Group: targetGV.Group, Version: targetGV.Version, Kind: gvk.Kind})
}
return c.serializer.EncodeToStream(obj, w, overrides...)
return c.encoder.EncodeToStream(obj, w, overrides...)
}
// promoteOrPrependGroupVersion finds the group version in the provided group versions that has the same group as target.

View File

@ -169,7 +169,7 @@ func TestDecode(t *testing.T) {
for i, test := range testCases {
t.Logf("%d", i)
s := NewCodec(test.serializer, test.convertor, test.creater, test.copier, test.typer, test.encodes, test.decodes)
s := NewCodec(test.serializer, test.serializer, test.convertor, test.creater, test.copier, test.typer, test.encodes, test.decodes)
obj, gvk, err := s.Decode([]byte(`{}`), test.defaultGVK, test.into)
if !reflect.DeepEqual(test.expectedGVK, gvk) {

View File

@ -86,7 +86,7 @@ func New(kubeConfigFile string) (*WebhookAuthorizer, error) {
return nil, err
}
serializer := json.NewSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), false)
clientConfig.ContentConfig.Codec = versioning.NewCodecForScheme(api.Scheme, serializer, encodeVersions, decodeVersions)
clientConfig.ContentConfig.Codec = versioning.NewCodecForScheme(api.Scheme, serializer, serializer, encodeVersions, decodeVersions)
restClient, err := restclient.UnversionedRESTClientFor(clientConfig)
if err != nil {

View File

@ -39,9 +39,15 @@ var Versions = []string{"v1"}
// Codec is the default codec for serializing input that should use
// the latest supported version. It supports JSON by default.
var Codec = versioning.NewCodecForScheme(
api.Scheme,
json.NewSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), true),
[]unversioned.GroupVersion{{Version: Version}},
[]unversioned.GroupVersion{{Version: runtime.APIVersionInternal}},
)
var Codec runtime.Codec
func init() {
jsonSerializer := json.NewSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), true)
Codec = versioning.NewCodecForScheme(
api.Scheme,
jsonSerializer,
jsonSerializer,
[]unversioned.GroupVersion{{Version: Version}},
[]unversioned.GroupVersion{{Version: runtime.APIVersionInternal}},
)
}