From cd12199def5832afb81d78426c3b2758ee70110f Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 11 Feb 2019 17:34:20 -0500 Subject: [PATCH] Report a watch error instead of eating it when we can't decode Clients are required to handle watch events of type ERROR, so instead of eating the decoding error we should pass it on to the client. Use NewGenericServerError with isUnexpectedResponse to indicate that we didn't get the bytes from the server we were expecting. For watch, the 415 error code is roughly correct and we will return an error to the client that makes debugging a failure in either server watch or client machinery much easier. We do not alter the behavior when it appears the response is an EOF or other disconnection. Kubernetes-commit: 89620d5667adec6c132b2713b79efb1dd2391723 --- rest/request.go | 7 +++- rest/request_test.go | 94 ++++++++++++++++++++++++++++++++++---------- 2 files changed, 79 insertions(+), 22 deletions(-) diff --git a/rest/request.go b/rest/request.go index 48fb4e67..0570615f 100644 --- a/rest/request.go +++ b/rest/request.go @@ -595,7 +595,12 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode) } wrapperDecoder := wrapperDecoderFn(resp.Body) - return watch.NewStreamWatcher(restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder)), nil + return watch.NewStreamWatcher( + restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder), + // use 500 to indicate that the cause of the error is unknown - other error codes + // are more specific to HTTP interactions, and set a reason + errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"), + ), nil } // updateURLMetrics is a convenience function for pushing metrics. diff --git a/rest/request_test.go b/rest/request_test.go index e70770dc..99ba7069 100755 --- a/rest/request_test.go +++ b/rest/request_test.go @@ -37,7 +37,7 @@ import ( "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -879,9 +879,17 @@ func TestTransformUnstructuredError(t *testing.T) { } } +type errorReader struct { + err error +} + +func (r errorReader) Read(data []byte) (int, error) { return 0, r.err } +func (r errorReader) Close() error { return nil } + func TestRequestWatch(t *testing.T) { testCases := []struct { Request *Request + Expect []watch.Event Err bool ErrFn func(error) bool Empty bool @@ -903,6 +911,40 @@ func TestRequestWatch(t *testing.T) { }, Err: true, }, + { + Request: &Request{ + content: defaultContentConfig(), + serializers: defaultSerializers(t), + client: clientFunc(func(req *http.Request) (*http.Response, error) { + resp := &http.Response{StatusCode: http.StatusOK, Body: errorReader{err: errors.New("test error")}} + return resp, nil + }), + baseURL: &url.URL{}, + }, + Expect: []watch.Event{ + { + Type: watch.Error, + Object: &metav1.Status{ + Status: "Failure", + Code: 500, + Reason: "InternalError", + Message: `an error on the server ("unable to decode an event from the watch stream: test error") has prevented the request from succeeding`, + Details: &metav1.StatusDetails{ + Causes: []metav1.StatusCause{ + { + Type: "UnexpectedServerResponse", + Message: "unable to decode an event from the watch stream: test error", + }, + { + Type: "ClientWatchDecoding", + Message: "unable to decode an event from the watch stream: test error", + }, + }, + }, + }, + }, + }, + }, { Request: &Request{ content: defaultContentConfig(), @@ -999,27 +1041,37 @@ func TestRequestWatch(t *testing.T) { }, } for i, testCase := range testCases { - t.Logf("testcase %v", testCase.Request) - testCase.Request.backoffMgr = &NoBackoff{} - watch, err := testCase.Request.Watch() - hasErr := err != nil - if hasErr != testCase.Err { - t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err) - continue - } - if testCase.ErrFn != nil && !testCase.ErrFn(err) { - t.Errorf("%d: error not valid: %v", i, err) - } - if hasErr && watch != nil { - t.Errorf("%d: watch should be nil when error is returned", i) - continue - } - if testCase.Empty { - _, ok := <-watch.ResultChan() - if ok { - t.Errorf("%d: expected the watch to be empty: %#v", i, watch) + t.Run("", func(t *testing.T) { + testCase.Request.backoffMgr = &NoBackoff{} + watch, err := testCase.Request.Watch() + hasErr := err != nil + if hasErr != testCase.Err { + t.Fatalf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err) } - } + if testCase.ErrFn != nil && !testCase.ErrFn(err) { + t.Errorf("%d: error not valid: %v", i, err) + } + if hasErr && watch != nil { + t.Fatalf("%d: watch should be nil when error is returned", i) + } + if testCase.Empty { + _, ok := <-watch.ResultChan() + if ok { + t.Errorf("%d: expected the watch to be empty: %#v", i, watch) + } + } + if testCase.Expect != nil { + for i, evt := range testCase.Expect { + out, ok := <-watch.ResultChan() + if !ok { + t.Fatalf("Watch closed early, %d/%d read", i, len(testCase.Expect)) + } + if !reflect.DeepEqual(evt, out) { + t.Fatalf("Event %d does not match: %s", i, diff.ObjectReflectDiff(evt, out)) + } + } + } + }) } }