Use application/cbor-seq media type in streaming CBOR responses.

The media type application/cbor describes exactly one encoded item. As a new (to Kubernetes) format
with no existing clients, streaming/watch responses will use the application/cbor-seq media
type. CBOR watch responses conform to the specification of CBOR Sequences and are encoded as the
concatenation of zero or more items with no additional framing.
This commit is contained in:
Ben Luddy 2024-11-01 13:14:06 -04:00
parent e273349f3a
commit 504f14998e
No known key found for this signature in database
GPG Key ID: A6551E73A5974C30
6 changed files with 121 additions and 5 deletions

View File

@ -43,10 +43,11 @@ type TypeMeta struct {
}
const (
ContentTypeJSON string = "application/json"
ContentTypeYAML string = "application/yaml"
ContentTypeProtobuf string = "application/vnd.kubernetes.protobuf"
ContentTypeCBOR string = "application/cbor"
ContentTypeJSON string = "application/json"
ContentTypeYAML string = "application/yaml"
ContentTypeProtobuf string = "application/vnd.kubernetes.protobuf"
ContentTypeCBOR string = "application/cbor" // RFC 8949
ContentTypeCBORSequence string = "application/cbor-seq" // RFC 8742
)
// RawExtension is used to hold extensions in external versions.

View File

@ -26,6 +26,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
)
// MediaTypesForSerializer returns a list of media and stream media types for the server.
@ -33,6 +35,10 @@ func MediaTypesForSerializer(ns runtime.NegotiatedSerializer) (mediaTypes, strea
for _, info := range ns.SupportedMediaTypes() {
mediaTypes = append(mediaTypes, info.MediaType)
if info.StreamSerializer != nil {
if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) && info.MediaType == runtime.ContentTypeCBOR {
streamMediaTypes = append(streamMediaTypes, runtime.ContentTypeCBORSequence)
continue
}
// stream=watch is the existing mime-type parameter for watch
streamMediaTypes = append(streamMediaTypes, info.MediaType+";stream=watch")
}

View File

@ -88,7 +88,17 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
}
// TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here
mediaType := serializer.MediaType
if mediaType != runtime.ContentTypeJSON {
switch mediaType {
case runtime.ContentTypeJSON:
// as-is
case runtime.ContentTypeCBOR:
// If a client indicated it accepts application/cbor (exactly one data item) on a
// watch request, set the conformant application/cbor-seq media type the watch
// response. RFC 9110 allows an origin server to deviate from the indicated
// preference rather than send a 406 (Not Acceptable) response (see
// https://www.rfc-editor.org/rfc/rfc9110.html#section-12.1-5).
mediaType = runtime.ContentTypeCBORSequence
default:
mediaType += ";stream=watch"
}

View File

@ -129,6 +129,7 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientConte
func scrubCBORContentConfigIfDisabled(content ClientContentConfig) ClientContentConfig {
if clientfeatures.FeatureGates().Enabled(clientfeatures.ClientsAllowCBOR) {
content.Negotiator = clientNegotiatorWithCBORSequenceStreamDecoder{content.Negotiator}
return content
}
@ -294,3 +295,38 @@ func (p *requestClientContentConfigProvider) UnsupportedMediaType(requestContent
p.sawUnsupportedMediaTypeForCBOR.Store(true)
}
}
// clientNegotiatorWithCBORSequenceStreamDecoder is a ClientNegotiator that delegates to another
// ClientNegotiator to select the appropriate Encoder or Decoder for a given media type. As a
// special case, it will resolve "application/cbor-seq" (a CBOR Sequence, the concatenation of zero
// or more CBOR data items) as an alias for "application/cbor" (exactly one CBOR data item) when
// selecting a stream decoder.
type clientNegotiatorWithCBORSequenceStreamDecoder struct {
negotiator runtime.ClientNegotiator
}
func (n clientNegotiatorWithCBORSequenceStreamDecoder) Encoder(contentType string, params map[string]string) (runtime.Encoder, error) {
return n.negotiator.Encoder(contentType, params)
}
func (n clientNegotiatorWithCBORSequenceStreamDecoder) Decoder(contentType string, params map[string]string) (runtime.Decoder, error) {
return n.negotiator.Decoder(contentType, params)
}
func (n clientNegotiatorWithCBORSequenceStreamDecoder) StreamDecoder(contentType string, params map[string]string) (runtime.Decoder, runtime.Serializer, runtime.Framer, error) {
if !clientfeatures.FeatureGates().Enabled(clientfeatures.ClientsAllowCBOR) {
return n.negotiator.StreamDecoder(contentType, params)
}
switch contentType {
case runtime.ContentTypeCBORSequence:
return n.negotiator.StreamDecoder(runtime.ContentTypeCBOR, params)
case runtime.ContentTypeCBOR:
// This media type is only appropriate for exactly one data item, not the zero or
// more events of a watch stream.
return nil, nil, nil, runtime.NegotiateError{ContentType: contentType, Stream: true}
default:
return n.negotiator.StreamDecoder(contentType, params)
}
}

View File

@ -1430,6 +1430,30 @@ func TestClientCBOREnablement(t *testing.T) {
return err
}
DoWatchRequestWithGenericTypedClient := func(t *testing.T, config *rest.Config) error {
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
// Generated clients for built-in types include the PreferProtobuf option, which
// forces Protobuf encoding on a per-request basis.
client := gentype.NewClientWithListAndApply[*v1.Namespace, *v1.NamespaceList, *corev1ac.NamespaceApplyConfiguration](
"namespaces",
clientset.CoreV1().RESTClient(),
clientscheme.ParameterCodec,
"",
func() *v1.Namespace { return &v1.Namespace{} },
func() *v1.NamespaceList { return &v1.NamespaceList{} },
)
w, err := client.Watch(context.TODO(), metav1.ListOptions{LabelSelector: "a,!a"})
if err != nil {
return err
}
w.Stop()
return nil
}
type testCase struct {
name string
served bool
@ -1650,6 +1674,20 @@ func TestClientCBOREnablement(t *testing.T) {
wantStatusError: false,
doRequest: DoRequestWithGenericTypedClient,
},
{
name: "generated client watch accept cbor and json get cbor-seq",
served: true,
allowed: true,
preferred: false,
configuredContentType: "application/json",
configuredAccept: "application/cbor;q=1,application/json;q=0.9",
wantRequestContentType: "",
wantRequestAccept: "application/cbor;q=1,application/json;q=0.9",
wantResponseContentType: "application/cbor-seq",
wantResponseStatus: http.StatusOK,
wantStatusError: false,
doRequest: DoWatchRequestWithGenericTypedClient,
},
{
name: "generated client accept cbor and json get json cbor not served",
served: false,

View File

@ -408,6 +408,20 @@ func TestDynamicClientCBOREnablement(t *testing.T) {
return err
}
DoWatch := func(t *testing.T, config *rest.Config) error {
client, err := dynamic.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
w, err := client.Resource(corev1.SchemeGroupVersion.WithResource("namespaces")).Watch(context.TODO(), metav1.ListOptions{LabelSelector: "a,!a"})
if err != nil {
return err
}
w.Stop()
return nil
}
testCases := []struct {
name string
serving bool
@ -540,6 +554,17 @@ func TestDynamicClientCBOREnablement(t *testing.T) {
wantStatusError: true,
doRequest: DoApply,
},
{
name: "watch accepts both gets cbor-seq",
serving: true,
allowed: true,
preferred: false,
wantRequestAccept: "application/json;q=0.9,application/cbor;q=1",
wantResponseContentType: "application/cbor-seq",
wantResponseStatus: http.StatusOK,
wantStatusError: false,
doRequest: DoWatch,
},
}
for _, serving := range []bool{true, false} {