Merge pull request #25871 from smarterclayton/retry_on_error

Fix the Retry-After code path to work for clients, and send correct bodies
This commit is contained in:
Wojciech Tyczynski 2016-05-19 16:18:18 +02:00
commit 9784dff94e
12 changed files with 103 additions and 33 deletions

View File

@ -93,7 +93,7 @@ func FromObject(obj runtime.Object) error {
}
// NewNotFound returns a new error which indicates that the resource of the kind and the name was not found.
func NewNotFound(qualifiedResource unversioned.GroupResource, name string) error {
func NewNotFound(qualifiedResource unversioned.GroupResource, name string) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusNotFound,
@ -108,7 +108,7 @@ func NewNotFound(qualifiedResource unversioned.GroupResource, name string) error
}
// NewAlreadyExists returns an error indicating the item requested exists by that identifier.
func NewAlreadyExists(qualifiedResource unversioned.GroupResource, name string) error {
func NewAlreadyExists(qualifiedResource unversioned.GroupResource, name string) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusConflict,
@ -124,7 +124,7 @@ func NewAlreadyExists(qualifiedResource unversioned.GroupResource, name string)
// NewUnauthorized returns an error indicating the client is not authorized to perform the requested
// action.
func NewUnauthorized(reason string) error {
func NewUnauthorized(reason string) *StatusError {
message := reason
if len(message) == 0 {
message = "not authorized"
@ -138,7 +138,7 @@ func NewUnauthorized(reason string) error {
}
// NewForbidden returns an error indicating the requested action was forbidden
func NewForbidden(qualifiedResource unversioned.GroupResource, name string, err error) error {
func NewForbidden(qualifiedResource unversioned.GroupResource, name string, err error) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusForbidden,
@ -153,7 +153,7 @@ func NewForbidden(qualifiedResource unversioned.GroupResource, name string, err
}
// NewConflict returns an error indicating the item can't be updated as provided.
func NewConflict(qualifiedResource unversioned.GroupResource, name string, err error) error {
func NewConflict(qualifiedResource unversioned.GroupResource, name string, err error) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusConflict,
@ -168,7 +168,7 @@ func NewConflict(qualifiedResource unversioned.GroupResource, name string, err e
}
// NewGone returns an error indicating the item no longer available at the server and no forwarding address is known.
func NewGone(message string) error {
func NewGone(message string) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusGone,
@ -178,7 +178,7 @@ func NewGone(message string) error {
}
// NewInvalid returns an error indicating the item is invalid and cannot be processed.
func NewInvalid(qualifiedKind unversioned.GroupKind, name string, errs field.ErrorList) error {
func NewInvalid(qualifiedKind unversioned.GroupKind, name string, errs field.ErrorList) *StatusError {
causes := make([]unversioned.StatusCause, 0, len(errs))
for i := range errs {
err := errs[i]
@ -203,7 +203,7 @@ func NewInvalid(qualifiedKind unversioned.GroupKind, name string, errs field.Err
}
// NewBadRequest creates an error that indicates that the request is invalid and can not be processed.
func NewBadRequest(reason string) error {
func NewBadRequest(reason string) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusBadRequest,
@ -213,7 +213,7 @@ func NewBadRequest(reason string) error {
}
// NewServiceUnavailable creates an error that indicates that the requested service is unavailable.
func NewServiceUnavailable(reason string) error {
func NewServiceUnavailable(reason string) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusServiceUnavailable,
@ -223,7 +223,7 @@ func NewServiceUnavailable(reason string) error {
}
// NewMethodNotSupported returns an error indicating the requested action is not supported on this kind.
func NewMethodNotSupported(qualifiedResource unversioned.GroupResource, action string) error {
func NewMethodNotSupported(qualifiedResource unversioned.GroupResource, action string) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusMethodNotAllowed,
@ -238,7 +238,7 @@ func NewMethodNotSupported(qualifiedResource unversioned.GroupResource, action s
// NewServerTimeout returns an error indicating the requested action could not be completed due to a
// transient error, and the client should try again.
func NewServerTimeout(qualifiedResource unversioned.GroupResource, operation string, retryAfterSeconds int) error {
func NewServerTimeout(qualifiedResource unversioned.GroupResource, operation string, retryAfterSeconds int) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusInternalServerError,
@ -255,12 +255,12 @@ func NewServerTimeout(qualifiedResource unversioned.GroupResource, operation str
// NewServerTimeoutForKind should not exist. Server timeouts happen when accessing resources, the Kind is just what we
// happened to be looking at when the request failed. This delegates to keep code sane, but we should work towards removing this.
func NewServerTimeoutForKind(qualifiedKind unversioned.GroupKind, operation string, retryAfterSeconds int) error {
func NewServerTimeoutForKind(qualifiedKind unversioned.GroupKind, operation string, retryAfterSeconds int) *StatusError {
return NewServerTimeout(unversioned.GroupResource{Group: qualifiedKind.Group, Resource: qualifiedKind.Kind}, operation, retryAfterSeconds)
}
// NewInternalError returns an error indicating the item is invalid and cannot be processed.
func NewInternalError(err error) error {
func NewInternalError(err error) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: http.StatusInternalServerError,
@ -274,7 +274,7 @@ func NewInternalError(err error) error {
// NewTimeoutError returns an error indicating that a timeout occurred before the request
// could be completed. Clients may retry, but the operation may still complete.
func NewTimeoutError(message string, retryAfterSeconds int) error {
func NewTimeoutError(message string, retryAfterSeconds int) *StatusError {
return &StatusError{unversioned.Status{
Status: unversioned.StatusFailure,
Code: StatusServerTimeout,
@ -287,7 +287,7 @@ func NewTimeoutError(message string, retryAfterSeconds int) error {
}
// NewGenericServerResponse returns a new error for server responses that are not in a recognizable form.
func NewGenericServerResponse(code int, verb string, qualifiedResource unversioned.GroupResource, name, serverMessage string, retryAfterSeconds int, isUnexpectedResponse bool) error {
func NewGenericServerResponse(code int, verb string, qualifiedResource unversioned.GroupResource, name, serverMessage string, retryAfterSeconds int, isUnexpectedResponse bool) *StatusError {
reason := unversioned.StatusReasonUnknown
message := fmt.Sprintf("the server responded with the status code %d but did not return more information", code)
switch code {

View File

@ -152,7 +152,7 @@ func TestNewInvalid(t *testing.T) {
vErr, expected := testCase.Err, testCase.Details
expected.Causes[0].Message = vErr.ErrorBody()
err := NewInvalid(api.Kind("Kind"), "name", field.ErrorList{vErr})
status := err.(*StatusError).ErrStatus
status := err.ErrStatus
if status.Code != 422 || status.Reason != unversioned.StatusReasonInvalid {
t.Errorf("%d: unexpected status: %#v", i, status)
}

View File

@ -26,6 +26,7 @@ import (
"net/http"
"path"
rt "runtime"
"strconv"
"strings"
"time"
@ -449,6 +450,11 @@ func writeNegotiated(s runtime.NegotiatedSerializer, gv unversioned.GroupVersion
func errorNegotiated(err error, s runtime.NegotiatedSerializer, gv unversioned.GroupVersion, w http.ResponseWriter, req *http.Request) int {
status := errToAPIStatus(err)
code := int(status.Code)
// when writing an error, check to see if the status indicates a retry after period
if status.Details != nil && status.Details.RetryAfterSeconds > 0 {
delay := strconv.Itoa(int(status.Details.RetryAfterSeconds))
w.Header().Set("Retry-After", delay)
}
writeNegotiated(s, gv, w, req, code, status)
return code
}

View File

@ -1562,6 +1562,7 @@ func TestGetNamespaceSelfLink(t *testing.T) {
t.Errorf("Never set self link")
}
}
func TestGetMissing(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{
@ -1572,7 +1573,7 @@ func TestGetMissing(t *testing.T) {
server := httptest.NewServer(handler)
defer server.Close()
resp, err := http.Get(server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/simple/id")
resp, err := http.Get(server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -1582,6 +1583,28 @@ func TestGetMissing(t *testing.T) {
}
}
func TestGetRetryAfter(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{
errors: map[string]error{"get": apierrs.NewServerTimeout(api.Resource("simples"), "id", 2)},
}
storage["simple"] = &simpleStorage
handler := handle(storage)
server := httptest.NewServer(handler)
defer server.Close()
resp, err := http.Get(server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if resp.StatusCode != http.StatusInternalServerError {
t.Errorf("Unexpected response %#v", resp)
}
if resp.Header.Get("Retry-After") != "2" {
t.Errorf("Unexpected Retry-After header: %v", resp.Header)
}
}
func TestConnect(t *testing.T) {
responseText := "Hello World"
itemID := "theID"

View File

@ -17,6 +17,7 @@ limitations under the License.
package apiserver
import (
"encoding/hex"
"encoding/json"
"fmt"
"math/rand"
@ -376,7 +377,7 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object
trace.Step("About to convert to expected version")
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
if err != nil {
err = transformDecodeError(typer, err, original, gvk)
err = transformDecodeError(typer, err, original, gvk, body)
scope.err(err, res.ResponseWriter, req.Request)
return
}
@ -650,7 +651,7 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType
trace.Step("About to convert to expected version")
obj, gvk, err := scope.Serializer.DecoderToVersion(s, defaultGVK.GroupVersion()).Decode(body, &defaultGVK, original)
if err != nil {
err = transformDecodeError(typer, err, original, gvk)
err = transformDecodeError(typer, err, original, gvk, body)
scope.err(err, res.ResponseWriter, req.Request)
return
}
@ -938,7 +939,7 @@ func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object,
}
// transformDecodeError adds additional information when a decode fails.
func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime.Object, gvk *unversioned.GroupVersionKind) error {
func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime.Object, gvk *unversioned.GroupVersionKind, body []byte) error {
objGVK, err := typer.ObjectKind(into)
if err != nil {
return err
@ -946,7 +947,8 @@ func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime
if gvk != nil && len(gvk.Kind) > 0 {
return errors.NewBadRequest(fmt.Sprintf("%s in version %q cannot be handled as a %s: %v", gvk.Kind, gvk.Version, objGVK.Kind, baseErr))
}
return errors.NewBadRequest(fmt.Sprintf("the object provided is unrecognized (must be of type %s): %v", objGVK.Kind, baseErr))
summary := summarizeData(body, 30)
return errors.NewBadRequest(fmt.Sprintf("the object provided is unrecognized (must be of type %s): %v (%s)", objGVK.Kind, baseErr, summary))
}
// setSelfLink sets the self link of an object (or the child items in a list) to the base URL of the request
@ -1038,3 +1040,20 @@ func getPatchedJS(patchType api.PatchType, originalJS, patchJS []byte, obj runti
return nil, fmt.Errorf("unknown Content-Type header for patch: %v", patchType)
}
}
func summarizeData(data []byte, maxLength int) string {
switch {
case len(data) == 0:
return "<empty>"
case data[0] == '{':
if len(data) > maxLength {
return string(data[:maxLength]) + " ..."
}
return string(data)
default:
if len(data) > maxLength {
return hex.EncodeToString(data[:maxLength]) + " ..."
}
return hex.EncodeToString(data)
}
}

View File

@ -539,10 +539,10 @@ func (r *Request) Body(obj interface{}) *Request {
return r
}
glog.V(8).Infof("Request Body: %s", string(data))
r.body = bytes.NewBuffer(data)
r.body = bytes.NewReader(data)
case []byte:
glog.V(8).Infof("Request Body: %s", string(t))
r.body = bytes.NewBuffer(t)
r.body = bytes.NewReader(t)
case io.Reader:
r.body = t
case runtime.Object:
@ -556,7 +556,7 @@ func (r *Request) Body(obj interface{}) *Request {
return r
}
glog.V(8).Infof("Request Body: %s", string(data))
r.body = bytes.NewBuffer(data)
r.body = bytes.NewReader(data)
r.SetHeader("Content-Type", r.content.ContentType)
default:
r.err = fmt.Errorf("unknown type used for body: %+v", obj)
@ -823,6 +823,15 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
retries++
if seconds, wait := checkWait(resp); wait && retries < maxRetries {
if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
_, err := seeker.Seek(0, 0)
if err != nil {
glog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
fn(req, resp)
return true
}
}
glog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", seconds, retries, url)
r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
return false

View File

@ -874,6 +874,13 @@ func TestCheckRetryHandles429And5xx(t *testing.T) {
count := 0
ch := make(chan struct{})
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
data, err := ioutil.ReadAll(req.Body)
if err != nil {
t.Fatalf("unable to read request body: %v", err)
}
if !bytes.Equal(data, []byte(strings.Repeat("abcd", 1000))) {
t.Fatalf("retry did not send a complete body: %s", data)
}
t.Logf("attempt %d", count)
if count >= 4 {
w.WriteHeader(http.StatusOK)

View File

@ -56,9 +56,9 @@ func TestErrors(t *testing.T) {
o.Add(&api.List{
Items: []runtime.Object{
// This first call to List will return this error
&(errors.NewNotFound(api.Resource("ServiceList"), "").(*errors.StatusError).ErrStatus),
&(errors.NewNotFound(api.Resource("ServiceList"), "").ErrStatus),
// The second call to List will return this error
&(errors.NewForbidden(api.Resource("ServiceList"), "", nil).(*errors.StatusError).ErrStatus),
&(errors.NewForbidden(api.Resource("ServiceList"), "", nil).ErrStatus),
},
})
client := &Fake{}

View File

@ -192,7 +192,7 @@ func TestDeleteAllNotFound(t *testing.T) {
// Add an item to the list which will result in a 404 on delete
svc.Items = append(svc.Items, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}})
notFoundError := &errors.NewNotFound(api.Resource("services"), "foo").(*errors.StatusError).ErrStatus
notFoundError := &errors.NewNotFound(api.Resource("services"), "foo").ErrStatus
tf.Printer = &testPrinter{}
tf.Client = &fake.RESTClient{
@ -234,7 +234,7 @@ func TestDeleteAllIgnoreNotFound(t *testing.T) {
// Add an item to the list which will result in a 404 on delete
svc.Items = append(svc.Items, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}})
notFoundError := &errors.NewNotFound(api.Resource("services"), "foo").(*errors.StatusError).ErrStatus
notFoundError := &errors.NewNotFound(api.Resource("services"), "foo").ErrStatus
tf.Printer = &testPrinter{}
tf.Client = &fake.RESTClient{

View File

@ -239,7 +239,7 @@ func TestWatch(t *testing.T) {
}
defer tooOldWatcher.Stop()
// Ensure we get a "Gone" error
expectedGoneError := errors.NewGone("").(*errors.StatusError).ErrStatus
expectedGoneError := errors.NewGone("").ErrStatus
verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedGoneError)
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything)

View File

@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/fields"
kubelet "k8s.io/kubernetes/pkg/kubelet/types"
@ -199,6 +200,9 @@ func (s *serviceAccount) Admit(a admission.Attributes) (err error) {
if s.MountServiceAccountToken {
if err := s.mountServiceAccountToken(serviceAccount, pod); err != nil {
if _, ok := err.(errors.APIStatus); ok {
return err
}
return admission.NewForbidden(a, err)
}
}
@ -357,8 +361,9 @@ func (s *serviceAccount) mountServiceAccountToken(serviceAccount *api.ServiceAcc
// We don't have an API token to mount, so return
if s.RequireAPIToken {
// If a token is required, this is considered an error
// TODO: convert to a ServerTimeout error (or other error that sends a Retry-After header)
return fmt.Errorf("no API token found for service account %s/%s, retry after the token is automatically created and added to the service account", serviceAccount.Namespace, serviceAccount.Name)
err := errors.NewServerTimeout(unversioned.GroupResource{Resource: "serviceaccounts"}, "create pod", 1)
err.ErrStatus.Message = fmt.Sprintf("No API token found for service account %q, retry after the token is automatically created and added to the service account", serviceAccount.Name)
return err
}
return nil
}

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
kubelet "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/types"
@ -168,8 +169,8 @@ func TestAssignsDefaultServiceAccountAndRejectsMissingAPIToken(t *testing.T) {
pod := &api.Pod{}
attrs := admission.NewAttributesRecord(pod, api.Kind("Pod").WithVersion("version"), ns, "myname", api.Resource("pods").WithVersion("version"), "", admission.Create, nil)
err := admit.Admit(attrs)
if err == nil {
t.Errorf("Expected admission error for missing API token")
if err == nil || !errors.IsServerTimeout(err) {
t.Errorf("Expected server timeout error for missing API token: %v", err)
}
}