diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index 77e4d61f802..05eac20d183 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" apiutil "k8s.io/kubernetes/pkg/api/util" "k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/apimachinery/registered" @@ -98,39 +99,53 @@ func NewAPIServer() *APIServer { } // dest must be a map of group to groupVersion. -func gvToMap(gvList string, dest map[string]string) { - for _, gv := range strings.Split(gvList, ",") { - if gv == "" { +func mergeGroupVersionIntoMap(gvList string, dest map[string]unversioned.GroupVersion) error { + for _, gvString := range strings.Split(gvList, ",") { + if gvString == "" { continue } // We accept two formats. "group/version" OR // "group=group/version". The latter is used when types // move between groups. - if !strings.Contains(gv, "=") { - dest[apiutil.GetGroup(gv)] = gv + if !strings.Contains(gvString, "=") { + gv, err := unversioned.ParseGroupVersion(gvString) + if err != nil { + return err + } + dest[gv.Group] = gv + } else { - parts := strings.SplitN(gv, "=", 2) - // TODO: error checking. - dest[parts[0]] = parts[1] + parts := strings.SplitN(gvString, "=", 2) + gv, err := unversioned.ParseGroupVersion(parts[1]) + if err != nil { + return err + } + dest[parts[0]] = gv } } + + return nil } -// StorageGroupsToGroupVersions returns a map from group name to group version, +// StorageGroupsToEncodingVersion returns a map from group name to group version, // computed from the s.DeprecatedStorageVersion and s.StorageVersions flags. // TODO: can we move the whole storage version concept to the generic apiserver? -func (s *APIServer) StorageGroupsToGroupVersions() map[string]string { - storageVersionMap := map[string]string{} +func (s *APIServer) StorageGroupsToEncodingVersion() (map[string]unversioned.GroupVersion, error) { + storageVersionMap := map[string]unversioned.GroupVersion{} if s.DeprecatedStorageVersion != "" { - storageVersionMap[""] = s.DeprecatedStorageVersion + storageVersionMap[""] = unversioned.GroupVersion{Group: apiutil.GetGroup(s.DeprecatedStorageVersion), Version: apiutil.GetVersion(s.DeprecatedStorageVersion)} } // First, get the defaults. - gvToMap(s.DefaultStorageVersions, storageVersionMap) + if err := mergeGroupVersionIntoMap(s.DefaultStorageVersions, storageVersionMap); err != nil { + return nil, err + } // Override any defaults with the user settings. - gvToMap(s.StorageVersions, storageVersionMap) + if err := mergeGroupVersionIntoMap(s.StorageVersions, storageVersionMap); err != nil { + return nil, err + } - return storageVersionMap + return storageVersionMap, nil } // AddFlags adds flags for a specific APIServer to the specified FlagSet diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index 85f3a77ac2c..a7f67e07ad2 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -23,6 +23,7 @@ import ( "github.com/spf13/pflag" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/autoscaling" "k8s.io/kubernetes/pkg/apis/extensions" ) @@ -32,38 +33,38 @@ func TestGenerateStorageVersionMap(t *testing.T) { legacyVersion string storageVersions string defaultVersions string - expectedMap map[string]string + expectedMap map[string]unversioned.GroupVersion }{ { legacyVersion: "v1", storageVersions: "v1,extensions/v1beta1", - expectedMap: map[string]string{ - api.GroupName: "v1", - extensions.GroupName: "extensions/v1beta1", + expectedMap: map[string]unversioned.GroupVersion{ + api.GroupName: {Version: "v1"}, + extensions.GroupName: {Group: "extensions", Version: "v1beta1"}, }, }, { legacyVersion: "", storageVersions: "extensions/v1beta1,v1", - expectedMap: map[string]string{ - api.GroupName: "v1", - extensions.GroupName: "extensions/v1beta1", + expectedMap: map[string]unversioned.GroupVersion{ + api.GroupName: {Version: "v1"}, + extensions.GroupName: {Group: "extensions", Version: "v1beta1"}, }, }, { legacyVersion: "", storageVersions: "autoscaling=extensions/v1beta1,v1", defaultVersions: "extensions/v1beta1,v1,autoscaling/v1", - expectedMap: map[string]string{ - api.GroupName: "v1", - autoscaling.GroupName: "extensions/v1beta1", - extensions.GroupName: "extensions/v1beta1", + expectedMap: map[string]unversioned.GroupVersion{ + api.GroupName: {Version: "v1"}, + autoscaling.GroupName: {Group: "extensions", Version: "v1beta1"}, + extensions.GroupName: {Group: "extensions", Version: "v1beta1"}, }, }, { legacyVersion: "", storageVersions: "", - expectedMap: map[string]string{}, + expectedMap: map[string]unversioned.GroupVersion{}, }, } for i, test := range testCases { @@ -72,7 +73,10 @@ func TestGenerateStorageVersionMap(t *testing.T) { StorageVersions: test.storageVersions, DefaultStorageVersions: test.defaultVersions, } - output := s.StorageGroupsToGroupVersions() + output, err := s.StorageGroupsToEncodingVersion() + if err != nil { + t.Errorf("%v: unexpected error: %v", i, err) + } if !reflect.DeepEqual(test.expectedMap, output) { t.Errorf("%v: unexpected error. expect: %v, got: %v", i, test.expectedMap, output) } diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 2a76048b729..d85ec56e670 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -37,8 +37,6 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" apiv1 "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/apimachinery/registered" - "k8s.io/kubernetes/pkg/apis/apps" appsapi "k8s.io/kubernetes/pkg/apis/apps/v1alpha1" "k8s.io/kubernetes/pkg/apis/autoscaling" autoscalingapiv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1" @@ -58,10 +56,7 @@ import ( "k8s.io/kubernetes/pkg/master" "k8s.io/kubernetes/pkg/registry/cachesize" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/runtime/serializer/versioning" "k8s.io/kubernetes/pkg/serviceaccount" - "k8s.io/kubernetes/pkg/storage" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" ) // NewAPIServerCommand creates a *cobra.Command object with default parameters @@ -81,89 +76,6 @@ cluster's shared state through which all other components interact.`, return cmd } -// For testing. -type newEtcdFunc func(runtime.NegotiatedSerializer, string, string, etcdstorage.EtcdConfig) (storage.Interface, error) - -func newEtcd(ns runtime.NegotiatedSerializer, storageGroupVersionString, memoryGroupVersionString string, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) { - if storageGroupVersionString == "" { - return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage") - } - storageVersion, err := unversioned.ParseGroupVersion(storageGroupVersionString) - if err != nil { - return nil, fmt.Errorf("couldn't understand storage version %v: %v", storageGroupVersionString, err) - } - memoryVersion, err := unversioned.ParseGroupVersion(memoryGroupVersionString) - if err != nil { - return nil, fmt.Errorf("couldn't understand memory version %v: %v", memoryGroupVersionString, err) - } - - var storageConfig etcdstorage.EtcdStorageConfig - storageConfig.Config = etcdConfig - s, ok := ns.SerializerForMediaType("application/json", nil) - if !ok { - return nil, fmt.Errorf("unable to find serializer for JSON") - } - glog.Infof("constructing etcd storage interface.\n sv: %v\n mv: %v\n", storageVersion, memoryVersion) - encoder := ns.EncoderForVersion(s, storageVersion) - decoder := ns.DecoderToVersion(s, memoryVersion) - if memoryVersion.Group != storageVersion.Group { - // Allow this codec to translate between groups. - if err = versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil { - return nil, fmt.Errorf("error setting up encoder for %v: %v", storageGroupVersionString, err) - } - if err = versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil { - return nil, fmt.Errorf("error setting up decoder for %v: %v", storageGroupVersionString, err) - } - } - storageConfig.Codec = runtime.NewCodec(encoder, decoder) - return storageConfig.NewStorage() -} - -// parse the value of --etcd-servers-overrides and update given storageDestinations. -func updateEtcdOverrides(overrides []string, storageVersions map[string]string, etcdConfig etcdstorage.EtcdConfig, storageDestinations *genericapiserver.StorageDestinations, newEtcdFn newEtcdFunc) { - if len(overrides) == 0 { - return - } - for _, override := range overrides { - tokens := strings.Split(override, "#") - if len(tokens) != 2 { - glog.Errorf("invalid value of etcd server overrides: %s", override) - continue - } - - apiresource := strings.Split(tokens[0], "/") - if len(apiresource) != 2 { - glog.Errorf("invalid resource definition: %s", tokens[0]) - } - group := apiresource[0] - resource := apiresource[1] - - apigroup, err := registered.Group(group) - if err != nil { - glog.Errorf("invalid api group %s: %v", group, err) - continue - } - if _, found := storageVersions[apigroup.GroupVersion.Group]; !found { - glog.Errorf("Couldn't find the storage version for group %s", apigroup.GroupVersion.Group) - continue - } - - servers := strings.Split(tokens[1], ";") - overrideEtcdConfig := etcdConfig - overrideEtcdConfig.ServerList = servers - // Note, internalGV will be wrong for things like batch or - // autoscalers, but they shouldn't be using the override - // storage. - internalGV := apigroup.GroupVersion.Group + "/__internal" - etcdOverrideStorage, err := newEtcdFn(api.Codecs, storageVersions[apigroup.GroupVersion.Group], internalGV, overrideEtcdConfig) - if err != nil { - glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err) - } - - storageDestinations.AddStorageOverride(group, resource, etcdOverrideStorage) - } -} - // Run runs the specified APIServer. This should never exit. func Run(s *options.APIServer) error { genericapiserver.DefaultAndValidateRunOptions(s.ServerRunOptions) @@ -252,126 +164,37 @@ func Run(s *options.APIServer) error { glog.Errorf("Failed to create clientset: %v", err) } - legacyV1Group, err := registered.Group(api.GroupName) + resourceEncoding := genericapiserver.NewDefaultResourceEncodingConfig() + groupToEncoding, err := s.StorageGroupsToEncodingVersion() if err != nil { - return err + glog.Fatalf("error getting group encoding: %s", err) + } + for group, storageEncodingVersion := range groupToEncoding { + resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal}) } - storageDestinations := genericapiserver.NewStorageDestinations() + storageFactory := genericapiserver.NewDefaultStorageFactory(s.EtcdConfig, api.Codecs, resourceEncoding, apiResourceConfigSource) + storageFactory.AddCohabitatingResources(batch.Resource("jobs"), extensions.Resource("jobs")) + storageFactory.AddCohabitatingResources(autoscaling.Resource("horizontalpodautoscalers"), extensions.Resource("horizontalpodautoscalers")) + for _, override := range s.EtcdServersOverrides { + tokens := strings.Split(override, "#") + if len(tokens) != 2 { + glog.Errorf("invalid value of etcd server overrides: %s", override) + continue + } - storageVersions := s.StorageGroupsToGroupVersions() - if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found { - glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions) + apiresource := strings.Split(tokens[0], "/") + if len(apiresource) != 2 { + glog.Errorf("invalid resource definition: %s", tokens[0]) + continue + } + group := apiresource[0] + resource := apiresource[1] + groupResource := unversioned.GroupResource{Group: group, Resource: resource} + + servers := strings.Split(tokens[1], ";") + storageFactory.SetEtcdLocation(groupResource, servers) } - etcdStorage, err := newEtcd(api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], "/__internal", s.EtcdConfig) - if err != nil { - glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err) - } - storageDestinations.AddAPIGroup("", etcdStorage) - - if apiResourceConfigSource.AnyResourcesForVersionEnabled(extensionsapiv1beta1.SchemeGroupVersion) { - glog.Infof("Configuring extensions/v1beta1 storage destination") - expGroup, err := registered.Group(extensions.GroupName) - if err != nil { - glog.Fatalf("Extensions API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err) - } - if _, found := storageVersions[expGroup.GroupVersion.Group]; !found { - glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions) - } - expEtcdStorage, err := newEtcd(api.Codecs, storageVersions[expGroup.GroupVersion.Group], "extensions/__internal", s.EtcdConfig) - if err != nil { - glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err) - } - storageDestinations.AddAPIGroup(extensions.GroupName, expEtcdStorage) - - // Since HPA has been moved to the autoscaling group, we need to make - // sure autoscaling has a storage destination. If the autoscaling group - // itself is on, it will overwrite this decision below. - storageDestinations.AddAPIGroup(autoscaling.GroupName, expEtcdStorage) - - // Since Job has been moved to the batch group, we need to make - // sure batch has a storage destination. If the batch group - // itself is on, it will overwrite this decision below. - storageDestinations.AddAPIGroup(batch.GroupName, expEtcdStorage) - } - - // autoscaling/v1/horizontalpodautoscalers is a move from extensions/v1beta1/horizontalpodautoscalers. - // The storage version needs to be either extensions/v1beta1 or autoscaling/v1. - // Users must roll forward while using 1.2, because we will require the latter for 1.3. - if apiResourceConfigSource.AnyResourcesForVersionEnabled(autoscalingapiv1.SchemeGroupVersion) { - glog.Infof("Configuring autoscaling/v1 storage destination") - autoscalingGroup, err := registered.Group(autoscaling.GroupName) - if err != nil { - glog.Fatalf("Autoscaling API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err) - } - // Figure out what storage group/version we should use. - storageGroupVersion, found := storageVersions[autoscalingGroup.GroupVersion.Group] - if !found { - glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", autoscalingGroup.GroupVersion.Group, storageVersions) - } - - if storageGroupVersion != "autoscaling/v1" && storageGroupVersion != "extensions/v1beta1" { - glog.Fatalf("The storage version for autoscaling must be either 'autoscaling/v1' or 'extensions/v1beta1'") - } - glog.Infof("Using %v for autoscaling group storage version", storageGroupVersion) - autoscalingEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdConfig) - if err != nil { - glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err) - } - storageDestinations.AddAPIGroup(autoscaling.GroupName, autoscalingEtcdStorage) - } - - // batch/v1/job is a move from extensions/v1beta1/job. The storage - // version needs to be either extensions/v1beta1 or batch/v1. Users - // must roll forward while using 1.2, because we will require the - // latter for 1.3. - if apiResourceConfigSource.AnyResourcesForVersionEnabled(batchapiv1.SchemeGroupVersion) { - glog.Infof("Configuring batch/v1 storage destination") - batchGroup, err := registered.Group(batch.GroupName) - if err != nil { - glog.Fatalf("Batch API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err) - } - // Figure out what storage group/version we should use. - storageGroupVersion, found := storageVersions[batchGroup.GroupVersion.Group] - if !found { - glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", batchGroup.GroupVersion.Group, storageVersions) - } - - if storageGroupVersion != "batch/v1" && storageGroupVersion != "extensions/v1beta1" { - glog.Fatalf("The storage version for batch must be either 'batch/v1' or 'extensions/v1beta1'") - } - glog.Infof("Using %v for batch group storage version", storageGroupVersion) - batchEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdConfig) - if err != nil { - glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err) - } - storageDestinations.AddAPIGroup(batch.GroupName, batchEtcdStorage) - } - - if apiResourceConfigSource.AnyResourcesForVersionEnabled(appsapi.SchemeGroupVersion) { - glog.Infof("Configuring apps/v1alpha1 storage destination") - appsGroup, err := registered.Group(apps.GroupName) - if err != nil { - glog.Fatalf("Apps API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err) - } - // Figure out what storage group/version we should use. - storageGroupVersion, found := storageVersions[appsGroup.GroupVersion.Group] - if !found { - glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", appsGroup.GroupVersion.Group, storageVersions) - } - - if storageGroupVersion != "apps/v1alpha1" { - glog.Fatalf("The storage version for apps must be apps/v1alpha1") - } - glog.Infof("Using %v for petset group storage version", storageGroupVersion) - appsEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "apps/__internal", s.EtcdConfig) - if err != nil { - glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err) - } - storageDestinations.AddAPIGroup(apps.GroupName, appsEtcdStorage) - } - - updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdConfig, &storageDestinations, newEtcd) // Default to the private server key for service account token signing if s.ServiceAccountKeyFile == "" && s.TLSPrivateKeyFile != "" { @@ -386,7 +209,11 @@ func Run(s *options.APIServer) error { if s.ServiceAccountLookup { // If we need to look up service accounts and tokens, // go directly to etcd to avoid recursive auth insanity - serviceAccountGetter = serviceaccountcontroller.NewGetterFromStorageInterface(etcdStorage) + storage, err := storageFactory.New(api.Resource("serviceaccounts")) + if err != nil { + glog.Fatalf("Unable to get serviceaccounts storage: %v", err) + } + serviceAccountGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storage) } authenticator, err := authenticator.New(authenticator.AuthenticatorConfig{ @@ -443,8 +270,7 @@ func Run(s *options.APIServer) error { genericConfig := genericapiserver.NewConfig(s.ServerRunOptions) // TODO: Move the following to generic api server as well. - genericConfig.StorageDestinations = storageDestinations - genericConfig.StorageVersions = storageVersions + genericConfig.StorageFactory = storageFactory genericConfig.Authenticator = authenticator genericConfig.SupportsBasicAuth = len(s.BasicAuthFile) > 0 genericConfig.Authorizer = authorizer diff --git a/cmd/kube-apiserver/app/server_test.go b/cmd/kube-apiserver/app/server_test.go index d1854c0a0e2..cce3855ef84 100644 --- a/cmd/kube-apiserver/app/server_test.go +++ b/cmd/kube-apiserver/app/server_test.go @@ -19,18 +19,12 @@ package app import ( "reflect" "regexp" - "strings" "testing" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" - "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/genericapiserver" "k8s.io/kubernetes/pkg/master" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/storage" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" ) func TestLongRunningRequestRegexp(t *testing.T) { @@ -74,64 +68,6 @@ func TestLongRunningRequestRegexp(t *testing.T) { } } -func TestUpdateEtcdOverrides(t *testing.T) { - storageVersions := map[string]string{ - "": "v1", - "extensions": "extensions/v1beta1", - } - - testCases := []struct { - apigroup string - resource string - servers []string - }{ - { - apigroup: api.GroupName, - resource: "resource", - servers: []string{"http://127.0.0.1:10000"}, - }, - { - apigroup: api.GroupName, - resource: "resource", - servers: []string{"http://127.0.0.1:10000", "http://127.0.0.1:20000"}, - }, - { - apigroup: extensions.GroupName, - resource: "resource", - servers: []string{"http://127.0.0.1:10000"}, - }, - } - - for _, test := range testCases { - newEtcd := func(_ runtime.NegotiatedSerializer, _, _ string, etcdConfig etcdstorage.EtcdConfig) (storage.Interface, error) { - if !reflect.DeepEqual(test.servers, etcdConfig.ServerList) { - t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, etcdConfig.ServerList) - } - return nil, nil - } - storageDestinations := genericapiserver.NewStorageDestinations() - override := test.apigroup + "/" + test.resource + "#" + strings.Join(test.servers, ";") - defaultEtcdConfig := etcdstorage.EtcdConfig{ - Prefix: genericapiserver.DefaultEtcdPathPrefix, - ServerList: []string{"http://127.0.0.1"}, - } - updateEtcdOverrides([]string{override}, storageVersions, defaultEtcdConfig, &storageDestinations, newEtcd) - apigroup, ok := storageDestinations.APIGroups[test.apigroup] - if !ok { - t.Errorf("apigroup: %s not created", test.apigroup) - continue - } - if apigroup.Overrides == nil { - t.Errorf("Overrides not created for: %s", test.apigroup) - continue - } - if _, ok := apigroup.Overrides[test.resource]; !ok { - t.Errorf("override not created for: %s", test.resource) - continue - } - } -} - func TestParseRuntimeConfig(t *testing.T) { testCases := []struct { runtimeConfig map[string]string diff --git a/examples/apiserver/apiserver.go b/examples/apiserver/apiserver.go index 7b7dc7b2a39..5e805714188 100644 --- a/examples/apiserver/apiserver.go +++ b/examples/apiserver/apiserver.go @@ -24,7 +24,7 @@ import ( testgroupetcd "k8s.io/kubernetes/examples/apiserver/rest" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/rest" - "k8s.io/kubernetes/pkg/apimachinery" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/genericapiserver" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" @@ -40,20 +40,14 @@ const ( SecurePort = 6444 ) -func newStorageDestinations(groupName string, groupMeta *apimachinery.GroupMeta) (*genericapiserver.StorageDestinations, error) { - storageDestinations := genericapiserver.NewStorageDestinations() - var storageConfig etcdstorage.EtcdStorageConfig - storageConfig.Config = etcdstorage.EtcdConfig{ +func newStorageFactory() genericapiserver.StorageFactory { + etcdConfig := etcdstorage.EtcdConfig{ Prefix: genericapiserver.DefaultEtcdPathPrefix, ServerList: []string{"http://127.0.0.1:4001"}, } - storageConfig.Codec = groupMeta.Codec - storageInterface, err := storageConfig.NewStorage() - if err != nil { - return nil, err - } - storageDestinations.AddAPIGroup(groupName, storageInterface) - return &storageDestinations, nil + storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig()) + + return storageFactory } func NewServerRunOptions() *genericapiserver.ServerRunOptions { @@ -86,12 +80,14 @@ func Run(serverOptions *genericapiserver.ServerRunOptions) error { if err != nil { return fmt.Errorf("%v", err) } - storageDestinations, err := newStorageDestinations(groupName, groupMeta) + storageFactory := newStorageFactory() + storage, err := storageFactory.New(unversioned.GroupResource{Group: groupName, Resource: "testtype"}) if err != nil { - return fmt.Errorf("Unable to init etcd: %v", err) + return fmt.Errorf("Unable to get storage: %v", err) } + restStorageMap := map[string]rest.Storage{ - "testtypes": testgroupetcd.NewREST(storageDestinations.Get(groupName, "testtype"), s.StorageDecorator()), + "testtypes": testgroupetcd.NewREST(storage, s.StorageDecorator()), } apiGroupInfo := genericapiserver.APIGroupInfo{ GroupMeta: *groupMeta, diff --git a/federation/cmd/federated-apiserver/app/options/options.go b/federation/cmd/federated-apiserver/app/options/options.go index d4445ce685b..0ab9ab391af 100644 --- a/federation/cmd/federated-apiserver/app/options/options.go +++ b/federation/cmd/federated-apiserver/app/options/options.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" apiutil "k8s.io/kubernetes/pkg/api/util" "k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/apimachinery/registered" @@ -119,39 +120,53 @@ func NewAPIServer() *APIServer { } // dest must be a map of group to groupVersion. -func gvToMap(gvList string, dest map[string]string) { - for _, gv := range strings.Split(gvList, ",") { - if gv == "" { +func mergeGroupVersionIntoMap(gvList string, dest map[string]unversioned.GroupVersion) error { + for _, gvString := range strings.Split(gvList, ",") { + if gvString == "" { continue } // We accept two formats. "group/version" OR // "group=group/version". The latter is used when types // move between groups. - if !strings.Contains(gv, "=") { - dest[apiutil.GetGroup(gv)] = gv + if !strings.Contains(gvString, "=") { + gv, err := unversioned.ParseGroupVersion(gvString) + if err != nil { + return err + } + dest[gv.Group] = gv + } else { - parts := strings.SplitN(gv, "=", 2) - // TODO: error checking. - dest[parts[0]] = parts[1] + parts := strings.SplitN(gvString, "=", 2) + gv, err := unversioned.ParseGroupVersion(parts[1]) + if err != nil { + return err + } + dest[parts[0]] = gv } } + + return nil } -// StorageGroupsToGroupVersions returns a map from group name to group version, +// StorageGroupsToEncodingVersion returns a map from group name to group version, // computed from the s.DeprecatedStorageVersion and s.StorageVersions flags. // TODO: can we move the whole storage version concept to the generic apiserver? -func (s *APIServer) StorageGroupsToGroupVersions() map[string]string { - storageVersionMap := map[string]string{} +func (s *APIServer) StorageGroupsToEncodingVersion() (map[string]unversioned.GroupVersion, error) { + storageVersionMap := map[string]unversioned.GroupVersion{} if s.DeprecatedStorageVersion != "" { - storageVersionMap[""] = s.DeprecatedStorageVersion + storageVersionMap[""] = unversioned.GroupVersion{Group: apiutil.GetGroup(s.DeprecatedStorageVersion), Version: apiutil.GetVersion(s.DeprecatedStorageVersion)} } // First, get the defaults. - gvToMap(s.DefaultStorageVersions, storageVersionMap) + if err := mergeGroupVersionIntoMap(s.DefaultStorageVersions, storageVersionMap); err != nil { + return nil, err + } // Override any defaults with the user settings. - gvToMap(s.StorageVersions, storageVersionMap) + if err := mergeGroupVersionIntoMap(s.StorageVersions, storageVersionMap); err != nil { + return nil, err + } - return storageVersionMap + return storageVersionMap, nil } // AddFlags adds flags for a specific APIServer to the specified FlagSet diff --git a/federation/cmd/federated-apiserver/app/options/options_test.go b/federation/cmd/federated-apiserver/app/options/options_test.go index 85f3a77ac2c..a7f67e07ad2 100644 --- a/federation/cmd/federated-apiserver/app/options/options_test.go +++ b/federation/cmd/federated-apiserver/app/options/options_test.go @@ -23,6 +23,7 @@ import ( "github.com/spf13/pflag" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/autoscaling" "k8s.io/kubernetes/pkg/apis/extensions" ) @@ -32,38 +33,38 @@ func TestGenerateStorageVersionMap(t *testing.T) { legacyVersion string storageVersions string defaultVersions string - expectedMap map[string]string + expectedMap map[string]unversioned.GroupVersion }{ { legacyVersion: "v1", storageVersions: "v1,extensions/v1beta1", - expectedMap: map[string]string{ - api.GroupName: "v1", - extensions.GroupName: "extensions/v1beta1", + expectedMap: map[string]unversioned.GroupVersion{ + api.GroupName: {Version: "v1"}, + extensions.GroupName: {Group: "extensions", Version: "v1beta1"}, }, }, { legacyVersion: "", storageVersions: "extensions/v1beta1,v1", - expectedMap: map[string]string{ - api.GroupName: "v1", - extensions.GroupName: "extensions/v1beta1", + expectedMap: map[string]unversioned.GroupVersion{ + api.GroupName: {Version: "v1"}, + extensions.GroupName: {Group: "extensions", Version: "v1beta1"}, }, }, { legacyVersion: "", storageVersions: "autoscaling=extensions/v1beta1,v1", defaultVersions: "extensions/v1beta1,v1,autoscaling/v1", - expectedMap: map[string]string{ - api.GroupName: "v1", - autoscaling.GroupName: "extensions/v1beta1", - extensions.GroupName: "extensions/v1beta1", + expectedMap: map[string]unversioned.GroupVersion{ + api.GroupName: {Version: "v1"}, + autoscaling.GroupName: {Group: "extensions", Version: "v1beta1"}, + extensions.GroupName: {Group: "extensions", Version: "v1beta1"}, }, }, { legacyVersion: "", storageVersions: "", - expectedMap: map[string]string{}, + expectedMap: map[string]unversioned.GroupVersion{}, }, } for i, test := range testCases { @@ -72,7 +73,10 @@ func TestGenerateStorageVersionMap(t *testing.T) { StorageVersions: test.storageVersions, DefaultStorageVersions: test.defaultVersions, } - output := s.StorageGroupsToGroupVersions() + output, err := s.StorageGroupsToEncodingVersion() + if err != nil { + t.Errorf("%v: unexpected error: %v", i, err) + } if !reflect.DeepEqual(test.expectedMap, output) { t.Errorf("%v: unexpected error. expect: %v, got: %v", i, test.expectedMap, output) } diff --git a/federation/cmd/federated-apiserver/app/server.go b/federation/cmd/federated-apiserver/app/server.go index 96e84436f65..9a7c0b7e65c 100644 --- a/federation/cmd/federated-apiserver/app/server.go +++ b/federation/cmd/federated-apiserver/app/server.go @@ -21,7 +21,6 @@ package app import ( "crypto/tls" - "fmt" "net" "net/url" "os" @@ -47,9 +46,6 @@ import ( "k8s.io/kubernetes/pkg/master" "k8s.io/kubernetes/pkg/registry/cachesize" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/runtime/serializer/versioning" - "k8s.io/kubernetes/pkg/storage" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" utilnet "k8s.io/kubernetes/pkg/util/net" ) @@ -81,44 +77,6 @@ func verifyClusterIPFlags(s *options.APIServer) { } } -// For testing. -type newEtcdFunc func(runtime.NegotiatedSerializer, string, string, etcdstorage.EtcdConfig) (storage.Interface, error) - -func newEtcd(ns runtime.NegotiatedSerializer, storageGroupVersionString, memoryGroupVersionString string, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) { - if storageGroupVersionString == "" { - return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage") - } - storageVersion, err := unversioned.ParseGroupVersion(storageGroupVersionString) - if err != nil { - return nil, fmt.Errorf("couldn't understand storage version %v: %v", storageGroupVersionString, err) - } - memoryVersion, err := unversioned.ParseGroupVersion(memoryGroupVersionString) - if err != nil { - return nil, fmt.Errorf("couldn't understand memory version %v: %v", memoryGroupVersionString, err) - } - - var storageConfig etcdstorage.EtcdStorageConfig - storageConfig.Config = etcdConfig - s, ok := ns.SerializerForMediaType("application/json", nil) - if !ok { - return nil, fmt.Errorf("unable to find serializer for JSON") - } - glog.Infof("constructing etcd storage interface.\n sv: %v\n mv: %v\n", storageVersion, memoryVersion) - encoder := ns.EncoderForVersion(s, storageVersion) - decoder := ns.DecoderToVersion(s, memoryVersion) - if memoryVersion.Group != storageVersion.Group { - // Allow this codec to translate between groups. - if err = versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil { - return nil, fmt.Errorf("error setting up encoder for %v: %v", storageGroupVersionString, err) - } - if err = versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil { - return nil, fmt.Errorf("error setting up decoder for %v: %v", storageGroupVersionString, err) - } - } - storageConfig.Codec = runtime.NewCodec(encoder, decoder) - return storageConfig.NewStorage() -} - // Run runs the specified APIServer. This should never exit. func Run(s *options.APIServer) error { verifyClusterIPFlags(s) @@ -228,6 +186,36 @@ func Run(s *options.APIServer) error { n := s.ServiceClusterIPRange + resourceEncoding := genericapiserver.NewDefaultResourceEncodingConfig() + groupToEncoding, err := s.StorageGroupsToEncodingVersion() + if err != nil { + glog.Fatalf("error getting group encoding: %s", err) + } + for group, storageEncodingVersion := range groupToEncoding { + resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal}) + } + + storageFactory := genericapiserver.NewDefaultStorageFactory(s.EtcdConfig, api.Codecs, resourceEncoding, apiResourceConfigSource) + for _, override := range s.EtcdServersOverrides { + tokens := strings.Split(override, "#") + if len(tokens) != 2 { + glog.Errorf("invalid value of etcd server overrides: %s", override) + continue + } + + apiresource := strings.Split(tokens[0], "/") + if len(apiresource) != 2 { + glog.Errorf("invalid resource definition: %s", tokens[0]) + continue + } + group := apiresource[0] + resource := apiresource[1] + groupResource := unversioned.GroupResource{Group: group, Resource: resource} + + servers := strings.Split(tokens[1], ";") + storageFactory.SetEtcdLocation(groupResource, servers) + } + authenticator, err := authenticator.New(authenticator.AuthenticatorConfig{ BasicAuthFile: s.BasicAuthFile, ClientCAFile: s.ClientCAFile, @@ -276,13 +264,9 @@ func Run(s *options.APIServer) error { } } - storageDestinations := genericapiserver.NewStorageDestinations() - storageVersions := s.StorageGroupsToGroupVersions() - config := &master.Config{ Config: &genericapiserver.Config{ - StorageDestinations: storageDestinations, - StorageVersions: storageVersions, + StorageFactory: storageFactory, ServiceClusterIPRange: &n, EnableLogsSupport: s.EnableLogsSupport, EnableUISupport: true, diff --git a/pkg/genericapiserver/genericapiserver.go b/pkg/genericapiserver/genericapiserver.go index 8fcdf58d2a6..b8199818632 100644 --- a/pkg/genericapiserver/genericapiserver.go +++ b/pkg/genericapiserver/genericapiserver.go @@ -43,7 +43,6 @@ import ( genericetcd "k8s.io/kubernetes/pkg/registry/generic/etcd" ipallocator "k8s.io/kubernetes/pkg/registry/service/ipallocator" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/ui" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/crypto" @@ -55,7 +54,6 @@ import ( "github.com/emicklei/go-restful" "github.com/emicklei/go-restful/swagger" "github.com/golang/glog" - "golang.org/x/net/context" ) const ( @@ -64,107 +62,6 @@ const ( globalTimeout = time.Minute ) -// StorageDestinations is a mapping from API group & resource to -// the underlying storage interfaces. -type StorageDestinations struct { - APIGroups map[string]*StorageDestinationsForAPIGroup -} - -type StorageDestinationsForAPIGroup struct { - Default storage.Interface - Overrides map[string]storage.Interface -} - -func NewStorageDestinations() StorageDestinations { - return StorageDestinations{ - APIGroups: map[string]*StorageDestinationsForAPIGroup{}, - } -} - -// AddAPIGroup replaces 'group' if it's already registered. -func (s *StorageDestinations) AddAPIGroup(group string, defaultStorage storage.Interface) { - glog.Infof("Adding storage destination for group %v", group) - s.APIGroups[group] = &StorageDestinationsForAPIGroup{ - Default: defaultStorage, - Overrides: map[string]storage.Interface{}, - } -} - -func (s *StorageDestinations) AddStorageOverride(group, resource string, override storage.Interface) { - if _, ok := s.APIGroups[group]; !ok { - s.AddAPIGroup(group, nil) - } - if s.APIGroups[group].Overrides == nil { - s.APIGroups[group].Overrides = map[string]storage.Interface{} - } - s.APIGroups[group].Overrides[resource] = override -} - -// Get finds the storage destination for the given group and resource. It will -// Fatalf if the group has no storage destination configured. -func (s *StorageDestinations) Get(group, resource string) storage.Interface { - apigroup, ok := s.APIGroups[group] - if !ok { - // TODO: return an error like a normal function. For now, - // Fatalf is better than just logging an error, because this - // condition guarantees future problems and this is a less - // mysterious failure point. - glog.Fatalf("No storage defined for API group: '%s'. Defined groups: %#v", group, s.APIGroups) - return nil - } - if apigroup.Overrides != nil { - if client, exists := apigroup.Overrides[resource]; exists { - return client - } - } - return apigroup.Default -} - -// Search is like Get, but can be used to search a list of groups. It tries the -// groups in order (and Fatalf's if none of them exist). The intention is for -// this to be used for resources that move between groups. -func (s *StorageDestinations) Search(groups []string, resource string) storage.Interface { - for _, group := range groups { - apigroup, ok := s.APIGroups[group] - if !ok { - continue - } - if apigroup.Overrides != nil { - if client, exists := apigroup.Overrides[resource]; exists { - return client - } - } - return apigroup.Default - } - // TODO: return an error like a normal function. For now, - // Fatalf is better than just logging an error, because this - // condition guarantees future problems and this is a less - // mysterious failure point. - glog.Fatalf("No storage defined for any of the groups: %v. Defined groups: %#v", groups, s.APIGroups) - return nil -} - -// Get all backends for all registered storage destinations. -// Used for getting all instances for health validations. -func (s *StorageDestinations) Backends() []string { - backends := sets.String{} - for _, group := range s.APIGroups { - if group.Default != nil { - for _, backend := range group.Default.Backends(context.TODO()) { - backends.Insert(backend) - } - } - if group.Overrides != nil { - for _, storage := range group.Overrides { - for _, backend := range storage.Backends(context.TODO()) { - backends.Insert(backend) - } - } - } - } - return backends.List() -} - // Info about an API group. type APIGroupInfo struct { GroupMeta apimachinery.GroupMeta @@ -199,9 +96,7 @@ type APIGroupInfo struct { // Config is a structure used to configure a GenericAPIServer. type Config struct { - StorageDestinations StorageDestinations - // StorageVersions is a map between groups and their storage versions - StorageVersions map[string]string + StorageFactory StorageFactory // allow downstream consumers to disable the core controller loops EnableLogsSupport bool EnableUISupport bool diff --git a/pkg/genericapiserver/genericapiserver_test.go b/pkg/genericapiserver/genericapiserver_test.go index 545a67ff853..1da9a535fa4 100644 --- a/pkg/genericapiserver/genericapiserver_test.go +++ b/pkg/genericapiserver/genericapiserver_test.go @@ -32,7 +32,6 @@ import ( "k8s.io/kubernetes/pkg/api/rest" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" - apiutil "k8s.io/kubernetes/pkg/api/util" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/extensions" @@ -266,7 +265,7 @@ func getGroupList(server *httptest.Server) (*unversioned.APIGroupList, error) { } func TestDiscoveryAtAPIS(t *testing.T) { - master, etcdserver, config, assert := newMaster(t) + master, etcdserver, _, assert := newMaster(t) defer etcdserver.Terminate(t) server := httptest.NewServer(master.HandlerContainer.ServeMux) @@ -277,7 +276,6 @@ func TestDiscoveryAtAPIS(t *testing.T) { assert.Equal(0, len(groupList.Groups)) // Add a Group. - extensionsGroupName := extensions.GroupName extensionsVersions := []unversioned.GroupVersionForDiscovery{ { GroupVersion: testapi.Extensions.GroupVersion().String(), @@ -285,11 +283,11 @@ func TestDiscoveryAtAPIS(t *testing.T) { }, } extensionsPreferredVersion := unversioned.GroupVersionForDiscovery{ - GroupVersion: config.StorageVersions[extensions.GroupName], - Version: apiutil.GetVersion(config.StorageVersions[extensions.GroupName]), + GroupVersion: extensions.GroupName + "/preferred", + Version: "preferred", } master.AddAPIGroupForDiscovery(unversioned.APIGroup{ - Name: extensionsGroupName, + Name: extensions.GroupName, Versions: extensionsVersions, PreferredVersion: extensionsPreferredVersion, }) @@ -301,13 +299,13 @@ func TestDiscoveryAtAPIS(t *testing.T) { assert.Equal(1, len(groupList.Groups)) groupListGroup := groupList.Groups[0] - assert.Equal(extensionsGroupName, groupListGroup.Name) + assert.Equal(extensions.GroupName, groupListGroup.Name) assert.Equal(extensionsVersions, groupListGroup.Versions) assert.Equal(extensionsPreferredVersion, groupListGroup.PreferredVersion) assert.Equal(master.getServerAddressByClientCIDRs(&http.Request{}), groupListGroup.ServerAddressByClientCIDRs) // Remove the group. - master.RemoveAPIGroupForDiscovery(extensionsGroupName) + master.RemoveAPIGroupForDiscovery(extensions.GroupName) groupList, err = getGroupList(server) if err != nil { t.Fatalf("unexpected error: %v", err) diff --git a/pkg/genericapiserver/resource_encoding_config.go b/pkg/genericapiserver/resource_encoding_config.go new file mode 100644 index 00000000000..3430746c2cc --- /dev/null +++ b/pkg/genericapiserver/resource_encoding_config.go @@ -0,0 +1,118 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package genericapiserver + +import ( + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apimachinery/registered" + "k8s.io/kubernetes/pkg/runtime" +) + +type ResourceEncodingConfig interface { + // StorageEncoding returns the serialization format for the resource. + // TODO this should actually return a GroupVersionKind since you can logically have multiple "matching" Kinds + // For now, it returns just the GroupVersion for consistency with old behavior + StoragageEncodingFor(unversioned.GroupResource) (unversioned.GroupVersion, error) + + // InMemoryEncodingFor returns the groupVersion for the in memory representation the storage should convert to. + InMemoryEncodingFor(unversioned.GroupResource) (unversioned.GroupVersion, error) +} + +type DefaultResourceEncodingConfig struct { + Groups map[string]*GroupResourceEncodingConfig +} + +type GroupResourceEncodingConfig struct { + DefaultExternalEncoding unversioned.GroupVersion + ExternalResourceEncodings map[string]unversioned.GroupVersion + + DefaultInternalEncoding unversioned.GroupVersion + InternalResourceEncodings map[string]unversioned.GroupVersion +} + +var _ ResourceEncodingConfig = &DefaultResourceEncodingConfig{} + +func NewDefaultResourceEncodingConfig() *DefaultResourceEncodingConfig { + return &DefaultResourceEncodingConfig{Groups: map[string]*GroupResourceEncodingConfig{}} +} + +func newGroupResourceEncodingConfig(defaultEncoding, defaultInternalVersion unversioned.GroupVersion) *GroupResourceEncodingConfig { + return &GroupResourceEncodingConfig{ + DefaultExternalEncoding: defaultEncoding, ExternalResourceEncodings: map[string]unversioned.GroupVersion{}, + DefaultInternalEncoding: defaultInternalVersion, InternalResourceEncodings: map[string]unversioned.GroupVersion{}, + } +} + +func (o *DefaultResourceEncodingConfig) SetVersionEncoding(group string, externalEncodingVersion, internalVersion unversioned.GroupVersion) { + _, groupExists := o.Groups[group] + if !groupExists { + o.Groups[group] = newGroupResourceEncodingConfig(externalEncodingVersion, internalVersion) + } + + o.Groups[group].DefaultExternalEncoding = externalEncodingVersion + o.Groups[group].DefaultInternalEncoding = internalVersion +} + +func (o *DefaultResourceEncodingConfig) SetResourceEncoding(resourceBeingStored unversioned.GroupResource, externalEncodingVersion, internalVersion unversioned.GroupVersion) { + group := resourceBeingStored.Group + _, groupExists := o.Groups[group] + if !groupExists { + o.Groups[group] = newGroupResourceEncodingConfig(externalEncodingVersion, internalVersion) + } + + o.Groups[group].ExternalResourceEncodings[resourceBeingStored.Resource] = externalEncodingVersion + o.Groups[group].InternalResourceEncodings[resourceBeingStored.Resource] = internalVersion +} + +func (o *DefaultResourceEncodingConfig) StoragageEncodingFor(resource unversioned.GroupResource) (unversioned.GroupVersion, error) { + groupMeta, err := registered.Group(resource.Group) + if err != nil { + return unversioned.GroupVersion{}, err + } + + groupEncoding, groupExists := o.Groups[resource.Group] + + if !groupExists { + // return the most preferred external version for the group + return groupMeta.GroupVersion, nil + } + + resourceOverride, resourceExists := groupEncoding.ExternalResourceEncodings[resource.Resource] + if !resourceExists { + return groupEncoding.DefaultExternalEncoding, nil + } + + return resourceOverride, nil +} + +func (o *DefaultResourceEncodingConfig) InMemoryEncodingFor(resource unversioned.GroupResource) (unversioned.GroupVersion, error) { + if _, err := registered.Group(resource.Group); err != nil { + return unversioned.GroupVersion{}, err + } + + groupEncoding, groupExists := o.Groups[resource.Group] + if !groupExists { + return unversioned.GroupVersion{Group: resource.Group, Version: runtime.APIVersionInternal}, nil + } + + resourceOverride, resourceExists := groupEncoding.InternalResourceEncodings[resource.Resource] + if !resourceExists { + return groupEncoding.DefaultInternalEncoding, nil + } + + return resourceOverride, nil +} diff --git a/pkg/genericapiserver/storage_factory.go b/pkg/genericapiserver/storage_factory.go new file mode 100644 index 00000000000..be54b10b6e5 --- /dev/null +++ b/pkg/genericapiserver/storage_factory.go @@ -0,0 +1,221 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package genericapiserver + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/versioning" + "k8s.io/kubernetes/pkg/storage" + etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + "k8s.io/kubernetes/pkg/util/sets" + + "github.com/golang/glog" +) + +// StorageFactory is the interface to locate the storage for a given GroupResource +type StorageFactory interface { + // New finds the storage destination for the given group and resource. It will + // return an error if the group has no storage destination configured. + New(groupResource unversioned.GroupResource) (storage.Interface, error) + // Backends gets all backends for all registered storage destinations. + // Used for getting all instances for health validations. + Backends() []string +} + +// DefaultStorageFactory takes a GroupResource and returns back its storage interface. This result includes: +// 1. Merged etcd config, including: auth, server locations, prefixes +// 2. Resource encodings for storage: group,version,kind to store as +// 3. Cohabitating default: some resources like hpa are exposed through multiple APIs. They must agree on 1 and 2 +type DefaultStorageFactory struct { + // DefaultEtcdConfig describes how to connect to etcd in general. It's authentication information will be used for + // every storage.Interface returned. + DefaultEtcdConfig etcdstorage.EtcdConfig + + Overrides map[unversioned.GroupResource]groupResourceOverrides + + // DefaultSerializer is used to create encoders and decoders for the storage.Interface. + DefaultSerializer runtime.NegotiatedSerializer + + // ResourceEncodingConfig describes how to encode a particular GroupVersionResource + ResourceEncodingConfig ResourceEncodingConfig + + // APIResourceConfigSource indicates whether the *storage* is enabled, NOT the API + // This is discrete from resource enablement because those are separate concerns. How it is surfaced to the user via flags + // or config is up to whoever is building this. + APIResourceConfigSource APIResourceConfigSource + + // newEtcdFn exists to be overwritten for unit testing. You should never set this in a normal world. + newEtcdFn func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) +} + +type groupResourceOverrides struct { + // etcdLocation contains the list of "special" locations that are used for particular GroupResources + // These are merged on top of the default DefaultEtcdConfig when requesting the storage.Interface for a given GroupResource + etcdLocation []string + // etcdPrefix contains the list of "special" prefixes for a GroupResource. Resource=* means for the entire group + etcdPrefix string + // serializer contains the list of "special" serializers for a GroupResource. Resource=* means for the entire group + serializer runtime.NegotiatedSerializer + // cohabitatingResources keeps track of which resources must be stored together. This happens when we have multiple ways + // of exposing one set of concepts. autoscaling.HPA and extensions.HPA as a for instance + // The order of the slice matters! It is the priority order of lookup for finding a storage location + cohabitatingResources []unversioned.GroupResource +} + +var _ StorageFactory = &DefaultStorageFactory{} + +const AllResources = "*" + +func NewDefaultStorageFactory(defaultEtcdConfig etcdstorage.EtcdConfig, defaultSerializer runtime.NegotiatedSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory { + return &DefaultStorageFactory{ + DefaultEtcdConfig: defaultEtcdConfig, + Overrides: map[unversioned.GroupResource]groupResourceOverrides{}, + DefaultSerializer: defaultSerializer, + ResourceEncodingConfig: resourceEncodingConfig, + APIResourceConfigSource: resourceConfig, + + newEtcdFn: newEtcd, + } +} + +func (s *DefaultStorageFactory) SetEtcdLocation(groupResource unversioned.GroupResource, location []string) { + overrides := s.Overrides[groupResource] + overrides.etcdLocation = location + s.Overrides[groupResource] = overrides +} + +func (s *DefaultStorageFactory) SetEtcdPrefix(groupResource unversioned.GroupResource, prefix string) { + overrides := s.Overrides[groupResource] + overrides.etcdPrefix = prefix + s.Overrides[groupResource] = overrides +} + +func (s *DefaultStorageFactory) SetSerializer(groupResource unversioned.GroupResource, serializer runtime.NegotiatedSerializer) { + overrides := s.Overrides[groupResource] + overrides.serializer = serializer + s.Overrides[groupResource] = overrides +} + +// AddCohabitatingResources links resources together the order of the slice matters! its the priority order of lookup for finding a storage location +func (s *DefaultStorageFactory) AddCohabitatingResources(groupResources ...unversioned.GroupResource) { + for _, groupResource := range groupResources { + overrides := s.Overrides[groupResource] + overrides.cohabitatingResources = groupResources + s.Overrides[groupResource] = overrides + } +} + +func getAllResourcesAlias(resource unversioned.GroupResource) unversioned.GroupResource { + return unversioned.GroupResource{Group: resource.Group, Resource: AllResources} +} + +func (s *DefaultStorageFactory) getStorageGroupResource(groupResource unversioned.GroupResource) unversioned.GroupResource { + for _, potentialStorageResource := range s.Overrides[groupResource].cohabitatingResources { + if s.APIResourceConfigSource.AnyVersionOfResourceEnabled(potentialStorageResource) { + return potentialStorageResource + } + } + + return groupResource +} + +// New finds the storage destination for the given group and resource. It will +// return an error if the group has no storage destination configured. +func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (storage.Interface, error) { + chosenStorageResource := s.getStorageGroupResource(groupResource) + + groupOverride := s.Overrides[getAllResourcesAlias(chosenStorageResource)] + exactResourceOverride := s.Overrides[chosenStorageResource] + + overriddenEtcdLocations := []string{} + if len(groupOverride.etcdLocation) > 0 { + overriddenEtcdLocations = groupOverride.etcdLocation + } + if len(exactResourceOverride.etcdLocation) > 0 { + overriddenEtcdLocations = exactResourceOverride.etcdLocation + } + + etcdPrefix := s.DefaultEtcdConfig.Prefix + if len(groupOverride.etcdPrefix) > 0 { + etcdPrefix = groupOverride.etcdPrefix + } + if len(exactResourceOverride.etcdPrefix) > 0 { + etcdPrefix = exactResourceOverride.etcdPrefix + } + + etcdSerializer := s.DefaultSerializer + if groupOverride.serializer != nil { + etcdSerializer = groupOverride.serializer + } + if exactResourceOverride.serializer != nil { + etcdSerializer = exactResourceOverride.serializer + } + // operate on copy + etcdConfig := s.DefaultEtcdConfig + etcdConfig.Prefix = etcdPrefix + if len(overriddenEtcdLocations) > 0 { + etcdConfig.ServerList = overriddenEtcdLocations + } + + storageEncodingVersion, err := s.ResourceEncodingConfig.StoragageEncodingFor(chosenStorageResource) + if err != nil { + return nil, err + } + internalVersion, err := s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource) + if err != nil { + return nil, err + } + + glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, etcdConfig) + return s.newEtcdFn(etcdSerializer, storageEncodingVersion, internalVersion, etcdConfig) +} + +func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) { + var storageConfig etcdstorage.EtcdStorageConfig + storageConfig.Config = etcdConfig + s, ok := ns.SerializerForMediaType("application/json", nil) + if !ok { + return nil, fmt.Errorf("unable to find serializer for JSON") + } + encoder := ns.EncoderForVersion(s, storageVersion) + decoder := ns.DecoderToVersion(s, memoryVersion) + if memoryVersion.Group != storageVersion.Group { + // Allow this codec to translate between groups. + if err = versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil { + return nil, fmt.Errorf("error setting up encoder from %v to %v: %v", memoryVersion, storageVersion, err) + } + if err = versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil { + return nil, fmt.Errorf("error setting up decoder from %v to %v: %v", storageVersion, memoryVersion, err) + } + } + storageConfig.Codec = runtime.NewCodec(encoder, decoder) + return storageConfig.NewStorage() +} + +// Get all backends for all registered storage destinations. +// Used for getting all instances for health validations. +func (s *DefaultStorageFactory) Backends() []string { + backends := sets.NewString(s.DefaultEtcdConfig.ServerList...) + + for _, overrides := range s.Overrides { + backends.Insert(overrides.etcdLocation...) + } + return backends.List() +} diff --git a/pkg/genericapiserver/storage_factory_test.go b/pkg/genericapiserver/storage_factory_test.go new file mode 100644 index 00000000000..90540681799 --- /dev/null +++ b/pkg/genericapiserver/storage_factory_test.go @@ -0,0 +1,88 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package genericapiserver + +import ( + "reflect" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" + etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" +) + +func TestUpdateEtcdOverrides(t *testing.T) { + testCases := []struct { + resource unversioned.GroupResource + servers []string + }{ + { + resource: unversioned.GroupResource{Group: api.GroupName, Resource: "resource"}, + servers: []string{"http://127.0.0.1:10000"}, + }, + { + resource: unversioned.GroupResource{Group: api.GroupName, Resource: "resource"}, + servers: []string{"http://127.0.0.1:10000", "http://127.0.0.1:20000"}, + }, + { + resource: unversioned.GroupResource{Group: extensions.GroupName, Resource: "resource"}, + servers: []string{"http://127.0.0.1:10000"}, + }, + } + + defaultEtcdLocation := []string{"http://127.0.0.1"} + for i, test := range testCases { + actualEtcdConfig := etcdstorage.EtcdConfig{} + newEtcdFn := func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) { + actualEtcdConfig = etcdConfig + return nil, nil + } + + defaultEtcdConfig := etcdstorage.EtcdConfig{ + Prefix: DefaultEtcdPathPrefix, + ServerList: defaultEtcdLocation, + } + storageFactory := NewDefaultStorageFactory(defaultEtcdConfig, api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig()) + storageFactory.newEtcdFn = newEtcdFn + storageFactory.SetEtcdLocation(test.resource, test.servers) + + var err error + _, err = storageFactory.New(test.resource) + if err != nil { + t.Errorf("%d: unexpected error %v", i, err) + continue + } + if !reflect.DeepEqual(actualEtcdConfig.ServerList, test.servers) { + t.Errorf("%d: expected %v, got %v", i, test.servers, actualEtcdConfig.ServerList) + continue + } + + _, err = storageFactory.New(unversioned.GroupResource{Group: api.GroupName, Resource: "unlikely"}) + if err != nil { + t.Errorf("%d: unexpected error %v", i, err) + continue + } + if !reflect.DeepEqual(actualEtcdConfig.ServerList, defaultEtcdLocation) { + t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, actualEtcdConfig.ServerList) + continue + } + + } +} diff --git a/pkg/master/master.go b/pkg/master/master.go index 35e18bd8b27..fe32c15e754 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -245,21 +245,15 @@ func (m *Master) InstallAPIs(c *Config) { // Install extensions unless disabled. if c.APIResourceConfigSource.AnyResourcesForVersionEnabled(extensionsapiv1beta1.SchemeGroupVersion) { - m.thirdPartyStorage = c.StorageDestinations.APIGroups[extensions.GroupName].Default + var err error + m.thirdPartyStorage, err = c.StorageFactory.New(extensions.Resource("thirdpartyresources")) + if err != nil { + glog.Fatalf("Error getting third party storage: %v", err) + } m.thirdPartyResources = map[string]thirdPartyEntry{} extensionResources := m.getExtensionResources(c) extensionsGroupMeta := registered.GroupOrDie(extensions.GroupName) - // Update the preferred version as per StorageVersions in the config. - storageVersion, found := c.StorageVersions[extensionsGroupMeta.GroupVersion.Group] - if !found { - glog.Fatalf("Couldn't find storage version of group %v", extensionsGroupMeta.GroupVersion.Group) - } - preferedGroupVersion, err := unversioned.ParseGroupVersion(storageVersion) - if err != nil { - glog.Fatalf("Error in parsing group version %s: %v", storageVersion, err) - } - extensionsGroupMeta.GroupVersion = preferedGroupVersion apiGroupInfo := genericapiserver.APIGroupInfo{ GroupMeta: *extensionsGroupMeta, @@ -390,13 +384,8 @@ func (m *Master) InstallAPIs(c *Config) { } func (m *Master) initV1ResourcesStorage(c *Config) { - dbClient := func(resource string) storage.Interface { return c.StorageDestinations.Get("", resource) } restOptions := func(resource string) generic.RESTOptions { - return generic.RESTOptions{ - Storage: dbClient(resource), - Decorator: m.StorageDecorator(), - DeleteCollectionWorkers: m.deleteCollectionWorkers, - } + return m.GetRESTOptionsOrDie(c, api.Resource(resource)) } podTemplateStorage := podtemplateetcd.NewREST(restOptions("podTemplates")) @@ -426,8 +415,8 @@ func (m *Master) initV1ResourcesStorage(c *Config) { m.ProxyTransport, ) - serviceStorage, serviceStatusStorage := serviceetcd.NewREST(restOptions("services")) - m.serviceRegistry = service.NewRegistry(serviceStorage) + serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptions("services")) + m.serviceRegistry = service.NewRegistry(serviceRESTStorage) var serviceClusterIPRegistry service.RangeRegistry serviceClusterIPRange := m.ServiceClusterIPRange @@ -435,9 +424,16 @@ func (m *Master) initV1ResourcesStorage(c *Config) { glog.Fatalf("service clusterIPRange is nil") return } + + serviceStorage, err := c.StorageFactory.New(api.Resource("services")) + if err != nil { + glog.Fatal(err.Error()) + } + serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface { mem := allocator.NewAllocationMap(max, rangeSpec) - etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), dbClient("services")) + // TODO etcdallocator package to return a storage interface via the storageFactory + etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorage) serviceClusterIPRegistry = etcd return etcd }) @@ -446,7 +442,8 @@ func (m *Master) initV1ResourcesStorage(c *Config) { var serviceNodePortRegistry service.RangeRegistry serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface { mem := allocator.NewAllocationMap(max, rangeSpec) - etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), dbClient("services")) + // TODO etcdallocator package to return a storage interface via the storageFactory + etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorage) serviceNodePortRegistry = etcd return etcd }) @@ -541,7 +538,7 @@ func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server { "scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"}, } - for ix, machine := range c.StorageDestinations.Backends() { + for ix, machine := range c.StorageFactory.Backends() { etcdUrl, err := url.Parse(machine) if err != nil { glog.Errorf("Failed to parse etcd url for validation: %v", err) @@ -726,24 +723,36 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV } } +func (m *Master) GetRESTOptionsOrDie(c *Config, resource unversioned.GroupResource) generic.RESTOptions { + storage, err := c.StorageFactory.New(resource) + if err != nil { + glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error()) + } + + return generic.RESTOptions{ + Storage: storage, + Decorator: m.StorageDecorator(), + DeleteCollectionWorkers: m.deleteCollectionWorkers, + } +} + // getExperimentalResources returns the resources for extensions api func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage { restOptions := func(resource string) generic.RESTOptions { - return generic.RESTOptions{ - Storage: c.StorageDestinations.Get(extensions.GroupName, resource), - Decorator: m.StorageDecorator(), - DeleteCollectionWorkers: m.deleteCollectionWorkers, - } + return m.GetRESTOptionsOrDie(c, extensions.Resource(resource)) } + // TODO update when we support more than one version of this group version := extensionsapiv1beta1.SchemeGroupVersion storage := map[string]rest.Storage{} if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("horizontalpodautoscalers")) { - m.constructHPAResources(c, storage) - controllerStorage := expcontrolleretcd.NewStorage( - generic.RESTOptions{Storage: c.StorageDestinations.Get("", "replicationControllers"), Decorator: m.StorageDecorator(), DeleteCollectionWorkers: m.deleteCollectionWorkers}) + hpaStorage, hpaStatusStorage := horizontalpodautoscaleretcd.NewREST(restOptions("horizontalpodautoscalers")) + storage["horizontalpodautoscalers"] = hpaStorage + storage["horizontalpodautoscalers/status"] = hpaStatusStorage + + controllerStorage := expcontrolleretcd.NewStorage(m.GetRESTOptionsOrDie(c, api.Resource("replicationControllers"))) storage["replicationcontrollers"] = controllerStorage.ReplicationController storage["replicationcontrollers/scale"] = controllerStorage.Scale } @@ -776,7 +785,9 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage { storage["deployments/scale"] = deploymentStorage.Scale } if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("jobs")) { - m.constructJobResources(c, storage) + jobsStorage, jobsStatusStorage := jobetcd.NewREST(restOptions("jobs")) + storage["jobs"] = jobsStorage + storage["jobs/status"] = jobsStatusStorage } ingressStorage, ingressStatusStorage := ingressetcd.NewREST(restOptions("ingresses")) if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("ingresses")) { @@ -797,25 +808,6 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage { return storage } -// constructHPAResources makes HPA resources and adds them to the storage map. -// They're installed in both autoscaling and extensions. It's assumed that -// you've already done the check that they should be on. -func (m *Master) constructHPAResources(c *Config, restStorage map[string]rest.Storage) { - // Note that hpa's storage settings are changed by changing the autoscaling - // group. Clearly we want all hpas to be stored in the same place no - // matter where they're accessed from. - restOptions := func(resource string) generic.RESTOptions { - return generic.RESTOptions{ - Storage: c.StorageDestinations.Search([]string{autoscaling.GroupName, extensions.GroupName}, resource), - Decorator: m.StorageDecorator(), - DeleteCollectionWorkers: m.deleteCollectionWorkers, - } - } - autoscalerStorage, autoscalerStatusStorage := horizontalpodautoscaleretcd.NewREST(restOptions("horizontalpodautoscalers")) - restStorage["horizontalpodautoscalers"] = autoscalerStorage - restStorage["horizontalpodautoscalers/status"] = autoscalerStatusStorage -} - // getAutoscalingResources returns the resources for autoscaling api func (m *Master) getAutoscalingResources(c *Config) map[string]rest.Storage { // TODO update when we support more than one version of this group @@ -823,30 +815,13 @@ func (m *Master) getAutoscalingResources(c *Config) map[string]rest.Storage { storage := map[string]rest.Storage{} if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("horizontalpodautoscalers")) { - m.constructHPAResources(c, storage) + hpaStorage, hpaStatusStorage := horizontalpodautoscaleretcd.NewREST(m.GetRESTOptionsOrDie(c, autoscaling.Resource("horizontalpodautoscalers"))) + storage["horizontalpodautoscalers"] = hpaStorage + storage["horizontalpodautoscalers/status"] = hpaStatusStorage } return storage } -// constructJobResources makes Job resources and adds them to the storage map. -// They're installed in both batch and extensions. It's assumed that you've -// already done the check that they should be on. -func (m *Master) constructJobResources(c *Config, restStorage map[string]rest.Storage) { - // Note that job's storage settings are changed by changing the batch - // group. Clearly we want all jobs to be stored in the same place no - // matter where they're accessed from. - restOptions := func(resource string) generic.RESTOptions { - return generic.RESTOptions{ - Storage: c.StorageDestinations.Search([]string{batch.GroupName, extensions.GroupName}, resource), - Decorator: m.StorageDecorator(), - DeleteCollectionWorkers: m.deleteCollectionWorkers, - } - } - jobStorage, jobStatusStorage := jobetcd.NewREST(restOptions("jobs")) - restStorage["jobs"] = jobStorage - restStorage["jobs/status"] = jobStatusStorage -} - // getBatchResources returns the resources for batch api func (m *Master) getBatchResources(c *Config) map[string]rest.Storage { // TODO update when we support more than one version of this group @@ -854,26 +829,21 @@ func (m *Master) getBatchResources(c *Config) map[string]rest.Storage { storage := map[string]rest.Storage{} if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("jobs")) { - m.constructJobResources(c, storage) + jobsStorage, jobsStatusStorage := jobetcd.NewREST(m.GetRESTOptionsOrDie(c, batch.Resource("jobs"))) + storage["jobs"] = jobsStorage + storage["jobs/status"] = jobsStatusStorage } return storage } -// getPetSetResources returns the resources for batch api +// getPetSetResources returns the resources for apps api func (m *Master) getAppsResources(c *Config) map[string]rest.Storage { // TODO update when we support more than one version of this group version := appsapi.SchemeGroupVersion storage := map[string]rest.Storage{} if c.APIResourceConfigSource.ResourceEnabled(version.WithResource("petsets")) { - restOptions := func(resource string) generic.RESTOptions { - return generic.RESTOptions{ - Storage: c.StorageDestinations.Get(apps.GroupName, resource), - Decorator: m.StorageDecorator(), - DeleteCollectionWorkers: m.deleteCollectionWorkers, - } - } - petsetStorage, petsetStatusStorage := petsetetcd.NewREST(restOptions("petsets")) + petsetStorage, petsetStatusStorage := petsetetcd.NewREST(m.GetRESTOptionsOrDie(c, apps.Resource("petsets"))) storage["petsets"] = petsetStorage storage["petsets/status"] = petsetStatusStorage } diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 87a93c5c30f..ca315f8876f 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -36,8 +36,8 @@ import ( utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/sets" - apiutil "k8s.io/kubernetes/pkg/api/util" apiv1 "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/apps" appsapi "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/autoscaling" @@ -72,26 +72,27 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert. config := Config{ Config: &genericapiserver.Config{}, } - storageVersions := make(map[string]string) - storageDestinations := genericapiserver.NewStorageDestinations() - storageDestinations.AddAPIGroup( - api.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)) - storageDestinations.AddAPIGroup( - autoscaling.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Autoscaling.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)) - storageDestinations.AddAPIGroup( - batch.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Batch.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)) - storageDestinations.AddAPIGroup( - apps.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Apps.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)) - storageDestinations.AddAPIGroup( - extensions.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Extensions.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)) - config.StorageDestinations = storageDestinations - storageVersions[api.GroupName] = testapi.Default.GroupVersion().String() - storageVersions[autoscaling.GroupName] = testapi.Autoscaling.GroupVersion().String() - storageVersions[batch.GroupName] = testapi.Batch.GroupVersion().String() - storageVersions[apps.GroupName] = testapi.Apps.GroupVersion().String() - storageVersions[extensions.GroupName] = testapi.Extensions.GroupVersion().String() - config.StorageVersions = storageVersions + etcdConfig := etcdstorage.EtcdConfig{ + Prefix: etcdtest.PathPrefix(), + CAFile: server.CAFile, + KeyFile: server.KeyFile, + CertFile: server.CertFile, + } + for _, url := range server.ClientURLs { + etcdConfig.ServerList = append(etcdConfig.ServerList, url.String()) + } + + resourceEncoding := genericapiserver.NewDefaultResourceEncodingConfig() + resourceEncoding.SetVersionEncoding(api.GroupName, *testapi.Default.GroupVersion(), unversioned.GroupVersion{Group: api.GroupName, Version: runtime.APIVersionInternal}) + resourceEncoding.SetVersionEncoding(autoscaling.GroupName, *testapi.Autoscaling.GroupVersion(), unversioned.GroupVersion{Group: autoscaling.GroupName, Version: runtime.APIVersionInternal}) + resourceEncoding.SetVersionEncoding(batch.GroupName, *testapi.Batch.GroupVersion(), unversioned.GroupVersion{Group: batch.GroupName, Version: runtime.APIVersionInternal}) + resourceEncoding.SetVersionEncoding(apps.GroupName, *testapi.Apps.GroupVersion(), unversioned.GroupVersion{Group: apps.GroupName, Version: runtime.APIVersionInternal}) + resourceEncoding.SetVersionEncoding(extensions.GroupName, *testapi.Extensions.GroupVersion(), unversioned.GroupVersion{Group: extensions.GroupName, Version: runtime.APIVersionInternal}) + storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource()) + + config.StorageFactory = storageFactory + config.APIResourceConfigSource = DefaultAPIResourceConfigSource() config.PublicAddress = net.ParseIP("192.168.10.4") config.Serializer = api.Codecs config.KubeletClient = client.FakeKubeletClient{} @@ -398,7 +399,7 @@ func TestAPIVersionOfDiscoveryEndpoints(t *testing.T) { } func TestDiscoveryAtAPIS(t *testing.T) { - master, etcdserver, config, assert := newLimitedMaster(t) + master, etcdserver, _, assert := newLimitedMaster(t) defer etcdserver.Terminate(t) server := httptest.NewServer(master.HandlerContainer.ServeMux) @@ -444,20 +445,20 @@ func TestDiscoveryAtAPIS(t *testing.T) { } expectPreferredVersion := map[string]unversioned.GroupVersionForDiscovery{ autoscaling.GroupName: { - GroupVersion: config.StorageVersions[autoscaling.GroupName], - Version: apiutil.GetVersion(config.StorageVersions[autoscaling.GroupName]), + GroupVersion: registered.GroupOrDie(autoscaling.GroupName).GroupVersion.String(), + Version: registered.GroupOrDie(autoscaling.GroupName).GroupVersion.Version, }, batch.GroupName: { - GroupVersion: config.StorageVersions[batch.GroupName], - Version: apiutil.GetVersion(config.StorageVersions[batch.GroupName]), + GroupVersion: registered.GroupOrDie(batch.GroupName).GroupVersion.String(), + Version: registered.GroupOrDie(batch.GroupName).GroupVersion.Version, }, apps.GroupName: { - GroupVersion: config.StorageVersions[apps.GroupName], - Version: apiutil.GetVersion(config.StorageVersions[apps.GroupName]), + GroupVersion: registered.GroupOrDie(apps.GroupName).GroupVersion.String(), + Version: registered.GroupOrDie(apps.GroupName).GroupVersion.Version, }, extensions.GroupName: { - GroupVersion: config.StorageVersions[extensions.GroupName], - Version: apiutil.GetVersion(config.StorageVersions[extensions.GroupName]), + GroupVersion: registered.GroupOrDie(extensions.GroupName).GroupVersion.String(), + Version: registered.GroupOrDie(extensions.GroupName).GroupVersion.Version, }, } diff --git a/pkg/storage/etcd/testing/utils.go b/pkg/storage/etcd/testing/utils.go index 3d85c1d1dc7..8cf623f8f00 100644 --- a/pkg/storage/etcd/testing/utils.go +++ b/pkg/storage/etcd/testing/utils.go @@ -231,6 +231,7 @@ func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer { t.Fatalf("Failed to start etcd server error=%v", err) return nil } + cfg := etcd.Config{ Endpoints: server.ClientURLs.StringSlice(), Transport: newHttpTransport(t, server.CertFile, server.KeyFile, server.CAFile), diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 4e7b7381700..c0fd44801ff 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -29,6 +29,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/autoscaling" "k8s.io/kubernetes/pkg/apis/batch" @@ -149,31 +150,33 @@ func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Se // Returns a basic master config. func NewMasterConfig() *master.Config { - etcdClient := NewEtcdClient() - storageVersions := make(map[string]string) + etcdConfig := etcdstorage.EtcdConfig{ + ServerList: []string{"http://127.0.0.1:4001"}, + Prefix: etcdtest.PathPrefix(), + } - etcdStorage := etcdstorage.NewEtcdStorage(etcdClient, testapi.Default.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize) - storageVersions[api.GroupName] = testapi.Default.GroupVersion().String() - autoscalingEtcdStorage := NewAutoscalingEtcdStorage(etcdClient) - storageVersions[autoscaling.GroupName] = testapi.Autoscaling.GroupVersion().String() - batchEtcdStorage := NewBatchEtcdStorage(etcdClient) - storageVersions[batch.GroupName] = testapi.Batch.GroupVersion().String() - appsEtcdStorage := NewAppsEtcdStorage(etcdClient) - storageVersions[apps.GroupName] = testapi.Apps.GroupVersion().String() - expEtcdStorage := NewExtensionsEtcdStorage(etcdClient) - storageVersions[extensions.GroupName] = testapi.Extensions.GroupVersion().String() + negotiatedSerializer := NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json") - storageDestinations := genericapiserver.NewStorageDestinations() - storageDestinations.AddAPIGroup(api.GroupName, etcdStorage) - storageDestinations.AddAPIGroup(autoscaling.GroupName, autoscalingEtcdStorage) - storageDestinations.AddAPIGroup(batch.GroupName, batchEtcdStorage) - storageDestinations.AddAPIGroup(apps.GroupName, appsEtcdStorage) - storageDestinations.AddAPIGroup(extensions.GroupName, expEtcdStorage) + storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, negotiatedSerializer, genericapiserver.NewDefaultResourceEncodingConfig(), master.DefaultAPIResourceConfigSource()) + storageFactory.SetSerializer( + unversioned.GroupResource{Group: api.GroupName, Resource: genericapiserver.AllResources}, + NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json")) + storageFactory.SetSerializer( + unversioned.GroupResource{Group: autoscaling.GroupName, Resource: genericapiserver.AllResources}, + NewSingleContentTypeSerializer(api.Scheme, testapi.Autoscaling.Codec(), "application/json")) + storageFactory.SetSerializer( + unversioned.GroupResource{Group: batch.GroupName, Resource: genericapiserver.AllResources}, + NewSingleContentTypeSerializer(api.Scheme, testapi.Batch.Codec(), "application/json")) + storageFactory.SetSerializer( + unversioned.GroupResource{Group: apps.GroupName, Resource: genericapiserver.AllResources}, + NewSingleContentTypeSerializer(api.Scheme, testapi.Apps.Codec(), "application/json")) + storageFactory.SetSerializer( + unversioned.GroupResource{Group: extensions.GroupName, Resource: genericapiserver.AllResources}, + NewSingleContentTypeSerializer(api.Scheme, testapi.Extensions.Codec(), "application/json")) return &master.Config{ Config: &genericapiserver.Config{ - StorageDestinations: storageDestinations, - StorageVersions: storageVersions, + StorageFactory: storageFactory, APIResourceConfigSource: master.DefaultAPIResourceConfigSource(), APIPrefix: "/api", APIGroupPrefix: "/apis", diff --git a/test/integration/framework/serializer.go b/test/integration/framework/serializer.go new file mode 100644 index 00000000000..aacd913e95c --- /dev/null +++ b/test/integration/framework/serializer.go @@ -0,0 +1,59 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/versioning" +) + +// NewSingleContentTypeSerializer wraps a serializer in a NegotiatedSerializer that handles one content type +func NewSingleContentTypeSerializer(scheme *runtime.Scheme, serializer runtime.Serializer, contentType string) runtime.NegotiatedSerializer { + return &wrappedSerializer{ + scheme: scheme, + serializer: serializer, + contentType: contentType, + } +} + +type wrappedSerializer struct { + scheme *runtime.Scheme + serializer runtime.Serializer + contentType string +} + +var _ runtime.NegotiatedSerializer = &wrappedSerializer{} + +func (s *wrappedSerializer) SupportedMediaTypes() []string { + return []string{s.contentType} +} +func (s *wrappedSerializer) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) { + if mediaType != s.contentType { + return nil, false + } + + return s.serializer, true +} + +func (s *wrappedSerializer) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { + return versioning.NewCodec(s.serializer, s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), []unversioned.GroupVersion{gv}, nil) +} + +func (s *wrappedSerializer) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder { + return versioning.NewCodec(s.serializer, s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), nil, []unversioned.GroupVersion{gv}) +}