mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #21535 from AdoHe/restore_secure_etcd
restore ability to run against secured etcd
This commit is contained in:
commit
532ba5a3c6
@ -31,6 +31,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/genericapiserver"
|
||||
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/master/ports"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
utilnet "k8s.io/kubernetes/pkg/util/net"
|
||||
|
||||
@ -57,9 +58,8 @@ type APIServer struct {
|
||||
EnableLogsSupport bool
|
||||
EnableProfiling bool
|
||||
EnableWatchCache bool
|
||||
EtcdPathPrefix string
|
||||
EtcdServerList []string
|
||||
EtcdServersOverrides []string
|
||||
EtcdConfig etcdstorage.EtcdConfig
|
||||
EventTTL time.Duration
|
||||
ExternalHost string
|
||||
KeystoneURL string
|
||||
@ -100,13 +100,15 @@ func NewAPIServer() *APIServer {
|
||||
AuthorizationMode: "AlwaysAllow",
|
||||
DeleteCollectionWorkers: 1,
|
||||
EnableLogsSupport: true,
|
||||
EtcdPathPrefix: genericapiserver.DefaultEtcdPathPrefix,
|
||||
EventTTL: 1 * time.Hour,
|
||||
MasterCount: 1,
|
||||
MasterServiceNamespace: api.NamespaceDefault,
|
||||
RuntimeConfig: make(util.ConfigurationMap),
|
||||
StorageVersions: registered.AllPreferredGroupVersions(),
|
||||
DefaultStorageVersions: registered.AllPreferredGroupVersions(),
|
||||
EtcdConfig: etcdstorage.EtcdConfig{
|
||||
Prefix: genericapiserver.DefaultEtcdPathPrefix,
|
||||
},
|
||||
EventTTL: 1 * time.Hour,
|
||||
MasterCount: 1,
|
||||
MasterServiceNamespace: api.NamespaceDefault,
|
||||
RuntimeConfig: make(util.ConfigurationMap),
|
||||
StorageVersions: registered.AllPreferredGroupVersions(),
|
||||
DefaultStorageVersions: registered.AllPreferredGroupVersions(),
|
||||
KubeletConfig: kubeletclient.KubeletClientConfig{
|
||||
Port: ports.KubeletPort,
|
||||
EnableHttps: true,
|
||||
@ -220,10 +222,13 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
|
||||
fs.StringVar(&s.AuthorizationConfig.WebhookConfigFile, "authorization-webhook-config-file", s.AuthorizationConfig.WebhookConfigFile, "File with webhook configuration in kubeconfig format, used with --authorization-mode=Webhook. The API server will query the remote service to determine access on the API server's secure port.")
|
||||
fs.StringVar(&s.AdmissionControl, "admission-control", s.AdmissionControl, "Ordered list of plug-ins to do admission control of resources into cluster. Comma-delimited list of: "+strings.Join(admission.GetPlugins(), ", "))
|
||||
fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.")
|
||||
fs.StringSliceVar(&s.EtcdServerList, "etcd-servers", s.EtcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config")
|
||||
fs.StringSliceVar(&s.EtcdConfig.ServerList, "etcd-servers", s.EtcdConfig.ServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config")
|
||||
fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.")
|
||||
fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.")
|
||||
fs.BoolVar(&s.EtcdQuorumRead, "etcd-quorum-read", s.EtcdQuorumRead, "If true, enable quorum read")
|
||||
fs.StringVar(&s.EtcdConfig.Prefix, "etcd-prefix", s.EtcdConfig.Prefix, "The prefix for all resource paths in etcd.")
|
||||
fs.StringVar(&s.EtcdConfig.KeyFile, "etcd-keyfile", s.EtcdConfig.KeyFile, "SSL key file used to secure etcd communication")
|
||||
fs.StringVar(&s.EtcdConfig.CertFile, "etcd-certfile", s.EtcdConfig.CertFile, "SSL certification file used to secure etcd communication")
|
||||
fs.StringVar(&s.EtcdConfig.CAFile, "etcd-cafile", s.EtcdConfig.CAFile, "SSL Certificate Authority file used to secure etcd communication")
|
||||
fs.BoolVar(&s.EtcdConfig.Quorum, "etcd-quorum-read", s.EtcdConfig.Quorum, "If true, enable quorum read")
|
||||
fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
|
||||
fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.")
|
||||
fs.IPNetVar(&s.ServiceClusterIPRange, "service-cluster-ip-range", s.ServiceClusterIPRange, "A CIDR notation IP range from which to assign service cluster IPs. This must not overlap with any IP ranges assigned to nodes for pods.")
|
||||
|
@ -88,9 +88,9 @@ func verifyClusterIPFlags(s *options.APIServer) {
|
||||
}
|
||||
|
||||
// For testing.
|
||||
type newEtcdFunc func([]string, runtime.NegotiatedSerializer, string, string, string, bool) (storage.Interface, error)
|
||||
type newEtcdFunc func(runtime.NegotiatedSerializer, string, string, etcdstorage.EtcdConfig) (storage.Interface, error)
|
||||
|
||||
func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGroupVersionString, memoryGroupVersionString, pathPrefix string, quorum bool) (etcdStorage storage.Interface, err error) {
|
||||
func newEtcd(ns runtime.NegotiatedSerializer, storageGroupVersionString, memoryGroupVersionString string, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) {
|
||||
if storageGroupVersionString == "" {
|
||||
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
|
||||
}
|
||||
@ -103,10 +103,8 @@ func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGr
|
||||
return nil, fmt.Errorf("couldn't understand memory version %v: %v", memoryGroupVersionString, err)
|
||||
}
|
||||
|
||||
var storageConfig etcdstorage.EtcdConfig
|
||||
storageConfig.ServerList = etcdServerList
|
||||
storageConfig.Prefix = pathPrefix
|
||||
storageConfig.Quorum = quorum
|
||||
var storageConfig etcdstorage.EtcdStorageConfig
|
||||
storageConfig.Config = etcdConfig
|
||||
s, ok := ns.SerializerForMediaType("application/json", nil)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unable to find serializer for JSON")
|
||||
@ -128,7 +126,7 @@ func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGr
|
||||
}
|
||||
|
||||
// parse the value of --etcd-servers-overrides and update given storageDestinations.
|
||||
func updateEtcdOverrides(overrides []string, storageVersions map[string]string, prefix string, quorum bool, storageDestinations *genericapiserver.StorageDestinations, newEtcdFn newEtcdFunc) {
|
||||
func updateEtcdOverrides(overrides []string, storageVersions map[string]string, etcdConfig etcdstorage.EtcdConfig, storageDestinations *genericapiserver.StorageDestinations, newEtcdFn newEtcdFunc) {
|
||||
if len(overrides) == 0 {
|
||||
return
|
||||
}
|
||||
@ -157,11 +155,13 @@ func updateEtcdOverrides(overrides []string, storageVersions map[string]string,
|
||||
}
|
||||
|
||||
servers := strings.Split(tokens[1], ";")
|
||||
overrideEtcdConfig := etcdConfig
|
||||
overrideEtcdConfig.ServerList = servers
|
||||
// Note, internalGV will be wrong for things like batch or
|
||||
// autoscalers, but they shouldn't be using the override
|
||||
// storage.
|
||||
internalGV := apigroup.GroupVersion.Group + "/__internal"
|
||||
etcdOverrideStorage, err := newEtcdFn(servers, api.Codecs, storageVersions[apigroup.GroupVersion.Group], internalGV, prefix, quorum)
|
||||
etcdOverrideStorage, err := newEtcdFn(api.Codecs, storageVersions[apigroup.GroupVersion.Group], internalGV, overrideEtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
|
||||
}
|
||||
@ -187,7 +187,7 @@ func Run(s *options.APIServer) error {
|
||||
}
|
||||
glog.Infof("Will report %v as public IP address.", s.AdvertiseAddress)
|
||||
|
||||
if len(s.EtcdServerList) == 0 {
|
||||
if len(s.EtcdConfig.ServerList) == 0 {
|
||||
glog.Fatalf("--etcd-servers must be specified")
|
||||
}
|
||||
|
||||
@ -286,7 +286,7 @@ func Run(s *options.APIServer) error {
|
||||
if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found {
|
||||
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions)
|
||||
}
|
||||
etcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], "/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead)
|
||||
etcdStorage, err := newEtcd(api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], "/__internal", s.EtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
|
||||
}
|
||||
@ -301,7 +301,7 @@ func Run(s *options.APIServer) error {
|
||||
if _, found := storageVersions[expGroup.GroupVersion.Group]; !found {
|
||||
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions)
|
||||
}
|
||||
expEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[expGroup.GroupVersion.Group], "extensions/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead)
|
||||
expEtcdStorage, err := newEtcd(api.Codecs, storageVersions[expGroup.GroupVersion.Group], "extensions/__internal", s.EtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
|
||||
}
|
||||
@ -337,7 +337,7 @@ func Run(s *options.APIServer) error {
|
||||
glog.Fatalf("The storage version for autoscaling must be either 'autoscaling/v1' or 'extensions/v1beta1'")
|
||||
}
|
||||
glog.Infof("Using %v for autoscaling group storage version", storageGroupVersion)
|
||||
autoscalingEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead)
|
||||
autoscalingEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
|
||||
}
|
||||
@ -364,14 +364,14 @@ func Run(s *options.APIServer) error {
|
||||
glog.Fatalf("The storage version for batch must be either 'batch/v1' or 'extensions/v1beta1'")
|
||||
}
|
||||
glog.Infof("Using %v for batch group storage version", storageGroupVersion)
|
||||
batchEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead)
|
||||
batchEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
|
||||
}
|
||||
storageDestinations.AddAPIGroup(batch.GroupName, batchEtcdStorage)
|
||||
}
|
||||
|
||||
updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdPathPrefix, s.EtcdQuorumRead, &storageDestinations, newEtcd)
|
||||
updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdConfig, &storageDestinations, newEtcd)
|
||||
|
||||
n := s.ServiceClusterIPRange
|
||||
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/genericapiserver"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
)
|
||||
|
||||
func TestLongRunningRequestRegexp(t *testing.T) {
|
||||
@ -100,15 +101,19 @@ func TestUpdateEtcdOverrides(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
newEtcd := func(serverList []string, _ runtime.NegotiatedSerializer, _, _, _ string, _ bool) (storage.Interface, error) {
|
||||
if !reflect.DeepEqual(test.servers, serverList) {
|
||||
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList)
|
||||
newEtcd := func(_ runtime.NegotiatedSerializer, _, _ string, etcdConfig etcdstorage.EtcdConfig) (storage.Interface, error) {
|
||||
if !reflect.DeepEqual(test.servers, etcdConfig.ServerList) {
|
||||
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, etcdConfig.ServerList)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
storageDestinations := genericapiserver.NewStorageDestinations()
|
||||
override := test.apigroup + "/" + test.resource + "#" + strings.Join(test.servers, ";")
|
||||
updateEtcdOverrides([]string{override}, storageVersions, "", false, &storageDestinations, newEtcd)
|
||||
defaultEtcdConfig := etcdstorage.EtcdConfig{
|
||||
Prefix: genericapiserver.DefaultEtcdPathPrefix,
|
||||
ServerList: []string{"http://127.0.0.1"},
|
||||
}
|
||||
updateEtcdOverrides([]string{override}, storageVersions, defaultEtcdConfig, &storageDestinations, newEtcd)
|
||||
apigroup, ok := storageDestinations.APIGroups[test.apigroup]
|
||||
if !ok {
|
||||
t.Errorf("apigroup: %s not created", test.apigroup)
|
||||
|
@ -67,6 +67,9 @@ kube-apiserver
|
||||
--cloud-provider="": The provider for cloud services. Empty string for no provider.
|
||||
--cors-allowed-origins=[]: List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.
|
||||
--delete-collection-workers=1: Number of workers spawned for DeleteCollection call. These are used to speed up namespace cleanup.
|
||||
--etcd-cafile="": SSL Certificate Authority file used to secure etcd communication
|
||||
--etcd-certfile="": SSL certification file used to secure etcd communication
|
||||
--etcd-keyfile="": SSL key file used to secure etcd communication
|
||||
--etcd-prefix="/registry": The prefix for all resource paths in etcd.
|
||||
--etcd-quorum-read[=false]: If true, enable quorum read
|
||||
--etcd-servers=[]: List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config
|
||||
@ -112,7 +115,7 @@ kube-apiserver
|
||||
--watch-cache-sizes=[]: List of watch cache sizes for every resource (pods, nodes, etc.), comma separated. The individual override format: resource#size, where size is a number. It takes effect when watch-cache is enabled.
|
||||
```
|
||||
|
||||
###### Auto generated by spf13/cobra on 24-Feb-2016
|
||||
###### Auto generated by spf13/cobra on 6-Mar-2016
|
||||
|
||||
|
||||
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
|
||||
|
@ -34,9 +34,11 @@ import (
|
||||
|
||||
func newStorageDestinations(groupName string, groupMeta *apimachinery.GroupMeta) (*genericapiserver.StorageDestinations, error) {
|
||||
storageDestinations := genericapiserver.NewStorageDestinations()
|
||||
var storageConfig etcdstorage.EtcdConfig
|
||||
storageConfig.ServerList = []string{"http://127.0.0.1:4001"}
|
||||
storageConfig.Prefix = genericapiserver.DefaultEtcdPathPrefix
|
||||
var storageConfig etcdstorage.EtcdStorageConfig
|
||||
storageConfig.Config = etcdstorage.EtcdConfig{
|
||||
Prefix: genericapiserver.DefaultEtcdPathPrefix,
|
||||
ServerList: []string{"http://127.0.0.1:4001"},
|
||||
}
|
||||
storageConfig.Codec = groupMeta.Codec
|
||||
storageInterface, err := storageConfig.NewStorage()
|
||||
if err != nil {
|
||||
|
@ -104,6 +104,9 @@ etcd-quorum-read
|
||||
etcd-server
|
||||
etcd-servers
|
||||
etcd-servers-overrides
|
||||
etcd-keyfile
|
||||
etcd-certfile
|
||||
etcd-cafile
|
||||
event-burst
|
||||
event-qps
|
||||
event-ttl
|
||||
|
@ -30,7 +30,6 @@ type ServerRunOptions struct {
|
||||
BindAddress net.IP
|
||||
CertDirectory string
|
||||
ClientCAFile string
|
||||
EtcdQuorumRead bool
|
||||
InsecureBindAddress net.IP
|
||||
InsecurePort int
|
||||
LongRunningRequestRE string
|
||||
|
@ -37,69 +37,110 @@ import (
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// storage.Config object for etcd.
|
||||
type EtcdConfig struct {
|
||||
ServerList []string
|
||||
Codec runtime.Codec
|
||||
Prefix string
|
||||
Quorum bool
|
||||
type EtcdStorageConfig struct {
|
||||
Config EtcdConfig
|
||||
Codec runtime.Codec
|
||||
}
|
||||
|
||||
// implements storage.Config
|
||||
func (c *EtcdConfig) GetType() string {
|
||||
func (c *EtcdStorageConfig) GetType() string {
|
||||
return "etcd"
|
||||
}
|
||||
|
||||
// implements storage.Config
|
||||
func (c *EtcdConfig) NewStorage() (storage.Interface, error) {
|
||||
cfg := etcd.Config{
|
||||
Endpoints: c.ServerList,
|
||||
// TODO: Determine if transport needs optimization
|
||||
Transport: &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
Dial: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).Dial,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
MaxIdleConnsPerHost: 500,
|
||||
},
|
||||
}
|
||||
etcdClient, err := etcd.New(cfg)
|
||||
func (c *EtcdStorageConfig) NewStorage() (storage.Interface, error) {
|
||||
etcdClient, err := c.Config.newEtcdClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewEtcdStorage(etcdClient, c.Codec, c.Prefix, c.Quorum), nil
|
||||
return NewEtcdStorage(etcdClient, c.Codec, c.Config.Prefix, c.Config.Quorum), nil
|
||||
}
|
||||
|
||||
// Configuration object for constructing etcd.Config
|
||||
type EtcdConfig struct {
|
||||
Prefix string
|
||||
ServerList []string
|
||||
KeyFile string
|
||||
CertFile string
|
||||
CAFile string
|
||||
Quorum bool
|
||||
}
|
||||
|
||||
func (c *EtcdConfig) newEtcdClient() (etcd.Client, error) {
|
||||
t, err := c.newHttpTransport()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cli, err := etcd.New(etcd.Config{
|
||||
Endpoints: c.ServerList,
|
||||
Transport: t,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cli, nil
|
||||
}
|
||||
|
||||
func (c *EtcdConfig) newHttpTransport() (*http.Transport, error) {
|
||||
info := transport.TLSInfo{
|
||||
CertFile: c.CertFile,
|
||||
KeyFile: c.KeyFile,
|
||||
CAFile: c.CAFile,
|
||||
}
|
||||
cfg, err := info.ClientConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Copied from etcd.DefaultTransport declaration.
|
||||
// TODO: Determine if transport needs optimization
|
||||
tr := &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
Dial: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).Dial,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
MaxIdleConnsPerHost: 500,
|
||||
TLSClientConfig: cfg,
|
||||
}
|
||||
|
||||
return tr, nil
|
||||
}
|
||||
|
||||
// Creates a new storage interface from the client
|
||||
// TODO: deprecate in favor of storage.Config abstraction over time
|
||||
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool) storage.Interface {
|
||||
return &etcdHelper{
|
||||
etcdclient: client,
|
||||
client: etcd.NewKeysAPI(client),
|
||||
codec: codec,
|
||||
versioner: APIObjectVersioner{},
|
||||
copier: api.Scheme,
|
||||
pathPrefix: path.Join("/", prefix),
|
||||
quorum: quorum,
|
||||
cache: util.NewCache(maxEtcdCacheEntries),
|
||||
etcdMembersAPI: etcd.NewMembersAPI(client),
|
||||
etcdKeysAPI: etcd.NewKeysAPI(client),
|
||||
codec: codec,
|
||||
versioner: APIObjectVersioner{},
|
||||
copier: api.Scheme,
|
||||
pathPrefix: path.Join("/", prefix),
|
||||
quorum: quorum,
|
||||
cache: util.NewCache(maxEtcdCacheEntries),
|
||||
}
|
||||
}
|
||||
|
||||
// etcdHelper is the reference implementation of storage.Interface.
|
||||
type etcdHelper struct {
|
||||
etcdclient etcd.Client
|
||||
client etcd.KeysAPI
|
||||
codec runtime.Codec
|
||||
copier runtime.ObjectCopier
|
||||
etcdMembersAPI etcd.MembersAPI
|
||||
etcdKeysAPI etcd.KeysAPI
|
||||
codec runtime.Codec
|
||||
copier runtime.ObjectCopier
|
||||
// Note that versioner is required for etcdHelper to work correctly.
|
||||
// The public constructors (NewStorage & NewEtcdStorage) are setting it
|
||||
// correctly, so be careful when manipulating with it manually.
|
||||
// optional, has to be set to perform any atomic operations
|
||||
versioner storage.Versioner
|
||||
// prefix for all etcd keys
|
||||
pathPrefix string
|
||||
@ -130,8 +171,7 @@ func (h *etcdHelper) Backends(ctx context.Context) []string {
|
||||
if ctx == nil {
|
||||
glog.Errorf("Context is nil")
|
||||
}
|
||||
membersAPI := etcd.NewMembersAPI(h.etcdclient)
|
||||
members, err := membersAPI.List(ctx)
|
||||
members, err := h.etcdMembersAPI.List(ctx)
|
||||
if err != nil {
|
||||
glog.Errorf("Error obtaining etcd members list: %q", err)
|
||||
return nil
|
||||
@ -171,7 +211,7 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob
|
||||
TTL: time.Duration(ttl) * time.Second,
|
||||
PrevExist: etcd.PrevNoExist,
|
||||
}
|
||||
response, err := h.client.Set(ctx, key, string(data), &opts)
|
||||
response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
|
||||
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
|
||||
trace.Step("Object created")
|
||||
if err != nil {
|
||||
@ -219,7 +259,7 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec
|
||||
TTL: time.Duration(ttl) * time.Second,
|
||||
PrevIndex: version,
|
||||
}
|
||||
response, err = h.client.Set(ctx, key, string(data), &opts)
|
||||
response, err = h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
|
||||
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -232,7 +272,7 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec
|
||||
TTL: time.Duration(ttl) * time.Second,
|
||||
PrevExist: etcd.PrevNoExist,
|
||||
}
|
||||
response, err = h.client.Set(ctx, key, string(data), &opts)
|
||||
response, err = h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -260,7 +300,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object)
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
response, err := h.client.Delete(ctx, key, nil)
|
||||
response, err := h.etcdKeysAPI.Delete(ctx, key, nil)
|
||||
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
// if the object that existed prior to the delete is returned by etcd, update out.
|
||||
@ -282,7 +322,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
|
||||
}
|
||||
key = h.prefixEtcdKey(key)
|
||||
w := newEtcdWatcher(false, h.quorum, nil, filter, h.codec, h.versioner, nil, h)
|
||||
go w.etcdWatch(ctx, h.client, key, watchRV)
|
||||
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
|
||||
return w, nil
|
||||
}
|
||||
|
||||
@ -297,7 +337,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
|
||||
}
|
||||
key = h.prefixEtcdKey(key)
|
||||
w := newEtcdWatcher(true, h.quorum, exceptKey(key), filter, h.codec, h.versioner, nil, h)
|
||||
go w.etcdWatch(ctx, h.client, key, watchRV)
|
||||
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
|
||||
return w, nil
|
||||
}
|
||||
|
||||
@ -323,7 +363,7 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r
|
||||
Quorum: h.quorum,
|
||||
}
|
||||
|
||||
response, err := h.client.Get(ctx, key, opts)
|
||||
response, err := h.etcdKeysAPI.Get(ctx, key, opts)
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
|
||||
|
||||
if err != nil && !etcdutil.IsEtcdNotFound(err) {
|
||||
@ -384,7 +424,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F
|
||||
opts := &etcd.GetOptions{
|
||||
Quorum: h.quorum,
|
||||
}
|
||||
response, err := h.client.Get(ctx, key, opts)
|
||||
response, err := h.etcdKeysAPI.Get(ctx, key, opts)
|
||||
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
|
||||
trace.Step("Etcd node read")
|
||||
@ -489,7 +529,7 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node
|
||||
Sort: true,
|
||||
Quorum: h.quorum,
|
||||
}
|
||||
result, err := h.client.Get(ctx, key, &opts)
|
||||
result, err := h.etcdKeysAPI.Get(ctx, key, &opts)
|
||||
if err != nil {
|
||||
var index uint64
|
||||
if etcdError, ok := err.(etcd.Error); ok {
|
||||
@ -575,7 +615,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
|
||||
TTL: time.Duration(ttl) * time.Second,
|
||||
PrevExist: etcd.PrevNoExist,
|
||||
}
|
||||
response, err := h.client.Set(ctx, key, string(data), &opts)
|
||||
response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
|
||||
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
|
||||
if etcdutil.IsEtcdNodeExist(err) {
|
||||
continue
|
||||
@ -598,7 +638,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
|
||||
PrevIndex: index,
|
||||
TTL: time.Duration(ttl) * time.Second,
|
||||
}
|
||||
response, err := h.client.Set(ctx, key, string(data), &opts)
|
||||
response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
|
||||
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
|
||||
if etcdutil.IsEtcdTestFailed(err) {
|
||||
// Try again.
|
||||
|
@ -125,6 +125,7 @@ func TestList(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
|
||||
if e, a := list.Items, got.Items; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %#v, got %#v", e, a)
|
||||
}
|
||||
|
69
pkg/storage/etcd/testing/certificates.go
Normal file
69
pkg/storage/etcd/testing/certificates.go
Normal file
@ -0,0 +1,69 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package testing
|
||||
|
||||
// You can use cfssl tool to generate certificates, please refer
|
||||
// https://github.com/coreos/etcd/tree/master/hack/tls-setup for more details.
|
||||
const CAFileContent = `
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIIDADCCAoWgAwIBAgIUPyoDaJMWija/6scZvsZIcPjyjqswCgYIKoZIzj0EAwMw
|
||||
gawxCzAJBgNVBAYTAlVTMSowKAYDVQQKEyFIb25lc3QgQWNobWVkJ3MgVXNlZCBD
|
||||
ZXJ0aWZpY2F0ZXMxKTAnBgNVBAsTIEhhc3RpbHktR2VuZXJhdGVkIFZhbHVlcyBE
|
||||
aXZpc29uMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2NvMRMwEQYDVQQIEwpDYWxpZm9y
|
||||
bmlhMRkwFwYDVQQDExBBdXRvZ2VuZXJhdGVkIENBMB4XDTE2MDIyNTEwMzYwMFoX
|
||||
DTIxMDIyMzEwMzYwMFowgawxCzAJBgNVBAYTAlVTMSowKAYDVQQKEyFIb25lc3Qg
|
||||
QWNobWVkJ3MgVXNlZCBDZXJ0aWZpY2F0ZXMxKTAnBgNVBAsTIEhhc3RpbHktR2Vu
|
||||
ZXJhdGVkIFZhbHVlcyBEaXZpc29uMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2NvMRMw
|
||||
EQYDVQQIEwpDYWxpZm9ybmlhMRkwFwYDVQQDExBBdXRvZ2VuZXJhdGVkIENBMHYw
|
||||
EAYHKoZIzj0CAQYFK4EEACIDYgAEUjwvCgZPvo11v8qf+rBQRdYAt6IdMZJd3t1g
|
||||
DjSySvBc3b+1WmCe911D/Zdwm/s83M4EQm8qUMeMvt60IqKIBZ6BDBbZdqQRycxJ
|
||||
DuQwgyyYHQl5G52EDSJx//U1OkrOo2YwZDAOBgNVHQ8BAf8EBAMCAQYwEgYDVR0T
|
||||
AQH/BAgwBgEB/wIBAjAdBgNVHQ4EFgQUp0oCeNg5O4HvABVa7/iJuLDkjAwwHwYD
|
||||
VR0jBBgwFoAUp0oCeNg5O4HvABVa7/iJuLDkjAwwCgYIKoZIzj0EAwMDaQAwZgIx
|
||||
AMuY6J2q53uFus7mZTEfWERXoUrTSvj2DEV+6MrmGD8VW2YaTwIGM0qzKlamb1QJ
|
||||
rQIxAKtbXrfYzAjKBnrhdLD0kgf06pTQkIqBHj4zLen2K4NnVJWCSsKMua8FG+zP
|
||||
jqvi0Q==
|
||||
-----END CERTIFICATE-----
|
||||
`
|
||||
const CertFileContent = `
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIIC0zCCAlmgAwIBAgIUHXuZ3c9/pnUh2AOW3iiypCQpYEMwCgYIKoZIzj0EAwMw
|
||||
gawxCzAJBgNVBAYTAlVTMSowKAYDVQQKEyFIb25lc3QgQWNobWVkJ3MgVXNlZCBD
|
||||
ZXJ0aWZpY2F0ZXMxKTAnBgNVBAsTIEhhc3RpbHktR2VuZXJhdGVkIFZhbHVlcyBE
|
||||
aXZpc29uMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2NvMRMwEQYDVQQIEwpDYWxpZm9y
|
||||
bmlhMRkwFwYDVQQDExBBdXRvZ2VuZXJhdGVkIENBMB4XDTE2MDIyNTEwMzYwMFoX
|
||||
DTE3MDIyNDEwMzYwMFowVTEWMBQGA1UEChMNYXV0b2dlbmVyYXRlZDEVMBMGA1UE
|
||||
CxMMZXRjZCBjbHVzdGVyMRUwEwYDVQQHEwx0aGUgaW50ZXJuZXQxDTALBgNVBAMT
|
||||
BGV0Y2QwdjAQBgcqhkjOPQIBBgUrgQQAIgNiAAQ9HJgNWxMIrnns2+Sb8FUj9RBA
|
||||
Fk/qP9cExp+FmbnjnOUy2poK5pGDdr88TMUAXyzV7J/rbTo6pDmWLWMEcbIgqfWY
|
||||
W6BRmAaPWuQNLsP/L2k2N3NHvHZfCZK+efuDCGGjgZEwgY4wDgYDVR0PAQH/BAQD
|
||||
AgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjAMBgNVHRMBAf8EAjAA
|
||||
MB0GA1UdDgQWBBQCFVaETH9zYZaw5OTHh8uuq4K84DAfBgNVHSMEGDAWgBSnSgJ4
|
||||
2Dk7ge8AFVrv+Im4sOSMDDAPBgNVHREECDAGhwR/AAABMAoGCCqGSM49BAMDA2gA
|
||||
MGUCMQC9+42ftFzkhujW6lTKZhf/rW3IyNmm5jXTS+RCPPB7jMqkH4Llq2PWgBR8
|
||||
YQ8keX4CMGb6h8CLsgmFeVjpRjBTBuFSPSf3DQPPG2BkxPhtFek31g9lpSOjkioZ
|
||||
fKv0Kz+tUw==
|
||||
-----END CERTIFICATE-----
|
||||
`
|
||||
const KeyFileContent = `
|
||||
-----BEGIN EC PRIVATE KEY-----
|
||||
MIGkAgEBBDBkmx3mD+yd/qh6WYBTUAFbHZLHKrBv6o4H2AnSfx2HiMQoPm+elwhR
|
||||
xhWa/tV+8zCgBwYFK4EEACKhZANiAAQ9HJgNWxMIrnns2+Sb8FUj9RBAFk/qP9cE
|
||||
xp+FmbnjnOUy2poK5pGDdr88TMUAXyzV7J/rbTo6pDmWLWMEcbIgqfWYW6BRmAaP
|
||||
WuQNLsP/L2k2N3NHvHZfCZK+efuDCGE=
|
||||
-----END EC PRIVATE KEY-----
|
||||
`
|
@ -23,6 +23,7 @@ import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -42,6 +43,11 @@ type EtcdTestServer struct {
|
||||
PeerListeners, ClientListeners []net.Listener
|
||||
Client etcd.Client
|
||||
|
||||
CertificatesDir string
|
||||
CertFile string
|
||||
KeyFile string
|
||||
CAFile string
|
||||
|
||||
raftHandler http.Handler
|
||||
s *etcdserver.EtcdServer
|
||||
hss []*httptest.Server
|
||||
@ -56,6 +62,39 @@ func newLocalListener(t *testing.T) net.Listener {
|
||||
return l
|
||||
}
|
||||
|
||||
// newSecuredLocalListener opens a port localhost using any port
|
||||
// with SSL enable
|
||||
func newSecuredLocalListener(t *testing.T, certFile, keyFile, caFile string) net.Listener {
|
||||
var l net.Listener
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: certFile,
|
||||
KeyFile: keyFile,
|
||||
CAFile: caFile,
|
||||
}
|
||||
l, err = transport.NewKeepAliveListener(l, "https", tlsInfo)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
func newHttpTransport(t *testing.T, certFile, keyFile, caFile string) etcd.CancelableTransport {
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: certFile,
|
||||
KeyFile: keyFile,
|
||||
CAFile: caFile,
|
||||
}
|
||||
tr, err := transport.NewTransport(tlsInfo)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return tr
|
||||
}
|
||||
|
||||
// configureTestCluster will set the params to start an etcd server
|
||||
func configureTestCluster(t *testing.T, name string) *EtcdTestServer {
|
||||
var err error
|
||||
@ -68,9 +107,26 @@ func configureTestCluster(t *testing.T, name string) *EtcdTestServer {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cln := newLocalListener(t)
|
||||
m.CertificatesDir, err = ioutil.TempDir(os.TempDir(), "etcd_certificates")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
m.CertFile = path.Join(m.CertificatesDir, "etcdcert.pem")
|
||||
if err = ioutil.WriteFile(m.CertFile, []byte(CertFileContent), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
m.KeyFile = path.Join(m.CertificatesDir, "etcdkey.pem")
|
||||
if err = ioutil.WriteFile(m.KeyFile, []byte(KeyFileContent), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
m.CAFile = path.Join(m.CertificatesDir, "ca.pem")
|
||||
if err = ioutil.WriteFile(m.CAFile, []byte(CAFileContent), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cln := newSecuredLocalListener(t, m.CertFile, m.KeyFile, m.CAFile)
|
||||
m.ClientListeners = []net.Listener{cln}
|
||||
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
|
||||
m.ClientURLs, err = types.NewURLs([]string{"https://" + cln.Addr().String()})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -161,6 +217,9 @@ func (m *EtcdTestServer) Terminate(t *testing.T) {
|
||||
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.RemoveAll(m.CertificatesDir); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// NewEtcdTestClientServer creates a new client and server for testing
|
||||
@ -173,6 +232,7 @@ func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
|
||||
}
|
||||
cfg := etcd.Config{
|
||||
Endpoints: server.ClientURLs.StringSlice(),
|
||||
Transport: newHttpTransport(t, server.CertFile, server.KeyFile, server.CAFile),
|
||||
}
|
||||
server.Client, err = etcd.New(cfg)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user