Merge pull request #129334 from serathius/streaming-json-list-encoder

Streaming json list encoder
This commit is contained in:
Kubernetes Prow Robot 2025-02-28 13:36:55 -08:00 committed by GitHub
commit 2fc329c857
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 532 additions and 19 deletions

View File

@ -667,6 +667,10 @@ const (
// Enables support for the StorageVersionMigrator controller.
StorageVersionMigrator featuregate.Feature = "StorageVersionMigrator"
// owner: @serathius
// Allow API server to encode collections item by item, instead of all at once.
StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"
// owner: @robscott
// kep: https://kep.k8s.io/2433
//

View File

@ -742,6 +742,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha},
},
StreamingCollectionEncodingToJSON: {
{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta},
},
SupplementalGroupsPolicy: {
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
},

View File

@ -73,8 +73,15 @@ func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.API
ParameterCodec: legacyscheme.ParameterCodec,
NegotiatedSerializer: legacyscheme.Codecs,
}
opts := []serializer.CodecFactoryOptionsMutator{}
if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, serializer.WithSerializer(cbor.NewSerializerInfo))
opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
}
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
}
if len(opts) != 0 {
apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, opts...)
}
eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))

View File

@ -851,6 +851,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
clusterScoped := crd.Spec.Scope == apiextensionsv1.ClusterScoped
// CRDs explicitly do not support protobuf, but some objects returned by the API server do
streamingCollections := utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON)
negotiatedSerializer := unstructuredNegotiatedSerializer{
typer: typer,
creator: creator,
@ -864,10 +865,11 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
MediaTypeType: "application",
MediaTypeSubType: "json",
EncodesAsText: true,
Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{}),
Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{StreamingCollectionsEncoding: streamingCollections}),
PrettySerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{Pretty: true}),
StrictSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{
Strict: true,
Strict: true,
StreamingCollectionsEncoding: streamingCollections,
}),
StreamSerializer: &runtime.StreamSerializerInfo{
EncodesAsText: true,
@ -970,6 +972,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
}
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
}
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...)
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
scaleScope.Namer = handlers.ContextBasedNaming{

View File

@ -221,6 +221,9 @@ func extractList(obj runtime.Object, allocNew bool) ([]runtime.Object, error) {
if err != nil {
return nil, err
}
if items.IsNil() {
return nil, nil
}
list := make([]runtime.Object, items.Len())
if len(list) == 0 {
return list, nil

View File

@ -28,7 +28,7 @@ import (
func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, options CodecFactoryOptions) []runtime.SerializerInfo {
jsonSerializer := json.NewSerializerWithOptions(
mf, scheme, scheme,
json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict},
json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
)
jsonSerializerType := runtime.SerializerInfo{
MediaType: runtime.ContentTypeJSON,
@ -38,7 +38,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option
Serializer: jsonSerializer,
StrictSerializer: json.NewSerializerWithOptions(
mf, scheme, scheme,
json.SerializerOptions{Yaml: false, Pretty: false, Strict: true},
json.SerializerOptions{Yaml: false, Pretty: false, Strict: true, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
),
StreamSerializer: &runtime.StreamSerializerInfo{
EncodesAsText: true,
@ -113,6 +113,8 @@ type CodecFactoryOptions struct {
// Pretty includes a pretty serializer along with the non-pretty one
Pretty bool
StreamingCollectionsEncodingToJSON bool
serializers []func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.SerializerInfo
}
@ -147,6 +149,12 @@ func WithSerializer(f func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.S
}
}
// WithStreamingCollectionEncodingToJSON returns a CodecFactoryOptionsMutator
// that enables item-by-item JSON encoding of collections in serializers
// produced by the codec factory.
func WithStreamingCollectionEncodingToJSON() CodecFactoryOptionsMutator {
	return func(options *CodecFactoryOptions) {
		options.StreamingCollectionsEncodingToJSON = true
	}
}
// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
// and conversion wrappers to define preferred internal and external versions. In the future,
// as the internal version is used less, callers may instead use a defaulting serializer and

View File

@ -0,0 +1,230 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package json
import (
"encoding/json"
"fmt"
"io"
"maps"
"slices"
"sort"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/conversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
)
// streamEncodeCollections tries to encode obj as a collection, writing it to w
// item by item. It reports true when it handled the encoding, and false when
// the caller should fall back to the regular whole-object encoder.
func streamEncodeCollections(obj runtime.Object, w io.Writer) (bool, error) {
	if unstructuredList, isUnstructured := obj.(*unstructured.UnstructuredList); isUnstructured {
		return true, streamingEncodeUnstructuredList(w, unstructuredList)
	}
	// Types providing their own MarshalJSON must not be re-encoded field by field.
	if _, hasMarshaler := obj.(json.Marshaler); hasMarshaler {
		return false, nil
	}
	typeMeta, listMeta, items, err := getListMeta(obj)
	if err != nil {
		// Not a recognized list shape; let the default encoder handle it.
		return false, nil
	}
	return true, streamingEncodeList(w, typeMeta, listMeta, items)
}
// getListMeta implements list extraction logic for json stream serialization.
//
// Reasons for custom logic instead of reusing accessors from the meta package:
//   - Validate json tags to prevent incompatibility with the json standard package.
//   - ListMetaAccessor doesn't distinguish empty from nil value.
//   - TypeAccessor reparses "apiVersion" and serializes it as "{group}/{version}".
//
// It succeeds only for structs with exactly three fields — an inlined TypeMeta,
// a ListMeta tagged "metadata,omitempty", and a slice tagged "items" — so any
// list with additional fields falls back to the non-streaming encoder.
func getListMeta(list runtime.Object) (metav1.TypeMeta, metav1.ListMeta, []runtime.Object, error) {
	listValue, err := conversion.EnforcePtr(list)
	if err != nil {
		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err
	}
	listType := listValue.Type()
	// Exactly 3 fields: streaming a list with extra fields would drop them.
	if listType.NumField() != 3 {
		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListType to have 3 fields")
	}
	// TypeMeta: must be field 0 and inlined, matching encoding/json output order.
	typeMeta, ok := listValue.Field(0).Interface().(metav1.TypeMeta)
	if !ok {
		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected TypeMeta field to have TypeMeta type")
	}
	if listType.Field(0).Tag.Get("json") != ",inline" {
		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag to be ",inline"`)
	}
	// ListMeta: must be field 1 and serialized under the "metadata" key.
	listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta)
	if !ok {
		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListMeta field to have ListMeta type")
	}
	if listType.Field(1).Tag.Get("json") != "metadata,omitempty" {
		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected ListMeta json field tag to be "metadata,omitempty"`)
	}
	// Items: extracted via meta.ExtractList; the tag check guards the key name.
	items, err := meta.ExtractList(list)
	if err != nil {
		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err
	}
	if listType.Field(2).Tag.Get("json") != "items" {
		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected Items json field tag to be "items"`)
	}
	return typeMeta, listMeta, items, nil
}
// streamingEncodeList writes a typed list as one JSON object, emitting
// kind/apiVersion (when non-empty), then metadata, then the items one at a
// time, and terminates the output with a newline like json.Encoder does.
func streamingEncodeList(w io.Writer, typeMeta metav1.TypeMeta, listMeta metav1.ListMeta, items []runtime.Object) error {
	if _, err := w.Write([]byte(`{`)); err != nil {
		return err
	}
	comma := []byte(",")
	// TypeMeta fields are skipped when empty, mirroring omitempty behavior.
	if kind := typeMeta.Kind; kind != "" {
		if err := encodeKeyValuePair(w, "kind", kind, comma); err != nil {
			return err
		}
	}
	if apiVersion := typeMeta.APIVersion; apiVersion != "" {
		if err := encodeKeyValuePair(w, "apiVersion", apiVersion, comma); err != nil {
			return err
		}
	}
	if err := encodeKeyValuePair(w, "metadata", listMeta, comma); err != nil {
		return err
	}
	if err := encodeItemsObjectSlice(w, items); err != nil {
		return err
	}
	_, err := w.Write([]byte("}\n"))
	return err
}
// encodeItemsObjectSlice writes the trailing "items" field of a typed list.
// A nil slice is encoded as JSON null, matching encoding/json's handling of
// nil slices; an empty non-nil slice becomes [].
func encodeItemsObjectSlice(w io.Writer, items []runtime.Object) error {
	if items == nil {
		return encodeKeyValuePair(w, "items", nil, nil)
	}
	if _, err := w.Write([]byte(`"items":[`)); err != nil {
		return err
	}
	last := len(items) - 1
	for i, item := range items {
		// Separate elements with commas; the final element gets none.
		var sep []byte
		if i != last {
			sep = []byte(",")
		}
		if err := encodeValue(w, item, sep); err != nil {
			return err
		}
	}
	_, err := w.Write([]byte("]"))
	return err
}
// streamingEncodeUnstructuredList writes an unstructured list as one JSON
// object. Keys are emitted in sorted order for deterministic output, the
// "items" key is always present (even when absent from list.Object), and the
// output is newline-terminated like json.Encoder's.
func streamingEncodeUnstructuredList(w io.Writer, list *unstructured.UnstructuredList) error {
	if _, err := w.Write([]byte(`{`)); err != nil {
		return err
	}
	keys := slices.Collect(maps.Keys(list.Object))
	if _, hasItems := list.Object["items"]; !hasItems {
		keys = append(keys, "items")
	}
	sort.Strings(keys)
	last := len(keys) - 1
	for i, key := range keys {
		var sep []byte
		if i != last {
			sep = []byte(",")
		}
		var err error
		if key == "items" {
			// Items come from list.Items, not list.Object["items"].
			err = encodeItemsUnstructuredSlice(w, list.Items, sep)
		} else {
			err = encodeKeyValuePair(w, key, list.Object[key], sep)
		}
		if err != nil {
			return err
		}
	}
	_, err := w.Write([]byte("}\n"))
	return err
}
// encodeItemsUnstructuredSlice writes the "items" field of an unstructured
// list as a JSON array of the items' underlying objects, then writes the
// optional suffix (a comma when more keys follow).
func encodeItemsUnstructuredSlice(w io.Writer, items []unstructured.Unstructured, suffix []byte) error {
	if _, err := w.Write([]byte(`"items":[`)); err != nil {
		return err
	}
	last := len(items) - 1
	for i, item := range items {
		var sep []byte
		if i != last {
			sep = []byte(",")
		}
		if err := encodeValue(w, item.Object, sep); err != nil {
			return err
		}
	}
	if _, err := w.Write([]byte("]")); err != nil {
		return err
	}
	if len(suffix) == 0 {
		return nil
	}
	_, err := w.Write(suffix)
	return err
}
// encodeKeyValuePair emits `"key":value` followed by the optional suffix
// (typically a comma separating it from the next pair).
func encodeKeyValuePair(w io.Writer, key string, value any, suffix []byte) error {
	if err := encodeValue(w, key, []byte(":")); err != nil {
		return err
	}
	return encodeValue(w, value, suffix)
}
func encodeValue(w io.Writer, value any, suffix []byte) error {
data, err := json.Marshal(value)
if err != nil {
return err
}
_, err = w.Write(data)
if err != nil {
return err
}
if len(suffix) > 0 {
_, err = w.Write(suffix)
}
return err
}

View File

@ -18,9 +18,11 @@ package json
import (
"bytes"
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
fuzz "github.com/google/gofuzz"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -30,21 +32,24 @@ import (
func TestCollectionsEncoding(t *testing.T) {
t.Run("Normal", func(t *testing.T) {
testCollectionsEncoding(t, NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{}))
testCollectionsEncoding(t, NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{}), false)
})
t.Run("Streaming", func(t *testing.T) {
testCollectionsEncoding(t, NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{StreamingCollectionsEncoding: true}), true)
})
// Leave place for testing streaming collection serializer proposed as part of KEP-5116
}
// testCollectionsEncoding should provide comprehensive tests to validate streaming implementation of encoder.
func testCollectionsEncoding(t *testing.T, s *Serializer) {
var buf bytes.Buffer
func testCollectionsEncoding(t *testing.T, s *Serializer, streamingEnabled bool) {
var buf writeCountingBuffer
var remainingItems int64 = 1
// As defined in KEP-5116, it should include the following scenarios:
// Context: https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/5116-streaming-response-encoding#unit-tests
for _, tc := range []struct {
name string
in runtime.Object
expect string
name string
in runtime.Object
cannotStream bool
expect string
}{
// Preserving the distinction between integers and floating-point numbers
{
@ -307,9 +312,10 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) {
},
// Handling structs implementing MarshallJSON method, especially built-in collection types.
{
name: "List with MarshallJSON",
in: &ListWithMarshalJSONList{},
expect: "\"marshallJSON\"\n",
name: "List with MarshallJSON cannot be streamed",
in: &ListWithMarshalJSONList{},
expect: "\"marshallJSON\"\n",
cannotStream: true,
},
{
name: "Struct with MarshallJSON",
@ -435,6 +441,32 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) {
expect: `{"kind":"List","apiVersion":"v1","metadata":{"resourceVersion":"2345"},"items":[{"kind":"Carp","apiVersion":"v1","metadata":{"name":"pod","namespace":"default","creationTimestamp":null},"spec":{},"status":{}},{"kind":"Carp","apiVersion":"v1","metadata":{"name":"pod2","namespace":"default2","creationTimestamp":null},"spec":{},"status":{}}]}
`,
},
{
name: "List with extra field cannot be streamed",
in: &ListWithAdditionalFields{
TypeMeta: metav1.TypeMeta{
Kind: "List",
APIVersion: "v1",
},
ListMeta: metav1.ListMeta{
ResourceVersion: "2345",
},
Items: []testapigroupv1.Carp{},
},
cannotStream: true,
expect: "{\"kind\":\"List\",\"apiVersion\":\"v1\",\"metadata\":{\"resourceVersion\":\"2345\"},\"items\":[],\"AdditionalField\":0}\n",
},
{
name: "Not a collection cannot be streamed",
in: &testapigroupv1.Carp{
TypeMeta: metav1.TypeMeta{
Kind: "List",
APIVersion: "v1",
},
},
cannotStream: true,
expect: "{\"kind\":\"List\",\"apiVersion\":\"v1\",\"metadata\":{\"creationTimestamp\":null},\"spec\":{},\"status\":{}}\n",
},
{
name: "UnstructuredList empty",
in: &unstructured.UnstructuredList{},
@ -543,10 +575,17 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) {
if err := s.Encode(tc.in, &buf); err != nil {
t.Fatalf("unexpected error: %v", err)
}
t.Logf("normal: %s", buf.String())
t.Logf("encoded: %s", buf.String())
if diff := cmp.Diff(buf.String(), tc.expect); diff != "" {
t.Errorf("not matching:\n%s", diff)
}
expectStreaming := !tc.cannotStream && streamingEnabled
if expectStreaming && buf.writeCount <= 1 {
t.Errorf("expected streaming but Write was called only: %d", buf.writeCount)
}
if !expectStreaming && buf.writeCount > 1 {
t.Errorf("expected non-streaming but Write was called more than once: %d", buf.writeCount)
}
})
}
}
@ -653,3 +692,103 @@ type StructWithRawBytes struct {
func (s *StructWithRawBytes) DeepCopyObject() runtime.Object {
return nil
}
// ListWithAdditionalFields is a list type with a fourth struct field beyond
// TypeMeta/ListMeta/Items. Its shape does not match the 3-field layout the
// streaming encoder requires, so it must be encoded via the non-streaming path.
type ListWithAdditionalFields struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
	Items           []testapigroupv1.Carp `json:"items" protobuf:"bytes,2,rep,name=items"`
	AdditionalField int
}

// DeepCopyObject implements runtime.Object; the tests never copy the object,
// so returning nil is sufficient here.
func (s *ListWithAdditionalFields) DeepCopyObject() runtime.Object {
	return nil
}
// writeCountingBuffer wraps bytes.Buffer and counts Write calls, letting tests
// distinguish streaming encoding (multiple writes) from one-shot encoding
// (a single write).
type writeCountingBuffer struct {
	// writeCount is the number of Write calls since the last Reset.
	writeCount int
	bytes.Buffer
}

// Write increments the call counter and delegates to the embedded buffer.
func (b *writeCountingBuffer) Write(data []byte) (int, error) {
	b.writeCount++
	return b.Buffer.Write(data)
}

// Reset clears both the write counter and the buffered data.
func (b *writeCountingBuffer) Reset() {
	b.writeCount = 0
	b.Buffer.Reset()
}
func TestFuzzCollectionsEncoding(t *testing.T) {
disableFuzzFieldsV1 := func(field *metav1.FieldsV1, c fuzz.Continue) {}
fuzzUnstructuredList := func(list *unstructured.UnstructuredList, c fuzz.Continue) {
list.Object = map[string]interface{}{
"kind": "List",
"apiVersion": "v1",
c.RandString(): c.RandString(),
c.RandString(): c.RandUint64(),
c.RandString(): c.RandBool(),
"metadata": map[string]interface{}{
"resourceVersion": fmt.Sprintf("%d", c.RandUint64()),
"continue": c.RandString(),
"remainingItemCount": fmt.Sprintf("%d", c.RandUint64()),
c.RandString(): c.RandString(),
}}
c.Fuzz(&list.Items)
}
fuzzMap := func(kvs map[string]interface{}, c fuzz.Continue) {
kvs[c.RandString()] = c.RandBool()
kvs[c.RandString()] = c.RandUint64()
kvs[c.RandString()] = c.RandString()
}
f := fuzz.New().Funcs(disableFuzzFieldsV1, fuzzUnstructuredList, fuzzMap)
streamingBuffer := &bytes.Buffer{}
normalSerializer := NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{StreamingCollectionsEncoding: false})
normalBuffer := &bytes.Buffer{}
t.Run("CarpList", func(t *testing.T) {
for i := 0; i < 1000; i++ {
list := &testapigroupv1.CarpList{}
f.Fuzz(list)
streamingBuffer.Reset()
normalBuffer.Reset()
ok, err := streamEncodeCollections(list, streamingBuffer)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !ok {
t.Fatalf("expected streaming encoder to encode %T", list)
}
if err := normalSerializer.Encode(list, normalBuffer); err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(normalBuffer.String(), streamingBuffer.String()); diff != "" {
t.Logf("normal: %s", normalBuffer.String())
t.Logf("streaming: %s", streamingBuffer.String())
t.Errorf("not matching:\n%s", diff)
}
}
})
t.Run("UnstructuredList", func(t *testing.T) {
for i := 0; i < 1000; i++ {
list := &unstructured.UnstructuredList{}
f.Fuzz(list)
streamingBuffer.Reset()
normalBuffer.Reset()
ok, err := streamEncodeCollections(list, streamingBuffer)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !ok {
t.Fatalf("expected streaming encoder to encode %T", list)
}
if err := normalSerializer.Encode(list, normalBuffer); err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(normalBuffer.String(), streamingBuffer.String()); diff != "" {
t.Logf("normal: %s", normalBuffer.String())
t.Logf("streaming: %s", streamingBuffer.String())
t.Errorf("not matching:\n%s", diff)
}
}
})
}

View File

@ -36,7 +36,7 @@ import (
// is not nil, the object has the group, version, and kind fields set.
// Deprecated: use NewSerializerWithOptions instead.
func NewSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.ObjectTyper, pretty bool) *Serializer {
	// Use field names instead of a positional literal so this call does not
	// need updating (and cannot silently misassign) when SerializerOptions
	// gains new fields.
	return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{Pretty: pretty})
}
// NewYAMLSerializer creates a YAML serializer that handles encoding versioned objects into the proper YAML form. If typer
@ -44,7 +44,7 @@ func NewSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtim
// matches JSON, and will error if constructs are used that do not serialize to JSON.
// Deprecated: use NewSerializerWithOptions instead.
func NewYAMLSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.ObjectTyper) *Serializer {
	// Use field names instead of a positional literal so this call does not
	// need updating (and cannot silently misassign) when SerializerOptions
	// gains new fields.
	return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{Yaml: true})
}
// NewSerializerWithOptions creates a JSON/YAML serializer that handles encoding versioned objects into the proper JSON/YAML
@ -93,6 +93,9 @@ type SerializerOptions struct {
// Strict: configures the Serializer to return strictDecodingError's when duplicate fields are present decoding JSON or YAML.
// Note that enabling this option is not as performant as the non-strict variant, and should not be used in fast paths.
Strict bool
// StreamingCollectionsEncoding enables encoding collection, one item at the time, drastically reducing memory needed.
StreamingCollectionsEncoding bool
}
// Serializer handles encoding versioned objects into the proper JSON form
@ -242,6 +245,15 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error {
_, err = w.Write(data)
return err
}
if s.options.StreamingCollectionsEncoding {
ok, err := streamEncodeCollections(obj, w)
if err != nil {
return err
}
if ok {
return nil
}
}
encoder := json.NewEncoder(w)
return encoder.Encode(obj)
}

View File

@ -39,8 +39,11 @@ import (
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
jsonserializer "k8s.io/apimachinery/pkg/runtime/serializer/json"
rand2 "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apiserver/pkg/features"
@ -804,3 +807,80 @@ func gzipContent(data []byte, level int) []byte {
}
return buf.Bytes()
}
// TestStreamingGzipIntegration verifies that streaming JSON collection
// encoding composes with the gzip-compressing deferredResponseWriter:
// gzip still triggers only above the size threshold, and streaming still
// results in multiple Write calls reaching the writer.
func TestStreamingGzipIntegration(t *testing.T) {
	// One byte over the threshold so the "large" payloads are gzip-compressed.
	largeChunk := bytes.Repeat([]byte("b"), defaultGzipThresholdBytes+1)
	tcs := []struct {
		name            string
		serializer      runtime.Encoder
		object          runtime.Object
		expectGzip      bool
		expectStreaming bool
	}{
		{
			name:            "JSON, small object, default -> no gzip",
			serializer:      jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{}),
			object:          &testapigroupv1.CarpList{},
			expectGzip:      false,
			expectStreaming: false,
		},
		{
			name:            "JSON, small object, streaming -> no gzip",
			serializer:      jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{StreamingCollectionsEncoding: true}),
			object:          &testapigroupv1.CarpList{},
			expectGzip:      false,
			expectStreaming: true,
		},
		{
			// Oversize the Kind string to push the payload past the threshold.
			name:            "JSON, large object, default -> gzip",
			serializer:      jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{}),
			object:          &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}},
			expectGzip:      true,
			expectStreaming: false,
		},
		{
			name:            "JSON, large object, streaming -> gzip",
			serializer:      jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{StreamingCollectionsEncoding: true}),
			object:          &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}},
			expectGzip:      true,
			expectStreaming: true,
		},
	}
	for _, tc := range tcs {
		t.Run(tc.name, func(t *testing.T) {
			mockResponseWriter := httptest.NewRecorder()
			drw := &deferredResponseWriter{
				mediaType:       "text/plain",
				statusCode:      200,
				contentEncoding: "gzip",
				hw:              mockResponseWriter,
				ctx:             context.Background(),
			}
			// Count the writes reaching the response writer to detect streaming.
			counter := &writeCounter{Writer: drw}
			err := tc.serializer.Encode(tc.object, counter)
			if err != nil {
				t.Fatal(err)
			}
			encoding := mockResponseWriter.Header().Get("Content-Encoding")
			if (encoding == "gzip") != tc.expectGzip {
				t.Errorf("Expect gzip: %v, got: %q", tc.expectGzip, encoding)
			}
			if counter.writeCount < 1 {
				t.Fatalf("Expect at least 1 write")
			}
			// More than one write implies the encoder streamed the collection.
			if (counter.writeCount > 1) != tc.expectStreaming {
				t.Errorf("Expect streaming: %v, got write count: %d", tc.expectStreaming, counter.writeCount)
			}
		})
	}
}
// writeCounter wraps an io.Writer and counts Write calls, letting tests
// distinguish streaming (multiple writes) from one-shot encoding.
type writeCounter struct {
	// writeCount is the number of Write calls observed.
	writeCount int
	io.Writer
}

// Write increments the counter and delegates to the wrapped writer.
func (b *writeCounter) Write(data []byte) (int, error) {
	b.writeCount++
	return b.Writer.Write(data)
}

View File

@ -217,6 +217,10 @@ const (
// document.
StorageVersionHash featuregate.Feature = "StorageVersionHash"
// owner: @serathius
// Allow API server to encode collections item by item, instead of all at once.
StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"
// owner: @aramase, @enj, @nabokihms
// kep: https://kep.k8s.io/3331
//
@ -387,6 +391,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.15"), Default: true, PreRelease: featuregate.Beta},
},
StreamingCollectionEncodingToJSON: {
{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta},
},
StrictCostEnforcementForVAP: {
{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Beta},
{Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.GA, LockToDefault: true},

View File

@ -991,8 +991,15 @@ func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV
// NewDefaultAPIGroupInfo returns an APIGroupInfo stubbed with "normal" values
// exposed for easier composition from other packages
func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
opts := []serializer.CodecFactoryOptionsMutator{}
if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
codecs = serializer.NewCodecFactory(scheme, serializer.WithSerializer(cbor.NewSerializerInfo))
opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
}
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
}
if len(opts) != 0 {
codecs = serializer.NewCodecFactory(scheme, opts...)
}
return APIGroupInfo{
PrioritizedVersions: scheme.PrioritizedVersionsForGroup(group),

View File

@ -1324,6 +1324,12 @@
lockToDefault: false
preRelease: Alpha
version: "1.30"
- name: StreamingCollectionEncodingToJSON
versionedSpecs:
- default: true
lockToDefault: false
preRelease: Beta
version: "1.33"
- name: StrictCostEnforcementForVAP
versionedSpecs:
- default: false