diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index 7b7739b79ee..447f5bece65 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/genericapiserver" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/master/ports" + etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" "k8s.io/kubernetes/pkg/util" utilnet "k8s.io/kubernetes/pkg/util/net" @@ -57,9 +58,8 @@ type APIServer struct { EnableLogsSupport bool EnableProfiling bool EnableWatchCache bool - EtcdPathPrefix string - EtcdServerList []string EtcdServersOverrides []string + EtcdConfig etcdstorage.EtcdConfig EventTTL time.Duration ExternalHost string KeystoneURL string @@ -100,13 +100,15 @@ func NewAPIServer() *APIServer { AuthorizationMode: "AlwaysAllow", DeleteCollectionWorkers: 1, EnableLogsSupport: true, - EtcdPathPrefix: genericapiserver.DefaultEtcdPathPrefix, - EventTTL: 1 * time.Hour, - MasterCount: 1, - MasterServiceNamespace: api.NamespaceDefault, - RuntimeConfig: make(util.ConfigurationMap), - StorageVersions: registered.AllPreferredGroupVersions(), - DefaultStorageVersions: registered.AllPreferredGroupVersions(), + EtcdConfig: etcdstorage.EtcdConfig{ + Prefix: genericapiserver.DefaultEtcdPathPrefix, + }, + EventTTL: 1 * time.Hour, + MasterCount: 1, + MasterServiceNamespace: api.NamespaceDefault, + RuntimeConfig: make(util.ConfigurationMap), + StorageVersions: registered.AllPreferredGroupVersions(), + DefaultStorageVersions: registered.AllPreferredGroupVersions(), KubeletConfig: kubeletclient.KubeletClientConfig{ Port: ports.KubeletPort, EnableHttps: true, @@ -220,10 +222,13 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.AuthorizationConfig.WebhookConfigFile, "authorization-webhook-config-file", s.AuthorizationConfig.WebhookConfigFile, "File with webhook configuration in kubeconfig format, used with --authorization-mode=Webhook. The API server will query the remote service to determine access on the API server's secure port.") fs.StringVar(&s.AdmissionControl, "admission-control", s.AdmissionControl, "Ordered list of plug-ins to do admission control of resources into cluster. Comma-delimited list of: "+strings.Join(admission.GetPlugins(), ", ")) fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.") - fs.StringSliceVar(&s.EtcdServerList, "etcd-servers", s.EtcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config") + fs.StringSliceVar(&s.EtcdConfig.ServerList, "etcd-servers", s.EtcdConfig.ServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config") fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.") - fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.") - fs.BoolVar(&s.EtcdQuorumRead, "etcd-quorum-read", s.EtcdQuorumRead, "If true, enable quorum read") + fs.StringVar(&s.EtcdConfig.Prefix, "etcd-prefix", s.EtcdConfig.Prefix, "The prefix for all resource paths in etcd.") + fs.StringVar(&s.EtcdConfig.KeyFile, "etcd-keyfile", s.EtcdConfig.KeyFile, "SSL key file used to secure etcd communication") + fs.StringVar(&s.EtcdConfig.CertFile, "etcd-certfile", s.EtcdConfig.CertFile, "SSL certification file used to secure etcd communication") + fs.StringVar(&s.EtcdConfig.CAFile, "etcd-cafile", s.EtcdConfig.CAFile, "SSL Certificate Authority file used to secure etcd communication") + fs.BoolVar(&s.EtcdConfig.Quorum, "etcd-quorum-read", s.EtcdConfig.Quorum, "If true, enable quorum read") fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.") fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.") fs.IPNetVar(&s.ServiceClusterIPRange, "service-cluster-ip-range", s.ServiceClusterIPRange, "A CIDR notation IP range from which to assign service cluster IPs. This must not overlap with any IP ranges assigned to nodes for pods.") diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index a0db3062a56..651b131e834 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -88,9 +88,9 @@ func verifyClusterIPFlags(s *options.APIServer) { } // For testing. -type newEtcdFunc func([]string, runtime.NegotiatedSerializer, string, string, string, bool) (storage.Interface, error) +type newEtcdFunc func(runtime.NegotiatedSerializer, string, string, etcdstorage.EtcdConfig) (storage.Interface, error) -func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGroupVersionString, memoryGroupVersionString, pathPrefix string, quorum bool) (etcdStorage storage.Interface, err error) { +func newEtcd(ns runtime.NegotiatedSerializer, storageGroupVersionString, memoryGroupVersionString string, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) { if storageGroupVersionString == "" { return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage") } @@ -103,10 +103,8 @@ func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGr return nil, fmt.Errorf("couldn't understand memory version %v: %v", memoryGroupVersionString, err) } - var storageConfig etcdstorage.EtcdConfig - storageConfig.ServerList = etcdServerList - storageConfig.Prefix = pathPrefix - storageConfig.Quorum = quorum + var storageConfig etcdstorage.EtcdStorageConfig + storageConfig.Config = etcdConfig s, ok := ns.SerializerForMediaType("application/json", nil) if !ok { return nil, fmt.Errorf("unable to find serializer for JSON") @@ -128,7 +126,7 @@ func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGr } // parse the value of --etcd-servers-overrides and update given storageDestinations. -func updateEtcdOverrides(overrides []string, storageVersions map[string]string, prefix string, quorum bool, storageDestinations *genericapiserver.StorageDestinations, newEtcdFn newEtcdFunc) { +func updateEtcdOverrides(overrides []string, storageVersions map[string]string, etcdConfig etcdstorage.EtcdConfig, storageDestinations *genericapiserver.StorageDestinations, newEtcdFn newEtcdFunc) { if len(overrides) == 0 { return } @@ -157,11 +155,13 @@ func updateEtcdOverrides(overrides []string, storageVersions map[string]string, } servers := strings.Split(tokens[1], ";") + overrideEtcdConfig := etcdConfig + overrideEtcdConfig.ServerList = servers // Note, internalGV will be wrong for things like batch or // autoscalers, but they shouldn't be using the override // storage. internalGV := apigroup.GroupVersion.Group + "/__internal" - etcdOverrideStorage, err := newEtcdFn(servers, api.Codecs, storageVersions[apigroup.GroupVersion.Group], internalGV, prefix, quorum) + etcdOverrideStorage, err := newEtcdFn(api.Codecs, storageVersions[apigroup.GroupVersion.Group], internalGV, overrideEtcdConfig) if err != nil { glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err) } @@ -187,7 +187,7 @@ func Run(s *options.APIServer) error { } glog.Infof("Will report %v as public IP address.", s.AdvertiseAddress) - if len(s.EtcdServerList) == 0 { + if len(s.EtcdConfig.ServerList) == 0 { glog.Fatalf("--etcd-servers must be specified") } @@ -281,7 +281,7 @@ func Run(s *options.APIServer) error { if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found { glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions) } - etcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], "/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead) + etcdStorage, err := newEtcd(api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], "/__internal", s.EtcdConfig) if err != nil { glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err) } @@ -296,7 +296,7 @@ func Run(s *options.APIServer) error { if _, found := storageVersions[expGroup.GroupVersion.Group]; !found { glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions) } - expEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[expGroup.GroupVersion.Group], "extensions/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead) + expEtcdStorage, err := newEtcd(api.Codecs, storageVersions[expGroup.GroupVersion.Group], "extensions/__internal", s.EtcdConfig) if err != nil { glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err) } @@ -332,7 +332,7 @@ func Run(s *options.APIServer) error { glog.Fatalf("The storage version for autoscaling must be either 'autoscaling/v1' or 'extensions/v1beta1'") } glog.Infof("Using %v for autoscaling group storage version", storageGroupVersion) - autoscalingEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead) + autoscalingEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdConfig) if err != nil { glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err) } @@ -359,14 +359,14 @@ func Run(s *options.APIServer) error { glog.Fatalf("The storage version for batch must be either 'batch/v1' or 'extensions/v1beta1'") } glog.Infof("Using %v for batch group storage version", storageGroupVersion) - batchEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead) + batchEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdConfig) if err != nil { glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err) } storageDestinations.AddAPIGroup(batch.GroupName, batchEtcdStorage) } - updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdPathPrefix, s.EtcdQuorumRead, &storageDestinations, newEtcd) + updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdConfig, &storageDestinations, newEtcd) n := s.ServiceClusterIPRange diff --git a/cmd/kube-apiserver/app/server_test.go b/cmd/kube-apiserver/app/server_test.go index 0d6b6fe06ab..958cab23a3b 100644 --- a/cmd/kube-apiserver/app/server_test.go +++ b/cmd/kube-apiserver/app/server_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/genericapiserver" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/storage" + etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" ) func TestLongRunningRequestRegexp(t *testing.T) { @@ -100,15 +101,19 @@ func TestUpdateEtcdOverrides(t *testing.T) { } for _, test := range testCases { - newEtcd := func(serverList []string, _ runtime.NegotiatedSerializer, _, _, _ string, _ bool) (storage.Interface, error) { - if !reflect.DeepEqual(test.servers, serverList) { - t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList) + newEtcd := func(_ runtime.NegotiatedSerializer, _, _ string, etcdConfig etcdstorage.EtcdConfig) (storage.Interface, error) { + if !reflect.DeepEqual(test.servers, etcdConfig.ServerList) { + t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, etcdConfig.ServerList) } return nil, nil } storageDestinations := genericapiserver.NewStorageDestinations() override := test.apigroup + "/" + test.resource + "#" + strings.Join(test.servers, ";") - updateEtcdOverrides([]string{override}, storageVersions, "", false, &storageDestinations, newEtcd) + defaultEtcdConfig := etcdstorage.EtcdConfig{ + Prefix: genericapiserver.DefaultEtcdPathPrefix, + ServerList: []string{"http://127.0.0.1"}, + } + updateEtcdOverrides([]string{override}, storageVersions, defaultEtcdConfig, &storageDestinations, newEtcd) apigroup, ok := storageDestinations.APIGroups[test.apigroup] if !ok { t.Errorf("apigroup: %s not created", test.apigroup) diff --git a/docs/admin/kube-apiserver.md b/docs/admin/kube-apiserver.md index f2023077d4b..6086eee5047 100644 --- a/docs/admin/kube-apiserver.md +++ b/docs/admin/kube-apiserver.md @@ -67,6 +67,9 @@ kube-apiserver --cloud-provider="": The provider for cloud services. Empty string for no provider. --cors-allowed-origins=[]: List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled. --delete-collection-workers=1: Number of workers spawned for DeleteCollection call. These are used to speed up namespace cleanup. + --etcd-cafile="": SSL Certificate Authority file used to secure etcd communication + --etcd-certfile="": SSL certification file used to secure etcd communication + --etcd-keyfile="": SSL key file used to secure etcd communication --etcd-prefix="/registry": The prefix for all resource paths in etcd. --etcd-quorum-read[=false]: If true, enable quorum read --etcd-servers=[]: List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config @@ -112,7 +115,7 @@ kube-apiserver --watch-cache-sizes=[]: List of watch cache sizes for every resource (pods, nodes, etc.), comma separated. The individual override format: resource#size, where size is a number. It takes effect when watch-cache is enabled. ``` -###### Auto generated by spf13/cobra on 24-Feb-2016 +###### Auto generated by spf13/cobra on 6-Mar-2016 diff --git a/examples/apiserver/apiserver.go b/examples/apiserver/apiserver.go index f83c4c9f1cd..b054d870c9b 100644 --- a/examples/apiserver/apiserver.go +++ b/examples/apiserver/apiserver.go @@ -34,9 +34,11 @@ import ( func newStorageDestinations(groupName string, groupMeta *apimachinery.GroupMeta) (*genericapiserver.StorageDestinations, error) { storageDestinations := genericapiserver.NewStorageDestinations() - var storageConfig etcdstorage.EtcdConfig - storageConfig.ServerList = []string{"http://127.0.0.1:4001"} - storageConfig.Prefix = genericapiserver.DefaultEtcdPathPrefix + var storageConfig etcdstorage.EtcdStorageConfig + storageConfig.Config = etcdstorage.EtcdConfig{ + Prefix: genericapiserver.DefaultEtcdPathPrefix, + ServerList: []string{"http://127.0.0.1:4001"}, + } storageConfig.Codec = groupMeta.Codec storageInterface, err := storageConfig.NewStorage() if err != nil { diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 9d681c203b9..e4c2d215e64 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -102,6 +102,9 @@ etcd-quorum-read etcd-server etcd-servers etcd-servers-overrides +etcd-keyfile +etcd-certfile +etcd-cafile event-burst event-qps event-ttl diff --git a/pkg/genericapiserver/server_run_options.go b/pkg/genericapiserver/server_run_options.go index e88b64125c8..2ebc3d77aa6 100644 --- a/pkg/genericapiserver/server_run_options.go +++ b/pkg/genericapiserver/server_run_options.go @@ -30,7 +30,6 @@ type ServerRunOptions struct { BindAddress net.IP CertDirectory string ClientCAFile string - EtcdQuorumRead bool InsecureBindAddress net.IP InsecurePort int LongRunningRequestRE string diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index e84eb950567..5edcb83c437 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -37,69 +37,110 @@ import ( "k8s.io/kubernetes/pkg/watch" etcd "github.com/coreos/etcd/client" + "github.com/coreos/etcd/pkg/transport" "github.com/golang/glog" "golang.org/x/net/context" ) // storage.Config object for etcd. -type EtcdConfig struct { - ServerList []string - Codec runtime.Codec - Prefix string - Quorum bool +type EtcdStorageConfig struct { + Config EtcdConfig + Codec runtime.Codec } // implements storage.Config -func (c *EtcdConfig) GetType() string { +func (c *EtcdStorageConfig) GetType() string { return "etcd" } // implements storage.Config -func (c *EtcdConfig) NewStorage() (storage.Interface, error) { - cfg := etcd.Config{ - Endpoints: c.ServerList, - // TODO: Determine if transport needs optimization - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - Dial: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).Dial, - TLSHandshakeTimeout: 10 * time.Second, - MaxIdleConnsPerHost: 500, - }, - } - etcdClient, err := etcd.New(cfg) +func (c *EtcdStorageConfig) NewStorage() (storage.Interface, error) { + etcdClient, err := c.Config.newEtcdClient() if err != nil { return nil, err } - return NewEtcdStorage(etcdClient, c.Codec, c.Prefix, c.Quorum), nil + return NewEtcdStorage(etcdClient, c.Codec, c.Config.Prefix, c.Config.Quorum), nil +} + +// Configuration object for constructing etcd.Config +type EtcdConfig struct { + Prefix string + ServerList []string + KeyFile string + CertFile string + CAFile string + Quorum bool +} + +func (c *EtcdConfig) newEtcdClient() (etcd.Client, error) { + t, err := c.newHttpTransport() + if err != nil { + return nil, err + } + + cli, err := etcd.New(etcd.Config{ + Endpoints: c.ServerList, + Transport: t, + }) + if err != nil { + return nil, err + } + + return cli, nil +} + +func (c *EtcdConfig) newHttpTransport() (*http.Transport, error) { + info := transport.TLSInfo{ + CertFile: c.CertFile, + KeyFile: c.KeyFile, + CAFile: c.CAFile, + } + cfg, err := info.ClientConfig() + if err != nil { + return nil, err + } + + // Copied from etcd.DefaultTransport declaration. + // TODO: Determine if transport needs optimization + tr := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, + MaxIdleConnsPerHost: 500, + TLSClientConfig: cfg, + } + + return tr, nil } // Creates a new storage interface from the client // TODO: deprecate in favor of storage.Config abstraction over time func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool) storage.Interface { return &etcdHelper{ - etcdclient: client, - client: etcd.NewKeysAPI(client), - codec: codec, - versioner: APIObjectVersioner{}, - copier: api.Scheme, - pathPrefix: path.Join("/", prefix), - quorum: quorum, - cache: util.NewCache(maxEtcdCacheEntries), + etcdMembersAPI: etcd.NewMembersAPI(client), + etcdKeysAPI: etcd.NewKeysAPI(client), + codec: codec, + versioner: APIObjectVersioner{}, + copier: api.Scheme, + pathPrefix: path.Join("/", prefix), + quorum: quorum, + cache: util.NewCache(maxEtcdCacheEntries), } } // etcdHelper is the reference implementation of storage.Interface. type etcdHelper struct { - etcdclient etcd.Client - client etcd.KeysAPI - codec runtime.Codec - copier runtime.ObjectCopier + etcdMembersAPI etcd.MembersAPI + etcdKeysAPI etcd.KeysAPI + codec runtime.Codec + copier runtime.ObjectCopier // Note that versioner is required for etcdHelper to work correctly. // The public constructors (NewStorage & NewEtcdStorage) are setting it // correctly, so be careful when manipulating with it manually. + // optional, has to be set to perform any atomic operations versioner storage.Versioner // prefix for all etcd keys pathPrefix string @@ -130,8 +171,7 @@ func (h *etcdHelper) Backends(ctx context.Context) []string { if ctx == nil { glog.Errorf("Context is nil") } - membersAPI := etcd.NewMembersAPI(h.etcdclient) - members, err := membersAPI.List(ctx) + members, err := h.etcdMembersAPI.List(ctx) if err != nil { glog.Errorf("Error obtaining etcd members list: %q", err) return nil @@ -171,7 +211,7 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob TTL: time.Duration(ttl) * time.Second, PrevExist: etcd.PrevNoExist, } - response, err := h.client.Set(ctx, key, string(data), &opts) + response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts) metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) trace.Step("Object created") if err != nil { @@ -219,7 +259,7 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec TTL: time.Duration(ttl) * time.Second, PrevIndex: version, } - response, err = h.client.Set(ctx, key, string(data), &opts) + response, err = h.etcdKeysAPI.Set(ctx, key, string(data), &opts) metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime) if err != nil { return err @@ -232,7 +272,7 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec TTL: time.Duration(ttl) * time.Second, PrevExist: etcd.PrevNoExist, } - response, err = h.client.Set(ctx, key, string(data), &opts) + response, err = h.etcdKeysAPI.Set(ctx, key, string(data), &opts) if err != nil { return err } @@ -260,7 +300,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object) } startTime := time.Now() - response, err := h.client.Delete(ctx, key, nil) + response, err := h.etcdKeysAPI.Delete(ctx, key, nil) metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime) if !etcdutil.IsEtcdNotFound(err) { // if the object that existed prior to the delete is returned by etcd, update out. @@ -282,7 +322,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri } key = h.prefixEtcdKey(key) w := newEtcdWatcher(false, h.quorum, nil, filter, h.codec, h.versioner, nil, h) - go w.etcdWatch(ctx, h.client, key, watchRV) + go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV) return w, nil } @@ -297,7 +337,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion } key = h.prefixEtcdKey(key) w := newEtcdWatcher(true, h.quorum, exceptKey(key), filter, h.codec, h.versioner, nil, h) - go w.etcdWatch(ctx, h.client, key, watchRV) + go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV) return w, nil } @@ -323,7 +363,7 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r Quorum: h.quorum, } - response, err := h.client.Get(ctx, key, opts) + response, err := h.etcdKeysAPI.Get(ctx, key, opts) metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime) if err != nil && !etcdutil.IsEtcdNotFound(err) { @@ -384,7 +424,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F opts := &etcd.GetOptions{ Quorum: h.quorum, } - response, err := h.client.Get(ctx, key, opts) + response, err := h.etcdKeysAPI.Get(ctx, key, opts) metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) trace.Step("Etcd node read") @@ -489,7 +529,7 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node Sort: true, Quorum: h.quorum, } - result, err := h.client.Get(ctx, key, &opts) + result, err := h.etcdKeysAPI.Get(ctx, key, &opts) if err != nil { var index uint64 if etcdError, ok := err.(etcd.Error); ok { @@ -575,7 +615,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType TTL: time.Duration(ttl) * time.Second, PrevExist: etcd.PrevNoExist, } - response, err := h.client.Set(ctx, key, string(data), &opts) + response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts) metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime) if etcdutil.IsEtcdNodeExist(err) { continue @@ -598,7 +638,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType PrevIndex: index, TTL: time.Duration(ttl) * time.Second, } - response, err := h.client.Set(ctx, key, string(data), &opts) + response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts) metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime) if etcdutil.IsEtcdTestFailed(err) { // Try again. diff --git a/pkg/storage/etcd/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go index 5b7d921c798..5df9a63d5a4 100644 --- a/pkg/storage/etcd/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -125,6 +125,7 @@ func TestList(t *testing.T) { if err != nil { t.Errorf("Unexpected error %v", err) } + if e, a := list.Items, got.Items; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } diff --git a/pkg/storage/etcd/testing/certificates.go b/pkg/storage/etcd/testing/certificates.go new file mode 100644 index 00000000000..208e33a9093 --- /dev/null +++ b/pkg/storage/etcd/testing/certificates.go @@ -0,0 +1,69 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +// You can use cfssl tool to generate certificates, please refer +// https://github.com/coreos/etcd/tree/master/hack/tls-setup for more details. +const CAFileContent = ` +-----BEGIN CERTIFICATE----- +MIIDADCCAoWgAwIBAgIUPyoDaJMWija/6scZvsZIcPjyjqswCgYIKoZIzj0EAwMw +gawxCzAJBgNVBAYTAlVTMSowKAYDVQQKEyFIb25lc3QgQWNobWVkJ3MgVXNlZCBD +ZXJ0aWZpY2F0ZXMxKTAnBgNVBAsTIEhhc3RpbHktR2VuZXJhdGVkIFZhbHVlcyBE +aXZpc29uMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2NvMRMwEQYDVQQIEwpDYWxpZm9y +bmlhMRkwFwYDVQQDExBBdXRvZ2VuZXJhdGVkIENBMB4XDTE2MDIyNTEwMzYwMFoX +DTIxMDIyMzEwMzYwMFowgawxCzAJBgNVBAYTAlVTMSowKAYDVQQKEyFIb25lc3Qg +QWNobWVkJ3MgVXNlZCBDZXJ0aWZpY2F0ZXMxKTAnBgNVBAsTIEhhc3RpbHktR2Vu +ZXJhdGVkIFZhbHVlcyBEaXZpc29uMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2NvMRMw +EQYDVQQIEwpDYWxpZm9ybmlhMRkwFwYDVQQDExBBdXRvZ2VuZXJhdGVkIENBMHYw +EAYHKoZIzj0CAQYFK4EEACIDYgAEUjwvCgZPvo11v8qf+rBQRdYAt6IdMZJd3t1g +DjSySvBc3b+1WmCe911D/Zdwm/s83M4EQm8qUMeMvt60IqKIBZ6BDBbZdqQRycxJ +DuQwgyyYHQl5G52EDSJx//U1OkrOo2YwZDAOBgNVHQ8BAf8EBAMCAQYwEgYDVR0T +AQH/BAgwBgEB/wIBAjAdBgNVHQ4EFgQUp0oCeNg5O4HvABVa7/iJuLDkjAwwHwYD +VR0jBBgwFoAUp0oCeNg5O4HvABVa7/iJuLDkjAwwCgYIKoZIzj0EAwMDaQAwZgIx +AMuY6J2q53uFus7mZTEfWERXoUrTSvj2DEV+6MrmGD8VW2YaTwIGM0qzKlamb1QJ +rQIxAKtbXrfYzAjKBnrhdLD0kgf06pTQkIqBHj4zLen2K4NnVJWCSsKMua8FG+zP +jqvi0Q== +-----END CERTIFICATE----- +` +const CertFileContent = ` +-----BEGIN CERTIFICATE----- +MIIC0zCCAlmgAwIBAgIUHXuZ3c9/pnUh2AOW3iiypCQpYEMwCgYIKoZIzj0EAwMw +gawxCzAJBgNVBAYTAlVTMSowKAYDVQQKEyFIb25lc3QgQWNobWVkJ3MgVXNlZCBD +ZXJ0aWZpY2F0ZXMxKTAnBgNVBAsTIEhhc3RpbHktR2VuZXJhdGVkIFZhbHVlcyBE +aXZpc29uMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2NvMRMwEQYDVQQIEwpDYWxpZm9y +bmlhMRkwFwYDVQQDExBBdXRvZ2VuZXJhdGVkIENBMB4XDTE2MDIyNTEwMzYwMFoX +DTE3MDIyNDEwMzYwMFowVTEWMBQGA1UEChMNYXV0b2dlbmVyYXRlZDEVMBMGA1UE +CxMMZXRjZCBjbHVzdGVyMRUwEwYDVQQHEwx0aGUgaW50ZXJuZXQxDTALBgNVBAMT +BGV0Y2QwdjAQBgcqhkjOPQIBBgUrgQQAIgNiAAQ9HJgNWxMIrnns2+Sb8FUj9RBA +Fk/qP9cExp+FmbnjnOUy2poK5pGDdr88TMUAXyzV7J/rbTo6pDmWLWMEcbIgqfWY +W6BRmAaPWuQNLsP/L2k2N3NHvHZfCZK+efuDCGGjgZEwgY4wDgYDVR0PAQH/BAQD +AgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjAMBgNVHRMBAf8EAjAA +MB0GA1UdDgQWBBQCFVaETH9zYZaw5OTHh8uuq4K84DAfBgNVHSMEGDAWgBSnSgJ4 +2Dk7ge8AFVrv+Im4sOSMDDAPBgNVHREECDAGhwR/AAABMAoGCCqGSM49BAMDA2gA +MGUCMQC9+42ftFzkhujW6lTKZhf/rW3IyNmm5jXTS+RCPPB7jMqkH4Llq2PWgBR8 +YQ8keX4CMGb6h8CLsgmFeVjpRjBTBuFSPSf3DQPPG2BkxPhtFek31g9lpSOjkioZ +fKv0Kz+tUw== +-----END CERTIFICATE----- +` +const KeyFileContent = ` +-----BEGIN EC PRIVATE KEY----- +MIGkAgEBBDBkmx3mD+yd/qh6WYBTUAFbHZLHKrBv6o4H2AnSfx2HiMQoPm+elwhR +xhWa/tV+8zCgBwYFK4EEACKhZANiAAQ9HJgNWxMIrnns2+Sb8FUj9RBAFk/qP9cE +xp+FmbnjnOUy2poK5pGDdr88TMUAXyzV7J/rbTo6pDmWLWMEcbIgqfWYW6BRmAaP +WuQNLsP/L2k2N3NHvHZfCZK+efuDCGE= +-----END EC PRIVATE KEY----- +` diff --git a/pkg/storage/etcd/testing/utils.go b/pkg/storage/etcd/testing/utils.go index 7dc65b6de47..3bc9516a64e 100644 --- a/pkg/storage/etcd/testing/utils.go +++ b/pkg/storage/etcd/testing/utils.go @@ -23,6 +23,7 @@ import ( "net/http" "net/http/httptest" "os" + "path" "testing" "time" @@ -42,6 +43,11 @@ type EtcdTestServer struct { PeerListeners, ClientListeners []net.Listener Client etcd.Client + CertificatesDir string + CertFile string + KeyFile string + CAFile string + raftHandler http.Handler s *etcdserver.EtcdServer hss []*httptest.Server @@ -56,6 +62,39 @@ func newLocalListener(t *testing.T) net.Listener { return l } +// newSecuredLocalListener opens a port localhost using any port +// with SSL enable +func newSecuredLocalListener(t *testing.T, certFile, keyFile, caFile string) net.Listener { + var l net.Listener + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + tlsInfo := transport.TLSInfo{ + CertFile: certFile, + KeyFile: keyFile, + CAFile: caFile, + } + l, err = transport.NewKeepAliveListener(l, "https", tlsInfo) + if err != nil { + t.Fatal(err) + } + return l +} + +func newHttpTransport(t *testing.T, certFile, keyFile, caFile string) etcd.CancelableTransport { + tlsInfo := transport.TLSInfo{ + CertFile: certFile, + KeyFile: keyFile, + CAFile: caFile, + } + tr, err := transport.NewTransport(tlsInfo) + if err != nil { + t.Fatal(err) + } + return tr +} + // configureTestCluster will set the params to start an etcd server func configureTestCluster(t *testing.T, name string) *EtcdTestServer { var err error @@ -68,9 +107,26 @@ func configureTestCluster(t *testing.T, name string) *EtcdTestServer { t.Fatal(err) } - cln := newLocalListener(t) + m.CertificatesDir, err = ioutil.TempDir(os.TempDir(), "etcd_certificates") + if err != nil { + t.Fatal(err) + } + m.CertFile = path.Join(m.CertificatesDir, "etcdcert.pem") + if err = ioutil.WriteFile(m.CertFile, []byte(CertFileContent), 0644); err != nil { + t.Fatal(err) + } + m.KeyFile = path.Join(m.CertificatesDir, "etcdkey.pem") + if err = ioutil.WriteFile(m.KeyFile, []byte(KeyFileContent), 0644); err != nil { + t.Fatal(err) + } + m.CAFile = path.Join(m.CertificatesDir, "ca.pem") + if err = ioutil.WriteFile(m.CAFile, []byte(CAFileContent), 0644); err != nil { + t.Fatal(err) + } + + cln := newSecuredLocalListener(t, m.CertFile, m.KeyFile, m.CAFile) m.ClientListeners = []net.Listener{cln} - m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()}) + m.ClientURLs, err = types.NewURLs([]string{"https://" + cln.Addr().String()}) if err != nil { t.Fatal(err) } @@ -161,6 +217,9 @@ func (m *EtcdTestServer) Terminate(t *testing.T) { if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil { t.Fatal(err) } + if err := os.RemoveAll(m.CertificatesDir); err != nil { + t.Fatal(err) + } } // NewEtcdTestClientServer creates a new client and server for testing @@ -173,6 +232,7 @@ func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer { } cfg := etcd.Config{ Endpoints: server.ClientURLs.StringSlice(), + Transport: newHttpTransport(t, server.CertFile, server.KeyFile, server.CAFile), } server.Client, err = etcd.New(cfg) if err != nil {