diff --git a/pkg/genericapiserver/server/storage_factory.go b/pkg/genericapiserver/server/storage_factory.go index f3a9e319d80..b40306bf740 100644 --- a/pkg/genericapiserver/server/storage_factory.go +++ b/pkg/genericapiserver/server/storage_factory.go @@ -74,7 +74,19 @@ type DefaultStorageFactory struct { APIResourceConfigSource APIResourceConfigSource // newStorageCodecFn exists to be overwritten for unit testing. - newStorageCodecFn func(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion schema.GroupVersion, config storagebackend.Config) (codec runtime.Codec, err error) + newStorageCodecFn func(opts StorageCodecConfig) (codec runtime.Codec, err error) +} + +// StorageCodecConfig are the arguments passed to newStorageCodecFn +type StorageCodecConfig struct { + StorageMediaType string + StorageSerializer runtime.StorageSerializer + StorageVersion schema.GroupVersion + MemoryVersion schema.GroupVersion + Config storagebackend.Config + + EncoderDecoratorFn func(runtime.Encoder) runtime.Encoder + DecoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder } type groupResourceOverrides struct { @@ -95,6 +107,34 @@ type groupResourceOverrides struct { // of exposing one set of concepts. autoscaling.HPA and extensions.HPA as a for instance // The order of the slice matters! It is the priority order of lookup for finding a storage location cohabitatingResources []schema.GroupResource + // encoderDecoratorFn is optional and may wrap the provided encoder prior to being serialized. + encoderDecoratorFn func(runtime.Encoder) runtime.Encoder + // decoderDecoratorFn is optional and may wrap the provided decoders (can add new decoders). The order of + // returned decoders will be priority for attempt to decode. + decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder +} + +// Apply overrides the provided config and options if the override has a value in that position +func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *StorageCodecConfig) { + if len(o.etcdLocation) > 0 { + config.ServerList = o.etcdLocation + } + if len(o.etcdPrefix) > 0 { + config.Prefix = o.etcdPrefix + } + + if len(o.mediaType) > 0 { + options.StorageMediaType = o.mediaType + } + if o.serializer != nil { + options.StorageSerializer = o.serializer + } + if o.encoderDecoratorFn != nil { + options.EncoderDecoratorFn = o.encoderDecoratorFn + } + if o.decoderDecoratorFn != nil { + options.DecoderDecoratorFn = o.decoderDecoratorFn + } } var _ StorageFactory = &DefaultStorageFactory{} @@ -165,6 +205,15 @@ func (s *DefaultStorageFactory) AddCohabitatingResources(groupResources ...schem } } +func (s *DefaultStorageFactory) AddSerializationChains(encoderDecoratorFn func(runtime.Encoder) runtime.Encoder, decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder, groupResources ...schema.GroupResource) { + for _, groupResource := range groupResources { + overrides := s.Overrides[groupResource] + overrides.encoderDecoratorFn = encoderDecoratorFn + overrides.decoderDecoratorFn = decoderDecoratorFn + s.Overrides[groupResource] = overrides + } +} + func getAllResourcesAlias(resource schema.GroupResource) schema.GroupResource { return schema.GroupResource{Group: resource.Group, Resource: AllResources} } @@ -184,64 +233,38 @@ func (s *DefaultStorageFactory) getStorageGroupResource(groupResource schema.Gro func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) { chosenStorageResource := s.getStorageGroupResource(groupResource) - groupOverride := s.Overrides[getAllResourcesAlias(chosenStorageResource)] - exactResourceOverride := s.Overrides[chosenStorageResource] - - overriddenEtcdLocations := []string{} - if len(groupOverride.etcdLocation) > 0 { - overriddenEtcdLocations = groupOverride.etcdLocation - } - if len(exactResourceOverride.etcdLocation) > 0 { - overriddenEtcdLocations = exactResourceOverride.etcdLocation - } - - etcdPrefix := s.StorageConfig.Prefix - if len(groupOverride.etcdPrefix) > 0 { - etcdPrefix = groupOverride.etcdPrefix - } - if len(exactResourceOverride.etcdPrefix) > 0 { - etcdPrefix = exactResourceOverride.etcdPrefix - } - - etcdMediaType := s.DefaultMediaType - if len(groupOverride.mediaType) != 0 { - etcdMediaType = groupOverride.mediaType - } - if len(exactResourceOverride.mediaType) != 0 { - etcdMediaType = exactResourceOverride.mediaType - } - - etcdSerializer := s.DefaultSerializer - if groupOverride.serializer != nil { - etcdSerializer = groupOverride.serializer - } - if exactResourceOverride.serializer != nil { - etcdSerializer = exactResourceOverride.serializer - } // operate on copy - config := s.StorageConfig - config.Prefix = etcdPrefix - if len(overriddenEtcdLocations) > 0 { - config.ServerList = overriddenEtcdLocations + storageConfig := s.StorageConfig + codecConfig := StorageCodecConfig{ + StorageMediaType: s.DefaultMediaType, + StorageSerializer: s.DefaultSerializer, } - storageEncodingVersion, err := s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource) + if override, ok := s.Overrides[getAllResourcesAlias(chosenStorageResource)]; ok { + override.Apply(&storageConfig, &codecConfig) + } + if override, ok := s.Overrides[chosenStorageResource]; ok { + override.Apply(&storageConfig, &codecConfig) + } + + var err error + codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource) if err != nil { return nil, err } - internalVersion, err := s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource) + codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource) if err != nil { return nil, err } + codecConfig.Config = storageConfig - codec, err := s.newStorageCodecFn(etcdMediaType, etcdSerializer, storageEncodingVersion, internalVersion, config) + storageConfig.Codec, err = s.newStorageCodecFn(codecConfig) if err != nil { return nil, err } + glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, codecConfig.StorageVersion, codecConfig.MemoryVersion, codecConfig.Config) - glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, config) - config.Codec = codec - return &config, nil + return &storageConfig, nil } // Get all backends for all registered storage destinations. @@ -257,41 +280,51 @@ func (s *DefaultStorageFactory) Backends() []string { // NewStorageCodec assembles a storage codec for the provided storage media type, the provided serializer, and the requested // storage and memory versions. -func NewStorageCodec(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion schema.GroupVersion, config storagebackend.Config) (runtime.Codec, error) { - mediaType, _, err := mime.ParseMediaType(storageMediaType) +func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, error) { + mediaType, _, err := mime.ParseMediaType(opts.StorageMediaType) if err != nil { - return nil, fmt.Errorf("%q is not a valid mime-type", storageMediaType) + return nil, fmt.Errorf("%q is not a valid mime-type", opts.StorageMediaType) } - serializer, ok := runtime.SerializerInfoForMediaType(ns.SupportedMediaTypes(), mediaType) + serializer, ok := runtime.SerializerInfoForMediaType(opts.StorageSerializer.SupportedMediaTypes(), mediaType) if !ok { - return nil, fmt.Errorf("unable to find serializer for %q", storageMediaType) + return nil, fmt.Errorf("unable to find serializer for %q", opts.StorageMediaType) } s := serializer.Serializer // etcd2 only supports string data - we must wrap any result before returning // TODO: storagebackend should return a boolean indicating whether it supports binary data - if !serializer.EncodesAsText && (config.Type == storagebackend.StorageTypeUnset || config.Type == storagebackend.StorageTypeETCD2) { + if !serializer.EncodesAsText && (opts.Config.Type == storagebackend.StorageTypeUnset || opts.Config.Type == storagebackend.StorageTypeETCD2) { glog.V(4).Infof("Wrapping the underlying binary storage serializer with a base64 encoding for etcd2") s = runtime.NewBase64Serializer(s) } - encoder := ns.EncoderForVersion( - s, + // Give callers the opportunity to wrap encoders and decoders. For decoders, each returned decoder will + // be passed to the recognizer so that multiple decoders are available. + var encoder runtime.Encoder = s + if opts.EncoderDecoratorFn != nil { + encoder = opts.EncoderDecoratorFn(encoder) + } + decoders := []runtime.Decoder{s, opts.StorageSerializer.UniversalDeserializer()} + if opts.DecoderDecoratorFn != nil { + decoders = opts.DecoderDecoratorFn(decoders) + } + + // Ensure the storage receives the correct version. + encoder = opts.StorageSerializer.EncoderForVersion( + encoder, runtime.NewMultiGroupVersioner( - storageVersion, - schema.GroupKind{Group: storageVersion.Group}, - schema.GroupKind{Group: memoryVersion.Group}, + opts.StorageVersion, + schema.GroupKind{Group: opts.StorageVersion.Group}, + schema.GroupKind{Group: opts.MemoryVersion.Group}, ), ) - - ds := recognizer.NewDecoder(s, ns.UniversalDeserializer()) - decoder := ns.DecoderToVersion( - ds, + decoder := opts.StorageSerializer.DecoderToVersion( + recognizer.NewDecoder(decoders...), runtime.NewMultiGroupVersioner( - memoryVersion, - schema.GroupKind{Group: memoryVersion.Group}, - schema.GroupKind{Group: storageVersion.Group}, + opts.MemoryVersion, + schema.GroupKind{Group: opts.MemoryVersion.Group}, + schema.GroupKind{Group: opts.StorageVersion.Group}, ), ) diff --git a/pkg/genericapiserver/server/storage_factory_test.go b/pkg/genericapiserver/server/storage_factory_test.go index cd34dbe2dd0..eb3d5728147 100644 --- a/pkg/genericapiserver/server/storage_factory_test.go +++ b/pkg/genericapiserver/server/storage_factory_test.go @@ -20,6 +20,7 @@ import ( "reflect" "testing" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/pkg/storage/storagebackend" @@ -27,6 +28,67 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" ) +type fakeNegotiater struct { + serializer, streamSerializer runtime.Serializer + framer runtime.Framer + types, streamTypes []string +} + +func (n *fakeNegotiater) SupportedMediaTypes() []runtime.SerializerInfo { + var out []runtime.SerializerInfo + for _, s := range n.types { + info := runtime.SerializerInfo{Serializer: n.serializer, MediaType: s, EncodesAsText: true} + for _, t := range n.streamTypes { + if t == s { + info.StreamSerializer = &runtime.StreamSerializerInfo{ + EncodesAsText: true, + Framer: n.framer, + Serializer: n.streamSerializer, + } + } + } + out = append(out, info) + } + return out +} + +func (n *fakeNegotiater) UniversalDeserializer() runtime.Decoder { + return n.serializer +} + +func (n *fakeNegotiater) EncoderForVersion(serializer runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder { + return n.serializer +} + +func (n *fakeNegotiater) DecoderToVersion(serializer runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder { + return n.serializer +} + +func TestDefaultStorageFactory(t *testing.T) { + ns := &fakeNegotiater{types: []string{"test/test"}} + f := NewDefaultStorageFactory(storagebackend.Config{}, "test/test", ns, NewDefaultResourceEncodingConfig(), NewResourceConfig()) + f.AddCohabitatingResources(schema.GroupResource{Resource: "test"}, schema.GroupResource{Resource: "test2", Group: "2"}) + called := false + testEncoderChain := func(e runtime.Encoder) runtime.Encoder { + called = true + return e + } + f.AddSerializationChains(testEncoderChain, nil, schema.GroupResource{Resource: "test"}) + f.SetEtcdLocation(schema.GroupResource{Resource: "*"}, []string{"/server2"}) + f.SetEtcdPrefix(schema.GroupResource{Resource: "test"}, "/prefix_for_test") + + config, err := f.NewConfig(schema.GroupResource{Resource: "test"}) + if err != nil { + t.Fatal(err) + } + if config.Prefix != "/prefix_for_test" || !reflect.DeepEqual(config.ServerList, []string{"/server2"}) { + t.Errorf("unexpected config %#v", config) + } + if !called { + t.Errorf("expected encoder chain to be called") + } +} + func TestUpdateEtcdOverrides(t *testing.T) { testCases := []struct { resource schema.GroupResource