mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 18:00:08 +00:00
Don't hold up the entire event queue for a single bad event. Also, don't retry forever.
This commit is contained in:
parent
c6158b8aa9
commit
4437f03dbf
@ -24,34 +24,46 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
)
|
)
|
||||||
|
|
||||||
// statusError is an error intended for consumption by a REST API server.
|
// StatusError is an error intended for consumption by a REST API server; it can also be
|
||||||
type statusError struct {
|
// reconstructed by clients from a REST response. Public to allow easy type switches.
|
||||||
status api.Status
|
type StatusError struct {
|
||||||
|
ErrStatus api.Status
|
||||||
}
|
}
|
||||||
|
|
||||||
// Error implements the Error interface.
|
// Error implements the Error interface.
|
||||||
func (e *statusError) Error() string {
|
func (e *StatusError) Error() string {
|
||||||
return e.status.Message
|
return e.ErrStatus.Message
|
||||||
}
|
}
|
||||||
|
|
||||||
// Status converts this error into an api.Status object.
|
// Status allows access to e's status without having to know the detailed workings
|
||||||
func (e *statusError) Status() api.Status {
|
// of StatusError. Used by pkg/apiserver.
|
||||||
return e.status
|
func (e *StatusError) Status() api.Status {
|
||||||
|
return e.ErrStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
// FromObject generates an statusError from an api.Status, if that is the type of obj; otherwise,
|
// UnexpectedObjectError can be returned by FromObject if it's passed a non-status object.
|
||||||
// returns an error created by fmt.Errorf.
|
type UnexpectedObjectError struct {
|
||||||
|
Object runtime.Object
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error returns an error message describing 'u'.
|
||||||
|
func (u *UnexpectedObjectError) Error() string {
|
||||||
|
return fmt.Sprintf("unexpected object: %v", u.Object)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromObject generates an StatusError from an api.Status, if that is the type of obj; otherwise,
|
||||||
|
// returns an UnexpecteObjectError.
|
||||||
func FromObject(obj runtime.Object) error {
|
func FromObject(obj runtime.Object) error {
|
||||||
switch t := obj.(type) {
|
switch t := obj.(type) {
|
||||||
case *api.Status:
|
case *api.Status:
|
||||||
return &statusError{*t}
|
return &StatusError{*t}
|
||||||
}
|
}
|
||||||
return fmt.Errorf("unexpected object: %v", obj)
|
return &UnexpectedObjectError{obj}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNotFound returns a new error which indicates that the resource of the kind and the name was not found.
|
// NewNotFound returns a new error which indicates that the resource of the kind and the name was not found.
|
||||||
func NewNotFound(kind, name string) error {
|
func NewNotFound(kind, name string) error {
|
||||||
return &statusError{api.Status{
|
return &StatusError{api.Status{
|
||||||
Status: api.StatusFailure,
|
Status: api.StatusFailure,
|
||||||
Code: http.StatusNotFound,
|
Code: http.StatusNotFound,
|
||||||
Reason: api.StatusReasonNotFound,
|
Reason: api.StatusReasonNotFound,
|
||||||
@ -65,7 +77,7 @@ func NewNotFound(kind, name string) error {
|
|||||||
|
|
||||||
// NewAlreadyExists returns an error indicating the item requested exists by that identifier.
|
// NewAlreadyExists returns an error indicating the item requested exists by that identifier.
|
||||||
func NewAlreadyExists(kind, name string) error {
|
func NewAlreadyExists(kind, name string) error {
|
||||||
return &statusError{api.Status{
|
return &StatusError{api.Status{
|
||||||
Status: api.StatusFailure,
|
Status: api.StatusFailure,
|
||||||
Code: http.StatusConflict,
|
Code: http.StatusConflict,
|
||||||
Reason: api.StatusReasonAlreadyExists,
|
Reason: api.StatusReasonAlreadyExists,
|
||||||
@ -79,7 +91,7 @@ func NewAlreadyExists(kind, name string) error {
|
|||||||
|
|
||||||
// NewConflict returns an error indicating the item can't be updated as provided.
|
// NewConflict returns an error indicating the item can't be updated as provided.
|
||||||
func NewConflict(kind, name string, err error) error {
|
func NewConflict(kind, name string, err error) error {
|
||||||
return &statusError{api.Status{
|
return &StatusError{api.Status{
|
||||||
Status: api.StatusFailure,
|
Status: api.StatusFailure,
|
||||||
Code: http.StatusConflict,
|
Code: http.StatusConflict,
|
||||||
Reason: api.StatusReasonConflict,
|
Reason: api.StatusReasonConflict,
|
||||||
@ -103,7 +115,7 @@ func NewInvalid(kind, name string, errs ValidationErrorList) error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &statusError{api.Status{
|
return &StatusError{api.Status{
|
||||||
Status: api.StatusFailure,
|
Status: api.StatusFailure,
|
||||||
Code: 422, // RFC 4918: StatusUnprocessableEntity
|
Code: 422, // RFC 4918: StatusUnprocessableEntity
|
||||||
Reason: api.StatusReasonInvalid,
|
Reason: api.StatusReasonInvalid,
|
||||||
@ -118,7 +130,7 @@ func NewInvalid(kind, name string, errs ValidationErrorList) error {
|
|||||||
|
|
||||||
// NewBadRequest creates an error that indicates that the request is invalid and can not be processed.
|
// NewBadRequest creates an error that indicates that the request is invalid and can not be processed.
|
||||||
func NewBadRequest(reason string) error {
|
func NewBadRequest(reason string) error {
|
||||||
return &statusError{
|
return &StatusError{
|
||||||
api.Status{
|
api.Status{
|
||||||
Status: api.StatusFailure,
|
Status: api.StatusFailure,
|
||||||
Code: http.StatusBadRequest,
|
Code: http.StatusBadRequest,
|
||||||
@ -134,7 +146,7 @@ func NewBadRequest(reason string) error {
|
|||||||
|
|
||||||
// NewInternalError returns an error indicating the item is invalid and cannot be processed.
|
// NewInternalError returns an error indicating the item is invalid and cannot be processed.
|
||||||
func NewInternalError(err error) error {
|
func NewInternalError(err error) error {
|
||||||
return &statusError{api.Status{
|
return &StatusError{api.Status{
|
||||||
Status: api.StatusFailure,
|
Status: api.StatusFailure,
|
||||||
Code: http.StatusInternalServerError,
|
Code: http.StatusInternalServerError,
|
||||||
Reason: api.StatusReasonInternalError,
|
Reason: api.StatusReasonInternalError,
|
||||||
@ -172,8 +184,8 @@ func IsBadRequest(err error) bool {
|
|||||||
|
|
||||||
func reasonForError(err error) api.StatusReason {
|
func reasonForError(err error) api.StatusReason {
|
||||||
switch t := err.(type) {
|
switch t := err.(type) {
|
||||||
case *statusError:
|
case *StatusError:
|
||||||
return t.status.Reason
|
return t.ErrStatus.Reason
|
||||||
}
|
}
|
||||||
return api.StatusReasonUnknown
|
return api.StatusReasonUnknown
|
||||||
}
|
}
|
||||||
|
@ -117,7 +117,7 @@ func TestNewInvalid(t *testing.T) {
|
|||||||
vErr, expected := testCase.Err, testCase.Details
|
vErr, expected := testCase.Err, testCase.Details
|
||||||
expected.Causes[0].Message = vErr.Error()
|
expected.Causes[0].Message = vErr.Error()
|
||||||
err := NewInvalid("kind", "name", ValidationErrorList{vErr})
|
err := NewInvalid("kind", "name", ValidationErrorList{vErr})
|
||||||
status := err.(*statusError).Status()
|
status := err.(*StatusError).ErrStatus
|
||||||
if status.Code != 422 || status.Reason != api.StatusReasonInvalid {
|
if status.Code != 422 || status.Reason != api.StatusReasonInvalid {
|
||||||
t.Errorf("%d: unexpected status: %#v", i, status)
|
t.Errorf("%d: unexpected status: %#v", i, status)
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
@ -30,6 +32,8 @@ import (
|
|||||||
|
|
||||||
// EventRecorder knows how to store events (client.Client implements it.)
|
// EventRecorder knows how to store events (client.Client implements it.)
|
||||||
// EventRecorder must respect the namespace that will be embedded in 'event'.
|
// EventRecorder must respect the namespace that will be embedded in 'event'.
|
||||||
|
// It is assumed that EventRecorder will return the same sorts of errors as
|
||||||
|
// pkg/client's REST client.
|
||||||
type EventRecorder interface {
|
type EventRecorder interface {
|
||||||
Create(event *api.Event) (*api.Event, error)
|
Create(event *api.Event) (*api.Event, error)
|
||||||
}
|
}
|
||||||
@ -44,13 +48,39 @@ func StartRecording(recorder EventRecorder, sourceName string) watch.Interface {
|
|||||||
eventCopy := *event
|
eventCopy := *event
|
||||||
event = &eventCopy
|
event = &eventCopy
|
||||||
event.Source = sourceName
|
event.Source = sourceName
|
||||||
|
try := 0
|
||||||
for {
|
for {
|
||||||
|
try++
|
||||||
_, err := recorder.Create(event)
|
_, err := recorder.Create(event)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
glog.Errorf("Sleeping: Unable to write event: %v", err)
|
// If we can't contact the server, then hold everything while we keep trying.
|
||||||
time.Sleep(10 * time.Second)
|
// Otherwise, something about the event is malformed and we should abandon it.
|
||||||
|
giveUp := false
|
||||||
|
switch err.(type) {
|
||||||
|
case *client.RequestConstructionError:
|
||||||
|
// We will construct the request the same next time, so don't keep trying.
|
||||||
|
giveUp = true
|
||||||
|
case *errors.StatusError:
|
||||||
|
// This indicates that the server understood and rejected our request.
|
||||||
|
giveUp = true
|
||||||
|
case *errors.UnexpectedObjectError:
|
||||||
|
// We don't expect this; it implies the server's response didn't match a
|
||||||
|
// known pattern. Go ahead and retry.
|
||||||
|
default:
|
||||||
|
// This case includes actual http transport errors. Go ahead and retry.
|
||||||
|
}
|
||||||
|
if giveUp {
|
||||||
|
glog.Errorf("Unable to write event '%#v': '%v' (will not retry!)", event, err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if try >= 3 {
|
||||||
|
glog.Errorf("Unable to write event '%#v': '%v' (retry limit exceeded!)", event, err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
glog.Errorf("Unable to write event: '%v' (will retry in 1 second)", err)
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
@ -158,3 +160,81 @@ func TestEventf(t *testing.T) {
|
|||||||
logger2.Stop()
|
logger2.Stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWriteEventError(t *testing.T) {
|
||||||
|
ref := &api.ObjectReference{
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "bar",
|
||||||
|
APIVersion: "v1beta1",
|
||||||
|
}
|
||||||
|
type entry struct {
|
||||||
|
timesToSendError int
|
||||||
|
attemptsMade int
|
||||||
|
attemptsWanted int
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
table := map[string]*entry{
|
||||||
|
"giveUp1": {
|
||||||
|
timesToSendError: 1000,
|
||||||
|
attemptsWanted: 1,
|
||||||
|
err: &client.RequestConstructionError{},
|
||||||
|
},
|
||||||
|
"giveUp2": {
|
||||||
|
timesToSendError: 1000,
|
||||||
|
attemptsWanted: 1,
|
||||||
|
err: &errors.StatusError{},
|
||||||
|
},
|
||||||
|
"retry1": {
|
||||||
|
timesToSendError: 1000,
|
||||||
|
attemptsWanted: 3,
|
||||||
|
err: &errors.UnexpectedObjectError{},
|
||||||
|
},
|
||||||
|
"retry2": {
|
||||||
|
timesToSendError: 1000,
|
||||||
|
attemptsWanted: 3,
|
||||||
|
err: fmt.Errorf("A weird error"),
|
||||||
|
},
|
||||||
|
"succeedEventually": {
|
||||||
|
timesToSendError: 2,
|
||||||
|
attemptsWanted: 2,
|
||||||
|
err: fmt.Errorf("A weird error"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
|
defer record.StartRecording(
|
||||||
|
&testEventRecorder{
|
||||||
|
OnEvent: func(event *api.Event) (*api.Event, error) {
|
||||||
|
if event.Message == "finished" {
|
||||||
|
close(done)
|
||||||
|
return event, nil
|
||||||
|
}
|
||||||
|
item, ok := table[event.Message]
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("Unexpected event: %#v", event)
|
||||||
|
return event, nil
|
||||||
|
}
|
||||||
|
item.attemptsMade++
|
||||||
|
if item.attemptsMade < item.timesToSendError {
|
||||||
|
return nil, item.err
|
||||||
|
}
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"eventTest",
|
||||||
|
).Stop()
|
||||||
|
|
||||||
|
for caseName := range table {
|
||||||
|
record.Event(ref, "Status", "Reason", caseName)
|
||||||
|
}
|
||||||
|
record.Event(ref, "Status", "Reason", "finished")
|
||||||
|
<-done
|
||||||
|
|
||||||
|
for caseName, item := range table {
|
||||||
|
if e, a := item.attemptsWanted, item.attemptsMade; e != a {
|
||||||
|
t.Errorf("case %v: wanted %v, got %v attempts", caseName, e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -51,6 +51,29 @@ type HTTPClient interface {
|
|||||||
Do(req *http.Request) (*http.Response, error)
|
Do(req *http.Request) (*http.Response, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UnexpectedStatusError is returned as an error if a response's body and HTTP code don't
|
||||||
|
// make sense together.
|
||||||
|
type UnexpectedStatusError struct {
|
||||||
|
Request *http.Request
|
||||||
|
Response *http.Response
|
||||||
|
Body string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error returns a textual description of 'u'.
|
||||||
|
func (u *UnexpectedStatusError) Error() string {
|
||||||
|
return fmt.Sprintf("request [%#v] failed (%d) %s: %s", u.Request, u.Response.StatusCode, u.Response.Status, u.Body)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RequestConstructionError is returned when there's an error assembling a request.
|
||||||
|
type RequestConstructionError struct {
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error returns a textual description of 'r'.
|
||||||
|
func (r *RequestConstructionError) Error() string {
|
||||||
|
return fmt.Sprintf("request construction error: '%v'", r.Err)
|
||||||
|
}
|
||||||
|
|
||||||
// Request allows for building up a request to a server in a chained fashion.
|
// Request allows for building up a request to a server in a chained fashion.
|
||||||
// Any errors are stored until the end of your call, so you only have to
|
// Any errors are stored until the end of your call, so you only have to
|
||||||
// check once.
|
// check once.
|
||||||
@ -310,6 +333,13 @@ func (r *Request) Stream() (io.ReadCloser, error) {
|
|||||||
|
|
||||||
// Do formats and executes the request. Returns a Result object for easy response
|
// Do formats and executes the request. Returns a Result object for easy response
|
||||||
// processing. Handles polling the server in the event a continuation was sent.
|
// processing. Handles polling the server in the event a continuation was sent.
|
||||||
|
//
|
||||||
|
// Error type:
|
||||||
|
// * If the request can't be constructed, or an error happened earlier while building its
|
||||||
|
// arguments: *RequestConstructionError
|
||||||
|
// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
|
||||||
|
// * If the status code and body don't make sense together: *UnexpectedStatusError
|
||||||
|
// * http.Client.Do errors are returned directly.
|
||||||
func (r *Request) Do() Result {
|
func (r *Request) Do() Result {
|
||||||
client := r.client
|
client := r.client
|
||||||
if client == nil {
|
if client == nil {
|
||||||
@ -318,12 +348,12 @@ func (r *Request) Do() Result {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
if r.err != nil {
|
if r.err != nil {
|
||||||
return Result{err: r.err}
|
return Result{err: &RequestConstructionError{r.err}}
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest(r.verb, r.finalURL(), r.body)
|
req, err := http.NewRequest(r.verb, r.finalURL(), r.body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Result{err: err}
|
return Result{err: &RequestConstructionError{err}}
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := client.Do(req)
|
resp, err := client.Do(req)
|
||||||
@ -381,7 +411,11 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) ([]b
|
|||||||
switch {
|
switch {
|
||||||
case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
|
case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
|
||||||
if !isStatusResponse {
|
if !isStatusResponse {
|
||||||
return nil, false, fmt.Errorf("request [%#v] failed (%d) %s: %s", req, resp.StatusCode, resp.Status, string(body))
|
return nil, false, &UnexpectedStatusError{
|
||||||
|
Request: req,
|
||||||
|
Response: resp,
|
||||||
|
Body: string(body),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil, false, errors.FromObject(&status)
|
return nil, false, errors.FromObject(&status)
|
||||||
}
|
}
|
||||||
@ -435,6 +469,7 @@ func (r Result) WasCreated(wasCreated *bool) Result {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Error returns the error executing the request, nil if no error occurred.
|
// Error returns the error executing the request, nil if no error occurred.
|
||||||
|
// See the Request.Do() comment for what errors you might get.
|
||||||
func (r Result) Error() error {
|
func (r Result) Error() error {
|
||||||
return r.err
|
return r.err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user