Upgrade go-etcd to v0.4.6 (Take 3)

This commit is contained in:
Dawn Chen 2015-03-24 15:54:50 -07:00
parent 064b7dec42
commit 273d20791e
15 changed files with 472 additions and 133 deletions

14
Godeps/Godeps.json generated
View File

@ -62,10 +62,20 @@
"Comment": "v0.1-62-g8d75e11", "Comment": "v0.1-62-g8d75e11",
"Rev": "8d75e11374a1928608c906fe745b538483e7aeb2" "Rev": "8d75e11374a1928608c906fe745b538483e7aeb2"
}, },
{
"ImportPath": "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes",
"Comment": "v2.0.4-288-g866a9d4",
"Rev": "866a9d4e41401657ea44bf539b2c5561d6fdcd67"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/types",
"Comment": "v2.0.4-288-g866a9d4",
"Rev": "866a9d4e41401657ea44bf539b2c5561d6fdcd67"
},
{ {
"ImportPath": "github.com/coreos/go-etcd/etcd", "ImportPath": "github.com/coreos/go-etcd/etcd",
"Comment": "v0.2.0-rc1-120-g23142f6", "Comment": "v0.4.6-8-g60e12ca",
"Rev": "23142f6773a676cc2cae8dd0cb90b2ea761c853f" "Rev": "60e12cac3db8ffce00b576b4af0e7b0a968f1003"
}, },
{ {
"ImportPath": "github.com/coreos/go-systemd/dbus", "ImportPath": "github.com/coreos/go-systemd/dbus",

View File

@ -0,0 +1,19 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Package httptypes defines how etcd's HTTP API entities are serialized to and deserialized from JSON.
*/
package httptypes

View File

@ -0,0 +1,49 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httptypes
import (
"encoding/json"
"log"
"net/http"
)
type HTTPError struct {
Message string `json:"message"`
// HTTP return code
Code int `json:"-"`
}
func (e HTTPError) Error() string {
return e.Message
}
// TODO(xiangli): handle http write errors
func (e HTTPError) WriteTo(w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(e.Code)
b, err := json.Marshal(e)
if err != nil {
log.Panicf("marshal HTTPError should never fail: %v", err)
}
w.Write(b)
}
func NewHTTPError(code int, m string) *HTTPError {
return &HTTPError{
Message: m,
Code: code,
}
}

View File

@ -0,0 +1,47 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httptypes
import (
"net/http"
"net/http/httptest"
"reflect"
"testing"
)
func TestHTTPErrorWriteTo(t *testing.T) {
err := NewHTTPError(http.StatusBadRequest, "what a bad request you made!")
rr := httptest.NewRecorder()
err.WriteTo(rr)
wcode := http.StatusBadRequest
wheader := http.Header(map[string][]string{
"Content-Type": []string{"application/json"},
})
wbody := `{"message":"what a bad request you made!"}`
if wcode != rr.Code {
t.Errorf("HTTP status code %d, want %d", rr.Code, wcode)
}
if !reflect.DeepEqual(wheader, rr.HeaderMap) {
t.Errorf("HTTP headers %v, want %v", rr.HeaderMap, wheader)
}
gbody := rr.Body.String()
if wbody != gbody {
t.Errorf("HTTP body %q, want %q", gbody, wbody)
}
}

View File

@ -0,0 +1,67 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httptypes
import (
"encoding/json"
"github.com/coreos/etcd/pkg/types"
)
type Member struct {
ID string `json:"id"`
Name string `json:"name"`
PeerURLs []string `json:"peerURLs"`
ClientURLs []string `json:"clientURLs"`
}
type MemberCreateRequest struct {
PeerURLs types.URLs
}
type MemberUpdateRequest struct {
MemberCreateRequest
}
func (m *MemberCreateRequest) UnmarshalJSON(data []byte) error {
s := struct {
PeerURLs []string `json:"peerURLs"`
}{}
err := json.Unmarshal(data, &s)
if err != nil {
return err
}
urls, err := types.NewURLs(s.PeerURLs)
if err != nil {
return err
}
m.PeerURLs = urls
return nil
}
type MemberCollection []Member
func (c *MemberCollection) MarshalJSON() ([]byte, error) {
d := struct {
Members []Member `json:"members"`
}{
Members: []Member(*c),
}
return json.Marshal(d)
}

View File

@ -0,0 +1,135 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httptypes
import (
"encoding/json"
"net/url"
"reflect"
"testing"
"github.com/coreos/etcd/pkg/types"
)
func TestMemberUnmarshal(t *testing.T) {
tests := []struct {
body []byte
wantMember Member
wantError bool
}{
// no URLs, just check ID & Name
{
body: []byte(`{"id": "c", "name": "dungarees"}`),
wantMember: Member{ID: "c", Name: "dungarees", PeerURLs: nil, ClientURLs: nil},
},
// both client and peer URLs
{
body: []byte(`{"peerURLs": ["http://127.0.0.1:4001"], "clientURLs": ["http://127.0.0.1:4001"]}`),
wantMember: Member{
PeerURLs: []string{
"http://127.0.0.1:4001",
},
ClientURLs: []string{
"http://127.0.0.1:4001",
},
},
},
// multiple peer URLs
{
body: []byte(`{"peerURLs": ["http://127.0.0.1:4001", "https://example.com"]}`),
wantMember: Member{
PeerURLs: []string{
"http://127.0.0.1:4001",
"https://example.com",
},
ClientURLs: nil,
},
},
// multiple client URLs
{
body: []byte(`{"clientURLs": ["http://127.0.0.1:4001", "https://example.com"]}`),
wantMember: Member{
PeerURLs: nil,
ClientURLs: []string{
"http://127.0.0.1:4001",
"https://example.com",
},
},
},
// invalid JSON
{
body: []byte(`{"peerU`),
wantError: true,
},
}
for i, tt := range tests {
got := Member{}
err := json.Unmarshal(tt.body, &got)
if tt.wantError != (err != nil) {
t.Errorf("#%d: want error %t, got %v", i, tt.wantError, err)
continue
}
if !reflect.DeepEqual(tt.wantMember, got) {
t.Errorf("#%d: incorrect output: want=%#v, got=%#v", i, tt.wantMember, got)
}
}
}
func TestMemberCreateRequestUnmarshal(t *testing.T) {
body := []byte(`{"peerURLs": ["http://127.0.0.1:8081", "https://127.0.0.1:8080"]}`)
want := MemberCreateRequest{
PeerURLs: types.URLs([]url.URL{
url.URL{Scheme: "http", Host: "127.0.0.1:8081"},
url.URL{Scheme: "https", Host: "127.0.0.1:8080"},
}),
}
var req MemberCreateRequest
if err := json.Unmarshal(body, &req); err != nil {
t.Fatalf("Unmarshal returned unexpected err=%v", err)
}
if !reflect.DeepEqual(want, req) {
t.Fatalf("Failed to unmarshal MemberCreateRequest: want=%#v, got=%#v", want, req)
}
}
func TestMemberCreateRequestUnmarshalFail(t *testing.T) {
tests := [][]byte{
// invalid JSON
[]byte(``),
[]byte(`{`),
// spot-check validation done in types.NewURLs
[]byte(`{"peerURLs": "foo"}`),
[]byte(`{"peerURLs": ["."]}`),
[]byte(`{"peerURLs": []}`),
[]byte(`{"peerURLs": ["http://127.0.0.1:4001/foo"]}`),
[]byte(`{"peerURLs": ["http://127.0.0.1"]}`),
}
for i, tt := range tests {
var req MemberCreateRequest
if err := json.Unmarshal(tt, &req); err == nil {
t.Errorf("#%d: expected err, got nil", i)
}
}
}

View File

@ -7,12 +7,16 @@ import (
"errors" "errors"
"io" "io"
"io/ioutil" "io/ioutil"
"math/rand"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
"path" "path"
"strings"
"time" "time"
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
) )
// See SetConsistency for how to use these constants. // See SetConsistency for how to use these constants.
@ -28,6 +32,10 @@ const (
defaultBufferSize = 10 defaultBufferSize = 10
) )
func init() {
rand.Seed(int64(time.Now().Nanosecond()))
}
type Config struct { type Config struct {
CertFile string `json:"certFile"` CertFile string `json:"certFile"`
KeyFile string `json:"keyFile"` KeyFile string `json:"keyFile"`
@ -64,8 +72,7 @@ func NewClient(machines []string) *Client {
config := Config{ config := Config{
// default timeout is one second // default timeout is one second
DialTimeout: time.Second, DialTimeout: time.Second,
// default consistency level is STRONG Consistency: WEAK_CONSISTENCY,
Consistency: STRONG_CONSISTENCY,
} }
client := &Client{ client := &Client{
@ -89,8 +96,7 @@ func NewTLSClient(machines []string, cert, key, caCert string) (*Client, error)
config := Config{ config := Config{
// default timeout is one second // default timeout is one second
DialTimeout: time.Second, DialTimeout: time.Second,
// default consistency level is STRONG Consistency: WEAK_CONSISTENCY,
Consistency: STRONG_CONSISTENCY,
CertFile: cert, CertFile: cert,
KeyFile: key, KeyFile: key,
CaCertFile: make([]string, 0), CaCertFile: make([]string, 0),
@ -292,12 +298,13 @@ func (c *Client) SyncCluster() bool {
// internalSyncCluster syncs cluster information using the given machine list. // internalSyncCluster syncs cluster information using the given machine list.
func (c *Client) internalSyncCluster(machines []string) bool { func (c *Client) internalSyncCluster(machines []string) bool {
for _, machine := range machines { for _, machine := range machines {
httpPath := c.createHttpPath(machine, path.Join(version, "machines")) httpPath := c.createHttpPath(machine, path.Join(version, "members"))
resp, err := c.httpClient.Get(httpPath) resp, err := c.httpClient.Get(httpPath)
if err != nil { if err != nil {
// try another machine in the cluster // try another machine in the cluster
continue continue
} else { }
b, err := ioutil.ReadAll(resp.Body) b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close() resp.Body.Close()
if err != nil { if err != nil {
@ -305,18 +312,24 @@ func (c *Client) internalSyncCluster(machines []string) bool {
continue continue
} }
// update Machines List var mCollection httptypes.MemberCollection
c.cluster.updateFromStr(string(b)) if err := json.Unmarshal(b, &mCollection); err != nil {
// try another machine
continue
}
// update leader urls := make([]string, 0)
// the first one in the machine list is the leader for _, m := range mCollection {
c.cluster.switchLeader(0) urls = append(urls, m.ClientURLs...)
}
// update Machines List
c.cluster.updateFromStr(strings.Join(urls, ","))
logger.Debug("sync.machines ", c.cluster.Machines) logger.Debug("sync.machines ", c.cluster.Machines)
c.saveConfig() c.saveConfig()
return true return true
} }
}
return false return false
} }

View File

@ -10,7 +10,7 @@ import (
) )
// To pass this test, we need to create a cluster of 3 machines // To pass this test, we need to create a cluster of 3 machines
// The server should be listening on 127.0.0.1:4001, 4002, 4003 // The server should be listening on localhost:4001, 4002, 4003
func TestSync(t *testing.T) { func TestSync(t *testing.T) {
fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003") fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003")
@ -36,8 +36,8 @@ func TestSync(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if host != "127.0.0.1" { if host != "localhost" {
t.Fatal("Host must be 127.0.0.1") t.Fatal("Host must be localhost")
} }
} }

View File

@ -1,13 +1,14 @@
package etcd package etcd
import ( import (
"net/url" "math/rand"
"strings" "strings"
) )
type Cluster struct { type Cluster struct {
Leader string `json:"leader"` Leader string `json:"leader"`
Machines []string `json:"machines"` Machines []string `json:"machines"`
picked int
} }
func NewCluster(machines []string) *Cluster { func NewCluster(machines []string) *Cluster {
@ -18,34 +19,16 @@ func NewCluster(machines []string) *Cluster {
// default leader and machines // default leader and machines
return &Cluster{ return &Cluster{
Leader: machines[0], Leader: "",
Machines: machines, Machines: machines,
picked: rand.Intn(len(machines)),
} }
} }
// switchLeader switch the current leader to machines[num] func (cl *Cluster) failure() { cl.picked = rand.Intn(len(cl.Machines)) }
func (cl *Cluster) switchLeader(num int) { func (cl *Cluster) pick() string { return cl.Machines[cl.picked] }
logger.Debugf("switch.leader[from %v to %v]",
cl.Leader, cl.Machines[num])
cl.Leader = cl.Machines[num]
}
func (cl *Cluster) updateFromStr(machines string) { func (cl *Cluster) updateFromStr(machines string) {
cl.Machines = strings.Split(machines, ", ") cl.Machines = strings.Split(machines, ",")
} cl.picked = rand.Intn(len(cl.Machines))
func (cl *Cluster) updateLeader(leader string) {
logger.Debugf("update.leader[%s,%s]", cl.Leader, leader)
cl.Leader = leader
}
func (cl *Cluster) updateLeaderFromURL(u *url.URL) {
var leader string
if u.Scheme == "" {
leader = "http://" + u.Host
} else {
leader = u.Scheme + "://" + u.Host
}
cl.updateLeader(leader)
} }

View File

@ -7,6 +7,7 @@ import (
const ( const (
ErrCodeEtcdNotReachable = 501 ErrCodeEtcdNotReachable = 501
ErrCodeUnhandledHTTPStatus = 502
) )
var ( var (

View File

@ -18,9 +18,14 @@ func (c *Client) Get(key string, sort, recursive bool) (*Response, error) {
} }
func (c *Client) RawGet(key string, sort, recursive bool) (*RawResponse, error) { func (c *Client) RawGet(key string, sort, recursive bool) (*RawResponse, error) {
var q bool
if c.config.Consistency == STRONG_CONSISTENCY {
q = true
}
ops := Options{ ops := Options{
"recursive": recursive, "recursive": recursive,
"sorted": sort, "sorted": sort,
"quorum": q,
} }
return c.get(key, ops) return c.get(key, ops)

View File

@ -18,7 +18,7 @@ type validOptions map[string]reflect.Kind
var ( var (
VALID_GET_OPTIONS = validOptions{ VALID_GET_OPTIONS = validOptions{
"recursive": reflect.Bool, "recursive": reflect.Bool,
"consistent": reflect.Bool, "quorum": reflect.Bool,
"sorted": reflect.Bool, "sorted": reflect.Bool,
"wait": reflect.Bool, "wait": reflect.Bool,
"waitIndex": reflect.Uint64, "waitIndex": reflect.Uint64,

View File

@ -3,8 +3,9 @@ package etcd
import ( import (
"errors" "errors"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"math/rand" "net"
"net/http" "net/http"
"net/url" "net/url"
"path" "path"
@ -38,15 +39,9 @@ func NewRawRequest(method, relativePath string, values url.Values, cancel <-chan
// getCancelable issues a cancelable GET request // getCancelable issues a cancelable GET request
func (c *Client) getCancelable(key string, options Options, func (c *Client) getCancelable(key string, options Options,
cancel <-chan bool) (*RawResponse, error) { cancel <-chan bool) (*RawResponse, error) {
logger.Debugf("get %s [%s]", key, c.cluster.Leader) logger.Debugf("get %s [%s]", key, c.cluster.pick())
p := keyToPath(key) p := keyToPath(key)
// If consistency level is set to STRONG, append
// the `consistent` query string.
if c.config.Consistency == STRONG_CONSISTENCY {
options["consistent"] = true
}
str, err := options.toParameters(VALID_GET_OPTIONS) str, err := options.toParameters(VALID_GET_OPTIONS)
if err != nil { if err != nil {
return nil, err return nil, err
@ -72,7 +67,7 @@ func (c *Client) get(key string, options Options) (*RawResponse, error) {
func (c *Client) put(key string, value string, ttl uint64, func (c *Client) put(key string, value string, ttl uint64,
options Options) (*RawResponse, error) { options Options) (*RawResponse, error) {
logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.pick())
p := keyToPath(key) p := keyToPath(key)
str, err := options.toParameters(VALID_PUT_OPTIONS) str, err := options.toParameters(VALID_PUT_OPTIONS)
@ -93,7 +88,7 @@ func (c *Client) put(key string, value string, ttl uint64,
// post issues a POST request // post issues a POST request
func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error) { func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error) {
logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.pick())
p := keyToPath(key) p := keyToPath(key)
req := NewRawRequest("POST", p, buildValues(value, ttl), nil) req := NewRawRequest("POST", p, buildValues(value, ttl), nil)
@ -108,7 +103,7 @@ func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error
// delete issues a DELETE request // delete issues a DELETE request
func (c *Client) delete(key string, options Options) (*RawResponse, error) { func (c *Client) delete(key string, options Options) (*RawResponse, error) {
logger.Debugf("delete %s [%s]", key, c.cluster.Leader) logger.Debugf("delete %s [%s]", key, c.cluster.pick())
p := keyToPath(key) p := keyToPath(key)
str, err := options.toParameters(VALID_DELETE_OPTIONS) str, err := options.toParameters(VALID_DELETE_OPTIONS)
@ -129,7 +124,6 @@ func (c *Client) delete(key string, options Options) (*RawResponse, error) {
// SendRequest sends a HTTP request and returns a Response as defined by etcd // SendRequest sends a HTTP request and returns a Response as defined by etcd
func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) { func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
var req *http.Request var req *http.Request
var resp *http.Response var resp *http.Response
var httpPath string var httpPath string
@ -179,6 +173,7 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
// we connect to a leader // we connect to a leader
sleep := 25 * time.Millisecond sleep := 25 * time.Millisecond
maxSleep := time.Second maxSleep := time.Second
for attempt := 0; ; attempt++ { for attempt := 0; ; attempt++ {
if attempt > 0 { if attempt > 0 {
select { select {
@ -192,16 +187,9 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
} }
} }
logger.Debug("Connecting to etcd: attempt", attempt+1, "for", rr.RelativePath) logger.Debug("Connecting to etcd: attempt ", attempt+1, " for ", rr.RelativePath)
if rr.Method == "GET" && c.config.Consistency == WEAK_CONSISTENCY { httpPath = c.getHttpPath(rr.RelativePath)
// If it's a GET and consistency level is set to WEAK,
// then use a random machine.
httpPath = c.getHttpPath(true, rr.RelativePath)
} else {
// Else use the leader.
httpPath = c.getHttpPath(false, rr.RelativePath)
}
// Return a cURL command if curlChan is set // Return a cURL command if curlChan is set
if c.cURLch != nil { if c.cURLch != nil {
@ -214,7 +202,10 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
logger.Debug("send.request.to ", httpPath, " | method ", rr.Method) logger.Debug("send.request.to ", httpPath, " | method ", rr.Method)
req, err := func() (*http.Request, error) {
reqLock.Lock() reqLock.Lock()
defer reqLock.Unlock()
if rr.Values == nil { if rr.Values == nil {
if req, err = http.NewRequest(rr.Method, httpPath, nil); err != nil { if req, err = http.NewRequest(rr.Method, httpPath, nil); err != nil {
return nil, err return nil, err
@ -228,7 +219,12 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
req.Header.Set("Content-Type", req.Header.Set("Content-Type",
"application/x-www-form-urlencoded; param=value") "application/x-www-form-urlencoded; param=value")
} }
reqLock.Unlock() return req, nil
}()
if err != nil {
return nil, err
}
resp, err = c.httpClient.Do(req) resp, err = c.httpClient.Do(req)
defer func() { defer func() {
@ -248,24 +244,24 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
// network error, change a machine! // network error, change a machine!
if err != nil { if err != nil {
logger.Debug("network error:", err.Error()) logger.Debug("network error: ", err.Error())
lastResp := http.Response{} lastResp := http.Response{}
if checkErr := checkRetry(c.cluster, numReqs, lastResp, err); checkErr != nil { if checkErr := checkRetry(c.cluster, numReqs, lastResp, err); checkErr != nil {
return nil, checkErr return nil, checkErr
} }
c.cluster.switchLeader(attempt % len(c.cluster.Machines)) c.cluster.failure()
continue continue
} }
// if there is no error, it should receive response // if there is no error, it should receive response
logger.Debug("recv.response.from", httpPath) logger.Debug("recv.response.from ", httpPath)
if validHttpStatusCode[resp.StatusCode] { if validHttpStatusCode[resp.StatusCode] {
// try to read byte code and break the loop // try to read byte code and break the loop
respBody, err = ioutil.ReadAll(resp.Body) respBody, err = ioutil.ReadAll(resp.Body)
if err == nil { if err == nil {
logger.Debug("recv.success.", httpPath) logger.Debug("recv.success ", httpPath)
break break
} }
// ReadAll error may be caused due to cancel request // ReadAll error may be caused due to cancel request
@ -274,22 +270,15 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
return nil, ErrRequestCancelled return nil, ErrRequestCancelled
default: default:
} }
}
// if resp is TemporaryRedirect, set the new leader and retry if err == io.ErrUnexpectedEOF {
if resp.StatusCode == http.StatusTemporaryRedirect { // underlying connection was closed prematurely, probably by timeout
u, err := resp.Location() // TODO: empty body or unexpectedEOF can cause http.Transport to get hosed;
// this allows the client to detect that and take evasive action. Need
if err != nil { // to revisit once code.google.com/p/go/issues/detail?id=8648 gets fixed.
logger.Warning(err) respBody = []byte{}
} else { break
// Update cluster leader based on redirect location
// because it should point to the leader address
c.cluster.updateLeaderFromURL(u)
logger.Debug("recv.response.relocate", u.String())
} }
resp.Body.Close()
continue
} }
if checkErr := checkRetry(c.cluster, numReqs, *resp, if checkErr := checkRetry(c.cluster, numReqs, *resp,
@ -314,34 +303,53 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response, func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response,
err error) error { err error) error {
if numReqs >= 2*len(cluster.Machines) { if isEmptyResponse(lastResp) {
return newError(ErrCodeEtcdNotReachable, if !isConnectionError(err) {
"Tried to connect to each peer twice and failed", 0) return err
}
} else if !shouldRetry(lastResp) {
body := []byte("nil")
if lastResp.Body != nil {
if b, err := ioutil.ReadAll(lastResp.Body); err == nil {
body = b
}
}
errStr := fmt.Sprintf("unhandled http status [%s] with body [%s]", http.StatusText(lastResp.StatusCode), body)
return newError(ErrCodeUnhandledHTTPStatus, errStr, 0)
} }
code := lastResp.StatusCode if numReqs > 2*len(cluster.Machines) {
if code == http.StatusInternalServerError { errStr := fmt.Sprintf("failed to propose on members %v twice [last error: %v]", cluster.Machines, err)
return newError(ErrCodeEtcdNotReachable, errStr, 0)
}
if shouldRetry(lastResp) {
// sleep some time and expect leader election finish
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
} }
logger.Warning("bad response status code", code) logger.Warning("bad response status code", lastResp.StatusCode)
return nil return nil
} }
func (c *Client) getHttpPath(random bool, s ...string) string { func isEmptyResponse(r http.Response) bool { return r.StatusCode == 0 }
var machine string
if random {
machine = c.cluster.Machines[rand.Intn(len(c.cluster.Machines))]
} else {
machine = c.cluster.Leader
}
fullPath := machine + "/" + version func isConnectionError(err error) bool {
_, ok := err.(*net.OpError)
return ok
}
// shouldRetry returns whether the reponse deserves retry.
func shouldRetry(r http.Response) bool {
// TODO: only retry when the cluster is in leader election
// We cannot do it exactly because etcd doesn't support it well.
return r.StatusCode == http.StatusInternalServerError
}
func (c *Client) getHttpPath(s ...string) string {
fullPath := c.cluster.pick() + "/" + version
for _, seg := range s { for _, seg := range s {
fullPath = fullPath + "/" + seg fullPath = fullPath + "/" + seg
} }
return fullPath return fullPath
} }
@ -360,11 +368,13 @@ func buildValues(value string, ttl uint64) url.Values {
return v return v
} }
// convert key string to http path exclude version // convert key string to http path exclude version, including URL escaping
// for example: key[foo] -> path[keys/foo] // for example: key[foo] -> path[keys/foo]
// key[/%z] -> path[keys/%25z]
// key[/] -> path[keys/] // key[/] -> path[keys/]
func keyToPath(key string) string { func keyToPath(key string) string {
p := path.Join("keys", key) // URL-escape our key, except for slashes
p := strings.Replace(url.QueryEscape(path.Join("keys", key)), "%2F", "/", -1)
// corner case: if key is "/" or "//" ect // corner case: if key is "/" or "//" ect
// path join will clear the tailing "/" // path join will clear the tailing "/"

View File

@ -19,7 +19,7 @@ func TestSetCurlChan(t *testing.T) {
} }
expected := fmt.Sprintf("curl -X PUT %s/v2/keys/foo -d value=bar -d ttl=5", expected := fmt.Sprintf("curl -X PUT %s/v2/keys/foo -d value=bar -d ttl=5",
c.cluster.Leader) c.cluster.pick())
actual := c.RecvCURL() actual := c.RecvCURL()
if expected != actual { if expected != actual {
t.Fatalf(`Command "%s" is not equal to expected value "%s"`, t.Fatalf(`Command "%s" is not equal to expected value "%s"`,
@ -32,8 +32,8 @@ func TestSetCurlChan(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
expected = fmt.Sprintf("curl -X GET %s/v2/keys/foo?consistent=true&recursive=false&sorted=false", expected = fmt.Sprintf("curl -X GET %s/v2/keys/foo?quorum=true&recursive=false&sorted=false",
c.cluster.Leader) c.cluster.pick())
actual = c.RecvCURL() actual = c.RecvCURL()
if expected != actual { if expected != actual {
t.Fatalf(`Command "%s" is not equal to expected value "%s"`, t.Fatalf(`Command "%s" is not equal to expected value "%s"`,

View File

@ -13,7 +13,7 @@ func (c *Client) Set(key string, value string, ttl uint64) (*Response, error) {
return raw.Unmarshal() return raw.Unmarshal()
} }
// Set sets the given key to a directory. // SetDir sets the given key to a directory.
// It will create a new directory or replace the old key value pair by a directory. // It will create a new directory or replace the old key value pair by a directory.
// It will not replace a existing directory. // It will not replace a existing directory.
func (c *Client) SetDir(key string, ttl uint64) (*Response, error) { func (c *Client) SetDir(key string, ttl uint64) (*Response, error) {