From 52de201b0fc780841ac7a4c22bbbb63a16a5df0a Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 21 Feb 2022 15:31:31 +0100 Subject: [PATCH] introduces a memory allocator The primary use case for the allocator is to reduce cost of object serialization. Initially it will be used by the protobuf serializer. This approach puts less load on GC and leads to less fragmented memory in general. --- .../apimachinery/pkg/runtime/allocator.go | 74 ++++++++++ .../pkg/runtime/allocator_test.go | 78 ++++++++++ .../serializer/encoder_with_allocator_test.go | 139 ++++++++++++++++++ 3 files changed, 291 insertions(+) create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/allocator.go create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/allocator_test.go create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/serializer/encoder_with_allocator_test.go diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/allocator.go b/staging/src/k8s.io/apimachinery/pkg/runtime/allocator.go new file mode 100644 index 00000000000..0d00d8c3a3b --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/allocator.go @@ -0,0 +1,74 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "sync" +) + +// AllocatorPool simply stores Allocator objects to avoid additional memory allocations +// by caching created but unused items for later reuse, relieving pressure on the garbage collector. +// +// Usage: +// memoryAllocator := runtime.AllocatorPool.Get().(*runtime.Allocator) +// defer runtime.AllocatorPool.Put(memoryAllocator) +// +// A note for future: +// consider introducing multiple pools for storing buffers of different sizes +// perhaps this could allow us to be more efficient. +var AllocatorPool = sync.Pool{ + New: func() interface{} { + return &Allocator{} + }, +} + +// Allocator knows how to allocate memory +// It exists to make the cost of object serialization cheaper. +// In some cases, it allows for allocating memory only once and then reusing it. +// This approach puts less load on GC and leads to less fragmented memory in general. +type Allocator struct { + buf []byte +} + +var _ MemoryAllocator = &Allocator{} + +// Allocate reserves memory for n bytes only if the underlying array doesn't have enough capacity +// otherwise it returns previously allocated block of memory. +// +// Note that the returned array is not zeroed, it is the caller's +// responsibility to clean the memory if needed. +func (a *Allocator) Allocate(n uint64) []byte { + if uint64(cap(a.buf)) >= n { + a.buf = a.buf[:n] + return a.buf + } + // grow the buffer + size := uint64(2*cap(a.buf)) + n + a.buf = make([]byte, size, size) + a.buf = a.buf[:n] + return a.buf +} + +// SimpleAllocator a wrapper around make([]byte) +// conforms to the MemoryAllocator interface +type SimpleAllocator struct{} + +var _ MemoryAllocator = &SimpleAllocator{} + +func (sa *SimpleAllocator) Allocate(n uint64) []byte { + return make([]byte, n, n) +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/allocator_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/allocator_test.go new file mode 100644 index 00000000000..067a5dda5e4 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/allocator_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "math/rand" + "testing" +) + +func TestAllocatorRandomInputs(t *testing.T) { + maxBytes := 5 * 1000000 // 5 MB + iterations := rand.Intn(10000) + 10 + target := &Allocator{} + + for i := 0; i < iterations; i++ { + bytesToAllocate := rand.Intn(maxBytes) + buff := target.Allocate(uint64(bytesToAllocate)) + if cap(buff) < bytesToAllocate { + t.Fatalf("expected the buffer to allocate: %v bytes whereas it allocated: %v bytes", bytesToAllocate, cap(buff)) + } + if len(buff) != bytesToAllocate { + t.Fatalf("unexpected length of the buffer, expected: %v, got: %v", bytesToAllocate, len(buff)) + } + } +} + +func TestAllocatorNeverShrinks(t *testing.T) { + target := &Allocator{} + initialSize := 1000000 // 1MB + initialBuff := target.Allocate(uint64(initialSize)) + if cap(initialBuff) < initialSize { + t.Fatalf("unexpected size of the buffer, expected at least 1MB, got: %v", cap(initialBuff)) + } + + for i := initialSize; i > 0; i = i / 10 { + newBuff := target.Allocate(uint64(i)) + if cap(newBuff) < initialSize { + t.Fatalf("allocator is now allowed to shrink memory") + } + if len(newBuff) != i { + t.Fatalf("unexpected length of the buffer, expected: %v, got: %v", i, len(newBuff)) + } + } +} + +func TestAllocatorZero(t *testing.T) { + target := &Allocator{} + initialSize := 1000000 // 1MB + buff := target.Allocate(uint64(initialSize)) + if cap(buff) < initialSize { + t.Fatalf("unexpected size of the buffer, expected at least 1MB, got: %v", cap(buff)) + } + if len(buff) != initialSize { + t.Fatalf("unexpected length of the buffer, expected: %v, got: %v", initialSize, len(buff)) + } + + buff = target.Allocate(0) + if cap(buff) < initialSize { + t.Fatalf("unexpected size of the buffer, expected at least 1MB, got: %v", cap(buff)) + } + if len(buff) != 0 { + t.Fatalf("unexpected length of the buffer, expected: 0, got: %v", len(buff)) + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/encoder_with_allocator_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/encoder_with_allocator_test.go new file mode 100644 index 00000000000..73406d0731b --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/encoder_with_allocator_test.go @@ -0,0 +1,139 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package serializer + +import ( + "crypto/rand" + "io/ioutil" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer/protobuf" +) + +func BenchmarkProtobufEncoder(b *testing.B) { + benchmarkEncodeFor(b, protobuf.NewSerializer(nil, nil)) +} + +func BenchmarkProtobufEncodeWithAllocator(b *testing.B) { + benchmarkEncodeWithAllocatorFor(b, protobuf.NewSerializer(nil, nil)) +} + +func BenchmarkRawProtobufEncoder(b *testing.B) { + benchmarkEncodeFor(b, protobuf.NewRawSerializer(nil, nil)) +} + +func BenchmarkRawProtobufEncodeWithAllocator(b *testing.B) { + benchmarkEncodeWithAllocatorFor(b, protobuf.NewRawSerializer(nil, nil)) +} + +func benchmarkEncodeFor(b *testing.B, target runtime.Encoder) { + for _, tc := range benchTestCases() { + b.Run(tc.name, func(b *testing.B) { + b.ReportAllocs() + for n := 0; n < b.N; n++ { + err := target.Encode(tc.obj, ioutil.Discard) + if err != nil { + b.Fatal(err) + } + } + }) + } +} + +func benchmarkEncodeWithAllocatorFor(b *testing.B, target runtime.EncoderWithAllocator) { + for _, tc := range benchTestCases() { + b.Run(tc.name, func(b *testing.B) { + b.ReportAllocs() + allocator := &runtime.Allocator{} + for n := 0; n < b.N; n++ { + err := target.EncodeWithAllocator(tc.obj, ioutil.Discard, allocator) + if err != nil { + b.Fatal(err) + } + } + }) + } +} + +type benchTestCase struct { + name string + obj runtime.Object +} + +func benchTestCases() []benchTestCase { + return []benchTestCase{ + { + name: "an obj with 1kB payload", + obj: func() runtime.Object { + carpPayload := make([]byte, 1000) // 1 kB + if _, err := rand.Read(carpPayload); err != nil { + panic(err) + } + return carpWithPayload(carpPayload) + }(), + }, + { + name: "an obj with 10kB payload", + obj: func() runtime.Object { + carpPayload := make([]byte, 10000) // 10 kB + if _, err := rand.Read(carpPayload); err != nil { + panic(err) + } + return carpWithPayload(carpPayload) + }(), + }, + { + name: "an obj with 100kB payload", + obj: func() runtime.Object { + carpPayload := make([]byte, 100000) // 100 kB + if _, err := rand.Read(carpPayload); err != nil { + panic(err) + } + return carpWithPayload(carpPayload) + }(), + }, + { + name: "an obj with 1MB payload", + obj: func() runtime.Object { + carpPayload := make([]byte, 1000000) // 1 MB + if _, err := rand.Read(carpPayload); err != nil { + panic(err) + } + return carpWithPayload(carpPayload) + }(), + }, + } +} + +func carpWithPayload(carpPayload []byte) *testapigroupv1.Carp { + gvk := &schema.GroupVersionKind{Group: "group", Version: "version", Kind: "Carp"} + return &testapigroupv1.Carp{ + TypeMeta: metav1.TypeMeta{APIVersion: gvk.GroupVersion().String(), Kind: gvk.Kind}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + Spec: testapigroupv1.CarpSpec{ + Subdomain: "carp.k8s.io", + NodeSelector: map[string]string{"payload": string(carpPayload)}, + }, + } +}