k8s.io/apiserver: straighten EtcdOptions, backend Config and kube RESTOptionsFactory

This commit is contained in:
Dr. Stefan Schimanski 2017-02-11 12:14:34 +01:00
parent 27e01b5c46
commit 5e77d01897
15 changed files with 107 additions and 143 deletions

View File

@ -23,6 +23,7 @@ import (
utilnet "k8s.io/apimachinery/pkg/util/net"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/validation"
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
@ -68,7 +69,7 @@ type ServerRunOptions struct {
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
GenericServerRunOptions: genericoptions.NewServerRunOptions(),
Etcd: genericoptions.NewEtcdOptions(kubeoptions.DefaultEtcdPathPrefix, api.Scheme, nil),
Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, api.Scheme, nil)),
SecureServing: genericoptions.NewSecureServingOptions(),
InsecureServing: genericoptions.NewInsecureServingOptions(),
Audit: genericoptions.NewAuditLogOptions(),

View File

@ -309,11 +309,9 @@ func Run(s *options.ServerRunOptions) error {
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
genericConfig.RESTOptionsGetter = &kubeapiserver.RESTOptionsFactory{
StorageFactory: storageFactory,
EnableWatchCache: s.Etcd.EnableWatchCache,
EnableGarbageCollection: s.Etcd.EnableGarbageCollection,
DeleteCollectionWorkers: s.Etcd.DeleteCollectionWorkers,
if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
return err
}
config := &master.Config{

View File

@ -19,6 +19,8 @@ package apiserver
import (
"fmt"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/authorization/authorizerfactory"
@ -27,6 +29,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/kubernetes/cmd/libs/go2idl/client-gen/test_apis/testgroup/v1"
testgroupetcd "k8s.io/kubernetes/examples/apiserver/rest"
"k8s.io/kubernetes/pkg/api"
@ -34,8 +37,6 @@ import (
// Install the testgroup API
_ "k8s.io/kubernetes/cmd/libs/go2idl/client-gen/test_apis/testgroup/install"
"github.com/golang/glog"
)
const (
@ -45,17 +46,6 @@ const (
SecurePort = 6444
)
func newStorageFactory() genericapiserver.StorageFactory {
config := storagebackend.Config{
Prefix: kubeoptions.DefaultEtcdPathPrefix,
ServerList: []string{"http://127.0.0.1:2379"},
Copier: api.Scheme,
}
storageFactory := genericapiserver.NewDefaultStorageFactory(config, "application/json", api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(api.Registry), genericapiserver.NewResourceConfig())
return storageFactory
}
type ServerRunOptions struct {
GenericServerRunOptions *genericoptions.ServerRunOptions
Etcd *genericoptions.EtcdOptions
@ -68,7 +58,7 @@ type ServerRunOptions struct {
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
GenericServerRunOptions: genericoptions.NewServerRunOptions(),
Etcd: genericoptions.NewEtcdOptions(kubeoptions.DefaultEtcdPathPrefix, api.Scheme, nil),
Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, api.Scheme, nil)),
SecureServing: genericoptions.NewSecureServingOptions(),
InsecureServing: genericoptions.NewInsecureServingOptions(),
Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),
@ -76,6 +66,7 @@ func NewServerRunOptions() *ServerRunOptions {
}
s.InsecureServing.BindPort = InsecurePort
s.SecureServing.ServingOptions.BindPort = SecurePort
s.Etcd.StorageConfig.ServerList = []string{"http://127.0.0.1:2379"}
return &s
}
@ -122,22 +113,25 @@ func (serverOptions *ServerRunOptions) Run(stopCh <-chan struct{}) error {
config.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer()
config.SwaggerConfig = genericapiserver.DefaultSwaggerConfig()
s, err := config.Complete().New()
if err != nil {
return fmt.Errorf("Error in bringing up the server: %v", err)
}
groupVersion := v1.SchemeGroupVersion
groupName := groupVersion.Group
groupMeta, err := api.Registry.Group(groupName)
if err != nil {
return fmt.Errorf("%v", err)
}
storageFactory := newStorageFactory()
storageFactory := serverstorage.NewDefaultStorageFactory(serverOptions.Etcd.StorageConfig, "application/json", api.Codecs, serverstorage.NewDefaultResourceEncodingConfig(api.Registry), serverstorage.NewResourceConfig())
storageConfig, err := storageFactory.NewConfig(schema.GroupResource{Group: groupName, Resource: "testtype"})
if err != nil {
return fmt.Errorf("Unable to get storage config: %v", err)
}
if err := serverOptions.Etcd.ApplyWithStorageFactoryTo(storageFactory, config); err != nil {
return fmt.Errorf("failed to configure authentication: %s", err)
}
s, err := config.Complete().New()
if err != nil {
return fmt.Errorf("Error in bringing up the server: %v", err)
}
testTypeOpts := generic.RESTOptions{
StorageConfig: storageConfig,

View File

@ -21,6 +21,7 @@ import (
"time"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/api"
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
@ -51,7 +52,7 @@ type ServerRunOptions struct {
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
GenericServerRunOptions: genericoptions.NewServerRunOptions(),
Etcd: genericoptions.NewEtcdOptions(kubeoptions.DefaultEtcdPathPrefix, api.Scheme, nil),
Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, api.Scheme, nil)),
SecureServing: genericoptions.NewSecureServingOptions(),
InsecureServing: genericoptions.NewInsecureServingOptions(),
Audit: genericoptions.NewAuditLogOptions(),

View File

@ -146,6 +146,9 @@ func Run(s *options.ServerRunOptions) error {
servers := strings.Split(tokens[1], ";")
storageFactory.SetEtcdLocation(groupResource, servers)
}
if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
return err
}
apiAuthenticator, securityDefinitions, err := s.Authentication.ToAuthenticationConfig().New()
if err != nil {
@ -188,12 +191,6 @@ func Run(s *options.ServerRunOptions) error {
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
genericConfig.RESTOptionsGetter = &kubeapiserver.RESTOptionsFactory{
StorageFactory: storageFactory,
EnableWatchCache: s.Etcd.EnableWatchCache,
EnableGarbageCollection: s.Etcd.EnableGarbageCollection,
DeleteCollectionWorkers: s.Etcd.DeleteCollectionWorkers,
}
// TODO: Move this to generic api server (Need to move the command line flag).
if s.Etcd.EnableWatchCache {

View File

@ -29,7 +29,7 @@ import (
"k8s.io/kubernetes/pkg/api"
)
// NewStorageFactory builds the ConfigurableStorageFactory.
// NewStorageFactory builds the DefaultStorageFactory.
// Merges defaultResourceConfig with the user specified overrides and merges
// defaultAPIResourceConfig with the corresponding user specified overrides as well.
func NewStorageFactory(storageConfig storagebackend.Config, defaultMediaType string, serializer runtime.StorageSerializer,

View File

@ -1,54 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeapiserver
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
serverstorage "k8s.io/apiserver/pkg/server/storage"
)
// RESTOptionsFactory is a RESTOptionsGetter for kube apiservers since they do complicated stuff
type RESTOptionsFactory struct {
DeleteCollectionWorkers int
EnableGarbageCollection bool
EnableWatchCache bool
StorageFactory serverstorage.StorageFactory
}
func (f *RESTOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
storageConfig, err := f.StorageFactory.NewConfig(resource)
if err != nil {
return generic.RESTOptions{}, fmt.Errorf("Unable to find storage destination for %v, due to %v", resource, err.Error())
}
ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: f.DeleteCollectionWorkers,
EnableGarbageCollection: f.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
}
if f.EnableWatchCache {
ret.Decorator = genericregistry.StorageWithCacher
}
return ret, nil
}

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/version"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/options"
serverstorage "k8s.io/apiserver/pkg/server/storage"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
restclient "k8s.io/client-go/rest"
@ -52,7 +53,6 @@ import (
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/apis/rbac"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"k8s.io/kubernetes/pkg/kubeapiserver"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
kubeversion "k8s.io/kubernetes/pkg/version"
@ -80,6 +80,11 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
resourceEncoding.SetVersionEncoding(certificates.GroupName, *testapi.Certificates.GroupVersion(), schema.GroupVersion{Group: certificates.GroupName, Version: runtime.APIVersionInternal})
storageFactory := serverstorage.NewDefaultStorageFactory(*storageConfig, testapi.StorageMediaType(), api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())
err := options.NewEtcdOptions(storageConfig).ApplyWithStorageFactoryTo(storageFactory, config.GenericConfig)
if err != nil {
t.Fatal(err)
}
kubeVersion := kubeversion.Get()
config.GenericConfig.Version = &kubeVersion
config.StorageFactory = storageFactory
@ -89,12 +94,6 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
config.GenericConfig.RequestContextMapper = genericapirequest.NewRequestContextMapper()
config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}}
config.GenericConfig.EnableMetrics = true
config.GenericConfig.RESTOptionsGetter = &kubeapiserver.RESTOptionsFactory{
StorageFactory: storageFactory,
EnableWatchCache: true,
EnableGarbageCollection: true,
DeleteCollectionWorkers: 1,
}
config.EnableCoreControllers = false
config.KubeletClientConfig = kubeletclient.KubeletClientConfig{Port: 10250}
config.ProxyTransport = utilnet.SetTransportDefaults(&http.Transport{

View File

@ -131,7 +131,8 @@ type Config struct {
OpenAPIConfig *openapicommon.Config
// SwaggerConfig will be used in generating Swagger spec. This is nil by default. Use DefaultSwaggerConfig for "working" defaults.
SwaggerConfig *swagger.Config
// RESTOptionsGetter is used to construct "normal" RESTStorage types
// RESTOptionsGetter is used to construct RESTStorage types via the generic registry.
RESTOptionsGetter genericregistry.RESTOptionsGetter
// If specified, requests will be allocated a random timeout between this value, and twice this value.

View File

@ -21,11 +21,11 @@ import (
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/generic/registry"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
)
@ -42,16 +42,9 @@ type EtcdOptions struct {
EnableWatchCache bool
}
func NewEtcdOptions(prefix string, copier runtime.ObjectCopier, codec runtime.Codec) *EtcdOptions {
func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
return &EtcdOptions{
StorageConfig: storagebackend.Config{
Prefix: prefix,
// Default cache size to 0 - if unset, its size will be set based on target
// memory usage.
DeserializationCacheSize: 0,
Copier: copier,
Codec: codec,
},
StorageConfig: *backendConfig,
DefaultStorageMediaType: "application/json",
DeleteCollectionWorkers: 1,
EnableGarbageCollection: true,
@ -114,28 +107,53 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
}
func (s *EtcdOptions) ApplyTo(c *server.Config) error {
c.RESTOptionsGetter = &restOptionsFactory{options: s}
c.RESTOptionsGetter = &simpleRestOptionsFactory{Options: *s}
return nil
}
// restOptionsFactory is a default implementation of a RESTOptionsGetter
// This will work well for most aggregated API servers. The legacy kube server needs more customization
type restOptionsFactory struct {
options *EtcdOptions
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
c.RESTOptionsGetter = &storageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
return nil
}
func (f *restOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
type simpleRestOptionsFactory struct {
Options EtcdOptions
}
func (f *simpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{
StorageConfig: &f.options.StorageConfig,
Decorator: registry.StorageWithCacher,
DeleteCollectionWorkers: f.options.DeleteCollectionWorkers,
EnableGarbageCollection: f.options.EnableGarbageCollection,
ResourcePrefix: f.options.StorageConfig.Prefix + "/" + resource.Group + "/" + resource.Resource,
StorageConfig: &f.Options.StorageConfig,
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
ResourcePrefix: f.Options.StorageConfig.Prefix + "/" + resource.Group + "/" + resource.Resource,
}
if f.Options.EnableWatchCache {
ret.Decorator = genericregistry.StorageWithCacher
}
return ret, nil
}
type storageFactoryRestOptionsFactory struct {
Options EtcdOptions
StorageFactory serverstorage.StorageFactory
}
func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
storageConfig, err := f.StorageFactory.NewConfig(resource)
if err != nil {
return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
}
if !f.options.EnableWatchCache {
ret.Decorator = generic.UndecoratedStorage
ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
}
if f.Options.EnableWatchCache {
ret.Decorator = genericregistry.StorageWithCacher
}
return ret, nil

View File

@ -21,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/storage/storagebackend"
)
// RecommendedOptions contains the recommended options for running an API server
@ -36,7 +37,7 @@ type RecommendedOptions struct {
func NewRecommendedOptions(prefix string, copier runtime.ObjectCopier, codec runtime.Codec) *RecommendedOptions {
return &RecommendedOptions{
Etcd: NewEtcdOptions(prefix, copier, codec),
Etcd: NewEtcdOptions(storagebackend.NewDefaultConfig(prefix, copier, codec)),
SecureServing: NewSecureServingOptions(),
Authentication: NewDelegatingAuthenticationOptions(),
Authorization: NewDelegatingAuthorizationOptions(),

View File

@ -19,12 +19,12 @@ package storage
import (
"strings"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/storage/storagebackend"
"github.com/golang/glog"
)
// StorageFactory is the interface to locate the storage for a given GroupResource

View File

@ -92,7 +92,7 @@ func (n *fakeNegotiater) DecoderToVersion(serializer runtime.Decoder, gv runtime
return n.serializer
}
func TestDefaultStorageFactory(t *testing.T) {
func TestConfigurableStorageFactory(t *testing.T) {
ns := &fakeNegotiater{types: []string{"test/test"}}
f := NewDefaultStorageFactory(storagebackend.Config{}, "test/test", ns, NewDefaultResourceEncodingConfig(registry), NewResourceConfig())
f.AddCohabitatingResources(example.Resource("test"), schema.GroupResource{Resource: "test2", Group: "2"})

View File

@ -43,6 +43,17 @@ type Config struct {
// We will drop the cache once using protobuf.
DeserializationCacheSize int
Codec runtime.Codec
Codec runtime.Codec
Copier runtime.ObjectCopier
}
func NewDefaultConfig(prefix string, copier runtime.ObjectCopier, codec runtime.Codec) *Config {
return &Config{
Prefix: prefix,
// Default cache size to 0 - if unset, its size will be set based on target
// memory usage.
DeserializationCacheSize: 0,
Copier: copier,
Codec: codec,
}
}

View File

@ -26,6 +26,9 @@ import (
"testing"
"time"
"github.com/go-openapi/spec"
"github.com/pborman/uuid"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -40,6 +43,7 @@ import (
"k8s.io/apiserver/pkg/authorization/authorizerfactory"
authorizerunion "k8s.io/apiserver/pkg/authorization/union"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/options"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
restclient "k8s.io/client-go/rest"
@ -68,9 +72,6 @@ import (
"k8s.io/kubernetes/pkg/util/env"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/plugin/pkg/admission/admit"
"github.com/go-openapi/spec"
"github.com/pborman/uuid"
)
const (
@ -309,19 +310,16 @@ func GetEtcdURLFromEnv() string {
// Returns a basic master config.
func NewMasterConfig() *master.Config {
storageConfig := storagebackend.Config{
ServerList: []string{GetEtcdURLFromEnv()},
// This causes the integration tests to exercise the etcd
// prefix code, so please don't change without ensuring
// sufficient coverage in other ways.
Prefix: uuid.New(),
Copier: api.Scheme,
}
// This causes the integration tests to exercise the etcd
// prefix code, so please don't change without ensuring
// sufficient coverage in other ways.
etcdOptions := options.NewEtcdOptions(storagebackend.NewDefaultConfig(uuid.New(), api.Scheme, nil))
etcdOptions.StorageConfig.ServerList = []string{GetEtcdURLFromEnv()}
info, _ := runtime.SerializerInfoForMediaType(api.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
ns := NewSingleContentTypeSerializer(api.Scheme, info)
storageFactory := serverstorage.NewConfigurableStorageFactory(storageConfig, runtime.ContentTypeJSON, ns, serverstorage.NewDefaultResourceEncodingConfig(api.Registry), master.DefaultAPIResourceConfigSource())
storageFactory := serverstorage.NewDefaultStorageFactory(etcdOptions.StorageConfig, runtime.ContentTypeJSON, ns, serverstorage.NewDefaultResourceEncodingConfig(api.Registry), master.DefaultAPIResourceConfigSource())
storageFactory.SetSerializer(
schema.GroupResource{Group: v1.GroupName, Resource: serverstorage.AllResources},
"",
@ -365,11 +363,10 @@ func NewMasterConfig() *master.Config {
genericConfig.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer()
genericConfig.AdmissionControl = admit.NewAlwaysAdmit()
genericConfig.EnableMetrics = true
genericConfig.RESTOptionsGetter = &kubeapiserver.RESTOptionsFactory{
StorageFactory: storageFactory,
EnableWatchCache: true,
EnableGarbageCollection: true,
DeleteCollectionWorkers: 1,
err := etcdOptions.ApplyWithStorageFactoryTo(storageFactory, genericConfig)
if err != nil {
panic(err)
}
return &master.Config{