Merge pull request #25266 from smarterclayton/common_storage

Automatic merge from submit-queue

kube-apiserver options should be decoupled from impls

A few months ago we refactored options to keep it independent of the
implementations, so that it could be used in CLI tools to validate
config or to generate config, without pulling in the full dependency
tree of the master.  This change restores that by separating
server_run_options.go back to its own package.

Also, options structs should never contain non-serializable types, which
storagebackend.Config was doing with runtime.Codec. Split the codec out.

Fix a typo on the name of the etcd2.go storage backend.

Finally, move DefaultStorageMediaType to server_run_options.

@nikhiljindal as per my comment in #24454, @liggitt because you and I
discussed this last time
This commit is contained in:
k8s-merge-robot 2016-05-19 06:13:38 -07:00
commit d89d45a861
17 changed files with 120 additions and 70 deletions

View File

@ -21,7 +21,7 @@ import (
"time"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/genericapiserver"
genericoptions "k8s.io/kubernetes/pkg/genericapiserver/options"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master/ports"
@ -30,7 +30,7 @@ import (
// APIServer runs a kubernetes api server.
type APIServer struct {
*genericapiserver.ServerRunOptions
*genericoptions.ServerRunOptions
AllowPrivileged bool
EventTTL time.Duration
KubeletConfig kubeletclient.KubeletClientConfig
@ -45,7 +45,7 @@ type APIServer struct {
// NewAPIServer creates a new APIServer object with default parameters
func NewAPIServer() *APIServer {
s := APIServer{
ServerRunOptions: genericapiserver.NewServerRunOptions(),
ServerRunOptions: genericoptions.NewServerRunOptions(),
EventTTL: 1 * time.Hour,
KubeletConfig: kubeletclient.KubeletClientConfig{
Port: ports.KubeletPort,

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/genericapiserver"
genericoptions "k8s.io/kubernetes/pkg/genericapiserver/options"
"k8s.io/kubernetes/pkg/storage/storagebackend"
// Install the testgroup API
@ -41,7 +42,7 @@ const (
func newStorageFactory() genericapiserver.StorageFactory {
config := storagebackend.Config{
Prefix: genericapiserver.DefaultEtcdPathPrefix,
Prefix: genericoptions.DefaultEtcdPathPrefix,
ServerList: []string{"http://127.0.0.1:4001"},
}
storageFactory := genericapiserver.NewDefaultStorageFactory(config, "application/json", api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig())
@ -49,13 +50,13 @@ func newStorageFactory() genericapiserver.StorageFactory {
return storageFactory
}
func NewServerRunOptions() *genericapiserver.ServerRunOptions {
serverOptions := genericapiserver.NewServerRunOptions()
func NewServerRunOptions() *genericoptions.ServerRunOptions {
serverOptions := genericoptions.NewServerRunOptions()
serverOptions.InsecurePort = InsecurePort
return serverOptions
}
func Run(serverOptions *genericapiserver.ServerRunOptions) error {
func Run(serverOptions *genericoptions.ServerRunOptions) error {
// Set ServiceClusterIPRange
_, serviceClusterIPRange, _ := net.ParseCIDR("10.0.0.0/24")
serverOptions.ServiceClusterIPRange = *serviceClusterIPRange

View File

@ -26,7 +26,7 @@ import (
"time"
"k8s.io/kubernetes/federation/cmd/federated-apiserver/app"
"k8s.io/kubernetes/pkg/genericapiserver"
genericoptions "k8s.io/kubernetes/pkg/genericapiserver/options"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/version/verflag"
@ -38,7 +38,7 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
rand.Seed(time.Now().UTC().UnixNano())
s := genericapiserver.NewServerRunOptions()
s := genericoptions.NewServerRunOptions()
s.AddFlags(pflag.CommandLine)
flag.InitFlags()

View File

@ -20,6 +20,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/genericapiserver"
genericoptions "k8s.io/kubernetes/pkg/genericapiserver/options"
"k8s.io/kubernetes/federation/apis/core"
_ "k8s.io/kubernetes/federation/apis/core/install"
@ -29,7 +30,7 @@ import (
serviceetcd "k8s.io/kubernetes/pkg/registry/service/etcd"
)
func installCoreAPIs(s *genericapiserver.ServerRunOptions, g *genericapiserver.GenericAPIServer, f genericapiserver.StorageFactory) {
func installCoreAPIs(s *genericoptions.ServerRunOptions, g *genericapiserver.GenericAPIServer, f genericapiserver.StorageFactory) {
serviceStore, serviceStatusStorage := serviceetcd.NewREST(createRESTOptionsOrDie(s, g, f, api.Resource("service")))
coreResources := map[string]rest.Storage{
"services": serviceStore,

View File

@ -24,12 +24,13 @@ import (
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/genericapiserver"
genericoptions "k8s.io/kubernetes/pkg/genericapiserver/options"
_ "k8s.io/kubernetes/federation/apis/federation/install"
clusteretcd "k8s.io/kubernetes/federation/registry/cluster/etcd"
)
func installFederationAPIs(s *genericapiserver.ServerRunOptions, g *genericapiserver.GenericAPIServer, f genericapiserver.StorageFactory) {
func installFederationAPIs(s *genericoptions.ServerRunOptions, g *genericapiserver.GenericAPIServer, f genericapiserver.StorageFactory) {
clusterStorage, clusterStatusStorage := clusteretcd.NewREST(createRESTOptionsOrDie(s, g, f, federation.Resource("clusters")))
federationResources := map[string]rest.Storage{
"clusters": clusterStorage,

View File

@ -32,13 +32,14 @@ import (
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/apiserver/authenticator"
"k8s.io/kubernetes/pkg/genericapiserver"
genericoptions "k8s.io/kubernetes/pkg/genericapiserver/options"
"k8s.io/kubernetes/pkg/registry/cachesize"
"k8s.io/kubernetes/pkg/registry/generic"
)
// NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand() *cobra.Command {
s := genericapiserver.NewServerRunOptions()
s := genericoptions.NewServerRunOptions()
s.AddFlags(pflag.CommandLine)
cmd := &cobra.Command{
Use: "federated-apiserver",
@ -54,7 +55,7 @@ cluster's shared state through which all other components interact.`,
}
// Run runs the specified APIServer. This should never exit.
func Run(s *genericapiserver.ServerRunOptions) error {
func Run(s *genericoptions.ServerRunOptions) error {
genericapiserver.DefaultAndValidateRunOptions(s)
// TODO: register cluster federation resources here.
@ -148,7 +149,7 @@ func Run(s *genericapiserver.ServerRunOptions) error {
return nil
}
func createRESTOptionsOrDie(s *genericapiserver.ServerRunOptions, g *genericapiserver.GenericAPIServer, f genericapiserver.StorageFactory, resource unversioned.GroupResource) generic.RESTOptions {
func createRESTOptionsOrDie(s *genericoptions.ServerRunOptions, g *genericapiserver.GenericAPIServer, f genericapiserver.StorageFactory, resource unversioned.GroupResource) generic.RESTOptions {
storage, err := f.New(resource)
if err != nil {
glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error())

View File

@ -31,11 +31,11 @@ import (
fed_v1a1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/genericapiserver/options"
)
func TestLongRunningRequestRegexp(t *testing.T) {
regexp := regexp.MustCompile(genericapiserver.NewServerRunOptions().LongRunningRequestRE)
regexp := regexp.MustCompile(options.NewServerRunOptions().LongRunningRequestRE)
dontMatch := []string{
"/api/v1/watch-namespace/",
"/api/v1/namespace-proxy/",
@ -82,7 +82,7 @@ var groupVersions = []unversioned.GroupVersion{
}
func TestRun(t *testing.T) {
s := genericapiserver.NewServerRunOptions()
s := options.NewServerRunOptions()
s.InsecurePort = insecurePort
_, ipNet, _ := net.ParseCIDR("10.10.10.0/24")
s.ServiceClusterIPRange = *ipNet

View File

@ -40,6 +40,7 @@ import (
"k8s.io/kubernetes/pkg/auth/authorizer"
"k8s.io/kubernetes/pkg/auth/handlers"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/genericapiserver/options"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
ipallocator "k8s.io/kubernetes/pkg/registry/service/ipallocator"
@ -57,11 +58,7 @@ import (
"github.com/golang/glog"
)
const (
DefaultEtcdPathPrefix = "/registry"
DefaultDeserializationCacheSize = 50000
globalTimeout = time.Minute
)
const globalTimeout = time.Minute
// Info about an API group.
type APIGroupInfo struct {
@ -95,6 +92,7 @@ type APIGroupInfo struct {
// Config is a structure used to configure a GenericAPIServer.
type Config struct {
// The storage factory for other objects
StorageFactory StorageFactory
// allow downstream consumers to disable the core controller loops
EnableLogsSupport bool
@ -538,7 +536,7 @@ func (s *GenericAPIServer) installGroupsDiscoveryHandler() {
}
// TODO: Longer term we should read this from some config store, rather than a flag.
func verifyClusterIPFlags(options *ServerRunOptions) {
func verifyClusterIPFlags(options *options.ServerRunOptions) {
if options.ServiceClusterIPRange.IP == nil {
glog.Fatal("No --service-cluster-ip-range specified")
}
@ -548,7 +546,7 @@ func verifyClusterIPFlags(options *ServerRunOptions) {
}
}
func NewConfig(options *ServerRunOptions) *Config {
func NewConfig(options *options.ServerRunOptions) *Config {
return &Config{
APIGroupPrefix: options.APIGroupPrefix,
APIPrefix: options.APIPrefix,
@ -571,25 +569,25 @@ func NewConfig(options *ServerRunOptions) *Config {
}
}
func verifyServiceNodePort(options *ServerRunOptions) {
func verifyServiceNodePort(options *options.ServerRunOptions) {
if options.KubernetesServiceNodePort > 0 && !options.ServiceNodePortRange.Contains(options.KubernetesServiceNodePort) {
glog.Fatalf("Kubernetes service port range %v doesn't contain %v", options.ServiceNodePortRange, (options.KubernetesServiceNodePort))
}
}
func verifyEtcdServersList(options *ServerRunOptions) {
func verifyEtcdServersList(options *options.ServerRunOptions) {
if len(options.StorageConfig.ServerList) == 0 {
glog.Fatalf("--etcd-servers must be specified")
}
}
func ValidateRunOptions(options *ServerRunOptions) {
func ValidateRunOptions(options *options.ServerRunOptions) {
verifyClusterIPFlags(options)
verifyServiceNodePort(options)
verifyEtcdServersList(options)
}
func DefaultAndValidateRunOptions(options *ServerRunOptions) {
func DefaultAndValidateRunOptions(options *options.ServerRunOptions) {
ValidateRunOptions(options)
// If advertise-address is not specified, use bind-address. If bind-address
@ -635,7 +633,7 @@ func DefaultAndValidateRunOptions(options *ServerRunOptions) {
}
}
func (s *GenericAPIServer) Run(options *ServerRunOptions) {
func (s *GenericAPIServer) Run(options *options.ServerRunOptions) {
if s.enableSwaggerSupport {
s.InstallSwaggerAPI()
}

View File

@ -0,0 +1,21 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// package options is the public flags and options used by a generic api
// server. It takes a minimal set of dependencies and does not reference
// implementations, in order to ensure it may be reused by multiple components
// (such as CLI commands that wish to generate or validate config).
package options

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package genericapiserver
package options
import (
"net"
@ -38,6 +38,9 @@ import (
)
const (
DefaultEtcdPathPrefix = "/registry"
DefaultDeserializationCacheSize = 50000
// TODO: This can be tightened up. It still matches objects named watch or proxy.
defaultLongRunningRequestRE = "(/|^)((watch|proxy)(/|$)|(logs?|portforward|exec|attach)/?$)"
)

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/autoscaling"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/genericapiserver/options"
)
func TestGenerateStorageVersionMap(t *testing.T) {
@ -66,7 +67,7 @@ func TestGenerateStorageVersionMap(t *testing.T) {
},
}
for i, test := range testCases {
s := ServerRunOptions{
s := options.ServerRunOptions{
DeprecatedStorageVersion: test.legacyVersion,
StorageVersions: test.storageVersions,
DefaultStorageVersions: test.defaultVersions,

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
storagebackendfactory "k8s.io/kubernetes/pkg/storage/storagebackend/factory"
"k8s.io/kubernetes/pkg/util/sets"
"github.com/golang/glog"
@ -70,7 +71,7 @@ type DefaultStorageFactory struct {
newStorageCodecFn func(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (codec runtime.Codec, err error)
// newStorageFn exists to be overwritten for unit testing.
newStorageFn func(config storagebackend.Config) (etcdStorage storage.Interface, err error)
newStorageFn func(config storagebackend.Config, codec runtime.Codec) (etcdStorage storage.Interface, err error)
}
type groupResourceOverrides struct {
@ -212,15 +213,13 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st
return nil, err
}
config.Codec = codec
glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, config)
return s.newStorageFn(config)
return s.newStorageFn(config, codec)
}
// newStorage is the default implementation for creating a storage backend.
func newStorage(config storagebackend.Config) (etcdStorage storage.Interface, err error) {
return storagebackend.Create(config)
func newStorage(config storagebackend.Config, codec runtime.Codec) (etcdStorage storage.Interface, err error) {
return storagebackendfactory.Create(config, codec)
}
// Get all backends for all registered storage destinations.

View File

@ -23,6 +23,8 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/genericapiserver/options"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)
@ -49,13 +51,13 @@ func TestUpdateEtcdOverrides(t *testing.T) {
defaultEtcdLocation := []string{"http://127.0.0.1"}
for i, test := range testCases {
actualConfig := storagebackend.Config{}
newStorageFn := func(config storagebackend.Config) (_ storage.Interface, err error) {
newStorageFn := func(config storagebackend.Config, codec runtime.Codec) (_ storage.Interface, err error) {
actualConfig = config
return nil, nil
}
defaultConfig := storagebackend.Config{
Prefix: DefaultEtcdPathPrefix,
Prefix: options.DefaultEtcdPathPrefix,
ServerList: defaultEtcdLocation,
}
storageFactory := NewDefaultStorageFactory(defaultConfig, "", api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig())

View File

@ -16,13 +16,6 @@ limitations under the License.
package storagebackend
import (
"fmt"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
const (
StorageTypeUnset = ""
StorageTypeETCD2 = "etcd2"
@ -33,8 +26,6 @@ const (
type Config struct {
// Type defines the type of storage backend, e.g. "etcd2", etcd3". Default ("") is "etcd2".
Type string
// Codec is used to serialize/deserialize objects.
Codec runtime.Codec
// Prefix is the prefix to all keys passed to storage.Interface methods.
Prefix string
// ServerList is the list of storage servers to connect with.
@ -50,19 +41,3 @@ type Config struct {
// We will drop the cache once using protobuf.
DeserializationCacheSize int
}
// Create creates a storage backend based on given config.
func Create(c Config) (storage.Interface, error) {
switch c.Type {
case StorageTypeUnset, StorageTypeETCD2:
return newETCD2Storage(c)
case StorageTypeETCD3:
// TODO: We have the following features to implement:
// - Support secure connection by using key, cert, and CA files.
// - Honor "https" scheme to support secure connection in gRPC.
// - Support non-quorum read.
return newETCD3Storage(c)
default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package storagebackend
package factory
import (
"net"
@ -23,12 +23,15 @@ import (
etcd2client "github.com/coreos/etcd/client"
"github.com/coreos/etcd/pkg/transport"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/storagebackend"
utilnet "k8s.io/kubernetes/pkg/util/net"
)
func newETCD2Storage(c Config) (storage.Interface, error) {
func newETCD2Storage(c storagebackend.Config, codec runtime.Codec) (storage.Interface, error) {
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
if err != nil {
return nil, err
@ -37,7 +40,7 @@ func newETCD2Storage(c Config) (storage.Interface, error) {
if err != nil {
return nil, err
}
return etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize), nil
return etcd.NewEtcdStorage(client, codec, c.Prefix, c.Quorum, c.DeserializationCacheSize), nil
}
func newETCD2Client(tr *http.Transport, serverList []string) (etcd2client.Client, error) {

View File

@ -14,18 +14,21 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package storagebackend
package factory
import (
"strings"
"github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd3"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)
func newETCD3Storage(c Config) (storage.Interface, error) {
func newETCD3Storage(c storagebackend.Config, codec runtime.Codec) (storage.Interface, error) {
endpoints := c.ServerList
for i, s := range endpoints {
endpoints[i] = strings.TrimLeft(s, "http://")
@ -38,5 +41,5 @@ func newETCD3Storage(c Config) (storage.Interface, error) {
return nil, err
}
etcd3.StartCompactor(context.Background(), client)
return etcd3.New(client, c.Codec, c.Prefix), nil
return etcd3.New(client, codec, c.Prefix), nil
}

View File

@ -0,0 +1,41 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import (
"fmt"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)
// Create creates a storage backend based on given config.
func Create(c storagebackend.Config, codec runtime.Codec) (storage.Interface, error) {
switch c.Type {
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2:
return newETCD2Storage(c, codec)
case storagebackend.StorageTypeETCD3:
// TODO: We have the following features to implement:
// - Support secure connection by using key, cert, and CA files.
// - Honor "https" scheme to support secure connection in gRPC.
// - Support non-quorum read.
return newETCD3Storage(c, codec)
default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}