mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
make storage enablement, serialization, and location orthogonal
This commit is contained in:
parent
3be4b690ea
commit
6670b73b18
@ -23,6 +23,7 @@ import (
|
||||
|
||||
"k8s.io/kubernetes/pkg/admission"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
apiutil "k8s.io/kubernetes/pkg/api/util"
|
||||
"k8s.io/kubernetes/pkg/api/validation"
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
@ -98,39 +99,53 @@ func NewAPIServer() *APIServer {
|
||||
}
|
||||
|
||||
// dest must be a map of group to groupVersion.
|
||||
func gvToMap(gvList string, dest map[string]string) {
|
||||
for _, gv := range strings.Split(gvList, ",") {
|
||||
if gv == "" {
|
||||
func mergeGroupVersionIntoMap(gvList string, dest map[string]unversioned.GroupVersion) error {
|
||||
for _, gvString := range strings.Split(gvList, ",") {
|
||||
if gvString == "" {
|
||||
continue
|
||||
}
|
||||
// We accept two formats. "group/version" OR
|
||||
// "group=group/version". The latter is used when types
|
||||
// move between groups.
|
||||
if !strings.Contains(gv, "=") {
|
||||
dest[apiutil.GetGroup(gv)] = gv
|
||||
if !strings.Contains(gvString, "=") {
|
||||
gv, err := unversioned.ParseGroupVersion(gvString)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dest[gv.Group] = gv
|
||||
|
||||
} else {
|
||||
parts := strings.SplitN(gv, "=", 2)
|
||||
// TODO: error checking.
|
||||
dest[parts[0]] = parts[1]
|
||||
parts := strings.SplitN(gvString, "=", 2)
|
||||
gv, err := unversioned.ParseGroupVersion(parts[1])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dest[parts[0]] = gv
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// StorageGroupsToGroupVersions returns a map from group name to group version,
|
||||
// StorageGroupsToEncodingVersion returns a map from group name to group version,
|
||||
// computed from the s.DeprecatedStorageVersion and s.StorageVersions flags.
|
||||
// TODO: can we move the whole storage version concept to the generic apiserver?
|
||||
func (s *APIServer) StorageGroupsToGroupVersions() map[string]string {
|
||||
storageVersionMap := map[string]string{}
|
||||
func (s *APIServer) StorageGroupsToEncodingVersion() (map[string]unversioned.GroupVersion, error) {
|
||||
storageVersionMap := map[string]unversioned.GroupVersion{}
|
||||
if s.DeprecatedStorageVersion != "" {
|
||||
storageVersionMap[""] = s.DeprecatedStorageVersion
|
||||
storageVersionMap[""] = unversioned.GroupVersion{Group: apiutil.GetGroup(s.DeprecatedStorageVersion), Version: apiutil.GetVersion(s.DeprecatedStorageVersion)}
|
||||
}
|
||||
|
||||
// First, get the defaults.
|
||||
gvToMap(s.DefaultStorageVersions, storageVersionMap)
|
||||
if err := mergeGroupVersionIntoMap(s.DefaultStorageVersions, storageVersionMap); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Override any defaults with the user settings.
|
||||
gvToMap(s.StorageVersions, storageVersionMap)
|
||||
if err := mergeGroupVersionIntoMap(s.StorageVersions, storageVersionMap); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return storageVersionMap
|
||||
return storageVersionMap, nil
|
||||
}
|
||||
|
||||
// AddFlags adds flags for a specific APIServer to the specified FlagSet
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"github.com/spf13/pflag"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/autoscaling"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
)
|
||||
@ -32,38 +33,38 @@ func TestGenerateStorageVersionMap(t *testing.T) {
|
||||
legacyVersion string
|
||||
storageVersions string
|
||||
defaultVersions string
|
||||
expectedMap map[string]string
|
||||
expectedMap map[string]unversioned.GroupVersion
|
||||
}{
|
||||
{
|
||||
legacyVersion: "v1",
|
||||
storageVersions: "v1,extensions/v1beta1",
|
||||
expectedMap: map[string]string{
|
||||
api.GroupName: "v1",
|
||||
extensions.GroupName: "extensions/v1beta1",
|
||||
expectedMap: map[string]unversioned.GroupVersion{
|
||||
api.GroupName: {Version: "v1"},
|
||||
extensions.GroupName: {Group: "extensions", Version: "v1beta1"},
|
||||
},
|
||||
},
|
||||
{
|
||||
legacyVersion: "",
|
||||
storageVersions: "extensions/v1beta1,v1",
|
||||
expectedMap: map[string]string{
|
||||
api.GroupName: "v1",
|
||||
extensions.GroupName: "extensions/v1beta1",
|
||||
expectedMap: map[string]unversioned.GroupVersion{
|
||||
api.GroupName: {Version: "v1"},
|
||||
extensions.GroupName: {Group: "extensions", Version: "v1beta1"},
|
||||
},
|
||||
},
|
||||
{
|
||||
legacyVersion: "",
|
||||
storageVersions: "autoscaling=extensions/v1beta1,v1",
|
||||
defaultVersions: "extensions/v1beta1,v1,autoscaling/v1",
|
||||
expectedMap: map[string]string{
|
||||
api.GroupName: "v1",
|
||||
autoscaling.GroupName: "extensions/v1beta1",
|
||||
extensions.GroupName: "extensions/v1beta1",
|
||||
expectedMap: map[string]unversioned.GroupVersion{
|
||||
api.GroupName: {Version: "v1"},
|
||||
autoscaling.GroupName: {Group: "extensions", Version: "v1beta1"},
|
||||
extensions.GroupName: {Group: "extensions", Version: "v1beta1"},
|
||||
},
|
||||
},
|
||||
{
|
||||
legacyVersion: "",
|
||||
storageVersions: "",
|
||||
expectedMap: map[string]string{},
|
||||
expectedMap: map[string]unversioned.GroupVersion{},
|
||||
},
|
||||
}
|
||||
for i, test := range testCases {
|
||||
@ -72,7 +73,10 @@ func TestGenerateStorageVersionMap(t *testing.T) {
|
||||
StorageVersions: test.storageVersions,
|
||||
DefaultStorageVersions: test.defaultVersions,
|
||||
}
|
||||
output := s.StorageGroupsToGroupVersions()
|
||||
output, err := s.StorageGroupsToEncodingVersion()
|
||||
if err != nil {
|
||||
t.Errorf("%v: unexpected error: %v", i, err)
|
||||
}
|
||||
if !reflect.DeepEqual(test.expectedMap, output) {
|
||||
t.Errorf("%v: unexpected error. expect: %v, got: %v", i, test.expectedMap, output)
|
||||
}
|
||||
|
@ -37,8 +37,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
apiv1 "k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
"k8s.io/kubernetes/pkg/apis/apps"
|
||||
appsapi "k8s.io/kubernetes/pkg/apis/apps/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/apis/autoscaling"
|
||||
autoscalingapiv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
|
||||
@ -58,10 +56,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/master"
|
||||
"k8s.io/kubernetes/pkg/registry/cachesize"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
|
||||
"k8s.io/kubernetes/pkg/serviceaccount"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
)
|
||||
|
||||
// NewAPIServerCommand creates a *cobra.Command object with default parameters
|
||||
@ -81,89 +76,6 @@ cluster's shared state through which all other components interact.`,
|
||||
return cmd
|
||||
}
|
||||
|
||||
// For testing.
|
||||
type newEtcdFunc func(runtime.NegotiatedSerializer, string, string, etcdstorage.EtcdConfig) (storage.Interface, error)
|
||||
|
||||
func newEtcd(ns runtime.NegotiatedSerializer, storageGroupVersionString, memoryGroupVersionString string, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) {
|
||||
if storageGroupVersionString == "" {
|
||||
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
|
||||
}
|
||||
storageVersion, err := unversioned.ParseGroupVersion(storageGroupVersionString)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't understand storage version %v: %v", storageGroupVersionString, err)
|
||||
}
|
||||
memoryVersion, err := unversioned.ParseGroupVersion(memoryGroupVersionString)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't understand memory version %v: %v", memoryGroupVersionString, err)
|
||||
}
|
||||
|
||||
var storageConfig etcdstorage.EtcdStorageConfig
|
||||
storageConfig.Config = etcdConfig
|
||||
s, ok := ns.SerializerForMediaType("application/json", nil)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unable to find serializer for JSON")
|
||||
}
|
||||
glog.Infof("constructing etcd storage interface.\n sv: %v\n mv: %v\n", storageVersion, memoryVersion)
|
||||
encoder := ns.EncoderForVersion(s, storageVersion)
|
||||
decoder := ns.DecoderToVersion(s, memoryVersion)
|
||||
if memoryVersion.Group != storageVersion.Group {
|
||||
// Allow this codec to translate between groups.
|
||||
if err = versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil {
|
||||
return nil, fmt.Errorf("error setting up encoder for %v: %v", storageGroupVersionString, err)
|
||||
}
|
||||
if err = versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil {
|
||||
return nil, fmt.Errorf("error setting up decoder for %v: %v", storageGroupVersionString, err)
|
||||
}
|
||||
}
|
||||
storageConfig.Codec = runtime.NewCodec(encoder, decoder)
|
||||
return storageConfig.NewStorage()
|
||||
}
|
||||
|
||||
// parse the value of --etcd-servers-overrides and update given storageDestinations.
|
||||
func updateEtcdOverrides(overrides []string, storageVersions map[string]string, etcdConfig etcdstorage.EtcdConfig, storageDestinations *genericapiserver.StorageDestinations, newEtcdFn newEtcdFunc) {
|
||||
if len(overrides) == 0 {
|
||||
return
|
||||
}
|
||||
for _, override := range overrides {
|
||||
tokens := strings.Split(override, "#")
|
||||
if len(tokens) != 2 {
|
||||
glog.Errorf("invalid value of etcd server overrides: %s", override)
|
||||
continue
|
||||
}
|
||||
|
||||
apiresource := strings.Split(tokens[0], "/")
|
||||
if len(apiresource) != 2 {
|
||||
glog.Errorf("invalid resource definition: %s", tokens[0])
|
||||
}
|
||||
group := apiresource[0]
|
||||
resource := apiresource[1]
|
||||
|
||||
apigroup, err := registered.Group(group)
|
||||
if err != nil {
|
||||
glog.Errorf("invalid api group %s: %v", group, err)
|
||||
continue
|
||||
}
|
||||
if _, found := storageVersions[apigroup.GroupVersion.Group]; !found {
|
||||
glog.Errorf("Couldn't find the storage version for group %s", apigroup.GroupVersion.Group)
|
||||
continue
|
||||
}
|
||||
|
||||
servers := strings.Split(tokens[1], ";")
|
||||
overrideEtcdConfig := etcdConfig
|
||||
overrideEtcdConfig.ServerList = servers
|
||||
// Note, internalGV will be wrong for things like batch or
|
||||
// autoscalers, but they shouldn't be using the override
|
||||
// storage.
|
||||
internalGV := apigroup.GroupVersion.Group + "/__internal"
|
||||
etcdOverrideStorage, err := newEtcdFn(api.Codecs, storageVersions[apigroup.GroupVersion.Group], internalGV, overrideEtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
|
||||
}
|
||||
|
||||
storageDestinations.AddStorageOverride(group, resource, etcdOverrideStorage)
|
||||
}
|
||||
}
|
||||
|
||||
// Run runs the specified APIServer. This should never exit.
|
||||
func Run(s *options.APIServer) error {
|
||||
genericapiserver.DefaultAndValidateRunOptions(s.ServerRunOptions)
|
||||
@ -252,126 +164,37 @@ func Run(s *options.APIServer) error {
|
||||
glog.Errorf("Failed to create clientset: %v", err)
|
||||
}
|
||||
|
||||
legacyV1Group, err := registered.Group(api.GroupName)
|
||||
resourceEncoding := genericapiserver.NewDefaultResourceEncodingConfig()
|
||||
groupToEncoding, err := s.StorageGroupsToEncodingVersion()
|
||||
if err != nil {
|
||||
return err
|
||||
glog.Fatalf("error getting group encoding: %s", err)
|
||||
}
|
||||
for group, storageEncodingVersion := range groupToEncoding {
|
||||
resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal})
|
||||
}
|
||||
|
||||
storageDestinations := genericapiserver.NewStorageDestinations()
|
||||
storageFactory := genericapiserver.NewDefaultStorageFactory(s.EtcdConfig, api.Codecs, resourceEncoding, apiResourceConfigSource)
|
||||
storageFactory.AddCohabitatingResources(batch.Resource("jobs"), extensions.Resource("jobs"))
|
||||
storageFactory.AddCohabitatingResources(autoscaling.Resource("horizontalpodautoscalers"), extensions.Resource("horizontalpodautoscalers"))
|
||||
for _, override := range s.EtcdServersOverrides {
|
||||
tokens := strings.Split(override, "#")
|
||||
if len(tokens) != 2 {
|
||||
glog.Errorf("invalid value of etcd server overrides: %s", override)
|
||||
continue
|
||||
}
|
||||
|
||||
storageVersions := s.StorageGroupsToGroupVersions()
|
||||
if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found {
|
||||
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions)
|
||||
apiresource := strings.Split(tokens[0], "/")
|
||||
if len(apiresource) != 2 {
|
||||
glog.Errorf("invalid resource definition: %s", tokens[0])
|
||||
continue
|
||||
}
|
||||
group := apiresource[0]
|
||||
resource := apiresource[1]
|
||||
groupResource := unversioned.GroupResource{Group: group, Resource: resource}
|
||||
|
||||
servers := strings.Split(tokens[1], ";")
|
||||
storageFactory.SetEtcdLocation(groupResource, servers)
|
||||
}
|
||||
etcdStorage, err := newEtcd(api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], "/__internal", s.EtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
|
||||
}
|
||||
storageDestinations.AddAPIGroup("", etcdStorage)
|
||||
|
||||
if apiResourceConfigSource.AnyResourcesForVersionEnabled(extensionsapiv1beta1.SchemeGroupVersion) {
|
||||
glog.Infof("Configuring extensions/v1beta1 storage destination")
|
||||
expGroup, err := registered.Group(extensions.GroupName)
|
||||
if err != nil {
|
||||
glog.Fatalf("Extensions API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err)
|
||||
}
|
||||
if _, found := storageVersions[expGroup.GroupVersion.Group]; !found {
|
||||
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions)
|
||||
}
|
||||
expEtcdStorage, err := newEtcd(api.Codecs, storageVersions[expGroup.GroupVersion.Group], "extensions/__internal", s.EtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
|
||||
}
|
||||
storageDestinations.AddAPIGroup(extensions.GroupName, expEtcdStorage)
|
||||
|
||||
// Since HPA has been moved to the autoscaling group, we need to make
|
||||
// sure autoscaling has a storage destination. If the autoscaling group
|
||||
// itself is on, it will overwrite this decision below.
|
||||
storageDestinations.AddAPIGroup(autoscaling.GroupName, expEtcdStorage)
|
||||
|
||||
// Since Job has been moved to the batch group, we need to make
|
||||
// sure batch has a storage destination. If the batch group
|
||||
// itself is on, it will overwrite this decision below.
|
||||
storageDestinations.AddAPIGroup(batch.GroupName, expEtcdStorage)
|
||||
}
|
||||
|
||||
// autoscaling/v1/horizontalpodautoscalers is a move from extensions/v1beta1/horizontalpodautoscalers.
|
||||
// The storage version needs to be either extensions/v1beta1 or autoscaling/v1.
|
||||
// Users must roll forward while using 1.2, because we will require the latter for 1.3.
|
||||
if apiResourceConfigSource.AnyResourcesForVersionEnabled(autoscalingapiv1.SchemeGroupVersion) {
|
||||
glog.Infof("Configuring autoscaling/v1 storage destination")
|
||||
autoscalingGroup, err := registered.Group(autoscaling.GroupName)
|
||||
if err != nil {
|
||||
glog.Fatalf("Autoscaling API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err)
|
||||
}
|
||||
// Figure out what storage group/version we should use.
|
||||
storageGroupVersion, found := storageVersions[autoscalingGroup.GroupVersion.Group]
|
||||
if !found {
|
||||
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", autoscalingGroup.GroupVersion.Group, storageVersions)
|
||||
}
|
||||
|
||||
if storageGroupVersion != "autoscaling/v1" && storageGroupVersion != "extensions/v1beta1" {
|
||||
glog.Fatalf("The storage version for autoscaling must be either 'autoscaling/v1' or 'extensions/v1beta1'")
|
||||
}
|
||||
glog.Infof("Using %v for autoscaling group storage version", storageGroupVersion)
|
||||
autoscalingEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
|
||||
}
|
||||
storageDestinations.AddAPIGroup(autoscaling.GroupName, autoscalingEtcdStorage)
|
||||
}
|
||||
|
||||
// batch/v1/job is a move from extensions/v1beta1/job. The storage
|
||||
// version needs to be either extensions/v1beta1 or batch/v1. Users
|
||||
// must roll forward while using 1.2, because we will require the
|
||||
// latter for 1.3.
|
||||
if apiResourceConfigSource.AnyResourcesForVersionEnabled(batchapiv1.SchemeGroupVersion) {
|
||||
glog.Infof("Configuring batch/v1 storage destination")
|
||||
batchGroup, err := registered.Group(batch.GroupName)
|
||||
if err != nil {
|
||||
glog.Fatalf("Batch API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err)
|
||||
}
|
||||
// Figure out what storage group/version we should use.
|
||||
storageGroupVersion, found := storageVersions[batchGroup.GroupVersion.Group]
|
||||
if !found {
|
||||
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", batchGroup.GroupVersion.Group, storageVersions)
|
||||
}
|
||||
|
||||
if storageGroupVersion != "batch/v1" && storageGroupVersion != "extensions/v1beta1" {
|
||||
glog.Fatalf("The storage version for batch must be either 'batch/v1' or 'extensions/v1beta1'")
|
||||
}
|
||||
glog.Infof("Using %v for batch group storage version", storageGroupVersion)
|
||||
batchEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
|
||||
}
|
||||
storageDestinations.AddAPIGroup(batch.GroupName, batchEtcdStorage)
|
||||
}
|
||||
|
||||
if apiResourceConfigSource.AnyResourcesForVersionEnabled(appsapi.SchemeGroupVersion) {
|
||||
glog.Infof("Configuring apps/v1alpha1 storage destination")
|
||||
appsGroup, err := registered.Group(apps.GroupName)
|
||||
if err != nil {
|
||||
glog.Fatalf("Apps API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err)
|
||||
}
|
||||
// Figure out what storage group/version we should use.
|
||||
storageGroupVersion, found := storageVersions[appsGroup.GroupVersion.Group]
|
||||
if !found {
|
||||
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", appsGroup.GroupVersion.Group, storageVersions)
|
||||
}
|
||||
|
||||
if storageGroupVersion != "apps/v1alpha1" {
|
||||
glog.Fatalf("The storage version for apps must be apps/v1alpha1")
|
||||
}
|
||||
glog.Infof("Using %v for petset group storage version", storageGroupVersion)
|
||||
appsEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "apps/__internal", s.EtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
|
||||
}
|
||||
storageDestinations.AddAPIGroup(apps.GroupName, appsEtcdStorage)
|
||||
}
|
||||
|
||||
updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdConfig, &storageDestinations, newEtcd)
|
||||
|
||||
// Default to the private server key for service account token signing
|
||||
if s.ServiceAccountKeyFile == "" && s.TLSPrivateKeyFile != "" {
|
||||
@ -386,7 +209,11 @@ func Run(s *options.APIServer) error {
|
||||
if s.ServiceAccountLookup {
|
||||
// If we need to look up service accounts and tokens,
|
||||
// go directly to etcd to avoid recursive auth insanity
|
||||
serviceAccountGetter = serviceaccountcontroller.NewGetterFromStorageInterface(etcdStorage)
|
||||
storage, err := storageFactory.New(api.Resource("serviceaccounts"))
|
||||
if err != nil {
|
||||
glog.Fatalf("Unable to get serviceaccounts storage: %v", err)
|
||||
}
|
||||
serviceAccountGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storage)
|
||||
}
|
||||
|
||||
authenticator, err := authenticator.New(authenticator.AuthenticatorConfig{
|
||||
@ -443,8 +270,7 @@ func Run(s *options.APIServer) error {
|
||||
|
||||
genericConfig := genericapiserver.NewConfig(s.ServerRunOptions)
|
||||
// TODO: Move the following to generic api server as well.
|
||||
genericConfig.StorageDestinations = storageDestinations
|
||||
genericConfig.StorageVersions = storageVersions
|
||||
genericConfig.StorageFactory = storageFactory
|
||||
genericConfig.Authenticator = authenticator
|
||||
genericConfig.SupportsBasicAuth = len(s.BasicAuthFile) > 0
|
||||
genericConfig.Authorizer = authorizer
|
||||
|
@ -19,18 +19,12 @@ package app
|
||||
import (
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
"k8s.io/kubernetes/pkg/genericapiserver"
|
||||
"k8s.io/kubernetes/pkg/master"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
)
|
||||
|
||||
func TestLongRunningRequestRegexp(t *testing.T) {
|
||||
@ -74,64 +68,6 @@ func TestLongRunningRequestRegexp(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateEtcdOverrides(t *testing.T) {
|
||||
storageVersions := map[string]string{
|
||||
"": "v1",
|
||||
"extensions": "extensions/v1beta1",
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
apigroup string
|
||||
resource string
|
||||
servers []string
|
||||
}{
|
||||
{
|
||||
apigroup: api.GroupName,
|
||||
resource: "resource",
|
||||
servers: []string{"http://127.0.0.1:10000"},
|
||||
},
|
||||
{
|
||||
apigroup: api.GroupName,
|
||||
resource: "resource",
|
||||
servers: []string{"http://127.0.0.1:10000", "http://127.0.0.1:20000"},
|
||||
},
|
||||
{
|
||||
apigroup: extensions.GroupName,
|
||||
resource: "resource",
|
||||
servers: []string{"http://127.0.0.1:10000"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
newEtcd := func(_ runtime.NegotiatedSerializer, _, _ string, etcdConfig etcdstorage.EtcdConfig) (storage.Interface, error) {
|
||||
if !reflect.DeepEqual(test.servers, etcdConfig.ServerList) {
|
||||
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, etcdConfig.ServerList)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
storageDestinations := genericapiserver.NewStorageDestinations()
|
||||
override := test.apigroup + "/" + test.resource + "#" + strings.Join(test.servers, ";")
|
||||
defaultEtcdConfig := etcdstorage.EtcdConfig{
|
||||
Prefix: genericapiserver.DefaultEtcdPathPrefix,
|
||||
ServerList: []string{"http://127.0.0.1"},
|
||||
}
|
||||
updateEtcdOverrides([]string{override}, storageVersions, defaultEtcdConfig, &storageDestinations, newEtcd)
|
||||
apigroup, ok := storageDestinations.APIGroups[test.apigroup]
|
||||
if !ok {
|
||||
t.Errorf("apigroup: %s not created", test.apigroup)
|
||||
continue
|
||||
}
|
||||
if apigroup.Overrides == nil {
|
||||
t.Errorf("Overrides not created for: %s", test.apigroup)
|
||||
continue
|
||||
}
|
||||
if _, ok := apigroup.Overrides[test.resource]; !ok {
|
||||
t.Errorf("override not created for: %s", test.resource)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseRuntimeConfig(t *testing.T) {
|
||||
testCases := []struct {
|
||||
runtimeConfig map[string]string
|
||||
|
@ -24,7 +24,7 @@ import (
|
||||
testgroupetcd "k8s.io/kubernetes/examples/apiserver/rest"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/rest"
|
||||
"k8s.io/kubernetes/pkg/apimachinery"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
"k8s.io/kubernetes/pkg/genericapiserver"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
@ -40,20 +40,14 @@ const (
|
||||
SecurePort = 6444
|
||||
)
|
||||
|
||||
func newStorageDestinations(groupName string, groupMeta *apimachinery.GroupMeta) (*genericapiserver.StorageDestinations, error) {
|
||||
storageDestinations := genericapiserver.NewStorageDestinations()
|
||||
var storageConfig etcdstorage.EtcdStorageConfig
|
||||
storageConfig.Config = etcdstorage.EtcdConfig{
|
||||
func newStorageFactory() genericapiserver.StorageFactory {
|
||||
etcdConfig := etcdstorage.EtcdConfig{
|
||||
Prefix: genericapiserver.DefaultEtcdPathPrefix,
|
||||
ServerList: []string{"http://127.0.0.1:4001"},
|
||||
}
|
||||
storageConfig.Codec = groupMeta.Codec
|
||||
storageInterface, err := storageConfig.NewStorage()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
storageDestinations.AddAPIGroup(groupName, storageInterface)
|
||||
return &storageDestinations, nil
|
||||
storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig())
|
||||
|
||||
return storageFactory
|
||||
}
|
||||
|
||||
func NewServerRunOptions() *genericapiserver.ServerRunOptions {
|
||||
@ -86,12 +80,14 @@ func Run(serverOptions *genericapiserver.ServerRunOptions) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v", err)
|
||||
}
|
||||
storageDestinations, err := newStorageDestinations(groupName, groupMeta)
|
||||
storageFactory := newStorageFactory()
|
||||
storage, err := storageFactory.New(unversioned.GroupResource{Group: groupName, Resource: "testtype"})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to init etcd: %v", err)
|
||||
return fmt.Errorf("Unable to get storage: %v", err)
|
||||
}
|
||||
|
||||
restStorageMap := map[string]rest.Storage{
|
||||
"testtypes": testgroupetcd.NewREST(storageDestinations.Get(groupName, "testtype"), s.StorageDecorator()),
|
||||
"testtypes": testgroupetcd.NewREST(storage, s.StorageDecorator()),
|
||||
}
|
||||
apiGroupInfo := genericapiserver.APIGroupInfo{
|
||||
GroupMeta: *groupMeta,
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
|
||||
"k8s.io/kubernetes/pkg/admission"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
apiutil "k8s.io/kubernetes/pkg/api/util"
|
||||
"k8s.io/kubernetes/pkg/api/validation"
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
@ -119,39 +120,53 @@ func NewAPIServer() *APIServer {
|
||||
}
|
||||
|
||||
// dest must be a map of group to groupVersion.
|
||||
func gvToMap(gvList string, dest map[string]string) {
|
||||
for _, gv := range strings.Split(gvList, ",") {
|
||||
if gv == "" {
|
||||
func mergeGroupVersionIntoMap(gvList string, dest map[string]unversioned.GroupVersion) error {
|
||||
for _, gvString := range strings.Split(gvList, ",") {
|
||||
if gvString == "" {
|
||||
continue
|
||||
}
|
||||
// We accept two formats. "group/version" OR
|
||||
// "group=group/version". The latter is used when types
|
||||
// move between groups.
|
||||
if !strings.Contains(gv, "=") {
|
||||
dest[apiutil.GetGroup(gv)] = gv
|
||||
if !strings.Contains(gvString, "=") {
|
||||
gv, err := unversioned.ParseGroupVersion(gvString)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dest[gv.Group] = gv
|
||||
|
||||
} else {
|
||||
parts := strings.SplitN(gv, "=", 2)
|
||||
// TODO: error checking.
|
||||
dest[parts[0]] = parts[1]
|
||||
parts := strings.SplitN(gvString, "=", 2)
|
||||
gv, err := unversioned.ParseGroupVersion(parts[1])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dest[parts[0]] = gv
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// StorageGroupsToGroupVersions returns a map from group name to group version,
|
||||
// StorageGroupsToEncodingVersion returns a map from group name to group version,
|
||||
// computed from the s.DeprecatedStorageVersion and s.StorageVersions flags.
|
||||
// TODO: can we move the whole storage version concept to the generic apiserver?
|
||||
func (s *APIServer) StorageGroupsToGroupVersions() map[string]string {
|
||||
storageVersionMap := map[string]string{}
|
||||
func (s *APIServer) StorageGroupsToEncodingVersion() (map[string]unversioned.GroupVersion, error) {
|
||||
storageVersionMap := map[string]unversioned.GroupVersion{}
|
||||
if s.DeprecatedStorageVersion != "" {
|
||||
storageVersionMap[""] = s.DeprecatedStorageVersion
|
||||
storageVersionMap[""] = unversioned.GroupVersion{Group: apiutil.GetGroup(s.DeprecatedStorageVersion), Version: apiutil.GetVersion(s.DeprecatedStorageVersion)}
|
||||
}
|
||||
|
||||
// First, get the defaults.
|
||||
gvToMap(s.DefaultStorageVersions, storageVersionMap)
|
||||
if err := mergeGroupVersionIntoMap(s.DefaultStorageVersions, storageVersionMap); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Override any defaults with the user settings.
|
||||
gvToMap(s.StorageVersions, storageVersionMap)
|
||||
if err := mergeGroupVersionIntoMap(s.StorageVersions, storageVersionMap); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return storageVersionMap
|
||||
return storageVersionMap, nil
|
||||
}
|
||||
|
||||
// AddFlags adds flags for a specific APIServer to the specified FlagSet
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"github.com/spf13/pflag"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/autoscaling"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
)
|
||||
@ -32,38 +33,38 @@ func TestGenerateStorageVersionMap(t *testing.T) {
|
||||
legacyVersion string
|
||||
storageVersions string
|
||||
defaultVersions string
|
||||
expectedMap map[string]string
|
||||
expectedMap map[string]unversioned.GroupVersion
|
||||
}{
|
||||
{
|
||||
legacyVersion: "v1",
|
||||
storageVersions: "v1,extensions/v1beta1",
|
||||
expectedMap: map[string]string{
|
||||
api.GroupName: "v1",
|
||||
extensions.GroupName: "extensions/v1beta1",
|
||||
expectedMap: map[string]unversioned.GroupVersion{
|
||||
api.GroupName: {Version: "v1"},
|
||||
extensions.GroupName: {Group: "extensions", Version: "v1beta1"},
|
||||
},
|
||||
},
|
||||
{
|
||||
legacyVersion: "",
|
||||
storageVersions: "extensions/v1beta1,v1",
|
||||
expectedMap: map[string]string{
|
||||
api.GroupName: "v1",
|
||||
extensions.GroupName: "extensions/v1beta1",
|
||||
expectedMap: map[string]unversioned.GroupVersion{
|
||||
api.GroupName: {Version: "v1"},
|
||||
extensions.GroupName: {Group: "extensions", Version: "v1beta1"},
|
||||
},
|
||||
},
|
||||
{
|
||||
legacyVersion: "",
|
||||
storageVersions: "autoscaling=extensions/v1beta1,v1",
|
||||
defaultVersions: "extensions/v1beta1,v1,autoscaling/v1",
|
||||
expectedMap: map[string]string{
|
||||
api.GroupName: "v1",
|
||||
autoscaling.GroupName: "extensions/v1beta1",
|
||||
extensions.GroupName: "extensions/v1beta1",
|
||||
expectedMap: map[string]unversioned.GroupVersion{
|
||||
api.GroupName: {Version: "v1"},
|
||||
autoscaling.GroupName: {Group: "extensions", Version: "v1beta1"},
|
||||
extensions.GroupName: {Group: "extensions", Version: "v1beta1"},
|
||||
},
|
||||
},
|
||||
{
|
||||
legacyVersion: "",
|
||||
storageVersions: "",
|
||||
expectedMap: map[string]string{},
|
||||
expectedMap: map[string]unversioned.GroupVersion{},
|
||||
},
|
||||
}
|
||||
for i, test := range testCases {
|
||||
@ -72,7 +73,10 @@ func TestGenerateStorageVersionMap(t *testing.T) {
|
||||
StorageVersions: test.storageVersions,
|
||||
DefaultStorageVersions: test.defaultVersions,
|
||||
}
|
||||
output := s.StorageGroupsToGroupVersions()
|
||||
output, err := s.StorageGroupsToEncodingVersion()
|
||||
if err != nil {
|
||||
t.Errorf("%v: unexpected error: %v", i, err)
|
||||
}
|
||||
if !reflect.DeepEqual(test.expectedMap, output) {
|
||||
t.Errorf("%v: unexpected error. expect: %v, got: %v", i, test.expectedMap, output)
|
||||
}
|
||||
|
@ -21,7 +21,6 @@ package app
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
@ -47,9 +46,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/master"
|
||||
"k8s.io/kubernetes/pkg/registry/cachesize"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
utilnet "k8s.io/kubernetes/pkg/util/net"
|
||||
)
|
||||
|
||||
@ -81,44 +77,6 @@ func verifyClusterIPFlags(s *options.APIServer) {
|
||||
}
|
||||
}
|
||||
|
||||
// For testing.
|
||||
type newEtcdFunc func(runtime.NegotiatedSerializer, string, string, etcdstorage.EtcdConfig) (storage.Interface, error)
|
||||
|
||||
func newEtcd(ns runtime.NegotiatedSerializer, storageGroupVersionString, memoryGroupVersionString string, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) {
|
||||
if storageGroupVersionString == "" {
|
||||
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
|
||||
}
|
||||
storageVersion, err := unversioned.ParseGroupVersion(storageGroupVersionString)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't understand storage version %v: %v", storageGroupVersionString, err)
|
||||
}
|
||||
memoryVersion, err := unversioned.ParseGroupVersion(memoryGroupVersionString)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't understand memory version %v: %v", memoryGroupVersionString, err)
|
||||
}
|
||||
|
||||
var storageConfig etcdstorage.EtcdStorageConfig
|
||||
storageConfig.Config = etcdConfig
|
||||
s, ok := ns.SerializerForMediaType("application/json", nil)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unable to find serializer for JSON")
|
||||
}
|
||||
glog.Infof("constructing etcd storage interface.\n sv: %v\n mv: %v\n", storageVersion, memoryVersion)
|
||||
encoder := ns.EncoderForVersion(s, storageVersion)
|
||||
decoder := ns.DecoderToVersion(s, memoryVersion)
|
||||
if memoryVersion.Group != storageVersion.Group {
|
||||
// Allow this codec to translate between groups.
|
||||
if err = versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil {
|
||||
return nil, fmt.Errorf("error setting up encoder for %v: %v", storageGroupVersionString, err)
|
||||
}
|
||||
if err = versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil {
|
||||
return nil, fmt.Errorf("error setting up decoder for %v: %v", storageGroupVersionString, err)
|
||||
}
|
||||
}
|
||||
storageConfig.Codec = runtime.NewCodec(encoder, decoder)
|
||||
return storageConfig.NewStorage()
|
||||
}
|
||||
|
||||
// Run runs the specified APIServer. This should never exit.
|
||||
func Run(s *options.APIServer) error {
|
||||
verifyClusterIPFlags(s)
|
||||
@ -228,6 +186,36 @@ func Run(s *options.APIServer) error {
|
||||
|
||||
n := s.ServiceClusterIPRange
|
||||
|
||||
resourceEncoding := genericapiserver.NewDefaultResourceEncodingConfig()
|
||||
groupToEncoding, err := s.StorageGroupsToEncodingVersion()
|
||||
if err != nil {
|
||||
glog.Fatalf("error getting group encoding: %s", err)
|
||||
}
|
||||
for group, storageEncodingVersion := range groupToEncoding {
|
||||
resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal})
|
||||
}
|
||||
|
||||
storageFactory := genericapiserver.NewDefaultStorageFactory(s.EtcdConfig, api.Codecs, resourceEncoding, apiResourceConfigSource)
|
||||
for _, override := range s.EtcdServersOverrides {
|
||||
tokens := strings.Split(override, "#")
|
||||
if len(tokens) != 2 {
|
||||
glog.Errorf("invalid value of etcd server overrides: %s", override)
|
||||
continue
|
||||
}
|
||||
|
||||
apiresource := strings.Split(tokens[0], "/")
|
||||
if len(apiresource) != 2 {
|
||||
glog.Errorf("invalid resource definition: %s", tokens[0])
|
||||
continue
|
||||
}
|
||||
group := apiresource[0]
|
||||
resource := apiresource[1]
|
||||
groupResource := unversioned.GroupResource{Group: group, Resource: resource}
|
||||
|
||||
servers := strings.Split(tokens[1], ";")
|
||||
storageFactory.SetEtcdLocation(groupResource, servers)
|
||||
}
|
||||
|
||||
authenticator, err := authenticator.New(authenticator.AuthenticatorConfig{
|
||||
BasicAuthFile: s.BasicAuthFile,
|
||||
ClientCAFile: s.ClientCAFile,
|
||||
@ -276,13 +264,9 @@ func Run(s *options.APIServer) error {
|
||||
}
|
||||
}
|
||||
|
||||
storageDestinations := genericapiserver.NewStorageDestinations()
|
||||
storageVersions := s.StorageGroupsToGroupVersions()
|
||||
|
||||
config := &master.Config{
|
||||
Config: &genericapiserver.Config{
|
||||
StorageDestinations: storageDestinations,
|
||||
StorageVersions: storageVersions,
|
||||
StorageFactory: storageFactory,
|
||||
ServiceClusterIPRange: &n,
|
||||
EnableLogsSupport: s.EnableLogsSupport,
|
||||
EnableUISupport: true,
|
||||
|
@ -43,7 +43,6 @@ import (
|
||||
genericetcd "k8s.io/kubernetes/pkg/registry/generic/etcd"
|
||||
ipallocator "k8s.io/kubernetes/pkg/registry/service/ipallocator"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
"k8s.io/kubernetes/pkg/ui"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/crypto"
|
||||
@ -55,7 +54,6 @@ import (
|
||||
"github.com/emicklei/go-restful"
|
||||
"github.com/emicklei/go-restful/swagger"
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -64,107 +62,6 @@ const (
|
||||
globalTimeout = time.Minute
|
||||
)
|
||||
|
||||
// StorageDestinations is a mapping from API group & resource to
|
||||
// the underlying storage interfaces.
|
||||
type StorageDestinations struct {
|
||||
APIGroups map[string]*StorageDestinationsForAPIGroup
|
||||
}
|
||||
|
||||
type StorageDestinationsForAPIGroup struct {
|
||||
Default storage.Interface
|
||||
Overrides map[string]storage.Interface
|
||||
}
|
||||
|
||||
func NewStorageDestinations() StorageDestinations {
|
||||
return StorageDestinations{
|
||||
APIGroups: map[string]*StorageDestinationsForAPIGroup{},
|
||||
}
|
||||
}
|
||||
|
||||
// AddAPIGroup replaces 'group' if it's already registered.
|
||||
func (s *StorageDestinations) AddAPIGroup(group string, defaultStorage storage.Interface) {
|
||||
glog.Infof("Adding storage destination for group %v", group)
|
||||
s.APIGroups[group] = &StorageDestinationsForAPIGroup{
|
||||
Default: defaultStorage,
|
||||
Overrides: map[string]storage.Interface{},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StorageDestinations) AddStorageOverride(group, resource string, override storage.Interface) {
|
||||
if _, ok := s.APIGroups[group]; !ok {
|
||||
s.AddAPIGroup(group, nil)
|
||||
}
|
||||
if s.APIGroups[group].Overrides == nil {
|
||||
s.APIGroups[group].Overrides = map[string]storage.Interface{}
|
||||
}
|
||||
s.APIGroups[group].Overrides[resource] = override
|
||||
}
|
||||
|
||||
// Get finds the storage destination for the given group and resource. It will
|
||||
// Fatalf if the group has no storage destination configured.
|
||||
func (s *StorageDestinations) Get(group, resource string) storage.Interface {
|
||||
apigroup, ok := s.APIGroups[group]
|
||||
if !ok {
|
||||
// TODO: return an error like a normal function. For now,
|
||||
// Fatalf is better than just logging an error, because this
|
||||
// condition guarantees future problems and this is a less
|
||||
// mysterious failure point.
|
||||
glog.Fatalf("No storage defined for API group: '%s'. Defined groups: %#v", group, s.APIGroups)
|
||||
return nil
|
||||
}
|
||||
if apigroup.Overrides != nil {
|
||||
if client, exists := apigroup.Overrides[resource]; exists {
|
||||
return client
|
||||
}
|
||||
}
|
||||
return apigroup.Default
|
||||
}
|
||||
|
||||
// Search is like Get, but can be used to search a list of groups. It tries the
|
||||
// groups in order (and Fatalf's if none of them exist). The intention is for
|
||||
// this to be used for resources that move between groups.
|
||||
func (s *StorageDestinations) Search(groups []string, resource string) storage.Interface {
|
||||
for _, group := range groups {
|
||||
apigroup, ok := s.APIGroups[group]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if apigroup.Overrides != nil {
|
||||
if client, exists := apigroup.Overrides[resource]; exists {
|
||||
return client
|
||||
}
|
||||
}
|
||||
return apigroup.Default
|
||||
}
|
||||
// TODO: return an error like a normal function. For now,
|
||||
// Fatalf is better than just logging an error, because this
|
||||
// condition guarantees future problems and this is a less
|
||||
// mysterious failure point.
|
||||
glog.Fatalf("No storage defined for any of the groups: %v. Defined groups: %#v", groups, s.APIGroups)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get all backends for all registered storage destinations.
|
||||
// Used for getting all instances for health validations.
|
||||
func (s *StorageDestinations) Backends() []string {
|
||||
backends := sets.String{}
|
||||
for _, group := range s.APIGroups {
|
||||
if group.Default != nil {
|
||||
for _, backend := range group.Default.Backends(context.TODO()) {
|
||||
backends.Insert(backend)
|
||||
}
|
||||
}
|
||||
if group.Overrides != nil {
|
||||
for _, storage := range group.Overrides {
|
||||
for _, backend := range storage.Backends(context.TODO()) {
|
||||
backends.Insert(backend)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return backends.List()
|
||||
}
|
||||
|
||||
// Info about an API group.
|
||||
type APIGroupInfo struct {
|
||||
GroupMeta apimachinery.GroupMeta
|
||||
@ -199,9 +96,7 @@ type APIGroupInfo struct {
|
||||
|
||||
// Config is a structure used to configure a GenericAPIServer.
|
||||
type Config struct {
|
||||
StorageDestinations StorageDestinations
|
||||
// StorageVersions is a map between groups and their storage versions
|
||||
StorageVersions map[string]string
|
||||
StorageFactory StorageFactory
|
||||
// allow downstream consumers to disable the core controller loops
|
||||
EnableLogsSupport bool
|
||||
EnableUISupport bool
|
||||
|
@ -32,7 +32,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api/rest"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
apiutil "k8s.io/kubernetes/pkg/api/util"
|
||||
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
@ -266,7 +265,7 @@ func getGroupList(server *httptest.Server) (*unversioned.APIGroupList, error) {
|
||||
}
|
||||
|
||||
func TestDiscoveryAtAPIS(t *testing.T) {
|
||||
master, etcdserver, config, assert := newMaster(t)
|
||||
master, etcdserver, _, assert := newMaster(t)
|
||||
defer etcdserver.Terminate(t)
|
||||
|
||||
server := httptest.NewServer(master.HandlerContainer.ServeMux)
|
||||
@ -277,7 +276,6 @@ func TestDiscoveryAtAPIS(t *testing.T) {
|
||||
assert.Equal(0, len(groupList.Groups))
|
||||
|
||||
// Add a Group.
|
||||
extensionsGroupName := extensions.GroupName
|
||||
extensionsVersions := []unversioned.GroupVersionForDiscovery{
|
||||
{
|
||||
GroupVersion: testapi.Extensions.GroupVersion().String(),
|
||||
@ -285,11 +283,11 @@ func TestDiscoveryAtAPIS(t *testing.T) {
|
||||
},
|
||||
}
|
||||
extensionsPreferredVersion := unversioned.GroupVersionForDiscovery{
|
||||
GroupVersion: config.StorageVersions[extensions.GroupName],
|
||||
Version: apiutil.GetVersion(config.StorageVersions[extensions.GroupName]),
|
||||
GroupVersion: extensions.GroupName + "/preferred",
|
||||
Version: "preferred",
|
||||
}
|
||||
master.AddAPIGroupForDiscovery(unversioned.APIGroup{
|
||||
Name: extensionsGroupName,
|
||||
Name: extensions.GroupName,
|
||||
Versions: extensionsVersions,
|
||||
PreferredVersion: extensionsPreferredVersion,
|
||||
})
|
||||
@ -301,13 +299,13 @@ func TestDiscoveryAtAPIS(t *testing.T) {
|
||||
|
||||
assert.Equal(1, len(groupList.Groups))
|
||||
groupListGroup := groupList.Groups[0]
|
||||
assert.Equal(extensionsGroupName, groupListGroup.Name)
|
||||
assert.Equal(extensions.GroupName, groupListGroup.Name)
|
||||
assert.Equal(extensionsVersions, groupListGroup.Versions)
|
||||
assert.Equal(extensionsPreferredVersion, groupListGroup.PreferredVersion)
|
||||
assert.Equal(master.getServerAddressByClientCIDRs(&http.Request{}), groupListGroup.ServerAddressByClientCIDRs)
|
||||
|
||||
// Remove the group.
|
||||
master.RemoveAPIGroupForDiscovery(extensionsGroupName)
|
||||
master.RemoveAPIGroupForDiscovery(extensions.GroupName)
|
||||
groupList, err = getGroupList(server)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
|
118
pkg/genericapiserver/resource_encoding_config.go
Normal file
118
pkg/genericapiserver/resource_encoding_config.go
Normal file
@ -0,0 +1,118 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package genericapiserver
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
)
|
||||
|
||||
type ResourceEncodingConfig interface {
|
||||
// StorageEncoding returns the serialization format for the resource.
|
||||
// TODO this should actually return a GroupVersionKind since you can logically have multiple "matching" Kinds
|
||||
// For now, it returns just the GroupVersion for consistency with old behavior
|
||||
StoragageEncodingFor(unversioned.GroupResource) (unversioned.GroupVersion, error)
|
||||
|
||||
// InMemoryEncodingFor returns the groupVersion for the in memory representation the storage should convert to.
|
||||
InMemoryEncodingFor(unversioned.GroupResource) (unversioned.GroupVersion, error)
|
||||
}
|
||||
|
||||
type DefaultResourceEncodingConfig struct {
|
||||
Groups map[string]*GroupResourceEncodingConfig
|
||||
}
|
||||
|
||||
type GroupResourceEncodingConfig struct {
|
||||
DefaultExternalEncoding unversioned.GroupVersion
|
||||
ExternalResourceEncodings map[string]unversioned.GroupVersion
|
||||
|
||||
DefaultInternalEncoding unversioned.GroupVersion
|
||||
InternalResourceEncodings map[string]unversioned.GroupVersion
|
||||
}
|
||||
|
||||
var _ ResourceEncodingConfig = &DefaultResourceEncodingConfig{}
|
||||
|
||||
func NewDefaultResourceEncodingConfig() *DefaultResourceEncodingConfig {
|
||||
return &DefaultResourceEncodingConfig{Groups: map[string]*GroupResourceEncodingConfig{}}
|
||||
}
|
||||
|
||||
func newGroupResourceEncodingConfig(defaultEncoding, defaultInternalVersion unversioned.GroupVersion) *GroupResourceEncodingConfig {
|
||||
return &GroupResourceEncodingConfig{
|
||||
DefaultExternalEncoding: defaultEncoding, ExternalResourceEncodings: map[string]unversioned.GroupVersion{},
|
||||
DefaultInternalEncoding: defaultInternalVersion, InternalResourceEncodings: map[string]unversioned.GroupVersion{},
|
||||
}
|
||||
}
|
||||
|
||||
func (o *DefaultResourceEncodingConfig) SetVersionEncoding(group string, externalEncodingVersion, internalVersion unversioned.GroupVersion) {
|
||||
_, groupExists := o.Groups[group]
|
||||
if !groupExists {
|
||||
o.Groups[group] = newGroupResourceEncodingConfig(externalEncodingVersion, internalVersion)
|
||||
}
|
||||
|
||||
o.Groups[group].DefaultExternalEncoding = externalEncodingVersion
|
||||
o.Groups[group].DefaultInternalEncoding = internalVersion
|
||||
}
|
||||
|
||||
func (o *DefaultResourceEncodingConfig) SetResourceEncoding(resourceBeingStored unversioned.GroupResource, externalEncodingVersion, internalVersion unversioned.GroupVersion) {
|
||||
group := resourceBeingStored.Group
|
||||
_, groupExists := o.Groups[group]
|
||||
if !groupExists {
|
||||
o.Groups[group] = newGroupResourceEncodingConfig(externalEncodingVersion, internalVersion)
|
||||
}
|
||||
|
||||
o.Groups[group].ExternalResourceEncodings[resourceBeingStored.Resource] = externalEncodingVersion
|
||||
o.Groups[group].InternalResourceEncodings[resourceBeingStored.Resource] = internalVersion
|
||||
}
|
||||
|
||||
func (o *DefaultResourceEncodingConfig) StoragageEncodingFor(resource unversioned.GroupResource) (unversioned.GroupVersion, error) {
|
||||
groupMeta, err := registered.Group(resource.Group)
|
||||
if err != nil {
|
||||
return unversioned.GroupVersion{}, err
|
||||
}
|
||||
|
||||
groupEncoding, groupExists := o.Groups[resource.Group]
|
||||
|
||||
if !groupExists {
|
||||
// return the most preferred external version for the group
|
||||
return groupMeta.GroupVersion, nil
|
||||
}
|
||||
|
||||
resourceOverride, resourceExists := groupEncoding.ExternalResourceEncodings[resource.Resource]
|
||||
if !resourceExists {
|
||||
return groupEncoding.DefaultExternalEncoding, nil
|
||||
}
|
||||
|
||||
return resourceOverride, nil
|
||||
}
|
||||
|
||||
func (o *DefaultResourceEncodingConfig) InMemoryEncodingFor(resource unversioned.GroupResource) (unversioned.GroupVersion, error) {
|
||||
if _, err := registered.Group(resource.Group); err != nil {
|
||||
return unversioned.GroupVersion{}, err
|
||||
}
|
||||
|
||||
groupEncoding, groupExists := o.Groups[resource.Group]
|
||||
if !groupExists {
|
||||
return unversioned.GroupVersion{Group: resource.Group, Version: runtime.APIVersionInternal}, nil
|
||||
}
|
||||
|
||||
resourceOverride, resourceExists := groupEncoding.InternalResourceEncodings[resource.Resource]
|
||||
if !resourceExists {
|
||||
return groupEncoding.DefaultInternalEncoding, nil
|
||||
}
|
||||
|
||||
return resourceOverride, nil
|
||||
}
|
221
pkg/genericapiserver/storage_factory.go
Normal file
221
pkg/genericapiserver/storage_factory.go
Normal file
@ -0,0 +1,221 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package genericapiserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// StorageFactory is the interface to locate the storage for a given GroupResource
|
||||
type StorageFactory interface {
|
||||
// New finds the storage destination for the given group and resource. It will
|
||||
// return an error if the group has no storage destination configured.
|
||||
New(groupResource unversioned.GroupResource) (storage.Interface, error)
|
||||
// Backends gets all backends for all registered storage destinations.
|
||||
// Used for getting all instances for health validations.
|
||||
Backends() []string
|
||||
}
|
||||
|
||||
// DefaultStorageFactory takes a GroupResource and returns back its storage interface. This result includes:
|
||||
// 1. Merged etcd config, including: auth, server locations, prefixes
|
||||
// 2. Resource encodings for storage: group,version,kind to store as
|
||||
// 3. Cohabitating default: some resources like hpa are exposed through multiple APIs. They must agree on 1 and 2
|
||||
type DefaultStorageFactory struct {
|
||||
// DefaultEtcdConfig describes how to connect to etcd in general. It's authentication information will be used for
|
||||
// every storage.Interface returned.
|
||||
DefaultEtcdConfig etcdstorage.EtcdConfig
|
||||
|
||||
Overrides map[unversioned.GroupResource]groupResourceOverrides
|
||||
|
||||
// DefaultSerializer is used to create encoders and decoders for the storage.Interface.
|
||||
DefaultSerializer runtime.NegotiatedSerializer
|
||||
|
||||
// ResourceEncodingConfig describes how to encode a particular GroupVersionResource
|
||||
ResourceEncodingConfig ResourceEncodingConfig
|
||||
|
||||
// APIResourceConfigSource indicates whether the *storage* is enabled, NOT the API
|
||||
// This is discrete from resource enablement because those are separate concerns. How it is surfaced to the user via flags
|
||||
// or config is up to whoever is building this.
|
||||
APIResourceConfigSource APIResourceConfigSource
|
||||
|
||||
// newEtcdFn exists to be overwritten for unit testing. You should never set this in a normal world.
|
||||
newEtcdFn func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error)
|
||||
}
|
||||
|
||||
type groupResourceOverrides struct {
|
||||
// etcdLocation contains the list of "special" locations that are used for particular GroupResources
|
||||
// These are merged on top of the default DefaultEtcdConfig when requesting the storage.Interface for a given GroupResource
|
||||
etcdLocation []string
|
||||
// etcdPrefix contains the list of "special" prefixes for a GroupResource. Resource=* means for the entire group
|
||||
etcdPrefix string
|
||||
// serializer contains the list of "special" serializers for a GroupResource. Resource=* means for the entire group
|
||||
serializer runtime.NegotiatedSerializer
|
||||
// cohabitatingResources keeps track of which resources must be stored together. This happens when we have multiple ways
|
||||
// of exposing one set of concepts. autoscaling.HPA and extensions.HPA as a for instance
|
||||
// The order of the slice matters! It is the priority order of lookup for finding a storage location
|
||||
cohabitatingResources []unversioned.GroupResource
|
||||
}
|
||||
|
||||
var _ StorageFactory = &DefaultStorageFactory{}
|
||||
|
||||
const AllResources = "*"
|
||||
|
||||
func NewDefaultStorageFactory(defaultEtcdConfig etcdstorage.EtcdConfig, defaultSerializer runtime.NegotiatedSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory {
|
||||
return &DefaultStorageFactory{
|
||||
DefaultEtcdConfig: defaultEtcdConfig,
|
||||
Overrides: map[unversioned.GroupResource]groupResourceOverrides{},
|
||||
DefaultSerializer: defaultSerializer,
|
||||
ResourceEncodingConfig: resourceEncodingConfig,
|
||||
APIResourceConfigSource: resourceConfig,
|
||||
|
||||
newEtcdFn: newEtcd,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DefaultStorageFactory) SetEtcdLocation(groupResource unversioned.GroupResource, location []string) {
|
||||
overrides := s.Overrides[groupResource]
|
||||
overrides.etcdLocation = location
|
||||
s.Overrides[groupResource] = overrides
|
||||
}
|
||||
|
||||
func (s *DefaultStorageFactory) SetEtcdPrefix(groupResource unversioned.GroupResource, prefix string) {
|
||||
overrides := s.Overrides[groupResource]
|
||||
overrides.etcdPrefix = prefix
|
||||
s.Overrides[groupResource] = overrides
|
||||
}
|
||||
|
||||
func (s *DefaultStorageFactory) SetSerializer(groupResource unversioned.GroupResource, serializer runtime.NegotiatedSerializer) {
|
||||
overrides := s.Overrides[groupResource]
|
||||
overrides.serializer = serializer
|
||||
s.Overrides[groupResource] = overrides
|
||||
}
|
||||
|
||||
// AddCohabitatingResources links resources together the order of the slice matters! its the priority order of lookup for finding a storage location
|
||||
func (s *DefaultStorageFactory) AddCohabitatingResources(groupResources ...unversioned.GroupResource) {
|
||||
for _, groupResource := range groupResources {
|
||||
overrides := s.Overrides[groupResource]
|
||||
overrides.cohabitatingResources = groupResources
|
||||
s.Overrides[groupResource] = overrides
|
||||
}
|
||||
}
|
||||
|
||||
func getAllResourcesAlias(resource unversioned.GroupResource) unversioned.GroupResource {
|
||||
return unversioned.GroupResource{Group: resource.Group, Resource: AllResources}
|
||||
}
|
||||
|
||||
func (s *DefaultStorageFactory) getStorageGroupResource(groupResource unversioned.GroupResource) unversioned.GroupResource {
|
||||
for _, potentialStorageResource := range s.Overrides[groupResource].cohabitatingResources {
|
||||
if s.APIResourceConfigSource.AnyVersionOfResourceEnabled(potentialStorageResource) {
|
||||
return potentialStorageResource
|
||||
}
|
||||
}
|
||||
|
||||
return groupResource
|
||||
}
|
||||
|
||||
// New finds the storage destination for the given group and resource. It will
|
||||
// return an error if the group has no storage destination configured.
|
||||
func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (storage.Interface, error) {
|
||||
chosenStorageResource := s.getStorageGroupResource(groupResource)
|
||||
|
||||
groupOverride := s.Overrides[getAllResourcesAlias(chosenStorageResource)]
|
||||
exactResourceOverride := s.Overrides[chosenStorageResource]
|
||||
|
||||
overriddenEtcdLocations := []string{}
|
||||
if len(groupOverride.etcdLocation) > 0 {
|
||||
overriddenEtcdLocations = groupOverride.etcdLocation
|
||||
}
|
||||
if len(exactResourceOverride.etcdLocation) > 0 {
|
||||
overriddenEtcdLocations = exactResourceOverride.etcdLocation
|
||||
}
|
||||
|
||||
etcdPrefix := s.DefaultEtcdConfig.Prefix
|
||||
if len(groupOverride.etcdPrefix) > 0 {
|
||||
etcdPrefix = groupOverride.etcdPrefix
|
||||
}
|
||||
if len(exactResourceOverride.etcdPrefix) > 0 {
|
||||
etcdPrefix = exactResourceOverride.etcdPrefix
|
||||
}
|
||||
|
||||
etcdSerializer := s.DefaultSerializer
|
||||
if groupOverride.serializer != nil {
|
||||
etcdSerializer = groupOverride.serializer
|
||||
}
|
||||
if exactResourceOverride.serializer != nil {
|
||||
etcdSerializer = exactResourceOverride.serializer
|
||||
}
|
||||
// operate on copy
|
||||
etcdConfig := s.DefaultEtcdConfig
|
||||
etcdConfig.Prefix = etcdPrefix
|
||||
if len(overriddenEtcdLocations) > 0 {
|
||||
etcdConfig.ServerList = overriddenEtcdLocations
|
||||
}
|
||||
|
||||
storageEncodingVersion, err := s.ResourceEncodingConfig.StoragageEncodingFor(chosenStorageResource)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
internalVersion, err := s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, etcdConfig)
|
||||
return s.newEtcdFn(etcdSerializer, storageEncodingVersion, internalVersion, etcdConfig)
|
||||
}
|
||||
|
||||
func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) {
|
||||
var storageConfig etcdstorage.EtcdStorageConfig
|
||||
storageConfig.Config = etcdConfig
|
||||
s, ok := ns.SerializerForMediaType("application/json", nil)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unable to find serializer for JSON")
|
||||
}
|
||||
encoder := ns.EncoderForVersion(s, storageVersion)
|
||||
decoder := ns.DecoderToVersion(s, memoryVersion)
|
||||
if memoryVersion.Group != storageVersion.Group {
|
||||
// Allow this codec to translate between groups.
|
||||
if err = versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil {
|
||||
return nil, fmt.Errorf("error setting up encoder from %v to %v: %v", memoryVersion, storageVersion, err)
|
||||
}
|
||||
if err = versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil {
|
||||
return nil, fmt.Errorf("error setting up decoder from %v to %v: %v", storageVersion, memoryVersion, err)
|
||||
}
|
||||
}
|
||||
storageConfig.Codec = runtime.NewCodec(encoder, decoder)
|
||||
return storageConfig.NewStorage()
|
||||
}
|
||||
|
||||
// Get all backends for all registered storage destinations.
|
||||
// Used for getting all instances for health validations.
|
||||
func (s *DefaultStorageFactory) Backends() []string {
|
||||
backends := sets.NewString(s.DefaultEtcdConfig.ServerList...)
|
||||
|
||||
for _, overrides := range s.Overrides {
|
||||
backends.Insert(overrides.etcdLocation...)
|
||||
}
|
||||
return backends.List()
|
||||
}
|
88
pkg/genericapiserver/storage_factory_test.go
Normal file
88
pkg/genericapiserver/storage_factory_test.go
Normal file
@ -0,0 +1,88 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package genericapiserver
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
)
|
||||
|
||||
func TestUpdateEtcdOverrides(t *testing.T) {
|
||||
testCases := []struct {
|
||||
resource unversioned.GroupResource
|
||||
servers []string
|
||||
}{
|
||||
{
|
||||
resource: unversioned.GroupResource{Group: api.GroupName, Resource: "resource"},
|
||||
servers: []string{"http://127.0.0.1:10000"},
|
||||
},
|
||||
{
|
||||
resource: unversioned.GroupResource{Group: api.GroupName, Resource: "resource"},
|
||||
servers: []string{"http://127.0.0.1:10000", "http://127.0.0.1:20000"},
|
||||
},
|
||||
{
|
||||
resource: unversioned.GroupResource{Group: extensions.GroupName, Resource: "resource"},
|
||||
servers: []string{"http://127.0.0.1:10000"},
|
||||
},
|
||||
}
|
||||
|
||||
defaultEtcdLocation := []string{"http://127.0.0.1"}
|
||||
for i, test := range testCases {
|
||||
actualEtcdConfig := etcdstorage.EtcdConfig{}
|
||||
newEtcdFn := func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) {
|
||||
actualEtcdConfig = etcdConfig
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
defaultEtcdConfig := etcdstorage.EtcdConfig{
|
||||
Prefix: DefaultEtcdPathPrefix,
|
||||
ServerList: defaultEtcdLocation,
|
||||
}
|
||||
storageFactory := NewDefaultStorageFactory(defaultEtcdConfig, api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig())
|
||||
storageFactory.newEtcdFn = newEtcdFn
|
||||
storageFactory.SetEtcdLocation(test.resource, test.servers)
|
||||
|
||||
var err error
|
||||
_, err = storageFactory.New(test.resource)
|
||||
if err != nil {
|
||||
t.Errorf("%d: unexpected error %v", i, err)
|
||||
continue
|
||||
}
|
||||
if !reflect.DeepEqual(actualEtcdConfig.ServerList, test.servers) {
|
||||
t.Errorf("%d: expected %v, got %v", i, test.servers, actualEtcdConfig.ServerList)
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = storageFactory.New(unversioned.GroupResource{Group: api.GroupName, Resource: "unlikely"})
|
||||
if err != nil {
|
||||
t.Errorf("%d: unexpected error %v", i, err)
|
||||
continue
|
||||
}
|
||||
if !reflect.DeepEqual(actualEtcdConfig.ServerList, defaultEtcdLocation) {
|
||||
t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, actualEtcdConfig.ServerList)
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -245,21 +245,15 @@ func (m *Master) InstallAPIs(c *Config) {
|
||||
|
||||
// Install extensions unless disabled.
|
||||
if c.APIResourceConfigSource.AnyResourcesForVersionEnabled(extensionsapiv1beta1.SchemeGroupVersion) {
|
||||
m.thirdPartyStorage = c.StorageDestinations.APIGroups[extensions.GroupName].Default
|
||||
var err error
|
||||
m.thirdPartyStorage, err = c.StorageFactory.New(extensions.Resource("thirdpartyresources"))
|
||||
if err != nil {
|
||||
glog.Fatalf("Error getting third party storage: %v", err)
|
||||
}
|
||||
m.thirdPartyResources = map[string]thirdPartyEntry{}
|
||||
|
||||
extensionResources := m.getExtensionResources(c)
|
||||
extensionsGroupMeta := registered.GroupOrDie(extensions.GroupName)
|
||||
// Update the preferred version as per StorageVersions in the config.
|
||||
storageVersion, found := c.StorageVersions[extensionsGroupMeta.GroupVersion.Group]
|
||||
if !found {
|
||||
glog.Fatalf("Couldn't find storage version of group %v", extensionsGroupMeta.GroupVersion.Group)
|
||||
}
|
||||
preferedGroupVersion, err := unversioned.ParseGroupVersion(storageVersion)
|
||||
if err != nil {
|
||||
glog.Fatalf("Error in parsing group version %s: %v", storageVersion, err)
|
||||
}
|
||||
extensionsGroupMeta.GroupVersion = preferedGroupVersion
|
||||
|
||||
apiGroupInfo := genericapiserver.APIGroupInfo{
|
||||
GroupMeta: *extensionsGroupMeta,
|
||||
@ -390,13 +384,8 @@ func (m *Master) InstallAPIs(c *Config) {
|
||||
}
|
||||
|
||||
func (m *Master) initV1ResourcesStorage(c *Config) {
|
||||
dbClient := func(resource string) storage.Interface { return c.StorageDestinations.Get("", resource) }
|
||||
restOptions := func(resource string) generic.RESTOptions {
|
||||
return generic.RESTOptions{
|
||||
Storage: dbClient(resource),
|
||||
Decorator: m.StorageDecorator(),
|
||||
DeleteCollectionWorkers: m.deleteCollectionWorkers,
|
||||
}
|
||||
return m.GetRESTOptionsOrDie(c, api.Resource(resource))
|
||||
}
|
||||
|
||||
podTemplateStorage := podtemplateetcd.NewREST(restOptions("podTemplates"))
|
||||
@ -426,8 +415,8 @@ func (m *Master) initV1ResourcesStorage(c *Config) {
|
||||
m.ProxyTransport,
|
||||
)
|
||||
|
||||
serviceStorage, serviceStatusStorage := serviceetcd.NewREST(restOptions("services"))
|
||||
m.serviceRegistry = service.NewRegistry(serviceStorage)
|
||||
serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptions("services"))
|
||||
m.serviceRegistry = service.NewRegistry(serviceRESTStorage)
|
||||
|
||||
var serviceClusterIPRegistry service.RangeRegistry
|
||||
serviceClusterIPRange := m.ServiceClusterIPRange
|
||||
@ -435,9 +424,16 @@ func (m *Master) initV1ResourcesStorage(c *Config) {
|
||||
glog.Fatalf("service clusterIPRange is nil")
|
||||
return
|
||||
}
|
||||
|
||||
serviceStorage, err := c.StorageFactory.New(api.Resource("services"))
|
||||
if err != nil {
|
||||
glog.Fatal(err.Error())
|
||||
}
|
||||
|
||||
serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
|
||||
mem := allocator.NewAllocationMap(max, rangeSpec)
|
||||
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), dbClient("services"))
|
||||
// TODO etcdallocator package to return a storage interface via the storageFactory
|
||||
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorage)
|
||||
serviceClusterIPRegistry = etcd
|
||||
return etcd
|
||||
})
|
||||
@ -446,7 +442,8 @@ func (m *Master) initV1ResourcesStorage(c *Config) {
|
||||
var serviceNodePortRegistry service.RangeRegistry
|
||||
serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
|
||||
mem := allocator.NewAllocationMap(max, rangeSpec)
|
||||
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), dbClient("services"))
|
||||
// TODO etcdallocator package to return a storage interface via the storageFactory
|
||||
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorage)
|
||||
serviceNodePortRegistry = etcd
|
||||
return etcd
|
||||
})
|
||||
@ -541,7 +538,7 @@ func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
|
||||
"scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"},
|
||||
}
|
||||
|
||||
for ix, machine := range c.StorageDestinations.Backends() {
|
||||
for ix, machine := range c.StorageFactory.Backends() {
|
||||
etcdUrl, err := url.Parse(machine)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to parse etcd url for validation: %v", err)
|
||||
@ -726,24 +723,36 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Master) GetRESTOptionsOrDie(c *Config, resource unversioned.GroupResource) generic.RESTOptions {
|
||||
storage, err := c.StorageFactory.New(resource)
|
||||
if err != nil {
|
||||
glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error())
|
||||
}
|
||||
|
||||
return generic.RESTOptions{
|
||||
Storage: storage,
|
||||
Decorator: m.StorageDecorator(),
|
||||
DeleteCollectionWorkers: m.deleteCollectionWorkers,
|
||||
}
|
||||
}
|
||||
|
||||
// getExperimentalResources returns the resources for extensions api
|
||||
func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage {
|
||||
restOptions := func(resource string) generic.RESTOptions {
|
||||
return generic.RESTOptions{
|
||||
Storage: c.StorageDestinations.Get(extensions.GroupName, resource),
|
||||
Decorator: m.StorageDecorator(),
|
||||
DeleteCollectionWorkers: m.deleteCollectionWorkers,
|
||||
}
|
||||
return m.GetRESTOptionsOrDie(c, extensions.Resource(resource))
|
||||
}
|
||||
|
||||
// TODO update when we support more than one version of this group
|
||||
version := extensionsapiv1beta1.SchemeGroupVersion
|
||||
|
||||
storage := map[string]rest.Storage{}
|
||||
|
||||
if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("horizontalpodautoscalers")) {
|
||||
m.constructHPAResources(c, storage)
|
||||
controllerStorage := expcontrolleretcd.NewStorage(
|
||||
generic.RESTOptions{Storage: c.StorageDestinations.Get("", "replicationControllers"), Decorator: m.StorageDecorator(), DeleteCollectionWorkers: m.deleteCollectionWorkers})
|
||||
hpaStorage, hpaStatusStorage := horizontalpodautoscaleretcd.NewREST(restOptions("horizontalpodautoscalers"))
|
||||
storage["horizontalpodautoscalers"] = hpaStorage
|
||||
storage["horizontalpodautoscalers/status"] = hpaStatusStorage
|
||||
|
||||
controllerStorage := expcontrolleretcd.NewStorage(m.GetRESTOptionsOrDie(c, api.Resource("replicationControllers")))
|
||||
storage["replicationcontrollers"] = controllerStorage.ReplicationController
|
||||
storage["replicationcontrollers/scale"] = controllerStorage.Scale
|
||||
}
|
||||
@ -776,7 +785,9 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage {
|
||||
storage["deployments/scale"] = deploymentStorage.Scale
|
||||
}
|
||||
if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("jobs")) {
|
||||
m.constructJobResources(c, storage)
|
||||
jobsStorage, jobsStatusStorage := jobetcd.NewREST(restOptions("jobs"))
|
||||
storage["jobs"] = jobsStorage
|
||||
storage["jobs/status"] = jobsStatusStorage
|
||||
}
|
||||
ingressStorage, ingressStatusStorage := ingressetcd.NewREST(restOptions("ingresses"))
|
||||
if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("ingresses")) {
|
||||
@ -797,25 +808,6 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage {
|
||||
return storage
|
||||
}
|
||||
|
||||
// constructHPAResources makes HPA resources and adds them to the storage map.
|
||||
// They're installed in both autoscaling and extensions. It's assumed that
|
||||
// you've already done the check that they should be on.
|
||||
func (m *Master) constructHPAResources(c *Config, restStorage map[string]rest.Storage) {
|
||||
// Note that hpa's storage settings are changed by changing the autoscaling
|
||||
// group. Clearly we want all hpas to be stored in the same place no
|
||||
// matter where they're accessed from.
|
||||
restOptions := func(resource string) generic.RESTOptions {
|
||||
return generic.RESTOptions{
|
||||
Storage: c.StorageDestinations.Search([]string{autoscaling.GroupName, extensions.GroupName}, resource),
|
||||
Decorator: m.StorageDecorator(),
|
||||
DeleteCollectionWorkers: m.deleteCollectionWorkers,
|
||||
}
|
||||
}
|
||||
autoscalerStorage, autoscalerStatusStorage := horizontalpodautoscaleretcd.NewREST(restOptions("horizontalpodautoscalers"))
|
||||
restStorage["horizontalpodautoscalers"] = autoscalerStorage
|
||||
restStorage["horizontalpodautoscalers/status"] = autoscalerStatusStorage
|
||||
}
|
||||
|
||||
// getAutoscalingResources returns the resources for autoscaling api
|
||||
func (m *Master) getAutoscalingResources(c *Config) map[string]rest.Storage {
|
||||
// TODO update when we support more than one version of this group
|
||||
@ -823,30 +815,13 @@ func (m *Master) getAutoscalingResources(c *Config) map[string]rest.Storage {
|
||||
|
||||
storage := map[string]rest.Storage{}
|
||||
if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("horizontalpodautoscalers")) {
|
||||
m.constructHPAResources(c, storage)
|
||||
hpaStorage, hpaStatusStorage := horizontalpodautoscaleretcd.NewREST(m.GetRESTOptionsOrDie(c, autoscaling.Resource("horizontalpodautoscalers")))
|
||||
storage["horizontalpodautoscalers"] = hpaStorage
|
||||
storage["horizontalpodautoscalers/status"] = hpaStatusStorage
|
||||
}
|
||||
return storage
|
||||
}
|
||||
|
||||
// constructJobResources makes Job resources and adds them to the storage map.
|
||||
// They're installed in both batch and extensions. It's assumed that you've
|
||||
// already done the check that they should be on.
|
||||
func (m *Master) constructJobResources(c *Config, restStorage map[string]rest.Storage) {
|
||||
// Note that job's storage settings are changed by changing the batch
|
||||
// group. Clearly we want all jobs to be stored in the same place no
|
||||
// matter where they're accessed from.
|
||||
restOptions := func(resource string) generic.RESTOptions {
|
||||
return generic.RESTOptions{
|
||||
Storage: c.StorageDestinations.Search([]string{batch.GroupName, extensions.GroupName}, resource),
|
||||
Decorator: m.StorageDecorator(),
|
||||
DeleteCollectionWorkers: m.deleteCollectionWorkers,
|
||||
}
|
||||
}
|
||||
jobStorage, jobStatusStorage := jobetcd.NewREST(restOptions("jobs"))
|
||||
restStorage["jobs"] = jobStorage
|
||||
restStorage["jobs/status"] = jobStatusStorage
|
||||
}
|
||||
|
||||
// getBatchResources returns the resources for batch api
|
||||
func (m *Master) getBatchResources(c *Config) map[string]rest.Storage {
|
||||
// TODO update when we support more than one version of this group
|
||||
@ -854,26 +829,21 @@ func (m *Master) getBatchResources(c *Config) map[string]rest.Storage {
|
||||
|
||||
storage := map[string]rest.Storage{}
|
||||
if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("jobs")) {
|
||||
m.constructJobResources(c, storage)
|
||||
jobsStorage, jobsStatusStorage := jobetcd.NewREST(m.GetRESTOptionsOrDie(c, batch.Resource("jobs")))
|
||||
storage["jobs"] = jobsStorage
|
||||
storage["jobs/status"] = jobsStatusStorage
|
||||
}
|
||||
return storage
|
||||
}
|
||||
|
||||
// getPetSetResources returns the resources for batch api
|
||||
// getPetSetResources returns the resources for apps api
|
||||
func (m *Master) getAppsResources(c *Config) map[string]rest.Storage {
|
||||
// TODO update when we support more than one version of this group
|
||||
version := appsapi.SchemeGroupVersion
|
||||
|
||||
storage := map[string]rest.Storage{}
|
||||
if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("petsets")) {
|
||||
restOptions := func(resource string) generic.RESTOptions {
|
||||
return generic.RESTOptions{
|
||||
Storage: c.StorageDestinations.Get(apps.GroupName, resource),
|
||||
Decorator: m.StorageDecorator(),
|
||||
DeleteCollectionWorkers: m.deleteCollectionWorkers,
|
||||
}
|
||||
}
|
||||
petsetStorage, petsetStatusStorage := petsetetcd.NewREST(restOptions("petsets"))
|
||||
petsetStorage, petsetStatusStorage := petsetetcd.NewREST(m.GetRESTOptionsOrDie(c, apps.Resource("petsets")))
|
||||
storage["petsets"] = petsetStorage
|
||||
storage["petsets/status"] = petsetStatusStorage
|
||||
}
|
||||
|
@ -36,8 +36,8 @@ import (
|
||||
utilnet "k8s.io/kubernetes/pkg/util/net"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
|
||||
apiutil "k8s.io/kubernetes/pkg/api/util"
|
||||
apiv1 "k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
"k8s.io/kubernetes/pkg/apis/apps"
|
||||
appsapi "k8s.io/kubernetes/pkg/apis/apps"
|
||||
"k8s.io/kubernetes/pkg/apis/autoscaling"
|
||||
@ -72,26 +72,27 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
|
||||
config := Config{
|
||||
Config: &genericapiserver.Config{},
|
||||
}
|
||||
storageVersions := make(map[string]string)
|
||||
storageDestinations := genericapiserver.NewStorageDestinations()
|
||||
storageDestinations.AddAPIGroup(
|
||||
api.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize))
|
||||
storageDestinations.AddAPIGroup(
|
||||
autoscaling.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Autoscaling.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize))
|
||||
storageDestinations.AddAPIGroup(
|
||||
batch.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Batch.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize))
|
||||
storageDestinations.AddAPIGroup(
|
||||
apps.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Apps.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize))
|
||||
storageDestinations.AddAPIGroup(
|
||||
extensions.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Extensions.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize))
|
||||
|
||||
config.StorageDestinations = storageDestinations
|
||||
storageVersions[api.GroupName] = testapi.Default.GroupVersion().String()
|
||||
storageVersions[autoscaling.GroupName] = testapi.Autoscaling.GroupVersion().String()
|
||||
storageVersions[batch.GroupName] = testapi.Batch.GroupVersion().String()
|
||||
storageVersions[apps.GroupName] = testapi.Apps.GroupVersion().String()
|
||||
storageVersions[extensions.GroupName] = testapi.Extensions.GroupVersion().String()
|
||||
config.StorageVersions = storageVersions
|
||||
etcdConfig := etcdstorage.EtcdConfig{
|
||||
Prefix: etcdtest.PathPrefix(),
|
||||
CAFile: server.CAFile,
|
||||
KeyFile: server.KeyFile,
|
||||
CertFile: server.CertFile,
|
||||
}
|
||||
for _, url := range server.ClientURLs {
|
||||
etcdConfig.ServerList = append(etcdConfig.ServerList, url.String())
|
||||
}
|
||||
|
||||
resourceEncoding := genericapiserver.NewDefaultResourceEncodingConfig()
|
||||
resourceEncoding.SetVersionEncoding(api.GroupName, *testapi.Default.GroupVersion(), unversioned.GroupVersion{Group: api.GroupName, Version: runtime.APIVersionInternal})
|
||||
resourceEncoding.SetVersionEncoding(autoscaling.GroupName, *testapi.Autoscaling.GroupVersion(), unversioned.GroupVersion{Group: autoscaling.GroupName, Version: runtime.APIVersionInternal})
|
||||
resourceEncoding.SetVersionEncoding(batch.GroupName, *testapi.Batch.GroupVersion(), unversioned.GroupVersion{Group: batch.GroupName, Version: runtime.APIVersionInternal})
|
||||
resourceEncoding.SetVersionEncoding(apps.GroupName, *testapi.Apps.GroupVersion(), unversioned.GroupVersion{Group: apps.GroupName, Version: runtime.APIVersionInternal})
|
||||
resourceEncoding.SetVersionEncoding(extensions.GroupName, *testapi.Extensions.GroupVersion(), unversioned.GroupVersion{Group: extensions.GroupName, Version: runtime.APIVersionInternal})
|
||||
storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())
|
||||
|
||||
config.StorageFactory = storageFactory
|
||||
config.APIResourceConfigSource = DefaultAPIResourceConfigSource()
|
||||
config.PublicAddress = net.ParseIP("192.168.10.4")
|
||||
config.Serializer = api.Codecs
|
||||
config.KubeletClient = client.FakeKubeletClient{}
|
||||
@ -398,7 +399,7 @@ func TestAPIVersionOfDiscoveryEndpoints(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDiscoveryAtAPIS(t *testing.T) {
|
||||
master, etcdserver, config, assert := newLimitedMaster(t)
|
||||
master, etcdserver, _, assert := newLimitedMaster(t)
|
||||
defer etcdserver.Terminate(t)
|
||||
|
||||
server := httptest.NewServer(master.HandlerContainer.ServeMux)
|
||||
@ -444,20 +445,20 @@ func TestDiscoveryAtAPIS(t *testing.T) {
|
||||
}
|
||||
expectPreferredVersion := map[string]unversioned.GroupVersionForDiscovery{
|
||||
autoscaling.GroupName: {
|
||||
GroupVersion: config.StorageVersions[autoscaling.GroupName],
|
||||
Version: apiutil.GetVersion(config.StorageVersions[autoscaling.GroupName]),
|
||||
GroupVersion: registered.GroupOrDie(autoscaling.GroupName).GroupVersion.String(),
|
||||
Version: registered.GroupOrDie(autoscaling.GroupName).GroupVersion.Version,
|
||||
},
|
||||
batch.GroupName: {
|
||||
GroupVersion: config.StorageVersions[batch.GroupName],
|
||||
Version: apiutil.GetVersion(config.StorageVersions[batch.GroupName]),
|
||||
GroupVersion: registered.GroupOrDie(batch.GroupName).GroupVersion.String(),
|
||||
Version: registered.GroupOrDie(batch.GroupName).GroupVersion.Version,
|
||||
},
|
||||
apps.GroupName: {
|
||||
GroupVersion: config.StorageVersions[apps.GroupName],
|
||||
Version: apiutil.GetVersion(config.StorageVersions[apps.GroupName]),
|
||||
GroupVersion: registered.GroupOrDie(apps.GroupName).GroupVersion.String(),
|
||||
Version: registered.GroupOrDie(apps.GroupName).GroupVersion.Version,
|
||||
},
|
||||
extensions.GroupName: {
|
||||
GroupVersion: config.StorageVersions[extensions.GroupName],
|
||||
Version: apiutil.GetVersion(config.StorageVersions[extensions.GroupName]),
|
||||
GroupVersion: registered.GroupOrDie(extensions.GroupName).GroupVersion.String(),
|
||||
Version: registered.GroupOrDie(extensions.GroupName).GroupVersion.Version,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -231,6 +231,7 @@ func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
|
||||
t.Fatalf("Failed to start etcd server error=%v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
cfg := etcd.Config{
|
||||
Endpoints: server.ClientURLs.StringSlice(),
|
||||
Transport: newHttpTransport(t, server.CertFile, server.KeyFile, server.CAFile),
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/apps"
|
||||
"k8s.io/kubernetes/pkg/apis/autoscaling"
|
||||
"k8s.io/kubernetes/pkg/apis/batch"
|
||||
@ -149,31 +150,33 @@ func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Se
|
||||
|
||||
// Returns a basic master config.
|
||||
func NewMasterConfig() *master.Config {
|
||||
etcdClient := NewEtcdClient()
|
||||
storageVersions := make(map[string]string)
|
||||
etcdConfig := etcdstorage.EtcdConfig{
|
||||
ServerList: []string{"http://127.0.0.1:4001"},
|
||||
Prefix: etcdtest.PathPrefix(),
|
||||
}
|
||||
|
||||
etcdStorage := etcdstorage.NewEtcdStorage(etcdClient, testapi.Default.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
|
||||
storageVersions[api.GroupName] = testapi.Default.GroupVersion().String()
|
||||
autoscalingEtcdStorage := NewAutoscalingEtcdStorage(etcdClient)
|
||||
storageVersions[autoscaling.GroupName] = testapi.Autoscaling.GroupVersion().String()
|
||||
batchEtcdStorage := NewBatchEtcdStorage(etcdClient)
|
||||
storageVersions[batch.GroupName] = testapi.Batch.GroupVersion().String()
|
||||
appsEtcdStorage := NewAppsEtcdStorage(etcdClient)
|
||||
storageVersions[apps.GroupName] = testapi.Apps.GroupVersion().String()
|
||||
expEtcdStorage := NewExtensionsEtcdStorage(etcdClient)
|
||||
storageVersions[extensions.GroupName] = testapi.Extensions.GroupVersion().String()
|
||||
negotiatedSerializer := NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json")
|
||||
|
||||
storageDestinations := genericapiserver.NewStorageDestinations()
|
||||
storageDestinations.AddAPIGroup(api.GroupName, etcdStorage)
|
||||
storageDestinations.AddAPIGroup(autoscaling.GroupName, autoscalingEtcdStorage)
|
||||
storageDestinations.AddAPIGroup(batch.GroupName, batchEtcdStorage)
|
||||
storageDestinations.AddAPIGroup(apps.GroupName, appsEtcdStorage)
|
||||
storageDestinations.AddAPIGroup(extensions.GroupName, expEtcdStorage)
|
||||
storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, negotiatedSerializer, genericapiserver.NewDefaultResourceEncodingConfig(), master.DefaultAPIResourceConfigSource())
|
||||
storageFactory.SetSerializer(
|
||||
unversioned.GroupResource{Group: api.GroupName, Resource: genericapiserver.AllResources},
|
||||
NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json"))
|
||||
storageFactory.SetSerializer(
|
||||
unversioned.GroupResource{Group: autoscaling.GroupName, Resource: genericapiserver.AllResources},
|
||||
NewSingleContentTypeSerializer(api.Scheme, testapi.Autoscaling.Codec(), "application/json"))
|
||||
storageFactory.SetSerializer(
|
||||
unversioned.GroupResource{Group: batch.GroupName, Resource: genericapiserver.AllResources},
|
||||
NewSingleContentTypeSerializer(api.Scheme, testapi.Batch.Codec(), "application/json"))
|
||||
storageFactory.SetSerializer(
|
||||
unversioned.GroupResource{Group: apps.GroupName, Resource: genericapiserver.AllResources},
|
||||
NewSingleContentTypeSerializer(api.Scheme, testapi.Apps.Codec(), "application/json"))
|
||||
storageFactory.SetSerializer(
|
||||
unversioned.GroupResource{Group: extensions.GroupName, Resource: genericapiserver.AllResources},
|
||||
NewSingleContentTypeSerializer(api.Scheme, testapi.Extensions.Codec(), "application/json"))
|
||||
|
||||
return &master.Config{
|
||||
Config: &genericapiserver.Config{
|
||||
StorageDestinations: storageDestinations,
|
||||
StorageVersions: storageVersions,
|
||||
StorageFactory: storageFactory,
|
||||
APIResourceConfigSource: master.DefaultAPIResourceConfigSource(),
|
||||
APIPrefix: "/api",
|
||||
APIGroupPrefix: "/apis",
|
||||
|
@ -37,6 +37,8 @@ type wrappedSerializer struct {
|
||||
contentType string
|
||||
}
|
||||
|
||||
var _ runtime.NegotiatedSerializer = &wrappedSerializer{}
|
||||
|
||||
func (s *wrappedSerializer) SupportedMediaTypes() []string {
|
||||
return []string{s.contentType}
|
||||
}
|
||||
@ -49,9 +51,9 @@ func (s *wrappedSerializer) SerializerForMediaType(mediaType string, options map
|
||||
}
|
||||
|
||||
func (s *wrappedSerializer) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
|
||||
return versioning.NewCodec(s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), []unversioned.GroupVersion{gv}, nil)
|
||||
return versioning.NewCodec(s.serializer, s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), []unversioned.GroupVersion{gv}, nil)
|
||||
}
|
||||
|
||||
func (s *wrappedSerializer) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder {
|
||||
return versioning.NewCodec(s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), nil, []unversioned.GroupVersion{gv})
|
||||
return versioning.NewCodec(s.serializer, s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), nil, []unversioned.GroupVersion{gv})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user