restore ability to run against secured etcd

This commit is contained in:
AdoHe 2016-03-11 11:21:16 -05:00
parent 7fbe5ddc17
commit 7228b9b987
11 changed files with 271 additions and 84 deletions

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/genericapiserver" "k8s.io/kubernetes/pkg/genericapiserver"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master/ports" "k8s.io/kubernetes/pkg/master/ports"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
utilnet "k8s.io/kubernetes/pkg/util/net" utilnet "k8s.io/kubernetes/pkg/util/net"
@ -57,9 +58,8 @@ type APIServer struct {
EnableLogsSupport bool EnableLogsSupport bool
EnableProfiling bool EnableProfiling bool
EnableWatchCache bool EnableWatchCache bool
EtcdPathPrefix string
EtcdServerList []string
EtcdServersOverrides []string EtcdServersOverrides []string
EtcdConfig etcdstorage.EtcdConfig
EventTTL time.Duration EventTTL time.Duration
ExternalHost string ExternalHost string
KeystoneURL string KeystoneURL string
@ -100,13 +100,15 @@ func NewAPIServer() *APIServer {
AuthorizationMode: "AlwaysAllow", AuthorizationMode: "AlwaysAllow",
DeleteCollectionWorkers: 1, DeleteCollectionWorkers: 1,
EnableLogsSupport: true, EnableLogsSupport: true,
EtcdPathPrefix: genericapiserver.DefaultEtcdPathPrefix, EtcdConfig: etcdstorage.EtcdConfig{
EventTTL: 1 * time.Hour, Prefix: genericapiserver.DefaultEtcdPathPrefix,
MasterCount: 1, },
MasterServiceNamespace: api.NamespaceDefault, EventTTL: 1 * time.Hour,
RuntimeConfig: make(util.ConfigurationMap), MasterCount: 1,
StorageVersions: registered.AllPreferredGroupVersions(), MasterServiceNamespace: api.NamespaceDefault,
DefaultStorageVersions: registered.AllPreferredGroupVersions(), RuntimeConfig: make(util.ConfigurationMap),
StorageVersions: registered.AllPreferredGroupVersions(),
DefaultStorageVersions: registered.AllPreferredGroupVersions(),
KubeletConfig: kubeletclient.KubeletClientConfig{ KubeletConfig: kubeletclient.KubeletClientConfig{
Port: ports.KubeletPort, Port: ports.KubeletPort,
EnableHttps: true, EnableHttps: true,
@ -220,10 +222,13 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.AuthorizationConfig.WebhookConfigFile, "authorization-webhook-config-file", s.AuthorizationConfig.WebhookConfigFile, "File with webhook configuration in kubeconfig format, used with --authorization-mode=Webhook. The API server will query the remote service to determine access on the API server's secure port.") fs.StringVar(&s.AuthorizationConfig.WebhookConfigFile, "authorization-webhook-config-file", s.AuthorizationConfig.WebhookConfigFile, "File with webhook configuration in kubeconfig format, used with --authorization-mode=Webhook. The API server will query the remote service to determine access on the API server's secure port.")
fs.StringVar(&s.AdmissionControl, "admission-control", s.AdmissionControl, "Ordered list of plug-ins to do admission control of resources into cluster. Comma-delimited list of: "+strings.Join(admission.GetPlugins(), ", ")) fs.StringVar(&s.AdmissionControl, "admission-control", s.AdmissionControl, "Ordered list of plug-ins to do admission control of resources into cluster. Comma-delimited list of: "+strings.Join(admission.GetPlugins(), ", "))
fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.") fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.")
fs.StringSliceVar(&s.EtcdServerList, "etcd-servers", s.EtcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config") fs.StringSliceVar(&s.EtcdConfig.ServerList, "etcd-servers", s.EtcdConfig.ServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config")
fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.") fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.")
fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.") fs.StringVar(&s.EtcdConfig.Prefix, "etcd-prefix", s.EtcdConfig.Prefix, "The prefix for all resource paths in etcd.")
fs.BoolVar(&s.EtcdQuorumRead, "etcd-quorum-read", s.EtcdQuorumRead, "If true, enable quorum read") fs.StringVar(&s.EtcdConfig.KeyFile, "etcd-keyfile", s.EtcdConfig.KeyFile, "SSL key file used to secure etcd communication")
fs.StringVar(&s.EtcdConfig.CertFile, "etcd-certfile", s.EtcdConfig.CertFile, "SSL certification file used to secure etcd communication")
fs.StringVar(&s.EtcdConfig.CAFile, "etcd-cafile", s.EtcdConfig.CAFile, "SSL Certificate Authority file used to secure etcd communication")
fs.BoolVar(&s.EtcdConfig.Quorum, "etcd-quorum-read", s.EtcdConfig.Quorum, "If true, enable quorum read")
fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.") fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.") fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.")
fs.IPNetVar(&s.ServiceClusterIPRange, "service-cluster-ip-range", s.ServiceClusterIPRange, "A CIDR notation IP range from which to assign service cluster IPs. This must not overlap with any IP ranges assigned to nodes for pods.") fs.IPNetVar(&s.ServiceClusterIPRange, "service-cluster-ip-range", s.ServiceClusterIPRange, "A CIDR notation IP range from which to assign service cluster IPs. This must not overlap with any IP ranges assigned to nodes for pods.")

View File

@ -88,9 +88,9 @@ func verifyClusterIPFlags(s *options.APIServer) {
} }
// For testing. // For testing.
type newEtcdFunc func([]string, runtime.NegotiatedSerializer, string, string, string, bool) (storage.Interface, error) type newEtcdFunc func(runtime.NegotiatedSerializer, string, string, etcdstorage.EtcdConfig) (storage.Interface, error)
func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGroupVersionString, memoryGroupVersionString, pathPrefix string, quorum bool) (etcdStorage storage.Interface, err error) { func newEtcd(ns runtime.NegotiatedSerializer, storageGroupVersionString, memoryGroupVersionString string, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) {
if storageGroupVersionString == "" { if storageGroupVersionString == "" {
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage") return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
} }
@ -103,10 +103,8 @@ func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGr
return nil, fmt.Errorf("couldn't understand memory version %v: %v", memoryGroupVersionString, err) return nil, fmt.Errorf("couldn't understand memory version %v: %v", memoryGroupVersionString, err)
} }
var storageConfig etcdstorage.EtcdConfig var storageConfig etcdstorage.EtcdStorageConfig
storageConfig.ServerList = etcdServerList storageConfig.Config = etcdConfig
storageConfig.Prefix = pathPrefix
storageConfig.Quorum = quorum
s, ok := ns.SerializerForMediaType("application/json", nil) s, ok := ns.SerializerForMediaType("application/json", nil)
if !ok { if !ok {
return nil, fmt.Errorf("unable to find serializer for JSON") return nil, fmt.Errorf("unable to find serializer for JSON")
@ -128,7 +126,7 @@ func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGr
} }
// parse the value of --etcd-servers-overrides and update given storageDestinations. // parse the value of --etcd-servers-overrides and update given storageDestinations.
func updateEtcdOverrides(overrides []string, storageVersions map[string]string, prefix string, quorum bool, storageDestinations *genericapiserver.StorageDestinations, newEtcdFn newEtcdFunc) { func updateEtcdOverrides(overrides []string, storageVersions map[string]string, etcdConfig etcdstorage.EtcdConfig, storageDestinations *genericapiserver.StorageDestinations, newEtcdFn newEtcdFunc) {
if len(overrides) == 0 { if len(overrides) == 0 {
return return
} }
@ -157,11 +155,13 @@ func updateEtcdOverrides(overrides []string, storageVersions map[string]string,
} }
servers := strings.Split(tokens[1], ";") servers := strings.Split(tokens[1], ";")
overrideEtcdConfig := etcdConfig
overrideEtcdConfig.ServerList = servers
// Note, internalGV will be wrong for things like batch or // Note, internalGV will be wrong for things like batch or
// autoscalers, but they shouldn't be using the override // autoscalers, but they shouldn't be using the override
// storage. // storage.
internalGV := apigroup.GroupVersion.Group + "/__internal" internalGV := apigroup.GroupVersion.Group + "/__internal"
etcdOverrideStorage, err := newEtcdFn(servers, api.Codecs, storageVersions[apigroup.GroupVersion.Group], internalGV, prefix, quorum) etcdOverrideStorage, err := newEtcdFn(api.Codecs, storageVersions[apigroup.GroupVersion.Group], internalGV, overrideEtcdConfig)
if err != nil { if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err) glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
} }
@ -187,7 +187,7 @@ func Run(s *options.APIServer) error {
} }
glog.Infof("Will report %v as public IP address.", s.AdvertiseAddress) glog.Infof("Will report %v as public IP address.", s.AdvertiseAddress)
if len(s.EtcdServerList) == 0 { if len(s.EtcdConfig.ServerList) == 0 {
glog.Fatalf("--etcd-servers must be specified") glog.Fatalf("--etcd-servers must be specified")
} }
@ -281,7 +281,7 @@ func Run(s *options.APIServer) error {
if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found { if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions) glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions)
} }
etcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], "/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead) etcdStorage, err := newEtcd(api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], "/__internal", s.EtcdConfig)
if err != nil { if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err) glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
} }
@ -296,7 +296,7 @@ func Run(s *options.APIServer) error {
if _, found := storageVersions[expGroup.GroupVersion.Group]; !found { if _, found := storageVersions[expGroup.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions) glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions)
} }
expEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[expGroup.GroupVersion.Group], "extensions/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead) expEtcdStorage, err := newEtcd(api.Codecs, storageVersions[expGroup.GroupVersion.Group], "extensions/__internal", s.EtcdConfig)
if err != nil { if err != nil {
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err) glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
} }
@ -332,7 +332,7 @@ func Run(s *options.APIServer) error {
glog.Fatalf("The storage version for autoscaling must be either 'autoscaling/v1' or 'extensions/v1beta1'") glog.Fatalf("The storage version for autoscaling must be either 'autoscaling/v1' or 'extensions/v1beta1'")
} }
glog.Infof("Using %v for autoscaling group storage version", storageGroupVersion) glog.Infof("Using %v for autoscaling group storage version", storageGroupVersion)
autoscalingEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead) autoscalingEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdConfig)
if err != nil { if err != nil {
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err) glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
} }
@ -359,14 +359,14 @@ func Run(s *options.APIServer) error {
glog.Fatalf("The storage version for batch must be either 'batch/v1' or 'extensions/v1beta1'") glog.Fatalf("The storage version for batch must be either 'batch/v1' or 'extensions/v1beta1'")
} }
glog.Infof("Using %v for batch group storage version", storageGroupVersion) glog.Infof("Using %v for batch group storage version", storageGroupVersion)
batchEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead) batchEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdConfig)
if err != nil { if err != nil {
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err) glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
} }
storageDestinations.AddAPIGroup(batch.GroupName, batchEtcdStorage) storageDestinations.AddAPIGroup(batch.GroupName, batchEtcdStorage)
} }
updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdPathPrefix, s.EtcdQuorumRead, &storageDestinations, newEtcd) updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdConfig, &storageDestinations, newEtcd)
n := s.ServiceClusterIPRange n := s.ServiceClusterIPRange

View File

@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/genericapiserver" "k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
) )
func TestLongRunningRequestRegexp(t *testing.T) { func TestLongRunningRequestRegexp(t *testing.T) {
@ -100,15 +101,19 @@ func TestUpdateEtcdOverrides(t *testing.T) {
} }
for _, test := range testCases { for _, test := range testCases {
newEtcd := func(serverList []string, _ runtime.NegotiatedSerializer, _, _, _ string, _ bool) (storage.Interface, error) { newEtcd := func(_ runtime.NegotiatedSerializer, _, _ string, etcdConfig etcdstorage.EtcdConfig) (storage.Interface, error) {
if !reflect.DeepEqual(test.servers, serverList) { if !reflect.DeepEqual(test.servers, etcdConfig.ServerList) {
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList) t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, etcdConfig.ServerList)
} }
return nil, nil return nil, nil
} }
storageDestinations := genericapiserver.NewStorageDestinations() storageDestinations := genericapiserver.NewStorageDestinations()
override := test.apigroup + "/" + test.resource + "#" + strings.Join(test.servers, ";") override := test.apigroup + "/" + test.resource + "#" + strings.Join(test.servers, ";")
updateEtcdOverrides([]string{override}, storageVersions, "", false, &storageDestinations, newEtcd) defaultEtcdConfig := etcdstorage.EtcdConfig{
Prefix: genericapiserver.DefaultEtcdPathPrefix,
ServerList: []string{"http://127.0.0.1"},
}
updateEtcdOverrides([]string{override}, storageVersions, defaultEtcdConfig, &storageDestinations, newEtcd)
apigroup, ok := storageDestinations.APIGroups[test.apigroup] apigroup, ok := storageDestinations.APIGroups[test.apigroup]
if !ok { if !ok {
t.Errorf("apigroup: %s not created", test.apigroup) t.Errorf("apigroup: %s not created", test.apigroup)

View File

@ -67,6 +67,9 @@ kube-apiserver
--cloud-provider="": The provider for cloud services. Empty string for no provider. --cloud-provider="": The provider for cloud services. Empty string for no provider.
--cors-allowed-origins=[]: List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled. --cors-allowed-origins=[]: List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.
--delete-collection-workers=1: Number of workers spawned for DeleteCollection call. These are used to speed up namespace cleanup. --delete-collection-workers=1: Number of workers spawned for DeleteCollection call. These are used to speed up namespace cleanup.
--etcd-cafile="": SSL Certificate Authority file used to secure etcd communication
--etcd-certfile="": SSL certification file used to secure etcd communication
--etcd-keyfile="": SSL key file used to secure etcd communication
--etcd-prefix="/registry": The prefix for all resource paths in etcd. --etcd-prefix="/registry": The prefix for all resource paths in etcd.
--etcd-quorum-read[=false]: If true, enable quorum read --etcd-quorum-read[=false]: If true, enable quorum read
--etcd-servers=[]: List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config --etcd-servers=[]: List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config
@ -112,7 +115,7 @@ kube-apiserver
--watch-cache-sizes=[]: List of watch cache sizes for every resource (pods, nodes, etc.), comma separated. The individual override format: resource#size, where size is a number. It takes effect when watch-cache is enabled. --watch-cache-sizes=[]: List of watch cache sizes for every resource (pods, nodes, etc.), comma separated. The individual override format: resource#size, where size is a number. It takes effect when watch-cache is enabled.
``` ```
###### Auto generated by spf13/cobra on 24-Feb-2016 ###### Auto generated by spf13/cobra on 6-Mar-2016
<!-- BEGIN MUNGE: GENERATED_ANALYTICS --> <!-- BEGIN MUNGE: GENERATED_ANALYTICS -->

View File

@ -34,9 +34,11 @@ import (
func newStorageDestinations(groupName string, groupMeta *apimachinery.GroupMeta) (*genericapiserver.StorageDestinations, error) { func newStorageDestinations(groupName string, groupMeta *apimachinery.GroupMeta) (*genericapiserver.StorageDestinations, error) {
storageDestinations := genericapiserver.NewStorageDestinations() storageDestinations := genericapiserver.NewStorageDestinations()
var storageConfig etcdstorage.EtcdConfig var storageConfig etcdstorage.EtcdStorageConfig
storageConfig.ServerList = []string{"http://127.0.0.1:4001"} storageConfig.Config = etcdstorage.EtcdConfig{
storageConfig.Prefix = genericapiserver.DefaultEtcdPathPrefix Prefix: genericapiserver.DefaultEtcdPathPrefix,
ServerList: []string{"http://127.0.0.1:4001"},
}
storageConfig.Codec = groupMeta.Codec storageConfig.Codec = groupMeta.Codec
storageInterface, err := storageConfig.NewStorage() storageInterface, err := storageConfig.NewStorage()
if err != nil { if err != nil {

View File

@ -102,6 +102,9 @@ etcd-quorum-read
etcd-server etcd-server
etcd-servers etcd-servers
etcd-servers-overrides etcd-servers-overrides
etcd-keyfile
etcd-certfile
etcd-cafile
event-burst event-burst
event-qps event-qps
event-ttl event-ttl

View File

@ -30,7 +30,6 @@ type ServerRunOptions struct {
BindAddress net.IP BindAddress net.IP
CertDirectory string CertDirectory string
ClientCAFile string ClientCAFile string
EtcdQuorumRead bool
InsecureBindAddress net.IP InsecureBindAddress net.IP
InsecurePort int InsecurePort int
LongRunningRequestRE string LongRunningRequestRE string

View File

@ -37,69 +37,110 @@ import (
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
etcd "github.com/coreos/etcd/client" etcd "github.com/coreos/etcd/client"
"github.com/coreos/etcd/pkg/transport"
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
// storage.Config object for etcd. // storage.Config object for etcd.
type EtcdConfig struct { type EtcdStorageConfig struct {
ServerList []string Config EtcdConfig
Codec runtime.Codec Codec runtime.Codec
Prefix string
Quorum bool
} }
// implements storage.Config // implements storage.Config
func (c *EtcdConfig) GetType() string { func (c *EtcdStorageConfig) GetType() string {
return "etcd" return "etcd"
} }
// implements storage.Config // implements storage.Config
func (c *EtcdConfig) NewStorage() (storage.Interface, error) { func (c *EtcdStorageConfig) NewStorage() (storage.Interface, error) {
cfg := etcd.Config{ etcdClient, err := c.Config.newEtcdClient()
Endpoints: c.ServerList,
// TODO: Determine if transport needs optimization
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConnsPerHost: 500,
},
}
etcdClient, err := etcd.New(cfg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewEtcdStorage(etcdClient, c.Codec, c.Prefix, c.Quorum), nil return NewEtcdStorage(etcdClient, c.Codec, c.Config.Prefix, c.Config.Quorum), nil
}
// Configuration object for constructing etcd.Config
type EtcdConfig struct {
Prefix string
ServerList []string
KeyFile string
CertFile string
CAFile string
Quorum bool
}
func (c *EtcdConfig) newEtcdClient() (etcd.Client, error) {
t, err := c.newHttpTransport()
if err != nil {
return nil, err
}
cli, err := etcd.New(etcd.Config{
Endpoints: c.ServerList,
Transport: t,
})
if err != nil {
return nil, err
}
return cli, nil
}
func (c *EtcdConfig) newHttpTransport() (*http.Transport, error) {
info := transport.TLSInfo{
CertFile: c.CertFile,
KeyFile: c.KeyFile,
CAFile: c.CAFile,
}
cfg, err := info.ClientConfig()
if err != nil {
return nil, err
}
// Copied from etcd.DefaultTransport declaration.
// TODO: Determine if transport needs optimization
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConnsPerHost: 500,
TLSClientConfig: cfg,
}
return tr, nil
} }
// Creates a new storage interface from the client // Creates a new storage interface from the client
// TODO: deprecate in favor of storage.Config abstraction over time // TODO: deprecate in favor of storage.Config abstraction over time
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool) storage.Interface { func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool) storage.Interface {
return &etcdHelper{ return &etcdHelper{
etcdclient: client, etcdMembersAPI: etcd.NewMembersAPI(client),
client: etcd.NewKeysAPI(client), etcdKeysAPI: etcd.NewKeysAPI(client),
codec: codec, codec: codec,
versioner: APIObjectVersioner{}, versioner: APIObjectVersioner{},
copier: api.Scheme, copier: api.Scheme,
pathPrefix: path.Join("/", prefix), pathPrefix: path.Join("/", prefix),
quorum: quorum, quorum: quorum,
cache: util.NewCache(maxEtcdCacheEntries), cache: util.NewCache(maxEtcdCacheEntries),
} }
} }
// etcdHelper is the reference implementation of storage.Interface. // etcdHelper is the reference implementation of storage.Interface.
type etcdHelper struct { type etcdHelper struct {
etcdclient etcd.Client etcdMembersAPI etcd.MembersAPI
client etcd.KeysAPI etcdKeysAPI etcd.KeysAPI
codec runtime.Codec codec runtime.Codec
copier runtime.ObjectCopier copier runtime.ObjectCopier
// Note that versioner is required for etcdHelper to work correctly. // Note that versioner is required for etcdHelper to work correctly.
// The public constructors (NewStorage & NewEtcdStorage) are setting it // The public constructors (NewStorage & NewEtcdStorage) are setting it
// correctly, so be careful when manipulating with it manually. // correctly, so be careful when manipulating with it manually.
// optional, has to be set to perform any atomic operations
versioner storage.Versioner versioner storage.Versioner
// prefix for all etcd keys // prefix for all etcd keys
pathPrefix string pathPrefix string
@ -130,8 +171,7 @@ func (h *etcdHelper) Backends(ctx context.Context) []string {
if ctx == nil { if ctx == nil {
glog.Errorf("Context is nil") glog.Errorf("Context is nil")
} }
membersAPI := etcd.NewMembersAPI(h.etcdclient) members, err := h.etcdMembersAPI.List(ctx)
members, err := membersAPI.List(ctx)
if err != nil { if err != nil {
glog.Errorf("Error obtaining etcd members list: %q", err) glog.Errorf("Error obtaining etcd members list: %q", err)
return nil return nil
@ -171,7 +211,7 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob
TTL: time.Duration(ttl) * time.Second, TTL: time.Duration(ttl) * time.Second,
PrevExist: etcd.PrevNoExist, PrevExist: etcd.PrevNoExist,
} }
response, err := h.client.Set(ctx, key, string(data), &opts) response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
trace.Step("Object created") trace.Step("Object created")
if err != nil { if err != nil {
@ -219,7 +259,7 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec
TTL: time.Duration(ttl) * time.Second, TTL: time.Duration(ttl) * time.Second,
PrevIndex: version, PrevIndex: version,
} }
response, err = h.client.Set(ctx, key, string(data), &opts) response, err = h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime) metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
if err != nil { if err != nil {
return err return err
@ -232,7 +272,7 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec
TTL: time.Duration(ttl) * time.Second, TTL: time.Duration(ttl) * time.Second,
PrevExist: etcd.PrevNoExist, PrevExist: etcd.PrevNoExist,
} }
response, err = h.client.Set(ctx, key, string(data), &opts) response, err = h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
if err != nil { if err != nil {
return err return err
} }
@ -260,7 +300,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object)
} }
startTime := time.Now() startTime := time.Now()
response, err := h.client.Delete(ctx, key, nil) response, err := h.etcdKeysAPI.Delete(ctx, key, nil)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime) metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if !etcdutil.IsEtcdNotFound(err) { if !etcdutil.IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update out. // if the object that existed prior to the delete is returned by etcd, update out.
@ -282,7 +322,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
} }
key = h.prefixEtcdKey(key) key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, h.quorum, nil, filter, h.codec, h.versioner, nil, h) w := newEtcdWatcher(false, h.quorum, nil, filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(ctx, h.client, key, watchRV) go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil return w, nil
} }
@ -297,7 +337,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
} }
key = h.prefixEtcdKey(key) key = h.prefixEtcdKey(key)
w := newEtcdWatcher(true, h.quorum, exceptKey(key), filter, h.codec, h.versioner, nil, h) w := newEtcdWatcher(true, h.quorum, exceptKey(key), filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(ctx, h.client, key, watchRV) go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil return w, nil
} }
@ -323,7 +363,7 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r
Quorum: h.quorum, Quorum: h.quorum,
} }
response, err := h.client.Get(ctx, key, opts) response, err := h.etcdKeysAPI.Get(ctx, key, opts)
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime) metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
if err != nil && !etcdutil.IsEtcdNotFound(err) { if err != nil && !etcdutil.IsEtcdNotFound(err) {
@ -384,7 +424,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F
opts := &etcd.GetOptions{ opts := &etcd.GetOptions{
Quorum: h.quorum, Quorum: h.quorum,
} }
response, err := h.client.Get(ctx, key, opts) response, err := h.etcdKeysAPI.Get(ctx, key, opts)
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
trace.Step("Etcd node read") trace.Step("Etcd node read")
@ -489,7 +529,7 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node
Sort: true, Sort: true,
Quorum: h.quorum, Quorum: h.quorum,
} }
result, err := h.client.Get(ctx, key, &opts) result, err := h.etcdKeysAPI.Get(ctx, key, &opts)
if err != nil { if err != nil {
var index uint64 var index uint64
if etcdError, ok := err.(etcd.Error); ok { if etcdError, ok := err.(etcd.Error); ok {
@ -575,7 +615,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
TTL: time.Duration(ttl) * time.Second, TTL: time.Duration(ttl) * time.Second,
PrevExist: etcd.PrevNoExist, PrevExist: etcd.PrevNoExist,
} }
response, err := h.client.Set(ctx, key, string(data), &opts) response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime) metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
if etcdutil.IsEtcdNodeExist(err) { if etcdutil.IsEtcdNodeExist(err) {
continue continue
@ -598,7 +638,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
PrevIndex: index, PrevIndex: index,
TTL: time.Duration(ttl) * time.Second, TTL: time.Duration(ttl) * time.Second,
} }
response, err := h.client.Set(ctx, key, string(data), &opts) response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime) metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
if etcdutil.IsEtcdTestFailed(err) { if etcdutil.IsEtcdTestFailed(err) {
// Try again. // Try again.

View File

@ -125,6 +125,7 @@ func TestList(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("Unexpected error %v", err) t.Errorf("Unexpected error %v", err)
} }
if e, a := list.Items, got.Items; !reflect.DeepEqual(e, a) { if e, a := list.Items, got.Items; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a) t.Errorf("Expected %#v, got %#v", e, a)
} }

View File

@ -0,0 +1,69 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
// You can use cfssl tool to generate certificates, please refer
// https://github.com/coreos/etcd/tree/master/hack/tls-setup for more details.
const CAFileContent = `
-----BEGIN CERTIFICATE-----
MIIDADCCAoWgAwIBAgIUPyoDaJMWija/6scZvsZIcPjyjqswCgYIKoZIzj0EAwMw
gawxCzAJBgNVBAYTAlVTMSowKAYDVQQKEyFIb25lc3QgQWNobWVkJ3MgVXNlZCBD
ZXJ0aWZpY2F0ZXMxKTAnBgNVBAsTIEhhc3RpbHktR2VuZXJhdGVkIFZhbHVlcyBE
aXZpc29uMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2NvMRMwEQYDVQQIEwpDYWxpZm9y
bmlhMRkwFwYDVQQDExBBdXRvZ2VuZXJhdGVkIENBMB4XDTE2MDIyNTEwMzYwMFoX
DTIxMDIyMzEwMzYwMFowgawxCzAJBgNVBAYTAlVTMSowKAYDVQQKEyFIb25lc3Qg
QWNobWVkJ3MgVXNlZCBDZXJ0aWZpY2F0ZXMxKTAnBgNVBAsTIEhhc3RpbHktR2Vu
ZXJhdGVkIFZhbHVlcyBEaXZpc29uMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2NvMRMw
EQYDVQQIEwpDYWxpZm9ybmlhMRkwFwYDVQQDExBBdXRvZ2VuZXJhdGVkIENBMHYw
EAYHKoZIzj0CAQYFK4EEACIDYgAEUjwvCgZPvo11v8qf+rBQRdYAt6IdMZJd3t1g
DjSySvBc3b+1WmCe911D/Zdwm/s83M4EQm8qUMeMvt60IqKIBZ6BDBbZdqQRycxJ
DuQwgyyYHQl5G52EDSJx//U1OkrOo2YwZDAOBgNVHQ8BAf8EBAMCAQYwEgYDVR0T
AQH/BAgwBgEB/wIBAjAdBgNVHQ4EFgQUp0oCeNg5O4HvABVa7/iJuLDkjAwwHwYD
VR0jBBgwFoAUp0oCeNg5O4HvABVa7/iJuLDkjAwwCgYIKoZIzj0EAwMDaQAwZgIx
AMuY6J2q53uFus7mZTEfWERXoUrTSvj2DEV+6MrmGD8VW2YaTwIGM0qzKlamb1QJ
rQIxAKtbXrfYzAjKBnrhdLD0kgf06pTQkIqBHj4zLen2K4NnVJWCSsKMua8FG+zP
jqvi0Q==
-----END CERTIFICATE-----
`
const CertFileContent = `
-----BEGIN CERTIFICATE-----
MIIC0zCCAlmgAwIBAgIUHXuZ3c9/pnUh2AOW3iiypCQpYEMwCgYIKoZIzj0EAwMw
gawxCzAJBgNVBAYTAlVTMSowKAYDVQQKEyFIb25lc3QgQWNobWVkJ3MgVXNlZCBD
ZXJ0aWZpY2F0ZXMxKTAnBgNVBAsTIEhhc3RpbHktR2VuZXJhdGVkIFZhbHVlcyBE
aXZpc29uMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2NvMRMwEQYDVQQIEwpDYWxpZm9y
bmlhMRkwFwYDVQQDExBBdXRvZ2VuZXJhdGVkIENBMB4XDTE2MDIyNTEwMzYwMFoX
DTE3MDIyNDEwMzYwMFowVTEWMBQGA1UEChMNYXV0b2dlbmVyYXRlZDEVMBMGA1UE
CxMMZXRjZCBjbHVzdGVyMRUwEwYDVQQHEwx0aGUgaW50ZXJuZXQxDTALBgNVBAMT
BGV0Y2QwdjAQBgcqhkjOPQIBBgUrgQQAIgNiAAQ9HJgNWxMIrnns2+Sb8FUj9RBA
Fk/qP9cExp+FmbnjnOUy2poK5pGDdr88TMUAXyzV7J/rbTo6pDmWLWMEcbIgqfWY
W6BRmAaPWuQNLsP/L2k2N3NHvHZfCZK+efuDCGGjgZEwgY4wDgYDVR0PAQH/BAQD
AgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjAMBgNVHRMBAf8EAjAA
MB0GA1UdDgQWBBQCFVaETH9zYZaw5OTHh8uuq4K84DAfBgNVHSMEGDAWgBSnSgJ4
2Dk7ge8AFVrv+Im4sOSMDDAPBgNVHREECDAGhwR/AAABMAoGCCqGSM49BAMDA2gA
MGUCMQC9+42ftFzkhujW6lTKZhf/rW3IyNmm5jXTS+RCPPB7jMqkH4Llq2PWgBR8
YQ8keX4CMGb6h8CLsgmFeVjpRjBTBuFSPSf3DQPPG2BkxPhtFek31g9lpSOjkioZ
fKv0Kz+tUw==
-----END CERTIFICATE-----
`
const KeyFileContent = `
-----BEGIN EC PRIVATE KEY-----
MIGkAgEBBDBkmx3mD+yd/qh6WYBTUAFbHZLHKrBv6o4H2AnSfx2HiMQoPm+elwhR
xhWa/tV+8zCgBwYFK4EEACKhZANiAAQ9HJgNWxMIrnns2+Sb8FUj9RBAFk/qP9cE
xp+FmbnjnOUy2poK5pGDdr88TMUAXyzV7J/rbTo6pDmWLWMEcbIgqfWYW6BRmAaP
WuQNLsP/L2k2N3NHvHZfCZK+efuDCGE=
-----END EC PRIVATE KEY-----
`

View File

@ -23,6 +23,7 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
"path"
"testing" "testing"
"time" "time"
@ -42,6 +43,11 @@ type EtcdTestServer struct {
PeerListeners, ClientListeners []net.Listener PeerListeners, ClientListeners []net.Listener
Client etcd.Client Client etcd.Client
CertificatesDir string
CertFile string
KeyFile string
CAFile string
raftHandler http.Handler raftHandler http.Handler
s *etcdserver.EtcdServer s *etcdserver.EtcdServer
hss []*httptest.Server hss []*httptest.Server
@ -56,6 +62,39 @@ func newLocalListener(t *testing.T) net.Listener {
return l return l
} }
// newSecuredLocalListener opens a port localhost using any port
// with SSL enable
func newSecuredLocalListener(t *testing.T, certFile, keyFile, caFile string) net.Listener {
var l net.Listener
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
tlsInfo := transport.TLSInfo{
CertFile: certFile,
KeyFile: keyFile,
CAFile: caFile,
}
l, err = transport.NewKeepAliveListener(l, "https", tlsInfo)
if err != nil {
t.Fatal(err)
}
return l
}
func newHttpTransport(t *testing.T, certFile, keyFile, caFile string) etcd.CancelableTransport {
tlsInfo := transport.TLSInfo{
CertFile: certFile,
KeyFile: keyFile,
CAFile: caFile,
}
tr, err := transport.NewTransport(tlsInfo)
if err != nil {
t.Fatal(err)
}
return tr
}
// configureTestCluster will set the params to start an etcd server // configureTestCluster will set the params to start an etcd server
func configureTestCluster(t *testing.T, name string) *EtcdTestServer { func configureTestCluster(t *testing.T, name string) *EtcdTestServer {
var err error var err error
@ -68,9 +107,26 @@ func configureTestCluster(t *testing.T, name string) *EtcdTestServer {
t.Fatal(err) t.Fatal(err)
} }
cln := newLocalListener(t) m.CertificatesDir, err = ioutil.TempDir(os.TempDir(), "etcd_certificates")
if err != nil {
t.Fatal(err)
}
m.CertFile = path.Join(m.CertificatesDir, "etcdcert.pem")
if err = ioutil.WriteFile(m.CertFile, []byte(CertFileContent), 0644); err != nil {
t.Fatal(err)
}
m.KeyFile = path.Join(m.CertificatesDir, "etcdkey.pem")
if err = ioutil.WriteFile(m.KeyFile, []byte(KeyFileContent), 0644); err != nil {
t.Fatal(err)
}
m.CAFile = path.Join(m.CertificatesDir, "ca.pem")
if err = ioutil.WriteFile(m.CAFile, []byte(CAFileContent), 0644); err != nil {
t.Fatal(err)
}
cln := newSecuredLocalListener(t, m.CertFile, m.KeyFile, m.CAFile)
m.ClientListeners = []net.Listener{cln} m.ClientListeners = []net.Listener{cln}
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()}) m.ClientURLs, err = types.NewURLs([]string{"https://" + cln.Addr().String()})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -161,6 +217,9 @@ func (m *EtcdTestServer) Terminate(t *testing.T) {
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil { if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := os.RemoveAll(m.CertificatesDir); err != nil {
t.Fatal(err)
}
} }
// NewEtcdTestClientServer creates a new client and server for testing // NewEtcdTestClientServer creates a new client and server for testing
@ -173,6 +232,7 @@ func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
} }
cfg := etcd.Config{ cfg := etcd.Config{
Endpoints: server.ClientURLs.StringSlice(), Endpoints: server.ClientURLs.StringSlice(),
Transport: newHttpTransport(t, server.CertFile, server.KeyFile, server.CAFile),
} }
server.Client, err = etcd.New(cfg) server.Client, err = etcd.New(cfg)
if err != nil { if err != nil {