diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index 31bd626d68e..5b4b0e9dede 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" + apiutil "k8s.io/kubernetes/pkg/api/util" "k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apiserver" @@ -80,8 +81,12 @@ type APIServer struct { ServiceClusterIPRange net.IPNet // TODO: make this a list ServiceNodePortRange utilnet.PortRange StorageVersions string - TokenAuthFile string - WatchCacheSizes []string + // The default values for StorageVersions. StorageVersions overrides + // these; you can change this if you want to change the defaults (e.g., + // for testing). This is not actually exposed as a flag. + DefaultStorageVersions string + TokenAuthFile string + WatchCacheSizes []string } // NewAPIServer creates a new APIServer object with default parameters @@ -99,6 +104,7 @@ func NewAPIServer() *APIServer { MasterServiceNamespace: api.NamespaceDefault, RuntimeConfig: make(util.ConfigurationMap), StorageVersions: registered.AllPreferredGroupVersions(), + DefaultStorageVersions: registered.AllPreferredGroupVersions(), KubeletConfig: kubeletclient.KubeletClientConfig{ Port: ports.KubeletPort, EnableHttps: true, @@ -109,6 +115,42 @@ func NewAPIServer() *APIServer { return &s } +// dest must be a map of group to groupVersion. +func gvToMap(gvList string, dest map[string]string) { + for _, gv := range strings.Split(gvList, ",") { + if gv == "" { + continue + } + // We accept two formats. "group/version" OR + // "group=group/version". The latter is used when types + // move between groups. + if !strings.Contains(gv, "=") { + dest[apiutil.GetGroup(gv)] = gv + } else { + parts := strings.SplitN(gv, "=", 2) + // TODO: error checking. + dest[parts[0]] = parts[1] + } + } +} + +// StorageGroupsToGroupVersions returns a map from group name to group version, +// computed from the s.DeprecatedStorageVersion and s.StorageVersions flags. +// TODO: can we move the whole storage version concept to the generic apiserver? +func (s *APIServer) StorageGroupsToGroupVersions() map[string]string { + storageVersionMap := map[string]string{} + if s.DeprecatedStorageVersion != "" { + storageVersionMap[""] = s.DeprecatedStorageVersion + } + + // First, get the defaults. + gvToMap(s.DefaultStorageVersions, storageVersionMap) + // Override any defaults with the user settings. + gvToMap(s.StorageVersions, storageVersionMap) + + return storageVersionMap +} + // AddFlags adds flags for a specific APIServer to the specified FlagSet func (s *APIServer) AddFlags(fs *pflag.FlagSet) { // Note: the weird ""+ in below lines seems to be the only way to get gofmt to @@ -150,9 +192,10 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { fs.MarkDeprecated("api-prefix", "--api-prefix is deprecated and will be removed when the v1 API is retired.") fs.StringVar(&s.DeprecatedStorageVersion, "storage-version", s.DeprecatedStorageVersion, "The version to store the legacy v1 resources with. Defaults to server preferred") fs.MarkDeprecated("storage-version", "--storage-version is deprecated and will be removed when the v1 API is retired. See --storage-versions instead.") - fs.StringVar(&s.StorageVersions, "storage-versions", s.StorageVersions, "The versions to store resources with. "+ - "Different groups may be stored in different versions. Specified in the format \"group1/version1,group2/version2...\". "+ - "This flag expects a complete list of storage versions of ALL groups registered in the server. "+ + fs.StringVar(&s.StorageVersions, "storage-versions", s.StorageVersions, "The per-group version to store resources in. "+ + "Specified in the format \"group1/version1,group2/version2,...\". "+ + "In the case where objects are moved from one group to the other, you may specify the format \"group1=group2/v1beta1,group3/v1beta1,...\". "+ + "You only need to pass the groups you wish to change from the defaults. "+ "It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable.") fs.StringVar(&s.CloudProvider, "cloud-provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.") fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.") diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go new file mode 100644 index 00000000000..bd6f3577f1e --- /dev/null +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + "reflect" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/batch" + "k8s.io/kubernetes/pkg/apis/extensions" +) + +func TestGenerateStorageVersionMap(t *testing.T) { + testCases := []struct { + legacyVersion string + storageVersions string + defaultVersions string + expectedMap map[string]string + }{ + { + legacyVersion: "v1", + storageVersions: "v1,extensions/v1beta1", + expectedMap: map[string]string{ + api.GroupName: "v1", + extensions.GroupName: "extensions/v1beta1", + }, + }, + { + legacyVersion: "", + storageVersions: "extensions/v1beta1,v1", + expectedMap: map[string]string{ + api.GroupName: "v1", + extensions.GroupName: "extensions/v1beta1", + }, + }, + { + legacyVersion: "", + storageVersions: "batch=extensions/v1beta1,v1", + defaultVersions: "extensions/v1beta1,v1,batch/v1", + expectedMap: map[string]string{ + api.GroupName: "v1", + batch.GroupName: "extensions/v1beta1", + extensions.GroupName: "extensions/v1beta1", + }, + }, + { + legacyVersion: "", + storageVersions: "", + expectedMap: map[string]string{}, + }, + } + for i, test := range testCases { + s := APIServer{ + DeprecatedStorageVersion: test.legacyVersion, + StorageVersions: test.storageVersions, + DefaultStorageVersions: test.defaultVersions, + } + output := s.StorageGroupsToGroupVersions() + if !reflect.DeepEqual(test.expectedMap, output) { + t.Errorf("%v: unexpected error. expect: %v, got: %v", i, test.expectedMap, output) + } + } +} diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 6a157d37e88..877ede50b07 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -36,7 +36,6 @@ import ( "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" - apiutil "k8s.io/kubernetes/pkg/api/util" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apiserver" @@ -51,6 +50,7 @@ import ( "k8s.io/kubernetes/pkg/master" "k8s.io/kubernetes/pkg/registry/cachesize" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/versioning" "k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/pkg/storage" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" @@ -85,15 +85,20 @@ func verifyClusterIPFlags(s *options.APIServer) { } } -type newEtcdFunc func([]string, runtime.NegotiatedSerializer, string, string, bool) (storage.Interface, error) +// For testing. +type newEtcdFunc func([]string, runtime.NegotiatedSerializer, string, string, string, bool) (storage.Interface, error) -func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGroupVersionString, pathPrefix string, quorum bool) (etcdStorage storage.Interface, err error) { +func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGroupVersionString, memoryGroupVersionString, pathPrefix string, quorum bool) (etcdStorage storage.Interface, err error) { if storageGroupVersionString == "" { return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage") } storageVersion, err := unversioned.ParseGroupVersion(storageGroupVersionString) if err != nil { - return nil, err + return nil, fmt.Errorf("couldn't understand storage version %v: %v", storageGroupVersionString, err) + } + memoryVersion, err := unversioned.ParseGroupVersion(memoryGroupVersionString) + if err != nil { + return nil, fmt.Errorf("couldn't understand memory version %v: %v", memoryGroupVersionString, err) } var storageConfig etcdstorage.EtcdConfig @@ -104,23 +109,20 @@ func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGr if !ok { return nil, fmt.Errorf("unable to find serializer for JSON") } - storageConfig.Codec = runtime.NewCodec(ns.EncoderForVersion(s, storageVersion), ns.DecoderToVersion(s, unversioned.GroupVersion{Group: storageVersion.Group, Version: runtime.APIVersionInternal})) - return storageConfig.NewStorage() -} - -// convert to a map between group and groupVersions. -func generateStorageVersionMap(legacyVersion string, storageVersions string) map[string]string { - storageVersionMap := map[string]string{} - if legacyVersion != "" { - storageVersionMap[""] = legacyVersion - } - if storageVersions != "" { - groupVersions := strings.Split(storageVersions, ",") - for _, gv := range groupVersions { - storageVersionMap[apiutil.GetGroup(gv)] = gv + glog.Infof("constructing etcd storage interface.\n sv: %v\n mv: %v\n", storageVersion, memoryVersion) + encoder := ns.EncoderForVersion(s, storageVersion) + decoder := ns.DecoderToVersion(s, memoryVersion) + if memoryVersion.Group != storageVersion.Group { + // Allow this codec to translate between groups. + if err = versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil { + return nil, fmt.Errorf("error setting up encoder for %v: %v", storageGroupVersionString, err) + } + if err = versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil { + return nil, fmt.Errorf("error setting up decoder for %v: %v", storageGroupVersionString, err) } } - return storageVersionMap + storageConfig.Codec = runtime.NewCodec(encoder, decoder) + return storageConfig.NewStorage() } // parse the value of --etcd-servers-overrides and update given storageDestinations. @@ -153,7 +155,11 @@ func updateEtcdOverrides(overrides []string, storageVersions map[string]string, } servers := strings.Split(tokens[1], ";") - etcdOverrideStorage, err := newEtcdFn(servers, api.Codecs, storageVersions[apigroup.GroupVersion.Group], prefix, quorum) + // Note, internalGV will be wrong for things like batch or + // autoscalers, but they shouldn't be using the override + // storage. + internalGV := apigroup.GroupVersion.Group + "/__internal" + etcdOverrideStorage, err := newEtcdFn(servers, api.Codecs, storageVersions[apigroup.GroupVersion.Group], internalGV, prefix, quorum) if err != nil { glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err) } @@ -269,17 +275,18 @@ func Run(s *options.APIServer) error { storageDestinations := genericapiserver.NewStorageDestinations() - storageVersions := generateStorageVersionMap(s.DeprecatedStorageVersion, s.StorageVersions) + storageVersions := s.StorageGroupsToGroupVersions() if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found { glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions) } - etcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix, s.EtcdQuorumRead) + etcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], "/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead) if err != nil { glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err) } storageDestinations.AddAPIGroup("", etcdStorage) if !apiGroupVersionOverrides["extensions/v1beta1"].Disable { + glog.Infof("Configuring extensions/v1beta1 storage destination") expGroup, err := registered.Group(extensions.GroupName) if err != nil { glog.Fatalf("Extensions API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err) @@ -287,7 +294,7 @@ func Run(s *options.APIServer) error { if _, found := storageVersions[expGroup.GroupVersion.Group]; !found { glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions) } - expEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix, s.EtcdQuorumRead) + expEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[expGroup.GroupVersion.Group], "extensions/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead) if err != nil { glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err) } diff --git a/cmd/kube-apiserver/app/server_test.go b/cmd/kube-apiserver/app/server_test.go index 468f2a814bf..0d6b6fe06ab 100644 --- a/cmd/kube-apiserver/app/server_test.go +++ b/cmd/kube-apiserver/app/server_test.go @@ -71,44 +71,11 @@ func TestLongRunningRequestRegexp(t *testing.T) { } } -func TestGenerateStorageVersionMap(t *testing.T) { - testCases := []struct { - legacyVersion string - storageVersions string - expectedMap map[string]string - }{ - { - legacyVersion: "v1", - storageVersions: "v1,extensions/v1beta1", - expectedMap: map[string]string{ - api.GroupName: "v1", - extensions.GroupName: "extensions/v1beta1", - }, - }, - { - legacyVersion: "", - storageVersions: "extensions/v1beta1,v1", - expectedMap: map[string]string{ - api.GroupName: "v1", - extensions.GroupName: "extensions/v1beta1", - }, - }, - { - legacyVersion: "", - storageVersions: "", - expectedMap: map[string]string{}, - }, - } - for _, test := range testCases { - output := generateStorageVersionMap(test.legacyVersion, test.storageVersions) - if !reflect.DeepEqual(test.expectedMap, output) { - t.Errorf("unexpected error. expect: %v, got: %v", test.expectedMap, output) - } - } -} - func TestUpdateEtcdOverrides(t *testing.T) { - storageVersions := generateStorageVersionMap("", "v1,extensions/v1beta1") + storageVersions := map[string]string{ + "": "v1", + "extensions": "extensions/v1beta1", + } testCases := []struct { apigroup string @@ -133,7 +100,7 @@ func TestUpdateEtcdOverrides(t *testing.T) { } for _, test := range testCases { - newEtcd := func(serverList []string, _ runtime.NegotiatedSerializer, _, _ string, _ bool) (storage.Interface, error) { + newEtcd := func(serverList []string, _ runtime.NegotiatedSerializer, _, _, _ string, _ bool) (storage.Interface, error) { if !reflect.DeepEqual(test.servers, serverList) { t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList) } diff --git a/pkg/genericapiserver/genericapiserver.go b/pkg/genericapiserver/genericapiserver.go index 2c3560c2019..0277d946d2c 100644 --- a/pkg/genericapiserver/genericapiserver.go +++ b/pkg/genericapiserver/genericapiserver.go @@ -77,7 +77,9 @@ func NewStorageDestinations() StorageDestinations { } } +// AddAPIGroup replaces 'group' if it's already registered. func (s *StorageDestinations) AddAPIGroup(group string, defaultStorage storage.Interface) { + glog.Infof("Adding storage destination for group %v", group) s.APIGroups[group] = &StorageDestinationsForAPIGroup{ Default: defaultStorage, Overrides: map[string]storage.Interface{}, @@ -94,10 +96,16 @@ func (s *StorageDestinations) AddStorageOverride(group, resource string, overrid s.APIGroups[group].Overrides[resource] = override } +// Get finds the storage destination for the given group and resource. It will +// Fatalf if the group has no storage destination configured. func (s *StorageDestinations) Get(group, resource string) storage.Interface { apigroup, ok := s.APIGroups[group] if !ok { - glog.Errorf("No storage defined for API group: '%s'", apigroup) + // TODO: return an error like a normal function. For now, + // Fatalf is better than just logging an error, because this + // condition guarantees future problems and this is a less + // mysterious failure point. + glog.Fatalf("No storage defined for API group: '%s'. Defined groups: %#v", group, s.APIGroups) return nil } if apigroup.Overrides != nil { @@ -108,6 +116,30 @@ func (s *StorageDestinations) Get(group, resource string) storage.Interface { return apigroup.Default } +// Search is like Get, but can be used to search a list of groups. It tries the +// groups in order (and Fatalf's if none of them exist). The intention is for +// this to be used for resources that move between groups. +func (s *StorageDestinations) Search(groups []string, resource string) storage.Interface { + for _, group := range groups { + apigroup, ok := s.APIGroups[group] + if !ok { + continue + } + if apigroup.Overrides != nil { + if client, exists := apigroup.Overrides[resource]; exists { + return client + } + } + return apigroup.Default + } + // TODO: return an error like a normal function. For now, + // Fatalf is better than just logging an error, because this + // condition guarantees future problems and this is a less + // mysterious failure point. + glog.Fatalf("No storage defined for any of the groups: %v. Defined groups: %#v", groups, s.APIGroups) + return nil +} + // Get all backends for all registered storage destinations. // Used for getting all instances for health validations. func (s *StorageDestinations) Backends() []string { diff --git a/pkg/runtime/codec_check.go b/pkg/runtime/codec_check.go new file mode 100644 index 00000000000..09e7d51ad9f --- /dev/null +++ b/pkg/runtime/codec_check.go @@ -0,0 +1,50 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "fmt" + "reflect" + + "k8s.io/kubernetes/pkg/api/unversioned" +) + +// CheckCodec makes sure that the codec can encode objects like internalType, +// decode all of the external types listed, and also decode them into the given +// object. (Will modify internalObject.) (Assumes JSON serialization.) +// TODO: verify that the correct external version is chosen on encode... +func CheckCodec(c Codec, internalType Object, externalTypes ...unversioned.GroupVersionKind) error { + _, err := Encode(c, internalType) + if err != nil { + return fmt.Errorf("Internal type not encodable: %v", err) + } + for _, et := range externalTypes { + exBytes := []byte(fmt.Sprintf(`{"kind":"%v","apiVersion":"%v"}`, et.Kind, et.GroupVersion().String())) + obj, err := Decode(c, exBytes) + if err != nil { + return fmt.Errorf("external type %s not interpretable: %v", et, err) + } + if reflect.TypeOf(obj) != reflect.TypeOf(internalType) { + return fmt.Errorf("decode of external type %s produced: %#v", et, obj) + } + err = DecodeInto(c, exBytes, internalType) + if err != nil { + return fmt.Errorf("external type %s not convertable to internal type: %v", et, err) + } + } + return nil +} diff --git a/pkg/runtime/serializer/versioning/versioning.go b/pkg/runtime/serializer/versioning/versioning.go index ea255a90c77..2fe56aa3b70 100644 --- a/pkg/runtime/serializer/versioning/versioning.go +++ b/pkg/runtime/serializer/versioning/versioning.go @@ -24,6 +24,42 @@ import ( "k8s.io/kubernetes/pkg/runtime" ) +// EnableCrossGroupDecoding modifies the given decoder in place, if it is a codec +// from this package. It allows objects from one group to be auto-decoded into +// another group. 'destGroup' must already exist in the codec. +func EnableCrossGroupDecoding(d runtime.Decoder, sourceGroup, destGroup string) error { + internal, ok := d.(*codec) + if !ok { + return fmt.Errorf("unsupported decoder type") + } + + dest, ok := internal.decodeVersion[destGroup] + if !ok { + return fmt.Errorf("group %q is not a possible destination group in the given codec", destGroup) + } + internal.decodeVersion[sourceGroup] = dest + + return nil +} + +// EnableCrossGroupEncoding modifies the given encoder in place, if it is a codec +// from this package. It allows objects from one group to be auto-decoded into +// another group. 'destGroup' must already exist in the codec. +func EnableCrossGroupEncoding(e runtime.Encoder, sourceGroup, destGroup string) error { + internal, ok := e.(*codec) + if !ok { + return fmt.Errorf("unsupported encoder type") + } + + dest, ok := internal.encodeVersion[destGroup] + if !ok { + return fmt.Errorf("group %q is not a possible destination group in the given codec", destGroup) + } + internal.encodeVersion[sourceGroup] = dest + + return nil +} + // NewCodecForScheme is a convenience method for callers that are using a scheme. func NewCodecForScheme( // TODO: I should be a scheme interface? @@ -132,6 +168,7 @@ func (c *codec) Decode(data []byte, defaultGVK *unversioned.GroupVersionKind, in targetGV.Group = group targetGV.Version = runtime.APIVersionInternal } else { + fmt.Printf("looking for %v in %#v\n", group, c.decodeVersion) gv, ok := c.decodeVersion[group] if !ok { // unknown objects are left in their original version diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index c5bcf2448c6..e2ef0b79a44 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -145,6 +145,14 @@ func NewCacherFromConfig(config CacherConfig) *Cacher { watchCache := newWatchCache(config.CacheCapacity) listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) + // Give this error when it is constructed rather than when you get the + // first watch item, because it's much easier to track down that way. + if obj, ok := config.Type.(runtime.Object); ok { + if err := runtime.CheckCodec(config.Storage.Codec(), obj); err != nil { + panic("storage codec doesn't seem to match given type: " + err.Error()) + } + } + cacher := &Cacher{ usable: sync.RWMutex{}, storage: config.Storage,