diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/types.go b/staging/src/k8s.io/apimachinery/pkg/runtime/types.go index 1680c149f95..ca7b7cc2d48 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/types.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/types.go @@ -43,10 +43,11 @@ type TypeMeta struct { } const ( - ContentTypeJSON string = "application/json" - ContentTypeYAML string = "application/yaml" - ContentTypeProtobuf string = "application/vnd.kubernetes.protobuf" - ContentTypeCBOR string = "application/cbor" + ContentTypeJSON string = "application/json" + ContentTypeYAML string = "application/yaml" + ContentTypeProtobuf string = "application/vnd.kubernetes.protobuf" + ContentTypeCBOR string = "application/cbor" // RFC 8949 + ContentTypeCBORSequence string = "application/cbor-seq" // RFC 8742 ) // RawExtension is used to hold extensions in external versions. diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/negotiation/negotiate.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/negotiation/negotiate.go index 9dbad1ea65c..7667e6639c2 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/negotiation/negotiate.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/negotiation/negotiate.go @@ -26,6 +26,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" ) // MediaTypesForSerializer returns a list of media and stream media types for the server. @@ -33,6 +35,10 @@ func MediaTypesForSerializer(ns runtime.NegotiatedSerializer) (mediaTypes, strea for _, info := range ns.SupportedMediaTypes() { mediaTypes = append(mediaTypes, info.MediaType) if info.StreamSerializer != nil { + if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) && info.MediaType == runtime.ContentTypeCBOR { + streamMediaTypes = append(streamMediaTypes, runtime.ContentTypeCBORSequence) + continue + } // stream=watch is the existing mime-type parameter for watch streamMediaTypes = append(streamMediaTypes, info.MediaType+";stream=watch") } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go index 8876a19cd62..c239d1f7abe 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go @@ -88,7 +88,17 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp } // TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here mediaType := serializer.MediaType - if mediaType != runtime.ContentTypeJSON { + switch mediaType { + case runtime.ContentTypeJSON: + // as-is + case runtime.ContentTypeCBOR: + // If a client indicated it accepts application/cbor (exactly one data item) on a + // watch request, set the conformant application/cbor-seq media type the watch + // response. RFC 9110 allows an origin server to deviate from the indicated + // preference rather than send a 406 (Not Acceptable) response (see + // https://www.rfc-editor.org/rfc/rfc9110.html#section-12.1-5). + mediaType = runtime.ContentTypeCBORSequence + default: mediaType += ";stream=watch" } diff --git a/staging/src/k8s.io/client-go/rest/client.go b/staging/src/k8s.io/client-go/rest/client.go index 9bcb99c549b..393a4166913 100644 --- a/staging/src/k8s.io/client-go/rest/client.go +++ b/staging/src/k8s.io/client-go/rest/client.go @@ -129,6 +129,7 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientConte func scrubCBORContentConfigIfDisabled(content ClientContentConfig) ClientContentConfig { if clientfeatures.FeatureGates().Enabled(clientfeatures.ClientsAllowCBOR) { + content.Negotiator = clientNegotiatorWithCBORSequenceStreamDecoder{content.Negotiator} return content } @@ -294,3 +295,38 @@ func (p *requestClientContentConfigProvider) UnsupportedMediaType(requestContent p.sawUnsupportedMediaTypeForCBOR.Store(true) } } + +// clientNegotiatorWithCBORSequenceStreamDecoder is a ClientNegotiator that delegates to another +// ClientNegotiator to select the appropriate Encoder or Decoder for a given media type. As a +// special case, it will resolve "application/cbor-seq" (a CBOR Sequence, the concatenation of zero +// or more CBOR data items) as an alias for "application/cbor" (exactly one CBOR data item) when +// selecting a stream decoder. +type clientNegotiatorWithCBORSequenceStreamDecoder struct { + negotiator runtime.ClientNegotiator +} + +func (n clientNegotiatorWithCBORSequenceStreamDecoder) Encoder(contentType string, params map[string]string) (runtime.Encoder, error) { + return n.negotiator.Encoder(contentType, params) +} + +func (n clientNegotiatorWithCBORSequenceStreamDecoder) Decoder(contentType string, params map[string]string) (runtime.Decoder, error) { + return n.negotiator.Decoder(contentType, params) +} + +func (n clientNegotiatorWithCBORSequenceStreamDecoder) StreamDecoder(contentType string, params map[string]string) (runtime.Decoder, runtime.Serializer, runtime.Framer, error) { + if !clientfeatures.FeatureGates().Enabled(clientfeatures.ClientsAllowCBOR) { + return n.negotiator.StreamDecoder(contentType, params) + } + + switch contentType { + case runtime.ContentTypeCBORSequence: + return n.negotiator.StreamDecoder(runtime.ContentTypeCBOR, params) + case runtime.ContentTypeCBOR: + // This media type is only appropriate for exactly one data item, not the zero or + // more events of a watch stream. + return nil, nil, nil, runtime.NegotiateError{ContentType: contentType, Stream: true} + default: + return n.negotiator.StreamDecoder(contentType, params) + } + +} diff --git a/test/integration/client/client_test.go b/test/integration/client/client_test.go index bda92bdd731..cbba4e0bbfa 100644 --- a/test/integration/client/client_test.go +++ b/test/integration/client/client_test.go @@ -1430,6 +1430,30 @@ func TestClientCBOREnablement(t *testing.T) { return err } + DoWatchRequestWithGenericTypedClient := func(t *testing.T, config *rest.Config) error { + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + t.Fatal(err) + } + + // Generated clients for built-in types include the PreferProtobuf option, which + // forces Protobuf encoding on a per-request basis. + client := gentype.NewClientWithListAndApply[*v1.Namespace, *v1.NamespaceList, *corev1ac.NamespaceApplyConfiguration]( + "namespaces", + clientset.CoreV1().RESTClient(), + clientscheme.ParameterCodec, + "", + func() *v1.Namespace { return &v1.Namespace{} }, + func() *v1.NamespaceList { return &v1.NamespaceList{} }, + ) + w, err := client.Watch(context.TODO(), metav1.ListOptions{LabelSelector: "a,!a"}) + if err != nil { + return err + } + w.Stop() + return nil + } + type testCase struct { name string served bool @@ -1650,6 +1674,20 @@ func TestClientCBOREnablement(t *testing.T) { wantStatusError: false, doRequest: DoRequestWithGenericTypedClient, }, + { + name: "generated client watch accept cbor and json get cbor-seq", + served: true, + allowed: true, + preferred: false, + configuredContentType: "application/json", + configuredAccept: "application/cbor;q=1,application/json;q=0.9", + wantRequestContentType: "", + wantRequestAccept: "application/cbor;q=1,application/json;q=0.9", + wantResponseContentType: "application/cbor-seq", + wantResponseStatus: http.StatusOK, + wantStatusError: false, + doRequest: DoWatchRequestWithGenericTypedClient, + }, { name: "generated client accept cbor and json get json cbor not served", served: false, diff --git a/test/integration/client/dynamic_client_test.go b/test/integration/client/dynamic_client_test.go index 13abc33c5d0..6f7866233ac 100644 --- a/test/integration/client/dynamic_client_test.go +++ b/test/integration/client/dynamic_client_test.go @@ -408,6 +408,20 @@ func TestDynamicClientCBOREnablement(t *testing.T) { return err } + DoWatch := func(t *testing.T, config *rest.Config) error { + client, err := dynamic.NewForConfig(config) + if err != nil { + t.Fatal(err) + } + + w, err := client.Resource(corev1.SchemeGroupVersion.WithResource("namespaces")).Watch(context.TODO(), metav1.ListOptions{LabelSelector: "a,!a"}) + if err != nil { + return err + } + w.Stop() + return nil + } + testCases := []struct { name string serving bool @@ -540,6 +554,17 @@ func TestDynamicClientCBOREnablement(t *testing.T) { wantStatusError: true, doRequest: DoApply, }, + { + name: "watch accepts both gets cbor-seq", + serving: true, + allowed: true, + preferred: false, + wantRequestAccept: "application/json;q=0.9,application/cbor;q=1", + wantResponseContentType: "application/cbor-seq", + wantResponseStatus: http.StatusOK, + wantStatusError: false, + doRequest: DoWatch, + }, } for _, serving := range []bool{true, false} {