diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index e6af7c7de92..b36dd3f8ed0 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -44,6 +44,7 @@ type APIServer struct { AuthorizationMode string AuthorizationConfig apiserver.AuthorizationConfig BasicAuthFile string + DefaultStorageMediaType string DeleteCollectionWorkers int DeprecatedStorageVersion string EtcdServersOverrides []string @@ -76,6 +77,7 @@ func NewAPIServer() *APIServer { ServerRunOptions: genericapiserver.NewServerRunOptions(), AdmissionControl: "AlwaysAdmit", AuthorizationMode: "AlwaysAllow", + DefaultStorageMediaType: "application/json", DeleteCollectionWorkers: 1, EventTTL: 1 * time.Hour, MasterServiceNamespace: api.NamespaceDefault, @@ -153,6 +155,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { "In the case where objects are moved from one group to the other, you may specify the format \"group1=group2/v1beta1,group3/v1beta1,...\". "+ "You only need to pass the groups you wish to change from the defaults. "+ "It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable.") + fs.StringVar(&s.DefaultStorageMediaType, "storage-media-type", s.DefaultStorageMediaType, "The media type to use to store objects in storage. Defaults to application/json. Some resources may only support a specific media type and will ignore this setting.") fs.DurationVar(&s.EventTTL, "event-ttl", s.EventTTL, "Amount of time to retain events. Default 1 hour.") fs.StringVar(&s.BasicAuthFile, "basic-auth-file", s.BasicAuthFile, "If set, the file that will be used to admit requests to the secure port of the API server via http basic authentication.") fs.StringVar(&s.TokenAuthFile, "token-auth-file", s.TokenAuthFile, "If set, the file that will be used to secure the secure port of the API server via token authentication.") diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 32a26d4781f..955e073f96d 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -167,7 +167,9 @@ func Run(s *options.APIServer) error { resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal}) } - storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, api.Codecs, resourceEncoding, apiResourceConfigSource) + storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, s.DefaultStorageMediaType, api.Codecs, resourceEncoding, apiResourceConfigSource) + // third party resources are always serialized to storage using JSON + storageFactory.SetSerializer(extensions.Resource("thirdpartyresources"), "application/json", nil) storageFactory.AddCohabitatingResources(batch.Resource("jobs"), extensions.Resource("jobs")) storageFactory.AddCohabitatingResources(autoscaling.Resource("horizontalpodautoscalers"), extensions.Resource("horizontalpodautoscalers")) for _, override := range s.EtcdServersOverrides { diff --git a/docs/admin/kube-apiserver.md b/docs/admin/kube-apiserver.md index e4c0c5af6db..b79e2b765c0 100644 --- a/docs/admin/kube-apiserver.md +++ b/docs/admin/kube-apiserver.md @@ -110,6 +110,7 @@ kube-apiserver --ssh-keyfile="": If non-empty, use secure SSH proxy to the nodes, using this user keyfile --ssh-user="": If non-empty, use secure SSH proxy to the nodes, using this user name --storage-backend="": The storage backend for persistence. Options: 'etcd2' (default), 'etcd3'. + --storage-media-type="application/json": The media type to use to store objects in storage. Defaults to application/json. Some resources may only support a specific media type and will ignore this setting. --storage-versions="apps/v1alpha1,authorization.k8s.io/v1beta1,autoscaling/v1,batch/v1,componentconfig/v1alpha1,extensions/v1beta1,metrics/v1alpha1,v1": The per-group version to store resources in. Specified in the format "group1/version1,group2/version2,...". In the case where objects are moved from one group to the other, you may specify the format "group1=group2/v1beta1,group3/v1beta1,...". You only need to pass the groups you wish to change from the defaults. It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable. --tls-cert-file="": File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert). If HTTPS serving is enabled, and --tls-cert-file and --tls-private-key-file are not provided, a self-signed certificate and key are generated for the public address and saved to /var/run/kubernetes. --tls-private-key-file="": File containing x509 private key matching --tls-cert-file. @@ -118,7 +119,7 @@ kube-apiserver --watch-cache-sizes=[]: List of watch cache sizes for every resource (pods, nodes, etc.), comma separated. The individual override format: resource#size, where size is a number. It takes effect when watch-cache is enabled. ``` -###### Auto generated by spf13/cobra on 28-Apr-2016 +###### Auto generated by spf13/cobra on 30-Apr-2016 diff --git a/examples/apiserver/apiserver.go b/examples/apiserver/apiserver.go index 9a18da3892f..7aeee2f6752 100644 --- a/examples/apiserver/apiserver.go +++ b/examples/apiserver/apiserver.go @@ -45,7 +45,7 @@ func newStorageFactory() genericapiserver.StorageFactory { Prefix: genericapiserver.DefaultEtcdPathPrefix, ServerList: []string{"http://127.0.0.1:4001"}, } - storageFactory := genericapiserver.NewDefaultStorageFactory(config, api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig()) + storageFactory := genericapiserver.NewDefaultStorageFactory(config, "application/json", api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig()) return storageFactory } diff --git a/federation/cmd/federated-apiserver/app/options/options.go b/federation/cmd/federated-apiserver/app/options/options.go index e6af7c7de92..b36dd3f8ed0 100644 --- a/federation/cmd/federated-apiserver/app/options/options.go +++ b/federation/cmd/federated-apiserver/app/options/options.go @@ -44,6 +44,7 @@ type APIServer struct { AuthorizationMode string AuthorizationConfig apiserver.AuthorizationConfig BasicAuthFile string + DefaultStorageMediaType string DeleteCollectionWorkers int DeprecatedStorageVersion string EtcdServersOverrides []string @@ -76,6 +77,7 @@ func NewAPIServer() *APIServer { ServerRunOptions: genericapiserver.NewServerRunOptions(), AdmissionControl: "AlwaysAdmit", AuthorizationMode: "AlwaysAllow", + DefaultStorageMediaType: "application/json", DeleteCollectionWorkers: 1, EventTTL: 1 * time.Hour, MasterServiceNamespace: api.NamespaceDefault, @@ -153,6 +155,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { "In the case where objects are moved from one group to the other, you may specify the format \"group1=group2/v1beta1,group3/v1beta1,...\". "+ "You only need to pass the groups you wish to change from the defaults. "+ "It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable.") + fs.StringVar(&s.DefaultStorageMediaType, "storage-media-type", s.DefaultStorageMediaType, "The media type to use to store objects in storage. Defaults to application/json. Some resources may only support a specific media type and will ignore this setting.") fs.DurationVar(&s.EventTTL, "event-ttl", s.EventTTL, "Amount of time to retain events. Default 1 hour.") fs.StringVar(&s.BasicAuthFile, "basic-auth-file", s.BasicAuthFile, "If set, the file that will be used to admit requests to the secure port of the API server via http basic authentication.") fs.StringVar(&s.TokenAuthFile, "token-auth-file", s.TokenAuthFile, "If set, the file that will be used to secure the secure port of the API server via token authentication.") diff --git a/federation/cmd/federated-apiserver/app/server.go b/federation/cmd/federated-apiserver/app/server.go index 0dc67da4271..41d66a1da78 100644 --- a/federation/cmd/federated-apiserver/app/server.go +++ b/federation/cmd/federated-apiserver/app/server.go @@ -76,7 +76,7 @@ func Run(s *options.APIServer) error { resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal}) } - storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, api.Codecs, resourceEncoding, apiResourceConfigSource) + storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, s.DefaultStorageMediaType, api.Codecs, resourceEncoding, apiResourceConfigSource) for _, override := range s.EtcdServersOverrides { tokens := strings.Split(override, "#") if len(tokens) != 2 { diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh index 31d7b098a11..61dd2119962 100755 --- a/hack/test-cmd.sh +++ b/hack/test-cmd.sh @@ -192,6 +192,7 @@ ADMISSION_CONTROL="NamespaceLifecycle,LimitRanger,ResourceQuota" --public-address-override="127.0.0.1" \ --kubelet-port=${KUBELET_PORT} \ --runtime-config=api/v1 \ + --storage-media-type="${KUBE_TEST_API_STORAGE_TYPE-}" \ --cert-dir="${TMPDIR:-/tmp/}" \ --service-cluster-ip-range="10.0.0.0/24" 1>&2 & APISERVER_PID=$! @@ -202,6 +203,7 @@ kube::util::wait_for_url "http://127.0.0.1:${API_PORT}/healthz" "apiserver" kube::log::status "Starting controller-manager" "${KUBE_OUTPUT_HOSTBIN}/kube-controller-manager" \ --port="${CTLRMGR_PORT}" \ + --kube-api-content-type="${KUBE_TEST_API_TYPE-}" \ --master="127.0.0.1:${API_PORT}" 1>&2 & CTLRMGR_PID=$! diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 8ca24b50835..9cb1530c230 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -394,6 +394,7 @@ static-pods-config stats-port stop-services storage-backend +storage-media-type storage-version storage-versions streaming-connection-idle-timeout diff --git a/pkg/api/serialization_proto_test.go b/pkg/api/serialization_proto_test.go index b3ed68c5ae4..dd1c63585dd 100644 --- a/pkg/api/serialization_proto_test.go +++ b/pkg/api/serialization_proto_test.go @@ -17,6 +17,7 @@ limitations under the License. package api_test import ( + "bytes" "encoding/hex" "math/rand" "testing" @@ -37,10 +38,32 @@ import ( func init() { codecsToTest = append(codecsToTest, func(version unversioned.GroupVersion, item runtime.Object) (runtime.Codec, error) { s := protobuf.NewSerializer(api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), "application/arbitrary.content.type") - return api.Codecs.CodecForVersions(s, testapi.ExternalGroupVersions(), nil), nil + return api.Codecs.CodecForVersions(s, s, testapi.ExternalGroupVersions(), nil), nil }) } +func TestUniversalDeserializer(t *testing.T) { + expected := &v1.Pod{ObjectMeta: v1.ObjectMeta{Name: "test"}} + d := api.Codecs.UniversalDeserializer() + for _, mediaType := range []string{"application/json", "application/yaml", "application/vnd.kubernetes.protobuf"} { + e, ok := api.Codecs.SerializerForMediaType(mediaType, nil) + if !ok { + t.Fatal(mediaType) + } + buf := &bytes.Buffer{} + if err := e.EncodeToStream(expected, buf); err != nil { + t.Fatalf("%s: %v", mediaType, err) + } + obj, _, err := d.Decode(buf.Bytes(), &unversioned.GroupVersionKind{Kind: "Pod", Version: "v1"}, nil) + if err != nil { + t.Fatalf("%s: %v", mediaType, err) + } + if !api.Semantic.DeepEqual(expected, obj) { + t.Fatalf("%s: %#v", mediaType, obj) + } + } +} + func TestProtobufRoundTrip(t *testing.T) { obj := &v1.Pod{} apitesting.FuzzerFor(t, v1.SchemeGroupVersion, rand.NewSource(benchmarkSeed)).Fuzz(obj) diff --git a/pkg/api/serialization_test.go b/pkg/api/serialization_test.go index 8cdbf664d83..e52e965bd11 100644 --- a/pkg/api/serialization_test.go +++ b/pkg/api/serialization_test.go @@ -284,18 +284,14 @@ func TestObjectWatchFraming(t *testing.T) { converted, _ := api.Scheme.ConvertToVersion(secret, "v1") v1secret := converted.(*v1.Secret) for _, streamingMediaType := range api.Codecs.SupportedStreamingMediaTypes() { - s, framer, mediaType, _ := api.Codecs.StreamingSerializerForMediaType(streamingMediaType, nil) - // TODO: remove this when the runtime.SerializerInfo PR lands - if mediaType == "application/vnd.kubernetes.protobuf;stream=watch" { - mediaType = "application/vnd.kubernetes.protobuf" - } - embedded, ok := api.Codecs.SerializerForMediaType(mediaType, nil) - if !ok { - t.Logf("no embedded serializer for %s", mediaType) - embedded = s + s, _ := api.Codecs.StreamingSerializerForMediaType(streamingMediaType, nil) + framer := s.Framer + embedded := s.Embedded.Serializer + if embedded == nil { + t.Errorf("no embedded serializer for %s", streamingMediaType) + continue } innerDecode := api.Codecs.DecoderToVersion(embedded, api.SchemeGroupVersion) - //innerEncode := api.Codecs.EncoderForVersion(embedded, api.SchemeGroupVersion) // write a single object through the framer and back out obj := &bytes.Buffer{} diff --git a/pkg/api/testapi/testapi.go b/pkg/api/testapi/testapi.go index 8bec17e33e3..2f1b2509dde 100644 --- a/pkg/api/testapi/testapi.go +++ b/pkg/api/testapi/testapi.go @@ -19,6 +19,7 @@ package testapi import ( "fmt" + "mime" "os" "reflect" "strings" @@ -33,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/recognizer" _ "k8s.io/kubernetes/federation/apis/federation/install" _ "k8s.io/kubernetes/pkg/api/install" @@ -45,14 +47,16 @@ import ( ) var ( - Groups = make(map[string]TestGroup) - Default TestGroup - Autoscaling TestGroup - Batch TestGroup - Extensions TestGroup - Apps TestGroup - Federation TestGroup - NegotiatedSerializer = api.Codecs + Groups = make(map[string]TestGroup) + Default TestGroup + Autoscaling TestGroup + Batch TestGroup + Extensions TestGroup + Apps TestGroup + Federation TestGroup + + serializer runtime.SerializerInfo + storageSerializer runtime.SerializerInfo ) type TestGroup struct { @@ -62,6 +66,30 @@ type TestGroup struct { } func init() { + if apiMediaType := os.Getenv("KUBE_TEST_API_TYPE"); len(apiMediaType) > 0 { + var ok bool + mediaType, options, err := mime.ParseMediaType(apiMediaType) + if err != nil { + panic(err) + } + serializer, ok = api.Codecs.SerializerForMediaType(mediaType, options) + if !ok { + panic(fmt.Sprintf("no serializer for %s", apiMediaType)) + } + } + + if storageMediaType := StorageMediaType(); len(storageMediaType) > 0 { + var ok bool + mediaType, options, err := mime.ParseMediaType(storageMediaType) + if err != nil { + panic(err) + } + storageSerializer, ok = api.Codecs.SerializerForMediaType(mediaType, options) + if !ok { + panic(fmt.Sprintf("no serializer for %s", storageMediaType)) + } + } + kubeTestAPI := os.Getenv("KUBE_TEST_API") if len(kubeTestAPI) != 0 { testGroupVersions := strings.Split(kubeTestAPI, ",") @@ -173,9 +201,40 @@ func (g TestGroup) InternalTypes() map[string]reflect.Type { } // Codec returns the codec for the API version to test against, as set by the -// KUBE_TEST_API env var. +// KUBE_TEST_API_TYPE env var. func (g TestGroup) Codec() runtime.Codec { - return api.Codecs.LegacyCodec(g.externalGroupVersion) + if serializer.Serializer == nil { + return api.Codecs.LegacyCodec(g.externalGroupVersion) + } + return api.Codecs.CodecForVersions(serializer, api.Codecs.UniversalDeserializer(), []unversioned.GroupVersion{g.externalGroupVersion}, nil) +} + +// NegotiatedSerializer returns the negotiated serializer for the server. +func (g TestGroup) NegotiatedSerializer() runtime.NegotiatedSerializer { + return api.Codecs +} + +func StorageMediaType() string { + return os.Getenv("KUBE_TEST_API_STORAGE_TYPE") +} + +// StorageCodec returns the codec for the API version to store in etcd, as set by the +// KUBE_TEST_API_STORAGE_TYPE env var. +func (g TestGroup) StorageCodec() runtime.Codec { + s := storageSerializer.Serializer + + if s == nil { + return api.Codecs.LegacyCodec(g.externalGroupVersion) + } + + // etcd2 only supports string data - we must wrap any result before returning + // TODO: remove for etcd3 / make parameterizable + if !storageSerializer.EncodesAsText { + s = runtime.NewBase64Serializer(s) + } + ds := recognizer.NewDecoder(s, api.Codecs.UniversalDeserializer()) + + return api.Codecs.CodecForVersions(s, ds, []unversioned.GroupVersion{g.externalGroupVersion}, nil) } // Converter returns the api.Scheme for the API version to test against, as set by the diff --git a/pkg/api/unversioned/generated.proto b/pkg/api/unversioned/generated.proto index aaa42ec8415..def4a6d6f65 100644 --- a/pkg/api/unversioned/generated.proto +++ b/pkg/api/unversioned/generated.proto @@ -21,6 +21,7 @@ syntax = 'proto2'; package k8s.io.kubernetes.pkg.api.unversioned; +import "k8s.io/kubernetes/pkg/runtime/generated.proto"; import "k8s.io/kubernetes/pkg/util/intstr/generated.proto"; // Package-wide variables from generator "generated". diff --git a/pkg/apis/extensions/v1beta1/generated.pb.go b/pkg/apis/extensions/v1beta1/generated.pb.go index bcf1f28ec3d..7e231b95e4f 100644 --- a/pkg/apis/extensions/v1beta1/generated.pb.go +++ b/pkg/apis/extensions/v1beta1/generated.pb.go @@ -95,6 +95,7 @@ import math "math" import k8s_io_kubernetes_pkg_api_unversioned "k8s.io/kubernetes/pkg/api/unversioned" import k8s_io_kubernetes_pkg_api_v1 "k8s.io/kubernetes/pkg/api/v1" + import k8s_io_kubernetes_pkg_util_intstr "k8s.io/kubernetes/pkg/util/intstr" import io "io" diff --git a/pkg/apis/extensions/v1beta1/generated.proto b/pkg/apis/extensions/v1beta1/generated.proto index 79ed890c064..e47b5ff50af 100644 --- a/pkg/apis/extensions/v1beta1/generated.proto +++ b/pkg/apis/extensions/v1beta1/generated.proto @@ -24,6 +24,7 @@ package k8s.io.kubernetes.pkg.apis.extensions.v1beta1; import "k8s.io/kubernetes/pkg/api/resource/generated.proto"; import "k8s.io/kubernetes/pkg/api/unversioned/generated.proto"; import "k8s.io/kubernetes/pkg/api/v1/generated.proto"; +import "k8s.io/kubernetes/pkg/runtime/generated.proto"; import "k8s.io/kubernetes/pkg/util/intstr/generated.proto"; // Package-wide variables from generator "generated". diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index e68aa851aa6..baa4d3ca1a5 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -274,9 +274,16 @@ type StripVersionNegotiatedSerializer struct { runtime.NegotiatedSerializer } -func (n StripVersionNegotiatedSerializer) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { - encoder := n.NegotiatedSerializer.EncoderForVersion(serializer, gv) - return stripVersionEncoder{encoder, serializer} +func (n StripVersionNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder { + serializer, ok := encoder.(runtime.Serializer) + if !ok { + // The stripVersionEncoder needs both an encoder and decoder, but is called from a context that doesn't have access to the + // decoder. We do a best effort cast here (since this code path is only for backwards compatibility) to get access to the caller's + // decoder. + panic(fmt.Sprintf("Unable to extract serializer from %#v", encoder)) + } + versioned := n.NegotiatedSerializer.EncoderForVersion(encoder, gv) + return stripVersionEncoder{versioned, serializer} } func keepUnversioned(group string) bool { @@ -422,14 +429,14 @@ func write(statusCode int, gv unversioned.GroupVersion, s runtime.NegotiatedSeri // writeNegotiated renders an object in the content type negotiated by the client func writeNegotiated(s runtime.NegotiatedSerializer, gv unversioned.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) { - serializer, contentType, err := negotiateOutputSerializer(req, s) + serializer, err := negotiateOutputSerializer(req, s) if err != nil { status := errToAPIStatus(err) writeRawJSON(int(status.Code), status, w) return } - w.Header().Set("Content-Type", contentType) + w.Header().Set("Content-Type", serializer.MediaType) w.WriteHeader(statusCode) encoder := s.EncoderForVersion(serializer, gv) diff --git a/pkg/apiserver/negotiate.go b/pkg/apiserver/negotiate.go index 3a5a1380663..a9fedfc9af6 100644 --- a/pkg/apiserver/negotiate.go +++ b/pkg/apiserver/negotiate.go @@ -50,31 +50,31 @@ func negotiateOutput(req *http.Request, supported []string) (string, map[string] return mediaType, accept.Params, nil } -func negotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.Serializer, string, error) { +func negotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) { supported := ns.SupportedMediaTypes() mediaType, params, err := negotiateOutput(req, supported) if err != nil { - return nil, "", err + return runtime.SerializerInfo{}, err } if s, ok := ns.SerializerForMediaType(mediaType, params); ok { - return s, mediaType, nil + return s, nil } - return nil, "", errNotAcceptable{supported} + return runtime.SerializerInfo{}, errNotAcceptable{supported} } -func negotiateOutputStreamSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.Serializer, runtime.Framer, string, string, error) { +func negotiateOutputStreamSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.StreamSerializerInfo, error) { supported := ns.SupportedMediaTypes() mediaType, params, err := negotiateOutput(req, supported) if err != nil { - return nil, nil, "", "", err + return runtime.StreamSerializerInfo{}, err } - if s, f, exactMediaType, ok := ns.StreamingSerializerForMediaType(mediaType, params); ok { - return s, f, mediaType, exactMediaType, nil + if s, ok := ns.StreamingSerializerForMediaType(mediaType, params); ok { + return s, nil } - return nil, nil, "", "", errNotAcceptable{supported} + return runtime.StreamSerializerInfo{}, errNotAcceptable{supported} } -func negotiateInputSerializer(req *http.Request, s runtime.NegotiatedSerializer) (runtime.Serializer, error) { +func negotiateInputSerializer(req *http.Request, s runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) { supported := s.SupportedMediaTypes() mediaType := req.Header.Get("Content-Type") if len(mediaType) == 0 { @@ -82,11 +82,11 @@ func negotiateInputSerializer(req *http.Request, s runtime.NegotiatedSerializer) } mediaType, options, err := mime.ParseMediaType(mediaType) if err != nil { - return nil, errUnsupportedMediaType{supported} + return runtime.SerializerInfo{}, errUnsupportedMediaType{supported} } out, ok := s.SerializerForMediaType(mediaType, options) if !ok { - return nil, errUnsupportedMediaType{supported} + return runtime.SerializerInfo{}, errUnsupportedMediaType{supported} } return out, nil } diff --git a/pkg/apiserver/negotiate_test.go b/pkg/apiserver/negotiate_test.go index c37137aee07..87ad1a7eee9 100644 --- a/pkg/apiserver/negotiate_test.go +++ b/pkg/apiserver/negotiate_test.go @@ -41,27 +41,34 @@ func (n *fakeNegotiater) SupportedStreamingMediaTypes() []string { return n.streamTypes } -func (n *fakeNegotiater) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) { +func (n *fakeNegotiater) SerializerForMediaType(mediaType string, options map[string]string) (runtime.SerializerInfo, bool) { n.mediaType = mediaType if len(options) > 0 { n.options = options } - return n.serializer, n.serializer != nil + return runtime.SerializerInfo{Serializer: n.serializer, MediaType: n.mediaType, EncodesAsText: true}, n.serializer != nil } -func (n *fakeNegotiater) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) { +func (n *fakeNegotiater) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.StreamSerializerInfo, bool) { n.streamMediaType = mediaType if len(options) > 0 { n.streamOptions = options } - return n.streamSerializer, n.framer, mediaType, n.streamSerializer != nil + return runtime.StreamSerializerInfo{ + SerializerInfo: runtime.SerializerInfo{ + Serializer: n.serializer, + MediaType: mediaType, + EncodesAsText: true, + }, + Framer: n.framer, + }, n.streamSerializer != nil } -func (n *fakeNegotiater) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { +func (n *fakeNegotiater) EncoderForVersion(serializer runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder { return n.serializer } -func (n *fakeNegotiater) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder { +func (n *fakeNegotiater) DecoderToVersion(serializer runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder { return n.serializer } @@ -228,7 +235,7 @@ func TestNegotiate(t *testing.T) { req = &http.Request{Header: http.Header{}} req.Header.Set("Accept", test.accept) } - s, contentType, err := negotiateOutputSerializer(req, test.ns) + s, err := negotiateOutputSerializer(req, test.ns) switch { case err == nil && test.errFn != nil: t.Errorf("%d: failed: expected error", i) @@ -251,11 +258,11 @@ func TestNegotiate(t *testing.T) { } continue } - if test.contentType != contentType { - t.Errorf("%d: unexpected %s %s", i, test.contentType, contentType) + if test.contentType != s.MediaType { + t.Errorf("%d: unexpected %s %s", i, test.contentType, s.MediaType) } - if s != test.serializer { - t.Errorf("%d: unexpected %s %s", i, test.serializer, s) + if s.Serializer != test.serializer { + t.Errorf("%d: unexpected %s %s", i, test.serializer, s.Serializer) } if !reflect.DeepEqual(test.params, test.ns.options) { t.Errorf("%d: unexpected %#v %#v", i, test.params, test.ns.options) diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index a383314c86c..8094ac9c006 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -59,47 +59,33 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) { return t.C, t.Stop } -type textEncodable interface { - // EncodesAsText should return true if objects should be transmitted as a WebSocket Text - // frame (otherwise, they will be sent as a Binary frame). - EncodesAsText() bool -} - // serveWatch handles serving requests to the server // TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled. func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Request, res *restful.Response, timeout time.Duration) { // negotiate for the stream serializer - serializer, framer, mediaType, exactMediaType, err := negotiateOutputStreamSerializer(req.Request, scope.Serializer) + serializer, err := negotiateOutputStreamSerializer(req.Request, scope.Serializer) if err != nil { scope.err(err, res.ResponseWriter, req.Request) return } - if framer == nil { - scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", mediaType), res.ResponseWriter, req.Request) + if serializer.Framer == nil { + scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType), res.ResponseWriter, req.Request) return } - encoder := scope.Serializer.EncoderForVersion(serializer, scope.Kind.GroupVersion()) + encoder := scope.Serializer.EncoderForVersion(serializer.Serializer, scope.Kind.GroupVersion()) - useTextFraming := false - if encodable, ok := serializer.(textEncodable); ok && encodable.EncodesAsText() { - useTextFraming = true - } + useTextFraming := serializer.EncodesAsText // find the embedded serializer matching the media type - embeddedSerializer, ok := scope.Serializer.SerializerForMediaType(mediaType, nil) - if !ok { - scope.err(fmt.Errorf("no serializer defined for %q available for embedded encoding", mediaType), res.ResponseWriter, req.Request) - return - } - embeddedEncoder := scope.Serializer.EncoderForVersion(embeddedSerializer, scope.Kind.GroupVersion()) + embeddedEncoder := scope.Serializer.EncoderForVersion(serializer.Embedded.Serializer, scope.Kind.GroupVersion()) server := &WatchServer{ watching: watcher, scope: scope, useTextFraming: useTextFraming, - mediaType: exactMediaType, - framer: framer, + mediaType: serializer.MediaType, + framer: serializer.Framer, encoder: encoder, embeddedEncoder: embeddedEncoder, fixup: func(obj runtime.Object) { diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index 630cc84dc4e..1b25771373f 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -222,9 +222,9 @@ func TestWatchRead(t *testing.T) { for _, protocol := range protocols { for _, test := range testCases { - serializer, framer, _, ok := api.Codecs.StreamingSerializerForMediaType(test.MediaType, nil) + serializer, ok := api.Codecs.StreamingSerializerForMediaType(test.MediaType, nil) if !ok { - t.Fatal(framer) + t.Fatal(serializer) } r, contentType := protocol.fn(test.Accept) @@ -235,13 +235,13 @@ func TestWatchRead(t *testing.T) { } objectSerializer, ok := api.Codecs.SerializerForMediaType(test.MediaType, nil) if !ok { - t.Fatal(framer) + t.Fatal(objectSerializer) } objectCodec := api.Codecs.DecoderToVersion(objectSerializer, testInternalGroupVersion) var fr io.ReadCloser = r if !protocol.selfFraming { - fr = framer.NewFrameReader(r) + fr = serializer.Framer.NewFrameReader(r) } d := streaming.NewDecoder(fr, serializer) @@ -495,9 +495,9 @@ func TestWatchHTTPTimeout(t *testing.T) { timeoutCh := make(chan time.Time) done := make(chan struct{}) - _, framer, _, ok := api.Codecs.StreamingSerializerForMediaType("application/json", nil) + serializer, ok := api.Codecs.StreamingSerializerForMediaType("application/json", nil) if !ok { - t.Fatal(framer) + t.Fatal(serializer) } // Setup a new watchserver @@ -505,7 +505,7 @@ func TestWatchHTTPTimeout(t *testing.T) { watching: watcher, mediaType: "testcase/json", - framer: framer, + framer: serializer.Framer, encoder: newCodec, embeddedEncoder: newCodec, diff --git a/pkg/client/restclient/client.go b/pkg/client/restclient/client.go index d14347a0b4c..782797ad767 100644 --- a/pkg/client/restclient/client.go +++ b/pkg/client/restclient/client.go @@ -139,26 +139,23 @@ func readExpBackoffConfig() BackoffManager { func createSerializers(config ContentConfig) (*Serializers, error) { negotiated := config.NegotiatedSerializer contentType := config.ContentType - serializer, ok := negotiated.SerializerForMediaType(contentType, nil) + info, ok := negotiated.SerializerForMediaType(contentType, nil) if !ok { return nil, fmt.Errorf("serializer for %s not registered", contentType) } - streamingSerializer, framer, _, ok := negotiated.StreamingSerializerForMediaType(contentType, nil) + streamInfo, ok := negotiated.StreamingSerializerForMediaType(contentType, nil) if !ok { return nil, fmt.Errorf("streaming serializer for %s not registered", contentType) } - if framer == nil { - return nil, fmt.Errorf("no framer for %s", contentType) - } internalGV := unversioned.GroupVersion{ Group: config.GroupVersion.Group, Version: runtime.APIVersionInternal, } return &Serializers{ - Encoder: negotiated.EncoderForVersion(serializer, *config.GroupVersion), - Decoder: negotiated.DecoderToVersion(serializer, internalGV), - StreamingSerializer: streamingSerializer, - Framer: framer, + Encoder: negotiated.EncoderForVersion(info.Serializer, *config.GroupVersion), + Decoder: negotiated.DecoderToVersion(info.Serializer, internalGV), + StreamingSerializer: streamInfo.Serializer, + Framer: streamInfo.Framer, }, nil } diff --git a/pkg/client/restclient/client_test.go b/pkg/client/restclient/client_test.go index 1fd2c603c95..e1cc1f9fae8 100644 --- a/pkg/client/restclient/client_test.go +++ b/pkg/client/restclient/client_test.go @@ -47,7 +47,7 @@ func TestDoRequestSuccess(t *testing.T) { Host: testServer.URL, ContentConfig: ContentConfig{ GroupVersion: testapi.Default.GroupVersion(), - NegotiatedSerializer: testapi.NegotiatedSerializer, + NegotiatedSerializer: testapi.Default.NegotiatedSerializer(), }, Username: "user", Password: "pass", @@ -92,7 +92,7 @@ func TestDoRequestFailed(t *testing.T) { Host: testServer.URL, ContentConfig: ContentConfig{ GroupVersion: testapi.Default.GroupVersion(), - NegotiatedSerializer: testapi.NegotiatedSerializer, + NegotiatedSerializer: testapi.Default.NegotiatedSerializer(), }, }) if err != nil { @@ -130,7 +130,7 @@ func TestDoRequestCreated(t *testing.T) { Host: testServer.URL, ContentConfig: ContentConfig{ GroupVersion: testapi.Default.GroupVersion(), - NegotiatedSerializer: testapi.NegotiatedSerializer, + NegotiatedSerializer: testapi.Default.NegotiatedSerializer(), }, Username: "user", Password: "pass", diff --git a/pkg/client/restclient/config_test.go b/pkg/client/restclient/config_test.go index 029e64253ba..a9c71e19d81 100644 --- a/pkg/client/restclient/config_test.go +++ b/pkg/client/restclient/config_test.go @@ -87,13 +87,13 @@ func TestSetKubernetesDefaultsUserAgent(t *testing.T) { } func TestRESTClientRequires(t *testing.T) { - if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{NegotiatedSerializer: testapi.NegotiatedSerializer}}); err == nil { + if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{NegotiatedSerializer: testapi.Default.NegotiatedSerializer()}}); err == nil { t.Errorf("unexpected non-error") } if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}); err == nil { t.Errorf("unexpected non-error") } - if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), NegotiatedSerializer: testapi.NegotiatedSerializer}}); err != nil { + if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), NegotiatedSerializer: testapi.Default.NegotiatedSerializer()}}); err != nil { t.Errorf("unexpected error: %v", err) } } diff --git a/pkg/client/restclient/request.go b/pkg/client/restclient/request.go index c29f6e4eedd..83b93f816e5 100644 --- a/pkg/client/restclient/request.go +++ b/pkg/client/restclient/request.go @@ -639,6 +639,10 @@ func (r *Request) Watch() (watch.Interface, error) { if r.err != nil { return nil, r.err } + if r.serializers.Framer == nil { + return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType) + } + url := r.URL().String() req, err := http.NewRequest(r.verb, url, r.body) if err != nil { diff --git a/pkg/client/restclient/request_test.go b/pkg/client/restclient/request_test.go index 3b1a67a0cfe..b7cb6a2ac01 100644 --- a/pkg/client/restclient/request_test.go +++ b/pkg/client/restclient/request_test.go @@ -247,7 +247,7 @@ func defaultContentConfig() ContentConfig { return ContentConfig{ GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec(), - NegotiatedSerializer: testapi.NegotiatedSerializer, + NegotiatedSerializer: testapi.Default.NegotiatedSerializer(), } } @@ -549,6 +549,7 @@ func TestRequestWatch(t *testing.T) { }, { Request: &Request{ + serializers: defaultSerializers(), client: clientFunc(func(req *http.Request) (*http.Response, error) { return nil, io.EOF }), @@ -558,6 +559,7 @@ func TestRequestWatch(t *testing.T) { }, { Request: &Request{ + serializers: defaultSerializers(), client: clientFunc(func(req *http.Request) (*http.Response, error) { return nil, &url.Error{Err: io.EOF} }), @@ -567,6 +569,7 @@ func TestRequestWatch(t *testing.T) { }, { Request: &Request{ + serializers: defaultSerializers(), client: clientFunc(func(req *http.Request) (*http.Response, error) { return nil, errors.New("http: can't write HTTP request on broken connection") }), @@ -576,6 +579,7 @@ func TestRequestWatch(t *testing.T) { }, { Request: &Request{ + serializers: defaultSerializers(), client: clientFunc(func(req *http.Request) (*http.Response, error) { return nil, errors.New("foo: connection reset by peer") }), diff --git a/pkg/client/typed/discovery/discovery_client.go b/pkg/client/typed/discovery/discovery_client.go index 6c0a4039b34..4a0f0b91ac3 100644 --- a/pkg/client/typed/discovery/discovery_client.go +++ b/pkg/client/typed/discovery/discovery_client.go @@ -215,7 +215,10 @@ func setDiscoveryDefaults(config *restclient.Config) error { config.APIPath = "" config.GroupVersion = nil codec := runtime.NoopEncoder{Decoder: api.Codecs.UniversalDecoder()} - config.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(codec, codec, runtime.DefaultFramer) + config.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper( + runtime.SerializerInfo{Serializer: codec}, + runtime.StreamSerializerInfo{}, + ) if len(config.UserAgent) == 0 { config.UserAgent = restclient.DefaultKubernetesUserAgent() } diff --git a/pkg/client/typed/dynamic/client.go b/pkg/client/typed/dynamic/client.go index f542cef726c..b2e6d228917 100644 --- a/pkg/client/typed/dynamic/client.go +++ b/pkg/client/typed/dynamic/client.go @@ -33,7 +33,6 @@ import ( "k8s.io/kubernetes/pkg/conversion/queryparams" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime/serializer" - serializerjson "k8s.io/kubernetes/pkg/runtime/serializer/json" "k8s.io/kubernetes/pkg/watch" ) @@ -51,8 +50,10 @@ func NewClient(conf *restclient.Config) (*Client, error) { conf = &confCopy codec := dynamicCodec{} - legacyCodec := api.Codecs.LegacyCodec(v1.SchemeGroupVersion) - conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(codec, legacyCodec, serializerjson.Framer) + + // TODO: it's questionable that this should be using anything other than unstructured schema and JSON + streamingInfo, _ := api.Codecs.StreamingSerializerForMediaType("application/json;stream=watch", nil) + conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec}, streamingInfo) if conf.APIPath == "" { conf.APIPath = "/api" diff --git a/pkg/client/unversioned/helper_test.go b/pkg/client/unversioned/helper_test.go index 80660a6d056..0e186e4c78e 100644 --- a/pkg/client/unversioned/helper_test.go +++ b/pkg/client/unversioned/helper_test.go @@ -42,7 +42,7 @@ func TestSetKubernetesDefaults(t *testing.T) { ContentConfig: restclient.ContentConfig{ GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec(), - NegotiatedSerializer: testapi.NegotiatedSerializer, + NegotiatedSerializer: testapi.Default.NegotiatedSerializer(), }, QPS: 5, Burst: 10, @@ -126,7 +126,7 @@ func TestHelperGetServerAPIVersions(t *testing.T) { w.Write(output) })) defer server.Close() - got, err := restclient.ServerAPIVersions(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "invalid version", Version: "one"}, NegotiatedSerializer: testapi.NegotiatedSerializer}}) + got, err := restclient.ServerAPIVersions(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "invalid version", Version: "one"}, NegotiatedSerializer: testapi.Default.NegotiatedSerializer()}}) if err != nil { t.Fatalf("unexpected encoding error: %v", err) } diff --git a/pkg/client/unversioned/remotecommand/remotecommand_test.go b/pkg/client/unversioned/remotecommand/remotecommand_test.go index 3c68fe5d9f6..f231a7f4926 100644 --- a/pkg/client/unversioned/remotecommand/remotecommand_test.go +++ b/pkg/client/unversioned/remotecommand/remotecommand_test.go @@ -215,7 +215,7 @@ func TestStream(t *testing.T) { url, _ := url.ParseRequestURI(server.URL) config := restclient.ContentConfig{ GroupVersion: &unversioned.GroupVersion{Group: "x"}, - NegotiatedSerializer: testapi.NegotiatedSerializer, + NegotiatedSerializer: testapi.Default.NegotiatedSerializer(), } c, err := restclient.NewRESTClient(url, "", config, -1, -1, nil, nil) if err != nil { diff --git a/pkg/genericapiserver/resource_encoding_config.go b/pkg/genericapiserver/resource_encoding_config.go index 3430746c2cc..04a25d45fba 100644 --- a/pkg/genericapiserver/resource_encoding_config.go +++ b/pkg/genericapiserver/resource_encoding_config.go @@ -26,7 +26,7 @@ type ResourceEncodingConfig interface { // StorageEncoding returns the serialization format for the resource. // TODO this should actually return a GroupVersionKind since you can logically have multiple "matching" Kinds // For now, it returns just the GroupVersion for consistency with old behavior - StoragageEncodingFor(unversioned.GroupResource) (unversioned.GroupVersion, error) + StorageEncodingFor(unversioned.GroupResource) (unversioned.GroupVersion, error) // InMemoryEncodingFor returns the groupVersion for the in memory representation the storage should convert to. InMemoryEncodingFor(unversioned.GroupResource) (unversioned.GroupVersion, error) @@ -78,7 +78,7 @@ func (o *DefaultResourceEncodingConfig) SetResourceEncoding(resourceBeingStored o.Groups[group].InternalResourceEncodings[resourceBeingStored.Resource] = internalVersion } -func (o *DefaultResourceEncodingConfig) StoragageEncodingFor(resource unversioned.GroupResource) (unversioned.GroupVersion, error) { +func (o *DefaultResourceEncodingConfig) StorageEncodingFor(resource unversioned.GroupResource) (unversioned.GroupVersion, error) { groupMeta, err := registered.Group(resource.Group) if err != nil { return unversioned.GroupVersion{}, err diff --git a/pkg/genericapiserver/storage_factory.go b/pkg/genericapiserver/storage_factory.go index 4e1f4c97e97..45489c33074 100644 --- a/pkg/genericapiserver/storage_factory.go +++ b/pkg/genericapiserver/storage_factory.go @@ -18,9 +18,11 @@ package genericapiserver import ( "fmt" + "mime" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/recognizer" "k8s.io/kubernetes/pkg/runtime/serializer/versioning" "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage/storagebackend" @@ -50,19 +52,25 @@ type DefaultStorageFactory struct { Overrides map[unversioned.GroupResource]groupResourceOverrides + // DefaultMediaType is the media type used to store resources. If it is not set, "application/json" is used. + DefaultMediaType string + // DefaultSerializer is used to create encoders and decoders for the storage.Interface. - DefaultSerializer runtime.NegotiatedSerializer + DefaultSerializer runtime.StorageSerializer // ResourceEncodingConfig describes how to encode a particular GroupVersionResource ResourceEncodingConfig ResourceEncodingConfig // APIResourceConfigSource indicates whether the *storage* is enabled, NOT the API - // This is discrete from resource enablement because those are separate concerns. How it is surfaced to the user via flags - // or config is up to whoever is building this. + // This is discrete from resource enablement because those are separate concerns. How this source is configured + // is left to the caller. APIResourceConfigSource APIResourceConfigSource - // newEtcdFn exists to be overwritten for unit testing. You should never set this in a normal world. - newEtcdFn func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) + // newStorageCodecFn exists to be overwritten for unit testing. + newStorageCodecFn func(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (codec runtime.Codec, err error) + + // newStorageFn exists to be overwritten for unit testing. + newStorageFn func(config storagebackend.Config) (etcdStorage storage.Interface, err error) } type groupResourceOverrides struct { @@ -71,8 +79,10 @@ type groupResourceOverrides struct { etcdLocation []string // etcdPrefix contains the list of "special" prefixes for a GroupResource. Resource=* means for the entire group etcdPrefix string + // mediaType is the desired serializer to choose. If empty, the default is chosen. + mediaType string // serializer contains the list of "special" serializers for a GroupResource. Resource=* means for the entire group - serializer runtime.NegotiatedSerializer + serializer runtime.StorageSerializer // cohabitatingResources keeps track of which resources must be stored together. This happens when we have multiple ways // of exposing one set of concepts. autoscaling.HPA and extensions.HPA as a for instance // The order of the slice matters! It is the priority order of lookup for finding a storage location @@ -83,15 +93,20 @@ var _ StorageFactory = &DefaultStorageFactory{} const AllResources = "*" -func NewDefaultStorageFactory(config storagebackend.Config, defaultSerializer runtime.NegotiatedSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory { +func NewDefaultStorageFactory(config storagebackend.Config, defaultMediaType string, defaultSerializer runtime.StorageSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory { + if len(defaultMediaType) == 0 { + defaultMediaType = runtime.ContentTypeJSON + } return &DefaultStorageFactory{ StorageConfig: config, Overrides: map[unversioned.GroupResource]groupResourceOverrides{}, + DefaultMediaType: defaultMediaType, DefaultSerializer: defaultSerializer, ResourceEncodingConfig: resourceEncodingConfig, APIResourceConfigSource: resourceConfig, - newEtcdFn: newEtcd, + newStorageCodecFn: NewStorageCodec, + newStorageFn: newStorage, } } @@ -107,8 +122,9 @@ func (s *DefaultStorageFactory) SetEtcdPrefix(groupResource unversioned.GroupRes s.Overrides[groupResource] = overrides } -func (s *DefaultStorageFactory) SetSerializer(groupResource unversioned.GroupResource, serializer runtime.NegotiatedSerializer) { +func (s *DefaultStorageFactory) SetSerializer(groupResource unversioned.GroupResource, mediaType string, serializer runtime.StorageSerializer) { overrides := s.Overrides[groupResource] + overrides.mediaType = mediaType overrides.serializer = serializer s.Overrides[groupResource] = overrides } @@ -160,6 +176,14 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st etcdPrefix = exactResourceOverride.etcdPrefix } + etcdMediaType := s.DefaultMediaType + if len(groupOverride.mediaType) != 0 { + etcdMediaType = groupOverride.mediaType + } + if len(exactResourceOverride.mediaType) != 0 { + etcdMediaType = exactResourceOverride.mediaType + } + etcdSerializer := s.DefaultSerializer if groupOverride.serializer != nil { etcdSerializer = groupOverride.serializer @@ -174,7 +198,7 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st config.ServerList = overriddenEtcdLocations } - storageEncodingVersion, err := s.ResourceEncodingConfig.StoragageEncodingFor(chosenStorageResource) + storageEncodingVersion, err := s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource) if err != nil { return nil, err } @@ -183,27 +207,19 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st return nil, err } + codec, err := s.newStorageCodecFn(etcdMediaType, etcdSerializer, storageEncodingVersion, internalVersion, config) + if err != nil { + return nil, err + } + + config.Codec = codec + glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, config) - return s.newEtcdFn(etcdSerializer, storageEncodingVersion, internalVersion, config) + return s.newStorageFn(config) } -func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) { - s, ok := ns.SerializerForMediaType("application/json", nil) - if !ok { - return nil, fmt.Errorf("unable to find serializer for JSON") - } - encoder := ns.EncoderForVersion(s, storageVersion) - decoder := ns.DecoderToVersion(s, memoryVersion) - if memoryVersion.Group != storageVersion.Group { - // Allow this codec to translate between groups. - if err = versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil { - return nil, fmt.Errorf("error setting up encoder from %v to %v: %v", memoryVersion, storageVersion, err) - } - if err = versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil { - return nil, fmt.Errorf("error setting up decoder from %v to %v: %v", storageVersion, memoryVersion, err) - } - } - config.Codec = runtime.NewCodec(encoder, decoder) +// newStorage is the default implementation for creating a storage backend. +func newStorage(config storagebackend.Config) (etcdStorage storage.Interface, err error) { return storagebackend.Create(config) } @@ -217,3 +233,39 @@ func (s *DefaultStorageFactory) Backends() []string { } return backends.List() } + +// NewStorageCodec assembles a storage codec for the provided storage media type, the provided serializer, and the requested +// storage and memory versions. +func NewStorageCodec(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (runtime.Codec, error) { + mediaType, options, err := mime.ParseMediaType(storageMediaType) + if err != nil { + return nil, fmt.Errorf("%q is not a valid mime-type", storageMediaType) + } + serializer, ok := ns.SerializerForMediaType(mediaType, options) + if !ok { + return nil, fmt.Errorf("unable to find serializer for %q", storageMediaType) + } + + s := serializer.Serializer + + // etcd2 only supports string data - we must wrap any result before returning + // TODO: storagebackend should return a boolean indicating whether it supports binary data + if !serializer.EncodesAsText && (config.Type == storagebackend.StorageTypeUnset || config.Type == storagebackend.StorageTypeETCD2) { + glog.V(4).Infof("Wrapping the underlying binary storage serializer with a base64 encoding for etcd2") + s = runtime.NewBase64Serializer(s) + } + + ds := recognizer.NewDecoder(s, ns.UniversalDeserializer()) + encoder := ns.EncoderForVersion(s, storageVersion) + decoder := ns.DecoderToVersion(ds, memoryVersion) + if memoryVersion.Group != storageVersion.Group { + // Allow this codec to translate between groups. + if err := versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil { + return nil, fmt.Errorf("error setting up encoder from %v to %v: %v", memoryVersion, storageVersion, err) + } + if err := versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil { + return nil, fmt.Errorf("error setting up decoder from %v to %v: %v", storageVersion, memoryVersion, err) + } + } + return runtime.NewCodec(encoder, decoder), nil +} diff --git a/pkg/genericapiserver/storage_factory_test.go b/pkg/genericapiserver/storage_factory_test.go index 1f727fb086e..9e8be61f238 100644 --- a/pkg/genericapiserver/storage_factory_test.go +++ b/pkg/genericapiserver/storage_factory_test.go @@ -23,7 +23,6 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" - "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage/storagebackend" ) @@ -50,7 +49,7 @@ func TestUpdateEtcdOverrides(t *testing.T) { defaultEtcdLocation := []string{"http://127.0.0.1"} for i, test := range testCases { actualConfig := storagebackend.Config{} - newEtcdFn := func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) { + newStorageFn := func(config storagebackend.Config) (_ storage.Interface, err error) { actualConfig = config return nil, nil } @@ -59,8 +58,8 @@ func TestUpdateEtcdOverrides(t *testing.T) { Prefix: DefaultEtcdPathPrefix, ServerList: defaultEtcdLocation, } - storageFactory := NewDefaultStorageFactory(defaultConfig, api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig()) - storageFactory.newEtcdFn = newEtcdFn + storageFactory := NewDefaultStorageFactory(defaultConfig, "", api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig()) + storageFactory.newStorageFn = newStorageFn storageFactory.SetEtcdLocation(test.resource, test.servers) var err error diff --git a/pkg/kubectl/resource/mapper.go b/pkg/kubectl/resource/mapper.go index 27e85043b0e..8e8cc2726e9 100644 --- a/pkg/kubectl/resource/mapper.go +++ b/pkg/kubectl/resource/mapper.go @@ -55,7 +55,7 @@ func (m *Mapper) InfoForData(data []byte, source string) (*Info, error) { var obj runtime.Object var versioned runtime.Object if registered.IsThirdPartyAPIGroupVersion(gvk.GroupVersion()) { - obj, err = runtime.Decode(thirdpartyresourcedata.NewCodec(nil, gvk.Kind), data) + obj, err = runtime.Decode(thirdpartyresourcedata.NewDecoder(nil, gvk.Kind), data) versioned = obj } else { obj, versioned = versions.Last(), versions.First() diff --git a/pkg/master/master.go b/pkg/master/master.go index 5f657cfb09f..4158bb10a41 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -677,7 +677,14 @@ func (m *Master) InstallThirdPartyResource(rsrc *extensions.ThirdPartyResource) func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupVersion { resourceStorage := thirdpartyresourcedataetcd.NewREST( - generic.RESTOptions{Storage: m.thirdPartyStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: m.deleteCollectionWorkers}, group, kind) + generic.RESTOptions{ + Storage: m.thirdPartyStorage, + Decorator: generic.UndecoratedStorage, + DeleteCollectionWorkers: m.deleteCollectionWorkers, + }, + group, + kind, + ) apiRoot := makeThirdPartyPath("") diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 17f735fd7bd..8c4a4f34dc3 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -88,7 +88,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert. resourceEncoding.SetVersionEncoding(batch.GroupName, *testapi.Batch.GroupVersion(), unversioned.GroupVersion{Group: batch.GroupName, Version: runtime.APIVersionInternal}) resourceEncoding.SetVersionEncoding(apps.GroupName, *testapi.Apps.GroupVersion(), unversioned.GroupVersion{Group: apps.GroupName, Version: runtime.APIVersionInternal}) resourceEncoding.SetVersionEncoding(extensions.GroupName, *testapi.Extensions.GroupVersion(), unversioned.GroupVersion{Group: extensions.GroupName, Version: runtime.APIVersionInternal}) - storageFactory := genericapiserver.NewDefaultStorageFactory(storageConfig, api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource()) + storageFactory := genericapiserver.NewDefaultStorageFactory(storageConfig, testapi.StorageMediaType(), api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource()) config.StorageFactory = storageFactory config.APIResourceConfigSource = DefaultAPIResourceConfigSource() diff --git a/pkg/registry/generic/registry/store_test.go b/pkg/registry/generic/registry/store_test.go index 04f36b31ea8..a89eca3e60b 100644 --- a/pkg/registry/generic/registry/store_test.go +++ b/pkg/registry/generic/registry/store_test.go @@ -90,7 +90,7 @@ func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool { func NewTestGenericStoreRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, *Store) { podPrefix := "/pods" server := etcdtesting.NewEtcdTestClientServer(t) - s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize) + s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.StorageCodec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize) strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true} return server, &Store{ diff --git a/pkg/registry/registrytest/etcd.go b/pkg/registry/registrytest/etcd.go index dafc5f4da30..c3295a5be09 100644 --- a/pkg/registry/registrytest/etcd.go +++ b/pkg/registry/registrytest/etcd.go @@ -38,7 +38,7 @@ import ( func NewEtcdStorage(t *testing.T, group string) (storage.Interface, *etcdtesting.EtcdTestServer) { server := etcdtesting.NewEtcdTestClientServer(t) - storage := etcdstorage.NewEtcdStorage(server.Client, testapi.Groups[group].Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize) + storage := etcdstorage.NewEtcdStorage(server.Client, testapi.Groups[group].StorageCodec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize) return storage, server } diff --git a/pkg/registry/thirdpartyresourcedata/codec.go b/pkg/registry/thirdpartyresourcedata/codec.go index a7f46a2ee0a..947957a9498 100644 --- a/pkg/registry/thirdpartyresourcedata/codec.go +++ b/pkg/registry/thirdpartyresourcedata/codec.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/sets" ) type thirdPartyObjectConverter struct { @@ -183,31 +184,39 @@ func NewNegotiatedSerializer(s runtime.NegotiatedSerializer, kind string, encode } } -func (t *thirdPartyResourceDataCodecFactory) EncoderForVersion(s runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { - return NewCodec(runtime.NewCodec( - t.NegotiatedSerializer.EncoderForVersion(s, gv), - t.NegotiatedSerializer.DecoderToVersion(s, t.decodeGV), - ), t.kind) +func (t *thirdPartyResourceDataCodecFactory) SupportedMediaTypes() []string { + supported := sets.NewString(t.NegotiatedSerializer.SupportedMediaTypes()...) + return supported.Intersection(sets.NewString("application/json", "application/yaml")).List() } -func (t *thirdPartyResourceDataCodecFactory) DecoderToVersion(s runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder { - return NewCodec(runtime.NewCodec( - t.NegotiatedSerializer.EncoderForVersion(s, t.encodeGV), - t.NegotiatedSerializer.DecoderToVersion(s, gv), - ), t.kind) +func (t *thirdPartyResourceDataCodecFactory) SupportedStreamingMediaTypes() []string { + supported := sets.NewString(t.NegotiatedSerializer.SupportedStreamingMediaTypes()...) + return supported.Intersection(sets.NewString("application/json", "application/json;stream=watch")).List() } -type thirdPartyResourceDataCodec struct { - delegate runtime.Codec +func (t *thirdPartyResourceDataCodecFactory) EncoderForVersion(s runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder { + return &thirdPartyResourceDataEncoder{delegate: t.NegotiatedSerializer.EncoderForVersion(s, gv), kind: t.kind} +} + +func (t *thirdPartyResourceDataCodecFactory) DecoderToVersion(s runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder { + return NewDecoder(t.NegotiatedSerializer.DecoderToVersion(s, gv), t.kind) +} + +func NewCodec(delegate runtime.Codec, kind string) runtime.Codec { + return runtime.NewCodec(NewEncoder(delegate, kind), NewDecoder(delegate, kind)) +} + +type thirdPartyResourceDataDecoder struct { + delegate runtime.Decoder kind string } -var _ runtime.Codec = &thirdPartyResourceDataCodec{} - -func NewCodec(codec runtime.Codec, kind string) runtime.Codec { - return &thirdPartyResourceDataCodec{codec, kind} +func NewDecoder(delegate runtime.Decoder, kind string) runtime.Decoder { + return &thirdPartyResourceDataDecoder{delegate: delegate, kind: kind} } +var _ runtime.Decoder = &thirdPartyResourceDataDecoder{} + func parseObject(data []byte) (map[string]interface{}, error) { var obj interface{} if err := json.Unmarshal(data, &obj); err != nil { @@ -221,7 +230,7 @@ func parseObject(data []byte) (map[string]interface{}, error) { return mapObj, nil } -func (t *thirdPartyResourceDataCodec) populate(data []byte) (runtime.Object, error) { +func (t *thirdPartyResourceDataDecoder) populate(data []byte) (runtime.Object, error) { mapObj, err := parseObject(data) if err != nil { return nil, err @@ -229,7 +238,7 @@ func (t *thirdPartyResourceDataCodec) populate(data []byte) (runtime.Object, err return t.populateFromObject(mapObj, data) } -func (t *thirdPartyResourceDataCodec) populateFromObject(mapObj map[string]interface{}, data []byte) (runtime.Object, error) { +func (t *thirdPartyResourceDataDecoder) populateFromObject(mapObj map[string]interface{}, data []byte) (runtime.Object, error) { typeMeta := unversioned.TypeMeta{} if err := json.Unmarshal(data, &typeMeta); err != nil { return nil, err @@ -252,7 +261,7 @@ func (t *thirdPartyResourceDataCodec) populateFromObject(mapObj map[string]inter } } -func (t *thirdPartyResourceDataCodec) populateResource(objIn *extensions.ThirdPartyResourceData, mapObj map[string]interface{}, data []byte) error { +func (t *thirdPartyResourceDataDecoder) populateResource(objIn *extensions.ThirdPartyResourceData, mapObj map[string]interface{}, data []byte) error { metadata, ok := mapObj["metadata"].(map[string]interface{}) if !ok { return fmt.Errorf("unexpected object for metadata: %#v", mapObj["metadata"]) @@ -274,7 +283,7 @@ func (t *thirdPartyResourceDataCodec) populateResource(objIn *extensions.ThirdPa return nil } -func (t *thirdPartyResourceDataCodec) Decode(data []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) { +func (t *thirdPartyResourceDataDecoder) Decode(data []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) { if into == nil { obj, err := t.populate(data) if err != nil { @@ -347,7 +356,7 @@ func (t *thirdPartyResourceDataCodec) Decode(data []byte, gvk *unversioned.Group return thirdParty, actual, nil } -func (t *thirdPartyResourceDataCodec) populateListResource(objIn *extensions.ThirdPartyResourceDataList, mapObj map[string]interface{}) error { +func (t *thirdPartyResourceDataDecoder) populateListResource(objIn *extensions.ThirdPartyResourceDataList, mapObj map[string]interface{}) error { items, ok := mapObj["items"].([]interface{}) if !ok { return fmt.Errorf("unexpected object for items: %#v", mapObj["items"]) @@ -374,6 +383,17 @@ const template = `{ "items": [ %s ] }` +type thirdPartyResourceDataEncoder struct { + delegate runtime.Encoder + kind string +} + +func NewEncoder(delegate runtime.Encoder, kind string) runtime.Encoder { + return &thirdPartyResourceDataEncoder{delegate: delegate, kind: kind} +} + +var _ runtime.Encoder = &thirdPartyResourceDataEncoder{} + func encodeToJSON(obj *extensions.ThirdPartyResourceData, stream io.Writer) error { var objOut interface{} if err := json.Unmarshal(obj.Data, &objOut); err != nil { @@ -388,7 +408,7 @@ func encodeToJSON(obj *extensions.ThirdPartyResourceData, stream io.Writer) erro return encoder.Encode(objMap) } -func (t *thirdPartyResourceDataCodec) EncodeToStream(obj runtime.Object, stream io.Writer, overrides ...unversioned.GroupVersion) (err error) { +func (t *thirdPartyResourceDataEncoder) EncodeToStream(obj runtime.Object, stream io.Writer, overrides ...unversioned.GroupVersion) (err error) { switch obj := obj.(type) { case *extensions.ThirdPartyResourceData: return encodeToJSON(obj, stream) diff --git a/pkg/registry/thirdpartyresourcedata/codec_test.go b/pkg/registry/thirdpartyresourcedata/codec_test.go index 188e44befc4..942de910ccb 100644 --- a/pkg/registry/thirdpartyresourcedata/codec_test.go +++ b/pkg/registry/thirdpartyresourcedata/codec_test.go @@ -87,13 +87,14 @@ func TestCodec(t *testing.T) { }, } for _, test := range tests { - codec := &thirdPartyResourceDataCodec{kind: "Foo", delegate: testapi.Extensions.Codec()} + d := &thirdPartyResourceDataDecoder{kind: "Foo", delegate: testapi.Extensions.Codec()} + e := &thirdPartyResourceDataEncoder{kind: "Foo", delegate: testapi.Extensions.Codec()} data, err := json.Marshal(test.obj) if err != nil { t.Errorf("[%s] unexpected error: %v", test.name, err) continue } - obj, err := runtime.Decode(codec, data) + obj, err := runtime.Decode(d, data) if err != nil && !test.expectErr { t.Errorf("[%s] unexpected error: %v", test.name, err) continue @@ -121,7 +122,7 @@ func TestCodec(t *testing.T) { t.Errorf("[%s]\nexpected\n%v\nsaw\n%v\n", test.name, test.obj, &output) } - data, err = runtime.Encode(codec, rsrcObj) + data, err = runtime.Encode(e, rsrcObj) if err != nil { t.Errorf("[%s] unexpected error: %v", test.name, err) } diff --git a/pkg/runtime/codec.go b/pkg/runtime/codec.go index bde0ae9755d..6379e71f5ba 100644 --- a/pkg/runtime/codec.go +++ b/pkg/runtime/codec.go @@ -18,6 +18,7 @@ package runtime import ( "bytes" + "encoding/base64" "fmt" "io" "net/url" @@ -169,3 +170,27 @@ func (c *parameterCodec) EncodeParameters(obj Object, to unversioned.GroupVersio } return queryparams.Convert(obj) } + +type base64Serializer struct { + Serializer +} + +func NewBase64Serializer(s Serializer) Serializer { + return &base64Serializer{s} +} + +func (s base64Serializer) EncodeToStream(obj Object, stream io.Writer, overrides ...unversioned.GroupVersion) error { + e := base64.NewEncoder(base64.StdEncoding, stream) + err := s.Serializer.EncodeToStream(obj, e, overrides...) + e.Close() + return err +} + +func (s base64Serializer) Decode(data []byte, defaults *unversioned.GroupVersionKind, into Object) (Object, *unversioned.GroupVersionKind, error) { + out := make([]byte, base64.StdEncoding.DecodedLen(len(data))) + n, err := base64.StdEncoding.Decode(out, data) + if err != nil { + return nil, nil, err + } + return s.Serializer.Decode(out[:n], defaults, into) +} diff --git a/pkg/runtime/deep_copy_generated.go b/pkg/runtime/deep_copy_generated.go index f368440a384..988b9740177 100644 --- a/pkg/runtime/deep_copy_generated.go +++ b/pkg/runtime/deep_copy_generated.go @@ -118,3 +118,33 @@ func DeepCopy_runtime_Scheme(in Scheme, out *Scheme, c *conversion.Cloner) error } return nil } + +func DeepCopy_runtime_SerializerInfo(in SerializerInfo, out *SerializerInfo, c *conversion.Cloner) error { + if in.Serializer == nil { + out.Serializer = nil + } else if newVal, err := c.DeepCopy(in.Serializer); err != nil { + return err + } else { + out.Serializer = newVal.(Serializer) + } + out.EncodesAsText = in.EncodesAsText + out.MediaType = in.MediaType + return nil +} + +func DeepCopy_runtime_StreamSerializerInfo(in StreamSerializerInfo, out *StreamSerializerInfo, c *conversion.Cloner) error { + if err := DeepCopy_runtime_SerializerInfo(in.SerializerInfo, &out.SerializerInfo, c); err != nil { + return err + } + if in.Framer == nil { + out.Framer = nil + } else if newVal, err := c.DeepCopy(in.Framer); err != nil { + return err + } else { + out.Framer = newVal.(Framer) + } + if err := DeepCopy_runtime_SerializerInfo(in.Embedded, &out.Embedded, c); err != nil { + return err + } + return nil +} diff --git a/pkg/runtime/interfaces.go b/pkg/runtime/interfaces.go index 9954238a9cd..c388a12fe5f 100644 --- a/pkg/runtime/interfaces.go +++ b/pkg/runtime/interfaces.go @@ -86,32 +86,75 @@ type Framer interface { NewFrameWriter(w io.Writer) io.Writer } +// SerializerInfo contains information about a specific serialization format +type SerializerInfo struct { + Serializer + // EncodesAsText indicates this serializer can be encoded to UTF-8 safely. + EncodesAsText bool + // MediaType is the value that represents this serializer over the wire. + MediaType string +} + +// StreamSerializerInfo contains information about a specific stream serialization format +type StreamSerializerInfo struct { + SerializerInfo + // Framer is the factory for retrieving streams that separate objects on the wire + Framer + // Embedded is the type of the nested serialization that should be used. + Embedded SerializerInfo +} + // NegotiatedSerializer is an interface used for obtaining encoders, decoders, and serializers -// for multiple supported media types. +// for multiple supported media types. This would commonly be accepted by a server component +// that performs HTTP content negotiation to accept multiple formats. type NegotiatedSerializer interface { // SupportedMediaTypes is the media types supported for reading and writing single objects. SupportedMediaTypes() []string - // SerializerForMediaType returns a serializer for the provided media type. Options is a set of - // parameters applied to the media type that may modify the resulting output. - SerializerForMediaType(mediaType string, options map[string]string) (Serializer, bool) + // SerializerForMediaType returns a serializer for the provided media type. params is the set of + // parameters applied to the media type that may modify the resulting output. ok will be false + // if no serializer matched the media type. + SerializerForMediaType(mediaType string, params map[string]string) (s SerializerInfo, ok bool) // SupportedStreamingMediaTypes returns the media types of the supported streaming serializers. // Streaming serializers control how multiple objects are written to a stream output. SupportedStreamingMediaTypes() []string // StreamingSerializerForMediaType returns a serializer for the provided media type that supports // reading and writing multiple objects to a stream. It returns a framer and serializer, or an - // error if no such serializer can be created. Options is a set of parameters applied to the - // media type that may modify the resulting output. - StreamingSerializerForMediaType(mediaType string, options map[string]string) (Serializer, Framer, string, bool) + // error if no such serializer can be created. Params is the set of parameters applied to the + // media type that may modify the resulting output. ok will be false if no serializer matched + // the media type. + StreamingSerializerForMediaType(mediaType string, params map[string]string) (s StreamSerializerInfo, ok bool) // EncoderForVersion returns an encoder that ensures objects being written to the provided // serializer are in the provided group version. // TODO: take multiple group versions - EncoderForVersion(serializer Serializer, gv unversioned.GroupVersion) Encoder + EncoderForVersion(serializer Encoder, gv unversioned.GroupVersion) Encoder // DecoderForVersion returns a decoder that ensures objects being read by the provided // serializer are in the provided group version by default. // TODO: take multiple group versions - DecoderToVersion(serializer Serializer, gv unversioned.GroupVersion) Decoder + DecoderToVersion(serializer Decoder, gv unversioned.GroupVersion) Decoder +} + +// StorageSerializer is an interface used for obtaining encoders, decoders, and serializers +// that can read and write data at rest. This would commonly be used by client tools that must +// read files, or server side storage interfaces that persist restful objects. +type StorageSerializer interface { + // SerializerForMediaType returns a serializer for the provided media type. Options is a set of + // parameters applied to the media type that may modify the resulting output. + SerializerForMediaType(mediaType string, options map[string]string) (SerializerInfo, bool) + + // UniversalDeserializer returns a Serializer that can read objects in multiple supported formats + // by introspecting the data at rest. + UniversalDeserializer() Decoder + + // EncoderForVersion returns an encoder that ensures objects being written to the provided + // serializer are in the provided group version. + // TODO: take multiple group versions + EncoderForVersion(serializer Encoder, gv unversioned.GroupVersion) Encoder + // DecoderForVersion returns a decoder that ensures objects being read by the provided + // serializer are in the provided group version by default. + // TODO: take multiple group versions + DecoderToVersion(serializer Decoder, gv unversioned.GroupVersion) Decoder } /////////////////////////////////////////////////////////////////////////////// diff --git a/pkg/runtime/serializer/codec_factory.go b/pkg/runtime/serializer/codec_factory.go index 1ed0312b143..573ec70bb39 100644 --- a/pkg/runtime/serializer/codec_factory.go +++ b/pkg/runtime/serializer/codec_factory.go @@ -31,6 +31,8 @@ type serializerType struct { AcceptContentTypes []string ContentType string FileExtensions []string + // EncodesAsText should be true if this content type can be represented safely in UTF-8 + EncodesAsText bool Serializer runtime.Serializer PrettySerializer runtime.Serializer @@ -65,6 +67,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []seri AcceptContentTypes: []string{"application/json"}, ContentType: "application/json", FileExtensions: []string{"json"}, + EncodesAsText: true, Serializer: jsonSerializer, PrettySerializer: jsonPrettySerializer, @@ -77,6 +80,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []seri AcceptContentTypes: []string{"application/yaml"}, ContentType: "application/yaml", FileExtensions: []string{"yaml"}, + EncodesAsText: true, Serializer: yamlSerializer, // TODO: requires runtime.RawExtension to properly distinguish when the nested content is @@ -206,62 +210,91 @@ func (f CodecFactory) UniversalDeserializer() runtime.Decoder { // // TODO: the decoder will eventually be removed in favor of dealing with objects in their versioned form func (f CodecFactory) UniversalDecoder(versions ...unversioned.GroupVersion) runtime.Decoder { - return f.CodecForVersions(runtime.NoopEncoder{Decoder: f.universal}, nil, versions) + return f.CodecForVersions(nil, f.universal, nil, versions) } // CodecFor creates a codec with the provided serializer. If an object is decoded and its group is not in the list, // it will default to runtime.APIVersionInternal. If encode is not specified for an object's group, the object is not // converted. If encode or decode are nil, no conversion is performed. -func (f CodecFactory) CodecForVersions(serializer runtime.Serializer, encode []unversioned.GroupVersion, decode []unversioned.GroupVersion) runtime.Codec { - return versioning.NewCodecForScheme(f.scheme, serializer, serializer, encode, decode) +func (f CodecFactory) CodecForVersions(encoder runtime.Encoder, decoder runtime.Decoder, encode []unversioned.GroupVersion, decode []unversioned.GroupVersion) runtime.Codec { + return versioning.NewCodecForScheme(f.scheme, encoder, decoder, encode, decode) } // DecoderToVersion returns a decoder that targets the provided group version. -func (f CodecFactory) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder { - return f.CodecForVersions(serializer, nil, []unversioned.GroupVersion{gv}) +func (f CodecFactory) DecoderToVersion(decoder runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder { + return f.CodecForVersions(nil, decoder, nil, []unversioned.GroupVersion{gv}) } // EncoderForVersion returns an encoder that targets the provided group version. -func (f CodecFactory) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { - return f.CodecForVersions(serializer, []unversioned.GroupVersion{gv}, nil) +func (f CodecFactory) EncoderForVersion(encoder runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder { + return f.CodecForVersions(encoder, nil, []unversioned.GroupVersion{gv}, nil) } // SerializerForMediaType returns a serializer that matches the provided RFC2046 mediaType, or false if no such // serializer exists -func (f CodecFactory) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) { +func (f CodecFactory) SerializerForMediaType(mediaType string, params map[string]string) (runtime.SerializerInfo, bool) { for _, s := range f.serializers { for _, accepted := range s.AcceptContentTypes { if accepted == mediaType { - if s.Specialize != nil && len(options) > 0 { - serializer, ok := s.Specialize(options) - return serializer, ok + // specialization abstracts variants to the content type + if s.Specialize != nil && len(params) > 0 { + serializer, ok := s.Specialize(params) + // TODO: return formatted mediaType+params + return runtime.SerializerInfo{Serializer: serializer, MediaType: s.ContentType, EncodesAsText: s.EncodesAsText}, ok } - if v, ok := options["pretty"]; ok && v == "1" && s.PrettySerializer != nil { - return s.PrettySerializer, true + + // legacy support for ?pretty=1 continues, but this is more formally defined + if v, ok := params["pretty"]; ok && v == "1" && s.PrettySerializer != nil { + return runtime.SerializerInfo{Serializer: s.PrettySerializer, MediaType: s.ContentType, EncodesAsText: s.EncodesAsText}, true } - return s.Serializer, true + + // return the base variant + return runtime.SerializerInfo{Serializer: s.Serializer, MediaType: s.ContentType, EncodesAsText: s.EncodesAsText}, true } } } - return nil, false + return runtime.SerializerInfo{}, false } // StreamingSerializerForMediaType returns a serializer that matches the provided RFC2046 mediaType, or false if no such // serializer exists -func (f CodecFactory) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) { +func (f CodecFactory) StreamingSerializerForMediaType(mediaType string, params map[string]string) (runtime.StreamSerializerInfo, bool) { for _, s := range f.serializers { for _, accepted := range s.AcceptStreamContentTypes { if accepted == mediaType { - if s.StreamSpecialize != nil && len(options) > 0 { - serializer, ok := s.StreamSpecialize(options) - // TODO: have StreamSpecialize return exact content type - return serializer, s.Framer, s.StreamContentType, ok + // TODO: accept params + nested, ok := f.SerializerForMediaType(s.ContentType, nil) + if !ok { + panic("no serializer defined for internal content type") } - return s.StreamSerializer, s.Framer, s.StreamContentType, true + + if s.StreamSpecialize != nil && len(params) > 0 { + serializer, ok := s.StreamSpecialize(params) + // TODO: return formatted mediaType+params + return runtime.StreamSerializerInfo{ + SerializerInfo: runtime.SerializerInfo{ + Serializer: serializer, + MediaType: s.StreamContentType, + EncodesAsText: s.EncodesAsText, + }, + Framer: s.Framer, + Embedded: nested, + }, ok + } + + return runtime.StreamSerializerInfo{ + SerializerInfo: runtime.SerializerInfo{ + Serializer: s.StreamSerializer, + MediaType: s.StreamContentType, + EncodesAsText: s.EncodesAsText, + }, + Framer: s.Framer, + Embedded: nested, + }, true } } } - return nil, nil, "", false + return runtime.StreamSerializerInfo{}, false } // SerializerForFileExtension returns a serializer for the provided extension, or false if no serializer matches. diff --git a/pkg/runtime/serializer/codec_test.go b/pkg/runtime/serializer/codec_test.go index 259da6e30c0..342a2a599fd 100644 --- a/pkg/runtime/serializer/codec_test.go +++ b/pkg/runtime/serializer/codec_test.go @@ -267,7 +267,7 @@ func TestVersionedEncoding(t *testing.T) { encoder, _ := cf.SerializerForFileExtension("json") // codec that is unversioned uses the target version - unversionedCodec := cf.CodecForVersions(encoder, nil, nil) + unversionedCodec := cf.CodecForVersions(encoder, nil, nil, nil) _, err = runtime.Encode(unversionedCodec, &TestType1{}, unversioned.GroupVersion{Version: "v3"}) if err == nil || !runtime.IsNotRegisteredError(err) { t.Fatal(err) diff --git a/pkg/runtime/serializer/json/json.go b/pkg/runtime/serializer/json/json.go index 61f01215e04..1788d980287 100644 --- a/pkg/runtime/serializer/json/json.go +++ b/pkg/runtime/serializer/json/json.go @@ -186,20 +186,13 @@ func (s *Serializer) EncodeToStream(obj runtime.Object, w io.Writer, overrides . } // RecognizesData implements the RecognizingDecoder interface. -func (s *Serializer) RecognizesData(peek io.Reader) (bool, error) { - _, ok := utilyaml.GuessJSONStream(peek, 2048) +func (s *Serializer) RecognizesData(peek io.Reader) (ok, unknown bool, err error) { if s.yaml { - return !ok, nil + // we could potentially look for '---' + return false, true, nil } - return ok, nil -} - -// EncodesAsText returns true because both JSON and YAML are considered textual representations -// of data. This is used to determine whether the serialized object should be transmitted over -// a WebSocket Text or Binary frame. This must remain true for legacy compatibility with v1.1 -// watch over websocket implementations. -func (s *Serializer) EncodesAsText() bool { - return true + _, ok = utilyaml.GuessJSONStream(peek, 2048) + return ok, false, nil } // Framer is the default JSON framing behavior, with newlines delimiting individual objects. diff --git a/pkg/runtime/serializer/negotiated_codec.go b/pkg/runtime/serializer/negotiated_codec.go index 0f0f3691184..6f6a56dd3fa 100644 --- a/pkg/runtime/serializer/negotiated_codec.go +++ b/pkg/runtime/serializer/negotiated_codec.go @@ -24,35 +24,34 @@ import ( // TODO: We should figure out what happens when someone asks // encoder for version and it conflicts with the raw serializer. type negotiatedSerializerWrapper struct { - serializer runtime.Serializer - streamingSerializer runtime.Serializer - framer runtime.Framer + info runtime.SerializerInfo + streamInfo runtime.StreamSerializerInfo } -func NegotiatedSerializerWrapper(serializer, streamingSerializer runtime.Serializer, framer runtime.Framer) runtime.NegotiatedSerializer { - return &negotiatedSerializerWrapper{serializer, streamingSerializer, framer} +func NegotiatedSerializerWrapper(info runtime.SerializerInfo, streamInfo runtime.StreamSerializerInfo) runtime.NegotiatedSerializer { + return &negotiatedSerializerWrapper{info, streamInfo} } func (n *negotiatedSerializerWrapper) SupportedMediaTypes() []string { return []string{} } -func (n *negotiatedSerializerWrapper) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) { - return n.serializer, true +func (n *negotiatedSerializerWrapper) SerializerForMediaType(mediaType string, options map[string]string) (runtime.SerializerInfo, bool) { + return n.info, true } func (n *negotiatedSerializerWrapper) SupportedStreamingMediaTypes() []string { return []string{} } -func (n *negotiatedSerializerWrapper) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) { - return n.streamingSerializer, n.framer, "", true +func (n *negotiatedSerializerWrapper) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.StreamSerializerInfo, bool) { + return n.streamInfo, true } -func (n *negotiatedSerializerWrapper) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { - return n.serializer +func (n *negotiatedSerializerWrapper) EncoderForVersion(e runtime.Encoder, _ unversioned.GroupVersion) runtime.Encoder { + return e } -func (n *negotiatedSerializerWrapper) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder { - return n.serializer +func (n *negotiatedSerializerWrapper) DecoderToVersion(d runtime.Decoder, _gv unversioned.GroupVersion) runtime.Decoder { + return d } diff --git a/pkg/runtime/serializer/protobuf/protobuf.go b/pkg/runtime/serializer/protobuf/protobuf.go index 25ac1cdaaf5..aa72e16a7f1 100644 --- a/pkg/runtime/serializer/protobuf/protobuf.go +++ b/pkg/runtime/serializer/protobuf/protobuf.go @@ -419,12 +419,6 @@ func (s *RawSerializer) EncodeToStream(obj runtime.Object, w io.Writer, override } } -// RecognizesData implements the RecognizingDecoder interface - objects encoded with this serializer -// have no innate identifying information and so cannot be recognized. -func (s *RawSerializer) RecognizesData(peek io.Reader) (bool, error) { - return false, nil -} - var LengthDelimitedFramer = lengthDelimitedFramer{} type lengthDelimitedFramer struct{} diff --git a/pkg/runtime/serializer/protobuf/protobuf_test.go b/pkg/runtime/serializer/protobuf/protobuf_test.go index e39610cb314..06b3d2e557c 100644 --- a/pkg/runtime/serializer/protobuf/protobuf_test.go +++ b/pkg/runtime/serializer/protobuf/protobuf_test.go @@ -297,6 +297,18 @@ func TestDecodeObjects(t *testing.T) { t.Fatal(err) } + unk2 := &runtime.Unknown{ + TypeMeta: runtime.TypeMeta{Kind: "Pod", APIVersion: "v1"}, + } + wire2 := make([]byte, len(wire1)*2) + n, err := unk2.NestedMarshalTo(wire2, obj1, uint64(obj1.Size())) + if err != nil { + t.Fatal(err) + } + if n != len(wire1) || !bytes.Equal(wire1, wire2[:n]) { + t.Fatalf("unexpected wire:\n%s", hex.Dump(wire2[:n])) + } + wire1 = append([]byte{0x6b, 0x38, 0x73, 0x00}, wire1...) testCases := []struct { diff --git a/pkg/runtime/serializer/recognizer/recognizer.go b/pkg/runtime/serializer/recognizer/recognizer.go index 14a2cb3e841..4b8b1e204e3 100644 --- a/pkg/runtime/serializer/recognizer/recognizer.go +++ b/pkg/runtime/serializer/recognizer/recognizer.go @@ -17,6 +17,7 @@ limitations under the License. package recognizer import ( + "bufio" "bytes" "fmt" "io" @@ -27,51 +28,98 @@ import ( type RecognizingDecoder interface { runtime.Decoder - RecognizesData(peek io.Reader) (bool, error) + // RecognizesData should return true if the input provided in the provided reader + // belongs to this decoder, or an error if the data could not be read or is ambiguous. + // Unknown is true if the data could not be determined to match the decoder type. + // Decoders should assume that they can read as much of peek as they need (as the caller + // provides) and may return unknown if the data provided is not sufficient to make a + // a determination. When peek returns EOF that may mean the end of the input or the + // end of buffered input - recognizers should return the best guess at that time. + RecognizesData(peek io.Reader) (ok, unknown bool, err error) } +// NewDecoder creates a decoder that will attempt multiple decoders in an order defined +// by: +// +// 1. The decoder implements RecognizingDecoder and identifies the data +// 2. All other decoders, and any decoder that returned true for unknown. +// +// The order passed to the constructor is preserved within those priorities. func NewDecoder(decoders ...runtime.Decoder) runtime.Decoder { - recognizing, blind := []RecognizingDecoder{}, []runtime.Decoder{} - for _, d := range decoders { - if r, ok := d.(RecognizingDecoder); ok { - recognizing = append(recognizing, r) - } else { - blind = append(blind, d) - } - } return &decoder{ - recognizing: recognizing, - blind: blind, + decoders: decoders, } } type decoder struct { - recognizing []RecognizingDecoder - blind []runtime.Decoder + decoders []runtime.Decoder +} + +var _ RecognizingDecoder = &decoder{} + +func (d *decoder) RecognizesData(peek io.Reader) (bool, bool, error) { + var ( + lastErr error + anyUnknown bool + ) + data, _ := bufio.NewReaderSize(peek, 1024).Peek(1024) + for _, r := range d.decoders { + switch t := r.(type) { + case RecognizingDecoder: + ok, unknown, err := t.RecognizesData(bytes.NewBuffer(data)) + if err != nil { + lastErr = err + continue + } + anyUnknown = anyUnknown || unknown + if !ok { + continue + } + return true, false, nil + } + } + return false, anyUnknown, lastErr } func (d *decoder) Decode(data []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) { - var lastErr error - for _, r := range d.recognizing { - buf := bytes.NewBuffer(data) - ok, err := r.RecognizesData(buf) - if err != nil { - lastErr = err - continue + var ( + lastErr error + skipped []runtime.Decoder + ) + + // try recognizers, record any decoders we need to give a chance later + for _, r := range d.decoders { + switch t := r.(type) { + case RecognizingDecoder: + buf := bytes.NewBuffer(data) + ok, unknown, err := t.RecognizesData(buf) + if err != nil { + lastErr = err + continue + } + if unknown { + skipped = append(skipped, t) + continue + } + if !ok { + continue + } + return r.Decode(data, gvk, into) + default: + skipped = append(skipped, t) } - if !ok { - continue - } - return r.Decode(data, gvk, into) } - for _, d := range d.blind { - out, actual, err := d.Decode(data, gvk, into) + + // try recognizers that returned unknown or didn't recognize their data + for _, r := range skipped { + out, actual, err := r.Decode(data, gvk, into) if err != nil { lastErr = err continue } return out, actual, nil } + if lastErr == nil { lastErr = fmt.Errorf("no serialization format matched the provided data") } diff --git a/pkg/runtime/serializer/versioning/versioning.go b/pkg/runtime/serializer/versioning/versioning.go index 36c66fa9f57..e12682a6c28 100644 --- a/pkg/runtime/serializer/versioning/versioning.go +++ b/pkg/runtime/serializer/versioning/versioning.go @@ -27,6 +27,7 @@ import ( // EnableCrossGroupDecoding modifies the given decoder in place, if it is a codec // from this package. It allows objects from one group to be auto-decoded into // another group. 'destGroup' must already exist in the codec. +// TODO: this is an encapsulation violation and should be refactored func EnableCrossGroupDecoding(d runtime.Decoder, sourceGroup, destGroup string) error { internal, ok := d.(*codec) if !ok { @@ -45,6 +46,7 @@ func EnableCrossGroupDecoding(d runtime.Decoder, sourceGroup, destGroup string) // EnableCrossGroupEncoding modifies the given encoder in place, if it is a codec // from this package. It allows objects from one group to be auto-decoded into // another group. 'destGroup' must already exist in the codec. +// TODO: this is an encapsulation violation and should be refactored func EnableCrossGroupEncoding(e runtime.Encoder, sourceGroup, destGroup string) error { internal, ok := e.(*codec) if !ok { diff --git a/plugin/pkg/auth/authorizer/webhook/webhook.go b/plugin/pkg/auth/authorizer/webhook/webhook.go index 65b88865e05..66f4bbd027e 100644 --- a/plugin/pkg/auth/authorizer/webhook/webhook.go +++ b/plugin/pkg/auth/authorizer/webhook/webhook.go @@ -86,9 +86,13 @@ func New(kubeConfigFile string) (*WebhookAuthorizer, error) { if err != nil { return nil, err } + serializer := json.NewSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), false) codec := versioning.NewCodecForScheme(api.Scheme, serializer, serializer, encodeVersions, decodeVersions) - clientConfig.ContentConfig.NegotiatedSerializer = runtimeserializer.NegotiatedSerializerWrapper(codec, codec, json.Framer) + clientConfig.ContentConfig.NegotiatedSerializer = runtimeserializer.NegotiatedSerializerWrapper( + runtime.SerializerInfo{Serializer: codec}, + runtime.StreamSerializerInfo{}, + ) restClient, err := restclient.UnversionedRESTClientFor(clientConfig) if err != nil { diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 73115d8fcb7..29af8f9843f 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -155,24 +155,29 @@ func NewMasterConfig() *master.Config { Prefix: etcdtest.PathPrefix(), } - negotiatedSerializer := NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json") + negotiatedSerializer := NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), runtime.ContentTypeJSON) - storageFactory := genericapiserver.NewDefaultStorageFactory(config, negotiatedSerializer, genericapiserver.NewDefaultResourceEncodingConfig(), master.DefaultAPIResourceConfigSource()) + storageFactory := genericapiserver.NewDefaultStorageFactory(config, runtime.ContentTypeJSON, negotiatedSerializer, genericapiserver.NewDefaultResourceEncodingConfig(), master.DefaultAPIResourceConfigSource()) storageFactory.SetSerializer( unversioned.GroupResource{Group: api.GroupName, Resource: genericapiserver.AllResources}, - NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json")) + "", + NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), runtime.ContentTypeJSON)) storageFactory.SetSerializer( unversioned.GroupResource{Group: autoscaling.GroupName, Resource: genericapiserver.AllResources}, - NewSingleContentTypeSerializer(api.Scheme, testapi.Autoscaling.Codec(), "application/json")) + "", + NewSingleContentTypeSerializer(api.Scheme, testapi.Autoscaling.Codec(), runtime.ContentTypeJSON)) storageFactory.SetSerializer( unversioned.GroupResource{Group: batch.GroupName, Resource: genericapiserver.AllResources}, - NewSingleContentTypeSerializer(api.Scheme, testapi.Batch.Codec(), "application/json")) + "", + NewSingleContentTypeSerializer(api.Scheme, testapi.Batch.Codec(), runtime.ContentTypeJSON)) storageFactory.SetSerializer( unversioned.GroupResource{Group: apps.GroupName, Resource: genericapiserver.AllResources}, - NewSingleContentTypeSerializer(api.Scheme, testapi.Apps.Codec(), "application/json")) + "", + NewSingleContentTypeSerializer(api.Scheme, testapi.Apps.Codec(), runtime.ContentTypeJSON)) storageFactory.SetSerializer( unversioned.GroupResource{Group: extensions.GroupName, Resource: genericapiserver.AllResources}, - NewSingleContentTypeSerializer(api.Scheme, testapi.Extensions.Codec(), "application/json")) + "", + NewSingleContentTypeSerializer(api.Scheme, testapi.Extensions.Codec(), runtime.ContentTypeJSON)) return &master.Config{ Config: &genericapiserver.Config{ diff --git a/test/integration/framework/serializer.go b/test/integration/framework/serializer.go index e6aa79574a4..5d6d0e77a1b 100644 --- a/test/integration/framework/serializer.go +++ b/test/integration/framework/serializer.go @@ -23,7 +23,7 @@ import ( ) // NewSingleContentTypeSerializer wraps a serializer in a NegotiatedSerializer that handles one content type -func NewSingleContentTypeSerializer(scheme *runtime.Scheme, serializer runtime.Serializer, contentType string) runtime.NegotiatedSerializer { +func NewSingleContentTypeSerializer(scheme *runtime.Scheme, serializer runtime.Serializer, contentType string) runtime.StorageSerializer { return &wrappedSerializer{ scheme: scheme, serializer: serializer, @@ -37,29 +37,31 @@ type wrappedSerializer struct { contentType string } -var _ runtime.NegotiatedSerializer = &wrappedSerializer{} +var _ runtime.StorageSerializer = &wrappedSerializer{} func (s *wrappedSerializer) SupportedMediaTypes() []string { return []string{s.contentType} } -func (s *wrappedSerializer) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) { +func (s *wrappedSerializer) SerializerForMediaType(mediaType string, options map[string]string) (runtime.SerializerInfo, bool) { if mediaType != s.contentType { - return nil, false + return runtime.SerializerInfo{}, false } - return s.serializer, true -} -func (s *wrappedSerializer) SupportedStreamingMediaTypes() []string { - return nil -} -func (s *wrappedSerializer) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) { - return nil, nil, "", false + return runtime.SerializerInfo{ + Serializer: s.serializer, + MediaType: mediaType, + EncodesAsText: true, // TODO: this should be parameterized + }, true } -func (s *wrappedSerializer) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { - return versioning.NewCodec(s.serializer, s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), []unversioned.GroupVersion{gv}, nil) +func (s *wrappedSerializer) UniversalDeserializer() runtime.Decoder { + return s.serializer } -func (s *wrappedSerializer) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder { - return versioning.NewCodec(s.serializer, s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), nil, []unversioned.GroupVersion{gv}) +func (s *wrappedSerializer) EncoderForVersion(encoder runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder { + return versioning.NewCodec(encoder, nil, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), []unversioned.GroupVersion{gv}, nil) +} + +func (s *wrappedSerializer) DecoderToVersion(decoder runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder { + return versioning.NewCodec(nil, decoder, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), nil, []unversioned.GroupVersion{gv}) }