provides EncodeWithAllocator method for the protobuf encoder

The new method allows for providing a memory allocator for efficient memory usage during object serialization.
This commit is contained in:
Lukasz Szaszkiewicz 2022-02-17 16:05:45 +01:00
parent 81cf096751
commit 32ca2b881d
2 changed files with 165 additions and 23 deletions

View File

@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/recognizer" "k8s.io/apimachinery/pkg/runtime/serializer/recognizer"
"k8s.io/apimachinery/pkg/util/framer" "k8s.io/apimachinery/pkg/util/framer"
"k8s.io/klog/v2"
) )
var ( var (
@ -86,6 +87,7 @@ type Serializer struct {
} }
var _ runtime.Serializer = &Serializer{} var _ runtime.Serializer = &Serializer{}
var _ runtime.EncoderWithAllocator = &Serializer{}
var _ recognizer.RecognizingDecoder = &Serializer{} var _ recognizer.RecognizingDecoder = &Serializer{}
const serializerIdentifier runtime.Identifier = "protobuf" const serializerIdentifier runtime.Identifier = "protobuf"
@ -161,22 +163,36 @@ func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, i
return unmarshalToObject(s.typer, s.creater, &actual, into, unk.Raw) return unmarshalToObject(s.typer, s.creater, &actual, into, unk.Raw)
} }
// Encode serializes the provided object to the given writer. // EncodeWithAllocator writes an object to the provided writer.
func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error { // In addition, it allows for providing a memory allocator for efficient memory usage during object serialization.
if co, ok := obj.(runtime.CacheableObject); ok { func (s *Serializer) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
return co.CacheEncode(s.Identifier(), s.doEncode, w) return s.encode(obj, w, memAlloc)
}
return s.doEncode(obj, w)
} }
func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error { // Encode serializes the provided object to the given writer.
func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error {
return s.encode(obj, w, &runtime.SimpleAllocator{})
}
func (s *Serializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
if co, ok := obj.(runtime.CacheableObject); ok {
return co.CacheEncode(s.Identifier(), func(obj runtime.Object, w io.Writer) error { return s.doEncode(obj, w, memAlloc) }, w)
}
return s.doEncode(obj, w, memAlloc)
}
func (s *Serializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
if memAlloc == nil {
klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator")
memAlloc = &runtime.SimpleAllocator{}
}
prefixSize := uint64(len(s.prefix)) prefixSize := uint64(len(s.prefix))
var unk runtime.Unknown var unk runtime.Unknown
switch t := obj.(type) { switch t := obj.(type) {
case *runtime.Unknown: case *runtime.Unknown:
estimatedSize := prefixSize + uint64(t.Size()) estimatedSize := prefixSize + uint64(t.Size())
data := make([]byte, estimatedSize) data := memAlloc.Allocate(estimatedSize)
i, err := t.MarshalTo(data[prefixSize:]) i, err := t.MarshalTo(data[prefixSize:])
if err != nil { if err != nil {
return err return err
@ -196,11 +212,11 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error {
switch t := obj.(type) { switch t := obj.(type) {
case bufferedMarshaller: case bufferedMarshaller:
// this path performs a single allocation during write but requires the caller to implement // this path performs a single allocation during write only when the Allocator wasn't provided
// the more efficient Size and MarshalToSizedBuffer methods // it also requires the caller to implement the more efficient Size and MarshalToSizedBuffer methods
encodedSize := uint64(t.Size()) encodedSize := uint64(t.Size())
estimatedSize := prefixSize + estimateUnknownSize(&unk, encodedSize) estimatedSize := prefixSize + estimateUnknownSize(&unk, encodedSize)
data := make([]byte, estimatedSize) data := memAlloc.Allocate(estimatedSize)
i, err := unk.NestedMarshalTo(data[prefixSize:], t, encodedSize) i, err := unk.NestedMarshalTo(data[prefixSize:], t, encodedSize)
if err != nil { if err != nil {
@ -221,7 +237,7 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error {
unk.Raw = data unk.Raw = data
estimatedSize := prefixSize + uint64(unk.Size()) estimatedSize := prefixSize + uint64(unk.Size())
data = make([]byte, estimatedSize) data = memAlloc.Allocate(estimatedSize)
i, err := unk.MarshalTo(data[prefixSize:]) i, err := unk.MarshalTo(data[prefixSize:])
if err != nil { if err != nil {
@ -395,19 +411,33 @@ func unmarshalToObject(typer runtime.ObjectTyper, creater runtime.ObjectCreater,
// Encode serializes the provided object to the given writer. Overrides is ignored. // Encode serializes the provided object to the given writer. Overrides is ignored.
func (s *RawSerializer) Encode(obj runtime.Object, w io.Writer) error { func (s *RawSerializer) Encode(obj runtime.Object, w io.Writer) error {
if co, ok := obj.(runtime.CacheableObject); ok { return s.encode(obj, w, &runtime.SimpleAllocator{})
return co.CacheEncode(s.Identifier(), s.doEncode, w)
}
return s.doEncode(obj, w)
} }
func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer) error { // EncodeWithAllocator writes an object to the provided writer.
// In addition, it allows for providing a memory allocator for efficient memory usage during object serialization.
func (s *RawSerializer) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
return s.encode(obj, w, memAlloc)
}
func (s *RawSerializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
if co, ok := obj.(runtime.CacheableObject); ok {
return co.CacheEncode(s.Identifier(), func(obj runtime.Object, w io.Writer) error { return s.doEncode(obj, w, memAlloc) }, w)
}
return s.doEncode(obj, w, memAlloc)
}
func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
if memAlloc == nil {
klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator")
memAlloc = &runtime.SimpleAllocator{}
}
switch t := obj.(type) { switch t := obj.(type) {
case bufferedReverseMarshaller: case bufferedReverseMarshaller:
// this path performs a single allocation during write but requires the caller to implement // this path performs a single allocation during write only when the Allocator wasn't provided
// the more efficient Size and MarshalToSizedBuffer methods // it also requires the caller to implement the more efficient Size and MarshalToSizedBuffer methods
encodedSize := uint64(t.Size()) encodedSize := uint64(t.Size())
data := make([]byte, encodedSize) data := memAlloc.Allocate(encodedSize)
n, err := t.MarshalToSizedBuffer(data) n, err := t.MarshalToSizedBuffer(data)
if err != nil { if err != nil {
@ -417,10 +447,10 @@ func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer) error {
return err return err
case bufferedMarshaller: case bufferedMarshaller:
// this path performs a single allocation during write but requires the caller to implement // this path performs a single allocation during write only when the Allocator wasn't provided
// the more efficient Size and MarshalTo methods // it also requires the caller to implement the more efficient Size and MarshalTo methods
encodedSize := uint64(t.Size()) encodedSize := uint64(t.Size())
data := make([]byte, encodedSize) data := memAlloc.Allocate(encodedSize)
n, err := t.MarshalTo(data) n, err := t.MarshalTo(data)
if err != nil { if err != nil {

View File

@ -17,8 +17,12 @@ limitations under the License.
package protobuf package protobuf
import ( import (
"bytes"
"reflect"
"testing" "testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
runtimetesting "k8s.io/apimachinery/pkg/runtime/testing" runtimetesting "k8s.io/apimachinery/pkg/runtime/testing"
@ -66,3 +70,111 @@ func (t *mockTyper) ObjectKinds(obj runtime.Object) ([]schema.GroupVersionKind,
func (t *mockTyper) Recognizes(_ schema.GroupVersionKind) bool { func (t *mockTyper) Recognizes(_ schema.GroupVersionKind) bool {
return false return false
} }
func TestSerializerEncodeWithAllocator(t *testing.T) {
testCases := []struct {
name string
obj runtime.Object
}{
{
name: "encode a bufferedMarshaller obj",
obj: &testapigroupv1.Carp{
TypeMeta: metav1.TypeMeta{APIVersion: "group/version", Kind: "Carp"},
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "namespace",
},
Spec: testapigroupv1.CarpSpec{
Subdomain: "carp.k8s.io",
},
},
},
{
name: "encode a runtime.Unknown obj",
obj: &runtime.Unknown{TypeMeta: runtime.TypeMeta{APIVersion: "group/version", Kind: "Unknown"}, Raw: []byte("hello world")},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
target := NewSerializer(nil, nil)
writer := &bytes.Buffer{}
if err := target.Encode(tc.obj, writer); err != nil {
t.Fatal(err)
}
writer2 := &bytes.Buffer{}
alloc := &testAllocator{}
if err := target.EncodeWithAllocator(tc.obj, writer2, alloc); err != nil {
t.Fatal(err)
}
if alloc.allocateCount != 1 {
t.Fatalf("expected the Allocate method to be called exactly 1 but it was executed: %v times ", alloc.allocateCount)
}
// to ensure compatibility of the new method with the old one, serialized data must be equal
// also we are not testing decoding since "roundtripping" is tested elsewhere for all known types
if !reflect.DeepEqual(writer.Bytes(), writer2.Bytes()) {
t.Fatal("data mismatch, data serialized with the Encode method is different than serialized with the EncodeWithAllocator method")
}
})
}
}
func TestRawSerializerEncodeWithAllocator(t *testing.T) {
testCases := []struct {
name string
obj runtime.Object
}{
{
name: "encode a bufferedReverseMarshaller obj",
obj: &testapigroupv1.Carp{
TypeMeta: metav1.TypeMeta{APIVersion: "group/version", Kind: "Carp"},
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "namespace",
},
Spec: testapigroupv1.CarpSpec{
Subdomain: "carp.k8s.io",
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
writer := &bytes.Buffer{}
target := NewRawSerializer(nil, nil)
if err := target.Encode(tc.obj, writer); err != nil {
t.Fatal(err)
}
writer2 := &bytes.Buffer{}
alloc := &testAllocator{}
if err := target.EncodeWithAllocator(tc.obj, writer2, alloc); err != nil {
t.Fatal(err)
}
if alloc.allocateCount != 1 {
t.Fatalf("expected the Allocate method to be called exactly 1 but it was executed: %v times ", alloc.allocateCount)
}
// to ensure compatibility of the new method with the old one, serialized data must be equal
// also we are not testing decoding since "roundtripping" is tested elsewhere for all known types
if !reflect.DeepEqual(writer.Bytes(), writer2.Bytes()) {
t.Fatal("data mismatch, data serialized with the Encode method is different than serialized with the EncodeWithAllocator method")
}
})
}
}
type testAllocator struct {
buf []byte
allocateCount int
}
func (ta *testAllocator) Allocate(n uint64) []byte {
ta.buf = make([]byte, n, n)
ta.allocateCount++
return ta.buf
}