Merge pull request #129407 from serathius/streaming-proto-list-encoder

Implement streaming proto list encoder
This commit is contained in:
Kubernetes Prow Robot 2025-03-11 17:11:45 -07:00 committed by GitHub
commit 1b6e321e23
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 667 additions and 32 deletions

View File

@ -701,9 +701,13 @@ const (
StorageVersionMigrator featuregate.Feature = "StorageVersionMigrator"
// owner: @serathius
// Allow API server to encode collections item by item, instead of all at once.
// Allow API server JSON encoder to encode collections item by item, instead of all at once.
StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"
// owner: @serathius
// Allow API server Protobuf encoder to encode collections item by item, instead of all at once.
StreamingCollectionEncodingToProtobuf featuregate.Feature = "StreamingCollectionEncodingToProtobuf"
// owner: @robscott
// kep: https://kep.k8s.io/2433
//

View File

@ -747,6 +747,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta},
},
StreamingCollectionEncodingToProtobuf: {
{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta},
},
SupplementalGroupsPolicy: {
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
},

View File

@ -80,6 +80,9 @@ func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.API
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
}
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
}
if len(opts) != 0 {
apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, opts...)
}

View File

@ -895,7 +895,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
MediaType: "application/vnd.kubernetes.protobuf",
MediaTypeType: "application",
MediaTypeSubType: "vnd.kubernetes.protobuf",
Serializer: protobuf.NewSerializer(creator, typer),
Serializer: protobuf.NewSerializerWithOptions(creator, typer, protobuf.SerializerOptions{
StreamingCollectionsEncoding: utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf),
}),
StreamSerializer: &runtime.StreamSerializerInfo{
Serializer: protobuf.NewRawSerializer(creator, typer),
Framer: protobuf.LengthDelimitedFramer,
@ -978,6 +980,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
}
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
}
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...)
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
scaleScope.Namer = handlers.ContextBasedNaming{

View File

@ -61,7 +61,9 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option
mf, scheme, scheme,
json.SerializerOptions{Yaml: true, Pretty: false, Strict: true},
)
protoSerializer := protobuf.NewSerializer(scheme, scheme)
protoSerializer := protobuf.NewSerializerWithOptions(scheme, scheme, protobuf.SerializerOptions{
StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToProtobuf,
})
protoRawSerializer := protobuf.NewRawSerializer(scheme, scheme)
serializers := []runtime.SerializerInfo{
@ -113,7 +115,8 @@ type CodecFactoryOptions struct {
// Pretty includes a pretty serializer along with the non-pretty one
Pretty bool
StreamingCollectionsEncodingToJSON bool
StreamingCollectionsEncodingToJSON bool
StreamingCollectionsEncodingToProtobuf bool
serializers []func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.SerializerInfo
}
@ -155,6 +158,12 @@ func WithStreamingCollectionEncodingToJSON() CodecFactoryOptionsMutator {
}
}
func WithStreamingCollectionEncodingToProtobuf() CodecFactoryOptionsMutator {
return func(options *CodecFactoryOptions) {
options.StreamingCollectionsEncodingToProtobuf = true
}
}
// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
// and conversion wrappers to define preferred internal and external versions. In the future,
// as the internal version is used less, callers may instead use a defaulting serializer and

View File

@ -0,0 +1,174 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package protobuf
import (
"errors"
"io"
"math/bits"
"github.com/gogo/protobuf/proto"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
)
var (
errFieldCount = errors.New("expected ListType to have 3 fields")
errTypeMetaField = errors.New("expected TypeMeta field to have TypeMeta type")
errTypeMetaProtobufTag = errors.New(`expected TypeMeta protobuf field tag to be ""`)
errListMetaField = errors.New("expected ListMeta field to have ListMeta type")
errListMetaProtobufTag = errors.New(`expected ListMeta protobuf field tag to be "bytes,1,opt,name=metadata"`)
errItemsProtobufTag = errors.New(`expected Items protobuf field tag to be "bytes,2,rep,name=items"`)
errItemsSizer = errors.New(`expected Items elements to implement proto.Sizer`)
)
// getStreamingListData implements list extraction logic for protobuf stream serialization.
//
// Reason for a custom logic instead of reusing accessors from meta package:
// * Validate proto tags to prevent incompatibility with proto standard package.
// * ListMetaAccessor doesn't distinguish empty from nil value.
// * TypeAccessor reparsing "apiVersion" and serializing it with "{group}/{version}"
//
// The returned streamingListData carries the list's ListMeta, the extracted
// items, and precomputed serialized sizes so the encoder can emit length
// headers before each element. Any structural mismatch is reported via one of
// the package-level err* sentinels.
func getStreamingListData(list runtime.Object) (data streamingListData, err error) {
	// The list must be a pointer to a struct with the canonical Kubernetes
	// list layout: exactly {TypeMeta, ListMeta, Items}.
	listValue, err := conversion.EnforcePtr(list)
	if err != nil {
		return data, err
	}
	listType := listValue.Type()
	if listType.NumField() != 3 {
		return data, errFieldCount
	}
	// TypeMeta: validated, but not returned as is not serialized.
	_, ok := listValue.Field(0).Interface().(metav1.TypeMeta)
	if !ok {
		return data, errTypeMetaField
	}
	if listType.Field(0).Tag.Get("protobuf") != "" {
		return data, errTypeMetaProtobufTag
	}
	// ListMeta
	listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta)
	if !ok {
		return data, errListMetaField
	}
	// if we were ever to relax the protobuf tag check we should update the hardcoded `0xa` below when writing ListMeta.
	if listType.Field(1).Tag.Get("protobuf") != "bytes,1,opt,name=metadata" {
		return data, errListMetaProtobufTag
	}
	data.listMeta = listMeta
	// Items; if we were ever to relax the protobuf tag check we should update the hardcoded `0x12` below when writing Items.
	if listType.Field(2).Tag.Get("protobuf") != "bytes,2,rep,name=items" {
		return data, errItemsProtobufTag
	}
	items, err := meta.ExtractList(list)
	if err != nil {
		return data, err
	}
	data.items = items
	// Precompute sizes once up front; encoding later reuses these instead of
	// calling Size() again per item.
	data.totalSize, data.listMetaSize, data.itemsSizes, err = listSize(listMeta, items)
	return data, err
}
// streamingListData is the precomputed input for streaming list encoding:
// the list's metadata, its extracted items, and their serialized sizes.
type streamingListData struct {
	// totalSize is the total size of the serialized List object, including their proto headers/size bytes
	totalSize int

	// listMetaSize caches results from .Size() call to listMeta, doesn't include header bytes (field identifier, size)
	listMetaSize int
	listMeta     metav1.ListMeta

	// itemsSizes caches results from .Size() call to items, doesn't include header bytes (field identifier, size)
	itemsSizes []int
	items      []runtime.Object
}
// listSize computes the serialized size of ListMeta and of every item, to be
// used later for preallocations. listMetaSize and itemSizes exclude the
// per-field header bytes (field identifier, varint size); totalSize includes
// them. Returns errItemsSizer if any item cannot report its own size.
func listSize(listMeta metav1.ListMeta, items []runtime.Object) (totalSize, listMetaSize int, itemSizes []int, err error) {
	// ListMeta contributes one tag byte, a varint length, and its body.
	listMetaSize = listMeta.Size()
	totalSize = 1 + sovGenerated(uint64(listMetaSize)) + listMetaSize
	// Each item contributes the same header shape: tag byte + varint length.
	itemSizes = make([]int, len(items))
	for idx, obj := range items {
		sizer, implementsSizer := obj.(proto.Sizer)
		if !implementsSizer {
			return totalSize, listMetaSize, nil, errItemsSizer
		}
		itemSize := sizer.Size()
		itemSizes[idx] = itemSize
		totalSize += 1 + sovGenerated(uint64(itemSize)) + itemSize
	}
	return totalSize, listMetaSize, itemSizes, nil
}
// streamingEncodeUnknownList writes the protobuf magic prefix followed by the
// runtime.Unknown envelope, delegating the write of Unknown.Raw (the List
// payload) to streamingEncodeList so the list is emitted incrementally rather
// than buffered whole. listData.totalSize must match what encodeList writes.
func streamingEncodeUnknownList(w io.Writer, unk runtime.Unknown, listData streamingListData, memAlloc runtime.MemoryAllocator) error {
	_, err := w.Write(protoEncodingPrefix)
	if err != nil {
		return err
	}

	// encodeList is responsible for encoding the List into the unknown Raw.
	encodeList := func(writer io.Writer) (int, error) {
		return streamingEncodeList(writer, listData, memAlloc)
	}
	_, err = unk.MarshalToWriter(w, listData.totalSize, encodeList)
	return err
}
// streamingEncodeList writes ListMeta followed by each item, one field at a
// time, returning the total number of bytes written. Sizes come from the
// precomputed listData so no object is re-measured here.
func streamingEncodeList(w io.Writer, listData streamingListData, memAlloc runtime.MemoryAllocator) (size int, err error) {
	// ListMeta is field 1, wire type 2 (LEN); tag byte 0xa = (1 << 3) | 2.
	// https://protobuf.dev/programming-guides/encoding/#structure
	written, err := doEncodeWithHeader(&listData.listMeta, w, 0xa, listData.listMetaSize, memAlloc)
	size += written
	if err != nil {
		return size, err
	}

	// Each item is field 2, wire type 2 (LEN); tag byte 0x12 = (2 << 3) | 2.
	// https://protobuf.dev/programming-guides/encoding/#structure
	for idx, obj := range listData.items {
		written, err = doEncodeWithHeader(obj, w, 0x12, listData.itemsSizes[idx], memAlloc)
		size += written
		if err != nil {
			return size, err
		}
	}
	return size, nil
}
// writeVarintGenerated writes v to w as a protobuf varint and reports the
// number of bytes written.
func writeVarintGenerated(w io.Writer, v int) (int, error) {
	scratch := make([]byte, sovGenerated(uint64(v)))
	encodeVarintGenerated(scratch, len(scratch), uint64(v))
	return w.Write(scratch)
}

// sovGenerated is copied from `generated.pb.go`; it returns the number of
// bytes the varint encoding of v occupies (one byte per 7 payload bits).
func sovGenerated(v uint64) int {
	// v|1 guarantees at least one significant bit, so zero still costs a byte.
	return 1 + (bits.Len64(v|1)-1)/7
}

// encodeVarintGenerated is copied from `generated.pb.go`; it encodes v
// right-aligned so the encoding ends just before dAtA[offset], and returns
// the index of the first byte written.
func encodeVarintGenerated(dAtA []byte, offset int, v uint64) int {
	offset -= sovGenerated(v)
	start := offset
	for v >= 0x80 {
		// Low 7 bits with the continuation bit set.
		dAtA[offset] = uint8(v)&0x7f | 0x80
		v >>= 7
		offset++
	}
	dAtA[offset] = uint8(v)
	return start
}

View File

@ -23,21 +23,26 @@ import (
"os/exec"
"testing"
"github.com/gogo/protobuf/proto"
"github.com/google/go-cmp/cmp"
"sigs.k8s.io/randfill"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
func TestCollectionsEncoding(t *testing.T) {
t.Run("Normal", func(t *testing.T) {
testCollectionsEncoding(t, NewSerializer(nil, nil))
testCollectionsEncoding(t, NewSerializer(nil, nil), false)
})
t.Run("Streaming", func(t *testing.T) {
testCollectionsEncoding(t, NewSerializerWithOptions(nil, nil, SerializerOptions{StreamingCollectionsEncoding: true}), true)
})
// Leave place for testing streaming collection serializer proposed as part of KEP-5116
}
func testCollectionsEncoding(t *testing.T, s *Serializer) {
func testCollectionsEncoding(t *testing.T, s *Serializer, streamingEnabled bool) {
var remainingItems int64 = 1
testCases := []struct {
name string
@ -191,7 +196,7 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var buf bytes.Buffer
var buf writeCountingBuffer
if err := s.Encode(tc.in, &buf); err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -201,8 +206,25 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) {
t.Fatal(err)
}
if !bytes.Equal(expectBytes, actualBytes) {
t.Errorf("expected:\n%s\ngot:\n%s", tc.expect, base64.StdEncoding.EncodeToString(actualBytes))
t.Log(cmp.Diff(dumpProto(t, actualBytes[4:]), dumpProto(t, expectBytes[4:])))
expectedBytes, err := base64.StdEncoding.DecodeString(tc.expect)
if err == nil {
t.Errorf("expected:\n%v\ngot:\n%v", expectedBytes, actualBytes)
} else {
t.Errorf("expected:\n%v\ngot:\n%v", tc.expect, base64.StdEncoding.EncodeToString(actualBytes))
}
actualProto := dumpProto(t, actualBytes[4:])
expectedProto := dumpProto(t, expectBytes[4:])
if actualProto != "" && expectedProto != "" {
t.Log(cmp.Diff(actualProto, expectedProto))
} else {
t.Log(cmp.Diff(actualBytes, expectBytes))
}
}
if streamingEnabled && buf.writeCount <= 1 {
t.Errorf("expected streaming but Write was called only: %d", buf.writeCount)
}
if !streamingEnabled && buf.writeCount > 1 {
t.Errorf("expected non-streaming but Write was called more than once: %d", buf.writeCount)
}
})
}
@ -226,3 +248,86 @@ func dumpProto(t *testing.T, data []byte) string {
}
return string(d)
}
// writeCountingBuffer wraps bytes.Buffer and counts Write calls, letting the
// tests distinguish streaming encoders (multiple writes) from one-shot
// encoders (a single write).
type writeCountingBuffer struct {
	writeCount int
	bytes.Buffer
}

// Write forwards to the embedded buffer while tallying the call.
func (b *writeCountingBuffer) Write(data []byte) (int, error) {
	b.writeCount++
	return b.Buffer.Write(data)
}

// Reset clears both the call counter and the buffered bytes.
func (b *writeCountingBuffer) Reset() {
	b.writeCount = 0
	b.Buffer.Reset()
}
func TestFuzzCollection(t *testing.T) {
f := randfill.New()
streamingEncoder := NewSerializerWithOptions(nil, nil, SerializerOptions{StreamingCollectionsEncoding: true})
streamingBuffer := &bytes.Buffer{}
normalEncoder := NewSerializerWithOptions(nil, nil, SerializerOptions{StreamingCollectionsEncoding: false})
normalBuffer := &bytes.Buffer{}
for i := 0; i < 1000; i++ {
list := &testapigroupv1.CarpList{}
f.FillNoCustom(list)
streamingBuffer.Reset()
normalBuffer.Reset()
if err := streamingEncoder.Encode(list, streamingBuffer); err != nil {
t.Fatal(err)
}
if err := normalEncoder.Encode(list, normalBuffer); err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(streamingBuffer.String(), normalBuffer.String()); diff != "" {
t.Logf("normal: %s", normalBuffer.String())
t.Logf("streaming: %s", streamingBuffer.String())
t.Fatalf("unexpected output:\n%s", diff)
}
}
}
func TestCallsToSize(t *testing.T) {
counter := &countingSizer{data: []byte("abba")}
listMeta := metav1.ListMeta{}
listData := streamingListData{
totalSize: 14,
listMeta: listMeta,
listMetaSize: listMeta.Size(),
itemsSizes: []int{counter.Size()},
items: []runtime.Object{counter},
}
err := streamingEncodeUnknownList(io.Discard, runtime.Unknown{}, listData, &runtime.Allocator{})
if err != nil {
t.Fatal(err)
}
if counter.count != 1 {
t.Errorf("Expected only 1 call to sizer, got %d", counter.count)
}
}
// countingSizer is a minimal runtime.Object fixture that records how many
// times Size() is invoked, used to verify the encoder relies on precomputed
// sizes instead of re-measuring objects during the write.
type countingSizer struct {
	data  []byte
	count int
}

var _ proto.Sizer = (*countingSizer)(nil)
var _ runtime.ProtobufMarshaller = (*countingSizer)(nil)

// MarshalTo copies the fixture bytes into data.
func (s *countingSizer) MarshalTo(data []byte) (int, error) {
	return copy(data, s.data), nil
}

// Size reports the fixture length and counts the call.
func (s *countingSizer) Size() int {
	s.count++
	return len(s.data)
}

// DeepCopyObject is a stub to satisfy runtime.Object; not used by the tests.
func (s *countingSizer) DeepCopyObject() runtime.Object {
	return nil
}

// GetObjectKind is a stub to satisfy runtime.Object; not used by the tests.
func (s *countingSizer) GetObjectKind() schema.ObjectKind {
	return nil
}

View File

@ -72,10 +72,18 @@ func IsNotMarshalable(err error) bool {
// is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written
// as-is (any type info passed with the object will be used).
func NewSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper) *Serializer {
return NewSerializerWithOptions(creater, typer, SerializerOptions{})
}
// NewSerializerWithOptions creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If a typer
// is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written
// as-is (any type info passed with the object will be used).
func NewSerializerWithOptions(creater runtime.ObjectCreater, typer runtime.ObjectTyper, opts SerializerOptions) *Serializer {
return &Serializer{
prefix: protoEncodingPrefix,
creater: creater,
typer: typer,
options: opts,
}
}
@ -84,6 +92,14 @@ type Serializer struct {
prefix []byte
creater runtime.ObjectCreater
typer runtime.ObjectTyper
options SerializerOptions
}
// SerializerOptions holds the options which are used to configure a Proto serializer.
type SerializerOptions struct {
// StreamingCollectionsEncoding enables encoding collection, one item at the time, drastically reducing memory needed.
StreamingCollectionsEncoding bool
}
var _ runtime.Serializer = &Serializer{}
@ -209,6 +225,13 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.
},
}
}
if s.options.StreamingCollectionsEncoding {
listData, err := getStreamingListData(obj)
if err == nil {
// Doesn't honor custom proto marshaling methods (like json streaming), because all proto objects implement proto methods.
return streamingEncodeUnknownList(w, unk, listData, memAlloc)
}
}
switch t := obj.(type) {
case bufferedMarshaller:
@ -428,6 +451,39 @@ func (s *RawSerializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime
}
func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
_, err := doEncode(obj, w, nil, memAlloc)
return err
}
// doEncodeWithHeader writes one length-delimited protobuf field: the one-byte
// field identifier, the varint-encoded size, then the serialized object.
// precomputedSize must equal the object's serialized size excluding the
// header bytes written here; a mismatch is returned as an error because the
// size header has already been committed to the stream.
func doEncodeWithHeader(obj any, w io.Writer, field byte, precomputedSize int, memAlloc runtime.MemoryAllocator) (size int, err error) {
	// Field identifier
	n, err := w.Write([]byte{field})
	size += n
	if err != nil {
		return size, err
	}
	// Size
	n, err = writeVarintGenerated(w, precomputedSize)
	size += n
	if err != nil {
		return size, err
	}
	// Obj; passing &precomputedSize lets doEncode skip a second Size() call.
	n, err = doEncode(obj, w, &precomputedSize, memAlloc)
	size += n
	if err != nil {
		return size, err
	}
	// Defense in depth: a body of a different length than the header promised
	// would desynchronize the surrounding message.
	if n != precomputedSize {
		return size, fmt.Errorf("the size value was %d, but doEncode wrote %d bytes to data", precomputedSize, n)
	}
	return size, nil
}
// doEncode encodes provided object into writer using a allocator if possible.
// Avoids call by object Size if precomputedObjSize is provided.
// precomputedObjSize should not include header bytes (field identifier, size).
func doEncode(obj any, w io.Writer, precomputedObjSize *int, memAlloc runtime.MemoryAllocator) (int, error) {
if memAlloc == nil {
klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator")
memAlloc = &runtime.SimpleAllocator{}
@ -436,40 +492,43 @@ func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runti
case bufferedReverseMarshaller:
// this path performs a single allocation during write only when the Allocator wasn't provided
// it also requires the caller to implement the more efficient Size and MarshalToSizedBuffer methods
encodedSize := uint64(t.Size())
data := memAlloc.Allocate(encodedSize)
if precomputedObjSize == nil {
s := t.Size()
precomputedObjSize = &s
}
data := memAlloc.Allocate(uint64(*precomputedObjSize))
n, err := t.MarshalToSizedBuffer(data)
if err != nil {
return err
return 0, err
}
_, err = w.Write(data[:n])
return err
return w.Write(data[:n])
case bufferedMarshaller:
// this path performs a single allocation during write only when the Allocator wasn't provided
// it also requires the caller to implement the more efficient Size and MarshalTo methods
encodedSize := uint64(t.Size())
data := memAlloc.Allocate(encodedSize)
if precomputedObjSize == nil {
s := t.Size()
precomputedObjSize = &s
}
data := memAlloc.Allocate(uint64(*precomputedObjSize))
n, err := t.MarshalTo(data)
if err != nil {
return err
return 0, err
}
_, err = w.Write(data[:n])
return err
return w.Write(data[:n])
case proto.Marshaler:
// this path performs extra allocations
data, err := t.Marshal()
if err != nil {
return err
return 0, err
}
_, err = w.Write(data)
return err
return w.Write(data)
default:
return errNotMarshalable{reflect.TypeOf(obj)}
return 0, errNotMarshalable{reflect.TypeOf(obj)}
}
}

View File

@ -18,6 +18,7 @@ package runtime
import (
"fmt"
"io"
)
type ProtobufMarshaller interface {
@ -28,6 +29,124 @@ type ProtobufReverseMarshaller interface {
MarshalToSizedBuffer(data []byte) (int, error)
}
const (
typeMetaTag = 0xa
rawTag = 0x12
contentEncodingTag = 0x1a
contentTypeTag = 0x22
// max length of a varint for a uint64
maxUint64VarIntLength = 10
)
// MarshalToWriter allows a caller to provide a streaming writer for raw bytes,
// instead of populating them inside the Unknown struct.
// rawSize is the number of bytes rawWriter will write in a success case.
// writeRaw is called when it is time to write the raw bytes. It must return `rawSize, nil` or an error.
//
// Fields are emitted in tag order (TypeMeta, Raw, ContentEncoding,
// ContentType), matching the layout produced by the generated Marshal.
// The returned int is the total number of bytes written to w.
func (m *Unknown) MarshalToWriter(w io.Writer, rawSize int, writeRaw func(io.Writer) (int, error)) (int, error) {
	size := 0

	// reuse the buffer for varint marshaling
	varintBuffer := make([]byte, maxUint64VarIntLength)
	writeVarint := func(i int) (int, error) {
		offset := encodeVarintGenerated(varintBuffer, len(varintBuffer), uint64(i))
		return w.Write(varintBuffer[offset:])
	}

	// TypeMeta
	{
		n, err := w.Write([]byte{typeMetaTag})
		size += n
		if err != nil {
			return size, err
		}

		typeMetaBytes, err := m.TypeMeta.Marshal()
		if err != nil {
			return size, err
		}

		n, err = writeVarint(len(typeMetaBytes))
		size += n
		if err != nil {
			return size, err
		}

		n, err = w.Write(typeMetaBytes)
		size += n
		if err != nil {
			return size, err
		}
	}

	// Raw, delegating write to writeRaw()
	{
		n, err := w.Write([]byte{rawTag})
		size += n
		if err != nil {
			return size, err
		}

		n, err = writeVarint(rawSize)
		size += n
		if err != nil {
			return size, err
		}

		n, err = writeRaw(w)
		size += n
		if err != nil {
			return size, err
		}

		// The length header above already committed to rawSize bytes; a
		// different count from writeRaw would corrupt the message.
		if n != int(rawSize) {
			return size, fmt.Errorf("the size value was %d, but encoding wrote %d bytes to data", rawSize, n)
		}
	}

	// ContentEncoding
	{
		n, err := w.Write([]byte{contentEncodingTag})
		size += n
		if err != nil {
			return size, err
		}

		n, err = writeVarint(len(m.ContentEncoding))
		size += n
		if err != nil {
			return size, err
		}

		n, err = w.Write([]byte(m.ContentEncoding))
		size += n
		if err != nil {
			return size, err
		}
	}

	// ContentType (the original comment said ContentEncoding — copy-paste slip)
	{
		n, err := w.Write([]byte{contentTypeTag})
		size += n
		if err != nil {
			return size, err
		}

		n, err = writeVarint(len(m.ContentType))
		size += n
		if err != nil {
			return size, err
		}

		n, err = w.Write([]byte(m.ContentType))
		size += n
		if err != nil {
			return size, err
		}
	}
	return size, nil
}
// NestedMarshalTo allows a caller to avoid extra allocations during serialization of an Unknown
// that will contain an object that implements ProtobufMarshaller or ProtobufReverseMarshaller.
func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64) (int, error) {
@ -43,12 +162,12 @@ func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64
copy(data[i:], m.ContentType)
i = encodeVarintGenerated(data, i, uint64(len(m.ContentType)))
i--
data[i] = 0x22
data[i] = contentTypeTag
i -= len(m.ContentEncoding)
copy(data[i:], m.ContentEncoding)
i = encodeVarintGenerated(data, i, uint64(len(m.ContentEncoding)))
i--
data[i] = 0x1a
data[i] = contentEncodingTag
if b != nil {
if r, ok := b.(ProtobufReverseMarshaller); ok {
n1, err := r.MarshalToSizedBuffer(data[:i])
@ -75,7 +194,7 @@ func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64
}
i = encodeVarintGenerated(data, i, size)
i--
data[i] = 0x12
data[i] = rawTag
}
n2, err := m.TypeMeta.MarshalToSizedBuffer(data[:i])
if err != nil {
@ -84,6 +203,6 @@ func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64
i -= n2
i = encodeVarintGenerated(data, i, uint64(n2))
i--
data[i] = 0xa
data[i] = typeMetaTag
return msgSize - i, nil
}

View File

@ -0,0 +1,107 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
import (
"bytes"
"io"
"math"
"testing"
"github.com/google/go-cmp/cmp"
)
func TestVarint(t *testing.T) {
varintBuffer := make([]byte, maxUint64VarIntLength)
offset := encodeVarintGenerated(varintBuffer, len(varintBuffer), math.MaxUint64)
used := len(varintBuffer) - offset
if used != maxUint64VarIntLength {
t.Fatalf("expected encodeVarintGenerated to use %d bytes to encode MaxUint64, got %d", maxUint64VarIntLength, used)
}
}
func TestNestedMarshalToWriter(t *testing.T) {
testcases := []struct {
name string
raw []byte
}{
{
name: "zero-length",
raw: []byte{},
},
{
name: "simple",
raw: []byte{0x00, 0x01, 0x02, 0x03},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
u := &Unknown{
ContentType: "ct",
ContentEncoding: "ce",
TypeMeta: TypeMeta{
APIVersion: "v1",
Kind: "k",
},
}
// Marshal normally with Raw inlined
u.Raw = tc.raw
marshalData, err := u.Marshal()
if err != nil {
t.Fatal(err)
}
u.Raw = nil
// Marshal with NestedMarshalTo
nestedMarshalData := make([]byte, len(marshalData))
n, err := u.NestedMarshalTo(nestedMarshalData, copyMarshaler(tc.raw), uint64(len(tc.raw)))
if err != nil {
t.Fatal(err)
}
if n != len(marshalData) {
t.Errorf("NestedMarshalTo returned %d, expected %d", n, len(marshalData))
}
if e, a := marshalData, nestedMarshalData; !bytes.Equal(e, a) {
t.Errorf("NestedMarshalTo and Marshal differ:\n%s", cmp.Diff(e, a))
}
// Streaming marshal with MarshalToWriter
buf := bytes.NewBuffer(nil)
n, err = u.MarshalToWriter(buf, len(tc.raw), func(w io.Writer) (int, error) {
return w.Write(tc.raw)
})
if err != nil {
t.Fatal(err)
}
if n != len(marshalData) {
t.Errorf("MarshalToWriter returned %d, expected %d", n, len(marshalData))
}
if e, a := marshalData, buf.Bytes(); !bytes.Equal(e, a) {
t.Errorf("MarshalToWriter and Marshal differ:\n%s", cmp.Diff(e, a))
}
})
}
}
type copyMarshaler []byte
func (c copyMarshaler) MarshalTo(dest []byte) (int, error) {
n := copy(dest, []byte(c))
return n, nil
}

View File

@ -157,9 +157,9 @@ const (
// (usually the entire object), and if the size is smaller no gzipping will be performed
// if the client requests it.
defaultGzipThresholdBytes = 128 * 1024
// Use the length of the first write of streaming implementations.
// TODO: Update when streaming proto is implemented
firstWriteStreamingThresholdBytes = 1
// Use the length of the first write to recognize streaming implementations.
// When streaming JSON first write is "{", while Kubernetes protobuf starts unique 4 byte header.
firstWriteStreamingThresholdBytes = 4
)
// negotiateContentEncoding returns a supported client-requested content encoding for the

View File

@ -44,6 +44,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
jsonserializer "k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/runtime/serializer/protobuf"
rand2 "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apiserver/pkg/features"
@ -845,6 +846,34 @@ func TestStreamingGzipIntegration(t *testing.T) {
expectGzip: true,
expectStreaming: true,
},
{
name: "Protobuf, small object, default -> no gzip",
serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{}),
object: &testapigroupv1.CarpList{},
expectGzip: false,
expectStreaming: false,
},
{
name: "Protobuf, small object, streaming -> no gzip",
serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{StreamingCollectionsEncoding: true}),
object: &testapigroupv1.CarpList{},
expectGzip: false,
expectStreaming: true,
},
{
name: "Protobuf, large object, default -> gzip",
serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{}),
object: &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}},
expectGzip: true,
expectStreaming: false,
},
{
name: "Protobuf, large object, streaming -> gzip",
serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{StreamingCollectionsEncoding: true}),
object: &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}},
expectGzip: true,
expectStreaming: true,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {

View File

@ -197,9 +197,13 @@ const (
StorageVersionHash featuregate.Feature = "StorageVersionHash"
// owner: @serathius
// Allow API server to encode collections item by item, instead of all at once.
// Allow API server JSON encoder to encode collections item by item, instead of all at once.
StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"
// owner: @serathius
// Allow API server Protobuf encoder to encode collections item by item, instead of all at once.
StreamingCollectionEncodingToProtobuf featuregate.Feature = "StreamingCollectionEncodingToProtobuf"
// owner: @aramase, @enj, @nabokihms
// kep: https://kep.k8s.io/3331
//
@ -356,6 +360,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta},
},
StreamingCollectionEncodingToProtobuf: {
{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta},
},
StrictCostEnforcementForVAP: {
{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Beta},
{Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.GA, LockToDefault: true},

View File

@ -992,6 +992,9 @@ func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
}
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
}
if len(opts) != 0 {
codecs = serializer.NewCodecFactory(scheme, opts...)
}

View File

@ -1344,6 +1344,12 @@
lockToDefault: false
preRelease: Beta
version: "1.33"
- name: StreamingCollectionEncodingToProtobuf
versionedSpecs:
- default: true
lockToDefault: false
preRelease: Beta
version: "1.33"
- name: StrictCostEnforcementForVAP
versionedSpecs:
- default: false