Allow StorageFactory to wrap encoders and decoders

Prepares for allowing encryption at rest of resources as well as any
other lower level optimization we might chose to implement.

Also cleans up a bunch of ugly code.
This commit is contained in:
Clayton Coleman 2017-01-27 16:04:34 -05:00
parent cb758738f9
commit 494eeaaa8e
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
2 changed files with 158 additions and 63 deletions

View File

@ -74,7 +74,19 @@ type DefaultStorageFactory struct {
APIResourceConfigSource APIResourceConfigSource
// newStorageCodecFn exists to be overwritten for unit testing.
newStorageCodecFn func(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion schema.GroupVersion, config storagebackend.Config) (codec runtime.Codec, err error)
newStorageCodecFn func(opts StorageCodecConfig) (codec runtime.Codec, err error)
}
// StorageCodecConfig are the arguments passed to newStorageCodecFn
type StorageCodecConfig struct {
StorageMediaType string
StorageSerializer runtime.StorageSerializer
StorageVersion schema.GroupVersion
MemoryVersion schema.GroupVersion
Config storagebackend.Config
EncoderDecoratorFn func(runtime.Encoder) runtime.Encoder
DecoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder
}
type groupResourceOverrides struct {
@ -95,6 +107,34 @@ type groupResourceOverrides struct {
// of exposing one set of concepts. autoscaling.HPA and extensions.HPA as a for instance
// The order of the slice matters! It is the priority order of lookup for finding a storage location
cohabitatingResources []schema.GroupResource
// encoderDecoratorFn is optional and may wrap the provided encoder prior to being serialized.
encoderDecoratorFn func(runtime.Encoder) runtime.Encoder
// decoderDecoratorFn is optional and may wrap the provided decoders (can add new decoders). The order of
// returned decoders will be priority for attempt to decode.
decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder
}
// Apply overrides the provided config and options if the override has a value in that position
func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *StorageCodecConfig) {
if len(o.etcdLocation) > 0 {
config.ServerList = o.etcdLocation
}
if len(o.etcdPrefix) > 0 {
config.Prefix = o.etcdPrefix
}
if len(o.mediaType) > 0 {
options.StorageMediaType = o.mediaType
}
if o.serializer != nil {
options.StorageSerializer = o.serializer
}
if o.encoderDecoratorFn != nil {
options.EncoderDecoratorFn = o.encoderDecoratorFn
}
if o.decoderDecoratorFn != nil {
options.DecoderDecoratorFn = o.decoderDecoratorFn
}
}
var _ StorageFactory = &DefaultStorageFactory{}
@ -165,6 +205,15 @@ func (s *DefaultStorageFactory) AddCohabitatingResources(groupResources ...schem
}
}
func (s *DefaultStorageFactory) AddSerializationChains(encoderDecoratorFn func(runtime.Encoder) runtime.Encoder, decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder, groupResources ...schema.GroupResource) {
for _, groupResource := range groupResources {
overrides := s.Overrides[groupResource]
overrides.encoderDecoratorFn = encoderDecoratorFn
overrides.decoderDecoratorFn = decoderDecoratorFn
s.Overrides[groupResource] = overrides
}
}
func getAllResourcesAlias(resource schema.GroupResource) schema.GroupResource {
return schema.GroupResource{Group: resource.Group, Resource: AllResources}
}
@ -184,64 +233,38 @@ func (s *DefaultStorageFactory) getStorageGroupResource(groupResource schema.Gro
func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) {
chosenStorageResource := s.getStorageGroupResource(groupResource)
groupOverride := s.Overrides[getAllResourcesAlias(chosenStorageResource)]
exactResourceOverride := s.Overrides[chosenStorageResource]
overriddenEtcdLocations := []string{}
if len(groupOverride.etcdLocation) > 0 {
overriddenEtcdLocations = groupOverride.etcdLocation
}
if len(exactResourceOverride.etcdLocation) > 0 {
overriddenEtcdLocations = exactResourceOverride.etcdLocation
}
etcdPrefix := s.StorageConfig.Prefix
if len(groupOverride.etcdPrefix) > 0 {
etcdPrefix = groupOverride.etcdPrefix
}
if len(exactResourceOverride.etcdPrefix) > 0 {
etcdPrefix = exactResourceOverride.etcdPrefix
}
etcdMediaType := s.DefaultMediaType
if len(groupOverride.mediaType) != 0 {
etcdMediaType = groupOverride.mediaType
}
if len(exactResourceOverride.mediaType) != 0 {
etcdMediaType = exactResourceOverride.mediaType
}
etcdSerializer := s.DefaultSerializer
if groupOverride.serializer != nil {
etcdSerializer = groupOverride.serializer
}
if exactResourceOverride.serializer != nil {
etcdSerializer = exactResourceOverride.serializer
}
// operate on copy
config := s.StorageConfig
config.Prefix = etcdPrefix
if len(overriddenEtcdLocations) > 0 {
config.ServerList = overriddenEtcdLocations
storageConfig := s.StorageConfig
codecConfig := StorageCodecConfig{
StorageMediaType: s.DefaultMediaType,
StorageSerializer: s.DefaultSerializer,
}
storageEncodingVersion, err := s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
if override, ok := s.Overrides[getAllResourcesAlias(chosenStorageResource)]; ok {
override.Apply(&storageConfig, &codecConfig)
}
if override, ok := s.Overrides[chosenStorageResource]; ok {
override.Apply(&storageConfig, &codecConfig)
}
var err error
codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
if err != nil {
return nil, err
}
internalVersion, err := s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
if err != nil {
return nil, err
}
codecConfig.Config = storageConfig
codec, err := s.newStorageCodecFn(etcdMediaType, etcdSerializer, storageEncodingVersion, internalVersion, config)
storageConfig.Codec, err = s.newStorageCodecFn(codecConfig)
if err != nil {
return nil, err
}
glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, codecConfig.StorageVersion, codecConfig.MemoryVersion, codecConfig.Config)
glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, config)
config.Codec = codec
return &config, nil
return &storageConfig, nil
}
// Get all backends for all registered storage destinations.
@ -257,41 +280,51 @@ func (s *DefaultStorageFactory) Backends() []string {
// NewStorageCodec assembles a storage codec for the provided storage media type, the provided serializer, and the requested
// storage and memory versions.
func NewStorageCodec(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion schema.GroupVersion, config storagebackend.Config) (runtime.Codec, error) {
mediaType, _, err := mime.ParseMediaType(storageMediaType)
func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, error) {
mediaType, _, err := mime.ParseMediaType(opts.StorageMediaType)
if err != nil {
return nil, fmt.Errorf("%q is not a valid mime-type", storageMediaType)
return nil, fmt.Errorf("%q is not a valid mime-type", opts.StorageMediaType)
}
serializer, ok := runtime.SerializerInfoForMediaType(ns.SupportedMediaTypes(), mediaType)
serializer, ok := runtime.SerializerInfoForMediaType(opts.StorageSerializer.SupportedMediaTypes(), mediaType)
if !ok {
return nil, fmt.Errorf("unable to find serializer for %q", storageMediaType)
return nil, fmt.Errorf("unable to find serializer for %q", opts.StorageMediaType)
}
s := serializer.Serializer
// etcd2 only supports string data - we must wrap any result before returning
// TODO: storagebackend should return a boolean indicating whether it supports binary data
if !serializer.EncodesAsText && (config.Type == storagebackend.StorageTypeUnset || config.Type == storagebackend.StorageTypeETCD2) {
if !serializer.EncodesAsText && (opts.Config.Type == storagebackend.StorageTypeUnset || opts.Config.Type == storagebackend.StorageTypeETCD2) {
glog.V(4).Infof("Wrapping the underlying binary storage serializer with a base64 encoding for etcd2")
s = runtime.NewBase64Serializer(s)
}
encoder := ns.EncoderForVersion(
s,
// Give callers the opportunity to wrap encoders and decoders. For decoders, each returned decoder will
// be passed to the recognizer so that multiple decoders are available.
var encoder runtime.Encoder = s
if opts.EncoderDecoratorFn != nil {
encoder = opts.EncoderDecoratorFn(encoder)
}
decoders := []runtime.Decoder{s, opts.StorageSerializer.UniversalDeserializer()}
if opts.DecoderDecoratorFn != nil {
decoders = opts.DecoderDecoratorFn(decoders)
}
// Ensure the storage receives the correct version.
encoder = opts.StorageSerializer.EncoderForVersion(
encoder,
runtime.NewMultiGroupVersioner(
storageVersion,
schema.GroupKind{Group: storageVersion.Group},
schema.GroupKind{Group: memoryVersion.Group},
opts.StorageVersion,
schema.GroupKind{Group: opts.StorageVersion.Group},
schema.GroupKind{Group: opts.MemoryVersion.Group},
),
)
ds := recognizer.NewDecoder(s, ns.UniversalDeserializer())
decoder := ns.DecoderToVersion(
ds,
decoder := opts.StorageSerializer.DecoderToVersion(
recognizer.NewDecoder(decoders...),
runtime.NewMultiGroupVersioner(
memoryVersion,
schema.GroupKind{Group: memoryVersion.Group},
schema.GroupKind{Group: storageVersion.Group},
opts.MemoryVersion,
schema.GroupKind{Group: opts.MemoryVersion.Group},
schema.GroupKind{Group: opts.StorageVersion.Group},
),
)

View File

@ -20,6 +20,7 @@ import (
"reflect"
"testing"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/api"
@ -27,6 +28,67 @@ import (
"k8s.io/kubernetes/pkg/genericapiserver/server/options"
)
type fakeNegotiater struct {
serializer, streamSerializer runtime.Serializer
framer runtime.Framer
types, streamTypes []string
}
func (n *fakeNegotiater) SupportedMediaTypes() []runtime.SerializerInfo {
var out []runtime.SerializerInfo
for _, s := range n.types {
info := runtime.SerializerInfo{Serializer: n.serializer, MediaType: s, EncodesAsText: true}
for _, t := range n.streamTypes {
if t == s {
info.StreamSerializer = &runtime.StreamSerializerInfo{
EncodesAsText: true,
Framer: n.framer,
Serializer: n.streamSerializer,
}
}
}
out = append(out, info)
}
return out
}
func (n *fakeNegotiater) UniversalDeserializer() runtime.Decoder {
return n.serializer
}
func (n *fakeNegotiater) EncoderForVersion(serializer runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
return n.serializer
}
func (n *fakeNegotiater) DecoderToVersion(serializer runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
return n.serializer
}
func TestDefaultStorageFactory(t *testing.T) {
ns := &fakeNegotiater{types: []string{"test/test"}}
f := NewDefaultStorageFactory(storagebackend.Config{}, "test/test", ns, NewDefaultResourceEncodingConfig(), NewResourceConfig())
f.AddCohabitatingResources(schema.GroupResource{Resource: "test"}, schema.GroupResource{Resource: "test2", Group: "2"})
called := false
testEncoderChain := func(e runtime.Encoder) runtime.Encoder {
called = true
return e
}
f.AddSerializationChains(testEncoderChain, nil, schema.GroupResource{Resource: "test"})
f.SetEtcdLocation(schema.GroupResource{Resource: "*"}, []string{"/server2"})
f.SetEtcdPrefix(schema.GroupResource{Resource: "test"}, "/prefix_for_test")
config, err := f.NewConfig(schema.GroupResource{Resource: "test"})
if err != nil {
t.Fatal(err)
}
if config.Prefix != "/prefix_for_test" || !reflect.DeepEqual(config.ServerList, []string{"/server2"}) {
t.Errorf("unexpected config %#v", config)
}
if !called {
t.Errorf("expected encoder chain to be called")
}
}
func TestUpdateEtcdOverrides(t *testing.T) {
testCases := []struct {
resource schema.GroupResource