diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/allocator.go b/staging/src/k8s.io/apimachinery/pkg/runtime/allocator.go new file mode 100644 index 00000000000..0d00d8c3a3b --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/allocator.go @@ -0,0 +1,74 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "sync" +) + +// AllocatorPool simply stores Allocator objects to avoid additional memory allocations +// by caching created but unused items for later reuse, relieving pressure on the garbage collector. +// +// Usage: +// memoryAllocator := runtime.AllocatorPool.Get().(*runtime.Allocator) +// defer runtime.AllocatorPool.Put(memoryAllocator) +// +// A note for future: +// consider introducing multiple pools for storing buffers of different sizes +// perhaps this could allow us to be more efficient. +var AllocatorPool = sync.Pool{ + New: func() interface{} { + return &Allocator{} + }, +} + +// Allocator knows how to allocate memory +// It exists to make the cost of object serialization cheaper. +// In some cases, it allows for allocating memory only once and then reusing it. +// This approach puts less load on GC and leads to less fragmented memory in general. +type Allocator struct { + buf []byte +} + +var _ MemoryAllocator = &Allocator{} + +// Allocate reserves memory for n bytes only if the underlying array doesn't have enough capacity +// otherwise it returns previously allocated block of memory. +// +// Note that the returned array is not zeroed, it is the caller's +// responsibility to clean the memory if needed. +func (a *Allocator) Allocate(n uint64) []byte { + if uint64(cap(a.buf)) >= n { + a.buf = a.buf[:n] + return a.buf + } + // grow the buffer + size := uint64(2*cap(a.buf)) + n + a.buf = make([]byte, size, size) + a.buf = a.buf[:n] + return a.buf +} + +// SimpleAllocator a wrapper around make([]byte) +// conforms to the MemoryAllocator interface +type SimpleAllocator struct{} + +var _ MemoryAllocator = &SimpleAllocator{} + +func (sa *SimpleAllocator) Allocate(n uint64) []byte { + return make([]byte, n, n) +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/allocator_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/allocator_test.go new file mode 100644 index 00000000000..067a5dda5e4 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/allocator_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "math/rand" + "testing" +) + +func TestAllocatorRandomInputs(t *testing.T) { + maxBytes := 5 * 1000000 // 5 MB + iterations := rand.Intn(10000) + 10 + target := &Allocator{} + + for i := 0; i < iterations; i++ { + bytesToAllocate := rand.Intn(maxBytes) + buff := target.Allocate(uint64(bytesToAllocate)) + if cap(buff) < bytesToAllocate { + t.Fatalf("expected the buffer to allocate: %v bytes whereas it allocated: %v bytes", bytesToAllocate, cap(buff)) + } + if len(buff) != bytesToAllocate { + t.Fatalf("unexpected length of the buffer, expected: %v, got: %v", bytesToAllocate, len(buff)) + } + } +} + +func TestAllocatorNeverShrinks(t *testing.T) { + target := &Allocator{} + initialSize := 1000000 // 1MB + initialBuff := target.Allocate(uint64(initialSize)) + if cap(initialBuff) < initialSize { + t.Fatalf("unexpected size of the buffer, expected at least 1MB, got: %v", cap(initialBuff)) + } + + for i := initialSize; i > 0; i = i / 10 { + newBuff := target.Allocate(uint64(i)) + if cap(newBuff) < initialSize { + t.Fatalf("allocator is now allowed to shrink memory") + } + if len(newBuff) != i { + t.Fatalf("unexpected length of the buffer, expected: %v, got: %v", i, len(newBuff)) + } + } +} + +func TestAllocatorZero(t *testing.T) { + target := &Allocator{} + initialSize := 1000000 // 1MB + buff := target.Allocate(uint64(initialSize)) + if cap(buff) < initialSize { + t.Fatalf("unexpected size of the buffer, expected at least 1MB, got: %v", cap(buff)) + } + if len(buff) != initialSize { + t.Fatalf("unexpected length of the buffer, expected: %v, got: %v", initialSize, len(buff)) + } + + buff = target.Allocate(0) + if cap(buff) < initialSize { + t.Fatalf("unexpected size of the buffer, expected at least 1MB, got: %v", cap(buff)) + } + if len(buff) != 0 { + t.Fatalf("unexpected length of the buffer, expected: 0, got: %v", len(buff)) + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go b/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go index bd4f22b1bc8..710a977952f 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go @@ -69,6 +69,24 @@ type Encoder interface { Identifier() Identifier } +// MemoryAllocator is responsible for allocating memory. +// By encapsulating memory allocation into its own interface, we can reuse the memory +// across many operations in places we know it can significantly improve the performance. +type MemoryAllocator interface { + // Allocate reserves memory for n bytes. + // Note that implementations of this method are not required to zero the returned array. + // It is the caller's responsibility to clean the memory if needed. + Allocate(n uint64) []byte +} + +// EncoderWithAllocator serializes objects in a way that allows callers to manage any additional memory allocations. +type EncoderWithAllocator interface { + Encoder + // EncodeWithAllocator writes an object to a stream as Encode does. + // In addition, it allows for providing a memory allocator for efficient memory usage during object serialization + EncodeWithAllocator(obj Object, w io.Writer, memAlloc MemoryAllocator) error +} + // Decoder attempts to load an object from data. type Decoder interface { // Decode attempts to deserialize the provided data using either the innate typing of the scheme or the diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/encoder_with_allocator_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/encoder_with_allocator_test.go new file mode 100644 index 00000000000..73406d0731b --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/encoder_with_allocator_test.go @@ -0,0 +1,139 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package serializer + +import ( + "crypto/rand" + "io/ioutil" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer/protobuf" +) + +func BenchmarkProtobufEncoder(b *testing.B) { + benchmarkEncodeFor(b, protobuf.NewSerializer(nil, nil)) +} + +func BenchmarkProtobufEncodeWithAllocator(b *testing.B) { + benchmarkEncodeWithAllocatorFor(b, protobuf.NewSerializer(nil, nil)) +} + +func BenchmarkRawProtobufEncoder(b *testing.B) { + benchmarkEncodeFor(b, protobuf.NewRawSerializer(nil, nil)) +} + +func BenchmarkRawProtobufEncodeWithAllocator(b *testing.B) { + benchmarkEncodeWithAllocatorFor(b, protobuf.NewRawSerializer(nil, nil)) +} + +func benchmarkEncodeFor(b *testing.B, target runtime.Encoder) { + for _, tc := range benchTestCases() { + b.Run(tc.name, func(b *testing.B) { + b.ReportAllocs() + for n := 0; n < b.N; n++ { + err := target.Encode(tc.obj, ioutil.Discard) + if err != nil { + b.Fatal(err) + } + } + }) + } +} + +func benchmarkEncodeWithAllocatorFor(b *testing.B, target runtime.EncoderWithAllocator) { + for _, tc := range benchTestCases() { + b.Run(tc.name, func(b *testing.B) { + b.ReportAllocs() + allocator := &runtime.Allocator{} + for n := 0; n < b.N; n++ { + err := target.EncodeWithAllocator(tc.obj, ioutil.Discard, allocator) + if err != nil { + b.Fatal(err) + } + } + }) + } +} + +type benchTestCase struct { + name string + obj runtime.Object +} + +func benchTestCases() []benchTestCase { + return []benchTestCase{ + { + name: "an obj with 1kB payload", + obj: func() runtime.Object { + carpPayload := make([]byte, 1000) // 1 kB + if _, err := rand.Read(carpPayload); err != nil { + panic(err) + } + return carpWithPayload(carpPayload) + }(), + }, + { + name: "an obj with 10kB payload", + obj: func() runtime.Object { + carpPayload := make([]byte, 10000) // 10 kB + if _, err := rand.Read(carpPayload); err != nil { + panic(err) + } + return carpWithPayload(carpPayload) + }(), + }, + { + name: "an obj with 100kB payload", + obj: func() runtime.Object { + carpPayload := make([]byte, 100000) // 100 kB + if _, err := rand.Read(carpPayload); err != nil { + panic(err) + } + return carpWithPayload(carpPayload) + }(), + }, + { + name: "an obj with 1MB payload", + obj: func() runtime.Object { + carpPayload := make([]byte, 1000000) // 1 MB + if _, err := rand.Read(carpPayload); err != nil { + panic(err) + } + return carpWithPayload(carpPayload) + }(), + }, + } +} + +func carpWithPayload(carpPayload []byte) *testapigroupv1.Carp { + gvk := &schema.GroupVersionKind{Group: "group", Version: "version", Kind: "Carp"} + return &testapigroupv1.Carp{ + TypeMeta: metav1.TypeMeta{APIVersion: gvk.GroupVersion().String(), Kind: gvk.Kind}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + Spec: testapigroupv1.CarpSpec{ + Subdomain: "carp.k8s.io", + NodeSelector: map[string]string{"payload": string(carpPayload)}, + }, + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go index 8358d77c39e..c63e6dc63f6 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer/recognizer" "k8s.io/apimachinery/pkg/util/framer" + "k8s.io/klog/v2" ) var ( @@ -86,6 +87,7 @@ type Serializer struct { } var _ runtime.Serializer = &Serializer{} +var _ runtime.EncoderWithAllocator = &Serializer{} var _ recognizer.RecognizingDecoder = &Serializer{} const serializerIdentifier runtime.Identifier = "protobuf" @@ -161,22 +163,36 @@ func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, i return unmarshalToObject(s.typer, s.creater, &actual, into, unk.Raw) } -// Encode serializes the provided object to the given writer. -func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error { - if co, ok := obj.(runtime.CacheableObject); ok { - return co.CacheEncode(s.Identifier(), s.doEncode, w) - } - return s.doEncode(obj, w) +// EncodeWithAllocator writes an object to the provided writer. +// In addition, it allows for providing a memory allocator for efficient memory usage during object serialization. +func (s *Serializer) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + return s.encode(obj, w, memAlloc) } -func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error { +// Encode serializes the provided object to the given writer. +func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error { + return s.encode(obj, w, &runtime.SimpleAllocator{}) +} + +func (s *Serializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + if co, ok := obj.(runtime.CacheableObject); ok { + return co.CacheEncode(s.Identifier(), func(obj runtime.Object, w io.Writer) error { return s.doEncode(obj, w, memAlloc) }, w) + } + return s.doEncode(obj, w, memAlloc) +} + +func (s *Serializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + if memAlloc == nil { + klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator") + memAlloc = &runtime.SimpleAllocator{} + } prefixSize := uint64(len(s.prefix)) var unk runtime.Unknown switch t := obj.(type) { case *runtime.Unknown: estimatedSize := prefixSize + uint64(t.Size()) - data := make([]byte, estimatedSize) + data := memAlloc.Allocate(estimatedSize) i, err := t.MarshalTo(data[prefixSize:]) if err != nil { return err @@ -196,11 +212,11 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error { switch t := obj.(type) { case bufferedMarshaller: - // this path performs a single allocation during write but requires the caller to implement - // the more efficient Size and MarshalToSizedBuffer methods + // this path performs a single allocation during write only when the Allocator wasn't provided + // it also requires the caller to implement the more efficient Size and MarshalToSizedBuffer methods encodedSize := uint64(t.Size()) estimatedSize := prefixSize + estimateUnknownSize(&unk, encodedSize) - data := make([]byte, estimatedSize) + data := memAlloc.Allocate(estimatedSize) i, err := unk.NestedMarshalTo(data[prefixSize:], t, encodedSize) if err != nil { @@ -221,7 +237,7 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error { unk.Raw = data estimatedSize := prefixSize + uint64(unk.Size()) - data = make([]byte, estimatedSize) + data = memAlloc.Allocate(estimatedSize) i, err := unk.MarshalTo(data[prefixSize:]) if err != nil { @@ -395,19 +411,33 @@ func unmarshalToObject(typer runtime.ObjectTyper, creater runtime.ObjectCreater, // Encode serializes the provided object to the given writer. Overrides is ignored. func (s *RawSerializer) Encode(obj runtime.Object, w io.Writer) error { - if co, ok := obj.(runtime.CacheableObject); ok { - return co.CacheEncode(s.Identifier(), s.doEncode, w) - } - return s.doEncode(obj, w) + return s.encode(obj, w, &runtime.SimpleAllocator{}) } -func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer) error { +// EncodeWithAllocator writes an object to the provided writer. +// In addition, it allows for providing a memory allocator for efficient memory usage during object serialization. +func (s *RawSerializer) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + return s.encode(obj, w, memAlloc) +} + +func (s *RawSerializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + if co, ok := obj.(runtime.CacheableObject); ok { + return co.CacheEncode(s.Identifier(), func(obj runtime.Object, w io.Writer) error { return s.doEncode(obj, w, memAlloc) }, w) + } + return s.doEncode(obj, w, memAlloc) +} + +func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + if memAlloc == nil { + klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator") + memAlloc = &runtime.SimpleAllocator{} + } switch t := obj.(type) { case bufferedReverseMarshaller: - // this path performs a single allocation during write but requires the caller to implement - // the more efficient Size and MarshalToSizedBuffer methods + // this path performs a single allocation during write only when the Allocator wasn't provided + // it also requires the caller to implement the more efficient Size and MarshalToSizedBuffer methods encodedSize := uint64(t.Size()) - data := make([]byte, encodedSize) + data := memAlloc.Allocate(encodedSize) n, err := t.MarshalToSizedBuffer(data) if err != nil { @@ -417,10 +447,10 @@ func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer) error { return err case bufferedMarshaller: - // this path performs a single allocation during write but requires the caller to implement - // the more efficient Size and MarshalTo methods + // this path performs a single allocation during write only when the Allocator wasn't provided + // it also requires the caller to implement the more efficient Size and MarshalTo methods encodedSize := uint64(t.Size()) - data := make([]byte, encodedSize) + data := memAlloc.Allocate(encodedSize) n, err := t.MarshalTo(data) if err != nil { diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf_test.go index 001b0f64786..27f4496d18a 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf_test.go @@ -17,8 +17,12 @@ limitations under the License. package protobuf import ( + "bytes" + "reflect" "testing" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" runtimetesting "k8s.io/apimachinery/pkg/runtime/testing" @@ -66,3 +70,111 @@ func (t *mockTyper) ObjectKinds(obj runtime.Object) ([]schema.GroupVersionKind, func (t *mockTyper) Recognizes(_ schema.GroupVersionKind) bool { return false } + +func TestSerializerEncodeWithAllocator(t *testing.T) { + testCases := []struct { + name string + obj runtime.Object + }{ + { + name: "encode a bufferedMarshaller obj", + obj: &testapigroupv1.Carp{ + TypeMeta: metav1.TypeMeta{APIVersion: "group/version", Kind: "Carp"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + Spec: testapigroupv1.CarpSpec{ + Subdomain: "carp.k8s.io", + }, + }, + }, + + { + name: "encode a runtime.Unknown obj", + obj: &runtime.Unknown{TypeMeta: runtime.TypeMeta{APIVersion: "group/version", Kind: "Unknown"}, Raw: []byte("hello world")}, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + target := NewSerializer(nil, nil) + + writer := &bytes.Buffer{} + if err := target.Encode(tc.obj, writer); err != nil { + t.Fatal(err) + } + + writer2 := &bytes.Buffer{} + alloc := &testAllocator{} + if err := target.EncodeWithAllocator(tc.obj, writer2, alloc); err != nil { + t.Fatal(err) + } + if alloc.allocateCount != 1 { + t.Fatalf("expected the Allocate method to be called exactly 1 but it was executed: %v times ", alloc.allocateCount) + } + + // to ensure compatibility of the new method with the old one, serialized data must be equal + // also we are not testing decoding since "roundtripping" is tested elsewhere for all known types + if !reflect.DeepEqual(writer.Bytes(), writer2.Bytes()) { + t.Fatal("data mismatch, data serialized with the Encode method is different than serialized with the EncodeWithAllocator method") + } + }) + } +} + +func TestRawSerializerEncodeWithAllocator(t *testing.T) { + testCases := []struct { + name string + obj runtime.Object + }{ + { + name: "encode a bufferedReverseMarshaller obj", + obj: &testapigroupv1.Carp{ + TypeMeta: metav1.TypeMeta{APIVersion: "group/version", Kind: "Carp"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + Spec: testapigroupv1.CarpSpec{ + Subdomain: "carp.k8s.io", + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + writer := &bytes.Buffer{} + target := NewRawSerializer(nil, nil) + + if err := target.Encode(tc.obj, writer); err != nil { + t.Fatal(err) + } + + writer2 := &bytes.Buffer{} + alloc := &testAllocator{} + if err := target.EncodeWithAllocator(tc.obj, writer2, alloc); err != nil { + t.Fatal(err) + } + if alloc.allocateCount != 1 { + t.Fatalf("expected the Allocate method to be called exactly 1 but it was executed: %v times ", alloc.allocateCount) + } + + // to ensure compatibility of the new method with the old one, serialized data must be equal + // also we are not testing decoding since "roundtripping" is tested elsewhere for all known types + if !reflect.DeepEqual(writer.Bytes(), writer2.Bytes()) { + t.Fatal("data mismatch, data serialized with the Encode method is different than serialized with the EncodeWithAllocator method") + } + }) + } +} + +type testAllocator struct { + buf []byte + allocateCount int +} + +func (ta *testAllocator) Allocate(n uint64) []byte { + ta.buf = make([]byte, n, n) + ta.allocateCount++ + return ta.buf +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming/streaming.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming/streaming.go index 971c46d496a..87b3fec3f2d 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming/streaming.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming/streaming.go @@ -134,3 +134,23 @@ func (e *encoder) Encode(obj runtime.Object) error { e.buf.Reset() return err } + +type encoderWithAllocator struct { + writer io.Writer + encoder runtime.EncoderWithAllocator + memAllocator runtime.MemoryAllocator +} + +// NewEncoderWithAllocator returns a new streaming encoder +func NewEncoderWithAllocator(w io.Writer, e runtime.EncoderWithAllocator, a runtime.MemoryAllocator) Encoder { + return &encoderWithAllocator{ + writer: w, + encoder: e, + memAllocator: a, + } +} + +// Encode writes the provided object to the nested writer +func (e *encoderWithAllocator) Encode(obj runtime.Object) error { + return e.encoder.EncodeWithAllocator(obj, e.writer, e.memAllocator) +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go index 844730e6ba3..7b60b128b09 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go @@ -89,6 +89,8 @@ type codec struct { originalSchemeName string } +var _ runtime.EncoderWithAllocator = &codec{} + var identifiersMap sync.Map type codecIdentifier struct { @@ -192,19 +194,40 @@ func (c *codec) Decode(data []byte, defaultGVK *schema.GroupVersionKind, into ru return out, gvk, strictDecodingErr } +// EncodeWithAllocator ensures the provided object is output in the appropriate group and version, invoking +// conversion if necessary. Unversioned objects (according to the ObjectTyper) are output as is. +// In addition, it allows for providing a memory allocator for efficient memory usage during object serialization. +func (c *codec) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + return c.encode(obj, w, memAlloc) +} + // Encode ensures the provided object is output in the appropriate group and version, invoking // conversion if necessary. Unversioned objects (according to the ObjectTyper) are output as is. func (c *codec) Encode(obj runtime.Object, w io.Writer) error { - if co, ok := obj.(runtime.CacheableObject); ok { - return co.CacheEncode(c.Identifier(), c.doEncode, w) - } - return c.doEncode(obj, w) + return c.encode(obj, w, nil) } -func (c *codec) doEncode(obj runtime.Object, w io.Writer) error { +func (c *codec) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + if co, ok := obj.(runtime.CacheableObject); ok { + return co.CacheEncode(c.Identifier(), func(obj runtime.Object, w io.Writer) error { return c.doEncode(obj, w, memAlloc) }, w) + } + return c.doEncode(obj, w, memAlloc) +} + +func (c *codec) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + encodeFn := c.encoder.Encode + if memAlloc != nil { + if encoder, supportsAllocator := c.encoder.(runtime.EncoderWithAllocator); supportsAllocator { + encodeFn = func(obj runtime.Object, w io.Writer) error { + return encoder.EncodeWithAllocator(obj, w, memAlloc) + } + } else { + klog.V(4).Infof("a memory allocator was provided but the encoder %T doesn't implement the runtime.EncoderWithAllocator, using regular encoder.Encode method") + } + } switch obj := obj.(type) { case *runtime.Unknown: - return c.encoder.Encode(obj, w) + return encodeFn(obj, w) case runtime.Unstructured: // An unstructured list can contain objects of multiple group version kinds. don't short-circuit just // because the top-level type matches our desired destination type. actually send the object to the converter @@ -213,14 +236,14 @@ func (c *codec) doEncode(obj runtime.Object, w io.Writer) error { // avoid conversion roundtrip if GVK is the right one already or is empty (yes, this is a hack, but the old behaviour we rely on in kubectl) objGVK := obj.GetObjectKind().GroupVersionKind() if len(objGVK.Version) == 0 { - return c.encoder.Encode(obj, w) + return encodeFn(obj, w) } targetGVK, ok := c.encodeVersion.KindForGroupVersionKinds([]schema.GroupVersionKind{objGVK}) if !ok { return runtime.NewNotRegisteredGVKErrForTarget(c.originalSchemeName, objGVK, c.encodeVersion) } if targetGVK == objGVK { - return c.encoder.Encode(obj, w) + return encodeFn(obj, w) } } } @@ -242,7 +265,7 @@ func (c *codec) doEncode(obj runtime.Object, w io.Writer) error { } } objectKind.SetGroupVersionKind(gvks[0]) - return c.encoder.Encode(obj, w) + return encodeFn(obj, w) } // Perform a conversion if necessary @@ -258,7 +281,7 @@ func (c *codec) doEncode(obj runtime.Object, w io.Writer) error { } // Conversion is responsible for setting the proper group, version, and kind onto the outgoing object - return c.encoder.Encode(out, w) + return encodeFn(out, w) } // Identifier implements runtime.Encoder interface. diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning_test.go index 82372cd25cf..ee12a69adf2 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning_test.go @@ -125,8 +125,9 @@ func TestNestedEncodeError(t *testing.T) { gvk1 := schema.GroupVersionKind{Kind: "test", Group: "other", Version: "v1"} gvk2 := schema.GroupVersionKind{Kind: "test", Group: "other", Version: "v2"} n.SetGroupVersionKind(gvk1) + encoder := &mockSerializer{obj: n} codec := NewCodec( - nil, nil, + encoder, nil, &mockConvertor{}, nil, &mockTyper{gvks: []schema.GroupVersionKind{gvk1, gvk2}}, diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go index 818d94ae4c8..c155b925f78 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go @@ -19,10 +19,13 @@ package handlers import ( "bytes" "fmt" + "io" "net/http" "reflect" "time" + "golang.org/x/net/websocket" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -32,8 +35,6 @@ import ( "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/util/wsstream" - - "golang.org/x/net/websocket" ) // nothing will ever be sent down this channel @@ -187,7 +188,17 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { s.Scope.err(errors.NewBadRequest(err.Error()), w, req) return } - e := streaming.NewEncoder(framer, s.Encoder) + + var e streaming.Encoder + var memoryAllocator runtime.MemoryAllocator + + if encoder, supportsAllocator := s.Encoder.(runtime.EncoderWithAllocator); supportsAllocator { + memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) + defer runtime.AllocatorPool.Put(memoryAllocator) + e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator) + } else { + e = streaming.NewEncoder(framer, s.Encoder) + } // ensure the connection times out timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh() @@ -206,6 +217,19 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { ch := s.Watching.ResultChan() done := req.Context().Done() + embeddedEncodeFn := s.EmbeddedEncoder.Encode + if encoder, supportsAllocator := s.EmbeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator { + if memoryAllocator == nil { + // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call. + // instead, we allocate the buffer for the entire watch session and release it when we close the connection. + memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) + defer runtime.AllocatorPool.Put(memoryAllocator) + } + embeddedEncodeFn = func(obj runtime.Object, w io.Writer) error { + return encoder.EncodeWithAllocator(obj, w, memoryAllocator) + } + } + for { select { case <-done: @@ -220,7 +244,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() obj := s.Fixup(event.Object) - if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil { + if err := embeddedEncodeFn(obj, buf); err != nil { // unexpected error utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err)) return