Merge pull request #16807 from smarterclayton/server_backpressure_on_etcd_down

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-11-05 21:19:30 -08:00
commit 6b7115067d
8 changed files with 103 additions and 13 deletions

View File

@ -21,12 +21,27 @@ import (
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
)
// InterpretListError converts a generic etcd error on a retrieval
// operation into the appropriate API error.
func InterpretListError(err error, kind string) error {
switch {
case etcdstorage.IsEtcdNotFound(err):
return errors.NewNotFound(kind, "")
case etcdstorage.IsEtcdUnreachable(err):
return errors.NewServerTimeout(kind, "list", 2) // TODO: make configurable or handled at a higher level
default:
return err
}
}
// InterpretGetError converts a generic etcd error on a retrieval
// operation into the appropriate API error.
func InterpretGetError(err error, kind, name string) error {
switch {
case etcdstorage.IsEtcdNotFound(err):
return errors.NewNotFound(kind, name)
case etcdstorage.IsEtcdUnreachable(err):
return errors.NewServerTimeout(kind, "get", 2) // TODO: make configurable or handled at a higher level
default:
return err
}
@ -38,6 +53,8 @@ func InterpretCreateError(err error, kind, name string) error {
switch {
case etcdstorage.IsEtcdNodeExist(err):
return errors.NewAlreadyExists(kind, name)
case etcdstorage.IsEtcdUnreachable(err):
return errors.NewServerTimeout(kind, "create", 2) // TODO: make configurable or handled at a higher level
default:
return err
}
@ -49,6 +66,8 @@ func InterpretUpdateError(err error, kind, name string) error {
switch {
case etcdstorage.IsEtcdTestFailed(err), etcdstorage.IsEtcdNodeExist(err):
return errors.NewConflict(kind, name, err)
case etcdstorage.IsEtcdUnreachable(err):
return errors.NewServerTimeout(kind, "update", 2) // TODO: make configurable or handled at a higher level
default:
return err
}
@ -60,6 +79,8 @@ func InterpretDeleteError(err error, kind, name string) error {
switch {
case etcdstorage.IsEtcdNotFound(err):
return errors.NewNotFound(kind, name)
case etcdstorage.IsEtcdUnreachable(err):
return errors.NewServerTimeout(kind, "delete", 2) // TODO: make configurable or handled at a higher level
default:
return err
}

View File

@ -215,6 +215,12 @@ const (
// Status code 500
StatusReasonInternalError = "InternalError"
// StatusReasonExpired indicates that the request is invalid because the content you are requesting
// has expired and is no longer available. It is typically associated with watches that can't be
// serviced.
// Status code 410 (gone)
StatusReasonExpired = "Expired"
// StatusReasonServiceUnavailable means that the request itself was valid,
// but the requested service is unavailable at this time.
// Retrying the request after some time might succeed.

View File

@ -858,7 +858,10 @@ func isTextResponse(resp *http.Response) bool {
// checkWait returns true along with a number of seconds if the server instructed us to wait
// before retrying.
func checkWait(resp *http.Response) (int, bool) {
if resp.StatusCode != errors.StatusTooManyRequests {
switch r := resp.StatusCode; {
// any 500 error code and 429 can trigger a wait
case r == errors.StatusTooManyRequests, r >= 500:
default:
return 0, false
}
i, ok := retryAfterSeconds(resp)

View File

@ -764,6 +764,38 @@ func TestCheckRetryClosesBody(t *testing.T) {
}
}
func TestCheckRetryHandles429And5xx(t *testing.T) {
count := 0
ch := make(chan struct{})
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
t.Logf("attempt %d", count)
if count >= 4 {
w.WriteHeader(http.StatusOK)
close(ch)
return
}
w.Header().Set("Retry-After", "0")
w.WriteHeader([]int{apierrors.StatusTooManyRequests, 500, 501, 504}[count])
count++
}))
defer testServer.Close()
c := NewOrDie(&Config{Host: testServer.URL, Version: testapi.Default.Version(), Username: "user", Password: "pass"})
_, err := c.Verb("POST").
Prefix("foo", "bar").
Suffix("baz").
Timeout(time.Second).
Body([]byte(strings.Repeat("abcd", 1000))).
DoRaw()
if err != nil {
t.Fatalf("Unexpected error: %v %#v", err, err)
}
<-ch
if count != 4 {
t.Errorf("unexpected retries: %d", count)
}
}
func BenchmarkCheckRetryClosesBody(t *testing.B) {
count := 0
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@ -790,6 +822,7 @@ func BenchmarkCheckRetryClosesBody(t *testing.B) {
}
}
}
func TestDoRequestNewWayReader(t *testing.T) {
reqObj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
reqBodyExpected, _ := testapi.Default.Codec().Encode(reqObj)

View File

@ -184,10 +184,7 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher, options *api.Li
trace.Step("About to read single object")
err := e.Storage.GetToList(ctx, key, filterFunc, list)
trace.Step("Object extracted")
if err != nil {
return nil, err
}
return list, nil
return list, etcderr.InterpretListError(err, e.EndpointName)
}
// if we cannot extract a key based on the current context, the optimization is skipped
}
@ -202,10 +199,7 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher, options *api.Li
}
err = e.Storage.List(ctx, e.KeyRootFunc(ctx), version, filterFunc, list)
trace.Step("List extracted")
if err != nil {
return nil, err
}
return list, nil
return list, etcderr.InterpretListError(err, e.EndpointName)
}
// Create inserts a new item according to the unique key from the object.

View File

@ -43,6 +43,16 @@ func IsEtcdTestFailed(err error) bool {
return isEtcdErrorNum(err, tools.EtcdErrorCodeTestFailed)
}
// IsEtcdWatchExpired returns true if and only if err indicates the watch has expired.
func IsEtcdWatchExpired(err error) bool {
return isEtcdErrorNum(err, tools.EtcdErrorCodeWatchExpired)
}
// IsEtcdUnreachable returns true if and only if err indicates the server could not be reached.
func IsEtcdUnreachable(err error) bool {
return isEtcdErrorNum(err, tools.EtcdErrorCodeUnreachable)
}
// IsEtcdWatchStoppedByUser returns true if and only if err is a client triggered stop.
func IsEtcdWatchStoppedByUser(err error) bool {
return goetcd.ErrWatchStoppedByUser == err

View File

@ -17,6 +17,7 @@ limitations under the License.
package etcd
import (
"net/http"
"sync"
"time"
@ -181,12 +182,30 @@ func (w *etcdWatcher) translate() {
select {
case err := <-w.etcdError:
if err != nil {
w.emit(watch.Event{
Type: watch.Error,
Object: &unversioned.Status{
var status *unversioned.Status
switch {
case IsEtcdWatchExpired(err):
status = &unversioned.Status{
Status: unversioned.StatusFailure,
Message: err.Error(),
},
Code: http.StatusGone, // Gone
Reason: unversioned.StatusReasonExpired,
}
// TODO: need to generate errors using api/errors which has a circular dependency on this package
// no other way to inject errors
// case IsEtcdUnreachable(err):
// status = errors.NewServerTimeout(...)
default:
status = &unversioned.Status{
Status: unversioned.StatusFailure,
Message: err.Error(),
Code: http.StatusInternalServerError,
Reason: unversioned.StatusReasonInternalError,
}
}
w.emit(watch.Event{
Type: watch.Error,
Object: status,
})
}
return

View File

@ -25,6 +25,8 @@ const (
EtcdErrorCodeTestFailed = 101
EtcdErrorCodeNodeExist = 105
EtcdErrorCodeValueRequired = 200
EtcdErrorCodeWatchExpired = 401
EtcdErrorCodeUnreachable = 501
)
var (
@ -32,6 +34,8 @@ var (
EtcdErrorTestFailed = &etcd.EtcdError{ErrorCode: EtcdErrorCodeTestFailed}
EtcdErrorNodeExist = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNodeExist}
EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired}
EtcdErrorWatchExpired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeWatchExpired}
EtcdErrorUnreachable = &etcd.EtcdError{ErrorCode: EtcdErrorCodeUnreachable}
)
// EtcdClient is an injectable interface for testing.