From 3da15535b6c87e0bb8246f026cb15a21c4d05933 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Wed, 4 Nov 2015 15:15:01 -0500 Subject: [PATCH] Provide backpressure to clients when etcd goes down When etcd is down today we don't specifically handle the error involved, which means clients get a generic 500 error. This commit adds a formal error type internally for both WatchExpired and EtcdUnreachable, and then converts them to api/errors before returning to the client. It also upgrades the client to retry on any 429 or 5xx error that has a Retry-After header, instead of just 429. In combination, this allows the apiserver to exert backpressure on controllers that are hotlooping. The retry delay sent to clients is 2 seconds by default, but we could potentially ramp that up even further in a future iteration. --- pkg/api/errors/etcd/etcd.go | 21 ++++++++++++++++ pkg/api/unversioned/types.go | 6 +++++ pkg/client/unversioned/request.go | 5 +++- pkg/client/unversioned/request_test.go | 33 ++++++++++++++++++++++++++ pkg/registry/generic/etcd/etcd.go | 10 ++------ pkg/storage/etcd/etcd_util.go | 10 ++++++++ pkg/storage/etcd/etcd_watcher.go | 27 +++++++++++++++++---- pkg/tools/interfaces.go | 4 ++++ 8 files changed, 103 insertions(+), 13 deletions(-) diff --git a/pkg/api/errors/etcd/etcd.go b/pkg/api/errors/etcd/etcd.go index f3dcff02c04..03580c8f68a 100644 --- a/pkg/api/errors/etcd/etcd.go +++ b/pkg/api/errors/etcd/etcd.go @@ -21,12 +21,27 @@ import ( etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" ) +// InterpretListError converts a generic etcd error on a retrieval +// operation into the appropriate API error. 
+func InterpretListError(err error, kind string) error { + switch { + case etcdstorage.IsEtcdNotFound(err): + return errors.NewNotFound(kind, "") + case etcdstorage.IsEtcdUnreachable(err): + return errors.NewServerTimeout(kind, "list", 2) // TODO: make configurable or handled at a higher level + default: + return err + } +} + // InterpretGetError converts a generic etcd error on a retrieval // operation into the appropriate API error. func InterpretGetError(err error, kind, name string) error { switch { case etcdstorage.IsEtcdNotFound(err): return errors.NewNotFound(kind, name) + case etcdstorage.IsEtcdUnreachable(err): + return errors.NewServerTimeout(kind, "get", 2) // TODO: make configurable or handled at a higher level default: return err } @@ -38,6 +53,8 @@ func InterpretCreateError(err error, kind, name string) error { switch { case etcdstorage.IsEtcdNodeExist(err): return errors.NewAlreadyExists(kind, name) + case etcdstorage.IsEtcdUnreachable(err): + return errors.NewServerTimeout(kind, "create", 2) // TODO: make configurable or handled at a higher level default: return err } @@ -49,6 +66,8 @@ func InterpretUpdateError(err error, kind, name string) error { switch { case etcdstorage.IsEtcdTestFailed(err), etcdstorage.IsEtcdNodeExist(err): return errors.NewConflict(kind, name, err) + case etcdstorage.IsEtcdUnreachable(err): + return errors.NewServerTimeout(kind, "update", 2) // TODO: make configurable or handled at a higher level default: return err } @@ -60,6 +79,8 @@ func InterpretDeleteError(err error, kind, name string) error { switch { case etcdstorage.IsEtcdNotFound(err): return errors.NewNotFound(kind, name) + case etcdstorage.IsEtcdUnreachable(err): + return errors.NewServerTimeout(kind, "delete", 2) // TODO: make configurable or handled at a higher level default: return err } diff --git a/pkg/api/unversioned/types.go b/pkg/api/unversioned/types.go index c90bf296571..3c002ae551a 100644 --- a/pkg/api/unversioned/types.go +++ 
b/pkg/api/unversioned/types.go @@ -215,6 +215,12 @@ const ( // Status code 500 StatusReasonInternalError = "InternalError" + // StatusReasonExpired indicates that the request is invalid because the content you are requesting + // has expired and is no longer available. It is typically associated with watches that can't be + // serviced. + // Status code 410 (gone) + StatusReasonExpired = "Expired" + // StatusReasonServiceUnavailable means that the request itself was valid, // but the requested service is unavailable at this time. // Retrying the request after some time might succeed. diff --git a/pkg/client/unversioned/request.go b/pkg/client/unversioned/request.go index 1df7fcebf9c..76949e05dfd 100644 --- a/pkg/client/unversioned/request.go +++ b/pkg/client/unversioned/request.go @@ -839,7 +839,10 @@ func isTextResponse(resp *http.Response) bool { // checkWait returns true along with a number of seconds if the server instructed us to wait // before retrying. func checkWait(resp *http.Response) (int, bool) { - if resp.StatusCode != errors.StatusTooManyRequests { + switch r := resp.StatusCode; { + // any 5xx status code, as well as 429, can trigger a wait + case r == errors.StatusTooManyRequests, r >= 500: + default: return 0, false } i, ok := retryAfterSeconds(resp) diff --git a/pkg/client/unversioned/request_test.go b/pkg/client/unversioned/request_test.go index 77573b8bc0a..09544504bb7 100644 --- a/pkg/client/unversioned/request_test.go +++ b/pkg/client/unversioned/request_test.go @@ -745,6 +745,38 @@ func TestCheckRetryClosesBody(t *testing.T) { } } +func TestCheckRetryHandles429And5xx(t *testing.T) { + count := 0 + ch := make(chan struct{}) + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + t.Logf("attempt %d", count) + if count >= 4 { + w.WriteHeader(http.StatusOK) + close(ch) + return + } + w.Header().Set("Retry-After", "0") + w.WriteHeader([]int{apierrors.StatusTooManyRequests, 500, 501, 504}[count]) + count++ + })) 
+ defer testServer.Close() + + c := NewOrDie(&Config{Host: testServer.URL, Version: testapi.Default.Version(), Username: "user", Password: "pass"}) + _, err := c.Verb("POST"). + Prefix("foo", "bar"). + Suffix("baz"). + Timeout(time.Second). + Body([]byte(strings.Repeat("abcd", 1000))). + DoRaw() + if err != nil { + t.Fatalf("Unexpected error: %v %#v", err, err) + } + <-ch + if count != 4 { + t.Errorf("unexpected retries: %d", count) + } +} + func BenchmarkCheckRetryClosesBody(t *testing.B) { count := 0 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -771,6 +803,7 @@ func BenchmarkCheckRetryClosesBody(t *testing.B) { } } } + func TestDoRequestNewWayReader(t *testing.T) { reqObj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} reqBodyExpected, _ := testapi.Default.Codec().Encode(reqObj) diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index 42c994fe42f..29961bb97ba 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -182,10 +182,7 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher, options *api.Li trace.Step("About to read single object") err := e.Storage.GetToList(ctx, key, filterFunc, list) trace.Step("Object extracted") - if err != nil { - return nil, err - } - return list, nil + return list, etcderr.InterpretListError(err, e.EndpointName) } // if we cannot extract a key based on the current context, the optimization is skipped } @@ -200,10 +197,7 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher, options *api.Li } err = e.Storage.List(ctx, e.KeyRootFunc(ctx), version, filterFunc, list) trace.Step("List extracted") - if err != nil { - return nil, err - } - return list, nil + return list, etcderr.InterpretListError(err, e.EndpointName) } // Create inserts a new item according to the unique key from the object. 
diff --git a/pkg/storage/etcd/etcd_util.go b/pkg/storage/etcd/etcd_util.go index d99b3c28f83..e58a9fb94fa 100644 --- a/pkg/storage/etcd/etcd_util.go +++ b/pkg/storage/etcd/etcd_util.go @@ -43,6 +43,16 @@ func IsEtcdTestFailed(err error) bool { return isEtcdErrorNum(err, tools.EtcdErrorCodeTestFailed) } +// IsEtcdWatchExpired returns true if and only if err indicates the watch has expired. +func IsEtcdWatchExpired(err error) bool { + return isEtcdErrorNum(err, tools.EtcdErrorCodeWatchExpired) +} + +// IsEtcdUnreachable returns true if and only if err indicates the server could not be reached. +func IsEtcdUnreachable(err error) bool { + return isEtcdErrorNum(err, tools.EtcdErrorCodeUnreachable) +} + // IsEtcdWatchStoppedByUser returns true if and only if err is a client triggered stop. func IsEtcdWatchStoppedByUser(err error) bool { return goetcd.ErrWatchStoppedByUser == err diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index 9dae526586d..5eefcd1ca54 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -17,6 +17,7 @@ limitations under the License. package etcd import ( + "net/http" "sync" "time" @@ -181,12 +182,30 @@ func (w *etcdWatcher) translate() { select { case err := <-w.etcdError: if err != nil { - w.emit(watch.Event{ - Type: watch.Error, - Object: &unversioned.Status{ + var status *unversioned.Status + switch { + case IsEtcdWatchExpired(err): + status = &unversioned.Status{ Status: unversioned.StatusFailure, Message: err.Error(), - }, + Code: http.StatusGone, // Gone + Reason: unversioned.StatusReasonExpired, + } + // TODO: need to generate errors using api/errors which has a circular dependency on this package + // no other way to inject errors + // case IsEtcdUnreachable(err): + // status = errors.NewServerTimeout(...) 
+ default: + status = &unversioned.Status{ + Status: unversioned.StatusFailure, + Message: err.Error(), + Code: http.StatusInternalServerError, + Reason: unversioned.StatusReasonInternalError, + } + } + w.emit(watch.Event{ + Type: watch.Error, + Object: status, }) } return diff --git a/pkg/tools/interfaces.go b/pkg/tools/interfaces.go index 02569cb28b5..5898bfab135 100644 --- a/pkg/tools/interfaces.go +++ b/pkg/tools/interfaces.go @@ -25,6 +25,8 @@ const ( EtcdErrorCodeTestFailed = 101 EtcdErrorCodeNodeExist = 105 EtcdErrorCodeValueRequired = 200 + EtcdErrorCodeWatchExpired = 401 + EtcdErrorCodeUnreachable = 501 ) var ( @@ -32,6 +34,8 @@ var ( EtcdErrorTestFailed = &etcd.EtcdError{ErrorCode: EtcdErrorCodeTestFailed} EtcdErrorNodeExist = &etcd.EtcdError{ErrorCode: EtcdErrorCodeNodeExist} EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired} + EtcdErrorWatchExpired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeWatchExpired} + EtcdErrorUnreachable = &etcd.EtcdError{ErrorCode: EtcdErrorCodeUnreachable} ) // EtcdClient is an injectable interface for testing.