diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 8a995ca3ee6..ff634fecd9d 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -667,6 +667,10 @@ const ( // Enables support for the StorageVersionMigrator controller. StorageVersionMigrator featuregate.Feature = "StorageVersionMigrator" + // owner: @serathius + // Allow API server to encode collections item by item, instead of all at once. + StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON" + // owner: @robscott // kep: https://kep.k8s.io/2433 // diff --git a/pkg/features/versioned_kube_features.go b/pkg/features/versioned_kube_features.go index 0002c18d07a..f29bbab5513 100644 --- a/pkg/features/versioned_kube_features.go +++ b/pkg/features/versioned_kube_features.go @@ -742,6 +742,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha}, }, + StreamingCollectionEncodingToJSON: { + {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, + }, + SupplementalGroupsPolicy: { {Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha}, }, diff --git a/pkg/registry/core/rest/storage_core_generic.go b/pkg/registry/core/rest/storage_core_generic.go index 26a9f293186..f5e8941278d 100644 --- a/pkg/registry/core/rest/storage_core_generic.go +++ b/pkg/registry/core/rest/storage_core_generic.go @@ -73,8 +73,15 @@ func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.API ParameterCodec: legacyscheme.ParameterCodec, NegotiatedSerializer: legacyscheme.Codecs, } + opts := []serializer.CodecFactoryOptionsMutator{} if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) { - apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, serializer.WithSerializer(cbor.NewSerializerInfo)) + opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo)) + } + if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) { + opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON()) + } + if len(opts) != 0 { + apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, opts...) } eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds())) diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go index 7147066d43a..83878974e2c 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go @@ -851,6 +851,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd clusterScoped := crd.Spec.Scope == apiextensionsv1.ClusterScoped // CRDs explicitly do not support protobuf, but some objects returned by the API server do + streamingCollections := utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) negotiatedSerializer := unstructuredNegotiatedSerializer{ typer: typer, creator: creator, @@ -864,10 +865,11 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd MediaTypeType: "application", MediaTypeSubType: "json", EncodesAsText: true, - Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{}), + Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{StreamingCollectionsEncoding: streamingCollections}), PrettySerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{Pretty: true}), StrictSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{ - Strict: true, + Strict: true, + StreamingCollectionsEncoding: streamingCollections, }), StreamSerializer: &runtime.StreamSerializerInfo{ EncodesAsText: true, @@ -970,6 +972,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) { opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo)) } + if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) { + opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON()) + } scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...) scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale") scaleScope.Namer = handlers.ContextBasedNaming{ diff --git a/staging/src/k8s.io/apimachinery/pkg/api/meta/help.go b/staging/src/k8s.io/apimachinery/pkg/api/meta/help.go index 1fdd32c4ba3..468afd0e9ee 100644 --- a/staging/src/k8s.io/apimachinery/pkg/api/meta/help.go +++ b/staging/src/k8s.io/apimachinery/pkg/api/meta/help.go @@ -221,6 +221,9 @@ func extractList(obj runtime.Object, allocNew bool) ([]runtime.Object, error) { if err != nil { return nil, err } + if items.IsNil() { + return nil, nil + } list := make([]runtime.Object, items.Len()) if len(list) == 0 { return list, nil diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go index 77bb3074525..8eb0f20cc78 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go @@ -28,7 +28,7 @@ import ( func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, options CodecFactoryOptions) []runtime.SerializerInfo { jsonSerializer := json.NewSerializerWithOptions( mf, scheme, scheme, - json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict}, + json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON}, ) jsonSerializerType := runtime.SerializerInfo{ MediaType: runtime.ContentTypeJSON, @@ -38,7 +38,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option Serializer: jsonSerializer, StrictSerializer: json.NewSerializerWithOptions( mf, scheme, scheme, - json.SerializerOptions{Yaml: false, Pretty: false, Strict: true}, + json.SerializerOptions{Yaml: false, Pretty: false, Strict: true, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON}, ), StreamSerializer: &runtime.StreamSerializerInfo{ EncodesAsText: true, @@ -113,6 +113,8 @@ type CodecFactoryOptions struct { // Pretty includes a pretty serializer along with the non-pretty one Pretty bool + StreamingCollectionsEncodingToJSON bool + serializers []func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.SerializerInfo } @@ -147,6 +149,12 @@ func WithSerializer(f func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.S } } +func WithStreamingCollectionEncodingToJSON() CodecFactoryOptionsMutator { + return func(options *CodecFactoryOptions) { + options.StreamingCollectionsEncodingToJSON = true + } +} + // NewCodecFactory provides methods for retrieving serializers for the supported wire formats // and conversion wrappers to define preferred internal and external versions. In the future, // as the internal version is used less, callers may instead use a defaulting serializer and diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections.go new file mode 100644 index 00000000000..075163ddd81 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections.go @@ -0,0 +1,230 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package json + +import ( + "encoding/json" + "fmt" + "io" + "maps" + "slices" + "sort" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/conversion" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" +) + +func streamEncodeCollections(obj runtime.Object, w io.Writer) (bool, error) { + list, ok := obj.(*unstructured.UnstructuredList) + if ok { + return true, streamingEncodeUnstructuredList(w, list) + } + if _, ok := obj.(json.Marshaler); ok { + return false, nil + } + typeMeta, listMeta, items, err := getListMeta(obj) + if err == nil { + return true, streamingEncodeList(w, typeMeta, listMeta, items) + } + return false, nil +} + +// getListMeta implements list extraction logic for json stream serialization. +// +// Reason for a custom logic instead of reusing accessors from meta package: +// * Validate json tags to prevent incompatibility with json standard package. +// * ListMetaAccessor doesn't distinguish empty from nil value. +// * TypeAccessort reparsing "apiVersion" and serializing it with "{group}/{version}" +func getListMeta(list runtime.Object) (metav1.TypeMeta, metav1.ListMeta, []runtime.Object, error) { + listValue, err := conversion.EnforcePtr(list) + if err != nil { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err + } + listType := listValue.Type() + if listType.NumField() != 3 { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListType to have 3 fields") + } + // TypeMeta + typeMeta, ok := listValue.Field(0).Interface().(metav1.TypeMeta) + if !ok { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected TypeMeta field to have TypeMeta type") + } + if listType.Field(0).Tag.Get("json") != ",inline" { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag to be ",inline"`) + } + // ListMeta + listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta) + if !ok { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListMeta field to have ListMeta type") + } + if listType.Field(1).Tag.Get("json") != "metadata,omitempty" { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected ListMeta json field tag to be "metadata,omitempty"`) + } + // Items + items, err := meta.ExtractList(list) + if err != nil { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err + } + if listType.Field(2).Tag.Get("json") != "items" { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected Items json field tag to be "items"`) + } + return typeMeta, listMeta, items, nil +} + +func streamingEncodeList(w io.Writer, typeMeta metav1.TypeMeta, listMeta metav1.ListMeta, items []runtime.Object) error { + // Start + if _, err := w.Write([]byte(`{`)); err != nil { + return err + } + + // TypeMeta + if typeMeta.Kind != "" { + if err := encodeKeyValuePair(w, "kind", typeMeta.Kind, []byte(",")); err != nil { + return err + } + } + if typeMeta.APIVersion != "" { + if err := encodeKeyValuePair(w, "apiVersion", typeMeta.APIVersion, []byte(",")); err != nil { + return err + } + } + + // ListMeta + if err := encodeKeyValuePair(w, "metadata", listMeta, []byte(",")); err != nil { + return err + } + + // Items + if err := encodeItemsObjectSlice(w, items); err != nil { + return err + } + + // End + _, err := w.Write([]byte("}\n")) + return err +} + +func encodeItemsObjectSlice(w io.Writer, items []runtime.Object) (err error) { + if items == nil { + err := encodeKeyValuePair(w, "items", nil, nil) + return err + } + _, err = w.Write([]byte(`"items":[`)) + if err != nil { + return err + } + suffix := []byte(",") + for i, item := range items { + if i == len(items)-1 { + suffix = nil + } + err := encodeValue(w, item, suffix) + if err != nil { + return err + } + } + _, err = w.Write([]byte("]")) + if err != nil { + return err + } + return err +} + +func streamingEncodeUnstructuredList(w io.Writer, list *unstructured.UnstructuredList) error { + _, err := w.Write([]byte(`{`)) + if err != nil { + return err + } + keys := slices.Collect(maps.Keys(list.Object)) + if _, exists := list.Object["items"]; !exists { + keys = append(keys, "items") + } + sort.Strings(keys) + + suffix := []byte(",") + for i, key := range keys { + if i == len(keys)-1 { + suffix = nil + } + if key == "items" { + err = encodeItemsUnstructuredSlice(w, list.Items, suffix) + } else { + err = encodeKeyValuePair(w, key, list.Object[key], suffix) + } + if err != nil { + return err + } + } + _, err = w.Write([]byte("}\n")) + return err +} + +func encodeItemsUnstructuredSlice(w io.Writer, items []unstructured.Unstructured, suffix []byte) (err error) { + _, err = w.Write([]byte(`"items":[`)) + if err != nil { + return err + } + comma := []byte(",") + for i, item := range items { + if i == len(items)-1 { + comma = nil + } + err := encodeValue(w, item.Object, comma) + if err != nil { + return err + } + } + _, err = w.Write([]byte("]")) + if err != nil { + return err + } + if len(suffix) > 0 { + _, err = w.Write(suffix) + } + return err +} + +func encodeKeyValuePair(w io.Writer, key string, value any, suffix []byte) (err error) { + err = encodeValue(w, key, []byte(":")) + if err != nil { + return err + } + err = encodeValue(w, value, suffix) + if err != nil { + return err + } + return err +} + +func encodeValue(w io.Writer, value any, suffix []byte) error { + data, err := json.Marshal(value) + if err != nil { + return err + } + _, err = w.Write(data) + if err != nil { + return err + } + if len(suffix) > 0 { + _, err = w.Write(suffix) + } + return err +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections_test.go index 6d8f34962a2..03ec4d88f22 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections_test.go @@ -18,9 +18,11 @@ package json import ( "bytes" + "fmt" "testing" "github.com/google/go-cmp/cmp" + fuzz "github.com/google/gofuzz" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -30,21 +32,24 @@ import ( func TestCollectionsEncoding(t *testing.T) { t.Run("Normal", func(t *testing.T) { - testCollectionsEncoding(t, NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{})) + testCollectionsEncoding(t, NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{}), false) + }) + t.Run("Streaming", func(t *testing.T) { + testCollectionsEncoding(t, NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{StreamingCollectionsEncoding: true}), true) }) - // Leave place for testing streaming collection serializer proposed as part of KEP-5116 } // testCollectionsEncoding should provide comprehensive tests to validate streaming implementation of encoder. -func testCollectionsEncoding(t *testing.T, s *Serializer) { - var buf bytes.Buffer +func testCollectionsEncoding(t *testing.T, s *Serializer, streamingEnabled bool) { + var buf writeCountingBuffer var remainingItems int64 = 1 // As defined in KEP-5116 we it should include the following scenarios: // Context: https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/5116-streaming-response-encoding#unit-tests for _, tc := range []struct { - name string - in runtime.Object - expect string + name string + in runtime.Object + cannotStream bool + expect string }{ // Preserving the distinction between integers and floating-point numbers { @@ -307,9 +312,10 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) { }, // Handling structs implementing MarshallJSON method, especially built-in collection types. { - name: "List with MarshallJSON", - in: &ListWithMarshalJSONList{}, - expect: "\"marshallJSON\"\n", + name: "List with MarshallJSON cannot be streamed", + in: &ListWithMarshalJSONList{}, + expect: "\"marshallJSON\"\n", + cannotStream: true, }, { name: "Struct with MarshallJSON", @@ -435,6 +441,32 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) { expect: `{"kind":"List","apiVersion":"v1","metadata":{"resourceVersion":"2345"},"items":[{"kind":"Carp","apiVersion":"v1","metadata":{"name":"pod","namespace":"default","creationTimestamp":null},"spec":{},"status":{}},{"kind":"Carp","apiVersion":"v1","metadata":{"name":"pod2","namespace":"default2","creationTimestamp":null},"spec":{},"status":{}}]} `, }, + { + name: "List with extra field cannot be streamed", + in: &ListWithAdditionalFields{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + }, + Items: []testapigroupv1.Carp{}, + }, + cannotStream: true, + expect: "{\"kind\":\"List\",\"apiVersion\":\"v1\",\"metadata\":{\"resourceVersion\":\"2345\"},\"items\":[],\"AdditionalField\":0}\n", + }, + { + name: "Not a collection cannot be streamed", + in: &testapigroupv1.Carp{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + }, + cannotStream: true, + expect: "{\"kind\":\"List\",\"apiVersion\":\"v1\",\"metadata\":{\"creationTimestamp\":null},\"spec\":{},\"status\":{}}\n", + }, { name: "UnstructuredList empty", in: &unstructured.UnstructuredList{}, @@ -543,10 +575,17 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) { if err := s.Encode(tc.in, &buf); err != nil { t.Fatalf("unexpected error: %v", err) } - t.Logf("normal: %s", buf.String()) + t.Logf("encoded: %s", buf.String()) if diff := cmp.Diff(buf.String(), tc.expect); diff != "" { t.Errorf("not matching:\n%s", diff) } + expectStreaming := !tc.cannotStream && streamingEnabled + if expectStreaming && buf.writeCount <= 1 { + t.Errorf("expected streaming but Write was called only: %d", buf.writeCount) + } + if !expectStreaming && buf.writeCount > 1 { + t.Errorf("expected non-streaming but Write was called more than once: %d", buf.writeCount) + } }) } } @@ -653,3 +692,103 @@ type StructWithRawBytes struct { func (s *StructWithRawBytes) DeepCopyObject() runtime.Object { return nil } + +type ListWithAdditionalFields struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []testapigroupv1.Carp `json:"items" protobuf:"bytes,2,rep,name=items"` + AdditionalField int +} + +func (s *ListWithAdditionalFields) DeepCopyObject() runtime.Object { + return nil +} + +type writeCountingBuffer struct { + writeCount int + bytes.Buffer +} + +func (b *writeCountingBuffer) Write(data []byte) (int, error) { + b.writeCount++ + return b.Buffer.Write(data) +} + +func (b *writeCountingBuffer) Reset() { + b.writeCount = 0 + b.Buffer.Reset() +} + +func TestFuzzCollectionsEncoding(t *testing.T) { + disableFuzzFieldsV1 := func(field *metav1.FieldsV1, c fuzz.Continue) {} + fuzzUnstructuredList := func(list *unstructured.UnstructuredList, c fuzz.Continue) { + list.Object = map[string]interface{}{ + "kind": "List", + "apiVersion": "v1", + c.RandString(): c.RandString(), + c.RandString(): c.RandUint64(), + c.RandString(): c.RandBool(), + "metadata": map[string]interface{}{ + "resourceVersion": fmt.Sprintf("%d", c.RandUint64()), + "continue": c.RandString(), + "remainingItemCount": fmt.Sprintf("%d", c.RandUint64()), + c.RandString(): c.RandString(), + }} + c.Fuzz(&list.Items) + } + fuzzMap := func(kvs map[string]interface{}, c fuzz.Continue) { + kvs[c.RandString()] = c.RandBool() + kvs[c.RandString()] = c.RandUint64() + kvs[c.RandString()] = c.RandString() + } + f := fuzz.New().Funcs(disableFuzzFieldsV1, fuzzUnstructuredList, fuzzMap) + streamingBuffer := &bytes.Buffer{} + normalSerializer := NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{StreamingCollectionsEncoding: false}) + normalBuffer := &bytes.Buffer{} + t.Run("CarpList", func(t *testing.T) { + for i := 0; i < 1000; i++ { + list := &testapigroupv1.CarpList{} + f.Fuzz(list) + streamingBuffer.Reset() + normalBuffer.Reset() + ok, err := streamEncodeCollections(list, streamingBuffer) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !ok { + t.Fatalf("expected streaming encoder to encode %T", list) + } + if err := normalSerializer.Encode(list, normalBuffer); err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(normalBuffer.String(), streamingBuffer.String()); diff != "" { + t.Logf("normal: %s", normalBuffer.String()) + t.Logf("streaming: %s", streamingBuffer.String()) + t.Errorf("not matching:\n%s", diff) + } + } + }) + t.Run("UnstructuredList", func(t *testing.T) { + for i := 0; i < 1000; i++ { + list := &unstructured.UnstructuredList{} + f.Fuzz(list) + streamingBuffer.Reset() + normalBuffer.Reset() + ok, err := streamEncodeCollections(list, streamingBuffer) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !ok { + t.Fatalf("expected streaming encoder to encode %T", list) + } + if err := normalSerializer.Encode(list, normalBuffer); err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(normalBuffer.String(), streamingBuffer.String()); diff != "" { + t.Logf("normal: %s", normalBuffer.String()) + t.Logf("streaming: %s", streamingBuffer.String()) + t.Errorf("not matching:\n%s", diff) + } + } + }) +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go index 1ae4a32eb72..24f66a10174 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go @@ -36,7 +36,7 @@ import ( // is not nil, the object has the group, version, and kind fields set. // Deprecated: use NewSerializerWithOptions instead. func NewSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.ObjectTyper, pretty bool) *Serializer { - return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{false, pretty, false}) + return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{false, pretty, false, false}) } // NewYAMLSerializer creates a YAML serializer that handles encoding versioned objects into the proper YAML form. If typer @@ -44,7 +44,7 @@ func NewSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtim // matches JSON, and will error if constructs are used that do not serialize to JSON. // Deprecated: use NewSerializerWithOptions instead. func NewYAMLSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.ObjectTyper) *Serializer { - return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{true, false, false}) + return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{true, false, false, false}) } // NewSerializerWithOptions creates a JSON/YAML serializer that handles encoding versioned objects into the proper JSON/YAML @@ -93,6 +93,9 @@ type SerializerOptions struct { // Strict: configures the Serializer to return strictDecodingError's when duplicate fields are present decoding JSON or YAML. // Note that enabling this option is not as performant as the non-strict variant, and should not be used in fast paths. Strict bool + + // StreamingCollectionsEncoding enables encoding collection, one item at the time, drastically reducing memory needed. + StreamingCollectionsEncoding bool } // Serializer handles encoding versioned objects into the proper JSON form @@ -242,6 +245,15 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error { _, err = w.Write(data) return err } + if s.options.StreamingCollectionsEncoding { + ok, err := streamEncodeCollections(obj, w) + if err != nil { + return err + } + if ok { + return nil + } + } encoder := json.NewEncoder(w) return encoder.Encode(obj) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go index 195b161fe0a..979014b37d4 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go @@ -39,8 +39,11 @@ import ( "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + jsonserializer "k8s.io/apimachinery/pkg/runtime/serializer/json" rand2 "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apiserver/pkg/features" @@ -804,3 +807,80 @@ func gzipContent(data []byte, level int) []byte { } return buf.Bytes() } + +func TestStreamingGzipIntegration(t *testing.T) { + largeChunk := bytes.Repeat([]byte("b"), defaultGzipThresholdBytes+1) + tcs := []struct { + name string + serializer runtime.Encoder + object runtime.Object + expectGzip bool + expectStreaming bool + }{ + { + name: "JSON, small object, default -> no gzip", + serializer: jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{}), + object: &testapigroupv1.CarpList{}, + expectGzip: false, + expectStreaming: false, + }, + { + name: "JSON, small object, streaming -> no gzip", + serializer: jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{StreamingCollectionsEncoding: true}), + object: &testapigroupv1.CarpList{}, + expectGzip: false, + expectStreaming: true, + }, + { + name: "JSON, large object, default -> gzip", + serializer: jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{}), + object: &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}}, + expectGzip: true, + expectStreaming: false, + }, + { + name: "JSON, large object, streaming -> gzip", + serializer: jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{StreamingCollectionsEncoding: true}), + object: &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}}, + expectGzip: true, + expectStreaming: true, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + mockResponseWriter := httptest.NewRecorder() + drw := &deferredResponseWriter{ + mediaType: "text/plain", + statusCode: 200, + contentEncoding: "gzip", + hw: mockResponseWriter, + ctx: context.Background(), + } + counter := &writeCounter{Writer: drw} + err := tc.serializer.Encode(tc.object, counter) + if err != nil { + t.Fatal(err) + } + encoding := mockResponseWriter.Header().Get("Content-Encoding") + if (encoding == "gzip") != tc.expectGzip { + t.Errorf("Expect gzip: %v, got: %q", tc.expectGzip, encoding) + } + if counter.writeCount < 1 { + t.Fatalf("Expect at least 1 write") + } + if (counter.writeCount > 1) != tc.expectStreaming { + t.Errorf("Expect streaming: %v, got write count: %d", tc.expectStreaming, counter.writeCount) + } + }) + } +} + +type writeCounter struct { + writeCount int + io.Writer +} + +func (b *writeCounter) Write(data []byte) (int, error) { + b.writeCount++ + return b.Writer.Write(data) +} diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index d2a4cdc8b27..8e4153d1014 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -217,6 +217,10 @@ const ( // document. StorageVersionHash featuregate.Feature = "StorageVersionHash" + // owner: @serathius + // Allow API server to encode collections item by item, instead of all at once. + StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON" + // owner: @aramase, @enj, @nabokihms // kep: https://kep.k8s.io/3331 // @@ -387,6 +391,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.15"), Default: true, PreRelease: featuregate.Beta}, }, + StreamingCollectionEncodingToJSON: { + {Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta}, + }, + StrictCostEnforcementForVAP: { {Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Beta}, {Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.GA, LockToDefault: true}, diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 43456f2f1e4..82b9df9441b 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -991,8 +991,15 @@ func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV // NewDefaultAPIGroupInfo returns an APIGroupInfo stubbed with "normal" values // exposed for easier composition from other packages func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo { + opts := []serializer.CodecFactoryOptionsMutator{} if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) { - codecs = serializer.NewCodecFactory(scheme, serializer.WithSerializer(cbor.NewSerializerInfo)) + opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo)) + } + if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) { + opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON()) + } + if len(opts) != 0 { + codecs = serializer.NewCodecFactory(scheme, opts...) } return APIGroupInfo{ PrioritizedVersions: scheme.PrioritizedVersionsForGroup(group), diff --git a/test/featuregates_linter/test_data/versioned_feature_list.yaml b/test/featuregates_linter/test_data/versioned_feature_list.yaml index deb494360aa..bb752fb5006 100644 --- a/test/featuregates_linter/test_data/versioned_feature_list.yaml +++ b/test/featuregates_linter/test_data/versioned_feature_list.yaml @@ -1324,6 +1324,12 @@ lockToDefault: false preRelease: Alpha version: "1.30" +- name: StreamingCollectionEncodingToJSON + versionedSpecs: + - default: true + lockToDefault: false + preRelease: Beta + version: "1.33" - name: StrictCostEnforcementForVAP versionedSpecs: - default: false