CRDs should support watch of protobuf PartialObjectMetadata

Correctly ensure CRDs can be watched using protobuf when transformed to
PartialObjectMetadata. To do this we add a set of serializers allowed to
be used for "normal" requests (that return CRDs) while the serializers
supported by the infrastructure is broader and includes protobuf. During
negotatiation we check for transformation requests and protobuf is
excluded from non-transform requests.

As part of the change, correct an error message when the server returns
a 406 but the client doesn't accept the format to avoid confusing users
who set impossible Accept rules for CRDs (the dynamic client doesn't
support Protobuf, so if the server responds with a protobuf status the
message from the server is lost and the generic error was confusing).
This commit is contained in:
Clayton Coleman 2019-04-16 23:21:44 -04:00
parent d8fd232ea1
commit 89e752add0
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
7 changed files with 144 additions and 47 deletions

View File

@ -52,6 +52,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",

View File

@ -52,6 +52,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/runtime/serializer/protobuf"
"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -627,21 +628,32 @@ func (r *crdHandler) getOrCreateServingInfoFor(crd *apiextensions.CustomResource
clusterScoped := crd.Spec.Scope == apiextensions.ClusterScoped
// CRDs explicitly do not support protobuf, but some objects returned by the API server do
negotiatedSerializer := unstructuredNegotiatedSerializer{
typer: typer,
creator: creator,
converter: safeConverter,
structuralSchemas: structuralSchemas,
structuralSchemaGK: kind.GroupKind(),
preserveUnknownFields: *crd.Spec.PreserveUnknownFields,
}
var standardSerializers []runtime.SerializerInfo
for _, s := range negotiatedSerializer.SupportedMediaTypes() {
if s.MediaType == runtime.ContentTypeProtobuf {
continue
}
standardSerializers = append(standardSerializers, s)
}
requestScopes[v.Name] = &handlers.RequestScope{
Namer: handlers.ContextBasedNaming{
SelfLinker: meta.NewAccessor(),
ClusterScoped: clusterScoped,
SelfLinkPathPrefix: selfLinkPrefix,
},
Serializer: unstructuredNegotiatedSerializer{
typer: typer,
creator: creator,
converter: safeConverter,
structuralSchemas: structuralSchemas,
structuralSchemaGK: kind.GroupKind(),
preserveUnknownFields: *crd.Spec.PreserveUnknownFields,
},
ParameterCodec: parameterCodec,
Serializer: negotiatedSerializer,
ParameterCodec: parameterCodec,
StandardSerializers: standardSerializers,
Creater: creator,
Convertor: safeConverter,
@ -762,6 +774,16 @@ func (s unstructuredNegotiatedSerializer) SupportedMediaTypes() []runtime.Serial
EncodesAsText: true,
Serializer: json.NewYAMLSerializer(json.DefaultMetaFactory, s.creator, s.typer),
},
{
MediaType: "application/vnd.kubernetes.protobuf",
MediaTypeType: "application",
MediaTypeSubType: "vnd.kubernetes.protobuf",
Serializer: protobuf.NewSerializer(s.creator, s.typer),
StreamSerializer: &runtime.StreamSerializerInfo{
Serializer: protobuf.NewRawSerializer(s.creator, s.typer),
Framer: protobuf.LengthDelimitedFramer,
},
},
}
}

View File

@ -394,7 +394,11 @@ func NewGenericServerResponse(code int, verb string, qualifiedResource schema.Gr
case http.StatusNotAcceptable:
reason = metav1.StatusReasonNotAcceptable
// the server message has details about what types are acceptable
message = serverMessage
if len(serverMessage) == 0 || serverMessage == "unknown" {
message = "the server was unable to respond with a content type that the client supports"
} else {
message = serverMessage
}
case http.StatusUnsupportedMediaType:
reason = metav1.StatusReasonUnsupportedMediaType
// the server message has details about what types are acceptable

View File

@ -69,8 +69,6 @@ func IsNotMarshalable(err error) bool {
// NewSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If a typer
// is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written
// as-is (any type info passed with the object will be used).
//
// This encoding scheme is experimental, and is subject to change at any time.
func NewSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper) *Serializer {
return &Serializer{
prefix: protoEncodingPrefix,

View File

@ -116,9 +116,10 @@ func isPrettyPrint(req *http.Request) bool {
// EndpointRestrictions is an interface that allows content-type negotiation
// to verify server support for specific options
type EndpointRestrictions interface {
// AllowsConversion should return true if the specified group version kind
// is an allowed target object.
AllowsConversion(target schema.GroupVersionKind, mimeType, mimeSubType string) bool
// AllowsMediaTypeTransform returns true if the endpoint allows either the requested mime type
// or the requested transformation. If false, the caller should ignore this mime type. If the
// target is nil, the client is not requesting a transformation.
AllowsMediaTypeTransform(mimeType, mimeSubType string, target *schema.GroupVersionKind) bool
// AllowsServerVersion should return true if the specified version is valid
// for the server group.
AllowsServerVersion(version string) bool
@ -133,8 +134,8 @@ var DefaultEndpointRestrictions = emptyEndpointRestrictions{}
type emptyEndpointRestrictions struct{}
func (emptyEndpointRestrictions) AllowsConversion(schema.GroupVersionKind, string, string) bool {
return false
func (emptyEndpointRestrictions) AllowsMediaTypeTransform(mimeType string, mimeSubType string, gvk *schema.GroupVersionKind) bool {
return gvk == nil
}
func (emptyEndpointRestrictions) AllowsServerVersion(string) bool { return false }
func (emptyEndpointRestrictions) AllowsStreamSchema(s string) bool { return s == "watch" }
@ -225,7 +226,7 @@ func acceptMediaTypeOptions(params map[string]string, accepts *runtime.Serialize
}
}
if options.Convert != nil && !endpoint.AllowsConversion(*options.Convert, accepts.MediaTypeType, accepts.MediaTypeSubType) {
if !endpoint.AllowsMediaTypeTransform(accepts.MediaTypeType, accepts.MediaTypeSubType, options.Convert) {
return MediaTypeOptions{}, false
}

View File

@ -50,6 +50,11 @@ type RequestScope struct {
Serializer runtime.NegotiatedSerializer
runtime.ParameterCodec
// StandardSerializers, if set, restricts which serializers can be used when
// we aren't transforming the output (into Table or PartialObjectMetadata).
// Used only by CRDs which do not yet support Protobuf.
StandardSerializers []runtime.SerializerInfo
Creater runtime.ObjectCreater
Convertor runtime.ObjectConvertor
Defaulter runtime.ObjectDefaulter
@ -78,7 +83,21 @@ func (scope *RequestScope) err(err error, w http.ResponseWriter, req *http.Reque
responsewriters.ErrorNegotiated(err, scope.Serializer, scope.Kind.GroupVersion(), w, req)
}
func (scope *RequestScope) AllowsConversion(gvk schema.GroupVersionKind, mimeType, mimeSubType string) bool {
func (scope *RequestScope) AllowsMediaTypeTransform(mimeType, mimeSubType string, gvk *schema.GroupVersionKind) bool {
// some handlers like CRDs can't serve all the mime types that PartialObjectMetadata or Table can - if
// gvk is nil (no conversion) allow StandardSerializers to further restrict the set of mime types.
if gvk == nil {
if len(scope.StandardSerializers) == 0 {
return true
}
for _, info := range scope.StandardSerializers {
if info.MediaTypeType == mimeType && info.MediaTypeSubType == mimeSubType {
return true
}
}
return false
}
// TODO: this is temporary, replace with an abstraction calculated at endpoint installation time
if gvk.GroupVersion() == metav1beta1.SchemeGroupVersion || gvk.GroupVersion() == metav1.SchemeGroupVersion {
switch gvk.Kind {

View File

@ -452,7 +452,7 @@ func TestAPICRDProtobuf(t *testing.T) {
wantBody func(*testing.T, io.Reader)
}{
{
name: "server returns 406 when asking for protobuf for CRDs",
name: "server returns 406 when asking for protobuf for CRDs, which dynamic client does not support",
accept: "application/vnd.kubernetes.protobuf",
object: func(t *testing.T) (metav1.Object, string, string) {
cr, err := crclient.Create(&unstructured.Unstructured{Object: map[string]interface{}{"apiVersion": "cr.bar.com/v1", "kind": "Foo", "metadata": map[string]interface{}{"name": "test-1"}}}, metav1.CreateOptions{})
@ -468,9 +468,15 @@ func TestAPICRDProtobuf(t *testing.T) {
if !apierrors.IsNotAcceptable(err) {
t.Fatal(err)
}
// TODO: this should be a more specific error
if err.Error() != "only the following media types are accepted: application/json, application/yaml" {
t.Fatal(err)
status := err.(apierrors.APIStatus).Status()
data, _ := json.MarshalIndent(status, "", " ")
// because the dynamic client only has a json serializer, the client processing of the error cannot
// turn the response into something meaningful, so we verify that fallback handling works correctly
if !apierrors.IsUnexpectedServerError(err) {
t.Fatal(string(data))
}
if status.Message != "the server was unable to respond with a content type that the client supports (get foos.cr.bar.com test-1)" {
t.Fatal(string(data))
}
},
},
@ -658,7 +664,7 @@ func TestTransform(t *testing.T) {
t.Fatal(err)
}
// TODO: this should be a more specific error
if err.Error() != "only the following media types are accepted: application/json;stream=watch" {
if err.Error() != "only the following media types are accepted: application/json;stream=watch, application/vnd.kubernetes.protobuf;stream=watch" {
t.Fatal(err)
}
},
@ -762,6 +768,23 @@ func TestTransform(t *testing.T) {
expectPartialObjectMetaEventsProtobuf(t, w, "0", "1")
},
},
{
name: "v1beta1 verify partial metadata object on CRDs in protobuf",
accept: "application/vnd.kubernetes.protobuf;as=PartialObjectMetadata;g=meta.k8s.io;v=v1beta1",
object: func(t *testing.T) (metav1.Object, string, string) {
cr, err := crclient.Create(&unstructured.Unstructured{Object: map[string]interface{}{"apiVersion": "cr.bar.com/v1", "kind": "Foo", "metadata": map[string]interface{}{"name": "test-4", "annotations": map[string]string{"test": "0"}}}}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("unable to create cr: %v", err)
}
if _, err := crclient.Patch("test-4", types.MergePatchType, []byte(`{"metadata":{"annotations":{"test":"1"}}}`), metav1.PatchOptions{}); err != nil {
t.Fatalf("unable to patch cr: %v", err)
}
return cr, crdGVR.Group, "foos"
},
wantBody: func(t *testing.T, w io.Reader) {
expectPartialObjectMetaEventsProtobuf(t, w, "0", "1")
},
},
{
name: "v1beta1 verify error on unsupported mimetype protobuf for table conversion",
accept: "application/vnd.kubernetes.protobuf;as=Table;g=meta.k8s.io;v=v1beta1",
@ -853,23 +876,6 @@ func TestTransform(t *testing.T) {
{
name: "v1 verify columns on CRDs in json",
accept: "application/json;as=Table;g=meta.k8s.io;v=v1",
object: func(t *testing.T) (metav1.Object, string, string) {
cr, err := crclient.Create(&unstructured.Unstructured{Object: map[string]interface{}{"apiVersion": "cr.bar.com/v1", "kind": "Foo", "metadata": map[string]interface{}{"name": "test-4"}}}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("unable to create cr: %v", err)
}
if _, err := crclient.Patch("test-4", types.MergePatchType, []byte(`{"metadata":{"annotations":{"test":"1"}}}`), metav1.PatchOptions{}); err != nil {
t.Fatalf("unable to patch cr: %v", err)
}
return cr, crdGVR.Group, "foos"
},
wantBody: func(t *testing.T, w io.Reader) {
expectTableV1WatchEvents(t, 2, 2, metav1.IncludeMetadata, json.NewDecoder(w))
},
},
{
name: "v1 verify columns on CRDs in json;stream=watch",
accept: "application/json;stream=watch;as=Table;g=meta.k8s.io;v=v1",
object: func(t *testing.T) (metav1.Object, string, string) {
cr, err := crclient.Create(&unstructured.Unstructured{Object: map[string]interface{}{"apiVersion": "cr.bar.com/v1", "kind": "Foo", "metadata": map[string]interface{}{"name": "test-5"}}}, metav1.CreateOptions{})
if err != nil {
@ -885,8 +891,8 @@ func TestTransform(t *testing.T) {
},
},
{
name: "v1 verify columns on CRDs in yaml",
accept: "application/yaml;as=Table;g=meta.k8s.io;v=v1",
name: "v1 verify columns on CRDs in json;stream=watch",
accept: "application/json;stream=watch;as=Table;g=meta.k8s.io;v=v1",
object: func(t *testing.T) (metav1.Object, string, string) {
cr, err := crclient.Create(&unstructured.Unstructured{Object: map[string]interface{}{"apiVersion": "cr.bar.com/v1", "kind": "Foo", "metadata": map[string]interface{}{"name": "test-6"}}}, metav1.CreateOptions{})
if err != nil {
@ -897,12 +903,29 @@ func TestTransform(t *testing.T) {
}
return cr, crdGVR.Group, "foos"
},
wantBody: func(t *testing.T, w io.Reader) {
expectTableV1WatchEvents(t, 2, 2, metav1.IncludeMetadata, json.NewDecoder(w))
},
},
{
name: "v1 verify columns on CRDs in yaml",
accept: "application/yaml;as=Table;g=meta.k8s.io;v=v1",
object: func(t *testing.T) (metav1.Object, string, string) {
cr, err := crclient.Create(&unstructured.Unstructured{Object: map[string]interface{}{"apiVersion": "cr.bar.com/v1", "kind": "Foo", "metadata": map[string]interface{}{"name": "test-7"}}}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("unable to create cr: %v", err)
}
if _, err := crclient.Patch("test-7", types.MergePatchType, []byte(`{"metadata":{"annotations":{"test":"1"}}}`), metav1.PatchOptions{}); err != nil {
t.Fatalf("unable to patch cr: %v", err)
}
return cr, crdGVR.Group, "foos"
},
wantErr: func(t *testing.T, err error) {
if !apierrors.IsNotAcceptable(err) {
t.Fatal(err)
}
// TODO: this should be a more specific error
if err.Error() != "only the following media types are accepted: application/json;stream=watch" {
if err.Error() != "only the following media types are accepted: application/json;stream=watch, application/vnd.kubernetes.protobuf;stream=watch" {
t.Fatal(err)
}
},
@ -912,7 +935,7 @@ func TestTransform(t *testing.T) {
accept: "application/json;as=Table;g=meta.k8s.io;v=v1",
object: func(t *testing.T) (metav1.Object, string, string) {
ns := "default"
svc, err := clientset.CoreV1().Services(ns).Create(&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "test-4"}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 1000}}}})
svc, err := clientset.CoreV1().Services(ns).Create(&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "test-5"}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 1000}}}})
if err != nil {
t.Fatalf("unable to create service: %v", err)
}
@ -931,7 +954,7 @@ func TestTransform(t *testing.T) {
includeObject: metav1.IncludeNone,
object: func(t *testing.T) (metav1.Object, string, string) {
ns := "default"
obj, err := clientset.CoreV1().Services(ns).Create(&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "test-5"}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 1000}}}})
obj, err := clientset.CoreV1().Services(ns).Create(&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "test-6"}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 1000}}}})
if err != nil {
t.Fatalf("unable to create object: %v", err)
}
@ -950,7 +973,7 @@ func TestTransform(t *testing.T) {
includeObject: metav1.IncludeObject,
object: func(t *testing.T) (metav1.Object, string, string) {
ns := "default"
obj, err := clientset.CoreV1().Services(ns).Create(&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "test-6"}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 1000}}}})
obj, err := clientset.CoreV1().Services(ns).Create(&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "test-7"}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 1000}}}})
if err != nil {
t.Fatalf("unable to create object: %v", err)
}
@ -1006,6 +1029,23 @@ func TestTransform(t *testing.T) {
expectPartialObjectMetaV1EventsProtobuf(t, w, "0", "1")
},
},
{
name: "v1 verify partial metadata object on CRDs in protobuf",
accept: "application/vnd.kubernetes.protobuf;as=PartialObjectMetadata;g=meta.k8s.io;v=v1",
object: func(t *testing.T) (metav1.Object, string, string) {
cr, err := crclient.Create(&unstructured.Unstructured{Object: map[string]interface{}{"apiVersion": "cr.bar.com/v1", "kind": "Foo", "metadata": map[string]interface{}{"name": "test-8", "annotations": map[string]string{"test": "0"}}}}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("unable to create cr: %v", err)
}
if _, err := crclient.Patch(cr.GetName(), types.MergePatchType, []byte(`{"metadata":{"annotations":{"test":"1"}}}`), metav1.PatchOptions{}); err != nil {
t.Fatalf("unable to patch cr: %v", err)
}
return cr, crdGVR.Group, "foos"
},
wantBody: func(t *testing.T, w io.Reader) {
expectPartialObjectMetaV1EventsProtobuf(t, w, "0", "1")
},
},
{
name: "v1 verify error on unsupported mimetype protobuf for table conversion",
accept: "application/vnd.kubernetes.protobuf;as=Table;g=meta.k8s.io;v=v1",
@ -1046,6 +1086,18 @@ func TestTransform(t *testing.T) {
}
},
},
{
name: "v1 verify error on invalid mimetype - only meta kinds accepted",
accept: "application/json;as=Service;g=;v=v1",
object: func(t *testing.T) (metav1.Object, string, string) {
return &metav1.ObjectMeta{Name: "kubernetes", Namespace: "default"}, "", "services"
},
wantErr: func(t *testing.T, err error) {
if !apierrors.IsNotAcceptable(err) {
t.Fatal(err)
}
},
},
{
name: "v1 verify error on invalid mimetype - missing kind",
accept: "application/json;g=meta.k8s.io;v=v1",