Merge pull request #24455 from hongchaodeng/fl

Automatic merge from submit-queue

Provide flags to use etcd3 backed storage

ref: #24405

What's in this PR?
- Add a new flag "storage-backend" to choose "etcd2" or "etcd3". By default (i.e. empty), it's "etcd2".
- Take out etcd config code into a standalone package and let it create etcd2 or etcd3 storage backend given user input.
This commit is contained in:
k8s-merge-robot
2016-04-29 08:49:04 -07:00
17 changed files with 267 additions and 153 deletions

View File

@@ -23,7 +23,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/util/sets"
"github.com/golang/glog"
@@ -44,9 +44,9 @@ type StorageFactory interface {
// 2. Resource encodings for storage: group,version,kind to store as
// 3. Cohabitating default: some resources like hpa are exposed through multiple APIs. They must agree on 1 and 2
type DefaultStorageFactory struct {
// DefaultEtcdConfig describes how to connect to etcd in general. It's authentication information will be used for
// every storage.Interface returned.
DefaultEtcdConfig etcdstorage.EtcdConfig
// StorageConfig describes how to create a storage backend in general.
// Its authentication information will be used for every storage.Interface returned.
StorageConfig storagebackend.Config
Overrides map[unversioned.GroupResource]groupResourceOverrides
@@ -62,12 +62,12 @@ type DefaultStorageFactory struct {
APIResourceConfigSource APIResourceConfigSource
// newEtcdFn exists to be overwritten for unit testing. You should never set this in a normal world.
newEtcdFn func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error)
newEtcdFn func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error)
}
type groupResourceOverrides struct {
// etcdLocation contains the list of "special" locations that are used for particular GroupResources
// These are merged on top of the default DefaultEtcdConfig when requesting the storage.Interface for a given GroupResource
// These are merged on top of the StorageConfig when requesting the storage.Interface for a given GroupResource
etcdLocation []string
// etcdPrefix contains the list of "special" prefixes for a GroupResource. Resource=* means for the entire group
etcdPrefix string
@@ -83,9 +83,9 @@ var _ StorageFactory = &DefaultStorageFactory{}
const AllResources = "*"
func NewDefaultStorageFactory(defaultEtcdConfig etcdstorage.EtcdConfig, defaultSerializer runtime.NegotiatedSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory {
func NewDefaultStorageFactory(config storagebackend.Config, defaultSerializer runtime.NegotiatedSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory {
return &DefaultStorageFactory{
DefaultEtcdConfig: defaultEtcdConfig,
StorageConfig: config,
Overrides: map[unversioned.GroupResource]groupResourceOverrides{},
DefaultSerializer: defaultSerializer,
ResourceEncodingConfig: resourceEncodingConfig,
@@ -152,7 +152,7 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st
overriddenEtcdLocations = exactResourceOverride.etcdLocation
}
etcdPrefix := s.DefaultEtcdConfig.Prefix
etcdPrefix := s.StorageConfig.Prefix
if len(groupOverride.etcdPrefix) > 0 {
etcdPrefix = groupOverride.etcdPrefix
}
@@ -168,10 +168,10 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st
etcdSerializer = exactResourceOverride.serializer
}
// operate on copy
etcdConfig := s.DefaultEtcdConfig
etcdConfig.Prefix = etcdPrefix
config := s.StorageConfig
config.Prefix = etcdPrefix
if len(overriddenEtcdLocations) > 0 {
etcdConfig.ServerList = overriddenEtcdLocations
config.ServerList = overriddenEtcdLocations
}
storageEncodingVersion, err := s.ResourceEncodingConfig.StoragageEncodingFor(chosenStorageResource)
@@ -183,13 +183,11 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st
return nil, err
}
glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, etcdConfig)
return s.newEtcdFn(etcdSerializer, storageEncodingVersion, internalVersion, etcdConfig)
glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, config)
return s.newEtcdFn(etcdSerializer, storageEncodingVersion, internalVersion, config)
}
func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) {
var storageConfig etcdstorage.EtcdStorageConfig
storageConfig.Config = etcdConfig
func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) {
s, ok := ns.SerializerForMediaType("application/json", nil)
if !ok {
return nil, fmt.Errorf("unable to find serializer for JSON")
@@ -205,14 +203,14 @@ func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unve
return nil, fmt.Errorf("error setting up decoder from %v to %v: %v", storageVersion, memoryVersion, err)
}
}
storageConfig.Codec = runtime.NewCodec(encoder, decoder)
return storageConfig.NewStorage()
config.Codec = runtime.NewCodec(encoder, decoder)
return storagebackend.Create(config)
}
// Get all backends for all registered storage destinations.
// Used for getting all instances for health validations.
func (s *DefaultStorageFactory) Backends() []string {
backends := sets.NewString(s.DefaultEtcdConfig.ServerList...)
backends := sets.NewString(s.StorageConfig.ServerList...)
for _, overrides := range s.Overrides {
backends.Insert(overrides.etcdLocation...)

View File

@@ -25,7 +25,7 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)
func TestUpdateEtcdOverrides(t *testing.T) {
@@ -49,17 +49,17 @@ func TestUpdateEtcdOverrides(t *testing.T) {
defaultEtcdLocation := []string{"http://127.0.0.1"}
for i, test := range testCases {
actualEtcdConfig := etcdstorage.EtcdConfig{}
newEtcdFn := func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) {
actualEtcdConfig = etcdConfig
actualConfig := storagebackend.Config{}
newEtcdFn := func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) {
actualConfig = config
return nil, nil
}
defaultEtcdConfig := etcdstorage.EtcdConfig{
defaultConfig := storagebackend.Config{
Prefix: DefaultEtcdPathPrefix,
ServerList: defaultEtcdLocation,
}
storageFactory := NewDefaultStorageFactory(defaultEtcdConfig, api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig())
storageFactory := NewDefaultStorageFactory(defaultConfig, api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig())
storageFactory.newEtcdFn = newEtcdFn
storageFactory.SetEtcdLocation(test.resource, test.servers)
@@ -69,8 +69,8 @@ func TestUpdateEtcdOverrides(t *testing.T) {
t.Errorf("%d: unexpected error %v", i, err)
continue
}
if !reflect.DeepEqual(actualEtcdConfig.ServerList, test.servers) {
t.Errorf("%d: expected %v, got %v", i, test.servers, actualEtcdConfig.ServerList)
if !reflect.DeepEqual(actualConfig.ServerList, test.servers) {
t.Errorf("%d: expected %v, got %v", i, test.servers, actualConfig.ServerList)
continue
}
@@ -79,8 +79,8 @@ func TestUpdateEtcdOverrides(t *testing.T) {
t.Errorf("%d: unexpected error %v", i, err)
continue
}
if !reflect.DeepEqual(actualEtcdConfig.ServerList, defaultEtcdLocation) {
t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, actualEtcdConfig.ServerList)
if !reflect.DeepEqual(actualConfig.ServerList, defaultEtcdLocation) {
t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, actualConfig.ServerList)
continue
}

View File

@@ -32,10 +32,6 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apiserver"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/apps"
@@ -46,6 +42,7 @@ import (
batchapiv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
"k8s.io/kubernetes/pkg/apis/extensions"
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/registry/endpoint"
@@ -53,10 +50,12 @@ import (
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/util/intstr"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
@@ -73,14 +72,14 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
Config: &genericapiserver.Config{},
}
etcdConfig := etcdstorage.EtcdConfig{
storageConfig := storagebackend.Config{
Prefix: etcdtest.PathPrefix(),
CAFile: server.CAFile,
KeyFile: server.KeyFile,
CertFile: server.CertFile,
}
for _, url := range server.ClientURLs {
etcdConfig.ServerList = append(etcdConfig.ServerList, url.String())
storageConfig.ServerList = append(storageConfig.ServerList, url.String())
}
resourceEncoding := genericapiserver.NewDefaultResourceEncodingConfig()
@@ -89,7 +88,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
resourceEncoding.SetVersionEncoding(batch.GroupName, *testapi.Batch.GroupVersion(), unversioned.GroupVersion{Group: batch.GroupName, Version: runtime.APIVersionInternal})
resourceEncoding.SetVersionEncoding(apps.GroupName, *testapi.Apps.GroupVersion(), unversioned.GroupVersion{Group: apps.GroupName, Version: runtime.APIVersionInternal})
resourceEncoding.SetVersionEncoding(extensions.GroupName, *testapi.Extensions.GroupVersion(), unversioned.GroupVersion{Group: extensions.GroupName, Version: runtime.APIVersionInternal})
storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())
storageFactory := genericapiserver.NewDefaultStorageFactory(storageConfig, api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())
config.StorageFactory = storageFactory
config.APIResourceConfigSource = DefaultAPIResourceConfigSource()

View File

@@ -19,8 +19,6 @@ package etcd
import (
"errors"
"fmt"
"net"
"net/http"
"path"
"reflect"
"strings"
@@ -35,90 +33,13 @@ import (
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util"
utilcache "k8s.io/kubernetes/pkg/util/cache"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/watch"
etcd "github.com/coreos/etcd/client"
"github.com/coreos/etcd/pkg/transport"
"github.com/golang/glog"
"golang.org/x/net/context"
)
// storage.Config object for etcd.
type EtcdStorageConfig struct {
Config EtcdConfig
Codec runtime.Codec
}
// implements storage.Config
func (c *EtcdStorageConfig) GetType() string {
return "etcd"
}
// implements storage.Config
func (c *EtcdStorageConfig) NewStorage() (storage.Interface, error) {
etcdClient, err := c.Config.newEtcdClient()
if err != nil {
return nil, err
}
return NewEtcdStorage(etcdClient, c.Codec, c.Config.Prefix, c.Config.Quorum, c.Config.DeserializationCacheSize), nil
}
// Configuration object for constructing etcd.Config
type EtcdConfig struct {
Prefix string
ServerList []string
KeyFile string
CertFile string
CAFile string
Quorum bool
DeserializationCacheSize int
}
func (c *EtcdConfig) newEtcdClient() (etcd.Client, error) {
t, err := c.newHttpTransport()
if err != nil {
return nil, err
}
cli, err := etcd.New(etcd.Config{
Endpoints: c.ServerList,
Transport: t,
})
if err != nil {
return nil, err
}
return cli, nil
}
func (c *EtcdConfig) newHttpTransport() (*http.Transport, error) {
info := transport.TLSInfo{
CertFile: c.CertFile,
KeyFile: c.KeyFile,
CAFile: c.CAFile,
}
cfg, err := info.ClientConfig()
if err != nil {
return nil, err
}
// Copied from etcd.DefaultTransport declaration.
// TODO: Determine if transport needs optimization
tr := utilnet.SetTransportDefaults(&http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConnsPerHost: 500,
TLSClientConfig: cfg,
})
return tr, nil
}
// Creates a new storage interface from the client
// TODO: deprecate in favor of storage.Config abstraction over time
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface {

View File

@@ -57,6 +57,11 @@ type objState struct {
data []byte
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface {
return newStore(c, codec, prefix)
}
func newStore(c *clientv3.Client, codec runtime.Codec, prefix string) *store {
versioner := etcd.APIObjectVersioner{}
return &store{

View File

@@ -0,0 +1,68 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storagebackend
import (
"fmt"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
const (
StorageTypeUnset = ""
StorageTypeETCD2 = "etcd2"
StorageTypeETCD3 = "etcd3"
)
// Config is configuration for creating a storage backend.
type Config struct {
// Type defines the type of storage backend, e.g. "etcd2", etcd3". Default ("") is "etcd2".
Type string
// Codec is used to serialize/deserialize objects.
Codec runtime.Codec
// Prefix is the prefix to all keys passed to storage.Interface methods.
Prefix string
// ServerList is the list of storage servers to connect with.
ServerList []string
// TLS credentials
KeyFile string
CertFile string
CAFile string
// Quorum indicates that whether read operations should be quorum-level consistent.
Quorum bool
// DeserializationCacheSize is the size of cache of deserialized objects.
// Currently this is only supported in etcd2.
// We will drop the cache once using protobuf.
DeserializationCacheSize int
}
// Create creates a storage backend based on given config.
func Create(c Config) (storage.Interface, error) {
switch c.Type {
case StorageTypeUnset, StorageTypeETCD2:
return newETCD2Storage(c)
case StorageTypeETCD3:
// TODO: We have the following features to implement:
// - Support secure connection by using key, cert, and CA files.
// - Honor "https" scheme to support secure connection in gRPC.
// - Support non-quorum read.
return newETCD3Storage(c)
default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}

View File

@@ -0,0 +1,40 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storagebackend
import (
"strings"
"github.com/coreos/etcd/clientv3"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd3"
)
func newETCD3Storage(c Config) (storage.Interface, error) {
endpoints := c.ServerList
for i, s := range endpoints {
endpoints[i] = strings.TrimLeft(s, "http://")
}
cfg := clientv3.Config{
Endpoints: endpoints,
}
client, err := clientv3.New(cfg)
if err != nil {
return nil, err
}
return etcd3.New(client, c.Codec, c.Prefix), nil
}

View File

@@ -0,0 +1,78 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storagebackend
import (
"net"
"net/http"
"time"
etcd2client "github.com/coreos/etcd/client"
"github.com/coreos/etcd/pkg/transport"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd"
utilnet "k8s.io/kubernetes/pkg/util/net"
)
func newETCD2Storage(c Config) (storage.Interface, error) {
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
if err != nil {
return nil, err
}
client, err := newETCD2Client(tr, c.ServerList)
if err != nil {
return nil, err
}
return etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize), nil
}
func newETCD2Client(tr *http.Transport, serverList []string) (etcd2client.Client, error) {
cli, err := etcd2client.New(etcd2client.Config{
Endpoints: serverList,
Transport: tr,
})
if err != nil {
return nil, err
}
return cli, nil
}
func newTransportForETCD2(certFile, keyFile, caFile string) (*http.Transport, error) {
info := transport.TLSInfo{
CertFile: certFile,
KeyFile: keyFile,
CAFile: caFile,
}
cfg, err := info.ClientConfig()
if err != nil {
return nil, err
}
// Copied from etcd.DefaultTransport declaration.
// TODO: Determine if transport needs optimization
tr := utilnet.SetTransportDefaults(&http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConnsPerHost: 500,
TLSClientConfig: cfg,
})
return tr, nil
}