Merge pull request #23208 from deads2k/fix-version-override

Automatic merge from submit-queue

make storage enablement, serialization, and location orthogonal

This allows a caller (command-line, config, code) to specify multiple separate pieces of config information regarding storage and have them properly composed at runtime.  The information provided is exposed through interfaces to allow alternate implementations, which allows us to change the expression of the config moving forward.  I also fixed up the types to be correct as I moved through.

The same options still exist, but they're composed slightly differently
 1. specify target etcd servers per Group or per GroupResource
 1. specify storage GroupVersions per Groups or per GroupResource
 1. specify etcd prefixes per GroupVersion or per GroupResource
 1. specify that multiple GroupResources share the same location in etcd
 1. enable GroupResources by GroupVersion or by GroupResource whitelist or GroupResource blacklist

The `storage.Interface` is built per GroupResource by:
 1. find the set of possible storage GroupResource based on the priority list of cohabitators
 1. choose a GroupResource from the set by looking at which Groups have the resource enabled
 1. find the target etcd server, etcd prefix, and storage encoding based on the GroupResource

The API server can have its resources separately enabled, but for now I've kept them linked.

@liggitt I think we need this (or something like it) to be able to go from config to these interfaces.  Given another round of refactoring, we may be able to reshape these to be more forward driving.

@smarterclayton this is important for rebasing and for a seamless 1.2 to 1.3 migration for us.
This commit is contained in:
k8s-merge-robot 2016-04-21 08:24:29 -07:00
commit 85de6acadc
18 changed files with 765 additions and 631 deletions

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
apiutil "k8s.io/kubernetes/pkg/api/util"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/apimachinery/registered"
@ -98,39 +99,53 @@ func NewAPIServer() *APIServer {
}
// dest must be a map of group to groupVersion.
func gvToMap(gvList string, dest map[string]string) {
for _, gv := range strings.Split(gvList, ",") {
if gv == "" {
func mergeGroupVersionIntoMap(gvList string, dest map[string]unversioned.GroupVersion) error {
for _, gvString := range strings.Split(gvList, ",") {
if gvString == "" {
continue
}
// We accept two formats. "group/version" OR
// "group=group/version". The latter is used when types
// move between groups.
if !strings.Contains(gv, "=") {
dest[apiutil.GetGroup(gv)] = gv
if !strings.Contains(gvString, "=") {
gv, err := unversioned.ParseGroupVersion(gvString)
if err != nil {
return err
}
dest[gv.Group] = gv
} else {
parts := strings.SplitN(gv, "=", 2)
// TODO: error checking.
dest[parts[0]] = parts[1]
parts := strings.SplitN(gvString, "=", 2)
gv, err := unversioned.ParseGroupVersion(parts[1])
if err != nil {
return err
}
dest[parts[0]] = gv
}
}
return nil
}
// StorageGroupsToGroupVersions returns a map from group name to group version,
// StorageGroupsToEncodingVersion returns a map from group name to group version,
// computed from the s.DeprecatedStorageVersion and s.StorageVersions flags.
// TODO: can we move the whole storage version concept to the generic apiserver?
func (s *APIServer) StorageGroupsToGroupVersions() map[string]string {
storageVersionMap := map[string]string{}
func (s *APIServer) StorageGroupsToEncodingVersion() (map[string]unversioned.GroupVersion, error) {
storageVersionMap := map[string]unversioned.GroupVersion{}
if s.DeprecatedStorageVersion != "" {
storageVersionMap[""] = s.DeprecatedStorageVersion
storageVersionMap[""] = unversioned.GroupVersion{Group: apiutil.GetGroup(s.DeprecatedStorageVersion), Version: apiutil.GetVersion(s.DeprecatedStorageVersion)}
}
// First, get the defaults.
gvToMap(s.DefaultStorageVersions, storageVersionMap)
if err := mergeGroupVersionIntoMap(s.DefaultStorageVersions, storageVersionMap); err != nil {
return nil, err
}
// Override any defaults with the user settings.
gvToMap(s.StorageVersions, storageVersionMap)
if err := mergeGroupVersionIntoMap(s.StorageVersions, storageVersionMap); err != nil {
return nil, err
}
return storageVersionMap
return storageVersionMap, nil
}
// AddFlags adds flags for a specific APIServer to the specified FlagSet

View File

@ -23,6 +23,7 @@ import (
"github.com/spf13/pflag"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/autoscaling"
"k8s.io/kubernetes/pkg/apis/extensions"
)
@ -32,38 +33,38 @@ func TestGenerateStorageVersionMap(t *testing.T) {
legacyVersion string
storageVersions string
defaultVersions string
expectedMap map[string]string
expectedMap map[string]unversioned.GroupVersion
}{
{
legacyVersion: "v1",
storageVersions: "v1,extensions/v1beta1",
expectedMap: map[string]string{
api.GroupName: "v1",
extensions.GroupName: "extensions/v1beta1",
expectedMap: map[string]unversioned.GroupVersion{
api.GroupName: {Version: "v1"},
extensions.GroupName: {Group: "extensions", Version: "v1beta1"},
},
},
{
legacyVersion: "",
storageVersions: "extensions/v1beta1,v1",
expectedMap: map[string]string{
api.GroupName: "v1",
extensions.GroupName: "extensions/v1beta1",
expectedMap: map[string]unversioned.GroupVersion{
api.GroupName: {Version: "v1"},
extensions.GroupName: {Group: "extensions", Version: "v1beta1"},
},
},
{
legacyVersion: "",
storageVersions: "autoscaling=extensions/v1beta1,v1",
defaultVersions: "extensions/v1beta1,v1,autoscaling/v1",
expectedMap: map[string]string{
api.GroupName: "v1",
autoscaling.GroupName: "extensions/v1beta1",
extensions.GroupName: "extensions/v1beta1",
expectedMap: map[string]unversioned.GroupVersion{
api.GroupName: {Version: "v1"},
autoscaling.GroupName: {Group: "extensions", Version: "v1beta1"},
extensions.GroupName: {Group: "extensions", Version: "v1beta1"},
},
},
{
legacyVersion: "",
storageVersions: "",
expectedMap: map[string]string{},
expectedMap: map[string]unversioned.GroupVersion{},
},
}
for i, test := range testCases {
@ -72,7 +73,10 @@ func TestGenerateStorageVersionMap(t *testing.T) {
StorageVersions: test.storageVersions,
DefaultStorageVersions: test.defaultVersions,
}
output := s.StorageGroupsToGroupVersions()
output, err := s.StorageGroupsToEncodingVersion()
if err != nil {
t.Errorf("%v: unexpected error: %v", i, err)
}
if !reflect.DeepEqual(test.expectedMap, output) {
t.Errorf("%v: unexpected error. expect: %v, got: %v", i, test.expectedMap, output)
}

View File

@ -37,8 +37,6 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/apps"
appsapi "k8s.io/kubernetes/pkg/apis/apps/v1alpha1"
"k8s.io/kubernetes/pkg/apis/autoscaling"
autoscalingapiv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
@ -58,10 +56,7 @@ import (
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/registry/cachesize"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
)
// NewAPIServerCommand creates a *cobra.Command object with default parameters
@ -81,89 +76,6 @@ cluster's shared state through which all other components interact.`,
return cmd
}
// For testing.
type newEtcdFunc func(runtime.NegotiatedSerializer, string, string, etcdstorage.EtcdConfig) (storage.Interface, error)
func newEtcd(ns runtime.NegotiatedSerializer, storageGroupVersionString, memoryGroupVersionString string, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) {
if storageGroupVersionString == "" {
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
}
storageVersion, err := unversioned.ParseGroupVersion(storageGroupVersionString)
if err != nil {
return nil, fmt.Errorf("couldn't understand storage version %v: %v", storageGroupVersionString, err)
}
memoryVersion, err := unversioned.ParseGroupVersion(memoryGroupVersionString)
if err != nil {
return nil, fmt.Errorf("couldn't understand memory version %v: %v", memoryGroupVersionString, err)
}
var storageConfig etcdstorage.EtcdStorageConfig
storageConfig.Config = etcdConfig
s, ok := ns.SerializerForMediaType("application/json", nil)
if !ok {
return nil, fmt.Errorf("unable to find serializer for JSON")
}
glog.Infof("constructing etcd storage interface.\n sv: %v\n mv: %v\n", storageVersion, memoryVersion)
encoder := ns.EncoderForVersion(s, storageVersion)
decoder := ns.DecoderToVersion(s, memoryVersion)
if memoryVersion.Group != storageVersion.Group {
// Allow this codec to translate between groups.
if err = versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up encoder for %v: %v", storageGroupVersionString, err)
}
if err = versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up decoder for %v: %v", storageGroupVersionString, err)
}
}
storageConfig.Codec = runtime.NewCodec(encoder, decoder)
return storageConfig.NewStorage()
}
// parse the value of --etcd-servers-overrides and update given storageDestinations.
func updateEtcdOverrides(overrides []string, storageVersions map[string]string, etcdConfig etcdstorage.EtcdConfig, storageDestinations *genericapiserver.StorageDestinations, newEtcdFn newEtcdFunc) {
if len(overrides) == 0 {
return
}
for _, override := range overrides {
tokens := strings.Split(override, "#")
if len(tokens) != 2 {
glog.Errorf("invalid value of etcd server overrides: %s", override)
continue
}
apiresource := strings.Split(tokens[0], "/")
if len(apiresource) != 2 {
glog.Errorf("invalid resource definition: %s", tokens[0])
}
group := apiresource[0]
resource := apiresource[1]
apigroup, err := registered.Group(group)
if err != nil {
glog.Errorf("invalid api group %s: %v", group, err)
continue
}
if _, found := storageVersions[apigroup.GroupVersion.Group]; !found {
glog.Errorf("Couldn't find the storage version for group %s", apigroup.GroupVersion.Group)
continue
}
servers := strings.Split(tokens[1], ";")
overrideEtcdConfig := etcdConfig
overrideEtcdConfig.ServerList = servers
// Note, internalGV will be wrong for things like batch or
// autoscalers, but they shouldn't be using the override
// storage.
internalGV := apigroup.GroupVersion.Group + "/__internal"
etcdOverrideStorage, err := newEtcdFn(api.Codecs, storageVersions[apigroup.GroupVersion.Group], internalGV, overrideEtcdConfig)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
}
storageDestinations.AddStorageOverride(group, resource, etcdOverrideStorage)
}
}
// Run runs the specified APIServer. This should never exit.
func Run(s *options.APIServer) error {
genericapiserver.DefaultAndValidateRunOptions(s.ServerRunOptions)
@ -252,126 +164,37 @@ func Run(s *options.APIServer) error {
glog.Errorf("Failed to create clientset: %v", err)
}
legacyV1Group, err := registered.Group(api.GroupName)
resourceEncoding := genericapiserver.NewDefaultResourceEncodingConfig()
groupToEncoding, err := s.StorageGroupsToEncodingVersion()
if err != nil {
return err
glog.Fatalf("error getting group encoding: %s", err)
}
for group, storageEncodingVersion := range groupToEncoding {
resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal})
}
storageDestinations := genericapiserver.NewStorageDestinations()
storageFactory := genericapiserver.NewDefaultStorageFactory(s.EtcdConfig, api.Codecs, resourceEncoding, apiResourceConfigSource)
storageFactory.AddCohabitatingResources(batch.Resource("jobs"), extensions.Resource("jobs"))
storageFactory.AddCohabitatingResources(autoscaling.Resource("horizontalpodautoscalers"), extensions.Resource("horizontalpodautoscalers"))
for _, override := range s.EtcdServersOverrides {
tokens := strings.Split(override, "#")
if len(tokens) != 2 {
glog.Errorf("invalid value of etcd server overrides: %s", override)
continue
}
storageVersions := s.StorageGroupsToGroupVersions()
if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions)
apiresource := strings.Split(tokens[0], "/")
if len(apiresource) != 2 {
glog.Errorf("invalid resource definition: %s", tokens[0])
continue
}
group := apiresource[0]
resource := apiresource[1]
groupResource := unversioned.GroupResource{Group: group, Resource: resource}
servers := strings.Split(tokens[1], ";")
storageFactory.SetEtcdLocation(groupResource, servers)
}
etcdStorage, err := newEtcd(api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], "/__internal", s.EtcdConfig)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
}
storageDestinations.AddAPIGroup("", etcdStorage)
if apiResourceConfigSource.AnyResourcesForVersionEnabled(extensionsapiv1beta1.SchemeGroupVersion) {
glog.Infof("Configuring extensions/v1beta1 storage destination")
expGroup, err := registered.Group(extensions.GroupName)
if err != nil {
glog.Fatalf("Extensions API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err)
}
if _, found := storageVersions[expGroup.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions)
}
expEtcdStorage, err := newEtcd(api.Codecs, storageVersions[expGroup.GroupVersion.Group], "extensions/__internal", s.EtcdConfig)
if err != nil {
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
}
storageDestinations.AddAPIGroup(extensions.GroupName, expEtcdStorage)
// Since HPA has been moved to the autoscaling group, we need to make
// sure autoscaling has a storage destination. If the autoscaling group
// itself is on, it will overwrite this decision below.
storageDestinations.AddAPIGroup(autoscaling.GroupName, expEtcdStorage)
// Since Job has been moved to the batch group, we need to make
// sure batch has a storage destination. If the batch group
// itself is on, it will overwrite this decision below.
storageDestinations.AddAPIGroup(batch.GroupName, expEtcdStorage)
}
// autoscaling/v1/horizontalpodautoscalers is a move from extensions/v1beta1/horizontalpodautoscalers.
// The storage version needs to be either extensions/v1beta1 or autoscaling/v1.
// Users must roll forward while using 1.2, because we will require the latter for 1.3.
if apiResourceConfigSource.AnyResourcesForVersionEnabled(autoscalingapiv1.SchemeGroupVersion) {
glog.Infof("Configuring autoscaling/v1 storage destination")
autoscalingGroup, err := registered.Group(autoscaling.GroupName)
if err != nil {
glog.Fatalf("Autoscaling API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err)
}
// Figure out what storage group/version we should use.
storageGroupVersion, found := storageVersions[autoscalingGroup.GroupVersion.Group]
if !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", autoscalingGroup.GroupVersion.Group, storageVersions)
}
if storageGroupVersion != "autoscaling/v1" && storageGroupVersion != "extensions/v1beta1" {
glog.Fatalf("The storage version for autoscaling must be either 'autoscaling/v1' or 'extensions/v1beta1'")
}
glog.Infof("Using %v for autoscaling group storage version", storageGroupVersion)
autoscalingEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdConfig)
if err != nil {
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
}
storageDestinations.AddAPIGroup(autoscaling.GroupName, autoscalingEtcdStorage)
}
// batch/v1/job is a move from extensions/v1beta1/job. The storage
// version needs to be either extensions/v1beta1 or batch/v1. Users
// must roll forward while using 1.2, because we will require the
// latter for 1.3.
if apiResourceConfigSource.AnyResourcesForVersionEnabled(batchapiv1.SchemeGroupVersion) {
glog.Infof("Configuring batch/v1 storage destination")
batchGroup, err := registered.Group(batch.GroupName)
if err != nil {
glog.Fatalf("Batch API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err)
}
// Figure out what storage group/version we should use.
storageGroupVersion, found := storageVersions[batchGroup.GroupVersion.Group]
if !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", batchGroup.GroupVersion.Group, storageVersions)
}
if storageGroupVersion != "batch/v1" && storageGroupVersion != "extensions/v1beta1" {
glog.Fatalf("The storage version for batch must be either 'batch/v1' or 'extensions/v1beta1'")
}
glog.Infof("Using %v for batch group storage version", storageGroupVersion)
batchEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdConfig)
if err != nil {
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
}
storageDestinations.AddAPIGroup(batch.GroupName, batchEtcdStorage)
}
if apiResourceConfigSource.AnyResourcesForVersionEnabled(appsapi.SchemeGroupVersion) {
glog.Infof("Configuring apps/v1alpha1 storage destination")
appsGroup, err := registered.Group(apps.GroupName)
if err != nil {
glog.Fatalf("Apps API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err)
}
// Figure out what storage group/version we should use.
storageGroupVersion, found := storageVersions[appsGroup.GroupVersion.Group]
if !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", appsGroup.GroupVersion.Group, storageVersions)
}
if storageGroupVersion != "apps/v1alpha1" {
glog.Fatalf("The storage version for apps must be apps/v1alpha1")
}
glog.Infof("Using %v for petset group storage version", storageGroupVersion)
appsEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "apps/__internal", s.EtcdConfig)
if err != nil {
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
}
storageDestinations.AddAPIGroup(apps.GroupName, appsEtcdStorage)
}
updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdConfig, &storageDestinations, newEtcd)
// Default to the private server key for service account token signing
if s.ServiceAccountKeyFile == "" && s.TLSPrivateKeyFile != "" {
@ -386,7 +209,11 @@ func Run(s *options.APIServer) error {
if s.ServiceAccountLookup {
// If we need to look up service accounts and tokens,
// go directly to etcd to avoid recursive auth insanity
serviceAccountGetter = serviceaccountcontroller.NewGetterFromStorageInterface(etcdStorage)
storage, err := storageFactory.New(api.Resource("serviceaccounts"))
if err != nil {
glog.Fatalf("Unable to get serviceaccounts storage: %v", err)
}
serviceAccountGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storage)
}
authenticator, err := authenticator.New(authenticator.AuthenticatorConfig{
@ -443,8 +270,7 @@ func Run(s *options.APIServer) error {
genericConfig := genericapiserver.NewConfig(s.ServerRunOptions)
// TODO: Move the following to generic api server as well.
genericConfig.StorageDestinations = storageDestinations
genericConfig.StorageVersions = storageVersions
genericConfig.StorageFactory = storageFactory
genericConfig.Authenticator = authenticator
genericConfig.SupportsBasicAuth = len(s.BasicAuthFile) > 0
genericConfig.Authorizer = authorizer

View File

@ -19,18 +19,12 @@ package app
import (
"reflect"
"regexp"
"strings"
"testing"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
)
func TestLongRunningRequestRegexp(t *testing.T) {
@ -74,64 +68,6 @@ func TestLongRunningRequestRegexp(t *testing.T) {
}
}
func TestUpdateEtcdOverrides(t *testing.T) {
storageVersions := map[string]string{
"": "v1",
"extensions": "extensions/v1beta1",
}
testCases := []struct {
apigroup string
resource string
servers []string
}{
{
apigroup: api.GroupName,
resource: "resource",
servers: []string{"http://127.0.0.1:10000"},
},
{
apigroup: api.GroupName,
resource: "resource",
servers: []string{"http://127.0.0.1:10000", "http://127.0.0.1:20000"},
},
{
apigroup: extensions.GroupName,
resource: "resource",
servers: []string{"http://127.0.0.1:10000"},
},
}
for _, test := range testCases {
newEtcd := func(_ runtime.NegotiatedSerializer, _, _ string, etcdConfig etcdstorage.EtcdConfig) (storage.Interface, error) {
if !reflect.DeepEqual(test.servers, etcdConfig.ServerList) {
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, etcdConfig.ServerList)
}
return nil, nil
}
storageDestinations := genericapiserver.NewStorageDestinations()
override := test.apigroup + "/" + test.resource + "#" + strings.Join(test.servers, ";")
defaultEtcdConfig := etcdstorage.EtcdConfig{
Prefix: genericapiserver.DefaultEtcdPathPrefix,
ServerList: []string{"http://127.0.0.1"},
}
updateEtcdOverrides([]string{override}, storageVersions, defaultEtcdConfig, &storageDestinations, newEtcd)
apigroup, ok := storageDestinations.APIGroups[test.apigroup]
if !ok {
t.Errorf("apigroup: %s not created", test.apigroup)
continue
}
if apigroup.Overrides == nil {
t.Errorf("Overrides not created for: %s", test.apigroup)
continue
}
if _, ok := apigroup.Overrides[test.resource]; !ok {
t.Errorf("override not created for: %s", test.resource)
continue
}
}
}
func TestParseRuntimeConfig(t *testing.T) {
testCases := []struct {
runtimeConfig map[string]string

View File

@ -24,7 +24,7 @@ import (
testgroupetcd "k8s.io/kubernetes/examples/apiserver/rest"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/apimachinery"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/genericapiserver"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
@ -40,20 +40,14 @@ const (
SecurePort = 6444
)
func newStorageDestinations(groupName string, groupMeta *apimachinery.GroupMeta) (*genericapiserver.StorageDestinations, error) {
storageDestinations := genericapiserver.NewStorageDestinations()
var storageConfig etcdstorage.EtcdStorageConfig
storageConfig.Config = etcdstorage.EtcdConfig{
func newStorageFactory() genericapiserver.StorageFactory {
etcdConfig := etcdstorage.EtcdConfig{
Prefix: genericapiserver.DefaultEtcdPathPrefix,
ServerList: []string{"http://127.0.0.1:4001"},
}
storageConfig.Codec = groupMeta.Codec
storageInterface, err := storageConfig.NewStorage()
if err != nil {
return nil, err
}
storageDestinations.AddAPIGroup(groupName, storageInterface)
return &storageDestinations, nil
storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig())
return storageFactory
}
func NewServerRunOptions() *genericapiserver.ServerRunOptions {
@ -86,12 +80,14 @@ func Run(serverOptions *genericapiserver.ServerRunOptions) error {
if err != nil {
return fmt.Errorf("%v", err)
}
storageDestinations, err := newStorageDestinations(groupName, groupMeta)
storageFactory := newStorageFactory()
storage, err := storageFactory.New(unversioned.GroupResource{Group: groupName, Resource: "testtype"})
if err != nil {
return fmt.Errorf("Unable to init etcd: %v", err)
return fmt.Errorf("Unable to get storage: %v", err)
}
restStorageMap := map[string]rest.Storage{
"testtypes": testgroupetcd.NewREST(storageDestinations.Get(groupName, "testtype"), s.StorageDecorator()),
"testtypes": testgroupetcd.NewREST(storage, s.StorageDecorator()),
}
apiGroupInfo := genericapiserver.APIGroupInfo{
GroupMeta: *groupMeta,

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
apiutil "k8s.io/kubernetes/pkg/api/util"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/apimachinery/registered"
@ -119,39 +120,53 @@ func NewAPIServer() *APIServer {
}
// dest must be a map of group to groupVersion.
func gvToMap(gvList string, dest map[string]string) {
for _, gv := range strings.Split(gvList, ",") {
if gv == "" {
func mergeGroupVersionIntoMap(gvList string, dest map[string]unversioned.GroupVersion) error {
for _, gvString := range strings.Split(gvList, ",") {
if gvString == "" {
continue
}
// We accept two formats. "group/version" OR
// "group=group/version". The latter is used when types
// move between groups.
if !strings.Contains(gv, "=") {
dest[apiutil.GetGroup(gv)] = gv
if !strings.Contains(gvString, "=") {
gv, err := unversioned.ParseGroupVersion(gvString)
if err != nil {
return err
}
dest[gv.Group] = gv
} else {
parts := strings.SplitN(gv, "=", 2)
// TODO: error checking.
dest[parts[0]] = parts[1]
parts := strings.SplitN(gvString, "=", 2)
gv, err := unversioned.ParseGroupVersion(parts[1])
if err != nil {
return err
}
dest[parts[0]] = gv
}
}
return nil
}
// StorageGroupsToGroupVersions returns a map from group name to group version,
// StorageGroupsToEncodingVersion returns a map from group name to group version,
// computed from the s.DeprecatedStorageVersion and s.StorageVersions flags.
// TODO: can we move the whole storage version concept to the generic apiserver?
func (s *APIServer) StorageGroupsToGroupVersions() map[string]string {
storageVersionMap := map[string]string{}
func (s *APIServer) StorageGroupsToEncodingVersion() (map[string]unversioned.GroupVersion, error) {
storageVersionMap := map[string]unversioned.GroupVersion{}
if s.DeprecatedStorageVersion != "" {
storageVersionMap[""] = s.DeprecatedStorageVersion
storageVersionMap[""] = unversioned.GroupVersion{Group: apiutil.GetGroup(s.DeprecatedStorageVersion), Version: apiutil.GetVersion(s.DeprecatedStorageVersion)}
}
// First, get the defaults.
gvToMap(s.DefaultStorageVersions, storageVersionMap)
if err := mergeGroupVersionIntoMap(s.DefaultStorageVersions, storageVersionMap); err != nil {
return nil, err
}
// Override any defaults with the user settings.
gvToMap(s.StorageVersions, storageVersionMap)
if err := mergeGroupVersionIntoMap(s.StorageVersions, storageVersionMap); err != nil {
return nil, err
}
return storageVersionMap
return storageVersionMap, nil
}
// AddFlags adds flags for a specific APIServer to the specified FlagSet

View File

@ -23,6 +23,7 @@ import (
"github.com/spf13/pflag"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/autoscaling"
"k8s.io/kubernetes/pkg/apis/extensions"
)
@ -32,38 +33,38 @@ func TestGenerateStorageVersionMap(t *testing.T) {
legacyVersion string
storageVersions string
defaultVersions string
expectedMap map[string]string
expectedMap map[string]unversioned.GroupVersion
}{
{
legacyVersion: "v1",
storageVersions: "v1,extensions/v1beta1",
expectedMap: map[string]string{
api.GroupName: "v1",
extensions.GroupName: "extensions/v1beta1",
expectedMap: map[string]unversioned.GroupVersion{
api.GroupName: {Version: "v1"},
extensions.GroupName: {Group: "extensions", Version: "v1beta1"},
},
},
{
legacyVersion: "",
storageVersions: "extensions/v1beta1,v1",
expectedMap: map[string]string{
api.GroupName: "v1",
extensions.GroupName: "extensions/v1beta1",
expectedMap: map[string]unversioned.GroupVersion{
api.GroupName: {Version: "v1"},
extensions.GroupName: {Group: "extensions", Version: "v1beta1"},
},
},
{
legacyVersion: "",
storageVersions: "autoscaling=extensions/v1beta1,v1",
defaultVersions: "extensions/v1beta1,v1,autoscaling/v1",
expectedMap: map[string]string{
api.GroupName: "v1",
autoscaling.GroupName: "extensions/v1beta1",
extensions.GroupName: "extensions/v1beta1",
expectedMap: map[string]unversioned.GroupVersion{
api.GroupName: {Version: "v1"},
autoscaling.GroupName: {Group: "extensions", Version: "v1beta1"},
extensions.GroupName: {Group: "extensions", Version: "v1beta1"},
},
},
{
legacyVersion: "",
storageVersions: "",
expectedMap: map[string]string{},
expectedMap: map[string]unversioned.GroupVersion{},
},
}
for i, test := range testCases {
@ -72,7 +73,10 @@ func TestGenerateStorageVersionMap(t *testing.T) {
StorageVersions: test.storageVersions,
DefaultStorageVersions: test.defaultVersions,
}
output := s.StorageGroupsToGroupVersions()
output, err := s.StorageGroupsToEncodingVersion()
if err != nil {
t.Errorf("%v: unexpected error: %v", i, err)
}
if !reflect.DeepEqual(test.expectedMap, output) {
t.Errorf("%v: unexpected error. expect: %v, got: %v", i, test.expectedMap, output)
}

View File

@ -21,7 +21,6 @@ package app
import (
"crypto/tls"
"fmt"
"net"
"net/url"
"os"
@ -47,9 +46,6 @@ import (
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/registry/cachesize"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
utilnet "k8s.io/kubernetes/pkg/util/net"
)
@ -81,44 +77,6 @@ func verifyClusterIPFlags(s *options.APIServer) {
}
}
// For testing.
type newEtcdFunc func(runtime.NegotiatedSerializer, string, string, etcdstorage.EtcdConfig) (storage.Interface, error)
func newEtcd(ns runtime.NegotiatedSerializer, storageGroupVersionString, memoryGroupVersionString string, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) {
if storageGroupVersionString == "" {
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
}
storageVersion, err := unversioned.ParseGroupVersion(storageGroupVersionString)
if err != nil {
return nil, fmt.Errorf("couldn't understand storage version %v: %v", storageGroupVersionString, err)
}
memoryVersion, err := unversioned.ParseGroupVersion(memoryGroupVersionString)
if err != nil {
return nil, fmt.Errorf("couldn't understand memory version %v: %v", memoryGroupVersionString, err)
}
var storageConfig etcdstorage.EtcdStorageConfig
storageConfig.Config = etcdConfig
s, ok := ns.SerializerForMediaType("application/json", nil)
if !ok {
return nil, fmt.Errorf("unable to find serializer for JSON")
}
glog.Infof("constructing etcd storage interface.\n sv: %v\n mv: %v\n", storageVersion, memoryVersion)
encoder := ns.EncoderForVersion(s, storageVersion)
decoder := ns.DecoderToVersion(s, memoryVersion)
if memoryVersion.Group != storageVersion.Group {
// Allow this codec to translate between groups.
if err = versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up encoder for %v: %v", storageGroupVersionString, err)
}
if err = versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up decoder for %v: %v", storageGroupVersionString, err)
}
}
storageConfig.Codec = runtime.NewCodec(encoder, decoder)
return storageConfig.NewStorage()
}
// Run runs the specified APIServer. This should never exit.
func Run(s *options.APIServer) error {
verifyClusterIPFlags(s)
@ -228,6 +186,36 @@ func Run(s *options.APIServer) error {
n := s.ServiceClusterIPRange
resourceEncoding := genericapiserver.NewDefaultResourceEncodingConfig()
groupToEncoding, err := s.StorageGroupsToEncodingVersion()
if err != nil {
glog.Fatalf("error getting group encoding: %s", err)
}
for group, storageEncodingVersion := range groupToEncoding {
resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal})
}
storageFactory := genericapiserver.NewDefaultStorageFactory(s.EtcdConfig, api.Codecs, resourceEncoding, apiResourceConfigSource)
for _, override := range s.EtcdServersOverrides {
tokens := strings.Split(override, "#")
if len(tokens) != 2 {
glog.Errorf("invalid value of etcd server overrides: %s", override)
continue
}
apiresource := strings.Split(tokens[0], "/")
if len(apiresource) != 2 {
glog.Errorf("invalid resource definition: %s", tokens[0])
continue
}
group := apiresource[0]
resource := apiresource[1]
groupResource := unversioned.GroupResource{Group: group, Resource: resource}
servers := strings.Split(tokens[1], ";")
storageFactory.SetEtcdLocation(groupResource, servers)
}
authenticator, err := authenticator.New(authenticator.AuthenticatorConfig{
BasicAuthFile: s.BasicAuthFile,
ClientCAFile: s.ClientCAFile,
@ -276,13 +264,9 @@ func Run(s *options.APIServer) error {
}
}
storageDestinations := genericapiserver.NewStorageDestinations()
storageVersions := s.StorageGroupsToGroupVersions()
config := &master.Config{
Config: &genericapiserver.Config{
StorageDestinations: storageDestinations,
StorageVersions: storageVersions,
StorageFactory: storageFactory,
ServiceClusterIPRange: &n,
EnableLogsSupport: s.EnableLogsSupport,
EnableUISupport: true,

View File

@ -43,7 +43,6 @@ import (
genericetcd "k8s.io/kubernetes/pkg/registry/generic/etcd"
ipallocator "k8s.io/kubernetes/pkg/registry/service/ipallocator"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/ui"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/crypto"
@ -55,7 +54,6 @@ import (
"github.com/emicklei/go-restful"
"github.com/emicklei/go-restful/swagger"
"github.com/golang/glog"
"golang.org/x/net/context"
)
const (
@ -64,107 +62,6 @@ const (
globalTimeout = time.Minute
)
// StorageDestinations is a mapping from API group & resource to
// the underlying storage interfaces.
type StorageDestinations struct {
APIGroups map[string]*StorageDestinationsForAPIGroup
}
type StorageDestinationsForAPIGroup struct {
Default storage.Interface
Overrides map[string]storage.Interface
}
func NewStorageDestinations() StorageDestinations {
return StorageDestinations{
APIGroups: map[string]*StorageDestinationsForAPIGroup{},
}
}
// AddAPIGroup replaces 'group' if it's already registered.
func (s *StorageDestinations) AddAPIGroup(group string, defaultStorage storage.Interface) {
glog.Infof("Adding storage destination for group %v", group)
s.APIGroups[group] = &StorageDestinationsForAPIGroup{
Default: defaultStorage,
Overrides: map[string]storage.Interface{},
}
}
func (s *StorageDestinations) AddStorageOverride(group, resource string, override storage.Interface) {
if _, ok := s.APIGroups[group]; !ok {
s.AddAPIGroup(group, nil)
}
if s.APIGroups[group].Overrides == nil {
s.APIGroups[group].Overrides = map[string]storage.Interface{}
}
s.APIGroups[group].Overrides[resource] = override
}
// Get finds the storage destination for the given group and resource. It will
// Fatalf if the group has no storage destination configured.
func (s *StorageDestinations) Get(group, resource string) storage.Interface {
apigroup, ok := s.APIGroups[group]
if !ok {
// TODO: return an error like a normal function. For now,
// Fatalf is better than just logging an error, because this
// condition guarantees future problems and this is a less
// mysterious failure point.
glog.Fatalf("No storage defined for API group: '%s'. Defined groups: %#v", group, s.APIGroups)
return nil
}
if apigroup.Overrides != nil {
if client, exists := apigroup.Overrides[resource]; exists {
return client
}
}
return apigroup.Default
}
// Search is like Get, but can be used to search a list of groups. It tries the
// groups in order (and Fatalf's if none of them exist). The intention is for
// this to be used for resources that move between groups.
func (s *StorageDestinations) Search(groups []string, resource string) storage.Interface {
for _, group := range groups {
apigroup, ok := s.APIGroups[group]
if !ok {
continue
}
if apigroup.Overrides != nil {
if client, exists := apigroup.Overrides[resource]; exists {
return client
}
}
return apigroup.Default
}
// TODO: return an error like a normal function. For now,
// Fatalf is better than just logging an error, because this
// condition guarantees future problems and this is a less
// mysterious failure point.
glog.Fatalf("No storage defined for any of the groups: %v. Defined groups: %#v", groups, s.APIGroups)
return nil
}
// Get all backends for all registered storage destinations.
// Used for getting all instances for health validations.
func (s *StorageDestinations) Backends() []string {
backends := sets.String{}
for _, group := range s.APIGroups {
if group.Default != nil {
for _, backend := range group.Default.Backends(context.TODO()) {
backends.Insert(backend)
}
}
if group.Overrides != nil {
for _, storage := range group.Overrides {
for _, backend := range storage.Backends(context.TODO()) {
backends.Insert(backend)
}
}
}
}
return backends.List()
}
// Info about an API group.
type APIGroupInfo struct {
GroupMeta apimachinery.GroupMeta
@ -199,9 +96,7 @@ type APIGroupInfo struct {
// Config is a structure used to configure a GenericAPIServer.
type Config struct {
StorageDestinations StorageDestinations
// StorageVersions is a map between groups and their storage versions
StorageVersions map[string]string
StorageFactory StorageFactory
// allow downstream consumers to disable the core controller loops
EnableLogsSupport bool
EnableUISupport bool

View File

@ -32,7 +32,6 @@ import (
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
apiutil "k8s.io/kubernetes/pkg/api/util"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/extensions"
@ -266,7 +265,7 @@ func getGroupList(server *httptest.Server) (*unversioned.APIGroupList, error) {
}
func TestDiscoveryAtAPIS(t *testing.T) {
master, etcdserver, config, assert := newMaster(t)
master, etcdserver, _, assert := newMaster(t)
defer etcdserver.Terminate(t)
server := httptest.NewServer(master.HandlerContainer.ServeMux)
@ -277,7 +276,6 @@ func TestDiscoveryAtAPIS(t *testing.T) {
assert.Equal(0, len(groupList.Groups))
// Add a Group.
extensionsGroupName := extensions.GroupName
extensionsVersions := []unversioned.GroupVersionForDiscovery{
{
GroupVersion: testapi.Extensions.GroupVersion().String(),
@ -285,11 +283,11 @@ func TestDiscoveryAtAPIS(t *testing.T) {
},
}
extensionsPreferredVersion := unversioned.GroupVersionForDiscovery{
GroupVersion: config.StorageVersions[extensions.GroupName],
Version: apiutil.GetVersion(config.StorageVersions[extensions.GroupName]),
GroupVersion: extensions.GroupName + "/preferred",
Version: "preferred",
}
master.AddAPIGroupForDiscovery(unversioned.APIGroup{
Name: extensionsGroupName,
Name: extensions.GroupName,
Versions: extensionsVersions,
PreferredVersion: extensionsPreferredVersion,
})
@ -301,13 +299,13 @@ func TestDiscoveryAtAPIS(t *testing.T) {
assert.Equal(1, len(groupList.Groups))
groupListGroup := groupList.Groups[0]
assert.Equal(extensionsGroupName, groupListGroup.Name)
assert.Equal(extensions.GroupName, groupListGroup.Name)
assert.Equal(extensionsVersions, groupListGroup.Versions)
assert.Equal(extensionsPreferredVersion, groupListGroup.PreferredVersion)
assert.Equal(master.getServerAddressByClientCIDRs(&http.Request{}), groupListGroup.ServerAddressByClientCIDRs)
// Remove the group.
master.RemoveAPIGroupForDiscovery(extensionsGroupName)
master.RemoveAPIGroupForDiscovery(extensions.GroupName)
groupList, err = getGroupList(server)
if err != nil {
t.Fatalf("unexpected error: %v", err)

View File

@ -0,0 +1,118 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package genericapiserver
import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/runtime"
)
type ResourceEncodingConfig interface {
// StorageEncoding returns the serialization format for the resource.
// TODO this should actually return a GroupVersionKind since you can logically have multiple "matching" Kinds
// For now, it returns just the GroupVersion for consistency with old behavior
StoragageEncodingFor(unversioned.GroupResource) (unversioned.GroupVersion, error)
// InMemoryEncodingFor returns the groupVersion for the in memory representation the storage should convert to.
InMemoryEncodingFor(unversioned.GroupResource) (unversioned.GroupVersion, error)
}
type DefaultResourceEncodingConfig struct {
Groups map[string]*GroupResourceEncodingConfig
}
type GroupResourceEncodingConfig struct {
DefaultExternalEncoding unversioned.GroupVersion
ExternalResourceEncodings map[string]unversioned.GroupVersion
DefaultInternalEncoding unversioned.GroupVersion
InternalResourceEncodings map[string]unversioned.GroupVersion
}
var _ ResourceEncodingConfig = &DefaultResourceEncodingConfig{}
func NewDefaultResourceEncodingConfig() *DefaultResourceEncodingConfig {
return &DefaultResourceEncodingConfig{Groups: map[string]*GroupResourceEncodingConfig{}}
}
func newGroupResourceEncodingConfig(defaultEncoding, defaultInternalVersion unversioned.GroupVersion) *GroupResourceEncodingConfig {
return &GroupResourceEncodingConfig{
DefaultExternalEncoding: defaultEncoding, ExternalResourceEncodings: map[string]unversioned.GroupVersion{},
DefaultInternalEncoding: defaultInternalVersion, InternalResourceEncodings: map[string]unversioned.GroupVersion{},
}
}
func (o *DefaultResourceEncodingConfig) SetVersionEncoding(group string, externalEncodingVersion, internalVersion unversioned.GroupVersion) {
_, groupExists := o.Groups[group]
if !groupExists {
o.Groups[group] = newGroupResourceEncodingConfig(externalEncodingVersion, internalVersion)
}
o.Groups[group].DefaultExternalEncoding = externalEncodingVersion
o.Groups[group].DefaultInternalEncoding = internalVersion
}
func (o *DefaultResourceEncodingConfig) SetResourceEncoding(resourceBeingStored unversioned.GroupResource, externalEncodingVersion, internalVersion unversioned.GroupVersion) {
group := resourceBeingStored.Group
_, groupExists := o.Groups[group]
if !groupExists {
o.Groups[group] = newGroupResourceEncodingConfig(externalEncodingVersion, internalVersion)
}
o.Groups[group].ExternalResourceEncodings[resourceBeingStored.Resource] = externalEncodingVersion
o.Groups[group].InternalResourceEncodings[resourceBeingStored.Resource] = internalVersion
}
func (o *DefaultResourceEncodingConfig) StoragageEncodingFor(resource unversioned.GroupResource) (unversioned.GroupVersion, error) {
groupMeta, err := registered.Group(resource.Group)
if err != nil {
return unversioned.GroupVersion{}, err
}
groupEncoding, groupExists := o.Groups[resource.Group]
if !groupExists {
// return the most preferred external version for the group
return groupMeta.GroupVersion, nil
}
resourceOverride, resourceExists := groupEncoding.ExternalResourceEncodings[resource.Resource]
if !resourceExists {
return groupEncoding.DefaultExternalEncoding, nil
}
return resourceOverride, nil
}
func (o *DefaultResourceEncodingConfig) InMemoryEncodingFor(resource unversioned.GroupResource) (unversioned.GroupVersion, error) {
if _, err := registered.Group(resource.Group); err != nil {
return unversioned.GroupVersion{}, err
}
groupEncoding, groupExists := o.Groups[resource.Group]
if !groupExists {
return unversioned.GroupVersion{Group: resource.Group, Version: runtime.APIVersionInternal}, nil
}
resourceOverride, resourceExists := groupEncoding.InternalResourceEncodings[resource.Resource]
if !resourceExists {
return groupEncoding.DefaultInternalEncoding, nil
}
return resourceOverride, nil
}

View File

@ -0,0 +1,221 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package genericapiserver
import (
"fmt"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/util/sets"
"github.com/golang/glog"
)
// StorageFactory is the interface to locate the storage for a given GroupResource
type StorageFactory interface {
// New finds the storage destination for the given group and resource. It will
// return an error if the group has no storage destination configured.
New(groupResource unversioned.GroupResource) (storage.Interface, error)
// Backends gets all backends for all registered storage destinations.
// Used for getting all instances for health validations.
Backends() []string
}
// DefaultStorageFactory takes a GroupResource and returns back its storage interface. This result includes:
// 1. Merged etcd config, including: auth, server locations, prefixes
// 2. Resource encodings for storage: group,version,kind to store as
// 3. Cohabitating default: some resources like hpa are exposed through multiple APIs. They must agree on 1 and 2
type DefaultStorageFactory struct {
// DefaultEtcdConfig describes how to connect to etcd in general. It's authentication information will be used for
// every storage.Interface returned.
DefaultEtcdConfig etcdstorage.EtcdConfig
Overrides map[unversioned.GroupResource]groupResourceOverrides
// DefaultSerializer is used to create encoders and decoders for the storage.Interface.
DefaultSerializer runtime.NegotiatedSerializer
// ResourceEncodingConfig describes how to encode a particular GroupVersionResource
ResourceEncodingConfig ResourceEncodingConfig
// APIResourceConfigSource indicates whether the *storage* is enabled, NOT the API
// This is discrete from resource enablement because those are separate concerns. How it is surfaced to the user via flags
// or config is up to whoever is building this.
APIResourceConfigSource APIResourceConfigSource
// newEtcdFn exists to be overwritten for unit testing. You should never set this in a normal world.
newEtcdFn func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error)
}
type groupResourceOverrides struct {
// etcdLocation contains the list of "special" locations that are used for particular GroupResources
// These are merged on top of the default DefaultEtcdConfig when requesting the storage.Interface for a given GroupResource
etcdLocation []string
// etcdPrefix contains the list of "special" prefixes for a GroupResource. Resource=* means for the entire group
etcdPrefix string
// serializer contains the list of "special" serializers for a GroupResource. Resource=* means for the entire group
serializer runtime.NegotiatedSerializer
// cohabitatingResources keeps track of which resources must be stored together. This happens when we have multiple ways
// of exposing one set of concepts. autoscaling.HPA and extensions.HPA as a for instance
// The order of the slice matters! It is the priority order of lookup for finding a storage location
cohabitatingResources []unversioned.GroupResource
}
var _ StorageFactory = &DefaultStorageFactory{}
const AllResources = "*"
func NewDefaultStorageFactory(defaultEtcdConfig etcdstorage.EtcdConfig, defaultSerializer runtime.NegotiatedSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory {
return &DefaultStorageFactory{
DefaultEtcdConfig: defaultEtcdConfig,
Overrides: map[unversioned.GroupResource]groupResourceOverrides{},
DefaultSerializer: defaultSerializer,
ResourceEncodingConfig: resourceEncodingConfig,
APIResourceConfigSource: resourceConfig,
newEtcdFn: newEtcd,
}
}
func (s *DefaultStorageFactory) SetEtcdLocation(groupResource unversioned.GroupResource, location []string) {
overrides := s.Overrides[groupResource]
overrides.etcdLocation = location
s.Overrides[groupResource] = overrides
}
func (s *DefaultStorageFactory) SetEtcdPrefix(groupResource unversioned.GroupResource, prefix string) {
overrides := s.Overrides[groupResource]
overrides.etcdPrefix = prefix
s.Overrides[groupResource] = overrides
}
func (s *DefaultStorageFactory) SetSerializer(groupResource unversioned.GroupResource, serializer runtime.NegotiatedSerializer) {
overrides := s.Overrides[groupResource]
overrides.serializer = serializer
s.Overrides[groupResource] = overrides
}
// AddCohabitatingResources links resources together the order of the slice matters! its the priority order of lookup for finding a storage location
func (s *DefaultStorageFactory) AddCohabitatingResources(groupResources ...unversioned.GroupResource) {
for _, groupResource := range groupResources {
overrides := s.Overrides[groupResource]
overrides.cohabitatingResources = groupResources
s.Overrides[groupResource] = overrides
}
}
func getAllResourcesAlias(resource unversioned.GroupResource) unversioned.GroupResource {
return unversioned.GroupResource{Group: resource.Group, Resource: AllResources}
}
func (s *DefaultStorageFactory) getStorageGroupResource(groupResource unversioned.GroupResource) unversioned.GroupResource {
for _, potentialStorageResource := range s.Overrides[groupResource].cohabitatingResources {
if s.APIResourceConfigSource.AnyVersionOfResourceEnabled(potentialStorageResource) {
return potentialStorageResource
}
}
return groupResource
}
// New finds the storage destination for the given group and resource. It will
// return an error if the group has no storage destination configured.
func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (storage.Interface, error) {
chosenStorageResource := s.getStorageGroupResource(groupResource)
groupOverride := s.Overrides[getAllResourcesAlias(chosenStorageResource)]
exactResourceOverride := s.Overrides[chosenStorageResource]
overriddenEtcdLocations := []string{}
if len(groupOverride.etcdLocation) > 0 {
overriddenEtcdLocations = groupOverride.etcdLocation
}
if len(exactResourceOverride.etcdLocation) > 0 {
overriddenEtcdLocations = exactResourceOverride.etcdLocation
}
etcdPrefix := s.DefaultEtcdConfig.Prefix
if len(groupOverride.etcdPrefix) > 0 {
etcdPrefix = groupOverride.etcdPrefix
}
if len(exactResourceOverride.etcdPrefix) > 0 {
etcdPrefix = exactResourceOverride.etcdPrefix
}
etcdSerializer := s.DefaultSerializer
if groupOverride.serializer != nil {
etcdSerializer = groupOverride.serializer
}
if exactResourceOverride.serializer != nil {
etcdSerializer = exactResourceOverride.serializer
}
// operate on copy
etcdConfig := s.DefaultEtcdConfig
etcdConfig.Prefix = etcdPrefix
if len(overriddenEtcdLocations) > 0 {
etcdConfig.ServerList = overriddenEtcdLocations
}
storageEncodingVersion, err := s.ResourceEncodingConfig.StoragageEncodingFor(chosenStorageResource)
if err != nil {
return nil, err
}
internalVersion, err := s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
if err != nil {
return nil, err
}
glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, etcdConfig)
return s.newEtcdFn(etcdSerializer, storageEncodingVersion, internalVersion, etcdConfig)
}
func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) {
var storageConfig etcdstorage.EtcdStorageConfig
storageConfig.Config = etcdConfig
s, ok := ns.SerializerForMediaType("application/json", nil)
if !ok {
return nil, fmt.Errorf("unable to find serializer for JSON")
}
encoder := ns.EncoderForVersion(s, storageVersion)
decoder := ns.DecoderToVersion(s, memoryVersion)
if memoryVersion.Group != storageVersion.Group {
// Allow this codec to translate between groups.
if err = versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up encoder from %v to %v: %v", memoryVersion, storageVersion, err)
}
if err = versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up decoder from %v to %v: %v", storageVersion, memoryVersion, err)
}
}
storageConfig.Codec = runtime.NewCodec(encoder, decoder)
return storageConfig.NewStorage()
}
// Get all backends for all registered storage destinations.
// Used for getting all instances for health validations.
func (s *DefaultStorageFactory) Backends() []string {
backends := sets.NewString(s.DefaultEtcdConfig.ServerList...)
for _, overrides := range s.Overrides {
backends.Insert(overrides.etcdLocation...)
}
return backends.List()
}

View File

@ -0,0 +1,88 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package genericapiserver
import (
"reflect"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
)
func TestUpdateEtcdOverrides(t *testing.T) {
testCases := []struct {
resource unversioned.GroupResource
servers []string
}{
{
resource: unversioned.GroupResource{Group: api.GroupName, Resource: "resource"},
servers: []string{"http://127.0.0.1:10000"},
},
{
resource: unversioned.GroupResource{Group: api.GroupName, Resource: "resource"},
servers: []string{"http://127.0.0.1:10000", "http://127.0.0.1:20000"},
},
{
resource: unversioned.GroupResource{Group: extensions.GroupName, Resource: "resource"},
servers: []string{"http://127.0.0.1:10000"},
},
}
defaultEtcdLocation := []string{"http://127.0.0.1"}
for i, test := range testCases {
actualEtcdConfig := etcdstorage.EtcdConfig{}
newEtcdFn := func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) {
actualEtcdConfig = etcdConfig
return nil, nil
}
defaultEtcdConfig := etcdstorage.EtcdConfig{
Prefix: DefaultEtcdPathPrefix,
ServerList: defaultEtcdLocation,
}
storageFactory := NewDefaultStorageFactory(defaultEtcdConfig, api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig())
storageFactory.newEtcdFn = newEtcdFn
storageFactory.SetEtcdLocation(test.resource, test.servers)
var err error
_, err = storageFactory.New(test.resource)
if err != nil {
t.Errorf("%d: unexpected error %v", i, err)
continue
}
if !reflect.DeepEqual(actualEtcdConfig.ServerList, test.servers) {
t.Errorf("%d: expected %v, got %v", i, test.servers, actualEtcdConfig.ServerList)
continue
}
_, err = storageFactory.New(unversioned.GroupResource{Group: api.GroupName, Resource: "unlikely"})
if err != nil {
t.Errorf("%d: unexpected error %v", i, err)
continue
}
if !reflect.DeepEqual(actualEtcdConfig.ServerList, defaultEtcdLocation) {
t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, actualEtcdConfig.ServerList)
continue
}
}
}

View File

@ -245,21 +245,15 @@ func (m *Master) InstallAPIs(c *Config) {
// Install extensions unless disabled.
if c.APIResourceConfigSource.AnyResourcesForVersionEnabled(extensionsapiv1beta1.SchemeGroupVersion) {
m.thirdPartyStorage = c.StorageDestinations.APIGroups[extensions.GroupName].Default
var err error
m.thirdPartyStorage, err = c.StorageFactory.New(extensions.Resource("thirdpartyresources"))
if err != nil {
glog.Fatalf("Error getting third party storage: %v", err)
}
m.thirdPartyResources = map[string]thirdPartyEntry{}
extensionResources := m.getExtensionResources(c)
extensionsGroupMeta := registered.GroupOrDie(extensions.GroupName)
// Update the preferred version as per StorageVersions in the config.
storageVersion, found := c.StorageVersions[extensionsGroupMeta.GroupVersion.Group]
if !found {
glog.Fatalf("Couldn't find storage version of group %v", extensionsGroupMeta.GroupVersion.Group)
}
preferedGroupVersion, err := unversioned.ParseGroupVersion(storageVersion)
if err != nil {
glog.Fatalf("Error in parsing group version %s: %v", storageVersion, err)
}
extensionsGroupMeta.GroupVersion = preferedGroupVersion
apiGroupInfo := genericapiserver.APIGroupInfo{
GroupMeta: *extensionsGroupMeta,
@ -390,13 +384,8 @@ func (m *Master) InstallAPIs(c *Config) {
}
func (m *Master) initV1ResourcesStorage(c *Config) {
dbClient := func(resource string) storage.Interface { return c.StorageDestinations.Get("", resource) }
restOptions := func(resource string) generic.RESTOptions {
return generic.RESTOptions{
Storage: dbClient(resource),
Decorator: m.StorageDecorator(),
DeleteCollectionWorkers: m.deleteCollectionWorkers,
}
return m.GetRESTOptionsOrDie(c, api.Resource(resource))
}
podTemplateStorage := podtemplateetcd.NewREST(restOptions("podTemplates"))
@ -426,8 +415,8 @@ func (m *Master) initV1ResourcesStorage(c *Config) {
m.ProxyTransport,
)
serviceStorage, serviceStatusStorage := serviceetcd.NewREST(restOptions("services"))
m.serviceRegistry = service.NewRegistry(serviceStorage)
serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptions("services"))
m.serviceRegistry = service.NewRegistry(serviceRESTStorage)
var serviceClusterIPRegistry service.RangeRegistry
serviceClusterIPRange := m.ServiceClusterIPRange
@ -435,9 +424,16 @@ func (m *Master) initV1ResourcesStorage(c *Config) {
glog.Fatalf("service clusterIPRange is nil")
return
}
serviceStorage, err := c.StorageFactory.New(api.Resource("services"))
if err != nil {
glog.Fatal(err.Error())
}
serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), dbClient("services"))
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorage)
serviceClusterIPRegistry = etcd
return etcd
})
@ -446,7 +442,8 @@ func (m *Master) initV1ResourcesStorage(c *Config) {
var serviceNodePortRegistry service.RangeRegistry
serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), dbClient("services"))
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorage)
serviceNodePortRegistry = etcd
return etcd
})
@ -541,7 +538,7 @@ func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
"scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"},
}
for ix, machine := range c.StorageDestinations.Backends() {
for ix, machine := range c.StorageFactory.Backends() {
etcdUrl, err := url.Parse(machine)
if err != nil {
glog.Errorf("Failed to parse etcd url for validation: %v", err)
@ -726,24 +723,36 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV
}
}
func (m *Master) GetRESTOptionsOrDie(c *Config, resource unversioned.GroupResource) generic.RESTOptions {
storage, err := c.StorageFactory.New(resource)
if err != nil {
glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error())
}
return generic.RESTOptions{
Storage: storage,
Decorator: m.StorageDecorator(),
DeleteCollectionWorkers: m.deleteCollectionWorkers,
}
}
// getExperimentalResources returns the resources for extensions api
func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage {
restOptions := func(resource string) generic.RESTOptions {
return generic.RESTOptions{
Storage: c.StorageDestinations.Get(extensions.GroupName, resource),
Decorator: m.StorageDecorator(),
DeleteCollectionWorkers: m.deleteCollectionWorkers,
}
return m.GetRESTOptionsOrDie(c, extensions.Resource(resource))
}
// TODO update when we support more than one version of this group
version := extensionsapiv1beta1.SchemeGroupVersion
storage := map[string]rest.Storage{}
if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("horizontalpodautoscalers")) {
m.constructHPAResources(c, storage)
controllerStorage := expcontrolleretcd.NewStorage(
generic.RESTOptions{Storage: c.StorageDestinations.Get("", "replicationControllers"), Decorator: m.StorageDecorator(), DeleteCollectionWorkers: m.deleteCollectionWorkers})
hpaStorage, hpaStatusStorage := horizontalpodautoscaleretcd.NewREST(restOptions("horizontalpodautoscalers"))
storage["horizontalpodautoscalers"] = hpaStorage
storage["horizontalpodautoscalers/status"] = hpaStatusStorage
controllerStorage := expcontrolleretcd.NewStorage(m.GetRESTOptionsOrDie(c, api.Resource("replicationControllers")))
storage["replicationcontrollers"] = controllerStorage.ReplicationController
storage["replicationcontrollers/scale"] = controllerStorage.Scale
}
@ -776,7 +785,9 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage {
storage["deployments/scale"] = deploymentStorage.Scale
}
if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("jobs")) {
m.constructJobResources(c, storage)
jobsStorage, jobsStatusStorage := jobetcd.NewREST(restOptions("jobs"))
storage["jobs"] = jobsStorage
storage["jobs/status"] = jobsStatusStorage
}
ingressStorage, ingressStatusStorage := ingressetcd.NewREST(restOptions("ingresses"))
if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("ingresses")) {
@ -797,25 +808,6 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage {
return storage
}
// constructHPAResources makes HPA resources and adds them to the storage map.
// They're installed in both autoscaling and extensions. It's assumed that
// you've already done the check that they should be on.
func (m *Master) constructHPAResources(c *Config, restStorage map[string]rest.Storage) {
// Note that hpa's storage settings are changed by changing the autoscaling
// group. Clearly we want all hpas to be stored in the same place no
// matter where they're accessed from.
restOptions := func(resource string) generic.RESTOptions {
return generic.RESTOptions{
Storage: c.StorageDestinations.Search([]string{autoscaling.GroupName, extensions.GroupName}, resource),
Decorator: m.StorageDecorator(),
DeleteCollectionWorkers: m.deleteCollectionWorkers,
}
}
autoscalerStorage, autoscalerStatusStorage := horizontalpodautoscaleretcd.NewREST(restOptions("horizontalpodautoscalers"))
restStorage["horizontalpodautoscalers"] = autoscalerStorage
restStorage["horizontalpodautoscalers/status"] = autoscalerStatusStorage
}
// getAutoscalingResources returns the resources for autoscaling api
func (m *Master) getAutoscalingResources(c *Config) map[string]rest.Storage {
// TODO update when we support more than one version of this group
@ -823,30 +815,13 @@ func (m *Master) getAutoscalingResources(c *Config) map[string]rest.Storage {
storage := map[string]rest.Storage{}
if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("horizontalpodautoscalers")) {
m.constructHPAResources(c, storage)
hpaStorage, hpaStatusStorage := horizontalpodautoscaleretcd.NewREST(m.GetRESTOptionsOrDie(c, autoscaling.Resource("horizontalpodautoscalers")))
storage["horizontalpodautoscalers"] = hpaStorage
storage["horizontalpodautoscalers/status"] = hpaStatusStorage
}
return storage
}
// constructJobResources makes Job resources and adds them to the storage map.
// They're installed in both batch and extensions. It's assumed that you've
// already done the check that they should be on.
func (m *Master) constructJobResources(c *Config, restStorage map[string]rest.Storage) {
// Note that job's storage settings are changed by changing the batch
// group. Clearly we want all jobs to be stored in the same place no
// matter where they're accessed from.
restOptions := func(resource string) generic.RESTOptions {
return generic.RESTOptions{
Storage: c.StorageDestinations.Search([]string{batch.GroupName, extensions.GroupName}, resource),
Decorator: m.StorageDecorator(),
DeleteCollectionWorkers: m.deleteCollectionWorkers,
}
}
jobStorage, jobStatusStorage := jobetcd.NewREST(restOptions("jobs"))
restStorage["jobs"] = jobStorage
restStorage["jobs/status"] = jobStatusStorage
}
// getBatchResources returns the resources for batch api
func (m *Master) getBatchResources(c *Config) map[string]rest.Storage {
// TODO update when we support more than one version of this group
@ -854,26 +829,21 @@ func (m *Master) getBatchResources(c *Config) map[string]rest.Storage {
storage := map[string]rest.Storage{}
if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("jobs")) {
m.constructJobResources(c, storage)
jobsStorage, jobsStatusStorage := jobetcd.NewREST(m.GetRESTOptionsOrDie(c, batch.Resource("jobs")))
storage["jobs"] = jobsStorage
storage["jobs/status"] = jobsStatusStorage
}
return storage
}
// getPetSetResources returns the resources for batch api
// getPetSetResources returns the resources for apps api
func (m *Master) getAppsResources(c *Config) map[string]rest.Storage {
// TODO update when we support more than one version of this group
version := appsapi.SchemeGroupVersion
storage := map[string]rest.Storage{}
if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("petsets")) {
restOptions := func(resource string) generic.RESTOptions {
return generic.RESTOptions{
Storage: c.StorageDestinations.Get(apps.GroupName, resource),
Decorator: m.StorageDecorator(),
DeleteCollectionWorkers: m.deleteCollectionWorkers,
}
}
petsetStorage, petsetStatusStorage := petsetetcd.NewREST(restOptions("petsets"))
petsetStorage, petsetStatusStorage := petsetetcd.NewREST(m.GetRESTOptionsOrDie(c, apps.Resource("petsets")))
storage["petsets"] = petsetStorage
storage["petsets/status"] = petsetStatusStorage
}

View File

@ -36,8 +36,8 @@ import (
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets"
apiutil "k8s.io/kubernetes/pkg/api/util"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/apps"
appsapi "k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/autoscaling"
@ -72,26 +72,27 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
config := Config{
Config: &genericapiserver.Config{},
}
storageVersions := make(map[string]string)
storageDestinations := genericapiserver.NewStorageDestinations()
storageDestinations.AddAPIGroup(
api.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize))
storageDestinations.AddAPIGroup(
autoscaling.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Autoscaling.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize))
storageDestinations.AddAPIGroup(
batch.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Batch.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize))
storageDestinations.AddAPIGroup(
apps.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Apps.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize))
storageDestinations.AddAPIGroup(
extensions.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Extensions.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize))
config.StorageDestinations = storageDestinations
storageVersions[api.GroupName] = testapi.Default.GroupVersion().String()
storageVersions[autoscaling.GroupName] = testapi.Autoscaling.GroupVersion().String()
storageVersions[batch.GroupName] = testapi.Batch.GroupVersion().String()
storageVersions[apps.GroupName] = testapi.Apps.GroupVersion().String()
storageVersions[extensions.GroupName] = testapi.Extensions.GroupVersion().String()
config.StorageVersions = storageVersions
etcdConfig := etcdstorage.EtcdConfig{
Prefix: etcdtest.PathPrefix(),
CAFile: server.CAFile,
KeyFile: server.KeyFile,
CertFile: server.CertFile,
}
for _, url := range server.ClientURLs {
etcdConfig.ServerList = append(etcdConfig.ServerList, url.String())
}
resourceEncoding := genericapiserver.NewDefaultResourceEncodingConfig()
resourceEncoding.SetVersionEncoding(api.GroupName, *testapi.Default.GroupVersion(), unversioned.GroupVersion{Group: api.GroupName, Version: runtime.APIVersionInternal})
resourceEncoding.SetVersionEncoding(autoscaling.GroupName, *testapi.Autoscaling.GroupVersion(), unversioned.GroupVersion{Group: autoscaling.GroupName, Version: runtime.APIVersionInternal})
resourceEncoding.SetVersionEncoding(batch.GroupName, *testapi.Batch.GroupVersion(), unversioned.GroupVersion{Group: batch.GroupName, Version: runtime.APIVersionInternal})
resourceEncoding.SetVersionEncoding(apps.GroupName, *testapi.Apps.GroupVersion(), unversioned.GroupVersion{Group: apps.GroupName, Version: runtime.APIVersionInternal})
resourceEncoding.SetVersionEncoding(extensions.GroupName, *testapi.Extensions.GroupVersion(), unversioned.GroupVersion{Group: extensions.GroupName, Version: runtime.APIVersionInternal})
storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())
config.StorageFactory = storageFactory
config.APIResourceConfigSource = DefaultAPIResourceConfigSource()
config.PublicAddress = net.ParseIP("192.168.10.4")
config.Serializer = api.Codecs
config.KubeletClient = client.FakeKubeletClient{}
@ -398,7 +399,7 @@ func TestAPIVersionOfDiscoveryEndpoints(t *testing.T) {
}
func TestDiscoveryAtAPIS(t *testing.T) {
master, etcdserver, config, assert := newLimitedMaster(t)
master, etcdserver, _, assert := newLimitedMaster(t)
defer etcdserver.Terminate(t)
server := httptest.NewServer(master.HandlerContainer.ServeMux)
@ -444,20 +445,20 @@ func TestDiscoveryAtAPIS(t *testing.T) {
}
expectPreferredVersion := map[string]unversioned.GroupVersionForDiscovery{
autoscaling.GroupName: {
GroupVersion: config.StorageVersions[autoscaling.GroupName],
Version: apiutil.GetVersion(config.StorageVersions[autoscaling.GroupName]),
GroupVersion: registered.GroupOrDie(autoscaling.GroupName).GroupVersion.String(),
Version: registered.GroupOrDie(autoscaling.GroupName).GroupVersion.Version,
},
batch.GroupName: {
GroupVersion: config.StorageVersions[batch.GroupName],
Version: apiutil.GetVersion(config.StorageVersions[batch.GroupName]),
GroupVersion: registered.GroupOrDie(batch.GroupName).GroupVersion.String(),
Version: registered.GroupOrDie(batch.GroupName).GroupVersion.Version,
},
apps.GroupName: {
GroupVersion: config.StorageVersions[apps.GroupName],
Version: apiutil.GetVersion(config.StorageVersions[apps.GroupName]),
GroupVersion: registered.GroupOrDie(apps.GroupName).GroupVersion.String(),
Version: registered.GroupOrDie(apps.GroupName).GroupVersion.Version,
},
extensions.GroupName: {
GroupVersion: config.StorageVersions[extensions.GroupName],
Version: apiutil.GetVersion(config.StorageVersions[extensions.GroupName]),
GroupVersion: registered.GroupOrDie(extensions.GroupName).GroupVersion.String(),
Version: registered.GroupOrDie(extensions.GroupName).GroupVersion.Version,
},
}

View File

@ -231,6 +231,7 @@ func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
t.Fatalf("Failed to start etcd server error=%v", err)
return nil
}
cfg := etcd.Config{
Endpoints: server.ClientURLs.StringSlice(),
Transport: newHttpTransport(t, server.CertFile, server.KeyFile, server.CAFile),

View File

@ -29,6 +29,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/autoscaling"
"k8s.io/kubernetes/pkg/apis/batch"
@ -149,31 +150,33 @@ func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Se
// Returns a basic master config.
func NewMasterConfig() *master.Config {
etcdClient := NewEtcdClient()
storageVersions := make(map[string]string)
etcdConfig := etcdstorage.EtcdConfig{
ServerList: []string{"http://127.0.0.1:4001"},
Prefix: etcdtest.PathPrefix(),
}
etcdStorage := etcdstorage.NewEtcdStorage(etcdClient, testapi.Default.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
storageVersions[api.GroupName] = testapi.Default.GroupVersion().String()
autoscalingEtcdStorage := NewAutoscalingEtcdStorage(etcdClient)
storageVersions[autoscaling.GroupName] = testapi.Autoscaling.GroupVersion().String()
batchEtcdStorage := NewBatchEtcdStorage(etcdClient)
storageVersions[batch.GroupName] = testapi.Batch.GroupVersion().String()
appsEtcdStorage := NewAppsEtcdStorage(etcdClient)
storageVersions[apps.GroupName] = testapi.Apps.GroupVersion().String()
expEtcdStorage := NewExtensionsEtcdStorage(etcdClient)
storageVersions[extensions.GroupName] = testapi.Extensions.GroupVersion().String()
negotiatedSerializer := NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json")
storageDestinations := genericapiserver.NewStorageDestinations()
storageDestinations.AddAPIGroup(api.GroupName, etcdStorage)
storageDestinations.AddAPIGroup(autoscaling.GroupName, autoscalingEtcdStorage)
storageDestinations.AddAPIGroup(batch.GroupName, batchEtcdStorage)
storageDestinations.AddAPIGroup(apps.GroupName, appsEtcdStorage)
storageDestinations.AddAPIGroup(extensions.GroupName, expEtcdStorage)
storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, negotiatedSerializer, genericapiserver.NewDefaultResourceEncodingConfig(), master.DefaultAPIResourceConfigSource())
storageFactory.SetSerializer(
unversioned.GroupResource{Group: api.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json"))
storageFactory.SetSerializer(
unversioned.GroupResource{Group: autoscaling.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Autoscaling.Codec(), "application/json"))
storageFactory.SetSerializer(
unversioned.GroupResource{Group: batch.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Batch.Codec(), "application/json"))
storageFactory.SetSerializer(
unversioned.GroupResource{Group: apps.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Apps.Codec(), "application/json"))
storageFactory.SetSerializer(
unversioned.GroupResource{Group: extensions.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Extensions.Codec(), "application/json"))
return &master.Config{
Config: &genericapiserver.Config{
StorageDestinations: storageDestinations,
StorageVersions: storageVersions,
StorageFactory: storageFactory,
APIResourceConfigSource: master.DefaultAPIResourceConfigSource(),
APIPrefix: "/api",
APIGroupPrefix: "/apis",

View File

@ -0,0 +1,59 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
)
// NewSingleContentTypeSerializer wraps a serializer in a NegotiatedSerializer that handles one content type
func NewSingleContentTypeSerializer(scheme *runtime.Scheme, serializer runtime.Serializer, contentType string) runtime.NegotiatedSerializer {
return &wrappedSerializer{
scheme: scheme,
serializer: serializer,
contentType: contentType,
}
}
type wrappedSerializer struct {
scheme *runtime.Scheme
serializer runtime.Serializer
contentType string
}
var _ runtime.NegotiatedSerializer = &wrappedSerializer{}
func (s *wrappedSerializer) SupportedMediaTypes() []string {
return []string{s.contentType}
}
func (s *wrappedSerializer) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) {
if mediaType != s.contentType {
return nil, false
}
return s.serializer, true
}
func (s *wrappedSerializer) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
return versioning.NewCodec(s.serializer, s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), []unversioned.GroupVersion{gv}, nil)
}
func (s *wrappedSerializer) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder {
return versioning.NewCodec(s.serializer, s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), nil, []unversioned.GroupVersion{gv})
}