From eda1b0c68ec166ee52c50e4a6ab682ce7227b6a5 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Thu, 17 Feb 2022 16:04:50 +0100 Subject: [PATCH 1/7] update the watch server to use EncoderWithAllocator during object serialization. It allows us to allocate a single buffer for the entire watch session and release it when a watch connection is closed. Previously memory was allocated for every object serialization putting a lot of pressure on GC and consuming more memory than needed. --- .../apiserver/pkg/endpoints/handlers/watch.go | 32 ++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go index 818d94ae4c8..c155b925f78 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go @@ -19,10 +19,13 @@ package handlers import ( "bytes" "fmt" + "io" "net/http" "reflect" "time" + "golang.org/x/net/websocket" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -32,8 +35,6 @@ import ( "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/util/wsstream" - - "golang.org/x/net/websocket" ) // nothing will ever be sent down this channel @@ -187,7 +188,17 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { s.Scope.err(errors.NewBadRequest(err.Error()), w, req) return } - e := streaming.NewEncoder(framer, s.Encoder) + + var e streaming.Encoder + var memoryAllocator runtime.MemoryAllocator + + if encoder, supportsAllocator := s.Encoder.(runtime.EncoderWithAllocator); supportsAllocator { + memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) + defer runtime.AllocatorPool.Put(memoryAllocator) + e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator) + } else { + e = streaming.NewEncoder(framer, s.Encoder) + } // ensure the connection times out timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh() @@ -206,6 +217,19 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { ch := s.Watching.ResultChan() done := req.Context().Done() + embeddedEncodeFn := s.EmbeddedEncoder.Encode + if encoder, supportsAllocator := s.EmbeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator { + if memoryAllocator == nil { + // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call. + // instead, we allocate the buffer for the entire watch session and release it when we close the connection. + memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) + defer runtime.AllocatorPool.Put(memoryAllocator) + } + embeddedEncodeFn = func(obj runtime.Object, w io.Writer) error { + return encoder.EncodeWithAllocator(obj, w, memoryAllocator) + } + } + for { select { case <-done: @@ -220,7 +244,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() obj := s.Fixup(event.Object) - if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil { + if err := embeddedEncodeFn(obj, buf); err != nil { // unexpected error utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err)) return From f3d5f42aa493c8cdf56b52823ca183fcfb019ec3 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Thu, 17 Feb 2022 16:05:04 +0100 Subject: [PATCH 2/7] codec interfaces --- .../apimachinery/pkg/runtime/interfaces.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go b/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go index 3ed5bf7bce6..dda874861d6 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go @@ -69,6 +69,24 @@ type Encoder interface { Identifier() Identifier } +// MemoryAllocator is responsible for allocating memory. +// By encapsulating memory allocation into its own interface, we can reuse the memory +// across many operations in places we know it can significantly improve the performance. +type MemoryAllocator interface { + // Allocate reserves memory for n bytes. + // Note that implementations of this method are not required to zero the returned array. + // It is the caller's responsibility to clean the memory if needed. + Allocate(n uint64) []byte +} + +// EncoderWithAllocator serializes objects in a way that allows callers to manage any additional memory allocations. +type EncoderWithAllocator interface { + Encoder + // EncodeWithAllocator writes an object to a stream as Encode does. + // In addition, it allows for providing a memory allocator for efficient memory usage during object serialization + EncodeWithAllocator(obj Object, w io.Writer, memAlloc MemoryAllocator) error +} + // Decoder attempts to load an object from data. type Decoder interface { // Decode attempts to deserialize the provided data using either the innate typing of the scheme or the From 81cf09675166a082949c00d8c853400f26fbf0c9 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Thu, 17 Feb 2022 16:05:25 +0100 Subject: [PATCH 3/7] codec: exposes EncodeWithAllocator method The new method is implemented by the protobuf serializer and helps to reduce memory footprint during object serialization. --- .../serializer/versioning/versioning.go | 43 ++++++++++++++----- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go index ea7c580bd6b..e637f8b8e00 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go @@ -89,6 +89,8 @@ type codec struct { originalSchemeName string } +var _ runtime.EncoderWithAllocator = &codec{} + var identifiersMap sync.Map type codecIdentifier struct { @@ -182,19 +184,40 @@ func (c *codec) Decode(data []byte, defaultGVK *schema.GroupVersionKind, into ru return out, gvk, strictDecodingErr } +// EncodeWithAllocator ensures the provided object is output in the appropriate group and version, invoking +// conversion if necessary. Unversioned objects (according to the ObjectTyper) are output as is. +// In addition, it allows for providing a memory allocator for efficient memory usage during object serialization. +func (c *codec) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + return c.encode(obj, w, memAlloc) +} + // Encode ensures the provided object is output in the appropriate group and version, invoking // conversion if necessary. Unversioned objects (according to the ObjectTyper) are output as is. func (c *codec) Encode(obj runtime.Object, w io.Writer) error { - if co, ok := obj.(runtime.CacheableObject); ok { - return co.CacheEncode(c.Identifier(), c.doEncode, w) - } - return c.doEncode(obj, w) + return c.encode(obj, w, nil) } -func (c *codec) doEncode(obj runtime.Object, w io.Writer) error { +func (c *codec) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + if co, ok := obj.(runtime.CacheableObject); ok { + return co.CacheEncode(c.Identifier(), func(obj runtime.Object, w io.Writer) error { return c.doEncode(obj, w, memAlloc) }, w) + } + return c.doEncode(obj, w, memAlloc) +} + +func (c *codec) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + encodeFn := c.encoder.Encode + if memAlloc != nil { + if encoder, supportsAllocator := c.encoder.(runtime.EncoderWithAllocator); supportsAllocator { + encodeFn = func(obj runtime.Object, w io.Writer) error { + return encoder.EncodeWithAllocator(obj, w, memAlloc) + } + } else { + klog.V(4).Infof("a memory allocator was provided but the encoder %T doesn't implement the runtime.EncoderWithAllocator, using regular encoder.Encode method") + } + } switch obj := obj.(type) { case *runtime.Unknown: - return c.encoder.Encode(obj, w) + return encodeFn(obj, w) case runtime.Unstructured: // An unstructured list can contain objects of multiple group version kinds. don't short-circuit just // because the top-level type matches our desired destination type. actually send the object to the converter @@ -203,14 +226,14 @@ func (c *codec) doEncode(obj runtime.Object, w io.Writer) error { // avoid conversion roundtrip if GVK is the right one already or is empty (yes, this is a hack, but the old behaviour we rely on in kubectl) objGVK := obj.GetObjectKind().GroupVersionKind() if len(objGVK.Version) == 0 { - return c.encoder.Encode(obj, w) + return encodeFn(obj, w) } targetGVK, ok := c.encodeVersion.KindForGroupVersionKinds([]schema.GroupVersionKind{objGVK}) if !ok { return runtime.NewNotRegisteredGVKErrForTarget(c.originalSchemeName, objGVK, c.encodeVersion) } if targetGVK == objGVK { - return c.encoder.Encode(obj, w) + return encodeFn(obj, w) } } } @@ -232,7 +255,7 @@ func (c *codec) doEncode(obj runtime.Object, w io.Writer) error { } } objectKind.SetGroupVersionKind(gvks[0]) - return c.encoder.Encode(obj, w) + return encodeFn(obj, w) } // Perform a conversion if necessary @@ -248,7 +271,7 @@ func (c *codec) doEncode(obj runtime.Object, w io.Writer) error { } // Conversion is responsible for setting the proper group, version, and kind onto the outgoing object - return c.encoder.Encode(out, w) + return encodeFn(out, w) } // Identifier implements runtime.Encoder interface. From 32ca2b881d1d7748090aca3b3df95a69ec99fb0d Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Thu, 17 Feb 2022 16:05:45 +0100 Subject: [PATCH 4/7] provides EncodeWithAllocator method for the protobuf encoder The new method allows for providing a memory allocator for efficient memory usage during object serialization. --- .../runtime/serializer/protobuf/protobuf.go | 76 ++++++++---- .../serializer/protobuf/protobuf_test.go | 112 ++++++++++++++++++ 2 files changed, 165 insertions(+), 23 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go index 8358d77c39e..c63e6dc63f6 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer/recognizer" "k8s.io/apimachinery/pkg/util/framer" + "k8s.io/klog/v2" ) var ( @@ -86,6 +87,7 @@ type Serializer struct { } var _ runtime.Serializer = &Serializer{} +var _ runtime.EncoderWithAllocator = &Serializer{} var _ recognizer.RecognizingDecoder = &Serializer{} const serializerIdentifier runtime.Identifier = "protobuf" @@ -161,22 +163,36 @@ func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, i return unmarshalToObject(s.typer, s.creater, &actual, into, unk.Raw) } -// Encode serializes the provided object to the given writer. -func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error { - if co, ok := obj.(runtime.CacheableObject); ok { - return co.CacheEncode(s.Identifier(), s.doEncode, w) - } - return s.doEncode(obj, w) +// EncodeWithAllocator writes an object to the provided writer. +// In addition, it allows for providing a memory allocator for efficient memory usage during object serialization. +func (s *Serializer) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + return s.encode(obj, w, memAlloc) } -func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error { +// Encode serializes the provided object to the given writer. +func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error { + return s.encode(obj, w, &runtime.SimpleAllocator{}) +} + +func (s *Serializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + if co, ok := obj.(runtime.CacheableObject); ok { + return co.CacheEncode(s.Identifier(), func(obj runtime.Object, w io.Writer) error { return s.doEncode(obj, w, memAlloc) }, w) + } + return s.doEncode(obj, w, memAlloc) +} + +func (s *Serializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + if memAlloc == nil { + klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator") + memAlloc = &runtime.SimpleAllocator{} + } prefixSize := uint64(len(s.prefix)) var unk runtime.Unknown switch t := obj.(type) { case *runtime.Unknown: estimatedSize := prefixSize + uint64(t.Size()) - data := make([]byte, estimatedSize) + data := memAlloc.Allocate(estimatedSize) i, err := t.MarshalTo(data[prefixSize:]) if err != nil { return err @@ -196,11 +212,11 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error { switch t := obj.(type) { case bufferedMarshaller: - // this path performs a single allocation during write but requires the caller to implement - // the more efficient Size and MarshalToSizedBuffer methods + // this path performs a single allocation during write only when the Allocator wasn't provided + // it also requires the caller to implement the more efficient Size and MarshalToSizedBuffer methods encodedSize := uint64(t.Size()) estimatedSize := prefixSize + estimateUnknownSize(&unk, encodedSize) - data := make([]byte, estimatedSize) + data := memAlloc.Allocate(estimatedSize) i, err := unk.NestedMarshalTo(data[prefixSize:], t, encodedSize) if err != nil { @@ -221,7 +237,7 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error { unk.Raw = data estimatedSize := prefixSize + uint64(unk.Size()) - data = make([]byte, estimatedSize) + data = memAlloc.Allocate(estimatedSize) i, err := unk.MarshalTo(data[prefixSize:]) if err != nil { @@ -395,19 +411,33 @@ func unmarshalToObject(typer runtime.ObjectTyper, creater runtime.ObjectCreater, // Encode serializes the provided object to the given writer. Overrides is ignored. func (s *RawSerializer) Encode(obj runtime.Object, w io.Writer) error { - if co, ok := obj.(runtime.CacheableObject); ok { - return co.CacheEncode(s.Identifier(), s.doEncode, w) - } - return s.doEncode(obj, w) + return s.encode(obj, w, &runtime.SimpleAllocator{}) } -func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer) error { +// EncodeWithAllocator writes an object to the provided writer. +// In addition, it allows for providing a memory allocator for efficient memory usage during object serialization. +func (s *RawSerializer) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + return s.encode(obj, w, memAlloc) +} + +func (s *RawSerializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + if co, ok := obj.(runtime.CacheableObject); ok { + return co.CacheEncode(s.Identifier(), func(obj runtime.Object, w io.Writer) error { return s.doEncode(obj, w, memAlloc) }, w) + } + return s.doEncode(obj, w, memAlloc) +} + +func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + if memAlloc == nil { + klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator") + memAlloc = &runtime.SimpleAllocator{} + } switch t := obj.(type) { case bufferedReverseMarshaller: - // this path performs a single allocation during write but requires the caller to implement - // the more efficient Size and MarshalToSizedBuffer methods + // this path performs a single allocation during write only when the Allocator wasn't provided + // it also requires the caller to implement the more efficient Size and MarshalToSizedBuffer methods encodedSize := uint64(t.Size()) - data := make([]byte, encodedSize) + data := memAlloc.Allocate(encodedSize) n, err := t.MarshalToSizedBuffer(data) if err != nil { @@ -417,10 +447,10 @@ func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer) error { return err case bufferedMarshaller: - // this path performs a single allocation during write but requires the caller to implement - // the more efficient Size and MarshalTo methods + // this path performs a single allocation during write only when the Allocator wasn't provided + // it also requires the caller to implement the more efficient Size and MarshalTo methods encodedSize := uint64(t.Size()) - data := make([]byte, encodedSize) + data := memAlloc.Allocate(encodedSize) n, err := t.MarshalTo(data) if err != nil { diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf_test.go index 001b0f64786..27f4496d18a 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf_test.go @@ -17,8 +17,12 @@ limitations under the License. package protobuf import ( + "bytes" + "reflect" "testing" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" runtimetesting "k8s.io/apimachinery/pkg/runtime/testing" @@ -66,3 +70,111 @@ func (t *mockTyper) ObjectKinds(obj runtime.Object) ([]schema.GroupVersionKind, func (t *mockTyper) Recognizes(_ schema.GroupVersionKind) bool { return false } + +func TestSerializerEncodeWithAllocator(t *testing.T) { + testCases := []struct { + name string + obj runtime.Object + }{ + { + name: "encode a bufferedMarshaller obj", + obj: &testapigroupv1.Carp{ + TypeMeta: metav1.TypeMeta{APIVersion: "group/version", Kind: "Carp"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + Spec: testapigroupv1.CarpSpec{ + Subdomain: "carp.k8s.io", + }, + }, + }, + + { + name: "encode a runtime.Unknown obj", + obj: &runtime.Unknown{TypeMeta: runtime.TypeMeta{APIVersion: "group/version", Kind: "Unknown"}, Raw: []byte("hello world")}, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + target := NewSerializer(nil, nil) + + writer := &bytes.Buffer{} + if err := target.Encode(tc.obj, writer); err != nil { + t.Fatal(err) + } + + writer2 := &bytes.Buffer{} + alloc := &testAllocator{} + if err := target.EncodeWithAllocator(tc.obj, writer2, alloc); err != nil { + t.Fatal(err) + } + if alloc.allocateCount != 1 { + t.Fatalf("expected the Allocate method to be called exactly 1 but it was executed: %v times ", alloc.allocateCount) + } + + // to ensure compatibility of the new method with the old one, serialized data must be equal + // also we are not testing decoding since "roundtripping" is tested elsewhere for all known types + if !reflect.DeepEqual(writer.Bytes(), writer2.Bytes()) { + t.Fatal("data mismatch, data serialized with the Encode method is different than serialized with the EncodeWithAllocator method") + } + }) + } +} + +func TestRawSerializerEncodeWithAllocator(t *testing.T) { + testCases := []struct { + name string + obj runtime.Object + }{ + { + name: "encode a bufferedReverseMarshaller obj", + obj: &testapigroupv1.Carp{ + TypeMeta: metav1.TypeMeta{APIVersion: "group/version", Kind: "Carp"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + Spec: testapigroupv1.CarpSpec{ + Subdomain: "carp.k8s.io", + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + writer := &bytes.Buffer{} + target := NewRawSerializer(nil, nil) + + if err := target.Encode(tc.obj, writer); err != nil { + t.Fatal(err) + } + + writer2 := &bytes.Buffer{} + alloc := &testAllocator{} + if err := target.EncodeWithAllocator(tc.obj, writer2, alloc); err != nil { + t.Fatal(err) + } + if alloc.allocateCount != 1 { + t.Fatalf("expected the Allocate method to be called exactly 1 but it was executed: %v times ", alloc.allocateCount) + } + + // to ensure compatibility of the new method with the old one, serialized data must be equal + // also we are not testing decoding since "roundtripping" is tested elsewhere for all known types + if !reflect.DeepEqual(writer.Bytes(), writer2.Bytes()) { + t.Fatal("data mismatch, data serialized with the Encode method is different than serialized with the EncodeWithAllocator method") + } + }) + } +} + +type testAllocator struct { + buf []byte + allocateCount int +} + +func (ta *testAllocator) Allocate(n uint64) []byte { + ta.buf = make([]byte, n, n) + ta.allocateCount++ + return ta.buf +} From 52de201b0fc780841ac7a4c22bbbb63a16a5df0a Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 21 Feb 2022 15:31:31 +0100 Subject: [PATCH 5/7] introduces a memory allocator The primary use case for the allocator is to reduce cost of object serialization. Initially it will be used by the protobuf serializer. This approach puts less load on GC and leads to less fragmented memory in general. --- .../apimachinery/pkg/runtime/allocator.go | 74 ++++++++++ .../pkg/runtime/allocator_test.go | 78 ++++++++++ .../serializer/encoder_with_allocator_test.go | 139 ++++++++++++++++++ 3 files changed, 291 insertions(+) create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/allocator.go create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/allocator_test.go create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/serializer/encoder_with_allocator_test.go diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/allocator.go b/staging/src/k8s.io/apimachinery/pkg/runtime/allocator.go new file mode 100644 index 00000000000..0d00d8c3a3b --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/allocator.go @@ -0,0 +1,74 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "sync" +) + +// AllocatorPool simply stores Allocator objects to avoid additional memory allocations +// by caching created but unused items for later reuse, relieving pressure on the garbage collector. +// +// Usage: +// memoryAllocator := runtime.AllocatorPool.Get().(*runtime.Allocator) +// defer runtime.AllocatorPool.Put(memoryAllocator) +// +// A note for future: +// consider introducing multiple pools for storing buffers of different sizes +// perhaps this could allow us to be more efficient. +var AllocatorPool = sync.Pool{ + New: func() interface{} { + return &Allocator{} + }, +} + +// Allocator knows how to allocate memory +// It exists to make the cost of object serialization cheaper. +// In some cases, it allows for allocating memory only once and then reusing it. +// This approach puts less load on GC and leads to less fragmented memory in general. +type Allocator struct { + buf []byte +} + +var _ MemoryAllocator = &Allocator{} + +// Allocate reserves memory for n bytes only if the underlying array doesn't have enough capacity +// otherwise it returns previously allocated block of memory. +// +// Note that the returned array is not zeroed, it is the caller's +// responsibility to clean the memory if needed. +func (a *Allocator) Allocate(n uint64) []byte { + if uint64(cap(a.buf)) >= n { + a.buf = a.buf[:n] + return a.buf + } + // grow the buffer + size := uint64(2*cap(a.buf)) + n + a.buf = make([]byte, size, size) + a.buf = a.buf[:n] + return a.buf +} + +// SimpleAllocator a wrapper around make([]byte) +// conforms to the MemoryAllocator interface +type SimpleAllocator struct{} + +var _ MemoryAllocator = &SimpleAllocator{} + +func (sa *SimpleAllocator) Allocate(n uint64) []byte { + return make([]byte, n, n) +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/allocator_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/allocator_test.go new file mode 100644 index 00000000000..067a5dda5e4 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/allocator_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "math/rand" + "testing" +) + +func TestAllocatorRandomInputs(t *testing.T) { + maxBytes := 5 * 1000000 // 5 MB + iterations := rand.Intn(10000) + 10 + target := &Allocator{} + + for i := 0; i < iterations; i++ { + bytesToAllocate := rand.Intn(maxBytes) + buff := target.Allocate(uint64(bytesToAllocate)) + if cap(buff) < bytesToAllocate { + t.Fatalf("expected the buffer to allocate: %v bytes whereas it allocated: %v bytes", bytesToAllocate, cap(buff)) + } + if len(buff) != bytesToAllocate { + t.Fatalf("unexpected length of the buffer, expected: %v, got: %v", bytesToAllocate, len(buff)) + } + } +} + +func TestAllocatorNeverShrinks(t *testing.T) { + target := &Allocator{} + initialSize := 1000000 // 1MB + initialBuff := target.Allocate(uint64(initialSize)) + if cap(initialBuff) < initialSize { + t.Fatalf("unexpected size of the buffer, expected at least 1MB, got: %v", cap(initialBuff)) + } + + for i := initialSize; i > 0; i = i / 10 { + newBuff := target.Allocate(uint64(i)) + if cap(newBuff) < initialSize { + t.Fatalf("allocator is now allowed to shrink memory") + } + if len(newBuff) != i { + t.Fatalf("unexpected length of the buffer, expected: %v, got: %v", i, len(newBuff)) + } + } +} + +func TestAllocatorZero(t *testing.T) { + target := &Allocator{} + initialSize := 1000000 // 1MB + buff := target.Allocate(uint64(initialSize)) + if cap(buff) < initialSize { + t.Fatalf("unexpected size of the buffer, expected at least 1MB, got: %v", cap(buff)) + } + if len(buff) != initialSize { + t.Fatalf("unexpected length of the buffer, expected: %v, got: %v", initialSize, len(buff)) + } + + buff = target.Allocate(0) + if cap(buff) < initialSize { + t.Fatalf("unexpected size of the buffer, expected at least 1MB, got: %v", cap(buff)) + } + if len(buff) != 0 { + t.Fatalf("unexpected length of the buffer, expected: 0, got: %v", len(buff)) + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/encoder_with_allocator_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/encoder_with_allocator_test.go new file mode 100644 index 00000000000..73406d0731b --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/encoder_with_allocator_test.go @@ -0,0 +1,139 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package serializer + +import ( + "crypto/rand" + "io/ioutil" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer/protobuf" +) + +func BenchmarkProtobufEncoder(b *testing.B) { + benchmarkEncodeFor(b, protobuf.NewSerializer(nil, nil)) +} + +func BenchmarkProtobufEncodeWithAllocator(b *testing.B) { + benchmarkEncodeWithAllocatorFor(b, protobuf.NewSerializer(nil, nil)) +} + +func BenchmarkRawProtobufEncoder(b *testing.B) { + benchmarkEncodeFor(b, protobuf.NewRawSerializer(nil, nil)) +} + +func BenchmarkRawProtobufEncodeWithAllocator(b *testing.B) { + benchmarkEncodeWithAllocatorFor(b, protobuf.NewRawSerializer(nil, nil)) +} + +func benchmarkEncodeFor(b *testing.B, target runtime.Encoder) { + for _, tc := range benchTestCases() { + b.Run(tc.name, func(b *testing.B) { + b.ReportAllocs() + for n := 0; n < b.N; n++ { + err := target.Encode(tc.obj, ioutil.Discard) + if err != nil { + b.Fatal(err) + } + } + }) + } +} + +func benchmarkEncodeWithAllocatorFor(b *testing.B, target runtime.EncoderWithAllocator) { + for _, tc := range benchTestCases() { + b.Run(tc.name, func(b *testing.B) { + b.ReportAllocs() + allocator := &runtime.Allocator{} + for n := 0; n < b.N; n++ { + err := target.EncodeWithAllocator(tc.obj, ioutil.Discard, allocator) + if err != nil { + b.Fatal(err) + } + } + }) + } +} + +type benchTestCase struct { + name string + obj runtime.Object +} + +func benchTestCases() []benchTestCase { + return []benchTestCase{ + { + name: "an obj with 1kB payload", + obj: func() runtime.Object { + carpPayload := make([]byte, 1000) // 1 kB + if _, err := rand.Read(carpPayload); err != nil { + panic(err) + } + return carpWithPayload(carpPayload) + }(), + }, + { + name: "an obj with 10kB payload", + obj: func() runtime.Object { + carpPayload := make([]byte, 10000) // 10 kB + if _, err := rand.Read(carpPayload); err != nil { + panic(err) + } + return carpWithPayload(carpPayload) + }(), + }, + { + name: "an obj with 100kB payload", + obj: func() runtime.Object { + carpPayload := make([]byte, 100000) // 100 kB + if _, err := rand.Read(carpPayload); err != nil { + panic(err) + } + return carpWithPayload(carpPayload) + }(), + }, + { + name: "an obj with 1MB payload", + obj: func() runtime.Object { + carpPayload := make([]byte, 1000000) // 1 MB + if _, err := rand.Read(carpPayload); err != nil { + panic(err) + } + return carpWithPayload(carpPayload) + }(), + }, + } +} + +func carpWithPayload(carpPayload []byte) *testapigroupv1.Carp { + gvk := &schema.GroupVersionKind{Group: "group", Version: "version", Kind: "Carp"} + return &testapigroupv1.Carp{ + TypeMeta: metav1.TypeMeta{APIVersion: gvk.GroupVersion().String(), Kind: gvk.Kind}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + Spec: testapigroupv1.CarpSpec{ + Subdomain: "carp.k8s.io", + NodeSelector: map[string]string{"payload": string(carpPayload)}, + }, + } +} From 034868e6af32149ac1ada27f1fab52454cd03bcb Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 21 Feb 2022 13:03:26 +0100 Subject: [PATCH 6/7] fixes TestNestedEncodeError test --- .../pkg/runtime/serializer/versioning/versioning_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning_test.go index ee3058c3c79..86f60131a9c 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning_test.go @@ -108,8 +108,9 @@ func TestNestedEncodeError(t *testing.T) { gvk1 := schema.GroupVersionKind{Kind: "test", Group: "other", Version: "v1"} gvk2 := schema.GroupVersionKind{Kind: "test", Group: "other", Version: "v2"} n.SetGroupVersionKind(gvk1) + encoder := &mockSerializer{obj: n} codec := NewCodec( - nil, nil, + encoder, nil, &mockConvertor{}, nil, &mockTyper{gvks: []schema.GroupVersionKind{gvk1, gvk2}}, From 9dd77ac0174178d23c362d8472a3a63724c55ae9 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Wed, 23 Feb 2022 11:04:02 +0100 Subject: [PATCH 7/7] introduces a new streaming encoder that utilizes a memory allocator during objects serialization --- .../runtime/serializer/streaming/streaming.go | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming/streaming.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming/streaming.go index 971c46d496a..87b3fec3f2d 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming/streaming.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming/streaming.go @@ -134,3 +134,23 @@ func (e *encoder) Encode(obj runtime.Object) error { e.buf.Reset() return err } + +type encoderWithAllocator struct { + writer io.Writer + encoder runtime.EncoderWithAllocator + memAllocator runtime.MemoryAllocator +} + +// NewEncoderWithAllocator returns a new streaming encoder +func NewEncoderWithAllocator(w io.Writer, e runtime.EncoderWithAllocator, a runtime.MemoryAllocator) Encoder { + return &encoderWithAllocator{ + writer: w, + encoder: e, + memAllocator: a, + } +} + +// Encode writes the provided object to the nested writer +func (e *encoderWithAllocator) Encode(obj runtime.Object) error { + return e.encoder.EncodeWithAllocator(obj, e.writer, e.memAllocator) +}