bump(github.com/coreos/go-etcd/etcd): 7745cfd7f8e619cc9e6be450238e6253a57a227f

This commit is contained in:
Brendan Burns 2014-06-16 14:49:29 -07:00
parent fe0cb7c87b
commit b1531bbcf5
4 changed files with 81 additions and 46 deletions

View File

@ -51,11 +51,11 @@ type Client struct {
// If CheckRetry is nil, client will call the default one // If CheckRetry is nil, client will call the default one
// `DefaultCheckRetry`. // `DefaultCheckRetry`.
// Argument cluster is the etcd.Cluster object that these requests have been made on. // Argument cluster is the etcd.Cluster object that these requests have been made on.
// Argument reqs is all of the http.Requests that have been made so far. // Argument numReqs is the number of http.Requests that have been made so far.
// Argument resps is all of the http.Responses from these requests. // Argument lastResp is the http.Responses from the last request.
// Argument err is the reason of the failure. // Argument err is the reason of the failure.
CheckRetry func(cluster *Cluster, reqs []http.Request, CheckRetry func(cluster *Cluster, numReqs int,
resps []http.Response, err error) error lastResp http.Response, err error) error
} }
// NewClient create a basic client that is configured to be used // NewClient create a basic client that is configured to be used

View File

@ -1,6 +1,7 @@
package etcd package etcd
import ( import (
"fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"strings" "strings"
@ -21,31 +22,31 @@ type etcdLogger struct {
} }
func (p *etcdLogger) Debug(args ...interface{}) { func (p *etcdLogger) Debug(args ...interface{}) {
args[0] = "DEBUG: " + args[0].(string) msg := "DEBUG: " + fmt.Sprint(args)
p.log.Println(args) p.log.Println(msg)
} }
func (p *etcdLogger) Debugf(fmt string, args ...interface{}) { func (p *etcdLogger) Debugf(f string, args ...interface{}) {
args[0] = "DEBUG: " + args[0].(string) msg := "DEBUG: " + fmt.Sprintf(f, args)
// Append newline if necessary // Append newline if necessary
if !strings.HasSuffix(fmt, "\n") { if !strings.HasSuffix(msg, "\n") {
fmt = fmt + "\n" msg = msg + "\n"
} }
p.log.Printf(fmt, args) p.log.Print(msg)
} }
func (p *etcdLogger) Warning(args ...interface{}) { func (p *etcdLogger) Warning(args ...interface{}) {
args[0] = "WARNING: " + args[0].(string) msg := "WARNING: " + fmt.Sprint(args)
p.log.Println(args) p.log.Println(msg)
} }
func (p *etcdLogger) Warningf(fmt string, args ...interface{}) { func (p *etcdLogger) Warningf(f string, args ...interface{}) {
msg := "WARNING: " + fmt.Sprintf(f, args)
// Append newline if necessary // Append newline if necessary
if !strings.HasSuffix(fmt, "\n") { if !strings.HasSuffix(msg, "\n") {
fmt = fmt + "\n" msg = msg + "\n"
} }
args[0] = "WARNING: " + args[0].(string) p.log.Print(msg)
p.log.Printf(fmt, args)
} }
func init() { func init() {

View File

@ -0,0 +1,28 @@
package etcd
import (
"testing"
)
type Foo struct{}
type Bar struct {
one string
two int
}
// Tests that logs don't panic with arbitrary interfaces
func TestDebug(t *testing.T) {
f := &Foo{}
b := &Bar{"asfd", 3}
for _, test := range []interface{}{
1234,
"asdf",
f,
b,
} {
logger.Debug(test)
logger.Debugf("something, %s", test)
logger.Warning(test)
logger.Warningf("something, %s", test)
}
}

View File

@ -136,8 +136,7 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
var err error var err error
var respBody []byte var respBody []byte
reqs := make([]http.Request, 0) var numReqs = 1
resps := make([]http.Response, 0)
checkRetry := c.CheckRetry checkRetry := c.CheckRetry
if checkRetry == nil { if checkRetry == nil {
@ -176,15 +175,24 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
}() }()
} }
// if we connect to a follower, we will retry until we find a leader // If we connect to a follower and consistency is required, retry until
// we connect to a leader
sleep := 25 * time.Millisecond
maxSleep := time.Second
for attempt := 0; ; attempt++ { for attempt := 0; ; attempt++ {
if attempt > 0 {
select { select {
case <-cancelled: case <-cancelled:
return nil, ErrRequestCancelled return nil, ErrRequestCancelled
default: case <-time.After(sleep):
sleep = sleep * 2
if sleep > maxSleep {
sleep = maxSleep
}
}
} }
logger.Debug("begin attempt", attempt, "for", rr.RelativePath) logger.Debug("Connecting to etcd: attempt", attempt+1, "for", rr.RelativePath)
if rr.Method == "GET" && c.config.Consistency == WEAK_CONSISTENCY { if rr.Method == "GET" && c.config.Consistency == WEAK_CONSISTENCY {
// If it's a GET and consistency level is set to WEAK, // If it's a GET and consistency level is set to WEAK,
@ -223,6 +231,12 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
reqLock.Unlock() reqLock.Unlock()
resp, err = c.httpClient.Do(req) resp, err = c.httpClient.Do(req)
defer func() {
if resp != nil {
resp.Body.Close()
}
}()
// If the request was cancelled, return ErrRequestCancelled directly // If the request was cancelled, return ErrRequestCancelled directly
select { select {
case <-cancelled: case <-cancelled:
@ -230,13 +244,13 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
default: default:
} }
reqs = append(reqs, *req) numReqs++
// network error, change a machine! // network error, change a machine!
if err != nil { if err != nil {
logger.Debug("network error:", err.Error()) logger.Debug("network error:", err.Error())
resps = append(resps, http.Response{}) lastResp := http.Response{}
if checkErr := checkRetry(c.cluster, reqs, resps, err); checkErr != nil { if checkErr := checkRetry(c.cluster, numReqs, lastResp, err); checkErr != nil {
return nil, checkErr return nil, checkErr
} }
@ -245,8 +259,6 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
} }
// if there is no error, it should receive response // if there is no error, it should receive response
resps = append(resps, *resp)
defer resp.Body.Close()
logger.Debug("recv.response.from", httpPath) logger.Debug("recv.response.from", httpPath)
if validHttpStatusCode[resp.StatusCode] { if validHttpStatusCode[resp.StatusCode] {
@ -270,13 +282,15 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
c.cluster.updateLeaderFromURL(u) c.cluster.updateLeaderFromURL(u)
logger.Debug("recv.response.relocate", u.String()) logger.Debug("recv.response.relocate", u.String())
} }
resp.Body.Close()
continue continue
} }
if checkErr := checkRetry(c.cluster, reqs, resps, if checkErr := checkRetry(c.cluster, numReqs, *resp,
errors.New("Unexpected HTTP status code")); checkErr != nil { errors.New("Unexpected HTTP status code")); checkErr != nil {
return nil, checkErr return nil, checkErr
} }
resp.Body.Close()
} }
r := &RawResponse{ r := &RawResponse{
@ -288,26 +302,18 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
return r, nil return r, nil
} }
// DefaultCheckRetry checks retry cases // DefaultCheckRetry defines the retrying behaviour for bad HTTP requests
// If it has retried 2 * machine number, stop to retry it anymore // If we have retried 2 * machine number, stop retrying.
// If resp is nil, sleep for 200ms
// If status code is InternalServerError, sleep for 200ms. // If status code is InternalServerError, sleep for 200ms.
func DefaultCheckRetry(cluster *Cluster, reqs []http.Request, func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response,
resps []http.Response, err error) error { err error) error {
if len(reqs) >= 2*len(cluster.Machines) { if numReqs >= 2*len(cluster.Machines) {
return newError(ErrCodeEtcdNotReachable, return newError(ErrCodeEtcdNotReachable,
"Tried to connect to each peer twice and failed", 0) "Tried to connect to each peer twice and failed", 0)
} }
resp := &resps[len(resps)-1] code := lastResp.StatusCode
if resp == nil {
time.Sleep(time.Millisecond * 200)
return nil
}
code := resp.StatusCode
if code == http.StatusInternalServerError { if code == http.StatusInternalServerError {
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)