Don't pool large CBOR encode buffers.

Objects in a sync.Pool are assumed to be fungible. This is not a good assumption for pools
of *bytes.Buffer because a *bytes.Buffer's underlying array grows as needed to accomodate writes. In
Kubernetes, apiservers tend to encode "small" objects very frequently and much larger
objects (especially large lists) only occasionally. Under steady load, pooled buffers tend to be
borrowed frequently enough to prevent them from being released. Over time, each buffer is used to
encode a large object and its capacity increases accordingly. The result is that practically all
buffers in the pool retain much more capacity than needed to encode most objects.

As a basic mitigation for the worst case, buffers with more capacity than the default max request
body size are never returned to the pool.
This commit is contained in:
Ben Luddy 2024-06-14 15:51:52 -04:00
parent bffc02b955
commit a19d142f0d
No known key found for this signature in database
GPG Key ID: A6551E73A5974C30
7 changed files with 272 additions and 77 deletions

View File

@ -99,16 +99,35 @@ func (s *serializer) Identifier() runtime.Identifier {
return "cbor" return "cbor"
} }
// Encode writes a CBOR representation of the given object.
//
// Because the CBOR data item written by a call to Encode is always enclosed in the "self-described
// CBOR" tag, its encoded form always has the prefix 0xd9d9f7. This prefix is suitable for use as a
// "magic number" for distinguishing encoded CBOR from other protocols.
//
// The default serialization behavior for any given object replicates the behavior of the JSON
// serializer as far as it is necessary to allow the CBOR serializer to be used as a drop-in
// replacement for the JSON serializer, with limited exceptions. For example, the distinction
// between integers and floating-point numbers is preserved in CBOR due to its distinct
// representations for each type.
//
// Objects implementing runtime.Unstructured will have their unstructured content encoded rather
// than following the default behavior for their dynamic type.
func (s *serializer) Encode(obj runtime.Object, w io.Writer) error { func (s *serializer) Encode(obj runtime.Object, w io.Writer) error {
return s.encode(modes.Encode, obj, w)
}
func (s *serializer) encode(mode modes.EncMode, obj runtime.Object, w io.Writer) error {
if _, err := w.Write(selfDescribedCBOR); err != nil { if _, err := w.Write(selfDescribedCBOR); err != nil {
return err return err
} }
e := modes.Encode.NewEncoder(w) var v interface{} = obj
if u, ok := obj.(runtime.Unstructured); ok { if u, ok := obj.(runtime.Unstructured); ok {
return e.Encode(u.UnstructuredContent()) v = u.UnstructuredContent()
} }
return e.Encode(obj)
return mode.MarshalTo(v, w)
} }
// gvkWithDefaults returns group kind and version defaulting from provided default // gvkWithDefaults returns group kind and version defaulting from provided default

View File

@ -0,0 +1,65 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package modes
import (
"bytes"
"sync"
)
var buffers = BufferProvider{p: new(sync.Pool)}
type buffer struct {
bytes.Buffer
}
type pool interface {
Get() interface{}
Put(interface{})
}
type BufferProvider struct {
p pool
}
func (b *BufferProvider) Get() *buffer {
if buf, ok := b.p.Get().(*buffer); ok {
return buf
}
return &buffer{}
}
func (b *BufferProvider) Put(buf *buffer) {
if buf.Cap() > 3*1024*1024 /* Default MaxRequestBodyBytes */ {
// Objects in a sync.Pool are assumed to be fungible. This is not a good assumption
// for pools of *bytes.Buffer because a *bytes.Buffer's underlying array grows as
// needed to accommodate writes. In Kubernetes, apiservers tend to encode "small"
// objects very frequently and much larger objects (especially large lists) only
// occasionally. Under steady load, pooled buffers tend to be borrowed frequently
// enough to prevent them from being released. Over time, each buffer is used to
// encode a large object and its capacity increases accordingly. The result is that
// practically all buffers in the pool retain much more capacity than needed to
// encode most objects.
// As a basic mitigation for the worst case, buffers with more capacity than the
// default max request body size are never returned to the pool.
// TODO: Optimize for higher buffer utilization.
return
}
buf.Reset()
b.p.Put(buf)
}

View File

@ -0,0 +1,61 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package modes
import (
"testing"
)
type mockPool struct {
v interface{}
}
func (*mockPool) Get() interface{} {
return nil
}
func (p *mockPool) Put(v interface{}) {
p.v = v
}
func TestBufferProviderPut(t *testing.T) {
{
p := new(mockPool)
bp := &BufferProvider{p: p}
small := new(buffer)
small.Grow(3 * 1024 * 1024)
small.WriteString("hello world")
bp.Put(small)
if p.v != small {
t.Errorf("expected buf with capacity %d to be returned to pool", small.Cap())
}
if small.Len() != 0 {
t.Errorf("expected buf to be reset before returning to pool")
}
}
{
p := new(mockPool)
bp := &BufferProvider{p: p}
big := new(buffer)
big.Grow(3*1024*1024 + 1)
bp.Put(big)
if p.v != nil {
t.Errorf("expected buf with capacity %d not to be returned to pool", big.Cap())
}
}
}

View File

@ -17,10 +17,13 @@ limitations under the License.
package modes package modes
import ( import (
"io"
"github.com/fxamacker/cbor/v2" "github.com/fxamacker/cbor/v2"
) )
var Encode cbor.EncMode = func() cbor.EncMode { var Encode = EncMode{
delegate: func() cbor.UserBufferEncMode {
encode, err := cbor.EncOptions{ encode, err := cbor.EncOptions{
// Map keys need to be sorted to have deterministic output, and this is the order // Map keys need to be sorted to have deterministic output, and this is the order
// defined in RFC 8949 4.2.1 "Core Deterministic Encoding Requirements". // defined in RFC 8949 4.2.1 "Core Deterministic Encoding Requirements".
@ -91,19 +94,62 @@ var Encode cbor.EncMode = func() cbor.EncMode {
// Disable default recognition of types implementing encoding.BinaryMarshaler, which // Disable default recognition of types implementing encoding.BinaryMarshaler, which
// is not recognized for JSON encoding. // is not recognized for JSON encoding.
BinaryMarshaler: cbor.BinaryMarshalerNone, BinaryMarshaler: cbor.BinaryMarshalerNone,
}.EncMode() }.UserBufferEncMode()
if err != nil { if err != nil {
panic(err) panic(err)
} }
return encode return encode
}() }(),
}
var EncodeNondeterministic cbor.EncMode = func() cbor.EncMode { var EncodeNondeterministic = EncMode{
opts := Encode.EncOptions() delegate: func() cbor.UserBufferEncMode {
opts.Sort = cbor.SortNone opts := Encode.options()
em, err := opts.EncMode() opts.Sort = cbor.SortNone // TODO: Use cbor.SortFastShuffle after bump to v2.7.0.
em, err := opts.UserBufferEncMode()
if err != nil { if err != nil {
panic(err) panic(err)
} }
return em return em
}() }(),
}
type EncMode struct {
delegate cbor.UserBufferEncMode
}
func (em EncMode) options() cbor.EncOptions {
return em.delegate.EncOptions()
}
func (em EncMode) MarshalTo(v interface{}, w io.Writer) error {
if buf, ok := w.(*buffer); ok {
return em.delegate.MarshalToBuffer(v, &buf.Buffer)
}
buf := buffers.Get()
defer buffers.Put(buf)
if err := em.delegate.MarshalToBuffer(v, &buf.Buffer); err != nil {
return err
}
if _, err := io.Copy(w, buf); err != nil {
return err
}
return nil
}
func (em EncMode) Marshal(v interface{}) ([]byte, error) {
buf := buffers.Get()
defer buffers.Put(buf)
if err := em.MarshalTo(v, &buf.Buffer); err != nil {
return nil, err
}
clone := make([]byte, buf.Len())
copy(clone, buf.Bytes())
return clone, nil
}

View File

@ -22,6 +22,8 @@ import (
"reflect" "reflect"
"testing" "testing"
"k8s.io/apimachinery/pkg/runtime/serializer/cbor/internal/modes"
"github.com/fxamacker/cbor/v2" "github.com/fxamacker/cbor/v2"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
) )
@ -35,7 +37,7 @@ func (i int64BinaryMarshaler) MarshalBinary() ([]byte, error) {
func TestEncode(t *testing.T) { func TestEncode(t *testing.T) {
for _, tc := range []struct { for _, tc := range []struct {
name string name string
modes []cbor.EncMode modes []modes.EncMode
in interface{} in interface{}
want []byte want []byte
assertOnError func(t *testing.T, e error) assertOnError func(t *testing.T, e error)

View File

@ -25,12 +25,12 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer/cbor/internal/modes" "k8s.io/apimachinery/pkg/runtime/serializer/cbor/internal/modes"
) )
var encModeNames = map[cbor.EncMode]string{ var encModeNames = map[modes.EncMode]string{
modes.Encode: "Encode", modes.Encode: "Encode",
modes.EncodeNondeterministic: "EncodeNondeterministic", modes.EncodeNondeterministic: "EncodeNondeterministic",
} }
var allEncModes = []cbor.EncMode{ var allEncModes = []modes.EncMode{
modes.Encode, modes.Encode,
modes.EncodeNondeterministic, modes.EncodeNondeterministic,
} }

View File

@ -24,6 +24,8 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/apimachinery/pkg/runtime/serializer/cbor/internal/modes"
"github.com/fxamacker/cbor/v2" "github.com/fxamacker/cbor/v2"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
) )
@ -35,7 +37,7 @@ func nilPointerFor[T interface{}]() *T {
// TestRoundtrip roundtrips object serialization to interface{} and back via CBOR. // TestRoundtrip roundtrips object serialization to interface{} and back via CBOR.
func TestRoundtrip(t *testing.T) { func TestRoundtrip(t *testing.T) {
type modePair struct { type modePair struct {
enc cbor.EncMode enc modes.EncMode
dec cbor.DecMode dec cbor.DecMode
} }