Merge pull request #24710 from smarterclayton/store_proto_in_etcd

Automatic merge from submit-queue

Allow etcd to store protobuf objects

Split storage serialization from client negotiation, and allow API server to take flag controlling serialization.

TODO:

* [x] API server still doesn't start - range allocation object doesn't seem to round trip correctly to etcd
* [ ] Verify that third party resources are ignoring protobuf (add a test)
* [ ] Add integration tests that verify storage is correctly protobuf
* [ ] Add a global default for which storage format to prefer?
This commit is contained in:
k8s-merge-robot 2016-05-05 19:00:20 -07:00
commit ab36e0e35e
52 changed files with 651 additions and 280 deletions

View File

@ -44,6 +44,7 @@ type APIServer struct {
AuthorizationMode string
AuthorizationConfig apiserver.AuthorizationConfig
BasicAuthFile string
DefaultStorageMediaType string
DeleteCollectionWorkers int
DeprecatedStorageVersion string
EtcdServersOverrides []string
@ -76,6 +77,7 @@ func NewAPIServer() *APIServer {
ServerRunOptions: genericapiserver.NewServerRunOptions(),
AdmissionControl: "AlwaysAdmit",
AuthorizationMode: "AlwaysAllow",
DefaultStorageMediaType: "application/json",
DeleteCollectionWorkers: 1,
EventTTL: 1 * time.Hour,
MasterServiceNamespace: api.NamespaceDefault,
@ -153,6 +155,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
"In the case where objects are moved from one group to the other, you may specify the format \"group1=group2/v1beta1,group3/v1beta1,...\". "+
"You only need to pass the groups you wish to change from the defaults. "+
"It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable.")
fs.StringVar(&s.DefaultStorageMediaType, "storage-media-type", s.DefaultStorageMediaType, "The media type to use to store objects in storage. Defaults to application/json. Some resources may only support a specific media type and will ignore this setting.")
fs.DurationVar(&s.EventTTL, "event-ttl", s.EventTTL, "Amount of time to retain events. Default 1 hour.")
fs.StringVar(&s.BasicAuthFile, "basic-auth-file", s.BasicAuthFile, "If set, the file that will be used to admit requests to the secure port of the API server via http basic authentication.")
fs.StringVar(&s.TokenAuthFile, "token-auth-file", s.TokenAuthFile, "If set, the file that will be used to secure the secure port of the API server via token authentication.")

View File

@ -167,7 +167,9 @@ func Run(s *options.APIServer) error {
resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal})
}
storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, api.Codecs, resourceEncoding, apiResourceConfigSource)
storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, s.DefaultStorageMediaType, api.Codecs, resourceEncoding, apiResourceConfigSource)
// third party resources are always serialized to storage using JSON
storageFactory.SetSerializer(extensions.Resource("thirdpartyresources"), "application/json", nil)
storageFactory.AddCohabitatingResources(batch.Resource("jobs"), extensions.Resource("jobs"))
storageFactory.AddCohabitatingResources(autoscaling.Resource("horizontalpodautoscalers"), extensions.Resource("horizontalpodautoscalers"))
for _, override := range s.EtcdServersOverrides {

View File

@ -110,6 +110,7 @@ kube-apiserver
--ssh-keyfile="": If non-empty, use secure SSH proxy to the nodes, using this user keyfile
--ssh-user="": If non-empty, use secure SSH proxy to the nodes, using this user name
--storage-backend="": The storage backend for persistence. Options: 'etcd2' (default), 'etcd3'.
--storage-media-type="application/json": The media type to use to store objects in storage. Defaults to application/json. Some resources may only support a specific media type and will ignore this setting.
--storage-versions="apps/v1alpha1,authorization.k8s.io/v1beta1,autoscaling/v1,batch/v1,componentconfig/v1alpha1,extensions/v1beta1,metrics/v1alpha1,v1": The per-group version to store resources in. Specified in the format "group1/version1,group2/version2,...". In the case where objects are moved from one group to the other, you may specify the format "group1=group2/v1beta1,group3/v1beta1,...". You only need to pass the groups you wish to change from the defaults. It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable.
--tls-cert-file="": File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert). If HTTPS serving is enabled, and --tls-cert-file and --tls-private-key-file are not provided, a self-signed certificate and key are generated for the public address and saved to /var/run/kubernetes.
--tls-private-key-file="": File containing x509 private key matching --tls-cert-file.
@ -118,7 +119,7 @@ kube-apiserver
--watch-cache-sizes=[]: List of watch cache sizes for every resource (pods, nodes, etc.), comma separated. The individual override format: resource#size, where size is a number. It takes effect when watch-cache is enabled.
```
###### Auto generated by spf13/cobra on 28-Apr-2016
###### Auto generated by spf13/cobra on 30-Apr-2016
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->

View File

@ -45,7 +45,7 @@ func newStorageFactory() genericapiserver.StorageFactory {
Prefix: genericapiserver.DefaultEtcdPathPrefix,
ServerList: []string{"http://127.0.0.1:4001"},
}
storageFactory := genericapiserver.NewDefaultStorageFactory(config, api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig())
storageFactory := genericapiserver.NewDefaultStorageFactory(config, "application/json", api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig())
return storageFactory
}

View File

@ -44,6 +44,7 @@ type APIServer struct {
AuthorizationMode string
AuthorizationConfig apiserver.AuthorizationConfig
BasicAuthFile string
DefaultStorageMediaType string
DeleteCollectionWorkers int
DeprecatedStorageVersion string
EtcdServersOverrides []string
@ -76,6 +77,7 @@ func NewAPIServer() *APIServer {
ServerRunOptions: genericapiserver.NewServerRunOptions(),
AdmissionControl: "AlwaysAdmit",
AuthorizationMode: "AlwaysAllow",
DefaultStorageMediaType: "application/json",
DeleteCollectionWorkers: 1,
EventTTL: 1 * time.Hour,
MasterServiceNamespace: api.NamespaceDefault,
@ -153,6 +155,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
"In the case where objects are moved from one group to the other, you may specify the format \"group1=group2/v1beta1,group3/v1beta1,...\". "+
"You only need to pass the groups you wish to change from the defaults. "+
"It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable.")
fs.StringVar(&s.DefaultStorageMediaType, "storage-media-type", s.DefaultStorageMediaType, "The media type to use to store objects in storage. Defaults to application/json. Some resources may only support a specific media type and will ignore this setting.")
fs.DurationVar(&s.EventTTL, "event-ttl", s.EventTTL, "Amount of time to retain events. Default 1 hour.")
fs.StringVar(&s.BasicAuthFile, "basic-auth-file", s.BasicAuthFile, "If set, the file that will be used to admit requests to the secure port of the API server via http basic authentication.")
fs.StringVar(&s.TokenAuthFile, "token-auth-file", s.TokenAuthFile, "If set, the file that will be used to secure the secure port of the API server via token authentication.")

View File

@ -76,7 +76,7 @@ func Run(s *options.APIServer) error {
resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal})
}
storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, api.Codecs, resourceEncoding, apiResourceConfigSource)
storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, s.DefaultStorageMediaType, api.Codecs, resourceEncoding, apiResourceConfigSource)
for _, override := range s.EtcdServersOverrides {
tokens := strings.Split(override, "#")
if len(tokens) != 2 {

View File

@ -192,6 +192,7 @@ ADMISSION_CONTROL="NamespaceLifecycle,LimitRanger,ResourceQuota"
--public-address-override="127.0.0.1" \
--kubelet-port=${KUBELET_PORT} \
--runtime-config=api/v1 \
--storage-media-type="${KUBE_TEST_API_STORAGE_TYPE-}" \
--cert-dir="${TMPDIR:-/tmp/}" \
--service-cluster-ip-range="10.0.0.0/24" 1>&2 &
APISERVER_PID=$!
@ -202,6 +203,7 @@ kube::util::wait_for_url "http://127.0.0.1:${API_PORT}/healthz" "apiserver"
kube::log::status "Starting controller-manager"
"${KUBE_OUTPUT_HOSTBIN}/kube-controller-manager" \
--port="${CTLRMGR_PORT}" \
--kube-api-content-type="${KUBE_TEST_API_TYPE-}" \
--master="127.0.0.1:${API_PORT}" 1>&2 &
CTLRMGR_PID=$!

View File

@ -394,6 +394,7 @@ static-pods-config
stats-port
stop-services
storage-backend
storage-media-type
storage-version
storage-versions
streaming-connection-idle-timeout

View File

@ -17,6 +17,7 @@ limitations under the License.
package api_test
import (
"bytes"
"encoding/hex"
"math/rand"
"testing"
@ -37,10 +38,32 @@ import (
func init() {
codecsToTest = append(codecsToTest, func(version unversioned.GroupVersion, item runtime.Object) (runtime.Codec, error) {
s := protobuf.NewSerializer(api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), "application/arbitrary.content.type")
return api.Codecs.CodecForVersions(s, testapi.ExternalGroupVersions(), nil), nil
return api.Codecs.CodecForVersions(s, s, testapi.ExternalGroupVersions(), nil), nil
})
}
func TestUniversalDeserializer(t *testing.T) {
expected := &v1.Pod{ObjectMeta: v1.ObjectMeta{Name: "test"}}
d := api.Codecs.UniversalDeserializer()
for _, mediaType := range []string{"application/json", "application/yaml", "application/vnd.kubernetes.protobuf"} {
e, ok := api.Codecs.SerializerForMediaType(mediaType, nil)
if !ok {
t.Fatal(mediaType)
}
buf := &bytes.Buffer{}
if err := e.EncodeToStream(expected, buf); err != nil {
t.Fatalf("%s: %v", mediaType, err)
}
obj, _, err := d.Decode(buf.Bytes(), &unversioned.GroupVersionKind{Kind: "Pod", Version: "v1"}, nil)
if err != nil {
t.Fatalf("%s: %v", mediaType, err)
}
if !api.Semantic.DeepEqual(expected, obj) {
t.Fatalf("%s: %#v", mediaType, obj)
}
}
}
func TestProtobufRoundTrip(t *testing.T) {
obj := &v1.Pod{}
apitesting.FuzzerFor(t, v1.SchemeGroupVersion, rand.NewSource(benchmarkSeed)).Fuzz(obj)

View File

@ -284,18 +284,14 @@ func TestObjectWatchFraming(t *testing.T) {
converted, _ := api.Scheme.ConvertToVersion(secret, "v1")
v1secret := converted.(*v1.Secret)
for _, streamingMediaType := range api.Codecs.SupportedStreamingMediaTypes() {
s, framer, mediaType, _ := api.Codecs.StreamingSerializerForMediaType(streamingMediaType, nil)
// TODO: remove this when the runtime.SerializerInfo PR lands
if mediaType == "application/vnd.kubernetes.protobuf;stream=watch" {
mediaType = "application/vnd.kubernetes.protobuf"
}
embedded, ok := api.Codecs.SerializerForMediaType(mediaType, nil)
if !ok {
t.Logf("no embedded serializer for %s", mediaType)
embedded = s
s, _ := api.Codecs.StreamingSerializerForMediaType(streamingMediaType, nil)
framer := s.Framer
embedded := s.Embedded.Serializer
if embedded == nil {
t.Errorf("no embedded serializer for %s", streamingMediaType)
continue
}
innerDecode := api.Codecs.DecoderToVersion(embedded, api.SchemeGroupVersion)
//innerEncode := api.Codecs.EncoderForVersion(embedded, api.SchemeGroupVersion)
// write a single object through the framer and back out
obj := &bytes.Buffer{}

View File

@ -19,6 +19,7 @@ package testapi
import (
"fmt"
"mime"
"os"
"reflect"
"strings"
@ -33,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/recognizer"
_ "k8s.io/kubernetes/federation/apis/federation/install"
_ "k8s.io/kubernetes/pkg/api/install"
@ -45,14 +47,16 @@ import (
)
var (
Groups = make(map[string]TestGroup)
Default TestGroup
Autoscaling TestGroup
Batch TestGroup
Extensions TestGroup
Apps TestGroup
Federation TestGroup
NegotiatedSerializer = api.Codecs
Groups = make(map[string]TestGroup)
Default TestGroup
Autoscaling TestGroup
Batch TestGroup
Extensions TestGroup
Apps TestGroup
Federation TestGroup
serializer runtime.SerializerInfo
storageSerializer runtime.SerializerInfo
)
type TestGroup struct {
@ -62,6 +66,30 @@ type TestGroup struct {
}
func init() {
if apiMediaType := os.Getenv("KUBE_TEST_API_TYPE"); len(apiMediaType) > 0 {
var ok bool
mediaType, options, err := mime.ParseMediaType(apiMediaType)
if err != nil {
panic(err)
}
serializer, ok = api.Codecs.SerializerForMediaType(mediaType, options)
if !ok {
panic(fmt.Sprintf("no serializer for %s", apiMediaType))
}
}
if storageMediaType := StorageMediaType(); len(storageMediaType) > 0 {
var ok bool
mediaType, options, err := mime.ParseMediaType(storageMediaType)
if err != nil {
panic(err)
}
storageSerializer, ok = api.Codecs.SerializerForMediaType(mediaType, options)
if !ok {
panic(fmt.Sprintf("no serializer for %s", storageMediaType))
}
}
kubeTestAPI := os.Getenv("KUBE_TEST_API")
if len(kubeTestAPI) != 0 {
testGroupVersions := strings.Split(kubeTestAPI, ",")
@ -173,9 +201,40 @@ func (g TestGroup) InternalTypes() map[string]reflect.Type {
}
// Codec returns the codec for the API version to test against, as set by the
// KUBE_TEST_API env var.
// KUBE_TEST_API_TYPE env var.
func (g TestGroup) Codec() runtime.Codec {
return api.Codecs.LegacyCodec(g.externalGroupVersion)
if serializer.Serializer == nil {
return api.Codecs.LegacyCodec(g.externalGroupVersion)
}
return api.Codecs.CodecForVersions(serializer, api.Codecs.UniversalDeserializer(), []unversioned.GroupVersion{g.externalGroupVersion}, nil)
}
// NegotiatedSerializer returns the negotiated serializer for the server.
func (g TestGroup) NegotiatedSerializer() runtime.NegotiatedSerializer {
return api.Codecs
}
func StorageMediaType() string {
return os.Getenv("KUBE_TEST_API_STORAGE_TYPE")
}
// StorageCodec returns the codec for the API version to store in etcd, as set by the
// KUBE_TEST_API_STORAGE_TYPE env var.
func (g TestGroup) StorageCodec() runtime.Codec {
s := storageSerializer.Serializer
if s == nil {
return api.Codecs.LegacyCodec(g.externalGroupVersion)
}
// etcd2 only supports string data - we must wrap any result before returning
// TODO: remove for etcd3 / make parameterizable
if !storageSerializer.EncodesAsText {
s = runtime.NewBase64Serializer(s)
}
ds := recognizer.NewDecoder(s, api.Codecs.UniversalDeserializer())
return api.Codecs.CodecForVersions(s, ds, []unversioned.GroupVersion{g.externalGroupVersion}, nil)
}
// Converter returns the api.Scheme for the API version to test against, as set by the

View File

@ -21,6 +21,7 @@ syntax = 'proto2';
package k8s.io.kubernetes.pkg.api.unversioned;
import "k8s.io/kubernetes/pkg/runtime/generated.proto";
import "k8s.io/kubernetes/pkg/util/intstr/generated.proto";
// Package-wide variables from generator "generated".

View File

@ -95,6 +95,7 @@ import math "math"
import k8s_io_kubernetes_pkg_api_unversioned "k8s.io/kubernetes/pkg/api/unversioned"
import k8s_io_kubernetes_pkg_api_v1 "k8s.io/kubernetes/pkg/api/v1"
import k8s_io_kubernetes_pkg_util_intstr "k8s.io/kubernetes/pkg/util/intstr"
import io "io"

View File

@ -24,6 +24,7 @@ package k8s.io.kubernetes.pkg.apis.extensions.v1beta1;
import "k8s.io/kubernetes/pkg/api/resource/generated.proto";
import "k8s.io/kubernetes/pkg/api/unversioned/generated.proto";
import "k8s.io/kubernetes/pkg/api/v1/generated.proto";
import "k8s.io/kubernetes/pkg/runtime/generated.proto";
import "k8s.io/kubernetes/pkg/util/intstr/generated.proto";
// Package-wide variables from generator "generated".

View File

@ -274,9 +274,16 @@ type StripVersionNegotiatedSerializer struct {
runtime.NegotiatedSerializer
}
func (n StripVersionNegotiatedSerializer) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
encoder := n.NegotiatedSerializer.EncoderForVersion(serializer, gv)
return stripVersionEncoder{encoder, serializer}
func (n StripVersionNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder {
serializer, ok := encoder.(runtime.Serializer)
if !ok {
// The stripVersionEncoder needs both an encoder and decoder, but is called from a context that doesn't have access to the
// decoder. We do a best effort cast here (since this code path is only for backwards compatibility) to get access to the caller's
// decoder.
panic(fmt.Sprintf("Unable to extract serializer from %#v", encoder))
}
versioned := n.NegotiatedSerializer.EncoderForVersion(encoder, gv)
return stripVersionEncoder{versioned, serializer}
}
func keepUnversioned(group string) bool {
@ -422,14 +429,14 @@ func write(statusCode int, gv unversioned.GroupVersion, s runtime.NegotiatedSeri
// writeNegotiated renders an object in the content type negotiated by the client
func writeNegotiated(s runtime.NegotiatedSerializer, gv unversioned.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
serializer, contentType, err := negotiateOutputSerializer(req, s)
serializer, err := negotiateOutputSerializer(req, s)
if err != nil {
status := errToAPIStatus(err)
writeRawJSON(int(status.Code), status, w)
return
}
w.Header().Set("Content-Type", contentType)
w.Header().Set("Content-Type", serializer.MediaType)
w.WriteHeader(statusCode)
encoder := s.EncoderForVersion(serializer, gv)

View File

@ -50,31 +50,31 @@ func negotiateOutput(req *http.Request, supported []string) (string, map[string]
return mediaType, accept.Params, nil
}
func negotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.Serializer, string, error) {
func negotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
supported := ns.SupportedMediaTypes()
mediaType, params, err := negotiateOutput(req, supported)
if err != nil {
return nil, "", err
return runtime.SerializerInfo{}, err
}
if s, ok := ns.SerializerForMediaType(mediaType, params); ok {
return s, mediaType, nil
return s, nil
}
return nil, "", errNotAcceptable{supported}
return runtime.SerializerInfo{}, errNotAcceptable{supported}
}
func negotiateOutputStreamSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.Serializer, runtime.Framer, string, string, error) {
func negotiateOutputStreamSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.StreamSerializerInfo, error) {
supported := ns.SupportedMediaTypes()
mediaType, params, err := negotiateOutput(req, supported)
if err != nil {
return nil, nil, "", "", err
return runtime.StreamSerializerInfo{}, err
}
if s, f, exactMediaType, ok := ns.StreamingSerializerForMediaType(mediaType, params); ok {
return s, f, mediaType, exactMediaType, nil
if s, ok := ns.StreamingSerializerForMediaType(mediaType, params); ok {
return s, nil
}
return nil, nil, "", "", errNotAcceptable{supported}
return runtime.StreamSerializerInfo{}, errNotAcceptable{supported}
}
func negotiateInputSerializer(req *http.Request, s runtime.NegotiatedSerializer) (runtime.Serializer, error) {
func negotiateInputSerializer(req *http.Request, s runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
supported := s.SupportedMediaTypes()
mediaType := req.Header.Get("Content-Type")
if len(mediaType) == 0 {
@ -82,11 +82,11 @@ func negotiateInputSerializer(req *http.Request, s runtime.NegotiatedSerializer)
}
mediaType, options, err := mime.ParseMediaType(mediaType)
if err != nil {
return nil, errUnsupportedMediaType{supported}
return runtime.SerializerInfo{}, errUnsupportedMediaType{supported}
}
out, ok := s.SerializerForMediaType(mediaType, options)
if !ok {
return nil, errUnsupportedMediaType{supported}
return runtime.SerializerInfo{}, errUnsupportedMediaType{supported}
}
return out, nil
}

View File

@ -41,27 +41,34 @@ func (n *fakeNegotiater) SupportedStreamingMediaTypes() []string {
return n.streamTypes
}
func (n *fakeNegotiater) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) {
func (n *fakeNegotiater) SerializerForMediaType(mediaType string, options map[string]string) (runtime.SerializerInfo, bool) {
n.mediaType = mediaType
if len(options) > 0 {
n.options = options
}
return n.serializer, n.serializer != nil
return runtime.SerializerInfo{Serializer: n.serializer, MediaType: n.mediaType, EncodesAsText: true}, n.serializer != nil
}
func (n *fakeNegotiater) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) {
func (n *fakeNegotiater) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.StreamSerializerInfo, bool) {
n.streamMediaType = mediaType
if len(options) > 0 {
n.streamOptions = options
}
return n.streamSerializer, n.framer, mediaType, n.streamSerializer != nil
return runtime.StreamSerializerInfo{
SerializerInfo: runtime.SerializerInfo{
Serializer: n.serializer,
MediaType: mediaType,
EncodesAsText: true,
},
Framer: n.framer,
}, n.streamSerializer != nil
}
func (n *fakeNegotiater) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
func (n *fakeNegotiater) EncoderForVersion(serializer runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder {
return n.serializer
}
func (n *fakeNegotiater) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder {
func (n *fakeNegotiater) DecoderToVersion(serializer runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder {
return n.serializer
}
@ -228,7 +235,7 @@ func TestNegotiate(t *testing.T) {
req = &http.Request{Header: http.Header{}}
req.Header.Set("Accept", test.accept)
}
s, contentType, err := negotiateOutputSerializer(req, test.ns)
s, err := negotiateOutputSerializer(req, test.ns)
switch {
case err == nil && test.errFn != nil:
t.Errorf("%d: failed: expected error", i)
@ -251,11 +258,11 @@ func TestNegotiate(t *testing.T) {
}
continue
}
if test.contentType != contentType {
t.Errorf("%d: unexpected %s %s", i, test.contentType, contentType)
if test.contentType != s.MediaType {
t.Errorf("%d: unexpected %s %s", i, test.contentType, s.MediaType)
}
if s != test.serializer {
t.Errorf("%d: unexpected %s %s", i, test.serializer, s)
if s.Serializer != test.serializer {
t.Errorf("%d: unexpected %s %s", i, test.serializer, s.Serializer)
}
if !reflect.DeepEqual(test.params, test.ns.options) {
t.Errorf("%d: unexpected %#v %#v", i, test.params, test.ns.options)

View File

@ -59,47 +59,33 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
return t.C, t.Stop
}
type textEncodable interface {
// EncodesAsText should return true if objects should be transmitted as a WebSocket Text
// frame (otherwise, they will be sent as a Binary frame).
EncodesAsText() bool
}
// serveWatch handles serving requests to the server
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Request, res *restful.Response, timeout time.Duration) {
// negotiate for the stream serializer
serializer, framer, mediaType, exactMediaType, err := negotiateOutputStreamSerializer(req.Request, scope.Serializer)
serializer, err := negotiateOutputStreamSerializer(req.Request, scope.Serializer)
if err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
}
if framer == nil {
scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", mediaType), res.ResponseWriter, req.Request)
if serializer.Framer == nil {
scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType), res.ResponseWriter, req.Request)
return
}
encoder := scope.Serializer.EncoderForVersion(serializer, scope.Kind.GroupVersion())
encoder := scope.Serializer.EncoderForVersion(serializer.Serializer, scope.Kind.GroupVersion())
useTextFraming := false
if encodable, ok := serializer.(textEncodable); ok && encodable.EncodesAsText() {
useTextFraming = true
}
useTextFraming := serializer.EncodesAsText
// find the embedded serializer matching the media type
embeddedSerializer, ok := scope.Serializer.SerializerForMediaType(mediaType, nil)
if !ok {
scope.err(fmt.Errorf("no serializer defined for %q available for embedded encoding", mediaType), res.ResponseWriter, req.Request)
return
}
embeddedEncoder := scope.Serializer.EncoderForVersion(embeddedSerializer, scope.Kind.GroupVersion())
embeddedEncoder := scope.Serializer.EncoderForVersion(serializer.Embedded.Serializer, scope.Kind.GroupVersion())
server := &WatchServer{
watching: watcher,
scope: scope,
useTextFraming: useTextFraming,
mediaType: exactMediaType,
framer: framer,
mediaType: serializer.MediaType,
framer: serializer.Framer,
encoder: encoder,
embeddedEncoder: embeddedEncoder,
fixup: func(obj runtime.Object) {

View File

@ -222,9 +222,9 @@ func TestWatchRead(t *testing.T) {
for _, protocol := range protocols {
for _, test := range testCases {
serializer, framer, _, ok := api.Codecs.StreamingSerializerForMediaType(test.MediaType, nil)
serializer, ok := api.Codecs.StreamingSerializerForMediaType(test.MediaType, nil)
if !ok {
t.Fatal(framer)
t.Fatal(serializer)
}
r, contentType := protocol.fn(test.Accept)
@ -235,13 +235,13 @@ func TestWatchRead(t *testing.T) {
}
objectSerializer, ok := api.Codecs.SerializerForMediaType(test.MediaType, nil)
if !ok {
t.Fatal(framer)
t.Fatal(objectSerializer)
}
objectCodec := api.Codecs.DecoderToVersion(objectSerializer, testInternalGroupVersion)
var fr io.ReadCloser = r
if !protocol.selfFraming {
fr = framer.NewFrameReader(r)
fr = serializer.Framer.NewFrameReader(r)
}
d := streaming.NewDecoder(fr, serializer)
@ -495,9 +495,9 @@ func TestWatchHTTPTimeout(t *testing.T) {
timeoutCh := make(chan time.Time)
done := make(chan struct{})
_, framer, _, ok := api.Codecs.StreamingSerializerForMediaType("application/json", nil)
serializer, ok := api.Codecs.StreamingSerializerForMediaType("application/json", nil)
if !ok {
t.Fatal(framer)
t.Fatal(serializer)
}
// Setup a new watchserver
@ -505,7 +505,7 @@ func TestWatchHTTPTimeout(t *testing.T) {
watching: watcher,
mediaType: "testcase/json",
framer: framer,
framer: serializer.Framer,
encoder: newCodec,
embeddedEncoder: newCodec,

View File

@ -139,26 +139,23 @@ func readExpBackoffConfig() BackoffManager {
func createSerializers(config ContentConfig) (*Serializers, error) {
negotiated := config.NegotiatedSerializer
contentType := config.ContentType
serializer, ok := negotiated.SerializerForMediaType(contentType, nil)
info, ok := negotiated.SerializerForMediaType(contentType, nil)
if !ok {
return nil, fmt.Errorf("serializer for %s not registered", contentType)
}
streamingSerializer, framer, _, ok := negotiated.StreamingSerializerForMediaType(contentType, nil)
streamInfo, ok := negotiated.StreamingSerializerForMediaType(contentType, nil)
if !ok {
return nil, fmt.Errorf("streaming serializer for %s not registered", contentType)
}
if framer == nil {
return nil, fmt.Errorf("no framer for %s", contentType)
}
internalGV := unversioned.GroupVersion{
Group: config.GroupVersion.Group,
Version: runtime.APIVersionInternal,
}
return &Serializers{
Encoder: negotiated.EncoderForVersion(serializer, *config.GroupVersion),
Decoder: negotiated.DecoderToVersion(serializer, internalGV),
StreamingSerializer: streamingSerializer,
Framer: framer,
Encoder: negotiated.EncoderForVersion(info.Serializer, *config.GroupVersion),
Decoder: negotiated.DecoderToVersion(info.Serializer, internalGV),
StreamingSerializer: streamInfo.Serializer,
Framer: streamInfo.Framer,
}, nil
}

View File

@ -47,7 +47,7 @@ func TestDoRequestSuccess(t *testing.T) {
Host: testServer.URL,
ContentConfig: ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
NegotiatedSerializer: testapi.Default.NegotiatedSerializer(),
},
Username: "user",
Password: "pass",
@ -92,7 +92,7 @@ func TestDoRequestFailed(t *testing.T) {
Host: testServer.URL,
ContentConfig: ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
NegotiatedSerializer: testapi.Default.NegotiatedSerializer(),
},
})
if err != nil {
@ -130,7 +130,7 @@ func TestDoRequestCreated(t *testing.T) {
Host: testServer.URL,
ContentConfig: ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
NegotiatedSerializer: testapi.Default.NegotiatedSerializer(),
},
Username: "user",
Password: "pass",

View File

@ -87,13 +87,13 @@ func TestSetKubernetesDefaultsUserAgent(t *testing.T) {
}
func TestRESTClientRequires(t *testing.T) {
if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{NegotiatedSerializer: testapi.NegotiatedSerializer}}); err == nil {
if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{NegotiatedSerializer: testapi.Default.NegotiatedSerializer()}}); err == nil {
t.Errorf("unexpected non-error")
}
if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}); err == nil {
t.Errorf("unexpected non-error")
}
if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), NegotiatedSerializer: testapi.NegotiatedSerializer}}); err != nil {
if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), NegotiatedSerializer: testapi.Default.NegotiatedSerializer()}}); err != nil {
t.Errorf("unexpected error: %v", err)
}
}

View File

@ -639,6 +639,10 @@ func (r *Request) Watch() (watch.Interface, error) {
if r.err != nil {
return nil, r.err
}
if r.serializers.Framer == nil {
return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
}
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
if err != nil {

View File

@ -247,7 +247,7 @@ func defaultContentConfig() ContentConfig {
return ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
NegotiatedSerializer: testapi.Default.NegotiatedSerializer(),
}
}
@ -549,6 +549,7 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
serializers: defaultSerializers(),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return nil, io.EOF
}),
@ -558,6 +559,7 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
serializers: defaultSerializers(),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return nil, &url.Error{Err: io.EOF}
}),
@ -567,6 +569,7 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
serializers: defaultSerializers(),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("http: can't write HTTP request on broken connection")
}),
@ -576,6 +579,7 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
serializers: defaultSerializers(),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("foo: connection reset by peer")
}),

View File

@ -215,7 +215,10 @@ func setDiscoveryDefaults(config *restclient.Config) error {
config.APIPath = ""
config.GroupVersion = nil
codec := runtime.NoopEncoder{Decoder: api.Codecs.UniversalDecoder()}
config.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(codec, codec, runtime.DefaultFramer)
config.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(
runtime.SerializerInfo{Serializer: codec},
runtime.StreamSerializerInfo{},
)
if len(config.UserAgent) == 0 {
config.UserAgent = restclient.DefaultKubernetesUserAgent()
}

View File

@ -33,7 +33,6 @@ import (
"k8s.io/kubernetes/pkg/conversion/queryparams"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer"
serializerjson "k8s.io/kubernetes/pkg/runtime/serializer/json"
"k8s.io/kubernetes/pkg/watch"
)
@ -51,8 +50,10 @@ func NewClient(conf *restclient.Config) (*Client, error) {
conf = &confCopy
codec := dynamicCodec{}
legacyCodec := api.Codecs.LegacyCodec(v1.SchemeGroupVersion)
conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(codec, legacyCodec, serializerjson.Framer)
// TODO: it's questionable that this should be using anything other than unstructured schema and JSON
streamingInfo, _ := api.Codecs.StreamingSerializerForMediaType("application/json;stream=watch", nil)
conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec}, streamingInfo)
if conf.APIPath == "" {
conf.APIPath = "/api"

View File

@ -42,7 +42,7 @@ func TestSetKubernetesDefaults(t *testing.T) {
ContentConfig: restclient.ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
NegotiatedSerializer: testapi.Default.NegotiatedSerializer(),
},
QPS: 5,
Burst: 10,
@ -126,7 +126,7 @@ func TestHelperGetServerAPIVersions(t *testing.T) {
w.Write(output)
}))
defer server.Close()
got, err := restclient.ServerAPIVersions(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "invalid version", Version: "one"}, NegotiatedSerializer: testapi.NegotiatedSerializer}})
got, err := restclient.ServerAPIVersions(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "invalid version", Version: "one"}, NegotiatedSerializer: testapi.Default.NegotiatedSerializer()}})
if err != nil {
t.Fatalf("unexpected encoding error: %v", err)
}

View File

@ -215,7 +215,7 @@ func TestStream(t *testing.T) {
url, _ := url.ParseRequestURI(server.URL)
config := restclient.ContentConfig{
GroupVersion: &unversioned.GroupVersion{Group: "x"},
NegotiatedSerializer: testapi.NegotiatedSerializer,
NegotiatedSerializer: testapi.Default.NegotiatedSerializer(),
}
c, err := restclient.NewRESTClient(url, "", config, -1, -1, nil, nil)
if err != nil {

View File

@ -26,7 +26,7 @@ type ResourceEncodingConfig interface {
// StorageEncoding returns the serialization format for the resource.
// TODO this should actually return a GroupVersionKind since you can logically have multiple "matching" Kinds
// For now, it returns just the GroupVersion for consistency with old behavior
StoragageEncodingFor(unversioned.GroupResource) (unversioned.GroupVersion, error)
StorageEncodingFor(unversioned.GroupResource) (unversioned.GroupVersion, error)
// InMemoryEncodingFor returns the groupVersion for the in memory representation the storage should convert to.
InMemoryEncodingFor(unversioned.GroupResource) (unversioned.GroupVersion, error)
@ -78,7 +78,7 @@ func (o *DefaultResourceEncodingConfig) SetResourceEncoding(resourceBeingStored
o.Groups[group].InternalResourceEncodings[resourceBeingStored.Resource] = internalVersion
}
func (o *DefaultResourceEncodingConfig) StoragageEncodingFor(resource unversioned.GroupResource) (unversioned.GroupVersion, error) {
func (o *DefaultResourceEncodingConfig) StorageEncodingFor(resource unversioned.GroupResource) (unversioned.GroupVersion, error) {
groupMeta, err := registered.Group(resource.Group)
if err != nil {
return unversioned.GroupVersion{}, err

View File

@ -18,9 +18,11 @@ package genericapiserver
import (
"fmt"
"mime"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/recognizer"
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
@ -50,19 +52,25 @@ type DefaultStorageFactory struct {
Overrides map[unversioned.GroupResource]groupResourceOverrides
// DefaultMediaType is the media type used to store resources. If it is not set, "application/json" is used.
DefaultMediaType string
// DefaultSerializer is used to create encoders and decoders for the storage.Interface.
DefaultSerializer runtime.NegotiatedSerializer
DefaultSerializer runtime.StorageSerializer
// ResourceEncodingConfig describes how to encode a particular GroupVersionResource
ResourceEncodingConfig ResourceEncodingConfig
// APIResourceConfigSource indicates whether the *storage* is enabled, NOT the API
// This is discrete from resource enablement because those are separate concerns. How it is surfaced to the user via flags
// or config is up to whoever is building this.
// This is discrete from resource enablement because those are separate concerns. How this source is configured
// is left to the caller.
APIResourceConfigSource APIResourceConfigSource
// newEtcdFn exists to be overwritten for unit testing. You should never set this in a normal world.
newEtcdFn func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error)
// newStorageCodecFn exists to be overwritten for unit testing.
newStorageCodecFn func(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (codec runtime.Codec, err error)
// newStorageFn exists to be overwritten for unit testing.
newStorageFn func(config storagebackend.Config) (etcdStorage storage.Interface, err error)
}
type groupResourceOverrides struct {
@ -71,8 +79,10 @@ type groupResourceOverrides struct {
etcdLocation []string
// etcdPrefix contains the list of "special" prefixes for a GroupResource. Resource=* means for the entire group
etcdPrefix string
// mediaType is the desired serializer to choose. If empty, the default is chosen.
mediaType string
// serializer contains the list of "special" serializers for a GroupResource. Resource=* means for the entire group
serializer runtime.NegotiatedSerializer
serializer runtime.StorageSerializer
// cohabitatingResources keeps track of which resources must be stored together. This happens when we have multiple ways
// of exposing one set of concepts. autoscaling.HPA and extensions.HPA as a for instance
// The order of the slice matters! It is the priority order of lookup for finding a storage location
@ -83,15 +93,20 @@ var _ StorageFactory = &DefaultStorageFactory{}
const AllResources = "*"
func NewDefaultStorageFactory(config storagebackend.Config, defaultSerializer runtime.NegotiatedSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory {
func NewDefaultStorageFactory(config storagebackend.Config, defaultMediaType string, defaultSerializer runtime.StorageSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory {
if len(defaultMediaType) == 0 {
defaultMediaType = runtime.ContentTypeJSON
}
return &DefaultStorageFactory{
StorageConfig: config,
Overrides: map[unversioned.GroupResource]groupResourceOverrides{},
DefaultMediaType: defaultMediaType,
DefaultSerializer: defaultSerializer,
ResourceEncodingConfig: resourceEncodingConfig,
APIResourceConfigSource: resourceConfig,
newEtcdFn: newEtcd,
newStorageCodecFn: NewStorageCodec,
newStorageFn: newStorage,
}
}
@ -107,8 +122,9 @@ func (s *DefaultStorageFactory) SetEtcdPrefix(groupResource unversioned.GroupRes
s.Overrides[groupResource] = overrides
}
func (s *DefaultStorageFactory) SetSerializer(groupResource unversioned.GroupResource, serializer runtime.NegotiatedSerializer) {
func (s *DefaultStorageFactory) SetSerializer(groupResource unversioned.GroupResource, mediaType string, serializer runtime.StorageSerializer) {
overrides := s.Overrides[groupResource]
overrides.mediaType = mediaType
overrides.serializer = serializer
s.Overrides[groupResource] = overrides
}
@ -160,6 +176,14 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st
etcdPrefix = exactResourceOverride.etcdPrefix
}
etcdMediaType := s.DefaultMediaType
if len(groupOverride.mediaType) != 0 {
etcdMediaType = groupOverride.mediaType
}
if len(exactResourceOverride.mediaType) != 0 {
etcdMediaType = exactResourceOverride.mediaType
}
etcdSerializer := s.DefaultSerializer
if groupOverride.serializer != nil {
etcdSerializer = groupOverride.serializer
@ -174,7 +198,7 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st
config.ServerList = overriddenEtcdLocations
}
storageEncodingVersion, err := s.ResourceEncodingConfig.StoragageEncodingFor(chosenStorageResource)
storageEncodingVersion, err := s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
if err != nil {
return nil, err
}
@ -183,27 +207,19 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st
return nil, err
}
codec, err := s.newStorageCodecFn(etcdMediaType, etcdSerializer, storageEncodingVersion, internalVersion, config)
if err != nil {
return nil, err
}
config.Codec = codec
glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, config)
return s.newEtcdFn(etcdSerializer, storageEncodingVersion, internalVersion, config)
return s.newStorageFn(config)
}
func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) {
s, ok := ns.SerializerForMediaType("application/json", nil)
if !ok {
return nil, fmt.Errorf("unable to find serializer for JSON")
}
encoder := ns.EncoderForVersion(s, storageVersion)
decoder := ns.DecoderToVersion(s, memoryVersion)
if memoryVersion.Group != storageVersion.Group {
// Allow this codec to translate between groups.
if err = versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up encoder from %v to %v: %v", memoryVersion, storageVersion, err)
}
if err = versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up decoder from %v to %v: %v", storageVersion, memoryVersion, err)
}
}
config.Codec = runtime.NewCodec(encoder, decoder)
// newStorage is the default implementation for creating a storage backend.
func newStorage(config storagebackend.Config) (etcdStorage storage.Interface, err error) {
return storagebackend.Create(config)
}
@ -217,3 +233,39 @@ func (s *DefaultStorageFactory) Backends() []string {
}
return backends.List()
}
// NewStorageCodec assembles a storage codec for the provided storage media type, the provided serializer, and the requested
// storage and memory versions.
func NewStorageCodec(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (runtime.Codec, error) {
mediaType, options, err := mime.ParseMediaType(storageMediaType)
if err != nil {
return nil, fmt.Errorf("%q is not a valid mime-type", storageMediaType)
}
serializer, ok := ns.SerializerForMediaType(mediaType, options)
if !ok {
return nil, fmt.Errorf("unable to find serializer for %q", storageMediaType)
}
s := serializer.Serializer
// etcd2 only supports string data - we must wrap any result before returning
// TODO: storagebackend should return a boolean indicating whether it supports binary data
if !serializer.EncodesAsText && (config.Type == storagebackend.StorageTypeUnset || config.Type == storagebackend.StorageTypeETCD2) {
glog.V(4).Infof("Wrapping the underlying binary storage serializer with a base64 encoding for etcd2")
s = runtime.NewBase64Serializer(s)
}
ds := recognizer.NewDecoder(s, ns.UniversalDeserializer())
encoder := ns.EncoderForVersion(s, storageVersion)
decoder := ns.DecoderToVersion(ds, memoryVersion)
if memoryVersion.Group != storageVersion.Group {
// Allow this codec to translate between groups.
if err := versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up encoder from %v to %v: %v", memoryVersion, storageVersion, err)
}
if err := versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up decoder from %v to %v: %v", storageVersion, memoryVersion, err)
}
}
return runtime.NewCodec(encoder, decoder), nil
}

View File

@ -23,7 +23,6 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)
@ -50,7 +49,7 @@ func TestUpdateEtcdOverrides(t *testing.T) {
defaultEtcdLocation := []string{"http://127.0.0.1"}
for i, test := range testCases {
actualConfig := storagebackend.Config{}
newEtcdFn := func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) {
newStorageFn := func(config storagebackend.Config) (_ storage.Interface, err error) {
actualConfig = config
return nil, nil
}
@ -59,8 +58,8 @@ func TestUpdateEtcdOverrides(t *testing.T) {
Prefix: DefaultEtcdPathPrefix,
ServerList: defaultEtcdLocation,
}
storageFactory := NewDefaultStorageFactory(defaultConfig, api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig())
storageFactory.newEtcdFn = newEtcdFn
storageFactory := NewDefaultStorageFactory(defaultConfig, "", api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig())
storageFactory.newStorageFn = newStorageFn
storageFactory.SetEtcdLocation(test.resource, test.servers)
var err error

View File

@ -55,7 +55,7 @@ func (m *Mapper) InfoForData(data []byte, source string) (*Info, error) {
var obj runtime.Object
var versioned runtime.Object
if registered.IsThirdPartyAPIGroupVersion(gvk.GroupVersion()) {
obj, err = runtime.Decode(thirdpartyresourcedata.NewCodec(nil, gvk.Kind), data)
obj, err = runtime.Decode(thirdpartyresourcedata.NewDecoder(nil, gvk.Kind), data)
versioned = obj
} else {
obj, versioned = versions.Last(), versions.First()

View File

@ -677,7 +677,14 @@ func (m *Master) InstallThirdPartyResource(rsrc *extensions.ThirdPartyResource)
func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupVersion {
resourceStorage := thirdpartyresourcedataetcd.NewREST(
generic.RESTOptions{Storage: m.thirdPartyStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: m.deleteCollectionWorkers}, group, kind)
generic.RESTOptions{
Storage: m.thirdPartyStorage,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: m.deleteCollectionWorkers,
},
group,
kind,
)
apiRoot := makeThirdPartyPath("")

View File

@ -88,7 +88,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
resourceEncoding.SetVersionEncoding(batch.GroupName, *testapi.Batch.GroupVersion(), unversioned.GroupVersion{Group: batch.GroupName, Version: runtime.APIVersionInternal})
resourceEncoding.SetVersionEncoding(apps.GroupName, *testapi.Apps.GroupVersion(), unversioned.GroupVersion{Group: apps.GroupName, Version: runtime.APIVersionInternal})
resourceEncoding.SetVersionEncoding(extensions.GroupName, *testapi.Extensions.GroupVersion(), unversioned.GroupVersion{Group: extensions.GroupName, Version: runtime.APIVersionInternal})
storageFactory := genericapiserver.NewDefaultStorageFactory(storageConfig, api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())
storageFactory := genericapiserver.NewDefaultStorageFactory(storageConfig, testapi.StorageMediaType(), api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())
config.StorageFactory = storageFactory
config.APIResourceConfigSource = DefaultAPIResourceConfigSource()

View File

@ -90,7 +90,7 @@ func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool {
func NewTestGenericStoreRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, *Store) {
podPrefix := "/pods"
server := etcdtesting.NewEtcdTestClientServer(t)
s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.StorageCodec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true}
return server, &Store{

View File

@ -38,7 +38,7 @@ import (
func NewEtcdStorage(t *testing.T, group string) (storage.Interface, *etcdtesting.EtcdTestServer) {
server := etcdtesting.NewEtcdTestClientServer(t)
storage := etcdstorage.NewEtcdStorage(server.Client, testapi.Groups[group].Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
storage := etcdstorage.NewEtcdStorage(server.Client, testapi.Groups[group].StorageCodec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
return storage, server
}

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
)
type thirdPartyObjectConverter struct {
@ -183,31 +184,39 @@ func NewNegotiatedSerializer(s runtime.NegotiatedSerializer, kind string, encode
}
}
func (t *thirdPartyResourceDataCodecFactory) EncoderForVersion(s runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
return NewCodec(runtime.NewCodec(
t.NegotiatedSerializer.EncoderForVersion(s, gv),
t.NegotiatedSerializer.DecoderToVersion(s, t.decodeGV),
), t.kind)
func (t *thirdPartyResourceDataCodecFactory) SupportedMediaTypes() []string {
supported := sets.NewString(t.NegotiatedSerializer.SupportedMediaTypes()...)
return supported.Intersection(sets.NewString("application/json", "application/yaml")).List()
}
func (t *thirdPartyResourceDataCodecFactory) DecoderToVersion(s runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder {
return NewCodec(runtime.NewCodec(
t.NegotiatedSerializer.EncoderForVersion(s, t.encodeGV),
t.NegotiatedSerializer.DecoderToVersion(s, gv),
), t.kind)
func (t *thirdPartyResourceDataCodecFactory) SupportedStreamingMediaTypes() []string {
supported := sets.NewString(t.NegotiatedSerializer.SupportedStreamingMediaTypes()...)
return supported.Intersection(sets.NewString("application/json", "application/json;stream=watch")).List()
}
type thirdPartyResourceDataCodec struct {
delegate runtime.Codec
func (t *thirdPartyResourceDataCodecFactory) EncoderForVersion(s runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder {
return &thirdPartyResourceDataEncoder{delegate: t.NegotiatedSerializer.EncoderForVersion(s, gv), kind: t.kind}
}
func (t *thirdPartyResourceDataCodecFactory) DecoderToVersion(s runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder {
return NewDecoder(t.NegotiatedSerializer.DecoderToVersion(s, gv), t.kind)
}
func NewCodec(delegate runtime.Codec, kind string) runtime.Codec {
return runtime.NewCodec(NewEncoder(delegate, kind), NewDecoder(delegate, kind))
}
type thirdPartyResourceDataDecoder struct {
delegate runtime.Decoder
kind string
}
var _ runtime.Codec = &thirdPartyResourceDataCodec{}
func NewCodec(codec runtime.Codec, kind string) runtime.Codec {
return &thirdPartyResourceDataCodec{codec, kind}
func NewDecoder(delegate runtime.Decoder, kind string) runtime.Decoder {
return &thirdPartyResourceDataDecoder{delegate: delegate, kind: kind}
}
var _ runtime.Decoder = &thirdPartyResourceDataDecoder{}
func parseObject(data []byte) (map[string]interface{}, error) {
var obj interface{}
if err := json.Unmarshal(data, &obj); err != nil {
@ -221,7 +230,7 @@ func parseObject(data []byte) (map[string]interface{}, error) {
return mapObj, nil
}
func (t *thirdPartyResourceDataCodec) populate(data []byte) (runtime.Object, error) {
func (t *thirdPartyResourceDataDecoder) populate(data []byte) (runtime.Object, error) {
mapObj, err := parseObject(data)
if err != nil {
return nil, err
@ -229,7 +238,7 @@ func (t *thirdPartyResourceDataCodec) populate(data []byte) (runtime.Object, err
return t.populateFromObject(mapObj, data)
}
func (t *thirdPartyResourceDataCodec) populateFromObject(mapObj map[string]interface{}, data []byte) (runtime.Object, error) {
func (t *thirdPartyResourceDataDecoder) populateFromObject(mapObj map[string]interface{}, data []byte) (runtime.Object, error) {
typeMeta := unversioned.TypeMeta{}
if err := json.Unmarshal(data, &typeMeta); err != nil {
return nil, err
@ -252,7 +261,7 @@ func (t *thirdPartyResourceDataCodec) populateFromObject(mapObj map[string]inter
}
}
func (t *thirdPartyResourceDataCodec) populateResource(objIn *extensions.ThirdPartyResourceData, mapObj map[string]interface{}, data []byte) error {
func (t *thirdPartyResourceDataDecoder) populateResource(objIn *extensions.ThirdPartyResourceData, mapObj map[string]interface{}, data []byte) error {
metadata, ok := mapObj["metadata"].(map[string]interface{})
if !ok {
return fmt.Errorf("unexpected object for metadata: %#v", mapObj["metadata"])
@ -274,7 +283,7 @@ func (t *thirdPartyResourceDataCodec) populateResource(objIn *extensions.ThirdPa
return nil
}
func (t *thirdPartyResourceDataCodec) Decode(data []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
func (t *thirdPartyResourceDataDecoder) Decode(data []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
if into == nil {
obj, err := t.populate(data)
if err != nil {
@ -347,7 +356,7 @@ func (t *thirdPartyResourceDataCodec) Decode(data []byte, gvk *unversioned.Group
return thirdParty, actual, nil
}
func (t *thirdPartyResourceDataCodec) populateListResource(objIn *extensions.ThirdPartyResourceDataList, mapObj map[string]interface{}) error {
func (t *thirdPartyResourceDataDecoder) populateListResource(objIn *extensions.ThirdPartyResourceDataList, mapObj map[string]interface{}) error {
items, ok := mapObj["items"].([]interface{})
if !ok {
return fmt.Errorf("unexpected object for items: %#v", mapObj["items"])
@ -374,6 +383,17 @@ const template = `{
"items": [ %s ]
}`
type thirdPartyResourceDataEncoder struct {
delegate runtime.Encoder
kind string
}
func NewEncoder(delegate runtime.Encoder, kind string) runtime.Encoder {
return &thirdPartyResourceDataEncoder{delegate: delegate, kind: kind}
}
var _ runtime.Encoder = &thirdPartyResourceDataEncoder{}
func encodeToJSON(obj *extensions.ThirdPartyResourceData, stream io.Writer) error {
var objOut interface{}
if err := json.Unmarshal(obj.Data, &objOut); err != nil {
@ -388,7 +408,7 @@ func encodeToJSON(obj *extensions.ThirdPartyResourceData, stream io.Writer) erro
return encoder.Encode(objMap)
}
func (t *thirdPartyResourceDataCodec) EncodeToStream(obj runtime.Object, stream io.Writer, overrides ...unversioned.GroupVersion) (err error) {
func (t *thirdPartyResourceDataEncoder) EncodeToStream(obj runtime.Object, stream io.Writer, overrides ...unversioned.GroupVersion) (err error) {
switch obj := obj.(type) {
case *extensions.ThirdPartyResourceData:
return encodeToJSON(obj, stream)

View File

@ -87,13 +87,14 @@ func TestCodec(t *testing.T) {
},
}
for _, test := range tests {
codec := &thirdPartyResourceDataCodec{kind: "Foo", delegate: testapi.Extensions.Codec()}
d := &thirdPartyResourceDataDecoder{kind: "Foo", delegate: testapi.Extensions.Codec()}
e := &thirdPartyResourceDataEncoder{kind: "Foo", delegate: testapi.Extensions.Codec()}
data, err := json.Marshal(test.obj)
if err != nil {
t.Errorf("[%s] unexpected error: %v", test.name, err)
continue
}
obj, err := runtime.Decode(codec, data)
obj, err := runtime.Decode(d, data)
if err != nil && !test.expectErr {
t.Errorf("[%s] unexpected error: %v", test.name, err)
continue
@ -121,7 +122,7 @@ func TestCodec(t *testing.T) {
t.Errorf("[%s]\nexpected\n%v\nsaw\n%v\n", test.name, test.obj, &output)
}
data, err = runtime.Encode(codec, rsrcObj)
data, err = runtime.Encode(e, rsrcObj)
if err != nil {
t.Errorf("[%s] unexpected error: %v", test.name, err)
}

View File

@ -18,6 +18,7 @@ package runtime
import (
"bytes"
"encoding/base64"
"fmt"
"io"
"net/url"
@ -169,3 +170,27 @@ func (c *parameterCodec) EncodeParameters(obj Object, to unversioned.GroupVersio
}
return queryparams.Convert(obj)
}
type base64Serializer struct {
Serializer
}
func NewBase64Serializer(s Serializer) Serializer {
return &base64Serializer{s}
}
func (s base64Serializer) EncodeToStream(obj Object, stream io.Writer, overrides ...unversioned.GroupVersion) error {
e := base64.NewEncoder(base64.StdEncoding, stream)
err := s.Serializer.EncodeToStream(obj, e, overrides...)
e.Close()
return err
}
func (s base64Serializer) Decode(data []byte, defaults *unversioned.GroupVersionKind, into Object) (Object, *unversioned.GroupVersionKind, error) {
out := make([]byte, base64.StdEncoding.DecodedLen(len(data)))
n, err := base64.StdEncoding.Decode(out, data)
if err != nil {
return nil, nil, err
}
return s.Serializer.Decode(out[:n], defaults, into)
}

View File

@ -118,3 +118,33 @@ func DeepCopy_runtime_Scheme(in Scheme, out *Scheme, c *conversion.Cloner) error
}
return nil
}
func DeepCopy_runtime_SerializerInfo(in SerializerInfo, out *SerializerInfo, c *conversion.Cloner) error {
if in.Serializer == nil {
out.Serializer = nil
} else if newVal, err := c.DeepCopy(in.Serializer); err != nil {
return err
} else {
out.Serializer = newVal.(Serializer)
}
out.EncodesAsText = in.EncodesAsText
out.MediaType = in.MediaType
return nil
}
func DeepCopy_runtime_StreamSerializerInfo(in StreamSerializerInfo, out *StreamSerializerInfo, c *conversion.Cloner) error {
if err := DeepCopy_runtime_SerializerInfo(in.SerializerInfo, &out.SerializerInfo, c); err != nil {
return err
}
if in.Framer == nil {
out.Framer = nil
} else if newVal, err := c.DeepCopy(in.Framer); err != nil {
return err
} else {
out.Framer = newVal.(Framer)
}
if err := DeepCopy_runtime_SerializerInfo(in.Embedded, &out.Embedded, c); err != nil {
return err
}
return nil
}

View File

@ -86,32 +86,75 @@ type Framer interface {
NewFrameWriter(w io.Writer) io.Writer
}
// SerializerInfo contains information about a specific serialization format
type SerializerInfo struct {
Serializer
// EncodesAsText indicates this serializer can be encoded to UTF-8 safely.
EncodesAsText bool
// MediaType is the value that represents this serializer over the wire.
MediaType string
}
// StreamSerializerInfo contains information about a specific stream serialization format
type StreamSerializerInfo struct {
SerializerInfo
// Framer is the factory for retrieving streams that separate objects on the wire
Framer
// Embedded is the type of the nested serialization that should be used.
Embedded SerializerInfo
}
// NegotiatedSerializer is an interface used for obtaining encoders, decoders, and serializers
// for multiple supported media types.
// for multiple supported media types. This would commonly be accepted by a server component
// that performs HTTP content negotiation to accept multiple formats.
type NegotiatedSerializer interface {
// SupportedMediaTypes is the media types supported for reading and writing single objects.
SupportedMediaTypes() []string
// SerializerForMediaType returns a serializer for the provided media type. Options is a set of
// parameters applied to the media type that may modify the resulting output.
SerializerForMediaType(mediaType string, options map[string]string) (Serializer, bool)
// SerializerForMediaType returns a serializer for the provided media type. params is the set of
// parameters applied to the media type that may modify the resulting output. ok will be false
// if no serializer matched the media type.
SerializerForMediaType(mediaType string, params map[string]string) (s SerializerInfo, ok bool)
// SupportedStreamingMediaTypes returns the media types of the supported streaming serializers.
// Streaming serializers control how multiple objects are written to a stream output.
SupportedStreamingMediaTypes() []string
// StreamingSerializerForMediaType returns a serializer for the provided media type that supports
// reading and writing multiple objects to a stream. It returns a framer and serializer, or an
// error if no such serializer can be created. Options is a set of parameters applied to the
// media type that may modify the resulting output.
StreamingSerializerForMediaType(mediaType string, options map[string]string) (Serializer, Framer, string, bool)
// error if no such serializer can be created. Params is the set of parameters applied to the
// media type that may modify the resulting output. ok will be false if no serializer matched
// the media type.
StreamingSerializerForMediaType(mediaType string, params map[string]string) (s StreamSerializerInfo, ok bool)
// EncoderForVersion returns an encoder that ensures objects being written to the provided
// serializer are in the provided group version.
// TODO: take multiple group versions
EncoderForVersion(serializer Serializer, gv unversioned.GroupVersion) Encoder
EncoderForVersion(serializer Encoder, gv unversioned.GroupVersion) Encoder
// DecoderForVersion returns a decoder that ensures objects being read by the provided
// serializer are in the provided group version by default.
// TODO: take multiple group versions
DecoderToVersion(serializer Serializer, gv unversioned.GroupVersion) Decoder
DecoderToVersion(serializer Decoder, gv unversioned.GroupVersion) Decoder
}
// StorageSerializer is an interface used for obtaining encoders, decoders, and serializers
// that can read and write data at rest. This would commonly be used by client tools that must
// read files, or server side storage interfaces that persist restful objects.
type StorageSerializer interface {
// SerializerForMediaType returns a serializer for the provided media type. Options is a set of
// parameters applied to the media type that may modify the resulting output.
SerializerForMediaType(mediaType string, options map[string]string) (SerializerInfo, bool)
// UniversalDeserializer returns a Serializer that can read objects in multiple supported formats
// by introspecting the data at rest.
UniversalDeserializer() Decoder
// EncoderForVersion returns an encoder that ensures objects being written to the provided
// serializer are in the provided group version.
// TODO: take multiple group versions
EncoderForVersion(serializer Encoder, gv unversioned.GroupVersion) Encoder
// DecoderForVersion returns a decoder that ensures objects being read by the provided
// serializer are in the provided group version by default.
// TODO: take multiple group versions
DecoderToVersion(serializer Decoder, gv unversioned.GroupVersion) Decoder
}
///////////////////////////////////////////////////////////////////////////////

View File

@ -31,6 +31,8 @@ type serializerType struct {
AcceptContentTypes []string
ContentType string
FileExtensions []string
// EncodesAsText should be true if this content type can be represented safely in UTF-8
EncodesAsText bool
Serializer runtime.Serializer
PrettySerializer runtime.Serializer
@ -65,6 +67,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []seri
AcceptContentTypes: []string{"application/json"},
ContentType: "application/json",
FileExtensions: []string{"json"},
EncodesAsText: true,
Serializer: jsonSerializer,
PrettySerializer: jsonPrettySerializer,
@ -77,6 +80,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []seri
AcceptContentTypes: []string{"application/yaml"},
ContentType: "application/yaml",
FileExtensions: []string{"yaml"},
EncodesAsText: true,
Serializer: yamlSerializer,
// TODO: requires runtime.RawExtension to properly distinguish when the nested content is
@ -206,62 +210,91 @@ func (f CodecFactory) UniversalDeserializer() runtime.Decoder {
//
// TODO: the decoder will eventually be removed in favor of dealing with objects in their versioned form
func (f CodecFactory) UniversalDecoder(versions ...unversioned.GroupVersion) runtime.Decoder {
return f.CodecForVersions(runtime.NoopEncoder{Decoder: f.universal}, nil, versions)
return f.CodecForVersions(nil, f.universal, nil, versions)
}
// CodecFor creates a codec with the provided serializer. If an object is decoded and its group is not in the list,
// it will default to runtime.APIVersionInternal. If encode is not specified for an object's group, the object is not
// converted. If encode or decode are nil, no conversion is performed.
func (f CodecFactory) CodecForVersions(serializer runtime.Serializer, encode []unversioned.GroupVersion, decode []unversioned.GroupVersion) runtime.Codec {
return versioning.NewCodecForScheme(f.scheme, serializer, serializer, encode, decode)
func (f CodecFactory) CodecForVersions(encoder runtime.Encoder, decoder runtime.Decoder, encode []unversioned.GroupVersion, decode []unversioned.GroupVersion) runtime.Codec {
return versioning.NewCodecForScheme(f.scheme, encoder, decoder, encode, decode)
}
// DecoderToVersion returns a decoder that targets the provided group version.
func (f CodecFactory) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder {
return f.CodecForVersions(serializer, nil, []unversioned.GroupVersion{gv})
func (f CodecFactory) DecoderToVersion(decoder runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder {
return f.CodecForVersions(nil, decoder, nil, []unversioned.GroupVersion{gv})
}
// EncoderForVersion returns an encoder that targets the provided group version.
func (f CodecFactory) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
return f.CodecForVersions(serializer, []unversioned.GroupVersion{gv}, nil)
func (f CodecFactory) EncoderForVersion(encoder runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder {
return f.CodecForVersions(encoder, nil, []unversioned.GroupVersion{gv}, nil)
}
// SerializerForMediaType returns a serializer that matches the provided RFC2046 mediaType, or false if no such
// serializer exists
func (f CodecFactory) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) {
func (f CodecFactory) SerializerForMediaType(mediaType string, params map[string]string) (runtime.SerializerInfo, bool) {
for _, s := range f.serializers {
for _, accepted := range s.AcceptContentTypes {
if accepted == mediaType {
if s.Specialize != nil && len(options) > 0 {
serializer, ok := s.Specialize(options)
return serializer, ok
// specialization abstracts variants to the content type
if s.Specialize != nil && len(params) > 0 {
serializer, ok := s.Specialize(params)
// TODO: return formatted mediaType+params
return runtime.SerializerInfo{Serializer: serializer, MediaType: s.ContentType, EncodesAsText: s.EncodesAsText}, ok
}
if v, ok := options["pretty"]; ok && v == "1" && s.PrettySerializer != nil {
return s.PrettySerializer, true
// legacy support for ?pretty=1 continues, but this is more formally defined
if v, ok := params["pretty"]; ok && v == "1" && s.PrettySerializer != nil {
return runtime.SerializerInfo{Serializer: s.PrettySerializer, MediaType: s.ContentType, EncodesAsText: s.EncodesAsText}, true
}
return s.Serializer, true
// return the base variant
return runtime.SerializerInfo{Serializer: s.Serializer, MediaType: s.ContentType, EncodesAsText: s.EncodesAsText}, true
}
}
}
return nil, false
return runtime.SerializerInfo{}, false
}
// StreamingSerializerForMediaType returns a serializer that matches the provided RFC2046 mediaType, or false if no such
// serializer exists
func (f CodecFactory) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) {
func (f CodecFactory) StreamingSerializerForMediaType(mediaType string, params map[string]string) (runtime.StreamSerializerInfo, bool) {
for _, s := range f.serializers {
for _, accepted := range s.AcceptStreamContentTypes {
if accepted == mediaType {
if s.StreamSpecialize != nil && len(options) > 0 {
serializer, ok := s.StreamSpecialize(options)
// TODO: have StreamSpecialize return exact content type
return serializer, s.Framer, s.StreamContentType, ok
// TODO: accept params
nested, ok := f.SerializerForMediaType(s.ContentType, nil)
if !ok {
panic("no serializer defined for internal content type")
}
return s.StreamSerializer, s.Framer, s.StreamContentType, true
if s.StreamSpecialize != nil && len(params) > 0 {
serializer, ok := s.StreamSpecialize(params)
// TODO: return formatted mediaType+params
return runtime.StreamSerializerInfo{
SerializerInfo: runtime.SerializerInfo{
Serializer: serializer,
MediaType: s.StreamContentType,
EncodesAsText: s.EncodesAsText,
},
Framer: s.Framer,
Embedded: nested,
}, ok
}
return runtime.StreamSerializerInfo{
SerializerInfo: runtime.SerializerInfo{
Serializer: s.StreamSerializer,
MediaType: s.StreamContentType,
EncodesAsText: s.EncodesAsText,
},
Framer: s.Framer,
Embedded: nested,
}, true
}
}
}
return nil, nil, "", false
return runtime.StreamSerializerInfo{}, false
}
// SerializerForFileExtension returns a serializer for the provided extension, or false if no serializer matches.

View File

@ -267,7 +267,7 @@ func TestVersionedEncoding(t *testing.T) {
encoder, _ := cf.SerializerForFileExtension("json")
// codec that is unversioned uses the target version
unversionedCodec := cf.CodecForVersions(encoder, nil, nil)
unversionedCodec := cf.CodecForVersions(encoder, nil, nil, nil)
_, err = runtime.Encode(unversionedCodec, &TestType1{}, unversioned.GroupVersion{Version: "v3"})
if err == nil || !runtime.IsNotRegisteredError(err) {
t.Fatal(err)

View File

@ -186,20 +186,13 @@ func (s *Serializer) EncodeToStream(obj runtime.Object, w io.Writer, overrides .
}
// RecognizesData implements the RecognizingDecoder interface.
func (s *Serializer) RecognizesData(peek io.Reader) (bool, error) {
_, ok := utilyaml.GuessJSONStream(peek, 2048)
func (s *Serializer) RecognizesData(peek io.Reader) (ok, unknown bool, err error) {
if s.yaml {
return !ok, nil
// we could potentially look for '---'
return false, true, nil
}
return ok, nil
}
// EncodesAsText returns true because both JSON and YAML are considered textual representations
// of data. This is used to determine whether the serialized object should be transmitted over
// a WebSocket Text or Binary frame. This must remain true for legacy compatibility with v1.1
// watch over websocket implementations.
func (s *Serializer) EncodesAsText() bool {
return true
_, ok = utilyaml.GuessJSONStream(peek, 2048)
return ok, false, nil
}
// Framer is the default JSON framing behavior, with newlines delimiting individual objects.

View File

@ -24,35 +24,34 @@ import (
// TODO: We should figure out what happens when someone asks
// encoder for version and it conflicts with the raw serializer.
type negotiatedSerializerWrapper struct {
serializer runtime.Serializer
streamingSerializer runtime.Serializer
framer runtime.Framer
info runtime.SerializerInfo
streamInfo runtime.StreamSerializerInfo
}
func NegotiatedSerializerWrapper(serializer, streamingSerializer runtime.Serializer, framer runtime.Framer) runtime.NegotiatedSerializer {
return &negotiatedSerializerWrapper{serializer, streamingSerializer, framer}
func NegotiatedSerializerWrapper(info runtime.SerializerInfo, streamInfo runtime.StreamSerializerInfo) runtime.NegotiatedSerializer {
return &negotiatedSerializerWrapper{info, streamInfo}
}
func (n *negotiatedSerializerWrapper) SupportedMediaTypes() []string {
return []string{}
}
func (n *negotiatedSerializerWrapper) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) {
return n.serializer, true
func (n *negotiatedSerializerWrapper) SerializerForMediaType(mediaType string, options map[string]string) (runtime.SerializerInfo, bool) {
return n.info, true
}
func (n *negotiatedSerializerWrapper) SupportedStreamingMediaTypes() []string {
return []string{}
}
func (n *negotiatedSerializerWrapper) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) {
return n.streamingSerializer, n.framer, "", true
func (n *negotiatedSerializerWrapper) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.StreamSerializerInfo, bool) {
return n.streamInfo, true
}
func (n *negotiatedSerializerWrapper) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
return n.serializer
func (n *negotiatedSerializerWrapper) EncoderForVersion(e runtime.Encoder, _ unversioned.GroupVersion) runtime.Encoder {
return e
}
func (n *negotiatedSerializerWrapper) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder {
return n.serializer
func (n *negotiatedSerializerWrapper) DecoderToVersion(d runtime.Decoder, _gv unversioned.GroupVersion) runtime.Decoder {
return d
}

View File

@ -419,12 +419,6 @@ func (s *RawSerializer) EncodeToStream(obj runtime.Object, w io.Writer, override
}
}
// RecognizesData implements the RecognizingDecoder interface - objects encoded with this serializer
// have no innate identifying information and so cannot be recognized.
func (s *RawSerializer) RecognizesData(peek io.Reader) (bool, error) {
return false, nil
}
var LengthDelimitedFramer = lengthDelimitedFramer{}
type lengthDelimitedFramer struct{}

View File

@ -297,6 +297,18 @@ func TestDecodeObjects(t *testing.T) {
t.Fatal(err)
}
unk2 := &runtime.Unknown{
TypeMeta: runtime.TypeMeta{Kind: "Pod", APIVersion: "v1"},
}
wire2 := make([]byte, len(wire1)*2)
n, err := unk2.NestedMarshalTo(wire2, obj1, uint64(obj1.Size()))
if err != nil {
t.Fatal(err)
}
if n != len(wire1) || !bytes.Equal(wire1, wire2[:n]) {
t.Fatalf("unexpected wire:\n%s", hex.Dump(wire2[:n]))
}
wire1 = append([]byte{0x6b, 0x38, 0x73, 0x00}, wire1...)
testCases := []struct {

View File

@ -17,6 +17,7 @@ limitations under the License.
package recognizer
import (
"bufio"
"bytes"
"fmt"
"io"
@ -27,51 +28,98 @@ import (
type RecognizingDecoder interface {
runtime.Decoder
RecognizesData(peek io.Reader) (bool, error)
// RecognizesData should return true if the input provided in the provided reader
// belongs to this decoder, or an error if the data could not be read or is ambiguous.
// Unknown is true if the data could not be determined to match the decoder type.
// Decoders should assume that they can read as much of peek as they need (as the caller
// provides) and may return unknown if the data provided is not sufficient to make a
// a determination. When peek returns EOF that may mean the end of the input or the
// end of buffered input - recognizers should return the best guess at that time.
RecognizesData(peek io.Reader) (ok, unknown bool, err error)
}
// NewDecoder creates a decoder that will attempt multiple decoders in an order defined
// by:
//
// 1. The decoder implements RecognizingDecoder and identifies the data
// 2. All other decoders, and any decoder that returned true for unknown.
//
// The order passed to the constructor is preserved within those priorities.
func NewDecoder(decoders ...runtime.Decoder) runtime.Decoder {
recognizing, blind := []RecognizingDecoder{}, []runtime.Decoder{}
for _, d := range decoders {
if r, ok := d.(RecognizingDecoder); ok {
recognizing = append(recognizing, r)
} else {
blind = append(blind, d)
}
}
return &decoder{
recognizing: recognizing,
blind: blind,
decoders: decoders,
}
}
type decoder struct {
recognizing []RecognizingDecoder
blind []runtime.Decoder
decoders []runtime.Decoder
}
var _ RecognizingDecoder = &decoder{}
func (d *decoder) RecognizesData(peek io.Reader) (bool, bool, error) {
var (
lastErr error
anyUnknown bool
)
data, _ := bufio.NewReaderSize(peek, 1024).Peek(1024)
for _, r := range d.decoders {
switch t := r.(type) {
case RecognizingDecoder:
ok, unknown, err := t.RecognizesData(bytes.NewBuffer(data))
if err != nil {
lastErr = err
continue
}
anyUnknown = anyUnknown || unknown
if !ok {
continue
}
return true, false, nil
}
}
return false, anyUnknown, lastErr
}
func (d *decoder) Decode(data []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
var lastErr error
for _, r := range d.recognizing {
buf := bytes.NewBuffer(data)
ok, err := r.RecognizesData(buf)
if err != nil {
lastErr = err
continue
var (
lastErr error
skipped []runtime.Decoder
)
// try recognizers, record any decoders we need to give a chance later
for _, r := range d.decoders {
switch t := r.(type) {
case RecognizingDecoder:
buf := bytes.NewBuffer(data)
ok, unknown, err := t.RecognizesData(buf)
if err != nil {
lastErr = err
continue
}
if unknown {
skipped = append(skipped, t)
continue
}
if !ok {
continue
}
return r.Decode(data, gvk, into)
default:
skipped = append(skipped, t)
}
if !ok {
continue
}
return r.Decode(data, gvk, into)
}
for _, d := range d.blind {
out, actual, err := d.Decode(data, gvk, into)
// try recognizers that returned unknown or didn't recognize their data
for _, r := range skipped {
out, actual, err := r.Decode(data, gvk, into)
if err != nil {
lastErr = err
continue
}
return out, actual, nil
}
if lastErr == nil {
lastErr = fmt.Errorf("no serialization format matched the provided data")
}

View File

@ -27,6 +27,7 @@ import (
// EnableCrossGroupDecoding modifies the given decoder in place, if it is a codec
// from this package. It allows objects from one group to be auto-decoded into
// another group. 'destGroup' must already exist in the codec.
// TODO: this is an encapsulation violation and should be refactored
func EnableCrossGroupDecoding(d runtime.Decoder, sourceGroup, destGroup string) error {
internal, ok := d.(*codec)
if !ok {
@ -45,6 +46,7 @@ func EnableCrossGroupDecoding(d runtime.Decoder, sourceGroup, destGroup string)
// EnableCrossGroupEncoding modifies the given encoder in place, if it is a codec
// from this package. It allows objects from one group to be auto-decoded into
// another group. 'destGroup' must already exist in the codec.
// TODO: this is an encapsulation violation and should be refactored
func EnableCrossGroupEncoding(e runtime.Encoder, sourceGroup, destGroup string) error {
internal, ok := e.(*codec)
if !ok {

View File

@ -86,9 +86,13 @@ func New(kubeConfigFile string) (*WebhookAuthorizer, error) {
if err != nil {
return nil, err
}
serializer := json.NewSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), false)
codec := versioning.NewCodecForScheme(api.Scheme, serializer, serializer, encodeVersions, decodeVersions)
clientConfig.ContentConfig.NegotiatedSerializer = runtimeserializer.NegotiatedSerializerWrapper(codec, codec, json.Framer)
clientConfig.ContentConfig.NegotiatedSerializer = runtimeserializer.NegotiatedSerializerWrapper(
runtime.SerializerInfo{Serializer: codec},
runtime.StreamSerializerInfo{},
)
restClient, err := restclient.UnversionedRESTClientFor(clientConfig)
if err != nil {

View File

@ -155,24 +155,29 @@ func NewMasterConfig() *master.Config {
Prefix: etcdtest.PathPrefix(),
}
negotiatedSerializer := NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json")
negotiatedSerializer := NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), runtime.ContentTypeJSON)
storageFactory := genericapiserver.NewDefaultStorageFactory(config, negotiatedSerializer, genericapiserver.NewDefaultResourceEncodingConfig(), master.DefaultAPIResourceConfigSource())
storageFactory := genericapiserver.NewDefaultStorageFactory(config, runtime.ContentTypeJSON, negotiatedSerializer, genericapiserver.NewDefaultResourceEncodingConfig(), master.DefaultAPIResourceConfigSource())
storageFactory.SetSerializer(
unversioned.GroupResource{Group: api.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json"))
"",
NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), runtime.ContentTypeJSON))
storageFactory.SetSerializer(
unversioned.GroupResource{Group: autoscaling.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Autoscaling.Codec(), "application/json"))
"",
NewSingleContentTypeSerializer(api.Scheme, testapi.Autoscaling.Codec(), runtime.ContentTypeJSON))
storageFactory.SetSerializer(
unversioned.GroupResource{Group: batch.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Batch.Codec(), "application/json"))
"",
NewSingleContentTypeSerializer(api.Scheme, testapi.Batch.Codec(), runtime.ContentTypeJSON))
storageFactory.SetSerializer(
unversioned.GroupResource{Group: apps.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Apps.Codec(), "application/json"))
"",
NewSingleContentTypeSerializer(api.Scheme, testapi.Apps.Codec(), runtime.ContentTypeJSON))
storageFactory.SetSerializer(
unversioned.GroupResource{Group: extensions.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Extensions.Codec(), "application/json"))
"",
NewSingleContentTypeSerializer(api.Scheme, testapi.Extensions.Codec(), runtime.ContentTypeJSON))
return &master.Config{
Config: &genericapiserver.Config{

View File

@ -23,7 +23,7 @@ import (
)
// NewSingleContentTypeSerializer wraps a serializer in a NegotiatedSerializer that handles one content type
func NewSingleContentTypeSerializer(scheme *runtime.Scheme, serializer runtime.Serializer, contentType string) runtime.NegotiatedSerializer {
func NewSingleContentTypeSerializer(scheme *runtime.Scheme, serializer runtime.Serializer, contentType string) runtime.StorageSerializer {
return &wrappedSerializer{
scheme: scheme,
serializer: serializer,
@ -37,29 +37,31 @@ type wrappedSerializer struct {
contentType string
}
var _ runtime.NegotiatedSerializer = &wrappedSerializer{}
var _ runtime.StorageSerializer = &wrappedSerializer{}
func (s *wrappedSerializer) SupportedMediaTypes() []string {
return []string{s.contentType}
}
func (s *wrappedSerializer) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) {
func (s *wrappedSerializer) SerializerForMediaType(mediaType string, options map[string]string) (runtime.SerializerInfo, bool) {
if mediaType != s.contentType {
return nil, false
return runtime.SerializerInfo{}, false
}
return s.serializer, true
}
func (s *wrappedSerializer) SupportedStreamingMediaTypes() []string {
return nil
}
func (s *wrappedSerializer) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) {
return nil, nil, "", false
return runtime.SerializerInfo{
Serializer: s.serializer,
MediaType: mediaType,
EncodesAsText: true, // TODO: this should be parameterized
}, true
}
func (s *wrappedSerializer) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
return versioning.NewCodec(s.serializer, s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), []unversioned.GroupVersion{gv}, nil)
func (s *wrappedSerializer) UniversalDeserializer() runtime.Decoder {
return s.serializer
}
func (s *wrappedSerializer) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder {
return versioning.NewCodec(s.serializer, s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), nil, []unversioned.GroupVersion{gv})
func (s *wrappedSerializer) EncoderForVersion(encoder runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder {
return versioning.NewCodec(encoder, nil, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), []unversioned.GroupVersion{gv}, nil)
}
func (s *wrappedSerializer) DecoderToVersion(decoder runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder {
return versioning.NewCodec(nil, decoder, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), nil, []unversioned.GroupVersion{gv})
}