Merge pull request #108186 from p0lyn0mial/watch-list-reduce-allocations-in-watch-server

reduce the number of allocations in the WatchServer during objects serialisation
This commit is contained in:
Kubernetes Prow Robot 2022-02-23 09:04:19 -08:00 committed by GitHub
commit b435061c80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 557 additions and 38 deletions

View File

@ -0,0 +1,74 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
import (
"sync"
)
// AllocatorPool simply stores Allocator objects to avoid additional memory allocations
// by caching created but unused items for later reuse, relieving pressure on the garbage collector.
//
// Usage:
// memoryAllocator := runtime.AllocatorPool.Get().(*runtime.Allocator)
// defer runtime.AllocatorPool.Put(memoryAllocator)
//
// A note for future:
// consider introducing multiple pools for storing buffers of different sizes
// perhaps this could allow us to be more efficient.
var AllocatorPool = sync.Pool{
New: func() interface{} {
return &Allocator{}
},
}
// Allocator knows how to allocate memory
// It exists to make the cost of object serialization cheaper.
// In some cases, it allows for allocating memory only once and then reusing it.
// This approach puts less load on GC and leads to less fragmented memory in general.
type Allocator struct {
buf []byte
}
var _ MemoryAllocator = &Allocator{}
// Allocate reserves memory for n bytes only if the underlying array doesn't have enough capacity
// otherwise it returns previously allocated block of memory.
//
// Note that the returned array is not zeroed, it is the caller's
// responsibility to clean the memory if needed.
func (a *Allocator) Allocate(n uint64) []byte {
if uint64(cap(a.buf)) >= n {
a.buf = a.buf[:n]
return a.buf
}
// grow the buffer
size := uint64(2*cap(a.buf)) + n
a.buf = make([]byte, size, size)
a.buf = a.buf[:n]
return a.buf
}
// SimpleAllocator a wrapper around make([]byte)
// conforms to the MemoryAllocator interface
type SimpleAllocator struct{}
var _ MemoryAllocator = &SimpleAllocator{}
func (sa *SimpleAllocator) Allocate(n uint64) []byte {
return make([]byte, n, n)
}

View File

@ -0,0 +1,78 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
import (
"math/rand"
"testing"
)
func TestAllocatorRandomInputs(t *testing.T) {
maxBytes := 5 * 1000000 // 5 MB
iterations := rand.Intn(10000) + 10
target := &Allocator{}
for i := 0; i < iterations; i++ {
bytesToAllocate := rand.Intn(maxBytes)
buff := target.Allocate(uint64(bytesToAllocate))
if cap(buff) < bytesToAllocate {
t.Fatalf("expected the buffer to allocate: %v bytes whereas it allocated: %v bytes", bytesToAllocate, cap(buff))
}
if len(buff) != bytesToAllocate {
t.Fatalf("unexpected length of the buffer, expected: %v, got: %v", bytesToAllocate, len(buff))
}
}
}
func TestAllocatorNeverShrinks(t *testing.T) {
target := &Allocator{}
initialSize := 1000000 // 1MB
initialBuff := target.Allocate(uint64(initialSize))
if cap(initialBuff) < initialSize {
t.Fatalf("unexpected size of the buffer, expected at least 1MB, got: %v", cap(initialBuff))
}
for i := initialSize; i > 0; i = i / 10 {
newBuff := target.Allocate(uint64(i))
if cap(newBuff) < initialSize {
t.Fatalf("allocator is now allowed to shrink memory")
}
if len(newBuff) != i {
t.Fatalf("unexpected length of the buffer, expected: %v, got: %v", i, len(newBuff))
}
}
}
func TestAllocatorZero(t *testing.T) {
target := &Allocator{}
initialSize := 1000000 // 1MB
buff := target.Allocate(uint64(initialSize))
if cap(buff) < initialSize {
t.Fatalf("unexpected size of the buffer, expected at least 1MB, got: %v", cap(buff))
}
if len(buff) != initialSize {
t.Fatalf("unexpected length of the buffer, expected: %v, got: %v", initialSize, len(buff))
}
buff = target.Allocate(0)
if cap(buff) < initialSize {
t.Fatalf("unexpected size of the buffer, expected at least 1MB, got: %v", cap(buff))
}
if len(buff) != 0 {
t.Fatalf("unexpected length of the buffer, expected: 0, got: %v", len(buff))
}
}

View File

@ -69,6 +69,24 @@ type Encoder interface {
Identifier() Identifier
}
// MemoryAllocator is responsible for allocating memory.
// By encapsulating memory allocation into its own interface, we can reuse the memory
// across many operations in places we know it can significantly improve the performance.
type MemoryAllocator interface {
// Allocate reserves memory for n bytes.
// Note that implementations of this method are not required to zero the returned array.
// It is the caller's responsibility to clean the memory if needed.
Allocate(n uint64) []byte
}
// EncoderWithAllocator serializes objects in a way that allows callers to manage any additional memory allocations.
type EncoderWithAllocator interface {
Encoder
// EncodeWithAllocator writes an object to a stream as Encode does.
// In addition, it allows for providing a memory allocator for efficient memory usage during object serialization
EncodeWithAllocator(obj Object, w io.Writer, memAlloc MemoryAllocator) error
}
// Decoder attempts to load an object from data.
type Decoder interface {
// Decode attempts to deserialize the provided data using either the innate typing of the scheme or the

View File

@ -0,0 +1,139 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package serializer
import (
"crypto/rand"
"io/ioutil"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/protobuf"
)
func BenchmarkProtobufEncoder(b *testing.B) {
benchmarkEncodeFor(b, protobuf.NewSerializer(nil, nil))
}
func BenchmarkProtobufEncodeWithAllocator(b *testing.B) {
benchmarkEncodeWithAllocatorFor(b, protobuf.NewSerializer(nil, nil))
}
func BenchmarkRawProtobufEncoder(b *testing.B) {
benchmarkEncodeFor(b, protobuf.NewRawSerializer(nil, nil))
}
func BenchmarkRawProtobufEncodeWithAllocator(b *testing.B) {
benchmarkEncodeWithAllocatorFor(b, protobuf.NewRawSerializer(nil, nil))
}
func benchmarkEncodeFor(b *testing.B, target runtime.Encoder) {
for _, tc := range benchTestCases() {
b.Run(tc.name, func(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
err := target.Encode(tc.obj, ioutil.Discard)
if err != nil {
b.Fatal(err)
}
}
})
}
}
func benchmarkEncodeWithAllocatorFor(b *testing.B, target runtime.EncoderWithAllocator) {
for _, tc := range benchTestCases() {
b.Run(tc.name, func(b *testing.B) {
b.ReportAllocs()
allocator := &runtime.Allocator{}
for n := 0; n < b.N; n++ {
err := target.EncodeWithAllocator(tc.obj, ioutil.Discard, allocator)
if err != nil {
b.Fatal(err)
}
}
})
}
}
type benchTestCase struct {
name string
obj runtime.Object
}
func benchTestCases() []benchTestCase {
return []benchTestCase{
{
name: "an obj with 1kB payload",
obj: func() runtime.Object {
carpPayload := make([]byte, 1000) // 1 kB
if _, err := rand.Read(carpPayload); err != nil {
panic(err)
}
return carpWithPayload(carpPayload)
}(),
},
{
name: "an obj with 10kB payload",
obj: func() runtime.Object {
carpPayload := make([]byte, 10000) // 10 kB
if _, err := rand.Read(carpPayload); err != nil {
panic(err)
}
return carpWithPayload(carpPayload)
}(),
},
{
name: "an obj with 100kB payload",
obj: func() runtime.Object {
carpPayload := make([]byte, 100000) // 100 kB
if _, err := rand.Read(carpPayload); err != nil {
panic(err)
}
return carpWithPayload(carpPayload)
}(),
},
{
name: "an obj with 1MB payload",
obj: func() runtime.Object {
carpPayload := make([]byte, 1000000) // 1 MB
if _, err := rand.Read(carpPayload); err != nil {
panic(err)
}
return carpWithPayload(carpPayload)
}(),
},
}
}
func carpWithPayload(carpPayload []byte) *testapigroupv1.Carp {
gvk := &schema.GroupVersionKind{Group: "group", Version: "version", Kind: "Carp"}
return &testapigroupv1.Carp{
TypeMeta: metav1.TypeMeta{APIVersion: gvk.GroupVersion().String(), Kind: gvk.Kind},
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "namespace",
},
Spec: testapigroupv1.CarpSpec{
Subdomain: "carp.k8s.io",
NodeSelector: map[string]string{"payload": string(carpPayload)},
},
}
}

View File

@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/recognizer"
"k8s.io/apimachinery/pkg/util/framer"
"k8s.io/klog/v2"
)
var (
@ -86,6 +87,7 @@ type Serializer struct {
}
var _ runtime.Serializer = &Serializer{}
var _ runtime.EncoderWithAllocator = &Serializer{}
var _ recognizer.RecognizingDecoder = &Serializer{}
const serializerIdentifier runtime.Identifier = "protobuf"
@ -161,22 +163,36 @@ func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, i
return unmarshalToObject(s.typer, s.creater, &actual, into, unk.Raw)
}
// Encode serializes the provided object to the given writer.
func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error {
if co, ok := obj.(runtime.CacheableObject); ok {
return co.CacheEncode(s.Identifier(), s.doEncode, w)
}
return s.doEncode(obj, w)
// EncodeWithAllocator writes an object to the provided writer.
// In addition, it allows for providing a memory allocator for efficient memory usage during object serialization.
func (s *Serializer) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
return s.encode(obj, w, memAlloc)
}
func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error {
// Encode serializes the provided object to the given writer.
func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error {
return s.encode(obj, w, &runtime.SimpleAllocator{})
}
func (s *Serializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
if co, ok := obj.(runtime.CacheableObject); ok {
return co.CacheEncode(s.Identifier(), func(obj runtime.Object, w io.Writer) error { return s.doEncode(obj, w, memAlloc) }, w)
}
return s.doEncode(obj, w, memAlloc)
}
func (s *Serializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
if memAlloc == nil {
klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator")
memAlloc = &runtime.SimpleAllocator{}
}
prefixSize := uint64(len(s.prefix))
var unk runtime.Unknown
switch t := obj.(type) {
case *runtime.Unknown:
estimatedSize := prefixSize + uint64(t.Size())
data := make([]byte, estimatedSize)
data := memAlloc.Allocate(estimatedSize)
i, err := t.MarshalTo(data[prefixSize:])
if err != nil {
return err
@ -196,11 +212,11 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error {
switch t := obj.(type) {
case bufferedMarshaller:
// this path performs a single allocation during write but requires the caller to implement
// the more efficient Size and MarshalToSizedBuffer methods
// this path performs a single allocation during write only when the Allocator wasn't provided
// it also requires the caller to implement the more efficient Size and MarshalToSizedBuffer methods
encodedSize := uint64(t.Size())
estimatedSize := prefixSize + estimateUnknownSize(&unk, encodedSize)
data := make([]byte, estimatedSize)
data := memAlloc.Allocate(estimatedSize)
i, err := unk.NestedMarshalTo(data[prefixSize:], t, encodedSize)
if err != nil {
@ -221,7 +237,7 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error {
unk.Raw = data
estimatedSize := prefixSize + uint64(unk.Size())
data = make([]byte, estimatedSize)
data = memAlloc.Allocate(estimatedSize)
i, err := unk.MarshalTo(data[prefixSize:])
if err != nil {
@ -395,19 +411,33 @@ func unmarshalToObject(typer runtime.ObjectTyper, creater runtime.ObjectCreater,
// Encode serializes the provided object to the given writer. Overrides is ignored.
func (s *RawSerializer) Encode(obj runtime.Object, w io.Writer) error {
if co, ok := obj.(runtime.CacheableObject); ok {
return co.CacheEncode(s.Identifier(), s.doEncode, w)
}
return s.doEncode(obj, w)
return s.encode(obj, w, &runtime.SimpleAllocator{})
}
func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer) error {
// EncodeWithAllocator writes an object to the provided writer.
// In addition, it allows for providing a memory allocator for efficient memory usage during object serialization.
func (s *RawSerializer) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
return s.encode(obj, w, memAlloc)
}
func (s *RawSerializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
if co, ok := obj.(runtime.CacheableObject); ok {
return co.CacheEncode(s.Identifier(), func(obj runtime.Object, w io.Writer) error { return s.doEncode(obj, w, memAlloc) }, w)
}
return s.doEncode(obj, w, memAlloc)
}
func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
if memAlloc == nil {
klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator")
memAlloc = &runtime.SimpleAllocator{}
}
switch t := obj.(type) {
case bufferedReverseMarshaller:
// this path performs a single allocation during write but requires the caller to implement
// the more efficient Size and MarshalToSizedBuffer methods
// this path performs a single allocation during write only when the Allocator wasn't provided
// it also requires the caller to implement the more efficient Size and MarshalToSizedBuffer methods
encodedSize := uint64(t.Size())
data := make([]byte, encodedSize)
data := memAlloc.Allocate(encodedSize)
n, err := t.MarshalToSizedBuffer(data)
if err != nil {
@ -417,10 +447,10 @@ func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer) error {
return err
case bufferedMarshaller:
// this path performs a single allocation during write but requires the caller to implement
// the more efficient Size and MarshalTo methods
// this path performs a single allocation during write only when the Allocator wasn't provided
// it also requires the caller to implement the more efficient Size and MarshalTo methods
encodedSize := uint64(t.Size())
data := make([]byte, encodedSize)
data := memAlloc.Allocate(encodedSize)
n, err := t.MarshalTo(data)
if err != nil {

View File

@ -17,8 +17,12 @@ limitations under the License.
package protobuf
import (
"bytes"
"reflect"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
runtimetesting "k8s.io/apimachinery/pkg/runtime/testing"
@ -66,3 +70,111 @@ func (t *mockTyper) ObjectKinds(obj runtime.Object) ([]schema.GroupVersionKind,
func (t *mockTyper) Recognizes(_ schema.GroupVersionKind) bool {
return false
}
func TestSerializerEncodeWithAllocator(t *testing.T) {
testCases := []struct {
name string
obj runtime.Object
}{
{
name: "encode a bufferedMarshaller obj",
obj: &testapigroupv1.Carp{
TypeMeta: metav1.TypeMeta{APIVersion: "group/version", Kind: "Carp"},
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "namespace",
},
Spec: testapigroupv1.CarpSpec{
Subdomain: "carp.k8s.io",
},
},
},
{
name: "encode a runtime.Unknown obj",
obj: &runtime.Unknown{TypeMeta: runtime.TypeMeta{APIVersion: "group/version", Kind: "Unknown"}, Raw: []byte("hello world")},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
target := NewSerializer(nil, nil)
writer := &bytes.Buffer{}
if err := target.Encode(tc.obj, writer); err != nil {
t.Fatal(err)
}
writer2 := &bytes.Buffer{}
alloc := &testAllocator{}
if err := target.EncodeWithAllocator(tc.obj, writer2, alloc); err != nil {
t.Fatal(err)
}
if alloc.allocateCount != 1 {
t.Fatalf("expected the Allocate method to be called exactly 1 but it was executed: %v times ", alloc.allocateCount)
}
// to ensure compatibility of the new method with the old one, serialized data must be equal
// also we are not testing decoding since "roundtripping" is tested elsewhere for all known types
if !reflect.DeepEqual(writer.Bytes(), writer2.Bytes()) {
t.Fatal("data mismatch, data serialized with the Encode method is different than serialized with the EncodeWithAllocator method")
}
})
}
}
func TestRawSerializerEncodeWithAllocator(t *testing.T) {
testCases := []struct {
name string
obj runtime.Object
}{
{
name: "encode a bufferedReverseMarshaller obj",
obj: &testapigroupv1.Carp{
TypeMeta: metav1.TypeMeta{APIVersion: "group/version", Kind: "Carp"},
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "namespace",
},
Spec: testapigroupv1.CarpSpec{
Subdomain: "carp.k8s.io",
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
writer := &bytes.Buffer{}
target := NewRawSerializer(nil, nil)
if err := target.Encode(tc.obj, writer); err != nil {
t.Fatal(err)
}
writer2 := &bytes.Buffer{}
alloc := &testAllocator{}
if err := target.EncodeWithAllocator(tc.obj, writer2, alloc); err != nil {
t.Fatal(err)
}
if alloc.allocateCount != 1 {
t.Fatalf("expected the Allocate method to be called exactly 1 but it was executed: %v times ", alloc.allocateCount)
}
// to ensure compatibility of the new method with the old one, serialized data must be equal
// also we are not testing decoding since "roundtripping" is tested elsewhere for all known types
if !reflect.DeepEqual(writer.Bytes(), writer2.Bytes()) {
t.Fatal("data mismatch, data serialized with the Encode method is different than serialized with the EncodeWithAllocator method")
}
})
}
}
type testAllocator struct {
buf []byte
allocateCount int
}
func (ta *testAllocator) Allocate(n uint64) []byte {
ta.buf = make([]byte, n, n)
ta.allocateCount++
return ta.buf
}

View File

@ -134,3 +134,23 @@ func (e *encoder) Encode(obj runtime.Object) error {
e.buf.Reset()
return err
}
type encoderWithAllocator struct {
writer io.Writer
encoder runtime.EncoderWithAllocator
memAllocator runtime.MemoryAllocator
}
// NewEncoderWithAllocator returns a new streaming encoder
func NewEncoderWithAllocator(w io.Writer, e runtime.EncoderWithAllocator, a runtime.MemoryAllocator) Encoder {
return &encoderWithAllocator{
writer: w,
encoder: e,
memAllocator: a,
}
}
// Encode writes the provided object to the nested writer
func (e *encoderWithAllocator) Encode(obj runtime.Object) error {
return e.encoder.EncodeWithAllocator(obj, e.writer, e.memAllocator)
}

View File

@ -89,6 +89,8 @@ type codec struct {
originalSchemeName string
}
var _ runtime.EncoderWithAllocator = &codec{}
var identifiersMap sync.Map
type codecIdentifier struct {
@ -192,19 +194,40 @@ func (c *codec) Decode(data []byte, defaultGVK *schema.GroupVersionKind, into ru
return out, gvk, strictDecodingErr
}
// EncodeWithAllocator ensures the provided object is output in the appropriate group and version, invoking
// conversion if necessary. Unversioned objects (according to the ObjectTyper) are output as is.
// In addition, it allows for providing a memory allocator for efficient memory usage during object serialization.
func (c *codec) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
return c.encode(obj, w, memAlloc)
}
// Encode ensures the provided object is output in the appropriate group and version, invoking
// conversion if necessary. Unversioned objects (according to the ObjectTyper) are output as is.
func (c *codec) Encode(obj runtime.Object, w io.Writer) error {
if co, ok := obj.(runtime.CacheableObject); ok {
return co.CacheEncode(c.Identifier(), c.doEncode, w)
}
return c.doEncode(obj, w)
return c.encode(obj, w, nil)
}
func (c *codec) doEncode(obj runtime.Object, w io.Writer) error {
func (c *codec) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
if co, ok := obj.(runtime.CacheableObject); ok {
return co.CacheEncode(c.Identifier(), func(obj runtime.Object, w io.Writer) error { return c.doEncode(obj, w, memAlloc) }, w)
}
return c.doEncode(obj, w, memAlloc)
}
func (c *codec) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
encodeFn := c.encoder.Encode
if memAlloc != nil {
if encoder, supportsAllocator := c.encoder.(runtime.EncoderWithAllocator); supportsAllocator {
encodeFn = func(obj runtime.Object, w io.Writer) error {
return encoder.EncodeWithAllocator(obj, w, memAlloc)
}
} else {
klog.V(4).Infof("a memory allocator was provided but the encoder %T doesn't implement the runtime.EncoderWithAllocator, using regular encoder.Encode method")
}
}
switch obj := obj.(type) {
case *runtime.Unknown:
return c.encoder.Encode(obj, w)
return encodeFn(obj, w)
case runtime.Unstructured:
// An unstructured list can contain objects of multiple group version kinds. don't short-circuit just
// because the top-level type matches our desired destination type. actually send the object to the converter
@ -213,14 +236,14 @@ func (c *codec) doEncode(obj runtime.Object, w io.Writer) error {
// avoid conversion roundtrip if GVK is the right one already or is empty (yes, this is a hack, but the old behaviour we rely on in kubectl)
objGVK := obj.GetObjectKind().GroupVersionKind()
if len(objGVK.Version) == 0 {
return c.encoder.Encode(obj, w)
return encodeFn(obj, w)
}
targetGVK, ok := c.encodeVersion.KindForGroupVersionKinds([]schema.GroupVersionKind{objGVK})
if !ok {
return runtime.NewNotRegisteredGVKErrForTarget(c.originalSchemeName, objGVK, c.encodeVersion)
}
if targetGVK == objGVK {
return c.encoder.Encode(obj, w)
return encodeFn(obj, w)
}
}
}
@ -242,7 +265,7 @@ func (c *codec) doEncode(obj runtime.Object, w io.Writer) error {
}
}
objectKind.SetGroupVersionKind(gvks[0])
return c.encoder.Encode(obj, w)
return encodeFn(obj, w)
}
// Perform a conversion if necessary
@ -258,7 +281,7 @@ func (c *codec) doEncode(obj runtime.Object, w io.Writer) error {
}
// Conversion is responsible for setting the proper group, version, and kind onto the outgoing object
return c.encoder.Encode(out, w)
return encodeFn(out, w)
}
// Identifier implements runtime.Encoder interface.

View File

@ -125,8 +125,9 @@ func TestNestedEncodeError(t *testing.T) {
gvk1 := schema.GroupVersionKind{Kind: "test", Group: "other", Version: "v1"}
gvk2 := schema.GroupVersionKind{Kind: "test", Group: "other", Version: "v2"}
n.SetGroupVersionKind(gvk1)
encoder := &mockSerializer{obj: n}
codec := NewCodec(
nil, nil,
encoder, nil,
&mockConvertor{},
nil,
&mockTyper{gvks: []schema.GroupVersionKind{gvk1, gvk2}},

View File

@ -19,10 +19,13 @@ package handlers
import (
"bytes"
"fmt"
"io"
"net/http"
"reflect"
"time"
"golang.org/x/net/websocket"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -32,8 +35,6 @@ import (
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/util/wsstream"
"golang.org/x/net/websocket"
)
// nothing will ever be sent down this channel
@ -187,7 +188,17 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.Scope.err(errors.NewBadRequest(err.Error()), w, req)
return
}
e := streaming.NewEncoder(framer, s.Encoder)
var e streaming.Encoder
var memoryAllocator runtime.MemoryAllocator
if encoder, supportsAllocator := s.Encoder.(runtime.EncoderWithAllocator); supportsAllocator {
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
defer runtime.AllocatorPool.Put(memoryAllocator)
e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator)
} else {
e = streaming.NewEncoder(framer, s.Encoder)
}
// ensure the connection times out
timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
@ -206,6 +217,19 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ch := s.Watching.ResultChan()
done := req.Context().Done()
embeddedEncodeFn := s.EmbeddedEncoder.Encode
if encoder, supportsAllocator := s.EmbeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator {
if memoryAllocator == nil {
// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
// instead, we allocate the buffer for the entire watch session and release it when we close the connection.
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
defer runtime.AllocatorPool.Put(memoryAllocator)
}
embeddedEncodeFn = func(obj runtime.Object, w io.Writer) error {
return encoder.EncodeWithAllocator(obj, w, memoryAllocator)
}
}
for {
select {
case <-done:
@ -220,7 +244,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
obj := s.Fixup(event.Object)
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
if err := embeddedEncodeFn(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
return