changes for cross-group moves

This commit is contained in:
Daniel Smith 2016-02-12 18:08:35 -08:00 committed by Piotr Szczesniak
parent d9705940d6
commit 74400c33ae
8 changed files with 289 additions and 67 deletions

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
apiutil "k8s.io/kubernetes/pkg/api/util"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apiserver"
@ -80,8 +81,12 @@ type APIServer struct {
ServiceClusterIPRange net.IPNet // TODO: make this a list
ServiceNodePortRange utilnet.PortRange
StorageVersions string
TokenAuthFile string
WatchCacheSizes []string
// The default values for StorageVersions. StorageVersions overrides
// these; you can change this if you want to change the defaults (e.g.,
// for testing). This is not actually exposed as a flag.
DefaultStorageVersions string
TokenAuthFile string
WatchCacheSizes []string
}
// NewAPIServer creates a new APIServer object with default parameters
@ -99,6 +104,7 @@ func NewAPIServer() *APIServer {
MasterServiceNamespace: api.NamespaceDefault,
RuntimeConfig: make(util.ConfigurationMap),
StorageVersions: registered.AllPreferredGroupVersions(),
DefaultStorageVersions: registered.AllPreferredGroupVersions(),
KubeletConfig: kubeletclient.KubeletClientConfig{
Port: ports.KubeletPort,
EnableHttps: true,
@ -109,6 +115,42 @@ func NewAPIServer() *APIServer {
return &s
}
// dest must be a map of group to groupVersion.
func gvToMap(gvList string, dest map[string]string) {
for _, gv := range strings.Split(gvList, ",") {
if gv == "" {
continue
}
// We accept two formats. "group/version" OR
// "group=group/version". The latter is used when types
// move between groups.
if !strings.Contains(gv, "=") {
dest[apiutil.GetGroup(gv)] = gv
} else {
parts := strings.SplitN(gv, "=", 2)
// TODO: error checking.
dest[parts[0]] = parts[1]
}
}
}
// StorageGroupsToGroupVersions returns a map from group name to group version,
// computed from the s.DeprecatedStorageVersion and s.StorageVersions flags.
// TODO: can we move the whole storage version concept to the generic apiserver?
func (s *APIServer) StorageGroupsToGroupVersions() map[string]string {
storageVersionMap := map[string]string{}
if s.DeprecatedStorageVersion != "" {
storageVersionMap[""] = s.DeprecatedStorageVersion
}
// First, get the defaults.
gvToMap(s.DefaultStorageVersions, storageVersionMap)
// Override any defaults with the user settings.
gvToMap(s.StorageVersions, storageVersionMap)
return storageVersionMap
}
// AddFlags adds flags for a specific APIServer to the specified FlagSet
func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
// Note: the weird ""+ in below lines seems to be the only way to get gofmt to
@ -150,9 +192,10 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.MarkDeprecated("api-prefix", "--api-prefix is deprecated and will be removed when the v1 API is retired.")
fs.StringVar(&s.DeprecatedStorageVersion, "storage-version", s.DeprecatedStorageVersion, "The version to store the legacy v1 resources with. Defaults to server preferred")
fs.MarkDeprecated("storage-version", "--storage-version is deprecated and will be removed when the v1 API is retired. See --storage-versions instead.")
fs.StringVar(&s.StorageVersions, "storage-versions", s.StorageVersions, "The versions to store resources with. "+
"Different groups may be stored in different versions. Specified in the format \"group1/version1,group2/version2...\". "+
"This flag expects a complete list of storage versions of ALL groups registered in the server. "+
fs.StringVar(&s.StorageVersions, "storage-versions", s.StorageVersions, "The per-group version to store resources in. "+
"Specified in the format \"group1/version1,group2/version2,...\". "+
"In the case where objects are moved from one group to the other, you may specify the format \"group1=group2/v1beta1,group3/v1beta1,...\". "+
"You only need to pass the groups you wish to change from the defaults. "+
"It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable.")
fs.StringVar(&s.CloudProvider, "cloud-provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.")
fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")

View File

@ -0,0 +1,78 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"reflect"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions"
)
func TestGenerateStorageVersionMap(t *testing.T) {
testCases := []struct {
legacyVersion string
storageVersions string
defaultVersions string
expectedMap map[string]string
}{
{
legacyVersion: "v1",
storageVersions: "v1,extensions/v1beta1",
expectedMap: map[string]string{
api.GroupName: "v1",
extensions.GroupName: "extensions/v1beta1",
},
},
{
legacyVersion: "",
storageVersions: "extensions/v1beta1,v1",
expectedMap: map[string]string{
api.GroupName: "v1",
extensions.GroupName: "extensions/v1beta1",
},
},
{
legacyVersion: "",
storageVersions: "batch=extensions/v1beta1,v1",
defaultVersions: "extensions/v1beta1,v1,batch/v1",
expectedMap: map[string]string{
api.GroupName: "v1",
batch.GroupName: "extensions/v1beta1",
extensions.GroupName: "extensions/v1beta1",
},
},
{
legacyVersion: "",
storageVersions: "",
expectedMap: map[string]string{},
},
}
for i, test := range testCases {
s := APIServer{
DeprecatedStorageVersion: test.legacyVersion,
StorageVersions: test.storageVersions,
DefaultStorageVersions: test.defaultVersions,
}
output := s.StorageGroupsToGroupVersions()
if !reflect.DeepEqual(test.expectedMap, output) {
t.Errorf("%v: unexpected error. expect: %v, got: %v", i, test.expectedMap, output)
}
}
}

View File

@ -36,7 +36,6 @@ import (
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
apiutil "k8s.io/kubernetes/pkg/api/util"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apiserver"
@ -51,6 +50,7 @@ import (
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/registry/cachesize"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
@ -85,15 +85,20 @@ func verifyClusterIPFlags(s *options.APIServer) {
}
}
type newEtcdFunc func([]string, runtime.NegotiatedSerializer, string, string, bool) (storage.Interface, error)
// For testing.
type newEtcdFunc func([]string, runtime.NegotiatedSerializer, string, string, string, bool) (storage.Interface, error)
func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGroupVersionString, pathPrefix string, quorum bool) (etcdStorage storage.Interface, err error) {
func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGroupVersionString, memoryGroupVersionString, pathPrefix string, quorum bool) (etcdStorage storage.Interface, err error) {
if storageGroupVersionString == "" {
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
}
storageVersion, err := unversioned.ParseGroupVersion(storageGroupVersionString)
if err != nil {
return nil, err
return nil, fmt.Errorf("couldn't understand storage version %v: %v", storageGroupVersionString, err)
}
memoryVersion, err := unversioned.ParseGroupVersion(memoryGroupVersionString)
if err != nil {
return nil, fmt.Errorf("couldn't understand memory version %v: %v", memoryGroupVersionString, err)
}
var storageConfig etcdstorage.EtcdConfig
@ -104,23 +109,20 @@ func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGr
if !ok {
return nil, fmt.Errorf("unable to find serializer for JSON")
}
storageConfig.Codec = runtime.NewCodec(ns.EncoderForVersion(s, storageVersion), ns.DecoderToVersion(s, unversioned.GroupVersion{Group: storageVersion.Group, Version: runtime.APIVersionInternal}))
return storageConfig.NewStorage()
}
// convert to a map between group and groupVersions.
func generateStorageVersionMap(legacyVersion string, storageVersions string) map[string]string {
storageVersionMap := map[string]string{}
if legacyVersion != "" {
storageVersionMap[""] = legacyVersion
}
if storageVersions != "" {
groupVersions := strings.Split(storageVersions, ",")
for _, gv := range groupVersions {
storageVersionMap[apiutil.GetGroup(gv)] = gv
glog.Infof("constructing etcd storage interface.\n sv: %v\n mv: %v\n", storageVersion, memoryVersion)
encoder := ns.EncoderForVersion(s, storageVersion)
decoder := ns.DecoderToVersion(s, memoryVersion)
if memoryVersion.Group != storageVersion.Group {
// Allow this codec to translate between groups.
if err = versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up encoder for %v: %v", storageGroupVersionString, err)
}
if err = versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up decoder for %v: %v", storageGroupVersionString, err)
}
}
return storageVersionMap
storageConfig.Codec = runtime.NewCodec(encoder, decoder)
return storageConfig.NewStorage()
}
// parse the value of --etcd-servers-overrides and update given storageDestinations.
@ -153,7 +155,11 @@ func updateEtcdOverrides(overrides []string, storageVersions map[string]string,
}
servers := strings.Split(tokens[1], ";")
etcdOverrideStorage, err := newEtcdFn(servers, api.Codecs, storageVersions[apigroup.GroupVersion.Group], prefix, quorum)
// Note, internalGV will be wrong for things like batch or
// autoscalers, but they shouldn't be using the override
// storage.
internalGV := apigroup.GroupVersion.Group + "/__internal"
etcdOverrideStorage, err := newEtcdFn(servers, api.Codecs, storageVersions[apigroup.GroupVersion.Group], internalGV, prefix, quorum)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
}
@ -269,17 +275,18 @@ func Run(s *options.APIServer) error {
storageDestinations := genericapiserver.NewStorageDestinations()
storageVersions := generateStorageVersionMap(s.DeprecatedStorageVersion, s.StorageVersions)
storageVersions := s.StorageGroupsToGroupVersions()
if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions)
}
etcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix, s.EtcdQuorumRead)
etcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], "/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
}
storageDestinations.AddAPIGroup("", etcdStorage)
if !apiGroupVersionOverrides["extensions/v1beta1"].Disable {
glog.Infof("Configuring extensions/v1beta1 storage destination")
expGroup, err := registered.Group(extensions.GroupName)
if err != nil {
glog.Fatalf("Extensions API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err)
@ -287,7 +294,7 @@ func Run(s *options.APIServer) error {
if _, found := storageVersions[expGroup.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions)
}
expEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix, s.EtcdQuorumRead)
expEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[expGroup.GroupVersion.Group], "extensions/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead)
if err != nil {
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
}

View File

@ -71,44 +71,11 @@ func TestLongRunningRequestRegexp(t *testing.T) {
}
}
func TestGenerateStorageVersionMap(t *testing.T) {
testCases := []struct {
legacyVersion string
storageVersions string
expectedMap map[string]string
}{
{
legacyVersion: "v1",
storageVersions: "v1,extensions/v1beta1",
expectedMap: map[string]string{
api.GroupName: "v1",
extensions.GroupName: "extensions/v1beta1",
},
},
{
legacyVersion: "",
storageVersions: "extensions/v1beta1,v1",
expectedMap: map[string]string{
api.GroupName: "v1",
extensions.GroupName: "extensions/v1beta1",
},
},
{
legacyVersion: "",
storageVersions: "",
expectedMap: map[string]string{},
},
}
for _, test := range testCases {
output := generateStorageVersionMap(test.legacyVersion, test.storageVersions)
if !reflect.DeepEqual(test.expectedMap, output) {
t.Errorf("unexpected error. expect: %v, got: %v", test.expectedMap, output)
}
}
}
func TestUpdateEtcdOverrides(t *testing.T) {
storageVersions := generateStorageVersionMap("", "v1,extensions/v1beta1")
storageVersions := map[string]string{
"": "v1",
"extensions": "extensions/v1beta1",
}
testCases := []struct {
apigroup string
@ -133,7 +100,7 @@ func TestUpdateEtcdOverrides(t *testing.T) {
}
for _, test := range testCases {
newEtcd := func(serverList []string, _ runtime.NegotiatedSerializer, _, _ string, _ bool) (storage.Interface, error) {
newEtcd := func(serverList []string, _ runtime.NegotiatedSerializer, _, _, _ string, _ bool) (storage.Interface, error) {
if !reflect.DeepEqual(test.servers, serverList) {
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList)
}

View File

@ -77,7 +77,9 @@ func NewStorageDestinations() StorageDestinations {
}
}
// AddAPIGroup replaces 'group' if it's already registered.
func (s *StorageDestinations) AddAPIGroup(group string, defaultStorage storage.Interface) {
glog.Infof("Adding storage destination for group %v", group)
s.APIGroups[group] = &StorageDestinationsForAPIGroup{
Default: defaultStorage,
Overrides: map[string]storage.Interface{},
@ -94,10 +96,16 @@ func (s *StorageDestinations) AddStorageOverride(group, resource string, overrid
s.APIGroups[group].Overrides[resource] = override
}
// Get finds the storage destination for the given group and resource. It will
// Fatalf if the group has no storage destination configured.
func (s *StorageDestinations) Get(group, resource string) storage.Interface {
apigroup, ok := s.APIGroups[group]
if !ok {
glog.Errorf("No storage defined for API group: '%s'", apigroup)
// TODO: return an error like a normal function. For now,
// Fatalf is better than just logging an error, because this
// condition guarantees future problems and this is a less
// mysterious failure point.
glog.Fatalf("No storage defined for API group: '%s'. Defined groups: %#v", group, s.APIGroups)
return nil
}
if apigroup.Overrides != nil {
@ -108,6 +116,30 @@ func (s *StorageDestinations) Get(group, resource string) storage.Interface {
return apigroup.Default
}
// Search is like Get, but can be used to search a list of groups. It tries the
// groups in order (and Fatalf's if none of them exist). The intention is for
// this to be used for resources that move between groups.
func (s *StorageDestinations) Search(groups []string, resource string) storage.Interface {
for _, group := range groups {
apigroup, ok := s.APIGroups[group]
if !ok {
continue
}
if apigroup.Overrides != nil {
if client, exists := apigroup.Overrides[resource]; exists {
return client
}
}
return apigroup.Default
}
// TODO: return an error like a normal function. For now,
// Fatalf is better than just logging an error, because this
// condition guarantees future problems and this is a less
// mysterious failure point.
glog.Fatalf("No storage defined for any of the groups: %v. Defined groups: %#v", groups, s.APIGroups)
return nil
}
// Get all backends for all registered storage destinations.
// Used for getting all instances for health validations.
func (s *StorageDestinations) Backends() []string {

View File

@ -0,0 +1,50 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
import (
"fmt"
"reflect"
"k8s.io/kubernetes/pkg/api/unversioned"
)
// CheckCodec makes sure that the codec can encode objects like internalType,
// decode all of the external types listed, and also decode them into the given
// object. (Will modify internalObject.) (Assumes JSON serialization.)
// TODO: verify that the correct external version is chosen on encode...
func CheckCodec(c Codec, internalType Object, externalTypes ...unversioned.GroupVersionKind) error {
_, err := Encode(c, internalType)
if err != nil {
return fmt.Errorf("Internal type not encodable: %v", err)
}
for _, et := range externalTypes {
exBytes := []byte(fmt.Sprintf(`{"kind":"%v","apiVersion":"%v"}`, et.Kind, et.GroupVersion().String()))
obj, err := Decode(c, exBytes)
if err != nil {
return fmt.Errorf("external type %s not interpretable: %v", et, err)
}
if reflect.TypeOf(obj) != reflect.TypeOf(internalType) {
return fmt.Errorf("decode of external type %s produced: %#v", et, obj)
}
err = DecodeInto(c, exBytes, internalType)
if err != nil {
return fmt.Errorf("external type %s not convertable to internal type: %v", et, err)
}
}
return nil
}

View File

@ -24,6 +24,42 @@ import (
"k8s.io/kubernetes/pkg/runtime"
)
// EnableCrossGroupDecoding modifies the given decoder in place, if it is a codec
// from this package. It allows objects from one group to be auto-decoded into
// another group. 'destGroup' must already exist in the codec.
func EnableCrossGroupDecoding(d runtime.Decoder, sourceGroup, destGroup string) error {
internal, ok := d.(*codec)
if !ok {
return fmt.Errorf("unsupported decoder type")
}
dest, ok := internal.decodeVersion[destGroup]
if !ok {
return fmt.Errorf("group %q is not a possible destination group in the given codec", destGroup)
}
internal.decodeVersion[sourceGroup] = dest
return nil
}
// EnableCrossGroupEncoding modifies the given encoder in place, if it is a codec
// from this package. It allows objects from one group to be auto-decoded into
// another group. 'destGroup' must already exist in the codec.
func EnableCrossGroupEncoding(e runtime.Encoder, sourceGroup, destGroup string) error {
internal, ok := e.(*codec)
if !ok {
return fmt.Errorf("unsupported encoder type")
}
dest, ok := internal.encodeVersion[destGroup]
if !ok {
return fmt.Errorf("group %q is not a possible destination group in the given codec", destGroup)
}
internal.encodeVersion[sourceGroup] = dest
return nil
}
// NewCodecForScheme is a convenience method for callers that are using a scheme.
func NewCodecForScheme(
// TODO: I should be a scheme interface?
@ -132,6 +168,7 @@ func (c *codec) Decode(data []byte, defaultGVK *unversioned.GroupVersionKind, in
targetGV.Group = group
targetGV.Version = runtime.APIVersionInternal
} else {
fmt.Printf("looking for %v in %#v\n", group, c.decodeVersion)
gv, ok := c.decodeVersion[group]
if !ok {
// unknown objects are left in their original version

View File

@ -145,6 +145,14 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
watchCache := newWatchCache(config.CacheCapacity)
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
// Give this error when it is constructed rather than when you get the
// first watch item, because it's much easier to track down that way.
if obj, ok := config.Type.(runtime.Object); ok {
if err := runtime.CheckCodec(config.Storage.Codec(), obj); err != nil {
panic("storage codec doesn't seem to match given type: " + err.Error())
}
}
cacher := &Cacher{
usable: sync.RWMutex{},
storage: config.Storage,