From 7e1089bb75328fef12c7a48a3e531f7a0d4b4c14 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 23 Apr 2016 14:57:39 -0400 Subject: [PATCH 1/4] Storage, not Storgage --- pkg/genericapiserver/resource_encoding_config.go | 4 ++-- pkg/genericapiserver/storage_factory.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/genericapiserver/resource_encoding_config.go b/pkg/genericapiserver/resource_encoding_config.go index 3430746c2cc..04a25d45fba 100644 --- a/pkg/genericapiserver/resource_encoding_config.go +++ b/pkg/genericapiserver/resource_encoding_config.go @@ -26,7 +26,7 @@ type ResourceEncodingConfig interface { // StorageEncoding returns the serialization format for the resource. // TODO this should actually return a GroupVersionKind since you can logically have multiple "matching" Kinds // For now, it returns just the GroupVersion for consistency with old behavior - StoragageEncodingFor(unversioned.GroupResource) (unversioned.GroupVersion, error) + StorageEncodingFor(unversioned.GroupResource) (unversioned.GroupVersion, error) // InMemoryEncodingFor returns the groupVersion for the in memory representation the storage should convert to. InMemoryEncodingFor(unversioned.GroupResource) (unversioned.GroupVersion, error) @@ -78,7 +78,7 @@ func (o *DefaultResourceEncodingConfig) SetResourceEncoding(resourceBeingStored o.Groups[group].InternalResourceEncodings[resourceBeingStored.Resource] = internalVersion } -func (o *DefaultResourceEncodingConfig) StoragageEncodingFor(resource unversioned.GroupResource) (unversioned.GroupVersion, error) { +func (o *DefaultResourceEncodingConfig) StorageEncodingFor(resource unversioned.GroupResource) (unversioned.GroupVersion, error) { groupMeta, err := registered.Group(resource.Group) if err != nil { return unversioned.GroupVersion{}, err diff --git a/pkg/genericapiserver/storage_factory.go b/pkg/genericapiserver/storage_factory.go index 4e1f4c97e97..e5086f14a66 100644 --- a/pkg/genericapiserver/storage_factory.go +++ b/pkg/genericapiserver/storage_factory.go @@ -174,7 +174,7 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st config.ServerList = overriddenEtcdLocations } - storageEncodingVersion, err := s.ResourceEncodingConfig.StoragageEncodingFor(chosenStorageResource) + storageEncodingVersion, err := s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource) if err != nil { return nil, err } From 4ad5565c4160cfefb92ba611fda93232bad7d6c6 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 23 Apr 2016 14:54:54 -0400 Subject: [PATCH 2/4] RecognizingDecoder didn't handle ambiguous input YAML is not guaranteed to be recognizable, so we need to bump JSON and protobuf above it in the decoding order. Add a unit test. --- pkg/api/serialization_proto_test.go | 23 +++++ pkg/runtime/serializer/json/json.go | 9 +- pkg/runtime/serializer/protobuf/protobuf.go | 6 -- .../serializer/recognizer/recognizer.go | 99 ++++++++++++++----- 4 files changed, 101 insertions(+), 36 deletions(-) diff --git a/pkg/api/serialization_proto_test.go b/pkg/api/serialization_proto_test.go index b3ed68c5ae4..ef9afb3e7e4 100644 --- a/pkg/api/serialization_proto_test.go +++ b/pkg/api/serialization_proto_test.go @@ -17,6 +17,7 @@ limitations under the License. package api_test import ( + "bytes" "encoding/hex" "math/rand" "testing" @@ -41,6 +42,28 @@ func init() { }) } +func TestUniversalDeserializer(t *testing.T) { + expected := &v1.Pod{ObjectMeta: v1.ObjectMeta{Name: "test"}} + d := api.Codecs.UniversalDeserializer() + for _, mediaType := range []string{"application/json", "application/yaml", "application/vnd.kubernetes.protobuf"} { + e, ok := api.Codecs.SerializerForMediaType(mediaType, nil) + if !ok { + t.Fatal(mediaType) + } + buf := &bytes.Buffer{} + if err := e.EncodeToStream(expected, buf); err != nil { + t.Fatalf("%s: %v", mediaType, err) + } + obj, _, err := d.Decode(buf.Bytes(), &unversioned.GroupVersionKind{Kind: "Pod", Version: "v1"}, nil) + if err != nil { + t.Fatalf("%s: %v", mediaType, err) + } + if !api.Semantic.DeepEqual(expected, obj) { + t.Fatalf("%s: %#v", mediaType, obj) + } + } +} + func TestProtobufRoundTrip(t *testing.T) { obj := &v1.Pod{} apitesting.FuzzerFor(t, v1.SchemeGroupVersion, rand.NewSource(benchmarkSeed)).Fuzz(obj) diff --git a/pkg/runtime/serializer/json/json.go b/pkg/runtime/serializer/json/json.go index 61f01215e04..2656d591b1f 100644 --- a/pkg/runtime/serializer/json/json.go +++ b/pkg/runtime/serializer/json/json.go @@ -186,12 +186,13 @@ func (s *Serializer) EncodeToStream(obj runtime.Object, w io.Writer, overrides . } // RecognizesData implements the RecognizingDecoder interface. -func (s *Serializer) RecognizesData(peek io.Reader) (bool, error) { - _, ok := utilyaml.GuessJSONStream(peek, 2048) +func (s *Serializer) RecognizesData(peek io.Reader) (ok, unknown bool, err error) { if s.yaml { - return !ok, nil + // we could potentially look for '---' + return false, true, nil } - return ok, nil + _, ok = utilyaml.GuessJSONStream(peek, 2048) + return ok, false, nil } // EncodesAsText returns true because both JSON and YAML are considered textual representations diff --git a/pkg/runtime/serializer/protobuf/protobuf.go b/pkg/runtime/serializer/protobuf/protobuf.go index 25ac1cdaaf5..aa72e16a7f1 100644 --- a/pkg/runtime/serializer/protobuf/protobuf.go +++ b/pkg/runtime/serializer/protobuf/protobuf.go @@ -419,12 +419,6 @@ func (s *RawSerializer) EncodeToStream(obj runtime.Object, w io.Writer, override } } -// RecognizesData implements the RecognizingDecoder interface - objects encoded with this serializer -// have no innate identifying information and so cannot be recognized. -func (s *RawSerializer) RecognizesData(peek io.Reader) (bool, error) { - return false, nil -} - var LengthDelimitedFramer = lengthDelimitedFramer{} type lengthDelimitedFramer struct{} diff --git a/pkg/runtime/serializer/recognizer/recognizer.go b/pkg/runtime/serializer/recognizer/recognizer.go index 14a2cb3e841..8fbda6c1a3b 100644 --- a/pkg/runtime/serializer/recognizer/recognizer.go +++ b/pkg/runtime/serializer/recognizer/recognizer.go @@ -27,51 +27,98 @@ import ( type RecognizingDecoder interface { runtime.Decoder - RecognizesData(peek io.Reader) (bool, error) + // RecognizesData should return true if the input provided in the provided reader + // belongs to this decoder, or an error if the data could not be read or is ambiguous. + // Unknown is true if the data could not be determined to match the decoder type. + // Decoders should assume that they can read as much of peek as they need (as the caller + // provides) and may return unknown if the data provided is not sufficient to make a + // a determination. When peek returns EOF that may mean the end of the input or the + // end of buffered input - recognizers should return the best guess at that time. + RecognizesData(peek io.Reader) (ok, unknown bool, err error) } +// NewDecoder creates a decoder that will attempt multiple decoders in an order defined +// by: +// +// 1. The decoder implements RecognizingDecoder and identifies the data +// 2. All other decoders, and any decoder that returned true for unknown. +// +// The order passed to the constructor is preserved within those priorities. func NewDecoder(decoders ...runtime.Decoder) runtime.Decoder { - recognizing, blind := []RecognizingDecoder{}, []runtime.Decoder{} - for _, d := range decoders { - if r, ok := d.(RecognizingDecoder); ok { - recognizing = append(recognizing, r) - } else { - blind = append(blind, d) - } - } return &decoder{ - recognizing: recognizing, - blind: blind, + decoders: decoders, } } type decoder struct { - recognizing []RecognizingDecoder - blind []runtime.Decoder + decoders []runtime.Decoder +} + +var _ RecognizingDecoder = &decoder{} + +func (d *decoder) RecognizesData(peek io.Reader) (bool, bool, error) { + var ( + lastErr error + anyUnknown bool + ) + data, _ := bufio.NewReaderSize(peek, 1024).Peek(1024) + for _, r := range d.decoders { + switch t := r.(type) { + case RecognizingDecoder: + ok, unknown, err := t.RecognizesData(bytes.NewBuffer(data)) + if err != nil { + lastErr = err + continue + } + anyUnknown = anyUnknown || unknown + if !ok { + continue + } + return true, false, nil + } + } + return false, anyUnknown, lastErr } func (d *decoder) Decode(data []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) { - var lastErr error - for _, r := range d.recognizing { - buf := bytes.NewBuffer(data) - ok, err := r.RecognizesData(buf) - if err != nil { - lastErr = err - continue + var ( + lastErr error + skipped []runtime.Decoder + ) + + // try recognizers, record any decoders we need to give a chance later + for _, r := range d.decoders { + switch t := r.(type) { + case RecognizingDecoder: + buf := bytes.NewBuffer(data) + ok, unknown, err := t.RecognizesData(buf) + if err != nil { + lastErr = err + continue + } + if unknown { + skipped = append(skipped, t) + continue + } + if !ok { + continue + } + return r.Decode(data, gvk, into) + default: + skipped = append(skipped, t) } - if !ok { - continue - } - return r.Decode(data, gvk, into) } - for _, d := range d.blind { - out, actual, err := d.Decode(data, gvk, into) + + // try recognizers that returned unknown or didn't recognize their data + for _, r := range skipped { + out, actual, err := r.Decode(data, gvk, into) if err != nil { lastErr = err continue } return out, actual, nil } + if lastErr == nil { lastErr = fmt.Errorf("no serialization format matched the provided data") } From 5622c8a471dfbffe5db4a6b5ab1bfa92e7135e74 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sun, 24 Apr 2016 16:34:13 -0400 Subject: [PATCH 3/4] Generated files --- docs/admin/kube-apiserver.md | 2 +- pkg/api/unversioned/generated.proto | 1 + pkg/apis/extensions/v1beta1/generated.pb.go | 1 + pkg/apis/extensions/v1beta1/generated.proto | 1 + 4 files changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/admin/kube-apiserver.md b/docs/admin/kube-apiserver.md index e4c0c5af6db..d5cdb7321ce 100644 --- a/docs/admin/kube-apiserver.md +++ b/docs/admin/kube-apiserver.md @@ -118,7 +118,7 @@ kube-apiserver --watch-cache-sizes=[]: List of watch cache sizes for every resource (pods, nodes, etc.), comma separated. The individual override format: resource#size, where size is a number. It takes effect when watch-cache is enabled. ``` -###### Auto generated by spf13/cobra on 28-Apr-2016 +###### Auto generated by spf13/cobra on 30-Apr-2016 diff --git a/pkg/api/unversioned/generated.proto b/pkg/api/unversioned/generated.proto index aaa42ec8415..def4a6d6f65 100644 --- a/pkg/api/unversioned/generated.proto +++ b/pkg/api/unversioned/generated.proto @@ -21,6 +21,7 @@ syntax = 'proto2'; package k8s.io.kubernetes.pkg.api.unversioned; +import "k8s.io/kubernetes/pkg/runtime/generated.proto"; import "k8s.io/kubernetes/pkg/util/intstr/generated.proto"; // Package-wide variables from generator "generated". diff --git a/pkg/apis/extensions/v1beta1/generated.pb.go b/pkg/apis/extensions/v1beta1/generated.pb.go index bcf1f28ec3d..7e231b95e4f 100644 --- a/pkg/apis/extensions/v1beta1/generated.pb.go +++ b/pkg/apis/extensions/v1beta1/generated.pb.go @@ -95,6 +95,7 @@ import math "math" import k8s_io_kubernetes_pkg_api_unversioned "k8s.io/kubernetes/pkg/api/unversioned" import k8s_io_kubernetes_pkg_api_v1 "k8s.io/kubernetes/pkg/api/v1" + import k8s_io_kubernetes_pkg_util_intstr "k8s.io/kubernetes/pkg/util/intstr" import io "io" diff --git a/pkg/apis/extensions/v1beta1/generated.proto b/pkg/apis/extensions/v1beta1/generated.proto index 79ed890c064..e47b5ff50af 100644 --- a/pkg/apis/extensions/v1beta1/generated.proto +++ b/pkg/apis/extensions/v1beta1/generated.proto @@ -24,6 +24,7 @@ package k8s.io.kubernetes.pkg.apis.extensions.v1beta1; import "k8s.io/kubernetes/pkg/api/resource/generated.proto"; import "k8s.io/kubernetes/pkg/api/unversioned/generated.proto"; import "k8s.io/kubernetes/pkg/api/v1/generated.proto"; +import "k8s.io/kubernetes/pkg/runtime/generated.proto"; import "k8s.io/kubernetes/pkg/util/intstr/generated.proto"; // Package-wide variables from generator "generated". From e0ebcf421654b34519cde63093075e572f4c8f90 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 23 Apr 2016 15:00:28 -0400 Subject: [PATCH 4/4] Split the storage and negotiation parts of Codecs The codec factory should support two distinct interfaces - negotiating for a serializer with a client, vs reading or writing data to a storage form (etcd, disk, etc). Make the EncodeForVersion and DecodeToVersion methods only take Encoder and Decoder, and slight refactoring elsewhere. In the storage factory, use a content type to control what serializer to pick, and use the universal deserializer. This ensures that storage can read JSON (which might be from older objects) while only writing protobuf. Add exceptions for those resources that may not be able to write to protobuf (specifically third party resources, but potentially others in the future). --- cmd/kube-apiserver/app/options/options.go | 3 + cmd/kube-apiserver/app/server.go | 4 +- docs/admin/kube-apiserver.md | 1 + examples/apiserver/apiserver.go | 2 +- .../app/options/options.go | 3 + .../cmd/federated-apiserver/app/server.go | 2 +- hack/test-cmd.sh | 2 + hack/verify-flags/known-flags.txt | 1 + pkg/api/serialization_proto_test.go | 2 +- pkg/api/serialization_test.go | 16 +-- pkg/api/testapi/testapi.go | 79 +++++++++++-- pkg/apiserver/apiserver.go | 17 ++- pkg/apiserver/negotiate.go | 24 ++-- pkg/apiserver/negotiate_test.go | 29 +++-- pkg/apiserver/watch.go | 30 ++--- pkg/apiserver/watch_test.go | 14 +-- pkg/client/restclient/client.go | 15 +-- pkg/client/restclient/client_test.go | 6 +- pkg/client/restclient/config_test.go | 4 +- pkg/client/restclient/request.go | 4 + pkg/client/restclient/request_test.go | 6 +- .../typed/discovery/discovery_client.go | 5 +- pkg/client/typed/dynamic/client.go | 7 +- pkg/client/unversioned/helper_test.go | 4 +- .../remotecommand/remotecommand_test.go | 2 +- pkg/genericapiserver/storage_factory.go | 106 +++++++++++++----- pkg/genericapiserver/storage_factory_test.go | 7 +- pkg/kubectl/resource/mapper.go | 2 +- pkg/master/master.go | 9 +- pkg/master/master_test.go | 2 +- pkg/registry/generic/registry/store_test.go | 2 +- pkg/registry/registrytest/etcd.go | 2 +- pkg/registry/thirdpartyresourcedata/codec.go | 64 +++++++---- .../thirdpartyresourcedata/codec_test.go | 7 +- pkg/runtime/codec.go | 25 +++++ pkg/runtime/deep_copy_generated.go | 30 +++++ pkg/runtime/interfaces.go | 61 ++++++++-- pkg/runtime/serializer/codec_factory.go | 77 +++++++++---- pkg/runtime/serializer/codec_test.go | 2 +- pkg/runtime/serializer/json/json.go | 8 -- pkg/runtime/serializer/negotiated_codec.go | 25 ++--- .../serializer/protobuf/protobuf_test.go | 12 ++ .../serializer/recognizer/recognizer.go | 1 + .../serializer/versioning/versioning.go | 2 + plugin/pkg/auth/authorizer/webhook/webhook.go | 6 +- test/integration/framework/master_utils.go | 19 ++-- test/integration/framework/serializer.go | 32 +++--- 47 files changed, 543 insertions(+), 240 deletions(-) diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index e6af7c7de92..b36dd3f8ed0 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -44,6 +44,7 @@ type APIServer struct { AuthorizationMode string AuthorizationConfig apiserver.AuthorizationConfig BasicAuthFile string + DefaultStorageMediaType string DeleteCollectionWorkers int DeprecatedStorageVersion string EtcdServersOverrides []string @@ -76,6 +77,7 @@ func NewAPIServer() *APIServer { ServerRunOptions: genericapiserver.NewServerRunOptions(), AdmissionControl: "AlwaysAdmit", AuthorizationMode: "AlwaysAllow", + DefaultStorageMediaType: "application/json", DeleteCollectionWorkers: 1, EventTTL: 1 * time.Hour, MasterServiceNamespace: api.NamespaceDefault, @@ -153,6 +155,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { "In the case where objects are moved from one group to the other, you may specify the format \"group1=group2/v1beta1,group3/v1beta1,...\". "+ "You only need to pass the groups you wish to change from the defaults. "+ "It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable.") + fs.StringVar(&s.DefaultStorageMediaType, "storage-media-type", s.DefaultStorageMediaType, "The media type to use to store objects in storage. Defaults to application/json. Some resources may only support a specific media type and will ignore this setting.") fs.DurationVar(&s.EventTTL, "event-ttl", s.EventTTL, "Amount of time to retain events. Default 1 hour.") fs.StringVar(&s.BasicAuthFile, "basic-auth-file", s.BasicAuthFile, "If set, the file that will be used to admit requests to the secure port of the API server via http basic authentication.") fs.StringVar(&s.TokenAuthFile, "token-auth-file", s.TokenAuthFile, "If set, the file that will be used to secure the secure port of the API server via token authentication.") diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 32a26d4781f..955e073f96d 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -167,7 +167,9 @@ func Run(s *options.APIServer) error { resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal}) } - storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, api.Codecs, resourceEncoding, apiResourceConfigSource) + storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, s.DefaultStorageMediaType, api.Codecs, resourceEncoding, apiResourceConfigSource) + // third party resources are always serialized to storage using JSON + storageFactory.SetSerializer(extensions.Resource("thirdpartyresources"), "application/json", nil) storageFactory.AddCohabitatingResources(batch.Resource("jobs"), extensions.Resource("jobs")) storageFactory.AddCohabitatingResources(autoscaling.Resource("horizontalpodautoscalers"), extensions.Resource("horizontalpodautoscalers")) for _, override := range s.EtcdServersOverrides { diff --git a/docs/admin/kube-apiserver.md b/docs/admin/kube-apiserver.md index d5cdb7321ce..b79e2b765c0 100644 --- a/docs/admin/kube-apiserver.md +++ b/docs/admin/kube-apiserver.md @@ -110,6 +110,7 @@ kube-apiserver --ssh-keyfile="": If non-empty, use secure SSH proxy to the nodes, using this user keyfile --ssh-user="": If non-empty, use secure SSH proxy to the nodes, using this user name --storage-backend="": The storage backend for persistence. Options: 'etcd2' (default), 'etcd3'. + --storage-media-type="application/json": The media type to use to store objects in storage. Defaults to application/json. Some resources may only support a specific media type and will ignore this setting. --storage-versions="apps/v1alpha1,authorization.k8s.io/v1beta1,autoscaling/v1,batch/v1,componentconfig/v1alpha1,extensions/v1beta1,metrics/v1alpha1,v1": The per-group version to store resources in. Specified in the format "group1/version1,group2/version2,...". In the case where objects are moved from one group to the other, you may specify the format "group1=group2/v1beta1,group3/v1beta1,...". You only need to pass the groups you wish to change from the defaults. It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable. --tls-cert-file="": File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert). If HTTPS serving is enabled, and --tls-cert-file and --tls-private-key-file are not provided, a self-signed certificate and key are generated for the public address and saved to /var/run/kubernetes. --tls-private-key-file="": File containing x509 private key matching --tls-cert-file. diff --git a/examples/apiserver/apiserver.go b/examples/apiserver/apiserver.go index 9a18da3892f..7aeee2f6752 100644 --- a/examples/apiserver/apiserver.go +++ b/examples/apiserver/apiserver.go @@ -45,7 +45,7 @@ func newStorageFactory() genericapiserver.StorageFactory { Prefix: genericapiserver.DefaultEtcdPathPrefix, ServerList: []string{"http://127.0.0.1:4001"}, } - storageFactory := genericapiserver.NewDefaultStorageFactory(config, api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig()) + storageFactory := genericapiserver.NewDefaultStorageFactory(config, "application/json", api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig()) return storageFactory } diff --git a/federation/cmd/federated-apiserver/app/options/options.go b/federation/cmd/federated-apiserver/app/options/options.go index e6af7c7de92..b36dd3f8ed0 100644 --- a/federation/cmd/federated-apiserver/app/options/options.go +++ b/federation/cmd/federated-apiserver/app/options/options.go @@ -44,6 +44,7 @@ type APIServer struct { AuthorizationMode string AuthorizationConfig apiserver.AuthorizationConfig BasicAuthFile string + DefaultStorageMediaType string DeleteCollectionWorkers int DeprecatedStorageVersion string EtcdServersOverrides []string @@ -76,6 +77,7 @@ func NewAPIServer() *APIServer { ServerRunOptions: genericapiserver.NewServerRunOptions(), AdmissionControl: "AlwaysAdmit", AuthorizationMode: "AlwaysAllow", + DefaultStorageMediaType: "application/json", DeleteCollectionWorkers: 1, EventTTL: 1 * time.Hour, MasterServiceNamespace: api.NamespaceDefault, @@ -153,6 +155,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { "In the case where objects are moved from one group to the other, you may specify the format \"group1=group2/v1beta1,group3/v1beta1,...\". "+ "You only need to pass the groups you wish to change from the defaults. "+ "It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable.") + fs.StringVar(&s.DefaultStorageMediaType, "storage-media-type", s.DefaultStorageMediaType, "The media type to use to store objects in storage. Defaults to application/json. Some resources may only support a specific media type and will ignore this setting.") fs.DurationVar(&s.EventTTL, "event-ttl", s.EventTTL, "Amount of time to retain events. Default 1 hour.") fs.StringVar(&s.BasicAuthFile, "basic-auth-file", s.BasicAuthFile, "If set, the file that will be used to admit requests to the secure port of the API server via http basic authentication.") fs.StringVar(&s.TokenAuthFile, "token-auth-file", s.TokenAuthFile, "If set, the file that will be used to secure the secure port of the API server via token authentication.") diff --git a/federation/cmd/federated-apiserver/app/server.go b/federation/cmd/federated-apiserver/app/server.go index 0dc67da4271..41d66a1da78 100644 --- a/federation/cmd/federated-apiserver/app/server.go +++ b/federation/cmd/federated-apiserver/app/server.go @@ -76,7 +76,7 @@ func Run(s *options.APIServer) error { resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal}) } - storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, api.Codecs, resourceEncoding, apiResourceConfigSource) + storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, s.DefaultStorageMediaType, api.Codecs, resourceEncoding, apiResourceConfigSource) for _, override := range s.EtcdServersOverrides { tokens := strings.Split(override, "#") if len(tokens) != 2 { diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh index 31d7b098a11..61dd2119962 100755 --- a/hack/test-cmd.sh +++ b/hack/test-cmd.sh @@ -192,6 +192,7 @@ ADMISSION_CONTROL="NamespaceLifecycle,LimitRanger,ResourceQuota" --public-address-override="127.0.0.1" \ --kubelet-port=${KUBELET_PORT} \ --runtime-config=api/v1 \ + --storage-media-type="${KUBE_TEST_API_STORAGE_TYPE-}" \ --cert-dir="${TMPDIR:-/tmp/}" \ --service-cluster-ip-range="10.0.0.0/24" 1>&2 & APISERVER_PID=$! @@ -202,6 +203,7 @@ kube::util::wait_for_url "http://127.0.0.1:${API_PORT}/healthz" "apiserver" kube::log::status "Starting controller-manager" "${KUBE_OUTPUT_HOSTBIN}/kube-controller-manager" \ --port="${CTLRMGR_PORT}" \ + --kube-api-content-type="${KUBE_TEST_API_TYPE-}" \ --master="127.0.0.1:${API_PORT}" 1>&2 & CTLRMGR_PID=$! diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 8ca24b50835..9cb1530c230 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -394,6 +394,7 @@ static-pods-config stats-port stop-services storage-backend +storage-media-type storage-version storage-versions streaming-connection-idle-timeout diff --git a/pkg/api/serialization_proto_test.go b/pkg/api/serialization_proto_test.go index ef9afb3e7e4..dd1c63585dd 100644 --- a/pkg/api/serialization_proto_test.go +++ b/pkg/api/serialization_proto_test.go @@ -38,7 +38,7 @@ import ( func init() { codecsToTest = append(codecsToTest, func(version unversioned.GroupVersion, item runtime.Object) (runtime.Codec, error) { s := protobuf.NewSerializer(api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), "application/arbitrary.content.type") - return api.Codecs.CodecForVersions(s, testapi.ExternalGroupVersions(), nil), nil + return api.Codecs.CodecForVersions(s, s, testapi.ExternalGroupVersions(), nil), nil }) } diff --git a/pkg/api/serialization_test.go b/pkg/api/serialization_test.go index 8cdbf664d83..e52e965bd11 100644 --- a/pkg/api/serialization_test.go +++ b/pkg/api/serialization_test.go @@ -284,18 +284,14 @@ func TestObjectWatchFraming(t *testing.T) { converted, _ := api.Scheme.ConvertToVersion(secret, "v1") v1secret := converted.(*v1.Secret) for _, streamingMediaType := range api.Codecs.SupportedStreamingMediaTypes() { - s, framer, mediaType, _ := api.Codecs.StreamingSerializerForMediaType(streamingMediaType, nil) - // TODO: remove this when the runtime.SerializerInfo PR lands - if mediaType == "application/vnd.kubernetes.protobuf;stream=watch" { - mediaType = "application/vnd.kubernetes.protobuf" - } - embedded, ok := api.Codecs.SerializerForMediaType(mediaType, nil) - if !ok { - t.Logf("no embedded serializer for %s", mediaType) - embedded = s + s, _ := api.Codecs.StreamingSerializerForMediaType(streamingMediaType, nil) + framer := s.Framer + embedded := s.Embedded.Serializer + if embedded == nil { + t.Errorf("no embedded serializer for %s", streamingMediaType) + continue } innerDecode := api.Codecs.DecoderToVersion(embedded, api.SchemeGroupVersion) - //innerEncode := api.Codecs.EncoderForVersion(embedded, api.SchemeGroupVersion) // write a single object through the framer and back out obj := &bytes.Buffer{} diff --git a/pkg/api/testapi/testapi.go b/pkg/api/testapi/testapi.go index 8bec17e33e3..2f1b2509dde 100644 --- a/pkg/api/testapi/testapi.go +++ b/pkg/api/testapi/testapi.go @@ -19,6 +19,7 @@ package testapi import ( "fmt" + "mime" "os" "reflect" "strings" @@ -33,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/recognizer" _ "k8s.io/kubernetes/federation/apis/federation/install" _ "k8s.io/kubernetes/pkg/api/install" @@ -45,14 +47,16 @@ import ( ) var ( - Groups = make(map[string]TestGroup) - Default TestGroup - Autoscaling TestGroup - Batch TestGroup - Extensions TestGroup - Apps TestGroup - Federation TestGroup - NegotiatedSerializer = api.Codecs + Groups = make(map[string]TestGroup) + Default TestGroup + Autoscaling TestGroup + Batch TestGroup + Extensions TestGroup + Apps TestGroup + Federation TestGroup + + serializer runtime.SerializerInfo + storageSerializer runtime.SerializerInfo ) type TestGroup struct { @@ -62,6 +66,30 @@ type TestGroup struct { } func init() { + if apiMediaType := os.Getenv("KUBE_TEST_API_TYPE"); len(apiMediaType) > 0 { + var ok bool + mediaType, options, err := mime.ParseMediaType(apiMediaType) + if err != nil { + panic(err) + } + serializer, ok = api.Codecs.SerializerForMediaType(mediaType, options) + if !ok { + panic(fmt.Sprintf("no serializer for %s", apiMediaType)) + } + } + + if storageMediaType := StorageMediaType(); len(storageMediaType) > 0 { + var ok bool + mediaType, options, err := mime.ParseMediaType(storageMediaType) + if err != nil { + panic(err) + } + storageSerializer, ok = api.Codecs.SerializerForMediaType(mediaType, options) + if !ok { + panic(fmt.Sprintf("no serializer for %s", storageMediaType)) + } + } + kubeTestAPI := os.Getenv("KUBE_TEST_API") if len(kubeTestAPI) != 0 { testGroupVersions := strings.Split(kubeTestAPI, ",") @@ -173,9 +201,40 @@ func (g TestGroup) InternalTypes() map[string]reflect.Type { } // Codec returns the codec for the API version to test against, as set by the -// KUBE_TEST_API env var. +// KUBE_TEST_API_TYPE env var. func (g TestGroup) Codec() runtime.Codec { - return api.Codecs.LegacyCodec(g.externalGroupVersion) + if serializer.Serializer == nil { + return api.Codecs.LegacyCodec(g.externalGroupVersion) + } + return api.Codecs.CodecForVersions(serializer, api.Codecs.UniversalDeserializer(), []unversioned.GroupVersion{g.externalGroupVersion}, nil) +} + +// NegotiatedSerializer returns the negotiated serializer for the server. +func (g TestGroup) NegotiatedSerializer() runtime.NegotiatedSerializer { + return api.Codecs +} + +func StorageMediaType() string { + return os.Getenv("KUBE_TEST_API_STORAGE_TYPE") +} + +// StorageCodec returns the codec for the API version to store in etcd, as set by the +// KUBE_TEST_API_STORAGE_TYPE env var. +func (g TestGroup) StorageCodec() runtime.Codec { + s := storageSerializer.Serializer + + if s == nil { + return api.Codecs.LegacyCodec(g.externalGroupVersion) + } + + // etcd2 only supports string data - we must wrap any result before returning + // TODO: remove for etcd3 / make parameterizable + if !storageSerializer.EncodesAsText { + s = runtime.NewBase64Serializer(s) + } + ds := recognizer.NewDecoder(s, api.Codecs.UniversalDeserializer()) + + return api.Codecs.CodecForVersions(s, ds, []unversioned.GroupVersion{g.externalGroupVersion}, nil) } // Converter returns the api.Scheme for the API version to test against, as set by the diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index e68aa851aa6..baa4d3ca1a5 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -274,9 +274,16 @@ type StripVersionNegotiatedSerializer struct { runtime.NegotiatedSerializer } -func (n StripVersionNegotiatedSerializer) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { - encoder := n.NegotiatedSerializer.EncoderForVersion(serializer, gv) - return stripVersionEncoder{encoder, serializer} +func (n StripVersionNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder { + serializer, ok := encoder.(runtime.Serializer) + if !ok { + // The stripVersionEncoder needs both an encoder and decoder, but is called from a context that doesn't have access to the + // decoder. We do a best effort cast here (since this code path is only for backwards compatibility) to get access to the caller's + // decoder. + panic(fmt.Sprintf("Unable to extract serializer from %#v", encoder)) + } + versioned := n.NegotiatedSerializer.EncoderForVersion(encoder, gv) + return stripVersionEncoder{versioned, serializer} } func keepUnversioned(group string) bool { @@ -422,14 +429,14 @@ func write(statusCode int, gv unversioned.GroupVersion, s runtime.NegotiatedSeri // writeNegotiated renders an object in the content type negotiated by the client func writeNegotiated(s runtime.NegotiatedSerializer, gv unversioned.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) { - serializer, contentType, err := negotiateOutputSerializer(req, s) + serializer, err := negotiateOutputSerializer(req, s) if err != nil { status := errToAPIStatus(err) writeRawJSON(int(status.Code), status, w) return } - w.Header().Set("Content-Type", contentType) + w.Header().Set("Content-Type", serializer.MediaType) w.WriteHeader(statusCode) encoder := s.EncoderForVersion(serializer, gv) diff --git a/pkg/apiserver/negotiate.go b/pkg/apiserver/negotiate.go index 3a5a1380663..a9fedfc9af6 100644 --- a/pkg/apiserver/negotiate.go +++ b/pkg/apiserver/negotiate.go @@ -50,31 +50,31 @@ func negotiateOutput(req *http.Request, supported []string) (string, map[string] return mediaType, accept.Params, nil } -func negotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.Serializer, string, error) { +func negotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) { supported := ns.SupportedMediaTypes() mediaType, params, err := negotiateOutput(req, supported) if err != nil { - return nil, "", err + return runtime.SerializerInfo{}, err } if s, ok := ns.SerializerForMediaType(mediaType, params); ok { - return s, mediaType, nil + return s, nil } - return nil, "", errNotAcceptable{supported} + return runtime.SerializerInfo{}, errNotAcceptable{supported} } -func negotiateOutputStreamSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.Serializer, runtime.Framer, string, string, error) { +func negotiateOutputStreamSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.StreamSerializerInfo, error) { supported := ns.SupportedMediaTypes() mediaType, params, err := negotiateOutput(req, supported) if err != nil { - return nil, nil, "", "", err + return runtime.StreamSerializerInfo{}, err } - if s, f, exactMediaType, ok := ns.StreamingSerializerForMediaType(mediaType, params); ok { - return s, f, mediaType, exactMediaType, nil + if s, ok := ns.StreamingSerializerForMediaType(mediaType, params); ok { + return s, nil } - return nil, nil, "", "", errNotAcceptable{supported} + return runtime.StreamSerializerInfo{}, errNotAcceptable{supported} } -func negotiateInputSerializer(req *http.Request, s runtime.NegotiatedSerializer) (runtime.Serializer, error) { +func negotiateInputSerializer(req *http.Request, s runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) { supported := s.SupportedMediaTypes() mediaType := req.Header.Get("Content-Type") if len(mediaType) == 0 { @@ -82,11 +82,11 @@ func negotiateInputSerializer(req *http.Request, s runtime.NegotiatedSerializer) } mediaType, options, err := mime.ParseMediaType(mediaType) if err != nil { - return nil, errUnsupportedMediaType{supported} + return runtime.SerializerInfo{}, errUnsupportedMediaType{supported} } out, ok := s.SerializerForMediaType(mediaType, options) if !ok { - return nil, errUnsupportedMediaType{supported} + return runtime.SerializerInfo{}, errUnsupportedMediaType{supported} } return out, nil } diff --git a/pkg/apiserver/negotiate_test.go b/pkg/apiserver/negotiate_test.go index c37137aee07..87ad1a7eee9 100644 --- a/pkg/apiserver/negotiate_test.go +++ b/pkg/apiserver/negotiate_test.go @@ -41,27 +41,34 @@ func (n *fakeNegotiater) SupportedStreamingMediaTypes() []string { return n.streamTypes } -func (n *fakeNegotiater) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) { +func (n *fakeNegotiater) SerializerForMediaType(mediaType string, options map[string]string) (runtime.SerializerInfo, bool) { n.mediaType = mediaType if len(options) > 0 { n.options = options } - return n.serializer, n.serializer != nil + return runtime.SerializerInfo{Serializer: n.serializer, MediaType: n.mediaType, EncodesAsText: true}, n.serializer != nil } -func (n *fakeNegotiater) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) { +func (n *fakeNegotiater) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.StreamSerializerInfo, bool) { n.streamMediaType = mediaType if len(options) > 0 { n.streamOptions = options } - return n.streamSerializer, n.framer, mediaType, n.streamSerializer != nil + return runtime.StreamSerializerInfo{ + SerializerInfo: runtime.SerializerInfo{ + Serializer: n.serializer, + MediaType: mediaType, + EncodesAsText: true, + }, + Framer: n.framer, + }, n.streamSerializer != nil } -func (n *fakeNegotiater) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { +func (n *fakeNegotiater) EncoderForVersion(serializer runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder { return n.serializer } -func (n *fakeNegotiater) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder { +func (n *fakeNegotiater) DecoderToVersion(serializer runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder { return n.serializer } @@ -228,7 +235,7 @@ func TestNegotiate(t *testing.T) { req = &http.Request{Header: http.Header{}} req.Header.Set("Accept", test.accept) } - s, contentType, err := negotiateOutputSerializer(req, test.ns) + s, err := negotiateOutputSerializer(req, test.ns) switch { case err == nil && test.errFn != nil: t.Errorf("%d: failed: expected error", i) @@ -251,11 +258,11 @@ func TestNegotiate(t *testing.T) { } continue } - if test.contentType != contentType { - t.Errorf("%d: unexpected %s %s", i, test.contentType, contentType) + if test.contentType != s.MediaType { + t.Errorf("%d: unexpected %s %s", i, test.contentType, s.MediaType) } - if s != test.serializer { - t.Errorf("%d: unexpected %s %s", i, test.serializer, s) + if s.Serializer != test.serializer { + t.Errorf("%d: unexpected %s %s", i, test.serializer, s.Serializer) } if !reflect.DeepEqual(test.params, test.ns.options) { t.Errorf("%d: unexpected %#v %#v", i, test.params, test.ns.options) diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index a383314c86c..8094ac9c006 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -59,47 +59,33 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) { return t.C, t.Stop } -type textEncodable interface { - // EncodesAsText should return true if objects should be transmitted as a WebSocket Text - // frame (otherwise, they will be sent as a Binary frame). - EncodesAsText() bool -} - // serveWatch handles serving requests to the server // TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled. func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Request, res *restful.Response, timeout time.Duration) { // negotiate for the stream serializer - serializer, framer, mediaType, exactMediaType, err := negotiateOutputStreamSerializer(req.Request, scope.Serializer) + serializer, err := negotiateOutputStreamSerializer(req.Request, scope.Serializer) if err != nil { scope.err(err, res.ResponseWriter, req.Request) return } - if framer == nil { - scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", mediaType), res.ResponseWriter, req.Request) + if serializer.Framer == nil { + scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType), res.ResponseWriter, req.Request) return } - encoder := scope.Serializer.EncoderForVersion(serializer, scope.Kind.GroupVersion()) + encoder := scope.Serializer.EncoderForVersion(serializer.Serializer, scope.Kind.GroupVersion()) - useTextFraming := false - if encodable, ok := serializer.(textEncodable); ok && encodable.EncodesAsText() { - useTextFraming = true - } + useTextFraming := serializer.EncodesAsText // find the embedded serializer matching the media type - embeddedSerializer, ok := scope.Serializer.SerializerForMediaType(mediaType, nil) - if !ok { - scope.err(fmt.Errorf("no serializer defined for %q available for embedded encoding", mediaType), res.ResponseWriter, req.Request) - return - } - embeddedEncoder := scope.Serializer.EncoderForVersion(embeddedSerializer, scope.Kind.GroupVersion()) + embeddedEncoder := scope.Serializer.EncoderForVersion(serializer.Embedded.Serializer, scope.Kind.GroupVersion()) server := &WatchServer{ watching: watcher, scope: scope, useTextFraming: useTextFraming, - mediaType: exactMediaType, - framer: framer, + mediaType: serializer.MediaType, + framer: serializer.Framer, encoder: encoder, embeddedEncoder: embeddedEncoder, fixup: func(obj runtime.Object) { diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index 630cc84dc4e..1b25771373f 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -222,9 +222,9 @@ func TestWatchRead(t *testing.T) { for _, protocol := range protocols { for _, test := range testCases { - serializer, framer, _, ok := api.Codecs.StreamingSerializerForMediaType(test.MediaType, nil) + serializer, ok := api.Codecs.StreamingSerializerForMediaType(test.MediaType, nil) if !ok { - t.Fatal(framer) + t.Fatal(serializer) } r, contentType := protocol.fn(test.Accept) @@ -235,13 +235,13 @@ func TestWatchRead(t *testing.T) { } objectSerializer, ok := api.Codecs.SerializerForMediaType(test.MediaType, nil) if !ok { - t.Fatal(framer) + t.Fatal(objectSerializer) } objectCodec := api.Codecs.DecoderToVersion(objectSerializer, testInternalGroupVersion) var fr io.ReadCloser = r if !protocol.selfFraming { - fr = framer.NewFrameReader(r) + fr = serializer.Framer.NewFrameReader(r) } d := streaming.NewDecoder(fr, serializer) @@ -495,9 +495,9 @@ func TestWatchHTTPTimeout(t *testing.T) { timeoutCh := make(chan time.Time) done := make(chan struct{}) - _, framer, _, ok := api.Codecs.StreamingSerializerForMediaType("application/json", nil) + serializer, ok := api.Codecs.StreamingSerializerForMediaType("application/json", nil) if !ok { - t.Fatal(framer) + t.Fatal(serializer) } // Setup a new watchserver @@ -505,7 +505,7 @@ func TestWatchHTTPTimeout(t *testing.T) { watching: watcher, mediaType: "testcase/json", - framer: framer, + framer: serializer.Framer, encoder: newCodec, embeddedEncoder: newCodec, diff --git a/pkg/client/restclient/client.go b/pkg/client/restclient/client.go index d14347a0b4c..782797ad767 100644 --- a/pkg/client/restclient/client.go +++ b/pkg/client/restclient/client.go @@ -139,26 +139,23 @@ func readExpBackoffConfig() BackoffManager { func createSerializers(config ContentConfig) (*Serializers, error) { negotiated := config.NegotiatedSerializer contentType := config.ContentType - serializer, ok := negotiated.SerializerForMediaType(contentType, nil) + info, ok := negotiated.SerializerForMediaType(contentType, nil) if !ok { return nil, fmt.Errorf("serializer for %s not registered", contentType) } - streamingSerializer, framer, _, ok := negotiated.StreamingSerializerForMediaType(contentType, nil) + streamInfo, ok := negotiated.StreamingSerializerForMediaType(contentType, nil) if !ok { return nil, fmt.Errorf("streaming serializer for %s not registered", contentType) } - if framer == nil { - return nil, fmt.Errorf("no framer for %s", contentType) - } internalGV := unversioned.GroupVersion{ Group: config.GroupVersion.Group, Version: runtime.APIVersionInternal, } return &Serializers{ - Encoder: negotiated.EncoderForVersion(serializer, *config.GroupVersion), - Decoder: negotiated.DecoderToVersion(serializer, internalGV), - StreamingSerializer: streamingSerializer, - Framer: framer, + Encoder: negotiated.EncoderForVersion(info.Serializer, *config.GroupVersion), + Decoder: negotiated.DecoderToVersion(info.Serializer, internalGV), + StreamingSerializer: streamInfo.Serializer, + Framer: streamInfo.Framer, }, nil } diff --git a/pkg/client/restclient/client_test.go b/pkg/client/restclient/client_test.go index 1fd2c603c95..e1cc1f9fae8 100644 --- a/pkg/client/restclient/client_test.go +++ b/pkg/client/restclient/client_test.go @@ -47,7 +47,7 @@ func TestDoRequestSuccess(t *testing.T) { Host: testServer.URL, ContentConfig: ContentConfig{ GroupVersion: testapi.Default.GroupVersion(), - NegotiatedSerializer: testapi.NegotiatedSerializer, + NegotiatedSerializer: testapi.Default.NegotiatedSerializer(), }, Username: "user", Password: "pass", @@ -92,7 +92,7 @@ func TestDoRequestFailed(t *testing.T) { Host: testServer.URL, ContentConfig: ContentConfig{ GroupVersion: testapi.Default.GroupVersion(), - NegotiatedSerializer: testapi.NegotiatedSerializer, + NegotiatedSerializer: testapi.Default.NegotiatedSerializer(), }, }) if err != nil { @@ -130,7 +130,7 @@ func TestDoRequestCreated(t *testing.T) { Host: testServer.URL, ContentConfig: ContentConfig{ GroupVersion: testapi.Default.GroupVersion(), - NegotiatedSerializer: testapi.NegotiatedSerializer, + NegotiatedSerializer: testapi.Default.NegotiatedSerializer(), }, Username: "user", Password: "pass", diff --git a/pkg/client/restclient/config_test.go b/pkg/client/restclient/config_test.go index 029e64253ba..a9c71e19d81 100644 --- a/pkg/client/restclient/config_test.go +++ b/pkg/client/restclient/config_test.go @@ -87,13 +87,13 @@ func TestSetKubernetesDefaultsUserAgent(t *testing.T) { } func TestRESTClientRequires(t *testing.T) { - if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{NegotiatedSerializer: testapi.NegotiatedSerializer}}); err == nil { + if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{NegotiatedSerializer: testapi.Default.NegotiatedSerializer()}}); err == nil { t.Errorf("unexpected non-error") } if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}); err == nil { t.Errorf("unexpected non-error") } - if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), NegotiatedSerializer: testapi.NegotiatedSerializer}}); err != nil { + if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), NegotiatedSerializer: testapi.Default.NegotiatedSerializer()}}); err != nil { t.Errorf("unexpected error: %v", err) } } diff --git a/pkg/client/restclient/request.go b/pkg/client/restclient/request.go index c29f6e4eedd..83b93f816e5 100644 --- a/pkg/client/restclient/request.go +++ b/pkg/client/restclient/request.go @@ -639,6 +639,10 @@ func (r *Request) Watch() (watch.Interface, error) { if r.err != nil { return nil, r.err } + if r.serializers.Framer == nil { + return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType) + } + url := r.URL().String() req, err := http.NewRequest(r.verb, url, r.body) if err != nil { diff --git a/pkg/client/restclient/request_test.go b/pkg/client/restclient/request_test.go index 3b1a67a0cfe..b7cb6a2ac01 100644 --- a/pkg/client/restclient/request_test.go +++ b/pkg/client/restclient/request_test.go @@ -247,7 +247,7 @@ func defaultContentConfig() ContentConfig { return ContentConfig{ GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec(), - NegotiatedSerializer: testapi.NegotiatedSerializer, + NegotiatedSerializer: testapi.Default.NegotiatedSerializer(), } } @@ -549,6 +549,7 @@ func TestRequestWatch(t *testing.T) { }, { Request: &Request{ + serializers: defaultSerializers(), client: clientFunc(func(req *http.Request) (*http.Response, error) { return nil, io.EOF }), @@ -558,6 +559,7 @@ func TestRequestWatch(t *testing.T) { }, { Request: &Request{ + serializers: defaultSerializers(), client: clientFunc(func(req *http.Request) (*http.Response, error) { return nil, &url.Error{Err: io.EOF} }), @@ -567,6 +569,7 @@ func TestRequestWatch(t *testing.T) { }, { Request: &Request{ + serializers: defaultSerializers(), client: clientFunc(func(req *http.Request) (*http.Response, error) { return nil, errors.New("http: can't write HTTP request on broken connection") }), @@ -576,6 +579,7 @@ func TestRequestWatch(t *testing.T) { }, { Request: &Request{ + serializers: defaultSerializers(), client: clientFunc(func(req *http.Request) (*http.Response, error) { return nil, errors.New("foo: connection reset by peer") }), diff --git a/pkg/client/typed/discovery/discovery_client.go b/pkg/client/typed/discovery/discovery_client.go index 6c0a4039b34..4a0f0b91ac3 100644 --- a/pkg/client/typed/discovery/discovery_client.go +++ b/pkg/client/typed/discovery/discovery_client.go @@ -215,7 +215,10 @@ func setDiscoveryDefaults(config *restclient.Config) error { config.APIPath = "" config.GroupVersion = nil codec := runtime.NoopEncoder{Decoder: api.Codecs.UniversalDecoder()} - config.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(codec, codec, runtime.DefaultFramer) + config.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper( + runtime.SerializerInfo{Serializer: codec}, + runtime.StreamSerializerInfo{}, + ) if len(config.UserAgent) == 0 { config.UserAgent = restclient.DefaultKubernetesUserAgent() } diff --git a/pkg/client/typed/dynamic/client.go b/pkg/client/typed/dynamic/client.go index f542cef726c..b2e6d228917 100644 --- a/pkg/client/typed/dynamic/client.go +++ b/pkg/client/typed/dynamic/client.go @@ -33,7 +33,6 @@ import ( "k8s.io/kubernetes/pkg/conversion/queryparams" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime/serializer" - serializerjson "k8s.io/kubernetes/pkg/runtime/serializer/json" "k8s.io/kubernetes/pkg/watch" ) @@ -51,8 +50,10 @@ func NewClient(conf *restclient.Config) (*Client, error) { conf = &confCopy codec := dynamicCodec{} - legacyCodec := api.Codecs.LegacyCodec(v1.SchemeGroupVersion) - conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(codec, legacyCodec, serializerjson.Framer) + + // TODO: it's questionable that this should be using anything other than unstructured schema and JSON + streamingInfo, _ := api.Codecs.StreamingSerializerForMediaType("application/json;stream=watch", nil) + conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec}, streamingInfo) if conf.APIPath == "" { conf.APIPath = "/api" diff --git a/pkg/client/unversioned/helper_test.go b/pkg/client/unversioned/helper_test.go index 80660a6d056..0e186e4c78e 100644 --- a/pkg/client/unversioned/helper_test.go +++ b/pkg/client/unversioned/helper_test.go @@ -42,7 +42,7 @@ func TestSetKubernetesDefaults(t *testing.T) { ContentConfig: restclient.ContentConfig{ GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec(), - NegotiatedSerializer: testapi.NegotiatedSerializer, + NegotiatedSerializer: testapi.Default.NegotiatedSerializer(), }, QPS: 5, Burst: 10, @@ -126,7 +126,7 @@ func TestHelperGetServerAPIVersions(t *testing.T) { w.Write(output) })) defer server.Close() - got, err := restclient.ServerAPIVersions(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "invalid version", Version: "one"}, NegotiatedSerializer: testapi.NegotiatedSerializer}}) + got, err := restclient.ServerAPIVersions(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "invalid version", Version: "one"}, NegotiatedSerializer: testapi.Default.NegotiatedSerializer()}}) if err != nil { t.Fatalf("unexpected encoding error: %v", err) } diff --git a/pkg/client/unversioned/remotecommand/remotecommand_test.go b/pkg/client/unversioned/remotecommand/remotecommand_test.go index 3c68fe5d9f6..f231a7f4926 100644 --- a/pkg/client/unversioned/remotecommand/remotecommand_test.go +++ b/pkg/client/unversioned/remotecommand/remotecommand_test.go @@ -215,7 +215,7 @@ func TestStream(t *testing.T) { url, _ := url.ParseRequestURI(server.URL) config := restclient.ContentConfig{ GroupVersion: &unversioned.GroupVersion{Group: "x"}, - NegotiatedSerializer: testapi.NegotiatedSerializer, + NegotiatedSerializer: testapi.Default.NegotiatedSerializer(), } c, err := restclient.NewRESTClient(url, "", config, -1, -1, nil, nil) if err != nil { diff --git a/pkg/genericapiserver/storage_factory.go b/pkg/genericapiserver/storage_factory.go index e5086f14a66..45489c33074 100644 --- a/pkg/genericapiserver/storage_factory.go +++ b/pkg/genericapiserver/storage_factory.go @@ -18,9 +18,11 @@ package genericapiserver import ( "fmt" + "mime" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/recognizer" "k8s.io/kubernetes/pkg/runtime/serializer/versioning" "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage/storagebackend" @@ -50,19 +52,25 @@ type DefaultStorageFactory struct { Overrides map[unversioned.GroupResource]groupResourceOverrides + // DefaultMediaType is the media type used to store resources. If it is not set, "application/json" is used. + DefaultMediaType string + // DefaultSerializer is used to create encoders and decoders for the storage.Interface. - DefaultSerializer runtime.NegotiatedSerializer + DefaultSerializer runtime.StorageSerializer // ResourceEncodingConfig describes how to encode a particular GroupVersionResource ResourceEncodingConfig ResourceEncodingConfig // APIResourceConfigSource indicates whether the *storage* is enabled, NOT the API - // This is discrete from resource enablement because those are separate concerns. How it is surfaced to the user via flags - // or config is up to whoever is building this. + // This is discrete from resource enablement because those are separate concerns. How this source is configured + // is left to the caller. APIResourceConfigSource APIResourceConfigSource - // newEtcdFn exists to be overwritten for unit testing. You should never set this in a normal world. - newEtcdFn func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) + // newStorageCodecFn exists to be overwritten for unit testing. + newStorageCodecFn func(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (codec runtime.Codec, err error) + + // newStorageFn exists to be overwritten for unit testing. + newStorageFn func(config storagebackend.Config) (etcdStorage storage.Interface, err error) } type groupResourceOverrides struct { @@ -71,8 +79,10 @@ type groupResourceOverrides struct { etcdLocation []string // etcdPrefix contains the list of "special" prefixes for a GroupResource. Resource=* means for the entire group etcdPrefix string + // mediaType is the desired serializer to choose. If empty, the default is chosen. + mediaType string // serializer contains the list of "special" serializers for a GroupResource. Resource=* means for the entire group - serializer runtime.NegotiatedSerializer + serializer runtime.StorageSerializer // cohabitatingResources keeps track of which resources must be stored together. This happens when we have multiple ways // of exposing one set of concepts. autoscaling.HPA and extensions.HPA as a for instance // The order of the slice matters! It is the priority order of lookup for finding a storage location @@ -83,15 +93,20 @@ var _ StorageFactory = &DefaultStorageFactory{} const AllResources = "*" -func NewDefaultStorageFactory(config storagebackend.Config, defaultSerializer runtime.NegotiatedSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory { +func NewDefaultStorageFactory(config storagebackend.Config, defaultMediaType string, defaultSerializer runtime.StorageSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory { + if len(defaultMediaType) == 0 { + defaultMediaType = runtime.ContentTypeJSON + } return &DefaultStorageFactory{ StorageConfig: config, Overrides: map[unversioned.GroupResource]groupResourceOverrides{}, + DefaultMediaType: defaultMediaType, DefaultSerializer: defaultSerializer, ResourceEncodingConfig: resourceEncodingConfig, APIResourceConfigSource: resourceConfig, - newEtcdFn: newEtcd, + newStorageCodecFn: NewStorageCodec, + newStorageFn: newStorage, } } @@ -107,8 +122,9 @@ func (s *DefaultStorageFactory) SetEtcdPrefix(groupResource unversioned.GroupRes s.Overrides[groupResource] = overrides } -func (s *DefaultStorageFactory) SetSerializer(groupResource unversioned.GroupResource, serializer runtime.NegotiatedSerializer) { +func (s *DefaultStorageFactory) SetSerializer(groupResource unversioned.GroupResource, mediaType string, serializer runtime.StorageSerializer) { overrides := s.Overrides[groupResource] + overrides.mediaType = mediaType overrides.serializer = serializer s.Overrides[groupResource] = overrides } @@ -160,6 +176,14 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st etcdPrefix = exactResourceOverride.etcdPrefix } + etcdMediaType := s.DefaultMediaType + if len(groupOverride.mediaType) != 0 { + etcdMediaType = groupOverride.mediaType + } + if len(exactResourceOverride.mediaType) != 0 { + etcdMediaType = exactResourceOverride.mediaType + } + etcdSerializer := s.DefaultSerializer if groupOverride.serializer != nil { etcdSerializer = groupOverride.serializer @@ -183,27 +207,19 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st return nil, err } + codec, err := s.newStorageCodecFn(etcdMediaType, etcdSerializer, storageEncodingVersion, internalVersion, config) + if err != nil { + return nil, err + } + + config.Codec = codec + glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, config) - return s.newEtcdFn(etcdSerializer, storageEncodingVersion, internalVersion, config) + return s.newStorageFn(config) } -func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) { - s, ok := ns.SerializerForMediaType("application/json", nil) - if !ok { - return nil, fmt.Errorf("unable to find serializer for JSON") - } - encoder := ns.EncoderForVersion(s, storageVersion) - decoder := ns.DecoderToVersion(s, memoryVersion) - if memoryVersion.Group != storageVersion.Group { - // Allow this codec to translate between groups. - if err = versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil { - return nil, fmt.Errorf("error setting up encoder from %v to %v: %v", memoryVersion, storageVersion, err) - } - if err = versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil { - return nil, fmt.Errorf("error setting up decoder from %v to %v: %v", storageVersion, memoryVersion, err) - } - } - config.Codec = runtime.NewCodec(encoder, decoder) +// newStorage is the default implementation for creating a storage backend. +func newStorage(config storagebackend.Config) (etcdStorage storage.Interface, err error) { return storagebackend.Create(config) } @@ -217,3 +233,39 @@ func (s *DefaultStorageFactory) Backends() []string { } return backends.List() } + +// NewStorageCodec assembles a storage codec for the provided storage media type, the provided serializer, and the requested +// storage and memory versions. +func NewStorageCodec(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (runtime.Codec, error) { + mediaType, options, err := mime.ParseMediaType(storageMediaType) + if err != nil { + return nil, fmt.Errorf("%q is not a valid mime-type", storageMediaType) + } + serializer, ok := ns.SerializerForMediaType(mediaType, options) + if !ok { + return nil, fmt.Errorf("unable to find serializer for %q", storageMediaType) + } + + s := serializer.Serializer + + // etcd2 only supports string data - we must wrap any result before returning + // TODO: storagebackend should return a boolean indicating whether it supports binary data + if !serializer.EncodesAsText && (config.Type == storagebackend.StorageTypeUnset || config.Type == storagebackend.StorageTypeETCD2) { + glog.V(4).Infof("Wrapping the underlying binary storage serializer with a base64 encoding for etcd2") + s = runtime.NewBase64Serializer(s) + } + + ds := recognizer.NewDecoder(s, ns.UniversalDeserializer()) + encoder := ns.EncoderForVersion(s, storageVersion) + decoder := ns.DecoderToVersion(ds, memoryVersion) + if memoryVersion.Group != storageVersion.Group { + // Allow this codec to translate between groups. + if err := versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil { + return nil, fmt.Errorf("error setting up encoder from %v to %v: %v", memoryVersion, storageVersion, err) + } + if err := versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil { + return nil, fmt.Errorf("error setting up decoder from %v to %v: %v", storageVersion, memoryVersion, err) + } + } + return runtime.NewCodec(encoder, decoder), nil +} diff --git a/pkg/genericapiserver/storage_factory_test.go b/pkg/genericapiserver/storage_factory_test.go index 1f727fb086e..9e8be61f238 100644 --- a/pkg/genericapiserver/storage_factory_test.go +++ b/pkg/genericapiserver/storage_factory_test.go @@ -23,7 +23,6 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" - "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage/storagebackend" ) @@ -50,7 +49,7 @@ func TestUpdateEtcdOverrides(t *testing.T) { defaultEtcdLocation := []string{"http://127.0.0.1"} for i, test := range testCases { actualConfig := storagebackend.Config{} - newEtcdFn := func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) { + newStorageFn := func(config storagebackend.Config) (_ storage.Interface, err error) { actualConfig = config return nil, nil } @@ -59,8 +58,8 @@ func TestUpdateEtcdOverrides(t *testing.T) { Prefix: DefaultEtcdPathPrefix, ServerList: defaultEtcdLocation, } - storageFactory := NewDefaultStorageFactory(defaultConfig, api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig()) - storageFactory.newEtcdFn = newEtcdFn + storageFactory := NewDefaultStorageFactory(defaultConfig, "", api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig()) + storageFactory.newStorageFn = newStorageFn storageFactory.SetEtcdLocation(test.resource, test.servers) var err error diff --git a/pkg/kubectl/resource/mapper.go b/pkg/kubectl/resource/mapper.go index 27e85043b0e..8e8cc2726e9 100644 --- a/pkg/kubectl/resource/mapper.go +++ b/pkg/kubectl/resource/mapper.go @@ -55,7 +55,7 @@ func (m *Mapper) InfoForData(data []byte, source string) (*Info, error) { var obj runtime.Object var versioned runtime.Object if registered.IsThirdPartyAPIGroupVersion(gvk.GroupVersion()) { - obj, err = runtime.Decode(thirdpartyresourcedata.NewCodec(nil, gvk.Kind), data) + obj, err = runtime.Decode(thirdpartyresourcedata.NewDecoder(nil, gvk.Kind), data) versioned = obj } else { obj, versioned = versions.Last(), versions.First() diff --git a/pkg/master/master.go b/pkg/master/master.go index 5f657cfb09f..4158bb10a41 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -677,7 +677,14 @@ func (m *Master) InstallThirdPartyResource(rsrc *extensions.ThirdPartyResource) func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupVersion { resourceStorage := thirdpartyresourcedataetcd.NewREST( - generic.RESTOptions{Storage: m.thirdPartyStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: m.deleteCollectionWorkers}, group, kind) + generic.RESTOptions{ + Storage: m.thirdPartyStorage, + Decorator: generic.UndecoratedStorage, + DeleteCollectionWorkers: m.deleteCollectionWorkers, + }, + group, + kind, + ) apiRoot := makeThirdPartyPath("") diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 17f735fd7bd..8c4a4f34dc3 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -88,7 +88,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert. resourceEncoding.SetVersionEncoding(batch.GroupName, *testapi.Batch.GroupVersion(), unversioned.GroupVersion{Group: batch.GroupName, Version: runtime.APIVersionInternal}) resourceEncoding.SetVersionEncoding(apps.GroupName, *testapi.Apps.GroupVersion(), unversioned.GroupVersion{Group: apps.GroupName, Version: runtime.APIVersionInternal}) resourceEncoding.SetVersionEncoding(extensions.GroupName, *testapi.Extensions.GroupVersion(), unversioned.GroupVersion{Group: extensions.GroupName, Version: runtime.APIVersionInternal}) - storageFactory := genericapiserver.NewDefaultStorageFactory(storageConfig, api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource()) + storageFactory := genericapiserver.NewDefaultStorageFactory(storageConfig, testapi.StorageMediaType(), api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource()) config.StorageFactory = storageFactory config.APIResourceConfigSource = DefaultAPIResourceConfigSource() diff --git a/pkg/registry/generic/registry/store_test.go b/pkg/registry/generic/registry/store_test.go index 04f36b31ea8..a89eca3e60b 100644 --- a/pkg/registry/generic/registry/store_test.go +++ b/pkg/registry/generic/registry/store_test.go @@ -90,7 +90,7 @@ func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool { func NewTestGenericStoreRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, *Store) { podPrefix := "/pods" server := etcdtesting.NewEtcdTestClientServer(t) - s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize) + s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.StorageCodec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize) strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true} return server, &Store{ diff --git a/pkg/registry/registrytest/etcd.go b/pkg/registry/registrytest/etcd.go index dafc5f4da30..c3295a5be09 100644 --- a/pkg/registry/registrytest/etcd.go +++ b/pkg/registry/registrytest/etcd.go @@ -38,7 +38,7 @@ import ( func NewEtcdStorage(t *testing.T, group string) (storage.Interface, *etcdtesting.EtcdTestServer) { server := etcdtesting.NewEtcdTestClientServer(t) - storage := etcdstorage.NewEtcdStorage(server.Client, testapi.Groups[group].Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize) + storage := etcdstorage.NewEtcdStorage(server.Client, testapi.Groups[group].StorageCodec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize) return storage, server } diff --git a/pkg/registry/thirdpartyresourcedata/codec.go b/pkg/registry/thirdpartyresourcedata/codec.go index a7f46a2ee0a..947957a9498 100644 --- a/pkg/registry/thirdpartyresourcedata/codec.go +++ b/pkg/registry/thirdpartyresourcedata/codec.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/sets" ) type thirdPartyObjectConverter struct { @@ -183,31 +184,39 @@ func NewNegotiatedSerializer(s runtime.NegotiatedSerializer, kind string, encode } } -func (t *thirdPartyResourceDataCodecFactory) EncoderForVersion(s runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { - return NewCodec(runtime.NewCodec( - t.NegotiatedSerializer.EncoderForVersion(s, gv), - t.NegotiatedSerializer.DecoderToVersion(s, t.decodeGV), - ), t.kind) +func (t *thirdPartyResourceDataCodecFactory) SupportedMediaTypes() []string { + supported := sets.NewString(t.NegotiatedSerializer.SupportedMediaTypes()...) + return supported.Intersection(sets.NewString("application/json", "application/yaml")).List() } -func (t *thirdPartyResourceDataCodecFactory) DecoderToVersion(s runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder { - return NewCodec(runtime.NewCodec( - t.NegotiatedSerializer.EncoderForVersion(s, t.encodeGV), - t.NegotiatedSerializer.DecoderToVersion(s, gv), - ), t.kind) +func (t *thirdPartyResourceDataCodecFactory) SupportedStreamingMediaTypes() []string { + supported := sets.NewString(t.NegotiatedSerializer.SupportedStreamingMediaTypes()...) + return supported.Intersection(sets.NewString("application/json", "application/json;stream=watch")).List() } -type thirdPartyResourceDataCodec struct { - delegate runtime.Codec +func (t *thirdPartyResourceDataCodecFactory) EncoderForVersion(s runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder { + return &thirdPartyResourceDataEncoder{delegate: t.NegotiatedSerializer.EncoderForVersion(s, gv), kind: t.kind} +} + +func (t *thirdPartyResourceDataCodecFactory) DecoderToVersion(s runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder { + return NewDecoder(t.NegotiatedSerializer.DecoderToVersion(s, gv), t.kind) +} + +func NewCodec(delegate runtime.Codec, kind string) runtime.Codec { + return runtime.NewCodec(NewEncoder(delegate, kind), NewDecoder(delegate, kind)) +} + +type thirdPartyResourceDataDecoder struct { + delegate runtime.Decoder kind string } -var _ runtime.Codec = &thirdPartyResourceDataCodec{} - -func NewCodec(codec runtime.Codec, kind string) runtime.Codec { - return &thirdPartyResourceDataCodec{codec, kind} +func NewDecoder(delegate runtime.Decoder, kind string) runtime.Decoder { + return &thirdPartyResourceDataDecoder{delegate: delegate, kind: kind} } +var _ runtime.Decoder = &thirdPartyResourceDataDecoder{} + func parseObject(data []byte) (map[string]interface{}, error) { var obj interface{} if err := json.Unmarshal(data, &obj); err != nil { @@ -221,7 +230,7 @@ func parseObject(data []byte) (map[string]interface{}, error) { return mapObj, nil } -func (t *thirdPartyResourceDataCodec) populate(data []byte) (runtime.Object, error) { +func (t *thirdPartyResourceDataDecoder) populate(data []byte) (runtime.Object, error) { mapObj, err := parseObject(data) if err != nil { return nil, err @@ -229,7 +238,7 @@ func (t *thirdPartyResourceDataCodec) populate(data []byte) (runtime.Object, err return t.populateFromObject(mapObj, data) } -func (t *thirdPartyResourceDataCodec) populateFromObject(mapObj map[string]interface{}, data []byte) (runtime.Object, error) { +func (t *thirdPartyResourceDataDecoder) populateFromObject(mapObj map[string]interface{}, data []byte) (runtime.Object, error) { typeMeta := unversioned.TypeMeta{} if err := json.Unmarshal(data, &typeMeta); err != nil { return nil, err @@ -252,7 +261,7 @@ func (t *thirdPartyResourceDataCodec) populateFromObject(mapObj map[string]inter } } -func (t *thirdPartyResourceDataCodec) populateResource(objIn *extensions.ThirdPartyResourceData, mapObj map[string]interface{}, data []byte) error { +func (t *thirdPartyResourceDataDecoder) populateResource(objIn *extensions.ThirdPartyResourceData, mapObj map[string]interface{}, data []byte) error { metadata, ok := mapObj["metadata"].(map[string]interface{}) if !ok { return fmt.Errorf("unexpected object for metadata: %#v", mapObj["metadata"]) @@ -274,7 +283,7 @@ func (t *thirdPartyResourceDataCodec) populateResource(objIn *extensions.ThirdPa return nil } -func (t *thirdPartyResourceDataCodec) Decode(data []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) { +func (t *thirdPartyResourceDataDecoder) Decode(data []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) { if into == nil { obj, err := t.populate(data) if err != nil { @@ -347,7 +356,7 @@ func (t *thirdPartyResourceDataCodec) Decode(data []byte, gvk *unversioned.Group return thirdParty, actual, nil } -func (t *thirdPartyResourceDataCodec) populateListResource(objIn *extensions.ThirdPartyResourceDataList, mapObj map[string]interface{}) error { +func (t *thirdPartyResourceDataDecoder) populateListResource(objIn *extensions.ThirdPartyResourceDataList, mapObj map[string]interface{}) error { items, ok := mapObj["items"].([]interface{}) if !ok { return fmt.Errorf("unexpected object for items: %#v", mapObj["items"]) @@ -374,6 +383,17 @@ const template = `{ "items": [ %s ] }` +type thirdPartyResourceDataEncoder struct { + delegate runtime.Encoder + kind string +} + +func NewEncoder(delegate runtime.Encoder, kind string) runtime.Encoder { + return &thirdPartyResourceDataEncoder{delegate: delegate, kind: kind} +} + +var _ runtime.Encoder = &thirdPartyResourceDataEncoder{} + func encodeToJSON(obj *extensions.ThirdPartyResourceData, stream io.Writer) error { var objOut interface{} if err := json.Unmarshal(obj.Data, &objOut); err != nil { @@ -388,7 +408,7 @@ func encodeToJSON(obj *extensions.ThirdPartyResourceData, stream io.Writer) erro return encoder.Encode(objMap) } -func (t *thirdPartyResourceDataCodec) EncodeToStream(obj runtime.Object, stream io.Writer, overrides ...unversioned.GroupVersion) (err error) { +func (t *thirdPartyResourceDataEncoder) EncodeToStream(obj runtime.Object, stream io.Writer, overrides ...unversioned.GroupVersion) (err error) { switch obj := obj.(type) { case *extensions.ThirdPartyResourceData: return encodeToJSON(obj, stream) diff --git a/pkg/registry/thirdpartyresourcedata/codec_test.go b/pkg/registry/thirdpartyresourcedata/codec_test.go index 188e44befc4..942de910ccb 100644 --- a/pkg/registry/thirdpartyresourcedata/codec_test.go +++ b/pkg/registry/thirdpartyresourcedata/codec_test.go @@ -87,13 +87,14 @@ func TestCodec(t *testing.T) { }, } for _, test := range tests { - codec := &thirdPartyResourceDataCodec{kind: "Foo", delegate: testapi.Extensions.Codec()} + d := &thirdPartyResourceDataDecoder{kind: "Foo", delegate: testapi.Extensions.Codec()} + e := &thirdPartyResourceDataEncoder{kind: "Foo", delegate: testapi.Extensions.Codec()} data, err := json.Marshal(test.obj) if err != nil { t.Errorf("[%s] unexpected error: %v", test.name, err) continue } - obj, err := runtime.Decode(codec, data) + obj, err := runtime.Decode(d, data) if err != nil && !test.expectErr { t.Errorf("[%s] unexpected error: %v", test.name, err) continue @@ -121,7 +122,7 @@ func TestCodec(t *testing.T) { t.Errorf("[%s]\nexpected\n%v\nsaw\n%v\n", test.name, test.obj, &output) } - data, err = runtime.Encode(codec, rsrcObj) + data, err = runtime.Encode(e, rsrcObj) if err != nil { t.Errorf("[%s] unexpected error: %v", test.name, err) } diff --git a/pkg/runtime/codec.go b/pkg/runtime/codec.go index bde0ae9755d..6379e71f5ba 100644 --- a/pkg/runtime/codec.go +++ b/pkg/runtime/codec.go @@ -18,6 +18,7 @@ package runtime import ( "bytes" + "encoding/base64" "fmt" "io" "net/url" @@ -169,3 +170,27 @@ func (c *parameterCodec) EncodeParameters(obj Object, to unversioned.GroupVersio } return queryparams.Convert(obj) } + +type base64Serializer struct { + Serializer +} + +func NewBase64Serializer(s Serializer) Serializer { + return &base64Serializer{s} +} + +func (s base64Serializer) EncodeToStream(obj Object, stream io.Writer, overrides ...unversioned.GroupVersion) error { + e := base64.NewEncoder(base64.StdEncoding, stream) + err := s.Serializer.EncodeToStream(obj, e, overrides...) + e.Close() + return err +} + +func (s base64Serializer) Decode(data []byte, defaults *unversioned.GroupVersionKind, into Object) (Object, *unversioned.GroupVersionKind, error) { + out := make([]byte, base64.StdEncoding.DecodedLen(len(data))) + n, err := base64.StdEncoding.Decode(out, data) + if err != nil { + return nil, nil, err + } + return s.Serializer.Decode(out[:n], defaults, into) +} diff --git a/pkg/runtime/deep_copy_generated.go b/pkg/runtime/deep_copy_generated.go index f368440a384..988b9740177 100644 --- a/pkg/runtime/deep_copy_generated.go +++ b/pkg/runtime/deep_copy_generated.go @@ -118,3 +118,33 @@ func DeepCopy_runtime_Scheme(in Scheme, out *Scheme, c *conversion.Cloner) error } return nil } + +func DeepCopy_runtime_SerializerInfo(in SerializerInfo, out *SerializerInfo, c *conversion.Cloner) error { + if in.Serializer == nil { + out.Serializer = nil + } else if newVal, err := c.DeepCopy(in.Serializer); err != nil { + return err + } else { + out.Serializer = newVal.(Serializer) + } + out.EncodesAsText = in.EncodesAsText + out.MediaType = in.MediaType + return nil +} + +func DeepCopy_runtime_StreamSerializerInfo(in StreamSerializerInfo, out *StreamSerializerInfo, c *conversion.Cloner) error { + if err := DeepCopy_runtime_SerializerInfo(in.SerializerInfo, &out.SerializerInfo, c); err != nil { + return err + } + if in.Framer == nil { + out.Framer = nil + } else if newVal, err := c.DeepCopy(in.Framer); err != nil { + return err + } else { + out.Framer = newVal.(Framer) + } + if err := DeepCopy_runtime_SerializerInfo(in.Embedded, &out.Embedded, c); err != nil { + return err + } + return nil +} diff --git a/pkg/runtime/interfaces.go b/pkg/runtime/interfaces.go index 9954238a9cd..c388a12fe5f 100644 --- a/pkg/runtime/interfaces.go +++ b/pkg/runtime/interfaces.go @@ -86,32 +86,75 @@ type Framer interface { NewFrameWriter(w io.Writer) io.Writer } +// SerializerInfo contains information about a specific serialization format +type SerializerInfo struct { + Serializer + // EncodesAsText indicates this serializer can be encoded to UTF-8 safely. + EncodesAsText bool + // MediaType is the value that represents this serializer over the wire. + MediaType string +} + +// StreamSerializerInfo contains information about a specific stream serialization format +type StreamSerializerInfo struct { + SerializerInfo + // Framer is the factory for retrieving streams that separate objects on the wire + Framer + // Embedded is the type of the nested serialization that should be used. + Embedded SerializerInfo +} + // NegotiatedSerializer is an interface used for obtaining encoders, decoders, and serializers -// for multiple supported media types. +// for multiple supported media types. This would commonly be accepted by a server component +// that performs HTTP content negotiation to accept multiple formats. type NegotiatedSerializer interface { // SupportedMediaTypes is the media types supported for reading and writing single objects. SupportedMediaTypes() []string - // SerializerForMediaType returns a serializer for the provided media type. Options is a set of - // parameters applied to the media type that may modify the resulting output. - SerializerForMediaType(mediaType string, options map[string]string) (Serializer, bool) + // SerializerForMediaType returns a serializer for the provided media type. params is the set of + // parameters applied to the media type that may modify the resulting output. ok will be false + // if no serializer matched the media type. + SerializerForMediaType(mediaType string, params map[string]string) (s SerializerInfo, ok bool) // SupportedStreamingMediaTypes returns the media types of the supported streaming serializers. // Streaming serializers control how multiple objects are written to a stream output. SupportedStreamingMediaTypes() []string // StreamingSerializerForMediaType returns a serializer for the provided media type that supports // reading and writing multiple objects to a stream. It returns a framer and serializer, or an - // error if no such serializer can be created. Options is a set of parameters applied to the - // media type that may modify the resulting output. - StreamingSerializerForMediaType(mediaType string, options map[string]string) (Serializer, Framer, string, bool) + // error if no such serializer can be created. Params is the set of parameters applied to the + // media type that may modify the resulting output. ok will be false if no serializer matched + // the media type. + StreamingSerializerForMediaType(mediaType string, params map[string]string) (s StreamSerializerInfo, ok bool) // EncoderForVersion returns an encoder that ensures objects being written to the provided // serializer are in the provided group version. // TODO: take multiple group versions - EncoderForVersion(serializer Serializer, gv unversioned.GroupVersion) Encoder + EncoderForVersion(serializer Encoder, gv unversioned.GroupVersion) Encoder // DecoderForVersion returns a decoder that ensures objects being read by the provided // serializer are in the provided group version by default. // TODO: take multiple group versions - DecoderToVersion(serializer Serializer, gv unversioned.GroupVersion) Decoder + DecoderToVersion(serializer Decoder, gv unversioned.GroupVersion) Decoder +} + +// StorageSerializer is an interface used for obtaining encoders, decoders, and serializers +// that can read and write data at rest. This would commonly be used by client tools that must +// read files, or server side storage interfaces that persist restful objects. +type StorageSerializer interface { + // SerializerForMediaType returns a serializer for the provided media type. Options is a set of + // parameters applied to the media type that may modify the resulting output. + SerializerForMediaType(mediaType string, options map[string]string) (SerializerInfo, bool) + + // UniversalDeserializer returns a Serializer that can read objects in multiple supported formats + // by introspecting the data at rest. + UniversalDeserializer() Decoder + + // EncoderForVersion returns an encoder that ensures objects being written to the provided + // serializer are in the provided group version. + // TODO: take multiple group versions + EncoderForVersion(serializer Encoder, gv unversioned.GroupVersion) Encoder + // DecoderForVersion returns a decoder that ensures objects being read by the provided + // serializer are in the provided group version by default. + // TODO: take multiple group versions + DecoderToVersion(serializer Decoder, gv unversioned.GroupVersion) Decoder } /////////////////////////////////////////////////////////////////////////////// diff --git a/pkg/runtime/serializer/codec_factory.go b/pkg/runtime/serializer/codec_factory.go index 1ed0312b143..573ec70bb39 100644 --- a/pkg/runtime/serializer/codec_factory.go +++ b/pkg/runtime/serializer/codec_factory.go @@ -31,6 +31,8 @@ type serializerType struct { AcceptContentTypes []string ContentType string FileExtensions []string + // EncodesAsText should be true if this content type can be represented safely in UTF-8 + EncodesAsText bool Serializer runtime.Serializer PrettySerializer runtime.Serializer @@ -65,6 +67,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []seri AcceptContentTypes: []string{"application/json"}, ContentType: "application/json", FileExtensions: []string{"json"}, + EncodesAsText: true, Serializer: jsonSerializer, PrettySerializer: jsonPrettySerializer, @@ -77,6 +80,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []seri AcceptContentTypes: []string{"application/yaml"}, ContentType: "application/yaml", FileExtensions: []string{"yaml"}, + EncodesAsText: true, Serializer: yamlSerializer, // TODO: requires runtime.RawExtension to properly distinguish when the nested content is @@ -206,62 +210,91 @@ func (f CodecFactory) UniversalDeserializer() runtime.Decoder { // // TODO: the decoder will eventually be removed in favor of dealing with objects in their versioned form func (f CodecFactory) UniversalDecoder(versions ...unversioned.GroupVersion) runtime.Decoder { - return f.CodecForVersions(runtime.NoopEncoder{Decoder: f.universal}, nil, versions) + return f.CodecForVersions(nil, f.universal, nil, versions) } // CodecFor creates a codec with the provided serializer. If an object is decoded and its group is not in the list, // it will default to runtime.APIVersionInternal. If encode is not specified for an object's group, the object is not // converted. If encode or decode are nil, no conversion is performed. -func (f CodecFactory) CodecForVersions(serializer runtime.Serializer, encode []unversioned.GroupVersion, decode []unversioned.GroupVersion) runtime.Codec { - return versioning.NewCodecForScheme(f.scheme, serializer, serializer, encode, decode) +func (f CodecFactory) CodecForVersions(encoder runtime.Encoder, decoder runtime.Decoder, encode []unversioned.GroupVersion, decode []unversioned.GroupVersion) runtime.Codec { + return versioning.NewCodecForScheme(f.scheme, encoder, decoder, encode, decode) } // DecoderToVersion returns a decoder that targets the provided group version. -func (f CodecFactory) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder { - return f.CodecForVersions(serializer, nil, []unversioned.GroupVersion{gv}) +func (f CodecFactory) DecoderToVersion(decoder runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder { + return f.CodecForVersions(nil, decoder, nil, []unversioned.GroupVersion{gv}) } // EncoderForVersion returns an encoder that targets the provided group version. -func (f CodecFactory) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { - return f.CodecForVersions(serializer, []unversioned.GroupVersion{gv}, nil) +func (f CodecFactory) EncoderForVersion(encoder runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder { + return f.CodecForVersions(encoder, nil, []unversioned.GroupVersion{gv}, nil) } // SerializerForMediaType returns a serializer that matches the provided RFC2046 mediaType, or false if no such // serializer exists -func (f CodecFactory) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) { +func (f CodecFactory) SerializerForMediaType(mediaType string, params map[string]string) (runtime.SerializerInfo, bool) { for _, s := range f.serializers { for _, accepted := range s.AcceptContentTypes { if accepted == mediaType { - if s.Specialize != nil && len(options) > 0 { - serializer, ok := s.Specialize(options) - return serializer, ok + // specialization abstracts variants to the content type + if s.Specialize != nil && len(params) > 0 { + serializer, ok := s.Specialize(params) + // TODO: return formatted mediaType+params + return runtime.SerializerInfo{Serializer: serializer, MediaType: s.ContentType, EncodesAsText: s.EncodesAsText}, ok } - if v, ok := options["pretty"]; ok && v == "1" && s.PrettySerializer != nil { - return s.PrettySerializer, true + + // legacy support for ?pretty=1 continues, but this is more formally defined + if v, ok := params["pretty"]; ok && v == "1" && s.PrettySerializer != nil { + return runtime.SerializerInfo{Serializer: s.PrettySerializer, MediaType: s.ContentType, EncodesAsText: s.EncodesAsText}, true } - return s.Serializer, true + + // return the base variant + return runtime.SerializerInfo{Serializer: s.Serializer, MediaType: s.ContentType, EncodesAsText: s.EncodesAsText}, true } } } - return nil, false + return runtime.SerializerInfo{}, false } // StreamingSerializerForMediaType returns a serializer that matches the provided RFC2046 mediaType, or false if no such // serializer exists -func (f CodecFactory) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) { +func (f CodecFactory) StreamingSerializerForMediaType(mediaType string, params map[string]string) (runtime.StreamSerializerInfo, bool) { for _, s := range f.serializers { for _, accepted := range s.AcceptStreamContentTypes { if accepted == mediaType { - if s.StreamSpecialize != nil && len(options) > 0 { - serializer, ok := s.StreamSpecialize(options) - // TODO: have StreamSpecialize return exact content type - return serializer, s.Framer, s.StreamContentType, ok + // TODO: accept params + nested, ok := f.SerializerForMediaType(s.ContentType, nil) + if !ok { + panic("no serializer defined for internal content type") } - return s.StreamSerializer, s.Framer, s.StreamContentType, true + + if s.StreamSpecialize != nil && len(params) > 0 { + serializer, ok := s.StreamSpecialize(params) + // TODO: return formatted mediaType+params + return runtime.StreamSerializerInfo{ + SerializerInfo: runtime.SerializerInfo{ + Serializer: serializer, + MediaType: s.StreamContentType, + EncodesAsText: s.EncodesAsText, + }, + Framer: s.Framer, + Embedded: nested, + }, ok + } + + return runtime.StreamSerializerInfo{ + SerializerInfo: runtime.SerializerInfo{ + Serializer: s.StreamSerializer, + MediaType: s.StreamContentType, + EncodesAsText: s.EncodesAsText, + }, + Framer: s.Framer, + Embedded: nested, + }, true } } } - return nil, nil, "", false + return runtime.StreamSerializerInfo{}, false } // SerializerForFileExtension returns a serializer for the provided extension, or false if no serializer matches. diff --git a/pkg/runtime/serializer/codec_test.go b/pkg/runtime/serializer/codec_test.go index 259da6e30c0..342a2a599fd 100644 --- a/pkg/runtime/serializer/codec_test.go +++ b/pkg/runtime/serializer/codec_test.go @@ -267,7 +267,7 @@ func TestVersionedEncoding(t *testing.T) { encoder, _ := cf.SerializerForFileExtension("json") // codec that is unversioned uses the target version - unversionedCodec := cf.CodecForVersions(encoder, nil, nil) + unversionedCodec := cf.CodecForVersions(encoder, nil, nil, nil) _, err = runtime.Encode(unversionedCodec, &TestType1{}, unversioned.GroupVersion{Version: "v3"}) if err == nil || !runtime.IsNotRegisteredError(err) { t.Fatal(err) diff --git a/pkg/runtime/serializer/json/json.go b/pkg/runtime/serializer/json/json.go index 2656d591b1f..1788d980287 100644 --- a/pkg/runtime/serializer/json/json.go +++ b/pkg/runtime/serializer/json/json.go @@ -195,14 +195,6 @@ func (s *Serializer) RecognizesData(peek io.Reader) (ok, unknown bool, err error return ok, false, nil } -// EncodesAsText returns true because both JSON and YAML are considered textual representations -// of data. This is used to determine whether the serialized object should be transmitted over -// a WebSocket Text or Binary frame. This must remain true for legacy compatibility with v1.1 -// watch over websocket implementations. -func (s *Serializer) EncodesAsText() bool { - return true -} - // Framer is the default JSON framing behavior, with newlines delimiting individual objects. var Framer = jsonFramer{} diff --git a/pkg/runtime/serializer/negotiated_codec.go b/pkg/runtime/serializer/negotiated_codec.go index 0f0f3691184..6f6a56dd3fa 100644 --- a/pkg/runtime/serializer/negotiated_codec.go +++ b/pkg/runtime/serializer/negotiated_codec.go @@ -24,35 +24,34 @@ import ( // TODO: We should figure out what happens when someone asks // encoder for version and it conflicts with the raw serializer. type negotiatedSerializerWrapper struct { - serializer runtime.Serializer - streamingSerializer runtime.Serializer - framer runtime.Framer + info runtime.SerializerInfo + streamInfo runtime.StreamSerializerInfo } -func NegotiatedSerializerWrapper(serializer, streamingSerializer runtime.Serializer, framer runtime.Framer) runtime.NegotiatedSerializer { - return &negotiatedSerializerWrapper{serializer, streamingSerializer, framer} +func NegotiatedSerializerWrapper(info runtime.SerializerInfo, streamInfo runtime.StreamSerializerInfo) runtime.NegotiatedSerializer { + return &negotiatedSerializerWrapper{info, streamInfo} } func (n *negotiatedSerializerWrapper) SupportedMediaTypes() []string { return []string{} } -func (n *negotiatedSerializerWrapper) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) { - return n.serializer, true +func (n *negotiatedSerializerWrapper) SerializerForMediaType(mediaType string, options map[string]string) (runtime.SerializerInfo, bool) { + return n.info, true } func (n *negotiatedSerializerWrapper) SupportedStreamingMediaTypes() []string { return []string{} } -func (n *negotiatedSerializerWrapper) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) { - return n.streamingSerializer, n.framer, "", true +func (n *negotiatedSerializerWrapper) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.StreamSerializerInfo, bool) { + return n.streamInfo, true } -func (n *negotiatedSerializerWrapper) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { - return n.serializer +func (n *negotiatedSerializerWrapper) EncoderForVersion(e runtime.Encoder, _ unversioned.GroupVersion) runtime.Encoder { + return e } -func (n *negotiatedSerializerWrapper) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder { - return n.serializer +func (n *negotiatedSerializerWrapper) DecoderToVersion(d runtime.Decoder, _gv unversioned.GroupVersion) runtime.Decoder { + return d } diff --git a/pkg/runtime/serializer/protobuf/protobuf_test.go b/pkg/runtime/serializer/protobuf/protobuf_test.go index e39610cb314..06b3d2e557c 100644 --- a/pkg/runtime/serializer/protobuf/protobuf_test.go +++ b/pkg/runtime/serializer/protobuf/protobuf_test.go @@ -297,6 +297,18 @@ func TestDecodeObjects(t *testing.T) { t.Fatal(err) } + unk2 := &runtime.Unknown{ + TypeMeta: runtime.TypeMeta{Kind: "Pod", APIVersion: "v1"}, + } + wire2 := make([]byte, len(wire1)*2) + n, err := unk2.NestedMarshalTo(wire2, obj1, uint64(obj1.Size())) + if err != nil { + t.Fatal(err) + } + if n != len(wire1) || !bytes.Equal(wire1, wire2[:n]) { + t.Fatalf("unexpected wire:\n%s", hex.Dump(wire2[:n])) + } + wire1 = append([]byte{0x6b, 0x38, 0x73, 0x00}, wire1...) testCases := []struct { diff --git a/pkg/runtime/serializer/recognizer/recognizer.go b/pkg/runtime/serializer/recognizer/recognizer.go index 8fbda6c1a3b..4b8b1e204e3 100644 --- a/pkg/runtime/serializer/recognizer/recognizer.go +++ b/pkg/runtime/serializer/recognizer/recognizer.go @@ -17,6 +17,7 @@ limitations under the License. package recognizer import ( + "bufio" "bytes" "fmt" "io" diff --git a/pkg/runtime/serializer/versioning/versioning.go b/pkg/runtime/serializer/versioning/versioning.go index 36c66fa9f57..e12682a6c28 100644 --- a/pkg/runtime/serializer/versioning/versioning.go +++ b/pkg/runtime/serializer/versioning/versioning.go @@ -27,6 +27,7 @@ import ( // EnableCrossGroupDecoding modifies the given decoder in place, if it is a codec // from this package. It allows objects from one group to be auto-decoded into // another group. 'destGroup' must already exist in the codec. +// TODO: this is an encapsulation violation and should be refactored func EnableCrossGroupDecoding(d runtime.Decoder, sourceGroup, destGroup string) error { internal, ok := d.(*codec) if !ok { @@ -45,6 +46,7 @@ func EnableCrossGroupDecoding(d runtime.Decoder, sourceGroup, destGroup string) // EnableCrossGroupEncoding modifies the given encoder in place, if it is a codec // from this package. It allows objects from one group to be auto-decoded into // another group. 'destGroup' must already exist in the codec. +// TODO: this is an encapsulation violation and should be refactored func EnableCrossGroupEncoding(e runtime.Encoder, sourceGroup, destGroup string) error { internal, ok := e.(*codec) if !ok { diff --git a/plugin/pkg/auth/authorizer/webhook/webhook.go b/plugin/pkg/auth/authorizer/webhook/webhook.go index 65b88865e05..66f4bbd027e 100644 --- a/plugin/pkg/auth/authorizer/webhook/webhook.go +++ b/plugin/pkg/auth/authorizer/webhook/webhook.go @@ -86,9 +86,13 @@ func New(kubeConfigFile string) (*WebhookAuthorizer, error) { if err != nil { return nil, err } + serializer := json.NewSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), false) codec := versioning.NewCodecForScheme(api.Scheme, serializer, serializer, encodeVersions, decodeVersions) - clientConfig.ContentConfig.NegotiatedSerializer = runtimeserializer.NegotiatedSerializerWrapper(codec, codec, json.Framer) + clientConfig.ContentConfig.NegotiatedSerializer = runtimeserializer.NegotiatedSerializerWrapper( + runtime.SerializerInfo{Serializer: codec}, + runtime.StreamSerializerInfo{}, + ) restClient, err := restclient.UnversionedRESTClientFor(clientConfig) if err != nil { diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 73115d8fcb7..29af8f9843f 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -155,24 +155,29 @@ func NewMasterConfig() *master.Config { Prefix: etcdtest.PathPrefix(), } - negotiatedSerializer := NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json") + negotiatedSerializer := NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), runtime.ContentTypeJSON) - storageFactory := genericapiserver.NewDefaultStorageFactory(config, negotiatedSerializer, genericapiserver.NewDefaultResourceEncodingConfig(), master.DefaultAPIResourceConfigSource()) + storageFactory := genericapiserver.NewDefaultStorageFactory(config, runtime.ContentTypeJSON, negotiatedSerializer, genericapiserver.NewDefaultResourceEncodingConfig(), master.DefaultAPIResourceConfigSource()) storageFactory.SetSerializer( unversioned.GroupResource{Group: api.GroupName, Resource: genericapiserver.AllResources}, - NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json")) + "", + NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), runtime.ContentTypeJSON)) storageFactory.SetSerializer( unversioned.GroupResource{Group: autoscaling.GroupName, Resource: genericapiserver.AllResources}, - NewSingleContentTypeSerializer(api.Scheme, testapi.Autoscaling.Codec(), "application/json")) + "", + NewSingleContentTypeSerializer(api.Scheme, testapi.Autoscaling.Codec(), runtime.ContentTypeJSON)) storageFactory.SetSerializer( unversioned.GroupResource{Group: batch.GroupName, Resource: genericapiserver.AllResources}, - NewSingleContentTypeSerializer(api.Scheme, testapi.Batch.Codec(), "application/json")) + "", + NewSingleContentTypeSerializer(api.Scheme, testapi.Batch.Codec(), runtime.ContentTypeJSON)) storageFactory.SetSerializer( unversioned.GroupResource{Group: apps.GroupName, Resource: genericapiserver.AllResources}, - NewSingleContentTypeSerializer(api.Scheme, testapi.Apps.Codec(), "application/json")) + "", + NewSingleContentTypeSerializer(api.Scheme, testapi.Apps.Codec(), runtime.ContentTypeJSON)) storageFactory.SetSerializer( unversioned.GroupResource{Group: extensions.GroupName, Resource: genericapiserver.AllResources}, - NewSingleContentTypeSerializer(api.Scheme, testapi.Extensions.Codec(), "application/json")) + "", + NewSingleContentTypeSerializer(api.Scheme, testapi.Extensions.Codec(), runtime.ContentTypeJSON)) return &master.Config{ Config: &genericapiserver.Config{ diff --git a/test/integration/framework/serializer.go b/test/integration/framework/serializer.go index e6aa79574a4..5d6d0e77a1b 100644 --- a/test/integration/framework/serializer.go +++ b/test/integration/framework/serializer.go @@ -23,7 +23,7 @@ import ( ) // NewSingleContentTypeSerializer wraps a serializer in a NegotiatedSerializer that handles one content type -func NewSingleContentTypeSerializer(scheme *runtime.Scheme, serializer runtime.Serializer, contentType string) runtime.NegotiatedSerializer { +func NewSingleContentTypeSerializer(scheme *runtime.Scheme, serializer runtime.Serializer, contentType string) runtime.StorageSerializer { return &wrappedSerializer{ scheme: scheme, serializer: serializer, @@ -37,29 +37,31 @@ type wrappedSerializer struct { contentType string } -var _ runtime.NegotiatedSerializer = &wrappedSerializer{} +var _ runtime.StorageSerializer = &wrappedSerializer{} func (s *wrappedSerializer) SupportedMediaTypes() []string { return []string{s.contentType} } -func (s *wrappedSerializer) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) { +func (s *wrappedSerializer) SerializerForMediaType(mediaType string, options map[string]string) (runtime.SerializerInfo, bool) { if mediaType != s.contentType { - return nil, false + return runtime.SerializerInfo{}, false } - return s.serializer, true -} -func (s *wrappedSerializer) SupportedStreamingMediaTypes() []string { - return nil -} -func (s *wrappedSerializer) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) { - return nil, nil, "", false + return runtime.SerializerInfo{ + Serializer: s.serializer, + MediaType: mediaType, + EncodesAsText: true, // TODO: this should be parameterized + }, true } -func (s *wrappedSerializer) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { - return versioning.NewCodec(s.serializer, s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), []unversioned.GroupVersion{gv}, nil) +func (s *wrappedSerializer) UniversalDeserializer() runtime.Decoder { + return s.serializer } -func (s *wrappedSerializer) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder { - return versioning.NewCodec(s.serializer, s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), nil, []unversioned.GroupVersion{gv}) +func (s *wrappedSerializer) EncoderForVersion(encoder runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder { + return versioning.NewCodec(encoder, nil, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), []unversioned.GroupVersion{gv}, nil) +} + +func (s *wrappedSerializer) DecoderToVersion(decoder runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder { + return versioning.NewCodec(nil, decoder, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), nil, []unversioned.GroupVersion{gv}) }