From f2139b186ccaac91a28473ce872a825fbed31691 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 30 Jan 2016 17:55:08 -0500 Subject: [PATCH] Add an experimental protobuf serializer Provide a core protobuf serializer that can either write objects with an envelope (a 4 byte prefix and a runtime.Object) or raw to a byte array. --- .../go2idl/go-to-protobuf/protobuf/cmd.go | 14 +- pkg/api/serialization_proto_test.go | 48 +- pkg/runtime/protobuf/protobuf.go | 165 ------- pkg/runtime/{ => serializer}/protobuf/doc.go | 2 +- pkg/runtime/serializer/protobuf/protobuf.go | 432 ++++++++++++++++++ .../serializer/protobuf/protobuf_test.go | 187 ++++++++ pkg/runtime/serializer/protobuf_extension.go | 46 ++ pkg/runtime/types.go | 2 - pkg/runtime/types_proto.go | 62 +++ 9 files changed, 781 insertions(+), 177 deletions(-) delete mode 100644 pkg/runtime/protobuf/protobuf.go rename pkg/runtime/{ => serializer}/protobuf/doc.go (88%) create mode 100644 pkg/runtime/serializer/protobuf/protobuf.go create mode 100644 pkg/runtime/serializer/protobuf/protobuf_test.go create mode 100644 pkg/runtime/serializer/protobuf_extension.go create mode 100644 pkg/runtime/types_proto.go diff --git a/cmd/libs/go2idl/go-to-protobuf/protobuf/cmd.go b/cmd/libs/go2idl/go-to-protobuf/protobuf/cmd.go index f64b27bc67f..938224e3f89 100644 --- a/cmd/libs/go2idl/go-to-protobuf/protobuf/cmd.go +++ b/cmd/libs/go2idl/go-to-protobuf/protobuf/cmd.go @@ -59,12 +59,16 @@ func New() *Generator { Common: common, OutputBase: sourceTree, ProtoImport: []string{defaultProtoImport}, - Packages: `+k8s.io/kubernetes/pkg/util/intstr,` + - `+k8s.io/kubernetes/pkg/api/resource,` + - `+k8s.io/kubernetes/pkg/runtime,` + - `k8s.io/kubernetes/pkg/api/unversioned,` + - `k8s.io/kubernetes/pkg/api/v1,` + + Packages: strings.Join([]string{ + `+k8s.io/kubernetes/pkg/util/intstr`, + `+k8s.io/kubernetes/pkg/api/resource`, + `+k8s.io/kubernetes/pkg/runtime`, + `k8s.io/kubernetes/pkg/api/unversioned`, + `k8s.io/kubernetes/pkg/api/v1`, `k8s.io/kubernetes/pkg/apis/extensions/v1beta1`, + `k8s.io/kubernetes/pkg/apis/autoscaling/v1`, + `k8s.io/kubernetes/pkg/apis/batch/v1`, + }, ","), DropEmbeddedFields: "k8s.io/kubernetes/pkg/api/unversioned.TypeMeta", } } diff --git a/pkg/api/serialization_proto_test.go b/pkg/api/serialization_proto_test.go index 752a7160c93..ec21848dcc0 100644 --- a/pkg/api/serialization_proto_test.go +++ b/pkg/api/serialization_proto_test.go @@ -25,24 +25,27 @@ import ( "github.com/gogo/protobuf/proto" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" apitesting "k8s.io/kubernetes/pkg/api/testing" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" _ "k8s.io/kubernetes/pkg/apis/extensions" _ "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/runtime/protobuf" + "k8s.io/kubernetes/pkg/runtime/serializer/protobuf" "k8s.io/kubernetes/pkg/util/diff" ) func init() { - codecsToTest = append(codecsToTest, func(version string, item runtime.Object) (runtime.Codec, error) { - return protobuf.NewCodec(version, api.Scheme, api.Scheme, api.Scheme), nil + codecsToTest = append(codecsToTest, func(version unversioned.GroupVersion, item runtime.Object) (runtime.Codec, error) { + s := protobuf.NewSerializer(api.Scheme, runtime.ObjectTyperToTyper(api.Scheme)) + return api.Codecs.CodecForVersions(s, testapi.ExternalGroupVersions(), nil), nil }) } func TestProtobufRoundTrip(t *testing.T) { obj := &v1.Pod{} - apitesting.FuzzerFor(t, "v1", rand.NewSource(benchmarkSeed)).Fuzz(obj) + apitesting.FuzzerFor(t, v1.SchemeGroupVersion, rand.NewSource(benchmarkSeed)).Fuzz(obj) data, err := obj.Marshal() if err != nil { t.Fatal(err) @@ -57,6 +60,43 @@ func TestProtobufRoundTrip(t *testing.T) { } } +// BenchmarkEncodeCodec measures the cost of performing a codec encode, which includes +// reflection (to clear APIVersion and Kind) +func BenchmarkEncodeCodecProtobuf(b *testing.B) { + items := benchmarkItems() + width := len(items) + s := protobuf.NewSerializer(nil, nil) + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, err := runtime.Encode(s, &items[i%width]); err != nil { + b.Fatal(err) + } + } + b.StopTimer() +} + +// BenchmarkEncodeCodecFromInternalProtobuf measures the cost of performing a codec encode, +// including conversions and any type setting. This is a "full" encode. +func BenchmarkEncodeCodecFromInternalProtobuf(b *testing.B) { + items := benchmarkItems() + width := len(items) + encodable := make([]api.Pod, width) + for i := range items { + if err := api.Scheme.Convert(&items[i], &encodable[i]); err != nil { + b.Fatal(err) + } + } + s := protobuf.NewSerializer(nil, nil) + codec := api.Codecs.EncoderForVersion(s, v1.SchemeGroupVersion) + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, err := runtime.Encode(codec, &encodable[i%width]); err != nil { + b.Fatal(err) + } + } + b.StopTimer() +} + func BenchmarkEncodeProtobufGeneratedMarshal(b *testing.B) { items := benchmarkItems() width := len(items) diff --git a/pkg/runtime/protobuf/protobuf.go b/pkg/runtime/protobuf/protobuf.go deleted file mode 100644 index c555b124d84..00000000000 --- a/pkg/runtime/protobuf/protobuf.go +++ /dev/null @@ -1,165 +0,0 @@ -// +build proto - -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package protobuf - -import ( - "fmt" - "io" - "net/url" - "reflect" - - "github.com/gogo/protobuf/proto" - - "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/runtime" -) - -// NewCodec -func NewCodec(version string, creater runtime.ObjectCreater, typer runtime.ObjectTyper, convertor runtime.ObjectConvertor) runtime.Codec { - return &codec{ - version: version, - creater: creater, - typer: typer, - convertor: convertor, - } -} - -// codec decodes protobuf objects -type codec struct { - version string - outputVersion string - creater runtime.ObjectCreater - typer runtime.ObjectTyper - convertor runtime.ObjectConvertor -} - -var _ runtime.Codec = codec{} - -func (c codec) Decode(data []byte) (runtime.Object, error) { - unknown := &runtime.Unknown{} - if err := proto.Unmarshal(data, unknown); err != nil { - return nil, err - } - obj, err := c.creater.New(unknown.APIVersion, unknown.Kind) - if err != nil { - return nil, err - } - pobj, ok := obj.(proto.Message) - if !ok { - return nil, fmt.Errorf("runtime object is not a proto.Message: %v", reflect.TypeOf(obj)) - } - if unknown.ContentType != runtime.ContentTypeProtobuf { - return nil, fmt.Errorf("unmarshal non-protobuf object with protobuf decoder") - } - if err := proto.Unmarshal(unknown.Raw, pobj); err != nil { - return nil, err - } - if unknown.APIVersion != c.outputVersion { - out, err := c.convertor.ConvertToVersion(obj, c.outputVersion) - if err != nil { - return nil, err - } - obj = out - } - return obj, nil -} - -func (c codec) DecodeToVersion(data []byte, version unversioned.GroupVersion) (runtime.Object, error) { - return nil, fmt.Errorf("unimplemented") -} - -func (c codec) DecodeInto(data []byte, obj runtime.Object) error { - version, kind, err := c.typer.ObjectVersionAndKind(obj) - if err != nil { - return err - } - unknown := &runtime.Unknown{} - if err := proto.Unmarshal(data, unknown); err != nil { - return err - } - if unknown.ContentType != runtime.ContentTypeProtobuf { - return nil, fmt.Errorf("unmarshal non-protobuf object with protobuf decoder") - } - if unknown.APIVersion == version && unknown.Kind == kind { - pobj, ok := obj.(proto.Message) - if !ok { - return fmt.Errorf("runtime object is not a proto.Message: %v", reflect.TypeOf(obj)) - } - - return proto.Unmarshal(unknown.Raw, pobj) - } - - versioned, err := c.creater.New(unknown.APIVersion, unknown.Kind) - if err != nil { - return err - } - - pobj, ok := versioned.(proto.Message) - if !ok { - return fmt.Errorf("runtime object is not a proto.Message: %v", reflect.TypeOf(obj)) - } - - if err := proto.Unmarshal(unknown.Raw, pobj); err != nil { - return err - } - return c.convertor.Convert(versioned, obj) -} - -func (c codec) DecodeIntoWithSpecifiedVersionKind(data []byte, obj runtime.Object, kind unversioned.GroupVersionKind) error { - return fmt.Errorf("unimplemented") -} - -func (c codec) DecodeParametersInto(parameters url.Values, obj runtime.Object) error { - return fmt.Errorf("unimplemented") -} - -func (c codec) Encode(obj runtime.Object) (data []byte, err error) { - version, kind, err := c.typer.ObjectVersionAndKind(obj) - if err != nil { - return nil, err - } - if len(version) == 0 { - version = c.version - converted, err := c.convertor.ConvertToVersion(obj, version) - if err != nil { - return nil, err - } - obj = converted - } - m, ok := obj.(proto.Marshaler) - if !ok { - return nil, fmt.Errorf("object %v (kind: %s in version: %s) does not implement ProtoBuf marshalling", reflect.TypeOf(obj), kind, c.version) - } - b, err := m.Marshal() - if err != nil { - return nil, err - } - return (&runtime.Unknown{ - TypeMeta: runtime.TypeMeta{ - Kind: kind, - APIVersion: version, - }, - Raw: b, - ContentType: runtime.ContentTypeProtobuf, - }).Marshal() -} - -func (c codec) EncodeToStream(obj runtime.Object, stream io.Writer) error { - return fmt.Errorf("unimplemented") -} diff --git a/pkg/runtime/protobuf/doc.go b/pkg/runtime/serializer/protobuf/doc.go similarity index 88% rename from pkg/runtime/protobuf/doc.go rename to pkg/runtime/serializer/protobuf/doc.go index 33316d0c4d8..91b86af6cdb 100644 --- a/pkg/runtime/protobuf/doc.go +++ b/pkg/runtime/serializer/protobuf/doc.go @@ -14,5 +14,5 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package protobuf implements ProtoBuf serialization and deserialization. +// Package protobuf provides a Kubernetes serializer for the protobuf format. package protobuf diff --git a/pkg/runtime/serializer/protobuf/protobuf.go b/pkg/runtime/serializer/protobuf/protobuf.go new file mode 100644 index 00000000000..a00c1389c4f --- /dev/null +++ b/pkg/runtime/serializer/protobuf/protobuf.go @@ -0,0 +1,432 @@ +// +build proto + +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protobuf + +import ( + "bytes" + "fmt" + "io" + "reflect" + "sync" + + "github.com/gogo/protobuf/proto" + + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/runtime" +) + +var ( + // protoEncodingPrefix serves as a magic number for an encoded protobuf message on this serializer. All + // proto messages serialized by this schema will be preceeded by the bytes 0x6b 0x38 0x73, with the fourth + // byte being reserved for the encoding style. The only encoding style defined is 0x00, which means that + // the rest of the byte stream is a message of type k8s.io.kubernetes.pkg.runtime.Unknown (proto2). + // + // See k8s.io/kubernetes/pkg/runtime/generated.proto for details of the runtime.Unknown message. + // + // This encoding scheme is experimental, and is subject to change at any time. + protoEncodingPrefix = []byte{0x6b, 0x38, 0x73, 0x00} + + bufferSize = uint64(16384) + availableBuffers = sync.Pool{New: func() interface{} { + return make([]byte, bufferSize) + }} +) + +type errNotMarshalable struct { + t reflect.Type +} + +func (e errNotMarshalable) Error() string { + return fmt.Sprintf("object %v does not implement the protobuf marshalling interface and cannot be encoded to a protobuf message", e.t) +} + +func IsNotMarshalable(err error) bool { + _, ok := err.(errNotMarshalable) + return err != nil && ok +} + +// NewSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If a typer +// is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written +// as-is (any type info passed with the object will be used). +// +// This encoding scheme is experimental, and is subject to change at any time. +func NewSerializer(creater runtime.ObjectCreater, typer runtime.Typer, defaultContentType string) *Serializer { + return &Serializer{ + prefix: protoEncodingPrefix, + creater: creater, + typer: typer, + contentType: defaultContentType, + } +} + +type Serializer struct { + prefix []byte + creater runtime.ObjectCreater + typer runtime.Typer +} + +var _ runtime.Serializer = &Serializer{} + +// Decode attempts to convert the provided data into a protobuf message, extract the stored schema kind, apply the provided default +// gvk, and then load that data into an object matching the desired schema kind or the provided into. If into is *runtime.Unknown, +// the raw data will be extracted and no decoding will be performed. If into is not registered with the typer, then the object will +// be straight decoded using normal protobuf unmarshalling (the MarshalTo interface). If into is provided and the original data is +// not fully qualified with kind/version/group, the type of the into will be used to alter the returned gvk. On success or most +// errors, the method will return the calculated schema kind. +func (s *Serializer) Decode(originalData []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) { + if versioned, ok := into.(*runtime.VersionedObjects); ok { + into = versioned.Last() + obj, actual, err := s.Decode(originalData, gvk, into) + if err != nil { + return nil, actual, err + } + // the last item in versioned becomes into, so if versioned was not originally empty we reset the object + // array so the first position is the decoded object and the second position is the outermost object. + // if there were no objects in the versioned list passed to us, only add ourselves. + if into != nil && into != obj { + versioned.Objects = []runtime.Object{obj, into} + } else { + versioned.Objects = []runtime.Object{obj} + } + return versioned, actual, err + } + + prefixLen := len(s.prefix) + switch { + case len(originalData) == 0: + // TODO: treat like decoding {} from JSON with defaulting + return nil, nil, fmt.Errorf("empty data") + case len(originalData) < prefixLen || !bytes.Equal(s.prefix, originalData[:prefixLen]): + return nil, nil, fmt.Errorf("provided data does not appear to be a protobuf message, expected prefix %v", s.prefix) + case len(originalData) == prefixLen: + // TODO: treat like decoding {} from JSON with defaulting + return nil, nil, fmt.Errorf("empty body") + } + + data := originalData[prefixLen:] + unk := runtime.Unknown{} + if err := unk.Unmarshal(data); err != nil { + return nil, nil, err + } + + actual := unk.GroupVersionKind() + copyKindDefaults(actual, gvk) + + if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil { + *intoUnknown = unk + if len(intoUnknown.ContentType) == 0 { + intoUnknown.ContentType = s.contentType + } + return intoUnknown, actual, nil + } + + if into != nil { + typed, _, err := s.typer.ObjectKind(into) + switch { + case runtime.IsNotRegisteredError(err): + pb, ok := into.(proto.Message) + if !ok { + return nil, actual, errNotMarshalable{reflect.TypeOf(into)} + } + if err := proto.Unmarshal(unk.Raw, pb); err != nil { + return nil, actual, err + } + return into, actual, nil + case err != nil: + return nil, actual, err + default: + copyKindDefaults(actual, typed) + // if the result of defaulting did not set a version or group, ensure that at least group is set + // (copyKindDefaults will not assign Group if version is already set). This guarantees that the group + // of into is set if there is no better information from the caller or object. + if len(actual.Version) == 0 && len(actual.Group) == 0 { + actual.Group = typed.Group + } + } + } + + if len(actual.Kind) == 0 { + return nil, actual, runtime.NewMissingKindErr(fmt.Sprintf("%#v", unk.TypeMeta)) + } + if len(actual.Version) == 0 { + return nil, actual, runtime.NewMissingVersionErr(fmt.Sprintf("%#v", unk.TypeMeta)) + } + + return unmarshalToObject(s.typer, s.creater, actual, into) +} + +// EncodeToStream serializes the provided object to the given writer. Overrides is ignored. +func (s *Serializer) EncodeToStream(obj runtime.Object, w io.Writer, overrides ...unversioned.GroupVersion) error { + var unk runtime.Unknown + if kind := obj.GetObjectKind().GroupVersionKind(); kind != nil { + unk = runtime.Unknown{ + TypeMeta: runtime.TypeMeta{ + Kind: kind.Kind, + APIVersion: kind.GroupVersion().String(), + }, + } + } + + prefixSize := uint64(len(s.prefix)) + + switch t := obj.(type) { + case bufferedMarshaller: + // this path performs a single allocation during write but requires the caller to implement + // the more efficient Size and MarshalTo methods + encodedSize := uint64(t.Size()) + estimatedSize := prefixSize + estimateUnknownSize(&unk, encodedSize) + data := make([]byte, estimatedSize) + + i, err := unk.NestedMarshalTo(data[prefixSize:], t, encodedSize) + if err != nil { + return err + } + + copy(data, s.prefix) + + _, err = w.Write(data[:prefixSize+uint64(i)]) + return err + + case proto.Marshaler: + // this path performs extra allocations + data, err := t.Marshal() + if err != nil { + return err + } + unk.Raw = data + + estimatedSize := prefixSize + uint64(unk.Size()) + data = make([]byte, estimatedSize) + + i, err := unk.MarshalTo(data[prefixSize:]) + if err != nil { + return err + } + + copy(data, s.prefix) + + _, err = w.Write(data[:prefixSize+uint64(i)]) + return err + + default: + // TODO: marshal with a different content type and serializer (JSON for third party objects) + return errNotMarshalable{reflect.TypeOf(obj)} + } +} + +// RecognizesData implements the RecognizingDecoder interface. +func (s *Serializer) RecognizesData(peek io.Reader) (bool, error) { + prefix := make([]byte, 4) + n, err := peek.Read(prefix) + if err != nil { + if err == io.EOF { + return false, nil + } + return false, err + } + if n != 4 { + return false, nil + } + return bytes.Equal(s.prefix, prefix), nil +} + +// copyKindDefaults defaults dst to the value in src if dst does not have a value set. +func copyKindDefaults(dst, src *unversioned.GroupVersionKind) { + if src == nil { + return + } + // apply kind and version defaulting from provided default + if len(dst.Kind) == 0 { + dst.Kind = src.Kind + } + if len(dst.Version) == 0 && len(src.Version) > 0 { + dst.Group = src.Group + dst.Version = src.Version + } +} + +// bufferedMarshaller describes a more efficient marshalling interface that can avoid allocating multiple +// byte buffers by pre-calculating the size of the final buffer needed. +type bufferedMarshaller interface { + proto.Sizer + runtime.ProtobufMarshaller +} + +// estimateUnknownSize returns the expected bytes consumed by a given runtime.Unknown +// object with a nil RawJSON struct and the expected size of the provided buffer. The +// returned size will not be correct if RawJSOn is set on unk. +func estimateUnknownSize(unk *runtime.Unknown, byteSize uint64) uint64 { + size := uint64(unk.Size()) + // protobuf uses 1 byte for the tag, a varint for the length of the array (at most 8 bytes - uint64 - here), + // and the size of the array. + size += 1 + 8 + byteSize + return size +} + +// NewRawSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If typer +// is not nil, the object has the group, version, and kind fields set. This serializer does not provide type information for the +// encoded object, and thus is not self describing (callers must know what type is being described in order to decode). +// +// This encoding scheme is experimental, and is subject to change at any time. +func NewRawSerializer(creater runtime.ObjectCreater, typer runtime.Typer, defaultContentType string) *RawSerializer { + return &RawSerializer{ + creater: creater, + typer: typer, + contentType: defaultContentType, + } +} + +// RawSerializer encodes and decodes objects without adding a runtime.Unknown wrapper (objects are encoded without identifying +// type). +type RawSerializer struct { + creater runtime.ObjectCreater + typer runtime.Typer + contentType string +} + +var _ runtime.Serializer = &RawSerializer{} + +// Decode attempts to convert the provided data into a protobuf message, extract the stored schema kind, apply the provided default +// gvk, and then load that data into an object matching the desired schema kind or the provided into. If into is *runtime.Unknown, +// the raw data will be extracted and no decoding will be performed. If into is not registered with the typer, then the object will +// be straight decoded using normal protobuf unmarshalling (the MarshalTo interface). If into is provided and the original data is +// not fully qualified with kind/version/group, the type of the into will be used to alter the returned gvk. On success or most +// errors, the method will return the calculated schema kind. +func (s *RawSerializer) Decode(originalData []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) { + if into == nil { + return nil, nil, fmt.Errorf("this serializer requires an object to decode into: %#v", s) + } + + if versioned, ok := into.(*runtime.VersionedObjects); ok { + into = versioned.Last() + obj, actual, err := s.Decode(originalData, gvk, into) + if err != nil { + return nil, actual, err + } + if into != nil && into != obj { + versioned.Objects = []runtime.Object{obj, into} + } else { + versioned.Objects = []runtime.Object{obj} + } + return versioned, actual, err + } + + if len(originalData) == 0 { + // TODO: treat like decoding {} from JSON with defaulting + return nil, nil, fmt.Errorf("empty data") + } + data := originalData + + actual := &unversioned.GroupVersionKind{} + copyKindDefaults(actual, gvk) + + if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil { + intoUnknown.Raw = data + intoUnknown.ContentEncoding = "" + intoUnknown.ContentType = s.contentType + intoUnknown.SetGroupVersionKind(actual) + return intoUnknown, actual, nil + } + + typed, _, err := s.typer.ObjectKind(into) + switch { + case runtime.IsNotRegisteredError(err): + pb, ok := into.(proto.Message) + if !ok { + return nil, actual, errNotMarshalable{reflect.TypeOf(into)} + } + if err := proto.Unmarshal(data, pb); err != nil { + return nil, actual, err + } + return into, actual, nil + case err != nil: + return nil, actual, err + default: + copyKindDefaults(actual, typed) + // if the result of defaulting did not set a version or group, ensure that at least group is set + // (copyKindDefaults will not assign Group if version is already set). This guarantees that the group + // of into is set if there is no better information from the caller or object. + if len(actual.Version) == 0 && len(actual.Group) == 0 { + actual.Group = typed.Group + } + } + + if len(actual.Kind) == 0 { + return nil, actual, runtime.NewMissingKindErr("") + } + if len(actual.Version) == 0 { + return nil, actual, runtime.NewMissingVersionErr("") + } + + return unmarshalToObject(s.typer, s.creater, actual, into) +} + +// unmarshalToObject is the common code between decode in the raw and normal serializer. +func unmarshalToObject(typer runtime.Typer, creater runtime.ObjectCreater, actual *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) { + // use the target if necessary + obj, err := runtime.UseOrCreateObject(s.typer, s.creater, *actual, into) + if err != nil { + return nil, actual, err + } + + pb, ok := obj.(proto.Message) + if !ok { + return nil, actual, errNotMarshalable{reflect.TypeOf(obj)} + } + if err := proto.Unmarshal(data, pb); err != nil { + return nil, actual, err + } + return obj, actual, nil +} + +// EncodeToStream serializes the provided object to the given writer. Overrides is ignored. +func (s *RawSerializer) EncodeToStream(obj runtime.Object, w io.Writer, overrides ...unversioned.GroupVersion) error { + switch t := obj.(type) { + case bufferedMarshaller: + // this path performs a single allocation during write but requires the caller to implement + // the more efficient Size and MarshalTo methods + encodedSize := uint64(t.Size()) + data := make([]byte, encodedSize) + + n, err := t.MarshalTo(data) + if err != nil { + return err + } + _, err = w.Write(data[:n]) + return err + + case proto.Marshaler: + // this path performs extra allocations + data, err := t.Marshal() + if err != nil { + return err + } + _, err = w.Write(data) + return err + + default: + return errNotMarshalable{reflect.TypeOf(obj)} + } +} + +// RecognizesData implements the RecognizingDecoder interface - objects encoded with this serializer +// have no innate identifying information and so cannot be recognized. +func (s *RawSerializer) RecognizesData(peek io.Reader) (bool, error) { + return false, nil +} diff --git a/pkg/runtime/serializer/protobuf/protobuf_test.go b/pkg/runtime/serializer/protobuf/protobuf_test.go new file mode 100644 index 00000000000..023fd6eb86c --- /dev/null +++ b/pkg/runtime/serializer/protobuf/protobuf_test.go @@ -0,0 +1,187 @@ +// +build proto + +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protobuf_test + +import ( + "bytes" + "encoding/hex" + "fmt" + "testing" + + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/protobuf" +) + +type testObject struct { + gvk *unversioned.GroupVersionKind +} + +func (d *testObject) GetObjectKind() unversioned.ObjectKind { return d } +func (d *testObject) SetGroupVersionKind(gvk *unversioned.GroupVersionKind) { d.gvk = gvk } +func (d *testObject) GroupVersionKind() *unversioned.GroupVersionKind { return d.gvk } + +type testMarshalable struct { + testObject + data []byte + err error +} + +func (d *testMarshalable) Marshal() ([]byte, error) { + return d.data, d.err +} + +type testBufferedMarshalable struct { + testObject + data []byte + err error +} + +func (d *testBufferedMarshalable) Marshal() ([]byte, error) { + return nil, fmt.Errorf("not invokable") +} + +func (d *testBufferedMarshalable) MarshalTo(data []byte) (int, error) { + copy(data, d.data) + return len(d.data), d.err +} + +func (d *testBufferedMarshalable) Size() int { + return len(d.data) +} + +func TestRecognize(t *testing.T) { + s := protobuf.NewSerializer(nil, nil, "application/protobuf") + ignores := [][]byte{ + nil, + {}, + []byte("k8s"), + {0x6b, 0x38, 0x73, 0x01}, + } + for i, data := range ignores { + if ok, err := s.RecognizesData(bytes.NewBuffer(data)); err != nil || ok { + t.Errorf("%d: should not recognize data: %v", i, err) + } + } + recognizes := [][]byte{ + {0x6b, 0x38, 0x73, 0x00}, + {0x6b, 0x38, 0x73, 0x00, 0x01}, + } + for i, data := range recognizes { + if ok, err := s.RecognizesData(bytes.NewBuffer(data)); err != nil || !ok { + t.Errorf("%d: should recognize data: %v", i, err) + } + } +} + +func TestEncode(t *testing.T) { + obj1 := &testMarshalable{testObject: testObject{}, data: []byte{}} + wire1 := []byte{ + 0x6b, 0x38, 0x73, 0x00, // prefix + 0x0a, 0x04, + 0x0a, 0x00, // apiversion + 0x12, 0x00, // kind + 0x12, 0x00, // data + 0x1a, 0x00, // content-type + 0x22, 0x00, // content-encoding + } + obj2 := &testMarshalable{ + testObject: testObject{gvk: &unversioned.GroupVersionKind{Kind: "test", Group: "other", Version: "version"}}, + data: []byte{0x01, 0x02, 0x03}, + } + wire2 := []byte{ + 0x6b, 0x38, 0x73, 0x00, // prefix + 0x0a, 0x15, + 0x0a, 0x0d, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x2f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, // apiversion + 0x12, 0x04, 0x74, 0x65, 0x73, 0x74, // kind + 0x12, 0x03, 0x01, 0x02, 0x03, // data + 0x1a, 0x00, // content-type + 0x22, 0x00, // content-encoding + } + + err1 := fmt.Errorf("a test error") + + testCases := []struct { + obj runtime.Object + data []byte + errFn func(error) bool + }{ + { + obj: &testObject{}, + errFn: protobuf.IsNotMarshalable, + }, + { + obj: obj1, + data: wire1, + }, + { + obj: &testMarshalable{testObject: obj1.testObject, err: err1}, + errFn: func(err error) bool { return err == err1 }, + }, + { + // if this test fails, writing the "fast path" marshal is not the same as the "slow path" + obj: &testBufferedMarshalable{testObject: obj1.testObject, data: obj1.data}, + data: wire1, + }, + { + obj: obj2, + data: wire2, + }, + { + // if this test fails, writing the "fast path" marshal is not the same as the "slow path" + obj: &testBufferedMarshalable{testObject: obj2.testObject, data: obj2.data}, + data: wire2, + }, + { + obj: &testBufferedMarshalable{testObject: obj1.testObject, err: err1}, + errFn: func(err error) bool { return err == err1 }, + }, + } + + for i, test := range testCases { + s := protobuf.NewSerializer(nil, nil, "application/protobuf") + data, err := runtime.Encode(s, test.obj) + + switch { + case err == nil && test.errFn != nil: + t.Errorf("%d: failed: %v", i, err) + continue + case err != nil && test.errFn == nil: + t.Errorf("%d: failed: %v", i, err) + continue + case err != nil: + if !test.errFn(err) { + t.Errorf("%d: failed: %v", i, err) + } + if data != nil { + t.Errorf("%d: should not have returned nil data", i) + } + continue + } + + if test.data != nil && !bytes.Equal(test.data, data) { + t.Errorf("%d: unexpected data:\n%s", i, hex.Dump(data)) + continue + } + + if ok, err := s.RecognizesData(bytes.NewBuffer(data)); !ok || err != nil { + t.Errorf("%d: did not recognize data generated by call: %v", i, err) + } + } +} diff --git a/pkg/runtime/serializer/protobuf_extension.go b/pkg/runtime/serializer/protobuf_extension.go new file mode 100644 index 00000000000..1f55df27eda --- /dev/null +++ b/pkg/runtime/serializer/protobuf_extension.go @@ -0,0 +1,46 @@ +// +build proto + +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package serializer + +import ( + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/protobuf" +) + +// contentTypeProtobuf is the protobuf type exposed for Kubernetes. It is private to prevent others from +// depending on it unintentionally. +// TODO: potentially move to pkg/api (since it's part of the Kube public API) and pass it in to the +// CodecFactory on initialization. +const contentTypeProtobuf = "application/vnd.kubernetes.protobuf" + +func protobufSerializer(scheme *runtime.Scheme) (serializerType, bool) { + serializer := protobuf.NewSerializer(scheme, runtime.ObjectTyperToTyper(scheme), contentTypeProtobuf) + raw := protobuf.NewRawSerializer(scheme, runtime.ObjectTyperToTyper(scheme), contentTypeProtobuf) + return serializerType{ + AcceptContentTypes: []string{contentTypeProtobuf}, + ContentType: contentTypeProtobuf, + FileExtensions: []string{"pb"}, + Serializer: serializer, + RawSerializer: raw, + }, true +} + +func init() { + serializerExtensions = append(serializerExtensions, protobufSerializer) +} diff --git a/pkg/runtime/types.go b/pkg/runtime/types.go index 5a3f3fdb251..9b1f8301dcc 100644 --- a/pkg/runtime/types.go +++ b/pkg/runtime/types.go @@ -38,8 +38,6 @@ type TypeMeta struct { const ( ContentTypeJSON string = "application/json" - // TODO: Fix the value. - ContentTypeProtobuf string = "application/protobuf" ) // RawExtension is used to hold extensions in external versions. diff --git a/pkg/runtime/types_proto.go b/pkg/runtime/types_proto.go new file mode 100644 index 00000000000..dd9a288c42d --- /dev/null +++ b/pkg/runtime/types_proto.go @@ -0,0 +1,62 @@ +// +build proto + +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +type ProtobufMarshaller interface { + MarshalTo(data []byte) (int, error) +} + +// NestedMarshalTo allows a caller to avoid extra allocations during serialization of an Unknown +// that will contain an object that implements ProtobufMarshaller. +func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64) (int, error) { + var i int + _ = i + var l int + _ = l + data[i] = 0xa + i++ + i = encodeVarintGenerated(data, i, uint64(m.TypeMeta.Size())) + n1, err := m.TypeMeta.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n1 + + if b != nil { + data[i] = 0x12 + i++ + i = encodeVarintGenerated(data, i, size) + n2, err := b.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n2 + } + + data[i] = 0x1a + i++ + i = encodeVarintGenerated(data, i, uint64(len(m.ContentEncoding))) + i += copy(data[i:], m.ContentEncoding) + + data[i] = 0x22 + i++ + i = encodeVarintGenerated(data, i, uint64(len(m.ContentType))) + i += copy(data[i:], m.ContentType) + return i, nil +}