Merge pull request #2508 from lavalamp/fix4

Don't hold up the entire event queue for a single bad event.
This commit is contained in:
Eric Tune 2014-11-20 17:22:03 -08:00
commit ea58ab7221
5 changed files with 183 additions and 26 deletions

View File

@ -24,34 +24,46 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
)
// statusError is an error intended for consumption by a REST API server.
type statusError struct {
status api.Status
// StatusError is an error intended for consumption by a REST API server; it can also be
// reconstructed by clients from a REST response. Public to allow easy type switches.
type StatusError struct {
ErrStatus api.Status
}
// Error implements the Error interface.
func (e *statusError) Error() string {
return e.status.Message
func (e *StatusError) Error() string {
return e.ErrStatus.Message
}
// Status converts this error into an api.Status object.
func (e *statusError) Status() api.Status {
return e.status
// Status allows access to e's status without having to know the detailed workings
// of StatusError. Used by pkg/apiserver.
func (e *StatusError) Status() api.Status {
return e.ErrStatus
}
// FromObject generates an statusError from an api.Status, if that is the type of obj; otherwise,
// returns an error created by fmt.Errorf.
// UnexpectedObjectError can be returned by FromObject if it's passed a non-status object.
type UnexpectedObjectError struct {
Object runtime.Object
}
// Error returns an error message describing 'u'.
func (u *UnexpectedObjectError) Error() string {
return fmt.Sprintf("unexpected object: %v", u.Object)
}
// FromObject generates an StatusError from an api.Status, if that is the type of obj; otherwise,
// returns an UnexpecteObjectError.
func FromObject(obj runtime.Object) error {
switch t := obj.(type) {
case *api.Status:
return &statusError{*t}
return &StatusError{*t}
}
return fmt.Errorf("unexpected object: %v", obj)
return &UnexpectedObjectError{obj}
}
// NewNotFound returns a new error which indicates that the resource of the kind and the name was not found.
func NewNotFound(kind, name string) error {
return &statusError{api.Status{
return &StatusError{api.Status{
Status: api.StatusFailure,
Code: http.StatusNotFound,
Reason: api.StatusReasonNotFound,
@ -65,7 +77,7 @@ func NewNotFound(kind, name string) error {
// NewAlreadyExists returns an error indicating the item requested exists by that identifier.
func NewAlreadyExists(kind, name string) error {
return &statusError{api.Status{
return &StatusError{api.Status{
Status: api.StatusFailure,
Code: http.StatusConflict,
Reason: api.StatusReasonAlreadyExists,
@ -79,7 +91,7 @@ func NewAlreadyExists(kind, name string) error {
// NewConflict returns an error indicating the item can't be updated as provided.
func NewConflict(kind, name string, err error) error {
return &statusError{api.Status{
return &StatusError{api.Status{
Status: api.StatusFailure,
Code: http.StatusConflict,
Reason: api.StatusReasonConflict,
@ -103,7 +115,7 @@ func NewInvalid(kind, name string, errs ValidationErrorList) error {
})
}
}
return &statusError{api.Status{
return &StatusError{api.Status{
Status: api.StatusFailure,
Code: 422, // RFC 4918: StatusUnprocessableEntity
Reason: api.StatusReasonInvalid,
@ -118,7 +130,7 @@ func NewInvalid(kind, name string, errs ValidationErrorList) error {
// NewBadRequest creates an error that indicates that the request is invalid and can not be processed.
func NewBadRequest(reason string) error {
return &statusError{
return &StatusError{
api.Status{
Status: api.StatusFailure,
Code: http.StatusBadRequest,
@ -134,7 +146,7 @@ func NewBadRequest(reason string) error {
// NewInternalError returns an error indicating the item is invalid and cannot be processed.
func NewInternalError(err error) error {
return &statusError{api.Status{
return &StatusError{api.Status{
Status: api.StatusFailure,
Code: http.StatusInternalServerError,
Reason: api.StatusReasonInternalError,
@ -172,8 +184,8 @@ func IsBadRequest(err error) bool {
func reasonForError(err error) api.StatusReason {
switch t := err.(type) {
case *statusError:
return t.status.Reason
case *StatusError:
return t.ErrStatus.Reason
}
return api.StatusReasonUnknown
}

View File

@ -117,7 +117,7 @@ func TestNewInvalid(t *testing.T) {
vErr, expected := testCase.Err, testCase.Details
expected.Causes[0].Message = vErr.Error()
err := NewInvalid("kind", "name", ValidationErrorList{vErr})
status := err.(*statusError).Status()
status := err.(*StatusError).ErrStatus
if status.Code != 422 || status.Reason != api.StatusReasonInvalid {
t.Errorf("%d: unexpected status: %#v", i, status)
}

View File

@ -21,6 +21,8 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@ -30,6 +32,8 @@ import (
// EventRecorder knows how to store events (client.Client implements it.)
// EventRecorder must respect the namespace that will be embedded in 'event'.
// It is assumed that EventRecorder will return the same sorts of errors as
// pkg/client's REST client.
type EventRecorder interface {
Create(event *api.Event) (*api.Event, error)
}
@ -44,13 +48,39 @@ func StartRecording(recorder EventRecorder, sourceName string) watch.Interface {
eventCopy := *event
event = &eventCopy
event.Source = sourceName
try := 0
for {
try++
_, err := recorder.Create(event)
if err == nil {
break
}
glog.Errorf("Sleeping: Unable to write event: %v", err)
time.Sleep(10 * time.Second)
// If we can't contact the server, then hold everything while we keep trying.
// Otherwise, something about the event is malformed and we should abandon it.
giveUp := false
switch err.(type) {
case *client.RequestConstructionError:
// We will construct the request the same next time, so don't keep trying.
giveUp = true
case *errors.StatusError:
// This indicates that the server understood and rejected our request.
giveUp = true
case *errors.UnexpectedObjectError:
// We don't expect this; it implies the server's response didn't match a
// known pattern. Go ahead and retry.
default:
// This case includes actual http transport errors. Go ahead and retry.
}
if giveUp {
glog.Errorf("Unable to write event '%#v': '%v' (will not retry!)", event, err)
break
}
if try >= 3 {
glog.Errorf("Unable to write event '%#v': '%v' (retry limit exceeded!)", event, err)
break
}
glog.Errorf("Unable to write event: '%v' (will retry in 1 second)", err)
time.Sleep(1 * time.Second)
}
})
}

View File

@ -23,6 +23,8 @@ import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -158,3 +160,81 @@ func TestEventf(t *testing.T) {
logger2.Stop()
}
}
func TestWriteEventError(t *testing.T) {
ref := &api.ObjectReference{
Kind: "Pod",
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "v1beta1",
}
type entry struct {
timesToSendError int
attemptsMade int
attemptsWanted int
err error
}
table := map[string]*entry{
"giveUp1": {
timesToSendError: 1000,
attemptsWanted: 1,
err: &client.RequestConstructionError{},
},
"giveUp2": {
timesToSendError: 1000,
attemptsWanted: 1,
err: &errors.StatusError{},
},
"retry1": {
timesToSendError: 1000,
attemptsWanted: 3,
err: &errors.UnexpectedObjectError{},
},
"retry2": {
timesToSendError: 1000,
attemptsWanted: 3,
err: fmt.Errorf("A weird error"),
},
"succeedEventually": {
timesToSendError: 2,
attemptsWanted: 2,
err: fmt.Errorf("A weird error"),
},
}
done := make(chan struct{})
defer record.StartRecording(
&testEventRecorder{
OnEvent: func(event *api.Event) (*api.Event, error) {
if event.Message == "finished" {
close(done)
return event, nil
}
item, ok := table[event.Message]
if !ok {
t.Errorf("Unexpected event: %#v", event)
return event, nil
}
item.attemptsMade++
if item.attemptsMade < item.timesToSendError {
return nil, item.err
}
return event, nil
},
},
"eventTest",
).Stop()
for caseName := range table {
record.Event(ref, "Status", "Reason", caseName)
}
record.Event(ref, "Status", "Reason", "finished")
<-done
for caseName, item := range table {
if e, a := item.attemptsWanted, item.attemptsMade; e != a {
t.Errorf("case %v: wanted %v, got %v attempts", caseName, e, a)
}
}
}

View File

@ -51,6 +51,29 @@ type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
}
// UnexpectedStatusError is returned as an error if a response's body and HTTP code don't
// make sense together.
type UnexpectedStatusError struct {
Request *http.Request
Response *http.Response
Body string
}
// Error returns a textual description of 'u'.
func (u *UnexpectedStatusError) Error() string {
return fmt.Sprintf("request [%#v] failed (%d) %s: %s", u.Request, u.Response.StatusCode, u.Response.Status, u.Body)
}
// RequestConstructionError is returned when there's an error assembling a request.
type RequestConstructionError struct {
Err error
}
// Error returns a textual description of 'r'.
func (r *RequestConstructionError) Error() string {
return fmt.Sprintf("request construction error: '%v'", r.Err)
}
// Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to
// check once.
@ -310,6 +333,13 @@ func (r *Request) Stream() (io.ReadCloser, error) {
// Do formats and executes the request. Returns a Result object for easy response
// processing. Handles polling the server in the event a continuation was sent.
//
// Error type:
// * If the request can't be constructed, or an error happened earlier while building its
// arguments: *RequestConstructionError
// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
// * If the status code and body don't make sense together: *UnexpectedStatusError
// * http.Client.Do errors are returned directly.
func (r *Request) Do() Result {
client := r.client
if client == nil {
@ -318,12 +348,12 @@ func (r *Request) Do() Result {
for {
if r.err != nil {
return Result{err: r.err}
return Result{err: &RequestConstructionError{r.err}}
}
req, err := http.NewRequest(r.verb, r.finalURL(), r.body)
if err != nil {
return Result{err: err}
return Result{err: &RequestConstructionError{err}}
}
resp, err := client.Do(req)
@ -381,7 +411,11 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) ([]b
switch {
case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
if !isStatusResponse {
return nil, false, fmt.Errorf("request [%#v] failed (%d) %s: %s", req, resp.StatusCode, resp.Status, string(body))
return nil, false, &UnexpectedStatusError{
Request: req,
Response: resp,
Body: string(body),
}
}
return nil, false, errors.FromObject(&status)
}
@ -435,6 +469,7 @@ func (r Result) WasCreated(wasCreated *bool) Result {
}
// Error returns the error executing the request, nil if no error occurred.
// See the Request.Do() comment for what errors you might get.
func (r Result) Error() error {
return r.err
}