From f5dd7107f7144c4f76ca6159c1eeddb48a12feaa Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 19 Dec 2024 12:30:39 +0100 Subject: [PATCH] Implement streaming proto encoding --- pkg/features/kube_features.go | 6 +- pkg/features/versioned_kube_features.go | 4 + .../core/rest/storage_core_generic.go | 3 + .../pkg/apiserver/customresource_handler.go | 7 +- .../pkg/runtime/serializer/codec_factory.go | 13 +- .../serializer/protobuf/collections.go | 174 ++++++++++++++++++ .../serializer/protobuf/collections_test.go | 117 +++++++++++- .../runtime/serializer/protobuf/protobuf.go | 87 +++++++-- .../apimachinery/pkg/runtime/types_proto.go | 127 ++++++++++++- .../pkg/runtime/types_proto_test.go | 107 +++++++++++ .../handlers/responsewriters/writers.go | 6 +- .../handlers/responsewriters/writers_test.go | 29 +++ .../apiserver/pkg/features/kube_features.go | 10 +- .../apiserver/pkg/server/genericapiserver.go | 3 + .../test_data/versioned_feature_list.yaml | 6 + 15 files changed, 667 insertions(+), 32 deletions(-) create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections.go create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/types_proto_test.go diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 13d6659afe4..d7654649685 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -694,9 +694,13 @@ const ( StorageVersionMigrator featuregate.Feature = "StorageVersionMigrator" // owner: @serathius - // Allow API server to encode collections item by item, instead of all at once. + // Allow API server JSON encoder to encode collections item by item, instead of all at once. StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON" + // owner: @serathius + // Allow API server Protobuf encoder to encode collections item by item, instead of all at once. 
+ StreamingCollectionEncodingToProtobuf featuregate.Feature = "StreamingCollectionEncodingToProtobuf" + // owner: @robscott // kep: https://kep.k8s.io/2433 // diff --git a/pkg/features/versioned_kube_features.go b/pkg/features/versioned_kube_features.go index 89e650c1c46..6d870099ecc 100644 --- a/pkg/features/versioned_kube_features.go +++ b/pkg/features/versioned_kube_features.go @@ -763,6 +763,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, }, + StreamingCollectionEncodingToProtobuf: { + {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, + }, + SupplementalGroupsPolicy: { {Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha}, }, diff --git a/pkg/registry/core/rest/storage_core_generic.go b/pkg/registry/core/rest/storage_core_generic.go index f5e8941278d..9d82ca85633 100644 --- a/pkg/registry/core/rest/storage_core_generic.go +++ b/pkg/registry/core/rest/storage_core_generic.go @@ -80,6 +80,9 @@ func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.API if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) { opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON()) } + if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) { + opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf()) + } if len(opts) != 0 { apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, opts...) 
} diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go index 61579f168b4..9afeb2f80ce 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go @@ -895,7 +895,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd MediaType: "application/vnd.kubernetes.protobuf", MediaTypeType: "application", MediaTypeSubType: "vnd.kubernetes.protobuf", - Serializer: protobuf.NewSerializer(creator, typer), + Serializer: protobuf.NewSerializerWithOptions(creator, typer, protobuf.SerializerOptions{ + StreamingCollectionsEncoding: utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf), + }), StreamSerializer: &runtime.StreamSerializerInfo{ Serializer: protobuf.NewRawSerializer(creator, typer), Framer: protobuf.LengthDelimitedFramer, @@ -978,6 +980,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) { opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON()) } + if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) { + opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf()) + } scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...) 
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale") scaleScope.Namer = handlers.ContextBasedNaming{ diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go index 8eb0f20cc78..81286fccb49 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go @@ -61,7 +61,9 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option mf, scheme, scheme, json.SerializerOptions{Yaml: true, Pretty: false, Strict: true}, ) - protoSerializer := protobuf.NewSerializer(scheme, scheme) + protoSerializer := protobuf.NewSerializerWithOptions(scheme, scheme, protobuf.SerializerOptions{ + StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToProtobuf, + }) protoRawSerializer := protobuf.NewRawSerializer(scheme, scheme) serializers := []runtime.SerializerInfo{ @@ -113,7 +115,8 @@ type CodecFactoryOptions struct { // Pretty includes a pretty serializer along with the non-pretty one Pretty bool - StreamingCollectionsEncodingToJSON bool + StreamingCollectionsEncodingToJSON bool + StreamingCollectionsEncodingToProtobuf bool serializers []func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.SerializerInfo } @@ -155,6 +158,12 @@ func WithStreamingCollectionEncodingToJSON() CodecFactoryOptionsMutator { } } +func WithStreamingCollectionEncodingToProtobuf() CodecFactoryOptionsMutator { + return func(options *CodecFactoryOptions) { + options.StreamingCollectionsEncodingToProtobuf = true + } +} + // NewCodecFactory provides methods for retrieving serializers for the supported wire formats // and conversion wrappers to define preferred internal and external versions. 
In the future, // as the internal version is used less, callers may instead use a defaulting serializer and diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections.go new file mode 100644 index 00000000000..754a80820b0 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections.go @@ -0,0 +1,174 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package protobuf + +import ( + "errors" + "io" + "math/bits" + + "github.com/gogo/protobuf/proto" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/runtime" +) + +var ( + errFieldCount = errors.New("expected ListType to have 3 fields") + errTypeMetaField = errors.New("expected TypeMeta field to have TypeMeta type") + errTypeMetaProtobufTag = errors.New(`expected TypeMeta protobuf field tag to be ""`) + errListMetaField = errors.New("expected ListMeta field to have ListMeta type") + errListMetaProtobufTag = errors.New(`expected ListMeta protobuf field tag to be "bytes,1,opt,name=metadata"`) + errItemsProtobufTag = errors.New(`expected Items protobuf field tag to be "bytes,2,rep,name=items"`) + errItemsSizer = errors.New(`expected Items elements to implement proto.Sizer`) +) + +// getStreamingListData implements list extraction logic for protobuf stream serialization. +// +// Reason for a custom logic instead of reusing accessors from meta package: +// * Validate proto tags to prevent incompatibility with proto standard package. +// * ListMetaAccessor doesn't distinguish empty from nil value. +// * TypeAccessor reparsing "apiVersion" and serializing it with "{group}/{version}" +func getStreamingListData(list runtime.Object) (data streamingListData, err error) { + listValue, err := conversion.EnforcePtr(list) + if err != nil { + return data, err + } + listType := listValue.Type() + if listType.NumField() != 3 { + return data, errFieldCount + } + // TypeMeta: validated, but not returned as is not serialized. 
+ _, ok := listValue.Field(0).Interface().(metav1.TypeMeta) + if !ok { + return data, errTypeMetaField + } + if listType.Field(0).Tag.Get("protobuf") != "" { + return data, errTypeMetaProtobufTag + } + // ListMeta + listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta) + if !ok { + return data, errListMetaField + } + // if we were ever to relax the protobuf tag check we should update the hardcoded `0xa` below when writing ListMeta. + if listType.Field(1).Tag.Get("protobuf") != "bytes,1,opt,name=metadata" { + return data, errListMetaProtobufTag + } + data.listMeta = listMeta + // Items; if we were ever to relax the protobuf tag check we should update the hardcoded `0x12` below when writing Items. + if listType.Field(2).Tag.Get("protobuf") != "bytes,2,rep,name=items" { + return data, errItemsProtobufTag + } + items, err := meta.ExtractList(list) + if err != nil { + return data, err + } + data.items = items + data.totalSize, data.listMetaSize, data.itemsSizes, err = listSize(listMeta, items) + return data, err +} + +type streamingListData struct { + // totalSize is the total size of the serialized List object, including their proto headers/size bytes + totalSize int + + // listMetaSize caches results from .Size() call to listMeta, doesn't include header bytes (field identifier, size) + listMetaSize int + listMeta metav1.ListMeta + + // itemsSizes caches results from .Size() call to items, doesn't include header bytes (field identifier, size) + itemsSizes []int + items []runtime.Object +} + +// listSize return size of ListMeta and items to be later used for preallocations. +// listMetaSize and itemSizes do not include header bytes (field identifier, size). 
+func listSize(listMeta metav1.ListMeta, items []runtime.Object) (totalSize, listMetaSize int, itemSizes []int, err error) { + // ListMeta + listMetaSize = listMeta.Size() + totalSize += 1 + sovGenerated(uint64(listMetaSize)) + listMetaSize + // Items + itemSizes = make([]int, len(items)) + for i, item := range items { + sizer, ok := item.(proto.Sizer) + if !ok { + return totalSize, listMetaSize, nil, errItemsSizer + } + n := sizer.Size() + itemSizes[i] = n + totalSize += 1 + sovGenerated(uint64(n)) + n + } + return totalSize, listMetaSize, itemSizes, nil +} + +func streamingEncodeUnknownList(w io.Writer, unk runtime.Unknown, listData streamingListData, memAlloc runtime.MemoryAllocator) error { + _, err := w.Write(protoEncodingPrefix) + if err != nil { + return err + } + // encodeList is responsible for encoding the List into the unknown Raw. + encodeList := func(writer io.Writer) (int, error) { + return streamingEncodeList(writer, listData, memAlloc) + } + _, err = unk.MarshalToWriter(w, listData.totalSize, encodeList) + return err +} + +func streamingEncodeList(w io.Writer, listData streamingListData, memAlloc runtime.MemoryAllocator) (size int, err error) { + // ListMeta; 0xa = (1 << 3) | 2; field number: 1, type: 2 (LEN). https://protobuf.dev/programming-guides/encoding/#structure + n, err := doEncodeWithHeader(&listData.listMeta, w, 0xa, listData.listMetaSize, memAlloc) + size += n + if err != nil { + return size, err + } + // Items; 0x12 = (2 << 3) | 2; field number: 2, type: 2 (LEN). 
https://protobuf.dev/programming-guides/encoding/#structure + for i, item := range listData.items { + n, err := doEncodeWithHeader(item, w, 0x12, listData.itemsSizes[i], memAlloc) + size += n + if err != nil { + return size, err + } + } + return size, nil +} + +func writeVarintGenerated(w io.Writer, v int) (int, error) { + buf := make([]byte, sovGenerated(uint64(v))) + encodeVarintGenerated(buf, len(buf), uint64(v)) + return w.Write(buf) +} + +// sovGenerated is copied from `generated.pb.go` returns size of varint. +func sovGenerated(v uint64) int { + return (bits.Len64(v|1) + 6) / 7 +} + +// encodeVarintGenerated is copied from `generated.pb.go` encodes varint. +func encodeVarintGenerated(dAtA []byte, offset int, v uint64) int { + offset -= sovGenerated(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go index fa10e290931..c48d416b5dc 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go @@ -23,21 +23,26 @@ import ( "os/exec" "testing" + "github.com/gogo/protobuf/proto" "github.com/google/go-cmp/cmp" + "sigs.k8s.io/randfill" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" ) func TestCollectionsEncoding(t *testing.T) { t.Run("Normal", func(t *testing.T) { - testCollectionsEncoding(t, NewSerializer(nil, nil)) + testCollectionsEncoding(t, NewSerializer(nil, nil), false) + }) + t.Run("Streaming", func(t *testing.T) { + testCollectionsEncoding(t, NewSerializerWithOptions(nil, nil, SerializerOptions{StreamingCollectionsEncoding: 
true}), true) }) - // Leave place for testing streaming collection serializer proposed as part of KEP-5116 } -func testCollectionsEncoding(t *testing.T, s *Serializer) { +func testCollectionsEncoding(t *testing.T, s *Serializer, streamingEnabled bool) { var remainingItems int64 = 1 testCases := []struct { name string @@ -191,7 +196,7 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - var buf bytes.Buffer + var buf writeCountingBuffer if err := s.Encode(tc.in, &buf); err != nil { t.Fatalf("unexpected error: %v", err) } @@ -201,8 +206,25 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) { t.Fatal(err) } if !bytes.Equal(expectBytes, actualBytes) { - t.Errorf("expected:\n%s\ngot:\n%s", tc.expect, base64.StdEncoding.EncodeToString(actualBytes)) - t.Log(cmp.Diff(dumpProto(t, actualBytes[4:]), dumpProto(t, expectBytes[4:]))) + expectedBytes, err := base64.StdEncoding.DecodeString(tc.expect) + if err == nil { + t.Errorf("expected:\n%v\ngot:\n%v", expectedBytes, actualBytes) + } else { + t.Errorf("expected:\n%v\ngot:\n%v", tc.expect, base64.StdEncoding.EncodeToString(actualBytes)) + } + actualProto := dumpProto(t, actualBytes[4:]) + expectedProto := dumpProto(t, expectBytes[4:]) + if actualProto != "" && expectedProto != "" { + t.Log(cmp.Diff(actualProto, expectedProto)) + } else { + t.Log(cmp.Diff(actualBytes, expectBytes)) + } + } + if streamingEnabled && buf.writeCount <= 1 { + t.Errorf("expected streaming but Write was called only: %d", buf.writeCount) + } + if !streamingEnabled && buf.writeCount > 1 { + t.Errorf("expected non-streaming but Write was called more than once: %d", buf.writeCount) } }) } @@ -226,3 +248,86 @@ func dumpProto(t *testing.T, data []byte) string { } return string(d) } + +type writeCountingBuffer struct { + writeCount int + bytes.Buffer +} + +func (b *writeCountingBuffer) Write(data []byte) (int, error) { + b.writeCount++ + return b.Buffer.Write(data) +} + 
+func (b *writeCountingBuffer) Reset() { + b.writeCount = 0 + b.Buffer.Reset() +} + +func TestFuzzCollection(t *testing.T) { + f := randfill.New() + streamingEncoder := NewSerializerWithOptions(nil, nil, SerializerOptions{StreamingCollectionsEncoding: true}) + streamingBuffer := &bytes.Buffer{} + normalEncoder := NewSerializerWithOptions(nil, nil, SerializerOptions{StreamingCollectionsEncoding: false}) + normalBuffer := &bytes.Buffer{} + for i := 0; i < 1000; i++ { + list := &testapigroupv1.CarpList{} + f.FillNoCustom(list) + streamingBuffer.Reset() + normalBuffer.Reset() + if err := streamingEncoder.Encode(list, streamingBuffer); err != nil { + t.Fatal(err) + } + if err := normalEncoder.Encode(list, normalBuffer); err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(streamingBuffer.String(), normalBuffer.String()); diff != "" { + t.Logf("normal: %s", normalBuffer.String()) + t.Logf("streaming: %s", streamingBuffer.String()) + t.Fatalf("unexpected output:\n%s", diff) + } + } +} + +func TestCallsToSize(t *testing.T) { + counter := &countingSizer{data: []byte("abba")} + listMeta := metav1.ListMeta{} + listData := streamingListData{ + totalSize: 14, + listMeta: listMeta, + listMetaSize: listMeta.Size(), + itemsSizes: []int{counter.Size()}, + items: []runtime.Object{counter}, + } + err := streamingEncodeUnknownList(io.Discard, runtime.Unknown{}, listData, &runtime.Allocator{}) + if err != nil { + t.Fatal(err) + } + if counter.count != 1 { + t.Errorf("Expected only 1 call to sizer, got %d", counter.count) + } +} + +type countingSizer struct { + data []byte + count int +} + +var _ proto.Sizer = (*countingSizer)(nil) +var _ runtime.ProtobufMarshaller = (*countingSizer)(nil) + +func (s *countingSizer) MarshalTo(data []byte) (int, error) { + return copy(data, s.data), nil +} +func (s *countingSizer) Size() int { + s.count++ + return len(s.data) +} + +func (s *countingSizer) DeepCopyObject() runtime.Object { + return nil +} + +func (s *countingSizer) GetObjectKind() 
schema.ObjectKind { + return nil +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go index c63e6dc63f6..c66c49ac4c2 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go @@ -72,10 +72,18 @@ func IsNotMarshalable(err error) bool { // is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written // as-is (any type info passed with the object will be used). func NewSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper) *Serializer { + return NewSerializerWithOptions(creater, typer, SerializerOptions{}) +} + +// NewSerializerWithOptions creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If a typer +// is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written +// as-is (any type info passed with the object will be used). +func NewSerializerWithOptions(creater runtime.ObjectCreater, typer runtime.ObjectTyper, opts SerializerOptions) *Serializer { return &Serializer{ prefix: protoEncodingPrefix, creater: creater, typer: typer, + options: opts, } } @@ -84,6 +92,14 @@ type Serializer struct { prefix []byte creater runtime.ObjectCreater typer runtime.ObjectTyper + + options SerializerOptions +} + +// SerializerOptions holds the options which are used to configure a Proto serializer. +type SerializerOptions struct { + // StreamingCollectionsEncoding enables encoding collections one item at a time, drastically reducing the memory needed. + StreamingCollectionsEncoding bool } var _ runtime.Serializer = &Serializer{} @@ -209,6 +225,13 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime. 
}, } } + if s.options.StreamingCollectionsEncoding { + listData, err := getStreamingListData(obj) + if err == nil { + // Doesn't honor custom proto marshaling methods (like json streaming), because all proto objects implement proto methods. + return streamingEncodeUnknownList(w, unk, listData, memAlloc) + } + } switch t := obj.(type) { case bufferedMarshaller: @@ -428,6 +451,39 @@ func (s *RawSerializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime } func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + _, err := doEncode(obj, w, nil, memAlloc) + return err +} + +func doEncodeWithHeader(obj any, w io.Writer, field byte, precomputedSize int, memAlloc runtime.MemoryAllocator) (size int, err error) { + // Field identifier + n, err := w.Write([]byte{field}) + size += n + if err != nil { + return size, err + } + // Size + n, err = writeVarintGenerated(w, precomputedSize) + size += n + if err != nil { + return size, err + } + // Obj + n, err = doEncode(obj, w, &precomputedSize, memAlloc) + size += n + if err != nil { + return size, err + } + if n != precomputedSize { + return size, fmt.Errorf("the size value was %d, but doEncode wrote %d bytes to data", precomputedSize, n) + } + return size, nil +} + +// doEncode encodes the provided object into the writer, using the allocator if possible. +// Avoids calling the object's Size method if precomputedObjSize is provided. +// precomputedObjSize should not include header bytes (field identifier, size). 
+func doEncode(obj any, w io.Writer, precomputedObjSize *int, memAlloc runtime.MemoryAllocator) (int, error) { if memAlloc == nil { klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator") memAlloc = &runtime.SimpleAllocator{} @@ -436,40 +492,43 @@ func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runti case bufferedReverseMarshaller: // this path performs a single allocation during write only when the Allocator wasn't provided // it also requires the caller to implement the more efficient Size and MarshalToSizedBuffer methods - encodedSize := uint64(t.Size()) - data := memAlloc.Allocate(encodedSize) + if precomputedObjSize == nil { + s := t.Size() + precomputedObjSize = &s + } + data := memAlloc.Allocate(uint64(*precomputedObjSize)) n, err := t.MarshalToSizedBuffer(data) if err != nil { - return err + return 0, err } - _, err = w.Write(data[:n]) - return err + return w.Write(data[:n]) case bufferedMarshaller: // this path performs a single allocation during write only when the Allocator wasn't provided // it also requires the caller to implement the more efficient Size and MarshalTo methods - encodedSize := uint64(t.Size()) - data := memAlloc.Allocate(encodedSize) + if precomputedObjSize == nil { + s := t.Size() + precomputedObjSize = &s + } + data := memAlloc.Allocate(uint64(*precomputedObjSize)) n, err := t.MarshalTo(data) if err != nil { - return err + return 0, err } - _, err = w.Write(data[:n]) - return err + return w.Write(data[:n]) case proto.Marshaler: // this path performs extra allocations data, err := t.Marshal() if err != nil { - return err + return 0, err } - _, err = w.Write(data) - return err + return w.Write(data) default: - return errNotMarshalable{reflect.TypeOf(obj)} + return 0, errNotMarshalable{reflect.TypeOf(obj)} } } diff --git 
a/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto.go b/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto.go index a82227b239a..27a2064c416 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto.go @@ -18,6 +18,7 @@ package runtime import ( "fmt" + "io" ) type ProtobufMarshaller interface { @@ -28,6 +29,124 @@ type ProtobufReverseMarshaller interface { MarshalToSizedBuffer(data []byte) (int, error) } +const ( + typeMetaTag = 0xa + rawTag = 0x12 + contentEncodingTag = 0x1a + contentTypeTag = 0x22 + + // max length of a varint for a uint64 + maxUint64VarIntLength = 10 +) + +// MarshalToWriter allows a caller to provide a streaming writer for raw bytes, +// instead of populating them inside the Unknown struct. +// rawSize is the number of bytes writeRaw will write in a success case. +// writeRaw is called when it is time to write the raw bytes. It must return `rawSize, nil` or an error. +func (m *Unknown) MarshalToWriter(w io.Writer, rawSize int, writeRaw func(io.Writer) (int, error)) (int, error) { + size := 0 + + // reuse the buffer for varint marshaling + varintBuffer := make([]byte, maxUint64VarIntLength) + writeVarint := func(i int) (int, error) { + offset := encodeVarintGenerated(varintBuffer, len(varintBuffer), uint64(i)) + return w.Write(varintBuffer[offset:]) + } + + // TypeMeta + { + n, err := w.Write([]byte{typeMetaTag}) + size += n + if err != nil { + return size, err + } + + typeMetaBytes, err := m.TypeMeta.Marshal() + if err != nil { + return size, err + } + + n, err = writeVarint(len(typeMetaBytes)) + size += n + if err != nil { + return size, err + } + + n, err = w.Write(typeMetaBytes) + size += n + if err != nil { + return size, err + } + } + + // Raw, delegating write to writeRaw() + { + n, err := w.Write([]byte{rawTag}) + size += n + if err != nil { + return size, err + } + + n, err = writeVarint(rawSize) + size += n + if err != nil { + return size, err + } 
+ + n, err = writeRaw(w) + size += n + if err != nil { + return size, err + } + if n != int(rawSize) { + return size, fmt.Errorf("the size value was %d, but encoding wrote %d bytes to data", rawSize, n) + } + } + + // ContentEncoding + { + n, err := w.Write([]byte{contentEncodingTag}) + size += n + if err != nil { + return size, err + } + + n, err = writeVarint(len(m.ContentEncoding)) + size += n + if err != nil { + return size, err + } + + n, err = w.Write([]byte(m.ContentEncoding)) + size += n + if err != nil { + return size, err + } + } + + // ContentType + { + n, err := w.Write([]byte{contentTypeTag}) + size += n + if err != nil { + return size, err + } + + n, err = writeVarint(len(m.ContentType)) + size += n + if err != nil { + return size, err + } + + n, err = w.Write([]byte(m.ContentType)) + size += n + if err != nil { + return size, err + } + } + return size, nil +} + // NestedMarshalTo allows a caller to avoid extra allocations during serialization of an Unknown // that will contain an object that implements ProtobufMarshaller or ProtobufReverseMarshaller. 
func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64) (int, error) { @@ -43,12 +162,12 @@ func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64 copy(data[i:], m.ContentType) i = encodeVarintGenerated(data, i, uint64(len(m.ContentType))) i-- - data[i] = 0x22 + data[i] = contentTypeTag i -= len(m.ContentEncoding) copy(data[i:], m.ContentEncoding) i = encodeVarintGenerated(data, i, uint64(len(m.ContentEncoding))) i-- - data[i] = 0x1a + data[i] = contentEncodingTag if b != nil { if r, ok := b.(ProtobufReverseMarshaller); ok { n1, err := r.MarshalToSizedBuffer(data[:i]) @@ -75,7 +194,7 @@ func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64 } i = encodeVarintGenerated(data, i, size) i-- - data[i] = 0x12 + data[i] = rawTag } n2, err := m.TypeMeta.MarshalToSizedBuffer(data[:i]) if err != nil { @@ -84,6 +203,6 @@ func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64 i -= n2 i = encodeVarintGenerated(data, i, uint64(n2)) i-- - data[i] = 0xa + data[i] = typeMetaTag return msgSize - i, nil } diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto_test.go new file mode 100644 index 00000000000..39fb709426d --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto_test.go @@ -0,0 +1,107 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package runtime + +import ( + "bytes" + "io" + "math" + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestVarint(t *testing.T) { + varintBuffer := make([]byte, maxUint64VarIntLength) + offset := encodeVarintGenerated(varintBuffer, len(varintBuffer), math.MaxUint64) + used := len(varintBuffer) - offset + if used != maxUint64VarIntLength { + t.Fatalf("expected encodeVarintGenerated to use %d bytes to encode MaxUint64, got %d", maxUint64VarIntLength, used) + } +} + +func TestNestedMarshalToWriter(t *testing.T) { + testcases := []struct { + name string + raw []byte + }{ + { + name: "zero-length", + raw: []byte{}, + }, + { + name: "simple", + raw: []byte{0x00, 0x01, 0x02, 0x03}, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + u := &Unknown{ + ContentType: "ct", + ContentEncoding: "ce", + TypeMeta: TypeMeta{ + APIVersion: "v1", + Kind: "k", + }, + } + + // Marshal normally with Raw inlined + u.Raw = tc.raw + marshalData, err := u.Marshal() + if err != nil { + t.Fatal(err) + } + u.Raw = nil + + // Marshal with NestedMarshalTo + nestedMarshalData := make([]byte, len(marshalData)) + n, err := u.NestedMarshalTo(nestedMarshalData, copyMarshaler(tc.raw), uint64(len(tc.raw))) + if err != nil { + t.Fatal(err) + } + if n != len(marshalData) { + t.Errorf("NestedMarshalTo returned %d, expected %d", n, len(marshalData)) + } + if e, a := marshalData, nestedMarshalData; !bytes.Equal(e, a) { + t.Errorf("NestedMarshalTo and Marshal differ:\n%s", cmp.Diff(e, a)) + } + + // Streaming marshal with MarshalToWriter + buf := bytes.NewBuffer(nil) + n, err = u.MarshalToWriter(buf, len(tc.raw), func(w io.Writer) (int, error) { + return w.Write(tc.raw) + }) + if err != nil { + t.Fatal(err) + } + if n != len(marshalData) { + t.Errorf("MarshalToWriter returned %d, expected %d", n, len(marshalData)) + } + if e, a := marshalData, buf.Bytes(); !bytes.Equal(e, a) { + t.Errorf("MarshalToWriter and Marshal differ:\n%s", cmp.Diff(e, a)) + } + }) + } +} 
+ +type copyMarshaler []byte + +func (c copyMarshaler) MarshalTo(dest []byte) (int, error) { + n := copy(dest, []byte(c)) + return n, nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go index ec5ff956320..e1d440f4a63 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go @@ -157,9 +157,9 @@ const ( // (usually the entire object), and if the size is smaller no gzipping will be performed // if the client requests it. defaultGzipThresholdBytes = 128 * 1024 - // Use the length of the first write of streaming implementations. - // TODO: Update when streaming proto is implemented - firstWriteStreamingThresholdBytes = 1 + // Use the length of the first write to recognize streaming implementations. + // When streaming, the first JSON write is "{", while Kubernetes protobuf starts with a unique 4 byte header. 
+ firstWriteStreamingThresholdBytes = 4 ) // negotiateContentEncoding returns a supported client-requested content encoding for the diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go index 979014b37d4..862d6d96013 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go @@ -44,6 +44,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" jsonserializer "k8s.io/apimachinery/pkg/runtime/serializer/json" + "k8s.io/apimachinery/pkg/runtime/serializer/protobuf" rand2 "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apiserver/pkg/features" @@ -845,6 +846,34 @@ func TestStreamingGzipIntegration(t *testing.T) { expectGzip: true, expectStreaming: true, }, + { + name: "Protobuf, small object, default -> no gzip", + serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{}), + object: &testapigroupv1.CarpList{}, + expectGzip: false, + expectStreaming: false, + }, + { + name: "Protobuf, small object, streaming -> no gzip", + serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{StreamingCollectionsEncoding: true}), + object: &testapigroupv1.CarpList{}, + expectGzip: false, + expectStreaming: true, + }, + { + name: "Protobuf, large object, default -> gzip", + serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{}), + object: &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}}, + expectGzip: true, + expectStreaming: false, + }, + { + name: "Protobuf, large object, streaming -> gzip", + serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{StreamingCollectionsEncoding: true}), + object: &testapigroupv1.CarpList{TypeMeta: 
metav1.TypeMeta{Kind: string(largeChunk)}}, + expectGzip: true, + expectStreaming: true, + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index 8e4153d1014..8cd41f19852 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -218,9 +218,13 @@ const ( StorageVersionHash featuregate.Feature = "StorageVersionHash" // owner: @serathius - // Allow API server to encode collections item by item, instead of all at once. + // Allow API server JSON encoder to encode collections item by item, instead of all at once. StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON" + // owner: @serathius + // Allow API server Protobuf encoder to encode collections item by item, instead of all at once. + StreamingCollectionEncodingToProtobuf featuregate.Feature = "StreamingCollectionEncodingToProtobuf" + // owner: @aramase, @enj, @nabokihms // kep: https://kep.k8s.io/3331 // @@ -395,6 +399,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, }, + StreamingCollectionEncodingToProtobuf: { + {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, + }, + StrictCostEnforcementForVAP: { {Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Beta}, {Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.GA, LockToDefault: true}, diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 82b9df9441b..eee379b52af 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -998,6 +998,9 
@@ func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) { opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON()) } + if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) { + opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf()) + } if len(opts) != 0 { codecs = serializer.NewCodecFactory(scheme, opts...) } diff --git a/test/featuregates_linter/test_data/versioned_feature_list.yaml b/test/featuregates_linter/test_data/versioned_feature_list.yaml index a32fecff12a..fbb2994a1e1 100644 --- a/test/featuregates_linter/test_data/versioned_feature_list.yaml +++ b/test/featuregates_linter/test_data/versioned_feature_list.yaml @@ -1378,6 +1378,12 @@ lockToDefault: false preRelease: Beta version: "1.33" +- name: StreamingCollectionEncodingToProtobuf + versionedSpecs: + - default: true + lockToDefault: false + preRelease: Beta + version: "1.33" - name: StrictCostEnforcementForVAP versionedSpecs: - default: false