mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 18:24:07 +00:00
Merge pull request #125676 from benluddy/cbor-bufferpool
KEP-4222: Don't pool large CBOR encode buffers
This commit is contained in:
commit
c3c8a9cfd4
@ -99,16 +99,35 @@ func (s *serializer) Identifier() runtime.Identifier {
|
|||||||
return "cbor"
|
return "cbor"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Encode writes a CBOR representation of the given object.
|
||||||
|
//
|
||||||
|
// Because the CBOR data item written by a call to Encode is always enclosed in the "self-described
|
||||||
|
// CBOR" tag, its encoded form always has the prefix 0xd9d9f7. This prefix is suitable for use as a
|
||||||
|
// "magic number" for distinguishing encoded CBOR from other protocols.
|
||||||
|
//
|
||||||
|
// The default serialization behavior for any given object replicates the behavior of the JSON
|
||||||
|
// serializer as far as it is necessary to allow the CBOR serializer to be used as a drop-in
|
||||||
|
// replacement for the JSON serializer, with limited exceptions. For example, the distinction
|
||||||
|
// between integers and floating-point numbers is preserved in CBOR due to its distinct
|
||||||
|
// representations for each type.
|
||||||
|
//
|
||||||
|
// Objects implementing runtime.Unstructured will have their unstructured content encoded rather
|
||||||
|
// than following the default behavior for their dynamic type.
|
||||||
func (s *serializer) Encode(obj runtime.Object, w io.Writer) error {
|
func (s *serializer) Encode(obj runtime.Object, w io.Writer) error {
|
||||||
|
return s.encode(modes.Encode, obj, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serializer) encode(mode modes.EncMode, obj runtime.Object, w io.Writer) error {
|
||||||
if _, err := w.Write(selfDescribedCBOR); err != nil {
|
if _, err := w.Write(selfDescribedCBOR); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
e := modes.Encode.NewEncoder(w)
|
var v interface{} = obj
|
||||||
if u, ok := obj.(runtime.Unstructured); ok {
|
if u, ok := obj.(runtime.Unstructured); ok {
|
||||||
return e.Encode(u.UnstructuredContent())
|
v = u.UnstructuredContent()
|
||||||
}
|
}
|
||||||
return e.Encode(obj)
|
|
||||||
|
return mode.MarshalTo(v, w)
|
||||||
}
|
}
|
||||||
|
|
||||||
// gvkWithDefaults returns group kind and version defaulting from provided default
|
// gvkWithDefaults returns group kind and version defaulting from provided default
|
||||||
|
@ -0,0 +1,65 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2024 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package modes
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
var buffers = BufferProvider{p: new(sync.Pool)}
|
||||||
|
|
||||||
|
type buffer struct {
|
||||||
|
bytes.Buffer
|
||||||
|
}
|
||||||
|
|
||||||
|
type pool interface {
|
||||||
|
Get() interface{}
|
||||||
|
Put(interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
type BufferProvider struct {
|
||||||
|
p pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BufferProvider) Get() *buffer {
|
||||||
|
if buf, ok := b.p.Get().(*buffer); ok {
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
return &buffer{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BufferProvider) Put(buf *buffer) {
|
||||||
|
if buf.Cap() > 3*1024*1024 /* Default MaxRequestBodyBytes */ {
|
||||||
|
// Objects in a sync.Pool are assumed to be fungible. This is not a good assumption
|
||||||
|
// for pools of *bytes.Buffer because a *bytes.Buffer's underlying array grows as
|
||||||
|
// needed to accommodate writes. In Kubernetes, apiservers tend to encode "small"
|
||||||
|
// objects very frequently and much larger objects (especially large lists) only
|
||||||
|
// occasionally. Under steady load, pooled buffers tend to be borrowed frequently
|
||||||
|
// enough to prevent them from being released. Over time, each buffer is used to
|
||||||
|
// encode a large object and its capacity increases accordingly. The result is that
|
||||||
|
// practically all buffers in the pool retain much more capacity than needed to
|
||||||
|
// encode most objects.
|
||||||
|
|
||||||
|
// As a basic mitigation for the worst case, buffers with more capacity than the
|
||||||
|
// default max request body size are never returned to the pool.
|
||||||
|
// TODO: Optimize for higher buffer utilization.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
buf.Reset()
|
||||||
|
b.p.Put(buf)
|
||||||
|
}
|
@ -0,0 +1,61 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2024 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package modes
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mockPool struct {
|
||||||
|
v interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*mockPool) Get() interface{} {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *mockPool) Put(v interface{}) {
|
||||||
|
p.v = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBufferProviderPut(t *testing.T) {
|
||||||
|
{
|
||||||
|
p := new(mockPool)
|
||||||
|
bp := &BufferProvider{p: p}
|
||||||
|
small := new(buffer)
|
||||||
|
small.Grow(3 * 1024 * 1024)
|
||||||
|
small.WriteString("hello world")
|
||||||
|
bp.Put(small)
|
||||||
|
if p.v != small {
|
||||||
|
t.Errorf("expected buf with capacity %d to be returned to pool", small.Cap())
|
||||||
|
}
|
||||||
|
if small.Len() != 0 {
|
||||||
|
t.Errorf("expected buf to be reset before returning to pool")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
p := new(mockPool)
|
||||||
|
bp := &BufferProvider{p: p}
|
||||||
|
big := new(buffer)
|
||||||
|
big.Grow(3*1024*1024 + 1)
|
||||||
|
bp.Put(big)
|
||||||
|
if p.v != nil {
|
||||||
|
t.Errorf("expected buf with capacity %d not to be returned to pool", big.Cap())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -17,10 +17,13 @@ limitations under the License.
|
|||||||
package modes
|
package modes
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
"github.com/fxamacker/cbor/v2"
|
"github.com/fxamacker/cbor/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
var Encode cbor.EncMode = func() cbor.EncMode {
|
var Encode = EncMode{
|
||||||
|
delegate: func() cbor.UserBufferEncMode {
|
||||||
encode, err := cbor.EncOptions{
|
encode, err := cbor.EncOptions{
|
||||||
// Map keys need to be sorted to have deterministic output, and this is the order
|
// Map keys need to be sorted to have deterministic output, and this is the order
|
||||||
// defined in RFC 8949 4.2.1 "Core Deterministic Encoding Requirements".
|
// defined in RFC 8949 4.2.1 "Core Deterministic Encoding Requirements".
|
||||||
@ -91,19 +94,62 @@ var Encode cbor.EncMode = func() cbor.EncMode {
|
|||||||
// Disable default recognition of types implementing encoding.BinaryMarshaler, which
|
// Disable default recognition of types implementing encoding.BinaryMarshaler, which
|
||||||
// is not recognized for JSON encoding.
|
// is not recognized for JSON encoding.
|
||||||
BinaryMarshaler: cbor.BinaryMarshalerNone,
|
BinaryMarshaler: cbor.BinaryMarshalerNone,
|
||||||
}.EncMode()
|
}.UserBufferEncMode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
return encode
|
return encode
|
||||||
}()
|
}(),
|
||||||
|
}
|
||||||
|
|
||||||
var EncodeNondeterministic cbor.EncMode = func() cbor.EncMode {
|
var EncodeNondeterministic = EncMode{
|
||||||
opts := Encode.EncOptions()
|
delegate: func() cbor.UserBufferEncMode {
|
||||||
opts.Sort = cbor.SortNone
|
opts := Encode.options()
|
||||||
em, err := opts.EncMode()
|
opts.Sort = cbor.SortNone // TODO: Use cbor.SortFastShuffle after bump to v2.7.0.
|
||||||
|
em, err := opts.UserBufferEncMode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
return em
|
return em
|
||||||
}()
|
}(),
|
||||||
|
}
|
||||||
|
|
||||||
|
type EncMode struct {
|
||||||
|
delegate cbor.UserBufferEncMode
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em EncMode) options() cbor.EncOptions {
|
||||||
|
return em.delegate.EncOptions()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em EncMode) MarshalTo(v interface{}, w io.Writer) error {
|
||||||
|
if buf, ok := w.(*buffer); ok {
|
||||||
|
return em.delegate.MarshalToBuffer(v, &buf.Buffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := buffers.Get()
|
||||||
|
defer buffers.Put(buf)
|
||||||
|
if err := em.delegate.MarshalToBuffer(v, &buf.Buffer); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := io.Copy(w, buf); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (em EncMode) Marshal(v interface{}) ([]byte, error) {
|
||||||
|
buf := buffers.Get()
|
||||||
|
defer buffers.Put(buf)
|
||||||
|
|
||||||
|
if err := em.MarshalTo(v, &buf.Buffer); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
clone := make([]byte, buf.Len())
|
||||||
|
copy(clone, buf.Bytes())
|
||||||
|
|
||||||
|
return clone, nil
|
||||||
|
}
|
||||||
|
@ -22,6 +22,8 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/serializer/cbor/internal/modes"
|
||||||
|
|
||||||
"github.com/fxamacker/cbor/v2"
|
"github.com/fxamacker/cbor/v2"
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
)
|
)
|
||||||
@ -35,7 +37,7 @@ func (i int64BinaryMarshaler) MarshalBinary() ([]byte, error) {
|
|||||||
func TestEncode(t *testing.T) {
|
func TestEncode(t *testing.T) {
|
||||||
for _, tc := range []struct {
|
for _, tc := range []struct {
|
||||||
name string
|
name string
|
||||||
modes []cbor.EncMode
|
modes []modes.EncMode
|
||||||
in interface{}
|
in interface{}
|
||||||
want []byte
|
want []byte
|
||||||
assertOnError func(t *testing.T, e error)
|
assertOnError func(t *testing.T, e error)
|
||||||
|
@ -25,12 +25,12 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime/serializer/cbor/internal/modes"
|
"k8s.io/apimachinery/pkg/runtime/serializer/cbor/internal/modes"
|
||||||
)
|
)
|
||||||
|
|
||||||
var encModeNames = map[cbor.EncMode]string{
|
var encModeNames = map[modes.EncMode]string{
|
||||||
modes.Encode: "Encode",
|
modes.Encode: "Encode",
|
||||||
modes.EncodeNondeterministic: "EncodeNondeterministic",
|
modes.EncodeNondeterministic: "EncodeNondeterministic",
|
||||||
}
|
}
|
||||||
|
|
||||||
var allEncModes = []cbor.EncMode{
|
var allEncModes = []modes.EncMode{
|
||||||
modes.Encode,
|
modes.Encode,
|
||||||
modes.EncodeNondeterministic,
|
modes.EncodeNondeterministic,
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/serializer/cbor/internal/modes"
|
||||||
|
|
||||||
"github.com/fxamacker/cbor/v2"
|
"github.com/fxamacker/cbor/v2"
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
)
|
)
|
||||||
@ -35,7 +37,7 @@ func nilPointerFor[T interface{}]() *T {
|
|||||||
// TestRoundtrip roundtrips object serialization to interface{} and back via CBOR.
|
// TestRoundtrip roundtrips object serialization to interface{} and back via CBOR.
|
||||||
func TestRoundtrip(t *testing.T) {
|
func TestRoundtrip(t *testing.T) {
|
||||||
type modePair struct {
|
type modePair struct {
|
||||||
enc cbor.EncMode
|
enc modes.EncMode
|
||||||
dec cbor.DecMode
|
dec cbor.DecMode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user