Merge pull request #128501 from benluddy/watch-cbor-seq

KEP-4222: Use cbor-seq content-type for CBOR watch responses.
This commit is contained in:
Kubernetes Prow Robot 2024-11-06 20:10:17 +00:00 committed by GitHub
commit a885e446d6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 121 additions and 5 deletions

View File

@ -46,7 +46,8 @@ const (
ContentTypeJSON string = "application/json" ContentTypeJSON string = "application/json"
ContentTypeYAML string = "application/yaml" ContentTypeYAML string = "application/yaml"
ContentTypeProtobuf string = "application/vnd.kubernetes.protobuf" ContentTypeProtobuf string = "application/vnd.kubernetes.protobuf"
ContentTypeCBOR string = "application/cbor" ContentTypeCBOR string = "application/cbor" // RFC 8949
ContentTypeCBORSequence string = "application/cbor-seq" // RFC 8742
) )
// RawExtension is used to hold extensions in external versions. // RawExtension is used to hold extensions in external versions.

View File

@ -26,6 +26,8 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
) )
// MediaTypesForSerializer returns a list of media and stream media types for the server. // MediaTypesForSerializer returns a list of media and stream media types for the server.
@ -33,6 +35,10 @@ func MediaTypesForSerializer(ns runtime.NegotiatedSerializer) (mediaTypes, strea
for _, info := range ns.SupportedMediaTypes() { for _, info := range ns.SupportedMediaTypes() {
mediaTypes = append(mediaTypes, info.MediaType) mediaTypes = append(mediaTypes, info.MediaType)
if info.StreamSerializer != nil { if info.StreamSerializer != nil {
if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) && info.MediaType == runtime.ContentTypeCBOR {
streamMediaTypes = append(streamMediaTypes, runtime.ContentTypeCBORSequence)
continue
}
// stream=watch is the existing mime-type parameter for watch // stream=watch is the existing mime-type parameter for watch
streamMediaTypes = append(streamMediaTypes, info.MediaType+";stream=watch") streamMediaTypes = append(streamMediaTypes, info.MediaType+";stream=watch")
} }

View File

@ -88,7 +88,17 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
} }
// TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here // TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here
mediaType := serializer.MediaType mediaType := serializer.MediaType
if mediaType != runtime.ContentTypeJSON { switch mediaType {
case runtime.ContentTypeJSON:
// as-is
case runtime.ContentTypeCBOR:
// If a client indicated it accepts application/cbor (exactly one data item) on a
// watch request, set the conformant application/cbor-seq media type the watch
// response. RFC 9110 allows an origin server to deviate from the indicated
// preference rather than send a 406 (Not Acceptable) response (see
// https://www.rfc-editor.org/rfc/rfc9110.html#section-12.1-5).
mediaType = runtime.ContentTypeCBORSequence
default:
mediaType += ";stream=watch" mediaType += ";stream=watch"
} }

View File

@ -129,6 +129,7 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientConte
func scrubCBORContentConfigIfDisabled(content ClientContentConfig) ClientContentConfig { func scrubCBORContentConfigIfDisabled(content ClientContentConfig) ClientContentConfig {
if clientfeatures.FeatureGates().Enabled(clientfeatures.ClientsAllowCBOR) { if clientfeatures.FeatureGates().Enabled(clientfeatures.ClientsAllowCBOR) {
content.Negotiator = clientNegotiatorWithCBORSequenceStreamDecoder{content.Negotiator}
return content return content
} }
@ -294,3 +295,38 @@ func (p *requestClientContentConfigProvider) UnsupportedMediaType(requestContent
p.sawUnsupportedMediaTypeForCBOR.Store(true) p.sawUnsupportedMediaTypeForCBOR.Store(true)
} }
} }
// clientNegotiatorWithCBORSequenceStreamDecoder is a ClientNegotiator that delegates to another
// ClientNegotiator to select the appropriate Encoder or Decoder for a given media type. As a
// special case, it will resolve "application/cbor-seq" (a CBOR Sequence, the concatenation of zero
// or more CBOR data items) as an alias for "application/cbor" (exactly one CBOR data item) when
// selecting a stream decoder.
type clientNegotiatorWithCBORSequenceStreamDecoder struct {
negotiator runtime.ClientNegotiator
}
func (n clientNegotiatorWithCBORSequenceStreamDecoder) Encoder(contentType string, params map[string]string) (runtime.Encoder, error) {
return n.negotiator.Encoder(contentType, params)
}
func (n clientNegotiatorWithCBORSequenceStreamDecoder) Decoder(contentType string, params map[string]string) (runtime.Decoder, error) {
return n.negotiator.Decoder(contentType, params)
}
func (n clientNegotiatorWithCBORSequenceStreamDecoder) StreamDecoder(contentType string, params map[string]string) (runtime.Decoder, runtime.Serializer, runtime.Framer, error) {
if !clientfeatures.FeatureGates().Enabled(clientfeatures.ClientsAllowCBOR) {
return n.negotiator.StreamDecoder(contentType, params)
}
switch contentType {
case runtime.ContentTypeCBORSequence:
return n.negotiator.StreamDecoder(runtime.ContentTypeCBOR, params)
case runtime.ContentTypeCBOR:
// This media type is only appropriate for exactly one data item, not the zero or
// more events of a watch stream.
return nil, nil, nil, runtime.NegotiateError{ContentType: contentType, Stream: true}
default:
return n.negotiator.StreamDecoder(contentType, params)
}
}

View File

@ -1430,6 +1430,30 @@ func TestClientCBOREnablement(t *testing.T) {
return err return err
} }
DoWatchRequestWithGenericTypedClient := func(t *testing.T, config *rest.Config) error {
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
// Generated clients for built-in types include the PreferProtobuf option, which
// forces Protobuf encoding on a per-request basis.
client := gentype.NewClientWithListAndApply[*v1.Namespace, *v1.NamespaceList, *corev1ac.NamespaceApplyConfiguration](
"namespaces",
clientset.CoreV1().RESTClient(),
clientscheme.ParameterCodec,
"",
func() *v1.Namespace { return &v1.Namespace{} },
func() *v1.NamespaceList { return &v1.NamespaceList{} },
)
w, err := client.Watch(context.TODO(), metav1.ListOptions{LabelSelector: "a,!a"})
if err != nil {
return err
}
w.Stop()
return nil
}
type testCase struct { type testCase struct {
name string name string
served bool served bool
@ -1650,6 +1674,20 @@ func TestClientCBOREnablement(t *testing.T) {
wantStatusError: false, wantStatusError: false,
doRequest: DoRequestWithGenericTypedClient, doRequest: DoRequestWithGenericTypedClient,
}, },
{
name: "generated client watch accept cbor and json get cbor-seq",
served: true,
allowed: true,
preferred: false,
configuredContentType: "application/json",
configuredAccept: "application/cbor;q=1,application/json;q=0.9",
wantRequestContentType: "",
wantRequestAccept: "application/cbor;q=1,application/json;q=0.9",
wantResponseContentType: "application/cbor-seq",
wantResponseStatus: http.StatusOK,
wantStatusError: false,
doRequest: DoWatchRequestWithGenericTypedClient,
},
{ {
name: "generated client accept cbor and json get json cbor not served", name: "generated client accept cbor and json get json cbor not served",
served: false, served: false,

View File

@ -408,6 +408,20 @@ func TestDynamicClientCBOREnablement(t *testing.T) {
return err return err
} }
DoWatch := func(t *testing.T, config *rest.Config) error {
client, err := dynamic.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
w, err := client.Resource(corev1.SchemeGroupVersion.WithResource("namespaces")).Watch(context.TODO(), metav1.ListOptions{LabelSelector: "a,!a"})
if err != nil {
return err
}
w.Stop()
return nil
}
testCases := []struct { testCases := []struct {
name string name string
serving bool serving bool
@ -540,6 +554,17 @@ func TestDynamicClientCBOREnablement(t *testing.T) {
wantStatusError: true, wantStatusError: true,
doRequest: DoApply, doRequest: DoApply,
}, },
{
name: "watch accepts both gets cbor-seq",
serving: true,
allowed: true,
preferred: false,
wantRequestAccept: "application/json;q=0.9,application/cbor;q=1",
wantResponseContentType: "application/cbor-seq",
wantResponseStatus: http.StatusOK,
wantStatusError: false,
doRequest: DoWatch,
},
} }
for _, serving := range []bool{true, false} { for _, serving := range []bool{true, false} {