Report a watch error instead of eating it when we can't decode

Clients are required to handle watch events of type ERROR, so instead
of eating the decoding error we should pass it on to the client. Use
NewGenericServerError with isUnexpectedResponse to indicate that we
didn't get the bytes from the server we were expecting. For watch, the
415 error code is roughly correct and we will return an error to the
client that makes debugging a failure in either server watch or client
machinery much easier.

We do not alter the behavior when it appears the response is an EOF
or other disconnection.
This commit is contained in:
Clayton Coleman 2019-02-11 17:34:20 -05:00
parent f7c4389b79
commit 89620d5667
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
5 changed files with 186 additions and 34 deletions

View File

@ -603,3 +603,46 @@ func ReasonForError(err error) metav1.StatusReason {
}
return metav1.StatusReasonUnknown
}
// ErrorReporter converts generic errors into runtime.Object errors without
// requiring the caller to take a dependency on meta/v1 (where Status lives).
// This prevents circular dependencies in core watch code.
type ErrorReporter struct {
code int
verb string
reason string
}
// NewClientErrorReporter will respond with valid v1.Status objects that report
// unexpected server responses. Primarily used by watch to report errors when
// we attempt to decode a response from the server and it is not in the form
// we expect. Because watch is a dependency of the core api, we can't return
// meta/v1.Status in that package and so much inject this interface to convert a
// generic error as appropriate. The reason is passed as a unique status cause
// on the returned status, otherwise the generic "ClientError" is returned.
func NewClientErrorReporter(code int, verb string, reason string) *ErrorReporter {
return &ErrorReporter{
code: code,
verb: verb,
reason: reason,
}
}
// AsObject returns a valid error runtime.Object (a v1.Status) for the given
// error, using the code and verb of the reporter type. The error is set to
// indicate that this was an unexpected server response.
func (r *ErrorReporter) AsObject(err error) runtime.Object {
status := NewGenericServerResponse(r.code, r.verb, schema.GroupResource{}, "", err.Error(), 0, true)
if status.ErrStatus.Details == nil {
status.ErrStatus.Details = &metav1.StatusDetails{}
}
reason := r.reason
if len(reason) == 0 {
reason = "ClientError"
}
status.ErrStatus.Details.Causes = append(status.ErrStatus.Details.Causes, metav1.StatusCause{
Type: metav1.CauseType(reason),
Message: err.Error(),
})
return &status.ErrStatus
}

View File

@ -17,13 +17,15 @@ limitations under the License.
package watch
import (
"fmt"
"io"
"sync"
"k8s.io/klog"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog"
)
// Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
@ -39,19 +41,28 @@ type Decoder interface {
Close()
}
// Reporter hides the details of how an error is turned into a runtime.Object for
// reporting on a watch stream since this package may not import a higher level report.
type Reporter interface {
// AsObject must convert err into a valid runtime.Object for the watch stream.
AsObject(err error) runtime.Object
}
// StreamWatcher turns any stream for which you can write a Decoder interface
// into a watch.Interface.
type StreamWatcher struct {
sync.Mutex
source Decoder
result chan Event
stopped bool
source Decoder
reporter Reporter
result chan Event
stopped bool
}
// NewStreamWatcher creates a StreamWatcher from the given decoder.
func NewStreamWatcher(d Decoder) *StreamWatcher {
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
sw := &StreamWatcher{
source: d,
source: d,
reporter: r,
// It's easy for a consumer to add buffering via an extra
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better.
@ -102,11 +113,13 @@ func (sw *StreamWatcher) receive() {
case io.ErrUnexpectedEOF:
klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
default:
msg := "Unable to decode an event from the watch stream: %v"
if net.IsProbableEOF(err) {
klog.V(5).Infof(msg, err)
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
} else {
klog.Errorf(msg, err)
sw.result <- Event{
Type: Error,
Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
}
}
}
return

View File

@ -17,6 +17,7 @@ limitations under the License.
package watch_test
import (
"fmt"
"io"
"reflect"
"testing"
@ -27,9 +28,13 @@ import (
type fakeDecoder struct {
items chan Event
err error
}
func (f fakeDecoder) Decode() (action EventType, object runtime.Object, err error) {
if f.err != nil {
return "", nil, f.err
}
item, open := <-f.items
if !open {
return action, nil, io.EOF
@ -38,7 +43,18 @@ func (f fakeDecoder) Decode() (action EventType, object runtime.Object, err erro
}
func (f fakeDecoder) Close() {
close(f.items)
if f.items != nil {
close(f.items)
}
}
type fakeReporter struct {
err error
}
func (f *fakeReporter) AsObject(err error) runtime.Object {
f.err = err
return runtime.Unstructured(nil)
}
func TestStreamWatcher(t *testing.T) {
@ -46,8 +62,8 @@ func TestStreamWatcher(t *testing.T) {
{Type: Added, Object: testType("foo")},
}
fd := fakeDecoder{make(chan Event, 5)}
sw := NewStreamWatcher(fd)
fd := fakeDecoder{items: make(chan Event, 5)}
sw := NewStreamWatcher(fd, nil)
for _, item := range table {
fd.items <- item
@ -66,3 +82,26 @@ func TestStreamWatcher(t *testing.T) {
t.Errorf("Unexpected failure to close")
}
}
func TestStreamWatcherError(t *testing.T) {
fd := fakeDecoder{err: fmt.Errorf("test error")}
fr := &fakeReporter{}
sw := NewStreamWatcher(fd, fr)
evt, ok := <-sw.ResultChan()
if !ok {
t.Fatalf("unexpected close")
}
if evt.Type != Error || evt.Object != runtime.Unstructured(nil) {
t.Fatalf("unexpected object: %#v", evt)
}
_, ok = <-sw.ResultChan()
if ok {
t.Fatalf("unexpected open channel")
}
sw.Stop()
_, ok = <-sw.ResultChan()
if ok {
t.Fatalf("unexpected open channel")
}
}

View File

@ -595,7 +595,12 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser)
return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode)
}
wrapperDecoder := wrapperDecoderFn(resp.Body)
return watch.NewStreamWatcher(restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder)), nil
return watch.NewStreamWatcher(
restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder),
// use 500 to indicate that the cause of the error is unknown - other error codes
// are more specific to HTTP interactions, and set a reason
errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
), nil
}
// updateURLMetrics is a convenience function for pushing metrics.

View File

@ -37,7 +37,7 @@ import (
"k8s.io/klog"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -879,9 +879,17 @@ func TestTransformUnstructuredError(t *testing.T) {
}
}
type errorReader struct {
err error
}
func (r errorReader) Read(data []byte) (int, error) { return 0, r.err }
func (r errorReader) Close() error { return nil }
func TestRequestWatch(t *testing.T) {
testCases := []struct {
Request *Request
Expect []watch.Event
Err bool
ErrFn func(error) bool
Empty bool
@ -903,6 +911,40 @@ func TestRequestWatch(t *testing.T) {
},
Err: true,
},
{
Request: &Request{
content: defaultContentConfig(),
serializers: defaultSerializers(t),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
resp := &http.Response{StatusCode: http.StatusOK, Body: errorReader{err: errors.New("test error")}}
return resp, nil
}),
baseURL: &url.URL{},
},
Expect: []watch.Event{
{
Type: watch.Error,
Object: &metav1.Status{
Status: "Failure",
Code: 500,
Reason: "InternalError",
Message: `an error on the server ("unable to decode an event from the watch stream: test error") has prevented the request from succeeding`,
Details: &metav1.StatusDetails{
Causes: []metav1.StatusCause{
{
Type: "UnexpectedServerResponse",
Message: "unable to decode an event from the watch stream: test error",
},
{
Type: "ClientWatchDecoding",
Message: "unable to decode an event from the watch stream: test error",
},
},
},
},
},
},
},
{
Request: &Request{
content: defaultContentConfig(),
@ -999,27 +1041,37 @@ func TestRequestWatch(t *testing.T) {
},
}
for i, testCase := range testCases {
t.Logf("testcase %v", testCase.Request)
testCase.Request.backoffMgr = &NoBackoff{}
watch, err := testCase.Request.Watch()
hasErr := err != nil
if hasErr != testCase.Err {
t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
continue
}
if testCase.ErrFn != nil && !testCase.ErrFn(err) {
t.Errorf("%d: error not valid: %v", i, err)
}
if hasErr && watch != nil {
t.Errorf("%d: watch should be nil when error is returned", i)
continue
}
if testCase.Empty {
_, ok := <-watch.ResultChan()
if ok {
t.Errorf("%d: expected the watch to be empty: %#v", i, watch)
t.Run("", func(t *testing.T) {
testCase.Request.backoffMgr = &NoBackoff{}
watch, err := testCase.Request.Watch()
hasErr := err != nil
if hasErr != testCase.Err {
t.Fatalf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
}
}
if testCase.ErrFn != nil && !testCase.ErrFn(err) {
t.Errorf("%d: error not valid: %v", i, err)
}
if hasErr && watch != nil {
t.Fatalf("%d: watch should be nil when error is returned", i)
}
if testCase.Empty {
_, ok := <-watch.ResultChan()
if ok {
t.Errorf("%d: expected the watch to be empty: %#v", i, watch)
}
}
if testCase.Expect != nil {
for i, evt := range testCase.Expect {
out, ok := <-watch.ResultChan()
if !ok {
t.Fatalf("Watch closed early, %d/%d read", i, len(testCase.Expect))
}
if !reflect.DeepEqual(evt, out) {
t.Fatalf("Event %d does not match: %s", i, diff.ObjectReflectDiff(evt, out))
}
}
}
})
}
}