diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 39a74f59b78..ac4fa02d97e 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -154,6 +154,11 @@ "Comment": "v2.3.0", "Rev": "5e6eb7e19d6385adfabb1f1caea03e732f9348ad" }, + { + "ImportPath": "github.com/coreos/etcd/clientv3", + "Comment": "v2.3.0", + "Rev": "5e6eb7e19d6385adfabb1f1caea03e732f9348ad" + }, { "ImportPath": "github.com/coreos/etcd/compactor", "Comment": "v2.3.0", @@ -174,6 +179,11 @@ "Comment": "v2.3.0", "Rev": "5e6eb7e19d6385adfabb1f1caea03e732f9348ad" }, + { + "ImportPath": "github.com/coreos/etcd/integration", + "Comment": "v2.3.0", + "Rev": "5e6eb7e19d6385adfabb1f1caea03e732f9348ad" + }, { "ImportPath": "github.com/coreos/etcd/lease", "Comment": "v2.3.0", diff --git a/Godeps/LICENSES b/Godeps/LICENSES index 58e71054f8a..0abed0b4be4 100644 --- a/Godeps/LICENSES +++ b/Godeps/LICENSES @@ -5197,6 +5197,217 @@ Copyright 2014 CoreOS, Inc This product includes software developed at CoreOS, Inc. (http://www.coreos.com/). +================================================================================ += Godeps/_workspace/src/github.com/coreos/etcd/clientv3 licensed under: = + + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +Copyright 2014 CoreOS, Inc + +This product includes software developed at CoreOS, Inc. +(http://www.coreos.com/). + ================================================================================ = Godeps/_workspace/src/github.com/coreos/etcd/compactor licensed under: = @@ -6041,6 +6252,217 @@ Copyright 2014 CoreOS, Inc This product includes software developed at CoreOS, Inc. (http://www.coreos.com/). +================================================================================ += Godeps/_workspace/src/github.com/coreos/etcd/integration licensed under: = + + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +Copyright 2014 CoreOS, Inc + +This product includes software developed at CoreOS, Inc. +(http://www.coreos.com/). + ================================================================================ = Godeps/_workspace/src/github.com/coreos/etcd/lease licensed under: = diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/README.md b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/README.md new file mode 100644 index 00000000000..6b9735d0338 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/README.md @@ -0,0 +1,69 @@ +# etcd/clientv3 + +[![Godoc](http://img.shields.io/badge/go-documentation-blue.svg?style=flat-square)](https://godoc.org/github.com/coreos/etcd/clientv3) + +`etcd/clientv3` is the official Go etcd client for v3. + +## Install + +```bash +go get github.com/coreos/etcd/clientv3 +``` + +## Get started + +Create client using `clientv3.New`: + +```go +cli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{"localhost:12378", "localhost:22378", "localhost:32378"}, + DialTimeout: 5 * time.Second, +}) +if err != nil { + // handle error! +} +defer cli.Close() +``` + +etcd v3 uses [`gRPC`](http://www.grpc.io) for remote procedure calls. And `clientv3` uses +[`grpc-go`](https://github.com/grpc/grpc-go) to connect to etcd. Make sure to close the client after using it. +If the client is not closed, the connection will have leaky goroutines. To specify client request timeout, +pass `context.WithTimeout` to APIs: + +```go +ctx, cancel := context.WithTimeout(context.Background(), timeout) +resp, err := kvc.Put(ctx, "sample_key", "sample_value") +cancel() +if err != nil { + // handle error! +} +// use the response +``` + +## Error Handling + +etcd client returns 2 types of errors: + +1. context error: canceled or deadline exceeded. +2. gRPC error: see [v3rpc/error](https://github.com/coreos/etcd/blob/master/etcdserver/api/v3rpc/error.go). + +Here is the example code to handle client errors: + +```go +resp, err := kvc.Put(ctx, "", "") +if err != nil { + if err == context.Canceled { + // ctx is canceled by another routine + } else if err == context.DeadlineExceeded { + // ctx is attached with a deadline and it exceeded + } else if verr, ok := err.(*v3rpc.ErrEmptyKey); ok { + // process (verr.Errors) + } else { + // bad cluster endpoints, which are not etcd servers + } +} +``` + +## Examples + +More code examples can be found at [GoDoc](https://godoc.org/github.com/coreos/etcd/clientv3). diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/auth.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/auth.go new file mode 100644 index 00000000000..5c681972029 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/auth.go @@ -0,0 +1,51 @@ +// Copyright 2016 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +type ( + AuthEnableResponse pb.AuthEnableResponse +) + +type Auth interface { + // AuthEnable enables auth of a etcd cluster. + AuthEnable(ctx context.Context) (*AuthEnableResponse, error) +} + +type auth struct { + c *Client + + conn *grpc.ClientConn // conn in-use + remote pb.AuthClient +} + +func NewAuth(c *Client) Auth { + conn := c.ActiveConnection() + return &auth{ + conn: c.ActiveConnection(), + remote: pb.NewAuthClient(conn), + c: c, + } +} + +func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) { + resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}) + return (*AuthEnableResponse)(resp), err +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/client.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/client.go new file mode 100644 index 00000000000..94832cbca0f --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/client.go @@ -0,0 +1,244 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "crypto/tls" + "errors" + "net" + "net/url" + "strings" + "sync" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +var ( + ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints") +) + +// Client provides and manages an etcd v3 client session. +type Client struct { + Cluster + KV + Lease + Watcher + Auth + Maintenance + + conn *grpc.ClientConn + cfg Config + creds *credentials.TransportAuthenticator + mu sync.RWMutex // protects connection selection and error list + errors []error // errors passed to retryConnection + + ctx context.Context + cancel context.CancelFunc +} + +// EndpointDialer is a policy for choosing which endpoint to dial next +type EndpointDialer func(*Client) (*grpc.ClientConn, error) + +type Config struct { + // Endpoints is a list of URLs + Endpoints []string + + // RetryDialer chooses the next endpoint to use + RetryDialer EndpointDialer + + // DialTimeout is the timeout for failing to establish a connection. + DialTimeout time.Duration + + // TLS holds the client secure credentials, if any. + TLS *tls.Config +} + +// New creates a new etcdv3 client from a given configuration. +func New(cfg Config) (*Client, error) { + if cfg.RetryDialer == nil { + cfg.RetryDialer = dialEndpointList + } + if len(cfg.Endpoints) == 0 { + return nil, ErrNoAvailableEndpoints + } + + return newClient(&cfg) +} + +// NewFromURL creates a new etcdv3 client from a URL. +func NewFromURL(url string) (*Client, error) { + return New(Config{Endpoints: []string{url}}) +} + +// Close shuts down the client's etcd connections. +func (c *Client) Close() error { + c.mu.Lock() + if c.cancel == nil { + c.mu.Unlock() + return nil + } + c.cancel() + c.cancel = nil + c.mu.Unlock() + c.Watcher.Close() + c.Lease.Close() + return c.conn.Close() +} + +// Ctx is a context for "out of band" messages (e.g., for sending +// "clean up" message when another context is canceled). It is +// canceled on client Close(). +func (c *Client) Ctx() context.Context { return c.ctx } + +// Endpoints lists the registered endpoints for the client. +func (c *Client) Endpoints() []string { return c.cfg.Endpoints } + +// Errors returns all errors that have been observed since called last. +func (c *Client) Errors() (errs []error) { + c.mu.Lock() + defer c.mu.Unlock() + errs = c.errors + c.errors = nil + return errs +} + +// Dial establishes a connection for a given endpoint using the client's config +func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) { + opts := []grpc.DialOption{ + grpc.WithBlock(), + grpc.WithTimeout(c.cfg.DialTimeout), + } + if c.creds != nil { + opts = append(opts, grpc.WithTransportCredentials(*c.creds)) + } else { + opts = append(opts, grpc.WithInsecure()) + } + + proto := "tcp" + if url, uerr := url.Parse(endpoint); uerr == nil && url.Scheme == "unix" { + proto = "unix" + // strip unix:// prefix so certs work + endpoint = url.Host + } + f := func(a string, t time.Duration) (net.Conn, error) { + select { + case <-c.ctx.Done(): + return nil, c.ctx.Err() + default: + } + return net.DialTimeout(proto, a, t) + } + opts = append(opts, grpc.WithDialer(f)) + + conn, err := grpc.Dial(endpoint, opts...) + if err != nil { + return nil, err + } + return conn, nil +} + +func newClient(cfg *Config) (*Client, error) { + if cfg == nil { + cfg = &Config{RetryDialer: dialEndpointList} + } + var creds *credentials.TransportAuthenticator + if cfg.TLS != nil { + c := credentials.NewTLS(cfg.TLS) + creds = &c + } + // use a temporary skeleton client to bootstrap first connection + ctx, cancel := context.WithCancel(context.TODO()) + conn, err := cfg.RetryDialer(&Client{cfg: *cfg, creds: creds, ctx: ctx}) + if err != nil { + return nil, err + } + client := &Client{ + conn: conn, + cfg: *cfg, + creds: creds, + ctx: ctx, + cancel: cancel, + } + client.Cluster = NewCluster(client) + client.KV = NewKV(client) + client.Lease = NewLease(client) + client.Watcher = NewWatcher(client) + client.Auth = NewAuth(client) + client.Maintenance = &maintenance{c: client} + + return client, nil +} + +// ActiveConnection returns the current in-use connection +func (c *Client) ActiveConnection() *grpc.ClientConn { + c.mu.RLock() + defer c.mu.RUnlock() + return c.conn +} + +// retryConnection establishes a new connection +func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.ClientConn, error) { + c.mu.Lock() + defer c.mu.Unlock() + if err != nil { + c.errors = append(c.errors, err) + } + if c.cancel == nil { + return nil, c.ctx.Err() + } + if oldConn != c.conn { + // conn has already been updated + return c.conn, nil + } + + oldConn.Close() + if st, _ := oldConn.State(); st != grpc.Shutdown { + // wait for shutdown so grpc doesn't leak sleeping goroutines + oldConn.WaitForStateChange(c.ctx, st) + } + + conn, dialErr := c.cfg.RetryDialer(c) + if dialErr != nil { + c.errors = append(c.errors, dialErr) + return nil, dialErr + } + c.conn = conn + return c.conn, nil +} + +// dialEndpointList attempts to connect to each endpoint in order until a +// connection is established. +func dialEndpointList(c *Client) (*grpc.ClientConn, error) { + var err error + for _, ep := range c.Endpoints() { + conn, curErr := c.Dial(ep) + if curErr != nil { + err = curErr + } else { + return conn, nil + } + } + return nil, err +} + +// isHalted returns true if the given error and context indicate no forward +// progress can be made, even after reconnecting. +func isHalted(ctx context.Context, err error) bool { + isRPCError := strings.HasPrefix(grpc.ErrorDesc(err), "etcdserver: ") + return isRPCError || ctx.Err() != nil +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/cluster.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/cluster.go new file mode 100644 index 00000000000..8d1369cb3f4 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/cluster.go @@ -0,0 +1,170 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "sync" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +type ( + Member pb.Member + MemberListResponse pb.MemberListResponse + MemberAddResponse pb.MemberAddResponse + MemberRemoveResponse pb.MemberRemoveResponse + MemberUpdateResponse pb.MemberUpdateResponse +) + +type Cluster interface { + // MemberList lists the current cluster membership. + MemberList(ctx context.Context) (*MemberListResponse, error) + + // MemberLeader returns the current leader member. + MemberLeader(ctx context.Context) (*Member, error) + + // MemberAdd adds a new member into the cluster. + MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) + + // MemberRemove removes an existing member from the cluster. + MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) + + // MemberUpdate updates the peer addresses of the member. + MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) +} + +type cluster struct { + c *Client + + mu sync.Mutex + conn *grpc.ClientConn // conn in-use + remote pb.ClusterClient +} + +func NewCluster(c *Client) Cluster { + conn := c.ActiveConnection() + + return &cluster{ + c: c, + + conn: conn, + remote: pb.NewClusterClient(conn), + } +} + +func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) { + r := &pb.MemberAddRequest{PeerURLs: peerAddrs} + resp, err := c.getRemote().MemberAdd(ctx, r) + if err == nil { + return (*MemberAddResponse)(resp), nil + } + + if isHalted(ctx, err) { + return nil, err + } + + go c.switchRemote(err) + return nil, err +} + +func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) { + r := &pb.MemberRemoveRequest{ID: id} + resp, err := c.getRemote().MemberRemove(ctx, r) + if err == nil { + return (*MemberRemoveResponse)(resp), nil + } + + if isHalted(ctx, err) { + return nil, err + } + + go c.switchRemote(err) + return nil, err +} + +func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) { + // it is safe to retry on update. + for { + r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs} + resp, err := c.getRemote().MemberUpdate(ctx, r) + if err == nil { + return (*MemberUpdateResponse)(resp), nil + } + + if isHalted(ctx, err) { + return nil, err + } + + err = c.switchRemote(err) + if err != nil { + return nil, err + } + } +} + +func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) { + // it is safe to retry on list. + for { + resp, err := c.getRemote().MemberList(ctx, &pb.MemberListRequest{}) + if err == nil { + return (*MemberListResponse)(resp), nil + } + + if isHalted(ctx, err) { + return nil, err + } + + err = c.switchRemote(err) + if err != nil { + return nil, err + } + } +} + +func (c *cluster) MemberLeader(ctx context.Context) (*Member, error) { + resp, err := c.MemberList(ctx) + if err != nil { + return nil, err + } + for _, m := range resp.Members { + if m.IsLeader { + return (*Member)(m), nil + } + } + return nil, nil +} + +func (c *cluster) getRemote() pb.ClusterClient { + c.mu.Lock() + defer c.mu.Unlock() + + return c.remote +} + +func (c *cluster) switchRemote(prevErr error) error { + newConn, err := c.c.retryConnection(c.conn, prevErr) + if err != nil { + return err + } + + c.mu.Lock() + defer c.mu.Unlock() + + c.conn = newConn + c.remote = pb.NewClusterClient(c.conn) + return nil +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/compare.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/compare.go new file mode 100644 index 00000000000..411f8667633 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/compare.go @@ -0,0 +1,91 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +type CompareTarget int +type CompareResult int + +const ( + CompareVersion CompareTarget = iota + CompareCreated + CompareModified + CompareValue +) + +type Cmp pb.Compare + +func Compare(cmp Cmp, result string, v interface{}) Cmp { + var r pb.Compare_CompareResult + + switch result { + case "=": + r = pb.Compare_EQUAL + case ">": + r = pb.Compare_GREATER + case "<": + r = pb.Compare_LESS + default: + panic("Unknown result op") + } + + cmp.Result = r + switch cmp.Target { + case pb.Compare_VALUE: + val, ok := v.(string) + if !ok { + panic("bad compare value") + } + cmp.TargetUnion = &pb.Compare_Value{Value: []byte(val)} + case pb.Compare_VERSION: + cmp.TargetUnion = &pb.Compare_Version{Version: mustInt64(v)} + case pb.Compare_CREATE: + cmp.TargetUnion = &pb.Compare_CreateRevision{CreateRevision: mustInt64(v)} + case pb.Compare_MOD: + cmp.TargetUnion = &pb.Compare_ModRevision{ModRevision: mustInt64(v)} + default: + panic("Unknown compare type") + } + return cmp +} + +func Value(key string) Cmp { + return Cmp{Key: []byte(key), Target: pb.Compare_VALUE} +} + +func Version(key string) Cmp { + return Cmp{Key: []byte(key), Target: pb.Compare_VERSION} +} + +func CreatedRevision(key string) Cmp { + return Cmp{Key: []byte(key), Target: pb.Compare_CREATE} +} + +func ModifiedRevision(key string) Cmp { + return Cmp{Key: []byte(key), Target: pb.Compare_MOD} +} + +func mustInt64(val interface{}) int64 { + if v, ok := val.(int64); ok { + return v + } + if v, ok := val.(int); ok { + return int64(v) + } + panic("bad value") +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/concurrency/election.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/concurrency/election.go new file mode 100644 index 00000000000..d74d19be4f2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/concurrency/election.go @@ -0,0 +1,183 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package concurrency + +import ( + "errors" + + v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/storage/storagepb" + "golang.org/x/net/context" +) + +var ( + ErrElectionNotLeader = errors.New("election: not leader") + ErrElectionNoLeader = errors.New("election: no leader") +) + +type Election struct { + client *v3.Client + + keyPrefix string + + leaderKey string + leaderRev int64 + leaderSession *Session +} + +// NewElection returns a new election on a given key prefix. +func NewElection(client *v3.Client, pfx string) *Election { + return &Election{client: client, keyPrefix: pfx} +} + +// Campaign puts a value as eligible for the election. It blocks until +// it is elected, an error occurs, or the context is cancelled. +func (e *Election) Campaign(ctx context.Context, val string) error { + s, serr := NewSession(e.client) + if serr != nil { + return serr + } + + k, rev, err := NewUniqueKV(ctx, e.client, e.keyPrefix, val, v3.WithLease(s.Lease())) + if err == nil { + err = waitDeletes(ctx, e.client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(rev-1)) + } + + if err != nil { + // clean up in case of context cancel + select { + case <-ctx.Done(): + e.client.Delete(e.client.Ctx(), k) + default: + } + return err + } + + e.leaderKey, e.leaderRev, e.leaderSession = k, rev, s + return nil +} + +// Proclaim lets the leader announce a new value without another election. +func (e *Election) Proclaim(ctx context.Context, val string) error { + if e.leaderSession == nil { + return ErrElectionNotLeader + } + cmp := v3.Compare(v3.CreatedRevision(e.leaderKey), "=", e.leaderRev) + txn := e.client.Txn(ctx).If(cmp) + txn = txn.Then(v3.OpPut(e.leaderKey, val, v3.WithLease(e.leaderSession.Lease()))) + tresp, terr := txn.Commit() + if terr != nil { + return terr + } + if !tresp.Succeeded { + e.leaderKey = "" + return ErrElectionNotLeader + } + return nil +} + +// Resign lets a leader start a new election. +func (e *Election) Resign() (err error) { + if e.leaderSession == nil { + return nil + } + _, err = e.client.Delete(e.client.Ctx(), e.leaderKey) + e.leaderKey = "" + e.leaderSession = nil + return err +} + +// Leader returns the leader value for the current election. +func (e *Election) Leader() (string, error) { + resp, err := e.client.Get(e.client.Ctx(), e.keyPrefix, v3.WithFirstCreate()...) + if err != nil { + return "", err + } else if len(resp.Kvs) == 0 { + // no leader currently elected + return "", ErrElectionNoLeader + } + return string(resp.Kvs[0].Value), nil +} + +// Observe returns a channel that observes all leader proposal values as +// GetResponse values on the current leader key. The channel closes when +// the context is cancelled or the underlying watcher is otherwise disrupted. +func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse { + retc := make(chan v3.GetResponse) + go e.observe(ctx, retc) + return retc +} + +func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) { + defer close(ch) + for { + resp, err := e.client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...) + if err != nil { + return + } + + var kv *storagepb.KeyValue + + cctx, cancel := context.WithCancel(ctx) + if len(resp.Kvs) == 0 { + // wait for first key put on prefix + opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()} + wch := e.client.Watch(cctx, e.keyPrefix, opts...) + + for kv == nil { + wr, ok := <-wch + if !ok || wr.Err() != nil { + cancel() + return + } + // only accept PUTs; a DELETE will make observe() spin + for _, ev := range wr.Events { + if ev.Type == storagepb.PUT { + kv = ev.Kv + break + } + } + } + } else { + kv = resp.Kvs[0] + } + + wch := e.client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision)) + keyDeleted := false + for !keyDeleted { + wr, ok := <-wch + if !ok { + return + } + for _, ev := range wr.Events { + if ev.Type == storagepb.DELETE { + keyDeleted = true + break + } + resp.Header = &wr.Header + resp.Kvs = []*storagepb.KeyValue{ev.Kv} + select { + case ch <- *resp: + case <-cctx.Done(): + return + } + } + } + cancel() + } +} + +// Key returns the leader key if elected, empty string otherwise. +func (e *Election) Key() string { return e.leaderKey } diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/concurrency/key.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/concurrency/key.go new file mode 100644 index 00000000000..c08a5b5eb25 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/concurrency/key.go @@ -0,0 +1,102 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package concurrency + +import ( + "fmt" + "math" + "time" + + v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/storage/storagepb" + "golang.org/x/net/context" +) + +// NewUniqueKey creates a new key from a given prefix. +func NewUniqueKey(ctx context.Context, kv v3.KV, pfx string, opts ...v3.OpOption) (string, int64, error) { + return NewUniqueKV(ctx, kv, pfx, "", opts...) +} + +func NewUniqueKV(ctx context.Context, kv v3.KV, pfx, val string, opts ...v3.OpOption) (string, int64, error) { + for { + newKey := fmt.Sprintf("%s/%v", pfx, time.Now().UnixNano()) + put := v3.OpPut(newKey, val, opts...) + cmp := v3.Compare(v3.ModifiedRevision(newKey), "=", 0) + resp, err := kv.Txn(ctx).If(cmp).Then(put).Commit() + if err != nil { + return "", 0, err + } + if !resp.Succeeded { + continue + } + return newKey, resp.Header.Revision, nil + } +} + +func waitUpdate(ctx context.Context, client *v3.Client, key string, opts ...v3.OpOption) error { + cctx, cancel := context.WithCancel(ctx) + defer cancel() + wresp, ok := <-client.Watch(cctx, key, opts...) + if !ok { + return ctx.Err() + } + return wresp.Err() +} + +func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error { + cctx, cancel := context.WithCancel(ctx) + defer cancel() + wch := client.Watch(cctx, key, v3.WithRev(rev)) + for wr := range wch { + for _, ev := range wr.Events { + if ev.Type == storagepb.DELETE { + return nil + } + } + } + if err := ctx.Err(); err != nil { + return err + } + return fmt.Errorf("lost watcher waiting for delete") +} + +// waitDeletes efficiently waits until all keys matched by Get(key, opts...) are deleted +func waitDeletes(ctx context.Context, client *v3.Client, key string, opts ...v3.OpOption) error { + getOpts := []v3.OpOption{v3.WithSort(v3.SortByCreatedRev, v3.SortAscend)} + getOpts = append(getOpts, opts...) + resp, err := client.Get(ctx, key, getOpts...) + maxRev := int64(math.MaxInt64) + getOpts = append(getOpts, v3.WithRev(0)) + for err == nil { + for len(resp.Kvs) > 0 { + i := len(resp.Kvs) - 1 + if resp.Kvs[i].CreateRevision <= maxRev { + break + } + resp.Kvs = resp.Kvs[:i] + } + if len(resp.Kvs) == 0 { + break + } + lastKV := resp.Kvs[len(resp.Kvs)-1] + maxRev = lastKV.CreateRevision + err = waitDelete(ctx, client, string(lastKV.Key), maxRev) + if err != nil || len(resp.Kvs) == 1 { + break + } + getOpts = append(getOpts, v3.WithLimit(int64(len(resp.Kvs)-1))) + resp, err = client.Get(ctx, key, getOpts...) + } + return err +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/concurrency/mutex.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/concurrency/mutex.go new file mode 100644 index 00000000000..b378058ee3a --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/concurrency/mutex.go @@ -0,0 +1,88 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package concurrency + +import ( + "sync" + + v3 "github.com/coreos/etcd/clientv3" + "golang.org/x/net/context" +) + +// Mutex implements the sync Locker interface with etcd +type Mutex struct { + client *v3.Client + + pfx string + myKey string + myRev int64 +} + +func NewMutex(client *v3.Client, pfx string) *Mutex { + return &Mutex{client, pfx, "", -1} +} + +// Lock locks the mutex with a cancellable context. If the context is cancelled +// while trying to acquire the lock, the mutex tries to clean its stale lock entry. +func (m *Mutex) Lock(ctx context.Context) error { + s, err := NewSession(m.client) + if err != nil { + return err + } + // put self in lock waiters via myKey; oldest waiter holds lock + m.myKey, m.myRev, err = NewUniqueKey(ctx, m.client, m.pfx, v3.WithLease(s.Lease())) + // wait for deletion revisions prior to myKey + err = waitDeletes(ctx, m.client, m.pfx, v3.WithPrefix(), v3.WithRev(m.myRev-1)) + // release lock key if cancelled + select { + case <-ctx.Done(): + m.Unlock() + default: + } + return err +} + +func (m *Mutex) Unlock() error { + if _, err := m.client.Delete(m.client.Ctx(), m.myKey); err != nil { + return err + } + m.myKey = "\x00" + m.myRev = -1 + return nil +} + +func (m *Mutex) IsOwner() v3.Cmp { + return v3.Compare(v3.CreatedRevision(m.myKey), "=", m.myRev) +} + +func (m *Mutex) Key() string { return m.myKey } + +type lockerMutex struct{ *Mutex } + +func (lm *lockerMutex) Lock() { + if err := lm.Mutex.Lock(lm.client.Ctx()); err != nil { + panic(err) + } +} +func (lm *lockerMutex) Unlock() { + if err := lm.Mutex.Unlock(); err != nil { + panic(err) + } +} + +// NewLocker creates a sync.Locker backed by an etcd mutex. +func NewLocker(client *v3.Client, pfx string) sync.Locker { + return &lockerMutex{NewMutex(client, pfx)} +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/concurrency/session.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/concurrency/session.go new file mode 100644 index 00000000000..e8a68f0617d --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/concurrency/session.go @@ -0,0 +1,103 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package concurrency + +import ( + "sync" + + v3 "github.com/coreos/etcd/clientv3" + "golang.org/x/net/context" +) + +// only keep one ephemeral lease per client +var clientSessions clientSessionMgr = clientSessionMgr{sessions: make(map[*v3.Client]*Session)} + +const sessionTTL = 60 + +type clientSessionMgr struct { + sessions map[*v3.Client]*Session + mu sync.Mutex +} + +// Session represents a lease kept alive for the lifetime of a client. +// Fault-tolerant applications may use sessions to reason about liveness. +type Session struct { + client *v3.Client + id v3.LeaseID + + cancel context.CancelFunc + donec <-chan struct{} +} + +// NewSession gets the leased session for a client. +func NewSession(client *v3.Client) (*Session, error) { + clientSessions.mu.Lock() + defer clientSessions.mu.Unlock() + if s, ok := clientSessions.sessions[client]; ok { + return s, nil + } + + resp, err := client.Create(client.Ctx(), sessionTTL) + if err != nil { + return nil, err + } + id := v3.LeaseID(resp.ID) + + ctx, cancel := context.WithCancel(client.Ctx()) + keepAlive, err := client.KeepAlive(ctx, id) + if err != nil || keepAlive == nil { + return nil, err + } + + donec := make(chan struct{}) + s := &Session{client: client, id: id, cancel: cancel, donec: donec} + clientSessions.sessions[client] = s + + // keep the lease alive until client error or cancelled context + go func() { + defer func() { + clientSessions.mu.Lock() + delete(clientSessions.sessions, client) + clientSessions.mu.Unlock() + close(donec) + }() + for range keepAlive { + // eat messages until keep alive channel closes + } + }() + + return s, nil +} + +// Lease is the lease ID for keys bound to the session. +func (s *Session) Lease() v3.LeaseID { return s.id } + +// Done returns a channel that closes when the lease is orphaned, expires, or +// is otherwise no longer being refreshed. +func (s *Session) Done() <-chan struct{} { return s.donec } + +// Orphan ends the refresh for the session lease. This is useful +// in case the state of the client connection is indeterminate (revoke +// would fail) or when transferring lease ownership. +func (s *Session) Orphan() { + s.cancel() + <-s.donec +} + +// Close orphans the session and revokes the session lease. +func (s *Session) Close() error { + s.Orphan() + _, err := s.client.Revoke(s.client.Ctx(), s.id) + return err +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/concurrency/stm.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/concurrency/stm.go new file mode 100644 index 00000000000..b4eb5545a97 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/concurrency/stm.go @@ -0,0 +1,246 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package concurrency + +import ( + v3 "github.com/coreos/etcd/clientv3" + "golang.org/x/net/context" +) + +// STM is an interface for software transactional memory. +type STM interface { + // Get returns the value for a key and inserts the key in the txn's read set. + // If Get fails, it aborts the transaction with an error, never returning. + Get(key string) string + // Put adds a value for a key to the write set. + Put(key, val string, opts ...v3.OpOption) + // Rev returns the revision of a key in the read set. + Rev(key string) int64 + // Del deletes a key. + Del(key string) + + // commit attempts to apply the txn's changes to the server. + commit() *v3.TxnResponse + reset() +} + +// stmError safely passes STM errors through panic to the STM error channel. +type stmError struct{ err error } + +// NewSTMRepeatable initiates new repeatable read transaction; reads within +// the same transaction attempt always return the same data. +func NewSTMRepeatable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) { + s := &stm{client: c, ctx: ctx, getOpts: []v3.OpOption{v3.WithSerializable()}} + return runSTM(s, apply) +} + +// NewSTMSerializable initiates a new serialized transaction; reads within the +// same transactiona attempt return data from the revision of the first read. +func NewSTMSerializable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) { + s := &stmSerializable{ + stm: stm{client: c, ctx: ctx}, + prefetch: make(map[string]*v3.GetResponse), + } + return runSTM(s, apply) +} + +type stmResponse struct { + resp *v3.TxnResponse + err error +} + +func runSTM(s STM, apply func(STM) error) (*v3.TxnResponse, error) { + outc := make(chan stmResponse, 1) + go func() { + defer func() { + if r := recover(); r != nil { + e, ok := r.(stmError) + if !ok { + // client apply panicked + panic(r) + } + outc <- stmResponse{nil, e.err} + } + }() + var out stmResponse + for { + s.reset() + if out.err = apply(s); out.err != nil { + break + } + if out.resp = s.commit(); out.resp != nil { + break + } + } + outc <- out + }() + r := <-outc + return r.resp, r.err +} + +// stm implements repeatable-read software transactional memory over etcd +type stm struct { + client *v3.Client + ctx context.Context + // rset holds read key values and revisions + rset map[string]*v3.GetResponse + // wset holds overwritten keys and their values + wset map[string]stmPut + // getOpts are the opts used for gets + getOpts []v3.OpOption +} + +type stmPut struct { + val string + op v3.Op +} + +func (s *stm) Get(key string) string { + if wv, ok := s.wset[key]; ok { + return wv.val + } + return respToValue(s.fetch(key)) +} + +func (s *stm) Put(key, val string, opts ...v3.OpOption) { + s.wset[key] = stmPut{val, v3.OpPut(key, val, opts...)} +} + +func (s *stm) Del(key string) { s.wset[key] = stmPut{"", v3.OpDelete(key)} } + +func (s *stm) Rev(key string) int64 { + if resp := s.fetch(key); resp != nil && len(resp.Kvs) != 0 { + return resp.Kvs[0].ModRevision + } + return 0 +} + +func (s *stm) commit() *v3.TxnResponse { + txnresp, err := s.client.Txn(s.ctx).If(s.cmps()...).Then(s.puts()...).Commit() + if err != nil { + panic(stmError{err}) + } + if txnresp.Succeeded { + return txnresp + } + return nil +} + +// cmps guards the txn from updates to read set +func (s *stm) cmps() (cmps []v3.Cmp) { + for k, rk := range s.rset { + cmps = append(cmps, isKeyCurrent(k, rk)) + } + return +} + +func (s *stm) fetch(key string) *v3.GetResponse { + if resp, ok := s.rset[key]; ok { + return resp + } + resp, err := s.client.Get(s.ctx, key, s.getOpts...) + if err != nil { + panic(stmError{err}) + } + s.rset[key] = resp + return resp +} + +// puts is the list of ops for all pending writes +func (s *stm) puts() (puts []v3.Op) { + for _, v := range s.wset { + puts = append(puts, v.op) + } + return +} + +func (s *stm) reset() { + s.rset = make(map[string]*v3.GetResponse) + s.wset = make(map[string]stmPut) +} + +type stmSerializable struct { + stm + prefetch map[string]*v3.GetResponse +} + +func (s *stmSerializable) Get(key string) string { + if wv, ok := s.wset[key]; ok { + return wv.val + } + firstRead := len(s.rset) == 0 + if resp, ok := s.prefetch[key]; ok { + delete(s.prefetch, key) + s.rset[key] = resp + } + resp := s.stm.fetch(key) + if firstRead { + // txn's base revision is defined by the first read + s.getOpts = []v3.OpOption{ + v3.WithRev(resp.Header.Revision), + v3.WithSerializable(), + } + } + return respToValue(resp) +} + +func (s *stmSerializable) Rev(key string) int64 { + s.Get(key) + return s.stm.Rev(key) +} + +func (s *stmSerializable) gets() (keys []string, ops []v3.Op) { + for k := range s.rset { + keys = append(keys, k) + ops = append(ops, v3.OpGet(k)) + } + return +} + +func (s *stmSerializable) commit() *v3.TxnResponse { + keys, getops := s.gets() + txn := s.client.Txn(s.ctx).If(s.cmps()...).Then(s.puts()...) + // use Else to prefetch keys in case of conflict to save a round trip + txnresp, err := txn.Else(getops...).Commit() + if err != nil { + panic(stmError{err}) + } + if txnresp.Succeeded { + return txnresp + } + // load prefetch with Else data + for i := range keys { + resp := txnresp.Responses[i].GetResponseRange() + s.rset[keys[i]] = (*v3.GetResponse)(resp) + } + s.prefetch = s.rset + s.getOpts = nil + return nil +} + +func isKeyCurrent(k string, r *v3.GetResponse) v3.Cmp { + rev := r.Header.Revision + 1 + if len(r.Kvs) != 0 { + rev = r.Kvs[0].ModRevision + 1 + } + return v3.Compare(v3.ModifiedRevision(k), "<", rev) +} + +func respToValue(resp *v3.GetResponse) string { + if len(resp.Kvs) == 0 { + return "" + } + return string(resp.Kvs[0].Value) +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/doc.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/doc.go new file mode 100644 index 00000000000..abd340e588d --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/doc.go @@ -0,0 +1,61 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// clientv3 is the official Go etcd client for v3. +// +// Create client using `clientv3.New`: +// +// cli, err := clientv3.New(clientv3.Config{ +// Endpoints: []string{"localhost:12378", "localhost:22378", "localhost:32378"}, +// DialTimeout: 5 * time.Second, +// }) +// if err != nil { +// // handle error! +// } +// defer cli.Close() +// +// Make sure to close the client after using it. If the client is not closed, the +// connection will have leaky goroutines. +// +// To specify client request timeout, pass context.WithTimeout to APIs: +// +// ctx, cancel := context.WithTimeout(context.Background(), timeout) +// resp, err := kvc.Put(ctx, "sample_key", "sample_value") +// cancel() +// if err != nil { +// // handle error! +// } +// // use the response +// +// etcd client returns 2 types of errors: +// +// 1. context error: canceled or deadline exceeded. +// 2. gRPC error: see https://github.com/coreos/etcd/blob/master/etcdserver/api/v3rpc/error.go. +// +// Here is the example code to handle client errors: +// +// resp, err := kvc.Put(ctx, "", "") +// if err != nil { +// if err == context.Canceled { +// // ctx is canceled by another routine +// } else if err == context.DeadlineExceeded { +// // ctx is attached with a deadline and it exceeded +// } else if verr, ok := err.(*v3rpc.ErrEmptyKey); ok { +// // process (verr.Errors) +// } else { +// // bad cluster endpoints, which are not etcd servers +// } +// } +// +package clientv3 diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/integration/doc.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/integration/doc.go new file mode 100644 index 00000000000..7e3a1ccec60 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/integration/doc.go @@ -0,0 +1,17 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package integration implements tests built upon embedded etcd, and focuses on +// correctness of etcd client. +package integration diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/kv.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/kv.go new file mode 100644 index 00000000000..a381026e478 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/kv.go @@ -0,0 +1,203 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "sync" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +type ( + PutResponse pb.PutResponse + GetResponse pb.RangeResponse + DeleteResponse pb.DeleteRangeResponse + TxnResponse pb.TxnResponse +) + +type KV interface { + // Put puts a key-value pair into etcd. + // Note that key,value can be plain bytes array and string is + // an immutable representation of that bytes array. + // To get a string of bytes, do string([]byte(0x10, 0x20)). + Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) + + // Get retrieves keys. + // By default, Get will return the value for "key", if any. + // When passed WithRange(end), Get will return the keys in the range [key, end). + // When passed WithFromKey(), Get returns keys greater than or equal to key. + // When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision; + // if the required revision is compacted, the request will fail with ErrCompacted . + // When passed WithLimit(limit), the number of returned keys is bounded by limit. + // When passed WithSort(), the keys will be sorted. + Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) + + // Delete deletes a key, or optionally using WithRange(end), [key, end). + Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) + + // Compact compacts etcd KV history before the given rev. + Compact(ctx context.Context, rev int64) error + + // Do applies a single Op on KV without a transaction. + // Do is useful when declaring operations to be issued at a later time + // whereas Get/Put/Delete are for better suited for when the operation + // should be immediately issued at time of declaration. + + // Do applies a single Op on KV without a transaction. + // Do is useful when creating arbitrary operations to be issued at a + // later time; the user can range over the operations, calling Do to + // execute them. Get/Put/Delete, on the other hand, are best suited + // for when the operation should be issued at the time of declaration. + Do(ctx context.Context, op Op) (OpResponse, error) + + // Txn creates a transaction. + Txn(ctx context.Context) Txn +} + +type OpResponse struct { + put *PutResponse + get *GetResponse + del *DeleteResponse +} + +type kv struct { + c *Client + + mu sync.Mutex // guards all fields + conn *grpc.ClientConn // conn in-use + remote pb.KVClient +} + +func NewKV(c *Client) KV { + conn := c.ActiveConnection() + remote := pb.NewKVClient(conn) + + return &kv{ + conn: c.ActiveConnection(), + remote: remote, + + c: c, + } +} + +func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) { + r, err := kv.Do(ctx, OpPut(key, val, opts...)) + return r.put, err +} + +func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) { + r, err := kv.Do(ctx, OpGet(key, opts...)) + return r.get, err +} + +func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) { + r, err := kv.Do(ctx, OpDelete(key, opts...)) + return r.del, err +} + +func (kv *kv) Compact(ctx context.Context, rev int64) error { + r := &pb.CompactionRequest{Revision: rev} + _, err := kv.getRemote().Compact(ctx, r) + if err == nil { + return nil + } + + if isHalted(ctx, err) { + return err + } + + go kv.switchRemote(err) + return err +} + +func (kv *kv) Txn(ctx context.Context) Txn { + return &txn{ + kv: kv, + ctx: ctx, + } +} + +func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { + for { + var err error + switch op.t { + // TODO: handle other ops + case tRange: + var resp *pb.RangeResponse + r := &pb.RangeRequest{Key: op.key, RangeEnd: op.end, Limit: op.limit, Revision: op.rev} + if op.sort != nil { + r.SortOrder = pb.RangeRequest_SortOrder(op.sort.Order) + r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target) + } + + resp, err = kv.getRemote().Range(ctx, r) + if err == nil { + return OpResponse{get: (*GetResponse)(resp)}, nil + } + case tPut: + var resp *pb.PutResponse + r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)} + resp, err = kv.getRemote().Put(ctx, r) + if err == nil { + return OpResponse{put: (*PutResponse)(resp)}, nil + } + case tDeleteRange: + var resp *pb.DeleteRangeResponse + r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end} + resp, err = kv.getRemote().DeleteRange(ctx, r) + if err == nil { + return OpResponse{del: (*DeleteResponse)(resp)}, nil + } + default: + panic("Unknown op") + } + + if isHalted(ctx, err) { + return OpResponse{}, err + } + + // do not retry on modifications + if op.isWrite() { + go kv.switchRemote(err) + return OpResponse{}, err + } + + if nerr := kv.switchRemote(err); nerr != nil { + return OpResponse{}, nerr + } + } +} + +func (kv *kv) switchRemote(prevErr error) error { + newConn, err := kv.c.retryConnection(kv.conn, prevErr) + if err != nil { + return err + } + + kv.mu.Lock() + defer kv.mu.Unlock() + + kv.conn = newConn + kv.remote = pb.NewKVClient(kv.conn) + return nil +} + +func (kv *kv) getRemote() pb.KVClient { + kv.mu.Lock() + defer kv.mu.Unlock() + return kv.remote +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/lease.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/lease.go new file mode 100644 index 00000000000..e9a568f93a4 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/lease.go @@ -0,0 +1,433 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "sync" + "time" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +type ( + LeaseCreateResponse pb.LeaseCreateResponse + LeaseRevokeResponse pb.LeaseRevokeResponse + LeaseKeepAliveResponse pb.LeaseKeepAliveResponse + LeaseID int64 +) + +const ( + // a small buffer to store unsent lease responses. + leaseResponseChSize = 16 + // NoLease is a lease ID for the absence of a lease. + NoLease LeaseID = 0 +) + +type Lease interface { + // Create creates a new lease. + Create(ctx context.Context, ttl int64) (*LeaseCreateResponse, error) + + // Revoke revokes the given lease. + Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) + + // KeepAlive keeps the given lease alive forever. + KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) + + // KeepAliveOnce renews the lease once. In most of the cases, Keepalive + // should be used instead of KeepAliveOnce. + KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) + + // Close releases all resources Lease keeps for efficient communication + // with the etcd server. + Close() error +} + +type lessor struct { + c *Client + + mu sync.Mutex // guards all fields + conn *grpc.ClientConn // conn in-use + + // donec is closed when recvKeepAliveLoop stops + donec chan struct{} + + remote pb.LeaseClient + + stream pb.Lease_LeaseKeepAliveClient + streamCancel context.CancelFunc + + stopCtx context.Context + stopCancel context.CancelFunc + + keepAlives map[LeaseID]*keepAlive +} + +// keepAlive multiplexes a keepalive for a lease over multiple channels +type keepAlive struct { + chs []chan<- *LeaseKeepAliveResponse + ctxs []context.Context + // deadline is the next time to send a keep alive message + deadline time.Time + // donec is closed on lease revoke, expiration, or cancel. + donec chan struct{} +} + +func NewLease(c *Client) Lease { + l := &lessor{ + c: c, + conn: c.ActiveConnection(), + + donec: make(chan struct{}), + keepAlives: make(map[LeaseID]*keepAlive), + } + + l.remote = pb.NewLeaseClient(l.conn) + l.stopCtx, l.stopCancel = context.WithCancel(context.Background()) + + go l.recvKeepAliveLoop() + + return l +} + +func (l *lessor) Create(ctx context.Context, ttl int64) (*LeaseCreateResponse, error) { + cctx, cancel := context.WithCancel(ctx) + done := cancelWhenStop(cancel, l.stopCtx.Done()) + defer close(done) + + for { + r := &pb.LeaseCreateRequest{TTL: ttl} + resp, err := l.getRemote().LeaseCreate(cctx, r) + if err == nil { + return (*LeaseCreateResponse)(resp), nil + } + if isHalted(cctx, err) { + return nil, err + } + if nerr := l.switchRemoteAndStream(err); nerr != nil { + return nil, nerr + } + } +} + +func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) { + cctx, cancel := context.WithCancel(ctx) + done := cancelWhenStop(cancel, l.stopCtx.Done()) + defer close(done) + + for { + r := &pb.LeaseRevokeRequest{ID: int64(id)} + resp, err := l.getRemote().LeaseRevoke(cctx, r) + + if err == nil { + return (*LeaseRevokeResponse)(resp), nil + } + if isHalted(ctx, err) { + return nil, err + } + + if nerr := l.switchRemoteAndStream(err); nerr != nil { + return nil, nerr + } + } +} + +func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) { + ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize) + + l.mu.Lock() + ka, ok := l.keepAlives[id] + if !ok { + // create fresh keep alive + ka = &keepAlive{ + chs: []chan<- *LeaseKeepAliveResponse{ch}, + ctxs: []context.Context{ctx}, + deadline: time.Now(), + donec: make(chan struct{}), + } + l.keepAlives[id] = ka + } else { + // add channel and context to existing keep alive + ka.ctxs = append(ka.ctxs, ctx) + ka.chs = append(ka.chs, ch) + } + l.mu.Unlock() + + go l.keepAliveCtxCloser(id, ctx, ka.donec) + + return ch, nil +} + +func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) { + cctx, cancel := context.WithCancel(ctx) + done := cancelWhenStop(cancel, l.stopCtx.Done()) + defer close(done) + + for { + resp, err := l.keepAliveOnce(cctx, id) + if err == nil { + return resp, err + } + + nerr := l.switchRemoteAndStream(err) + if nerr != nil { + return nil, nerr + } + } +} + +func (l *lessor) Close() error { + l.stopCancel() + <-l.donec + return nil +} + +func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-chan struct{}) { + select { + case <-donec: + return + case <-l.donec: + return + case <-ctx.Done(): + } + + l.mu.Lock() + defer l.mu.Unlock() + + ka, ok := l.keepAlives[id] + if !ok { + return + } + + // close channel and remove context if still associated with keep alive + for i, c := range ka.ctxs { + if c == ctx { + close(ka.chs[i]) + ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...) + ka.chs = append(ka.chs[:i], ka.chs[i+1:]...) + break + } + } + // remove if no one more listeners + if len(ka.chs) == 0 { + delete(l.keepAlives, id) + } +} + +func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) { + stream, err := l.getRemote().LeaseKeepAlive(ctx) + if err != nil { + return nil, err + } + + err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)}) + if err != nil { + return nil, err + } + + resp, rerr := stream.Recv() + if rerr != nil { + return nil, rerr + } + return (*LeaseKeepAliveResponse)(resp), nil +} + +func (l *lessor) recvKeepAliveLoop() { + defer func() { + l.stopCancel() + l.mu.Lock() + close(l.donec) + for _, ka := range l.keepAlives { + ka.Close() + } + l.keepAlives = make(map[LeaseID]*keepAlive) + l.mu.Unlock() + }() + + stream, serr := l.resetRecv() + for serr == nil { + resp, err := stream.Recv() + if err != nil { + if isHalted(l.stopCtx, err) { + return + } + stream, serr = l.resetRecv() + continue + } + l.recvKeepAlive(resp) + } +} + +// resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests +func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { + if err := l.switchRemoteAndStream(nil); err != nil { + return nil, err + } + stream := l.getKeepAliveStream() + go l.sendKeepAliveLoop(stream) + return stream, nil +} + +// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse +func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) { + id := LeaseID(resp.ID) + + l.mu.Lock() + defer l.mu.Unlock() + + ka, ok := l.keepAlives[id] + if !ok { + return + } + + if resp.TTL <= 0 { + // lease expired; close all keep alive channels + delete(l.keepAlives, id) + ka.Close() + return + } + + // send update to all channels + nextDeadline := time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second) + for _, ch := range ka.chs { + select { + case ch <- (*LeaseKeepAliveResponse)(resp): + ka.deadline = nextDeadline + default: + } + } +} + +// sendKeepAliveLoop sends LeaseKeepAliveRequests for the lifetime of a lease stream +func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) { + for { + select { + case <-time.After(500 * time.Millisecond): + case <-l.donec: + return + case <-l.stopCtx.Done(): + return + } + + tosend := make([]LeaseID, 0) + + now := time.Now() + l.mu.Lock() + for id, ka := range l.keepAlives { + if ka.deadline.Before(now) { + tosend = append(tosend, id) + } + } + l.mu.Unlock() + + for _, id := range tosend { + r := &pb.LeaseKeepAliveRequest{ID: int64(id)} + if err := stream.Send(r); err != nil { + // TODO do something with this error? + return + } + } + } +} + +func (l *lessor) getRemote() pb.LeaseClient { + l.mu.Lock() + defer l.mu.Unlock() + return l.remote +} + +func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient { + l.mu.Lock() + defer l.mu.Unlock() + return l.stream +} + +func (l *lessor) switchRemoteAndStream(prevErr error) error { + l.mu.Lock() + conn := l.conn + l.mu.Unlock() + + var ( + err error + newConn *grpc.ClientConn + ) + + if prevErr != nil { + conn.Close() + newConn, err = l.c.retryConnection(conn, prevErr) + if err != nil { + return err + } + } + + l.mu.Lock() + if newConn != nil { + l.conn = newConn + } + + l.remote = pb.NewLeaseClient(l.conn) + l.mu.Unlock() + + serr := l.newStream() + if serr != nil { + return serr + } + return nil +} + +func (l *lessor) newStream() error { + sctx, cancel := context.WithCancel(l.stopCtx) + stream, err := l.getRemote().LeaseKeepAlive(sctx) + if err != nil { + cancel() + return err + } + + l.mu.Lock() + defer l.mu.Unlock() + if l.stream != nil && l.streamCancel != nil { + l.stream.CloseSend() + l.streamCancel() + } + + l.streamCancel = cancel + l.stream = stream + return nil +} + +func (ka *keepAlive) Close() { + close(ka.donec) + for _, ch := range ka.chs { + close(ch) + } +} + +// cancelWhenStop calls cancel when the given stopc fires. It returns a done chan. done +// should be closed when the work is finished. When done fires, cancelWhenStop will release +// its internal resource. +func cancelWhenStop(cancel context.CancelFunc, stopc <-chan struct{}) chan<- struct{} { + done := make(chan struct{}, 1) + + go func() { + select { + case <-stopc: + case <-done: + } + cancel() + }() + + return done +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/maintenance.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/maintenance.go new file mode 100644 index 00000000000..158293ebd6f --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/maintenance.go @@ -0,0 +1,52 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "golang.org/x/net/context" +) + +type ( + DefragmentResponse pb.DefragmentResponse +) + +type Maintenance interface { + // Defragment defragments storage backend of the etcd member with given endpoint. + // Defragment is only needed when deleting a large number of keys and want to reclaim + // the resources. + // Defragment is an expensive operation. User should avoid defragmenting multiple members + // at the same time. + // To defragment multiple members in the cluster, user need to call defragment multiple + // times with different endpoints. + Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) +} + +type maintenance struct { + c *Client +} + +func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) { + conn, err := m.c.Dial(endpoint) + if err != nil { + return nil, err + } + remote := pb.NewMaintenanceClient(conn) + resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}) + if err != nil { + return nil, err + } + return (*DefragmentResponse)(resp), nil +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/mirror/syncer.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/mirror/syncer.go new file mode 100644 index 00000000000..d1911800193 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/mirror/syncer.go @@ -0,0 +1,109 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mirror + +import ( + "github.com/coreos/etcd/clientv3" + "golang.org/x/net/context" +) + +const ( + batchLimit = 1000 +) + +// Syncer syncs with the key-value state of an etcd cluster. +type Syncer interface { + // SyncBase syncs the base state of the key-value state. + // The key-value state are sent through the returned chan. + SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error) + // SyncUpdates syncs the updates of the key-value state. + // The update events are sent through the returned chan. + SyncUpdates(ctx context.Context) clientv3.WatchChan +} + +// NewSyncer creates a Syncer. +func NewSyncer(c *clientv3.Client, prefix string, rev int64) Syncer { + return &syncer{c: c, prefix: prefix, rev: rev} +} + +type syncer struct { + c *clientv3.Client + rev int64 + prefix string +} + +func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error) { + respchan := make(chan clientv3.GetResponse, 1024) + errchan := make(chan error, 1) + + // if rev is not specified, we will choose the most recent revision. + if s.rev == 0 { + resp, err := s.c.Get(ctx, "foo") + if err != nil { + errchan <- err + close(respchan) + close(errchan) + return respchan, errchan + } + s.rev = resp.Header.Revision + } + + go func() { + defer close(respchan) + defer close(errchan) + + var key string + + opts := []clientv3.OpOption{clientv3.WithLimit(batchLimit), clientv3.WithRev(s.rev)} + + if len(s.prefix) == 0 { + // If len(s.prefix) == 0, we will sync the entire key-value space. + // We then range from the smallest key (0x00) to the end. + opts = append(opts, clientv3.WithFromKey()) + key = "\x00" + } else { + // If len(s.prefix) != 0, we will sync key-value space with given prefix. + // We then range from the prefix to the next prefix if exists. Or we will + // range from the prefix to the end if the next prefix does not exists. + opts = append(opts, clientv3.WithPrefix()) + key = s.prefix + } + + for { + resp, err := s.c.Get(ctx, key, opts...) + if err != nil { + errchan <- err + return + } + + respchan <- (clientv3.GetResponse)(*resp) + + if !resp.More { + return + } + // move to next key + key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0)) + } + }() + + return respchan, errchan +} + +func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan { + if s.rev == 0 { + panic("unexpected revision = 0. Calling SyncUpdates before SyncBase finishes?") + } + return s.c.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev+1)) +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/op.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/op.go new file mode 100644 index 00000000000..0af89dfc20e --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/op.go @@ -0,0 +1,240 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +type opType int + +const ( + // A default Op has opType 0, which is invalid. + tRange opType = iota + 1 + tPut + tDeleteRange +) + +var ( + noPrefixEnd = []byte{0} +) + +// Op represents an Operation that kv can execute. +type Op struct { + t opType + key []byte + end []byte + + // for range + limit int64 + sort *SortOption + serializable bool + + // for range, watch + rev int64 + + // progressNotify is for progress updates. + progressNotify bool + + // for put + val []byte + leaseID LeaseID +} + +func (op Op) toRequestUnion() *pb.RequestUnion { + switch op.t { + case tRange: + r := &pb.RangeRequest{Key: op.key, RangeEnd: op.end, Limit: op.limit, Revision: op.rev} + if op.sort != nil { + r.SortOrder = pb.RangeRequest_SortOrder(op.sort.Order) + r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target) + } + return &pb.RequestUnion{Request: &pb.RequestUnion_RequestRange{RequestRange: r}} + case tPut: + r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)} + return &pb.RequestUnion{Request: &pb.RequestUnion_RequestPut{RequestPut: r}} + case tDeleteRange: + r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end} + return &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{RequestDeleteRange: r}} + default: + panic("Unknown Op") + } +} + +func (op Op) isWrite() bool { + return op.t != tRange +} + +func OpGet(key string, opts ...OpOption) Op { + ret := Op{t: tRange, key: []byte(key)} + ret.applyOpts(opts) + return ret +} + +func OpDelete(key string, opts ...OpOption) Op { + ret := Op{t: tDeleteRange, key: []byte(key)} + ret.applyOpts(opts) + switch { + case ret.leaseID != 0: + panic("unexpected lease in delete") + case ret.limit != 0: + panic("unexpected limit in delete") + case ret.rev != 0: + panic("unexpected revision in delete") + case ret.sort != nil: + panic("unexpected sort in delete") + case ret.serializable != false: + panic("unexpected serializable in delete") + } + return ret +} + +func OpPut(key, val string, opts ...OpOption) Op { + ret := Op{t: tPut, key: []byte(key), val: []byte(val)} + ret.applyOpts(opts) + switch { + case ret.end != nil: + panic("unexpected range in put") + case ret.limit != 0: + panic("unexpected limit in put") + case ret.rev != 0: + panic("unexpected revision in put") + case ret.sort != nil: + panic("unexpected sort in put") + case ret.serializable != false: + panic("unexpected serializable in delete") + } + return ret +} + +func opWatch(key string, opts ...OpOption) Op { + ret := Op{t: tRange, key: []byte(key)} + ret.applyOpts(opts) + switch { + case ret.leaseID != 0: + panic("unexpected lease in watch") + case ret.limit != 0: + panic("unexpected limit in watch") + case ret.sort != nil: + panic("unexpected sort in watch") + case ret.serializable != false: + panic("unexpected serializable in watch") + } + return ret +} + +func (op *Op) applyOpts(opts []OpOption) { + for _, opt := range opts { + opt(op) + } +} + +// OpOption configures Operations like Get, Put, Delete. +type OpOption func(*Op) + +// WithLease attaches a lease ID to a key in 'Put' request. +func WithLease(leaseID LeaseID) OpOption { + return func(op *Op) { op.leaseID = leaseID } +} + +// WithLimit limits the number of results to return from 'Get' request. +func WithLimit(n int64) OpOption { return func(op *Op) { op.limit = n } } + +// WithRev specifies the store revision for 'Get' request. +// Or the start revision of 'Watch' request. +func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } } + +// WithSort specifies the ordering in 'Get' request. It requires +// 'WithRange' and/or 'WithPrefix' to be specified too. +// 'target' specifies the target to sort by: key, version, revisions, value. +// 'order' can be either 'SortNone', 'SortAscend', 'SortDescend'. +func WithSort(target SortTarget, order SortOrder) OpOption { + return func(op *Op) { + op.sort = &SortOption{target, order} + } +} + +func getPrefix(key []byte) []byte { + end := make([]byte, len(key)) + copy(end, key) + for i := len(end) - 1; i >= 0; i-- { + if end[i] < 0xff { + end[i] = end[i] + 1 + end = end[:i+1] + return end + } + } + // next prefix does not exist (e.g., 0xffff); + // default to WithFromKey policy + return noPrefixEnd +} + +// WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate +// on the keys with matching prefix. For example, 'Get(foo, WithPrefix())' +// can return 'foo1', 'foo2', and so on. +func WithPrefix() OpOption { + return func(op *Op) { + op.end = getPrefix(op.key) + } +} + +// WithRange specifies the range of 'Get' or 'Delete' requests. +// For example, 'Get' requests with 'WithRange(end)' returns +// the keys in the range [key, end). +func WithRange(endKey string) OpOption { + return func(op *Op) { op.end = []byte(endKey) } +} + +// WithFromKey specifies the range of 'Get' or 'Delete' requests +// to be equal or greater than they key in the argument. +func WithFromKey() OpOption { return WithRange("\x00") } + +// WithSerializable makes 'Get' request serializable. By default, +// it's linearizable. Serializable requests are better for lower latency +// requirement. +func WithSerializable() OpOption { + return func(op *Op) { op.serializable = true } +} + +// WithFirstCreate gets the key with the oldest creation revision in the request range. +func WithFirstCreate() []OpOption { return withTop(SortByCreatedRev, SortAscend) } + +// WithLastCreate gets the key with the latest creation revision in the request range. +func WithLastCreate() []OpOption { return withTop(SortByCreatedRev, SortDescend) } + +// WithFirstKey gets the lexically first key in the request range. +func WithFirstKey() []OpOption { return withTop(SortByKey, SortAscend) } + +// WithLastKey gets the lexically last key in the request range. +func WithLastKey() []OpOption { return withTop(SortByKey, SortDescend) } + +// WithFirstRev gets the key with the oldest modification revision in the request range. +func WithFirstRev() []OpOption { return withTop(SortByModifiedRev, SortAscend) } + +// WithLastRev gets the key with the latest modification revision in the request range. +func WithLastRev() []OpOption { return withTop(SortByModifiedRev, SortDescend) } + +// withTop gets the first key over the get's prefix given a sort order +func withTop(target SortTarget, order SortOrder) []OpOption { + return []OpOption{WithPrefix(), WithSort(target, order), WithLimit(1)} +} + +// WithProgressNotify makes watch server send periodic progress updates. +// Progress updates have zero events in WatchResponse. +func WithProgressNotify() OpOption { + return func(op *Op) { + op.progressNotify = true + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/sort.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/sort.go new file mode 100644 index 00000000000..e5ba1e95511 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/sort.go @@ -0,0 +1,37 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +type SortTarget int +type SortOrder int + +const ( + SortNone SortOrder = iota + SortAscend + SortDescend +) + +const ( + SortByKey SortTarget = iota + SortByVersion + SortByCreatedRev + SortByModifiedRev + SortByValue +) + +type SortOption struct { + Target SortTarget + Order SortOrder +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/txn.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/txn.go new file mode 100644 index 00000000000..f664ee63996 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/txn.go @@ -0,0 +1,160 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "sync" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "golang.org/x/net/context" +) + +// +// Tx.If( +// Compare(Value(k1), ">", v1), +// Compare(Version(k1), "=", 2) +// ).Then( +// OpPut(k2,v2), OpPut(k3,v3) +// ).Else( +// OpPut(k4,v4), OpPut(k5,v5) +// ).Commit() +type Txn interface { + // If takes a list of comparison. If all comparisons passed in succeed, + // the operations passed into Then() will be executed. Or the operations + // passed into Else() will be executed. + If(cs ...Cmp) Txn + + // Then takes a list of operations. The Ops list will be executed, if the + // comparisons passed in If() succeed. + Then(ops ...Op) Txn + + // Else takes a list of operations. The Ops list will be executed, if the + // comparisons passed in If() fail. + Else(ops ...Op) Txn + + // Commit tries to commit the transaction. + Commit() (*TxnResponse, error) + + // TODO: add a Do for shortcut the txn without any condition? +} + +type txn struct { + kv *kv + ctx context.Context + + mu sync.Mutex + cif bool + cthen bool + celse bool + + isWrite bool + + cmps []*pb.Compare + + sus []*pb.RequestUnion + fas []*pb.RequestUnion +} + +func (txn *txn) If(cs ...Cmp) Txn { + txn.mu.Lock() + defer txn.mu.Unlock() + + if txn.cif { + panic("cannot call If twice!") + } + + if txn.cthen { + panic("cannot call If after Then!") + } + + if txn.celse { + panic("cannot call If after Else!") + } + + txn.cif = true + + for i := range cs { + txn.cmps = append(txn.cmps, (*pb.Compare)(&cs[i])) + } + + return txn +} + +func (txn *txn) Then(ops ...Op) Txn { + txn.mu.Lock() + defer txn.mu.Unlock() + + if txn.cthen { + panic("cannot call Then twice!") + } + if txn.celse { + panic("cannot call Then after Else!") + } + + txn.cthen = true + + for _, op := range ops { + txn.isWrite = txn.isWrite || op.isWrite() + txn.sus = append(txn.sus, op.toRequestUnion()) + } + + return txn +} + +func (txn *txn) Else(ops ...Op) Txn { + txn.mu.Lock() + defer txn.mu.Unlock() + + if txn.celse { + panic("cannot call Else twice!") + } + + txn.celse = true + + for _, op := range ops { + txn.isWrite = txn.isWrite || op.isWrite() + txn.fas = append(txn.fas, op.toRequestUnion()) + } + + return txn +} + +func (txn *txn) Commit() (*TxnResponse, error) { + txn.mu.Lock() + defer txn.mu.Unlock() + + kv := txn.kv + + for { + r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas} + resp, err := kv.getRemote().Txn(txn.ctx, r) + if err == nil { + return (*TxnResponse)(resp), nil + } + + if isHalted(txn.ctx, err) { + return nil, err + } + + if txn.isWrite { + go kv.switchRemote(err) + return nil, err + } + + if nerr := kv.switchRemote(err); nerr != nil { + return nil, nerr + } + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/watch.go b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/watch.go new file mode 100644 index 00000000000..17b1bc9d1d3 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/clientv3/watch.go @@ -0,0 +1,543 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "fmt" + "sync" + + v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + storagepb "github.com/coreos/etcd/storage/storagepb" + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +type WatchChan <-chan WatchResponse + +type Watcher interface { + // Watch watches on a key or prefix. The watched events will be returned + // through the returned channel. + // If the watch is slow or the required rev is compacted, the watch request + // might be canceled from the server-side and the chan will be closed. + // 'opts' can be: 'WithRev' and/or 'WitchPrefix'. + Watch(ctx context.Context, key string, opts ...OpOption) WatchChan + + // Close closes the watcher and cancels all watch requests. + Close() error +} + +type WatchResponse struct { + Header pb.ResponseHeader + Events []*storagepb.Event + + // CompactRevision is the minimum revision the watcher may receive. + CompactRevision int64 + + // Canceled is used to indicate watch failure. + // If the watch failed and the stream was about to close, before the channel is closed, + // the channel sends a final response that has Canceled set to true with a non-nil Err(). + Canceled bool +} + +// Err is the error value if this WatchResponse holds an error. +func (wr *WatchResponse) Err() error { + if wr.CompactRevision != 0 { + return v3rpc.ErrCompacted + } + if wr.Canceled { + return v3rpc.ErrFutureRev + } + return nil +} + +// IsProgressNotify returns true if the WatchResponse is progress notification. +func (wr *WatchResponse) IsProgressNotify() bool { + return len(wr.Events) == 0 && !wr.Canceled +} + +// watcher implements the Watcher interface +type watcher struct { + c *Client + conn *grpc.ClientConn + remote pb.WatchClient + + // ctx controls internal remote.Watch requests + ctx context.Context + cancel context.CancelFunc + + // streams holds all active watchers + streams map[int64]*watcherStream + // mu protects the streams map + mu sync.RWMutex + + // reqc sends a watch request from Watch() to the main goroutine + reqc chan *watchRequest + // respc receives data from the watch client + respc chan *pb.WatchResponse + // stopc is sent to the main goroutine to stop all processing + stopc chan struct{} + // donec closes to broadcast shutdown + donec chan struct{} + // errc transmits errors from grpc Recv + errc chan error +} + +// watchRequest is issued by the subscriber to start a new watcher +type watchRequest struct { + ctx context.Context + key string + end string + rev int64 + // progressNotify is for progress updates. + progressNotify bool + // retc receives a chan WatchResponse once the watcher is established + retc chan chan WatchResponse +} + +// watcherStream represents a registered watcher +type watcherStream struct { + initReq watchRequest + + // outc publishes watch responses to subscriber + outc chan<- WatchResponse + // recvc buffers watch responses before publishing + recvc chan *WatchResponse + id int64 + + // lastRev is revision last successfully sent over outc + lastRev int64 + // resumec indicates the stream must recover at a given revision + resumec chan int64 +} + +func NewWatcher(c *Client) Watcher { + ctx, cancel := context.WithCancel(context.Background()) + conn := c.ActiveConnection() + + w := &watcher{ + c: c, + conn: conn, + remote: pb.NewWatchClient(conn), + + ctx: ctx, + cancel: cancel, + streams: make(map[int64]*watcherStream), + + respc: make(chan *pb.WatchResponse), + reqc: make(chan *watchRequest), + stopc: make(chan struct{}), + donec: make(chan struct{}), + errc: make(chan error, 1), + } + go w.run() + return w +} + +// Watch posts a watch request to run() and waits for a new watcher channel +func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan { + ow := opWatch(key, opts...) + + retc := make(chan chan WatchResponse, 1) + wr := &watchRequest{ + ctx: ctx, + key: string(ow.key), + end: string(ow.end), + rev: ow.rev, + progressNotify: ow.progressNotify, + retc: retc, + } + + ok := false + + // submit request + select { + case w.reqc <- wr: + ok = true + case <-wr.ctx.Done(): + case <-w.donec: + } + + // receive channel + if ok { + select { + case ret := <-retc: + return ret + case <-ctx.Done(): + case <-w.donec: + } + } + + // couldn't create channel; return closed channel + ch := make(chan WatchResponse) + close(ch) + return ch +} + +func (w *watcher) Close() error { + select { + case w.stopc <- struct{}{}: + case <-w.donec: + } + <-w.donec + return <-w.errc +} + +func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { + if pendingReq == nil { + // no pending request; ignore + return + } + if resp.Canceled || resp.CompactRevision != 0 { + // a cancel at id creation time means the start revision has + // been compacted out of the store + ret := make(chan WatchResponse, 1) + ret <- WatchResponse{ + Header: *resp.Header, + CompactRevision: resp.CompactRevision, + Canceled: true} + close(ret) + pendingReq.retc <- ret + return + } + + ret := make(chan WatchResponse) + if resp.WatchId == -1 { + // failed; no channel + close(ret) + pendingReq.retc <- ret + return + } + + ws := &watcherStream{ + initReq: *pendingReq, + id: resp.WatchId, + outc: ret, + // buffered so unlikely to block on sending while holding mu + recvc: make(chan *WatchResponse, 4), + resumec: make(chan int64), + } + + if pendingReq.rev == 0 { + // note the header revision so that a put following a current watcher + // disconnect will arrive on the watcher channel after reconnect + ws.initReq.rev = resp.Header.Revision + } + + w.mu.Lock() + w.streams[ws.id] = ws + w.mu.Unlock() + + // send messages to subscriber + go w.serveStream(ws) + + // pass back the subscriber channel for the watcher + pendingReq.retc <- ret +} + +// closeStream closes the watcher resources and removes it +func (w *watcher) closeStream(ws *watcherStream) { + // cancels request stream; subscriber receives nil channel + close(ws.initReq.retc) + // close subscriber's channel + close(ws.outc) + // shutdown serveStream + close(ws.recvc) + delete(w.streams, ws.id) +} + +// run is the root of the goroutines for managing a watcher client +func (w *watcher) run() { + defer func() { + close(w.donec) + w.cancel() + }() + + // start a stream with the etcd grpc server + wc, wcerr := w.newWatchClient() + if wcerr != nil { + w.errc <- wcerr + return + } + + var pendingReq, failedReq *watchRequest + curReqC := w.reqc + cancelSet := make(map[int64]struct{}) + + for { + select { + // Watch() requested + case pendingReq = <-curReqC: + // no more watch requests until there's a response + curReqC = nil + if err := wc.Send(pendingReq.toPB()); err == nil { + // pendingReq now waits on w.respc + break + } + failedReq = pendingReq + // New events from the watch client + case pbresp := <-w.respc: + switch { + case pbresp.Created: + // response to pending req, try to add + w.addStream(pbresp, pendingReq) + pendingReq = nil + curReqC = w.reqc + case pbresp.Canceled: + delete(cancelSet, pbresp.WatchId) + default: + // dispatch to appropriate watch stream + if ok := w.dispatchEvent(pbresp); ok { + break + } + // watch response on unexpected watch id; cancel id + if _, ok := cancelSet[pbresp.WatchId]; ok { + break + } + cancelSet[pbresp.WatchId] = struct{}{} + cr := &pb.WatchRequest_CancelRequest{ + CancelRequest: &pb.WatchCancelRequest{ + WatchId: pbresp.WatchId, + }, + } + req := &pb.WatchRequest{RequestUnion: cr} + wc.Send(req) + } + // watch client failed to recv; spawn another if possible + // TODO report watch client errors from errc? + case <-w.errc: + if wc, wcerr = w.newWatchClient(); wcerr != nil { + w.errc <- wcerr + return + } + curReqC = w.reqc + if pendingReq != nil { + failedReq = pendingReq + } + cancelSet = make(map[int64]struct{}) + case <-w.stopc: + w.errc <- nil + return + } + + // send failed; queue for retry + if failedReq != nil { + go func(wr *watchRequest) { + select { + case w.reqc <- wr: + case <-wr.ctx.Done(): + case <-w.donec: + } + }(pendingReq) + failedReq = nil + pendingReq = nil + } + } +} + +// dispatchEvent sends a WatchResponse to the appropriate watcher stream +func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool { + w.mu.RLock() + defer w.mu.RUnlock() + ws, ok := w.streams[pbresp.WatchId] + if ok { + wr := &WatchResponse{ + Header: *pbresp.Header, + Events: pbresp.Events, + CompactRevision: pbresp.CompactRevision, + Canceled: pbresp.Canceled} + ws.recvc <- wr + } + return ok +} + +// serveWatchClient forwards messages from the grpc stream to run() +func (w *watcher) serveWatchClient(wc pb.Watch_WatchClient) { + for { + resp, err := wc.Recv() + if err != nil { + select { + case w.errc <- err: + case <-w.donec: + } + return + } + select { + case w.respc <- resp: + case <-w.donec: + return + } + } +} + +// serveStream forwards watch responses from run() to the subscriber +func (w *watcher) serveStream(ws *watcherStream) { + emptyWr := &WatchResponse{} + wrs := []*WatchResponse{} + resuming := false + closing := false + for !closing { + curWr := emptyWr + outc := ws.outc + if len(wrs) > 0 { + curWr = wrs[0] + } else { + outc = nil + } + select { + case outc <- *curWr: + if wrs[0].Err() != nil { + closing = true + break + } + var newRev int64 + if len(wrs[0].Events) > 0 { + newRev = wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision + } else { + newRev = wrs[0].Header.Revision + } + if newRev != ws.lastRev { + ws.lastRev = newRev + } + wrs[0] = nil + wrs = wrs[1:] + case wr, ok := <-ws.recvc: + if !ok { + // shutdown from closeStream + return + } + // resume up to last seen event if disconnected + if resuming { + resuming = false + // trim events already seen + for i := 0; i < len(wr.Events); i++ { + if wr.Events[i].Kv.ModRevision > ws.lastRev { + wr.Events = wr.Events[i:] + break + } + } + // only forward new events + if wr.Events[0].Kv.ModRevision == ws.lastRev { + break + } + } + // TODO don't keep buffering if subscriber stops reading + wrs = append(wrs, wr) + case resumeRev := <-ws.resumec: + if resumeRev != ws.lastRev { + panic("unexpected resume revision") + } + wrs = nil + resuming = true + case <-w.donec: + closing = true + case <-ws.initReq.ctx.Done(): + closing = true + } + } + w.mu.Lock() + w.closeStream(ws) + w.mu.Unlock() + // lazily send cancel message if events on missing id +} + +func (w *watcher) newWatchClient() (pb.Watch_WatchClient, error) { + ws, rerr := w.resume() + if rerr != nil { + return nil, rerr + } + go w.serveWatchClient(ws) + return ws, nil +} + +// resume creates a new WatchClient with all current watchers reestablished +func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) { + for { + if ws, err = w.openWatchClient(); err != nil { + break + } else if err = w.resumeWatchers(ws); err == nil { + break + } + } + return ws, err +} + +// openWatchClient retries opening a watchclient until retryConnection fails +func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) { + for { + if ws, err = w.remote.Watch(w.ctx); ws != nil { + break + } else if isHalted(w.ctx, err) { + return nil, err + } + newConn, nerr := w.c.retryConnection(w.conn, nil) + if nerr != nil { + return nil, nerr + } + w.conn = newConn + w.remote = pb.NewWatchClient(w.conn) + } + return ws, nil +} + +// resumeWatchers rebuilds every registered watcher on a new client +func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error { + streams := []*watcherStream{} + w.mu.RLock() + for _, ws := range w.streams { + streams = append(streams, ws) + } + w.mu.RUnlock() + + for _, ws := range streams { + // reconstruct watcher from initial request + if ws.lastRev != 0 { + ws.initReq.rev = ws.lastRev + } + if err := wc.Send(ws.initReq.toPB()); err != nil { + return err + } + + // wait for request ack + resp, err := wc.Recv() + if err != nil { + return err + } else if len(resp.Events) != 0 || resp.Created != true { + return fmt.Errorf("watcher: unexpected response (%+v)", resp) + } + + // id may be different since new remote watcher; update map + w.mu.Lock() + delete(w.streams, ws.id) + ws.id = resp.WatchId + w.streams[ws.id] = ws + w.mu.Unlock() + + ws.resumec <- ws.lastRev + } + return nil +} + +// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest) +func (wr *watchRequest) toPB() *pb.WatchRequest { + req := &pb.WatchCreateRequest{ + StartRevision: wr.rev, + Key: []byte(wr.key), + RangeEnd: []byte(wr.end), + ProgressNotify: wr.progressNotify, + } + cr := &pb.WatchRequest_CreateRequest{CreateRequest: req} + return &pb.WatchRequest{RequestUnion: cr} +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/integration/cluster.go b/Godeps/_workspace/src/github.com/coreos/etcd/integration/cluster.go new file mode 100644 index 00000000000..31a437bfba9 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/integration/cluster.go @@ -0,0 +1,764 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.package recipe +package integration + +import ( + "fmt" + "io/ioutil" + "math/rand" + "net" + "net/http" + "net/http/httptest" + "os" + "reflect" + "sort" + "strconv" + "strings" + "sync/atomic" + "testing" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + + "github.com/coreos/etcd/client" + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api/v3rpc" + "github.com/coreos/etcd/etcdserver/etcdhttp" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/testutil" + "github.com/coreos/etcd/pkg/transport" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/rafthttp" +) + +const ( + tickDuration = 10 * time.Millisecond + clusterName = "etcd" + requestTimeout = 20 * time.Second +) + +var ( + electionTicks = 10 + + // integration test uses well-known ports to listen for each running member, + // which ensures restarted member could listen on specific port again. + nextListenPort int64 = 20000 + + testTLSInfo = transport.TLSInfo{ + KeyFile: "./fixtures/server.key.insecure", + CertFile: "./fixtures/server.crt", + TrustedCAFile: "./fixtures/ca.crt", + ClientCertAuth: true, + } +) + +type ClusterConfig struct { + Size int + PeerTLS *transport.TLSInfo + ClientTLS *transport.TLSInfo + DiscoveryURL string + UseV3 bool + UseGRPC bool +} + +type cluster struct { + cfg *ClusterConfig + Members []*member +} + +func (c *cluster) fillClusterForMembers() error { + if c.cfg.DiscoveryURL != "" { + // cluster will be discovered + return nil + } + + addrs := make([]string, 0) + for _, m := range c.Members { + scheme := "http" + if m.PeerTLSInfo != nil { + scheme = "https" + } + for _, l := range m.PeerListeners { + addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String())) + } + } + clusterStr := strings.Join(addrs, ",") + var err error + for _, m := range c.Members { + m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) + if err != nil { + return err + } + } + return nil +} + +func newCluster(t *testing.T, cfg *ClusterConfig) *cluster { + c := &cluster{cfg: cfg} + ms := make([]*member, cfg.Size) + for i := 0; i < cfg.Size; i++ { + ms[i] = c.mustNewMember(t) + } + c.Members = ms + if err := c.fillClusterForMembers(); err != nil { + t.Fatal(err) + } + + return c +} + +// NewCluster returns an unlaunched cluster of the given size which has been +// set to use static bootstrap. +func NewCluster(t *testing.T, size int) *cluster { + return newCluster(t, &ClusterConfig{Size: size}) +} + +// NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration +func NewClusterByConfig(t *testing.T, cfg *ClusterConfig) *cluster { + return newCluster(t, cfg) +} + +func (c *cluster) Launch(t *testing.T) { + errc := make(chan error) + for _, m := range c.Members { + // Members are launched in separate goroutines because if they boot + // using discovery url, they have to wait for others to register to continue. + go func(m *member) { + errc <- m.Launch() + }(m) + } + for range c.Members { + if err := <-errc; err != nil { + t.Fatalf("error setting up member: %v", err) + } + } + // wait cluster to be stable to receive future client requests + c.waitMembersMatch(t, c.HTTPMembers()) + c.waitVersion() +} + +func (c *cluster) URL(i int) string { + return c.Members[i].ClientURLs[0].String() +} + +// URLs returns a list of all active client URLs in the cluster +func (c *cluster) URLs() []string { + urls := make([]string, 0) + for _, m := range c.Members { + select { + case <-m.s.StopNotify(): + continue + default: + } + for _, u := range m.ClientURLs { + urls = append(urls, u.String()) + } + } + return urls +} + +// HTTPMembers returns a list of all active members as client.Members +func (c *cluster) HTTPMembers() []client.Member { + ms := []client.Member{} + for _, m := range c.Members { + pScheme, cScheme := "http", "http" + if m.PeerTLSInfo != nil { + pScheme = "https" + } + if m.ClientTLSInfo != nil { + cScheme = "https" + } + cm := client.Member{Name: m.Name} + for _, ln := range m.PeerListeners { + cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String()) + } + for _, ln := range m.ClientListeners { + cm.ClientURLs = append(cm.ClientURLs, cScheme+"://"+ln.Addr().String()) + } + ms = append(ms, cm) + } + return ms +} + +func (c *cluster) mustNewMember(t *testing.T) *member { + name := c.name(rand.Int()) + m := mustNewMember(t, name, c.cfg.PeerTLS, c.cfg.ClientTLS) + m.DiscoveryURL = c.cfg.DiscoveryURL + m.V3demo = c.cfg.UseV3 + if c.cfg.UseGRPC { + if err := m.listenGRPC(); err != nil { + t.Fatal(err) + } + } + return m +} + +func (c *cluster) addMember(t *testing.T) { + m := c.mustNewMember(t) + + scheme := "http" + if c.cfg.PeerTLS != nil { + scheme = "https" + } + + // send add request to the cluster + var err error + for i := 0; i < len(c.Members); i++ { + clientURL := c.URL(i) + peerURL := scheme + "://" + m.PeerListeners[0].Addr().String() + if err = c.addMemberByURL(t, clientURL, peerURL); err == nil { + break + } + } + if err != nil { + t.Fatalf("add member failed on all members error: %v", err) + } + + m.InitialPeerURLsMap = types.URLsMap{} + for _, mm := range c.Members { + m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs + } + m.InitialPeerURLsMap[m.Name] = m.PeerURLs + m.NewCluster = false + if err := m.Launch(); err != nil { + t.Fatal(err) + } + c.Members = append(c.Members, m) + // wait cluster to be stable to receive future client requests + c.waitMembersMatch(t, c.HTTPMembers()) +} + +func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error { + cc := mustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS) + ma := client.NewMembersAPI(cc) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + if _, err := ma.Add(ctx, peerURL); err != nil { + return err + } + cancel() + + // wait for the add node entry applied in the cluster + members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}}) + c.waitMembersMatch(t, members) + return nil +} + +func (c *cluster) AddMember(t *testing.T) { + c.addMember(t) +} + +func (c *cluster) RemoveMember(t *testing.T, id uint64) { + // send remove request to the cluster + cc := mustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS) + ma := client.NewMembersAPI(cc) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + if err := ma.Remove(ctx, types.ID(id).String()); err != nil { + t.Fatalf("unexpected remove error %v", err) + } + cancel() + newMembers := make([]*member, 0) + for _, m := range c.Members { + if uint64(m.s.ID()) != id { + newMembers = append(newMembers, m) + } else { + select { + case <-m.s.StopNotify(): + m.Terminate(t) + // 1s stop delay + election timeout + 1s disk and network delay + connection write timeout + // TODO: remove connection write timeout by selecting on http response closeNotifier + // blocking on https://github.com/golang/go/issues/9524 + case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout): + t.Fatalf("failed to remove member %s in time", m.s.ID()) + } + } + } + c.Members = newMembers + c.waitMembersMatch(t, c.HTTPMembers()) +} + +func (c *cluster) Terminate(t *testing.T) { + for _, m := range c.Members { + m.Terminate(t) + } +} + +func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) { + for _, u := range c.URLs() { + cc := mustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS) + ma := client.NewMembersAPI(cc) + for { + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + ms, err := ma.List(ctx) + cancel() + if err == nil && isMembersEqual(ms, membs) { + break + } + time.Sleep(tickDuration) + } + } + return +} + +func (c *cluster) waitLeader(t *testing.T, membs []*member) int { + possibleLead := make(map[uint64]bool) + var lead uint64 + for _, m := range membs { + possibleLead[uint64(m.s.ID())] = true + } + + for lead == 0 || !possibleLead[lead] { + lead = 0 + for _, m := range membs { + select { + case <-m.s.StopNotify(): + continue + default: + } + if lead != 0 && lead != m.s.Lead() { + lead = 0 + break + } + lead = m.s.Lead() + } + time.Sleep(10 * tickDuration) + } + + for i, m := range membs { + if uint64(m.s.ID()) == lead { + return i + } + } + + return -1 +} + +func (c *cluster) waitVersion() { + for _, m := range c.Members { + for { + if m.s.ClusterVersion() != nil { + break + } + time.Sleep(tickDuration) + } + } +} + +func (c *cluster) name(i int) string { + return fmt.Sprint("node", i) +} + +// isMembersEqual checks whether two members equal except ID field. +// The given wmembs should always set ID field to empty string. +func isMembersEqual(membs []client.Member, wmembs []client.Member) bool { + sort.Sort(SortableMemberSliceByPeerURLs(membs)) + sort.Sort(SortableMemberSliceByPeerURLs(wmembs)) + for i := range membs { + membs[i].ID = "" + } + return reflect.DeepEqual(membs, wmembs) +} + +func newLocalListener(t *testing.T) net.Listener { + port := atomic.AddInt64(&nextListenPort, 1) + l, err := net.Listen("tcp", "127.0.0.1:"+strconv.FormatInt(port, 10)) + if err != nil { + t.Fatal(err) + } + return l +} + +func newListenerWithAddr(t *testing.T, addr string) net.Listener { + var err error + var l net.Listener + // TODO: we want to reuse a previous closed port immediately. + // a better way is to set SO_REUSExx instead of doing retry. + for i := 0; i < 5; i++ { + l, err = net.Listen("tcp", addr) + if err == nil { + break + } + time.Sleep(500 * time.Millisecond) + } + if err != nil { + t.Fatal(err) + } + return l +} + +type member struct { + etcdserver.ServerConfig + PeerListeners, ClientListeners []net.Listener + grpcListener net.Listener + // PeerTLSInfo enables peer TLS when set + PeerTLSInfo *transport.TLSInfo + // ClientTLSInfo enables client TLS when set + ClientTLSInfo *transport.TLSInfo + + raftHandler *testutil.PauseableHandler + s *etcdserver.EtcdServer + hss []*httptest.Server + + grpcServer *grpc.Server + grpcAddr string +} + +// mustNewMember return an inited member with the given name. If peerTLS is +// set, it will use https scheme to communicate between peers. +func mustNewMember(t *testing.T, name string, peerTLS *transport.TLSInfo, clientTLS *transport.TLSInfo) *member { + var err error + m := &member{} + + peerScheme, clientScheme := "http", "http" + if peerTLS != nil { + peerScheme = "https" + } + if clientTLS != nil { + clientScheme = "https" + } + + pln := newLocalListener(t) + m.PeerListeners = []net.Listener{pln} + m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()}) + if err != nil { + t.Fatal(err) + } + m.PeerTLSInfo = peerTLS + + cln := newLocalListener(t) + m.ClientListeners = []net.Listener{cln} + m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()}) + if err != nil { + t.Fatal(err) + } + m.ClientTLSInfo = clientTLS + + m.Name = name + + m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd") + if err != nil { + t.Fatal(err) + } + clusterStr := fmt.Sprintf("%s=%s://%s", name, peerScheme, pln.Addr().String()) + m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) + if err != nil { + t.Fatal(err) + } + m.InitialClusterToken = clusterName + m.NewCluster = true + m.BootstrapTimeout = 10 * time.Millisecond + if m.PeerTLSInfo != nil { + m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo + } + m.ElectionTicks = electionTicks + m.TickMs = uint(tickDuration / time.Millisecond) + return m +} + +// listenGRPC starts a grpc server over a unix domain socket on the member +func (m *member) listenGRPC() error { + if m.V3demo == false { + return fmt.Errorf("starting grpc server without v3 configured") + } + // prefix with localhost so cert has right domain + m.grpcAddr = "localhost:" + m.Name + ".sock" + if err := os.RemoveAll(m.grpcAddr); err != nil { + return err + } + l, err := net.Listen("unix", m.grpcAddr) + if err != nil { + return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err) + } + m.grpcAddr = "unix://" + m.grpcAddr + m.grpcListener = l + return nil +} + +// NewClientV3 creates a new grpc client connection to the member +func NewClientV3(m *member) (*clientv3.Client, error) { + if m.grpcAddr == "" { + return nil, fmt.Errorf("member not configured for grpc") + } + + cfg := clientv3.Config{ + Endpoints: []string{m.grpcAddr}, + DialTimeout: 5 * time.Second, + } + + if m.ClientTLSInfo != nil { + tls, err := m.ClientTLSInfo.ClientConfig() + if err != nil { + return nil, err + } + cfg.TLS = tls + } + return clientv3.New(cfg) +} + +// Clone returns a member with the same server configuration. The returned +// member will not set PeerListeners and ClientListeners. +func (m *member) Clone(t *testing.T) *member { + mm := &member{} + mm.ServerConfig = m.ServerConfig + + var err error + clientURLStrs := m.ClientURLs.StringSlice() + mm.ClientURLs, err = types.NewURLs(clientURLStrs) + if err != nil { + // this should never fail + panic(err) + } + peerURLStrs := m.PeerURLs.StringSlice() + mm.PeerURLs, err = types.NewURLs(peerURLStrs) + if err != nil { + // this should never fail + panic(err) + } + clusterStr := m.InitialPeerURLsMap.String() + mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) + if err != nil { + // this should never fail + panic(err) + } + mm.InitialClusterToken = m.InitialClusterToken + mm.ElectionTicks = m.ElectionTicks + mm.PeerTLSInfo = m.PeerTLSInfo + mm.ClientTLSInfo = m.ClientTLSInfo + return mm +} + +// Launch starts a member based on ServerConfig, PeerListeners +// and ClientListeners. +func (m *member) Launch() error { + var err error + if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil { + return fmt.Errorf("failed to initialize the etcd server: %v", err) + } + m.s.SyncTicker = time.Tick(500 * time.Millisecond) + m.s.Start() + + m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)} + + for _, ln := range m.PeerListeners { + hs := &httptest.Server{ + Listener: ln, + Config: &http.Server{Handler: m.raftHandler}, + } + if m.PeerTLSInfo == nil { + hs.Start() + } else { + hs.TLS, err = m.PeerTLSInfo.ServerConfig() + if err != nil { + return err + } + hs.StartTLS() + } + m.hss = append(m.hss, hs) + } + for _, ln := range m.ClientListeners { + hs := &httptest.Server{ + Listener: ln, + Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())}, + } + if m.ClientTLSInfo == nil { + hs.Start() + } else { + hs.TLS, err = m.ClientTLSInfo.ServerConfig() + if err != nil { + return err + } + hs.StartTLS() + } + m.hss = append(m.hss, hs) + } + if m.grpcListener != nil { + m.grpcServer, err = v3rpc.Server(m.s, m.ClientTLSInfo) + go m.grpcServer.Serve(m.grpcListener) + } + return nil +} + +func (m *member) WaitOK(t *testing.T) { + cc := mustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo) + kapi := client.NewKeysAPI(cc) + for { + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + _, err := kapi.Get(ctx, "/", nil) + if err != nil { + time.Sleep(tickDuration) + continue + } + cancel() + break + } + for m.s.Leader() == 0 { + time.Sleep(tickDuration) + } +} + +func (m *member) URL() string { return m.ClientURLs[0].String() } + +func (m *member) Pause() { + m.raftHandler.Pause() + m.s.PauseSending() +} + +func (m *member) Resume() { + m.raftHandler.Resume() + m.s.ResumeSending() +} + +// Close stops the member's etcdserver and closes its connections +func (m *member) Close() { + if m.grpcServer != nil { + m.grpcServer.Stop() + m.grpcServer = nil + } + m.s.Stop() + for _, hs := range m.hss { + hs.CloseClientConnections() + hs.Close() + } +} + +// Stop stops the member, but the data dir of the member is preserved. +func (m *member) Stop(t *testing.T) { + m.Close() + m.hss = nil +} + +// StopNotify unblocks when a member stop completes +func (m *member) StopNotify() <-chan struct{} { + return m.s.StopNotify() +} + +// Restart starts the member using the preserved data dir. +func (m *member) Restart(t *testing.T) error { + newPeerListeners := make([]net.Listener, 0) + for _, ln := range m.PeerListeners { + newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, ln.Addr().String())) + } + m.PeerListeners = newPeerListeners + newClientListeners := make([]net.Listener, 0) + for _, ln := range m.ClientListeners { + newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String())) + } + m.ClientListeners = newClientListeners + + if m.grpcListener != nil { + if err := m.listenGRPC(); err != nil { + t.Fatal(err) + } + } + + return m.Launch() +} + +// Terminate stops the member and removes the data dir. +func (m *member) Terminate(t *testing.T) { + m.Close() + if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil { + t.Fatal(err) + } +} + +func mustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client { + cfgtls := transport.TLSInfo{} + if tls != nil { + cfgtls = *tls + } + cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps} + c, err := client.New(cfg) + if err != nil { + t.Fatal(err) + } + return c +} + +func mustNewTransport(t *testing.T, tlsInfo transport.TLSInfo) *http.Transport { + // tick in integration test is short, so 1s dial timeout could play well. + tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout) + if err != nil { + t.Fatal(err) + } + return tr +} + +type SortableMemberSliceByPeerURLs []client.Member + +func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) } +func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool { + return p[i].PeerURLs[0] < p[j].PeerURLs[0] +} +func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +type ClusterV3 struct { + *cluster + clients []*clientv3.Client +} + +// NewClusterV3 returns a launched cluster with a grpc client connection +// for each cluster member. +func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 { + cfg.UseV3 = true + cfg.UseGRPC = true + clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)} + for _, m := range clus.Members { + client, err := NewClientV3(m) + if err != nil { + t.Fatal(err) + } + clus.clients = append(clus.clients, client) + } + clus.Launch(t) + return clus +} + +func (c *ClusterV3) Terminate(t *testing.T) { + for _, client := range c.clients { + if err := client.Close(); err != nil { + t.Error(err) + } + } + c.cluster.Terminate(t) +} + +func (c *ClusterV3) RandClient() *clientv3.Client { + return c.clients[rand.Intn(len(c.clients))] +} + +func (c *ClusterV3) Client(i int) *clientv3.Client { + return c.clients[i] +} + +type grpcAPI struct { + // Cluster is the cluster API for the client's connection. + Cluster pb.ClusterClient + // KV is the keyvalue API for the client's connection. + KV pb.KVClient + // Lease is the lease API for the client's connection. + Lease pb.LeaseClient + // Watch is the watch API for the client's connection. + Watch pb.WatchClient +} + +func toGRPC(c *clientv3.Client) grpcAPI { + return grpcAPI{ + pb.NewClusterClient(c.ActiveConnection()), + pb.NewKVClient(c.ActiveConnection()), + pb.NewLeaseClient(c.ActiveConnection()), + pb.NewWatchClient(c.ActiveConnection()), + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/integration/doc.go b/Godeps/_workspace/src/github.com/coreos/etcd/integration/doc.go new file mode 100644 index 00000000000..5d006637e41 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/integration/doc.go @@ -0,0 +1,25 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package integration implements tests built upon embedded etcd, and focus on +etcd correctness. + +Features/goals of the integration tests: +1. test the whole code base except command-line parsing. +2. check internal data, including raft, store and etc. +3. based on goroutines, which is faster than process. +4. mainly tests user behavior and user-facing API. +*/ +package integration diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/integration/fixtures/ca.crt b/Godeps/_workspace/src/github.com/coreos/etcd/integration/fixtures/ca.crt new file mode 100644 index 00000000000..200b1b26064 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/integration/fixtures/ca.crt @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICHTCCAaOgAwIBAgIUBFn+GT3FJV5W6SD+tn/L9cxT8rkwCgYIKoZIzj0EAwMw +PDEMMAoGA1UEBhMDVVNBMRAwDgYDVQQKEwdldGNkLWNhMQswCQYDVQQLEwJDQTEN +MAsGA1UEAxMEZXRjZDAeFw0xNjAyMDkyMzI5MDBaFw0yNjAyMDYyMzI5MDBaMDwx +DDAKBgNVBAYTA1VTQTEQMA4GA1UEChMHZXRjZC1jYTELMAkGA1UECxMCQ0ExDTAL +BgNVBAMTBGV0Y2QwdjAQBgcqhkjOPQIBBgUrgQQAIgNiAAT6FqAyCl0a7/XNtDUP +fzBRDFifDRnwNmKvbTiNMrgnx9ASJsDIsMMPJwa7A/ZIeu7SYD+UI9pejVwP/IBe +XknlozxDBq2kmV0o5tHTw7E460TqXy8aW1f+P97Ty23jOlOjZjBkMA4GA1UdDwEB +/wQEAwIBBjASBgNVHRMBAf8ECDAGAQH/AgECMB0GA1UdDgQWBBSz5VY1yFxMp0jk +JCm1esngW7dShzAfBgNVHSMEGDAWgBSz5VY1yFxMp0jkJCm1esngW7dShzAKBggq +hkjOPQQDAwNoADBlAjEAjq9EUW5JaALRd1xV6q518ju3RxdKZY61HTcxL6u5wFpb +EAMR/KoZpkLYFpGr3KtzAjAriAQYdZ1BNzYi2sckOctLUg/I97Ybs8kmX7MFaFmd +e7zXUO7ahhQNPXnWrn82u7A= +-----END CERTIFICATE----- diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/integration/fixtures/server.crt b/Godeps/_workspace/src/github.com/coreos/etcd/integration/fixtures/server.crt new file mode 100644 index 00000000000..cc8357e1342 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/integration/fixtures/server.crt @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICZzCCAe2gAwIBAgIURBvYzg73AkmCGaCgUUaEHfSoyF0wCgYIKoZIzj0EAwMw +PDEMMAoGA1UEBhMDVVNBMRAwDgYDVQQKEwdldGNkLWNhMQswCQYDVQQLEwJDQTEN +MAsGA1UEAxMEZXRjZDAeFw0xNjAyMDkyMzI5MDBaFw0yNjAyMDYyMzI5MDBaME8x +EDAOBgNVBAoTB2V0Y2QtY2ExEDAOBgNVBAsTB3NlcnZlcjExFTATBgNVBAcTDHRo +ZSBpbnRlcm5ldDESMBAGA1UEAxMJbG9jYWxob3N0MHYwEAYHKoZIzj0CAQYFK4EE +ACIDYgAEdI0oCEWEj9ztcCHVwn34HK0TElsdsTlfJ9sDZ20GO9HN9/hfKgoHlCXE +sK5H4WNT8E6q2q8PD9bpEtYiW82Q8/wJUmQrFIf0uxMrOUVbNGPQo6woDJr/uM8V +jRGkOW2qo4GcMIGZMA4GA1UdDwEB/wQEAwIFoDAdBgNVHSUEFjAUBggrBgEFBQcD +AQYIKwYBBQUHAwIwDAYDVR0TAQH/BAIwADAdBgNVHQ4EFgQUZzeCN+kAdg73qiHy +MT4+D851T5UwHwYDVR0jBBgwFoAUs+VWNchcTKdI5CQptXrJ4Fu3UocwGgYDVR0R +BBMwEYIJbG9jYWxob3N0hwR/AAABMAoGCCqGSM49BAMDA2gAMGUCMGW8jAlBWNqO +q7Gp2gCIHgl1xlXPppuPRnSwhU1xsPnjgHeblWygyVI2IFAVUZLEvgIxAO8OR6Cl +eN+rNGqrJAOv3+YVkDm5teDkW9N48P0RIp1TdXQIeGBhYhA0J+de6YulIQ== +-----END CERTIFICATE----- diff --git a/Godeps/_workspace/src/github.com/coreos/etcd/integration/fixtures/server.key.insecure b/Godeps/_workspace/src/github.com/coreos/etcd/integration/fixtures/server.key.insecure new file mode 100644 index 00000000000..631f398cec6 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/etcd/integration/fixtures/server.key.insecure @@ -0,0 +1,6 @@ +-----BEGIN EC PRIVATE KEY----- +MIGkAgEBBDDcUDqtGAt72T44c44iAFxP8cqJ/Hz4IbPrd4IoE4nBY+s6q+XCgtNa +hl5RW7I075qgBwYFK4EEACKhZANiAAR0jSgIRYSP3O1wIdXCffgcrRMSWx2xOV8n +2wNnbQY70c33+F8qCgeUJcSwrkfhY1PwTqrarw8P1ukS1iJbzZDz/AlSZCsUh/S7 +Eys5RVs0Y9CjrCgMmv+4zxWNEaQ5bao= +-----END EC PRIVATE KEY-----