diff --git a/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go b/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go index 81e5d48bf70..65321232f0e 100644 --- a/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go +++ b/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go @@ -603,3 +603,46 @@ func ReasonForError(err error) metav1.StatusReason { } return metav1.StatusReasonUnknown } + +// ErrorReporter converts generic errors into runtime.Object errors without +// requiring the caller to take a dependency on meta/v1 (where Status lives). +// This prevents circular dependencies in core watch code. +type ErrorReporter struct { + code int + verb string + reason string +} + +// NewClientErrorReporter will respond with valid v1.Status objects that report +// unexpected server responses. Primarily used by watch to report errors when +// we attempt to decode a response from the server and it is not in the form +// we expect. Because watch is a dependency of the core api, we can't return +// meta/v1.Status in that package and so much inject this interface to convert a +// generic error as appropriate. The reason is passed as a unique status cause +// on the returned status, otherwise the generic "ClientError" is returned. +func NewClientErrorReporter(code int, verb string, reason string) *ErrorReporter { + return &ErrorReporter{ + code: code, + verb: verb, + reason: reason, + } +} + +// AsObject returns a valid error runtime.Object (a v1.Status) for the given +// error, using the code and verb of the reporter type. The error is set to +// indicate that this was an unexpected server response. +func (r *ErrorReporter) AsObject(err error) runtime.Object { + status := NewGenericServerResponse(r.code, r.verb, schema.GroupResource{}, "", err.Error(), 0, true) + if status.ErrStatus.Details == nil { + status.ErrStatus.Details = &metav1.StatusDetails{} + } + reason := r.reason + if len(reason) == 0 { + reason = "ClientError" + } + status.ErrStatus.Details.Causes = append(status.ErrStatus.Details.Causes, metav1.StatusCause{ + Type: metav1.CauseType(reason), + Message: err.Error(), + }) + return &status.ErrStatus +} diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go index d61cf5a2e58..8af256eb12a 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go @@ -17,13 +17,15 @@ limitations under the License. package watch import ( + "fmt" "io" "sync" + "k8s.io/klog" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/net" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/klog" ) // Decoder allows StreamWatcher to watch any stream for which a Decoder can be written. @@ -39,19 +41,28 @@ type Decoder interface { Close() } +// Reporter hides the details of how an error is turned into a runtime.Object for +// reporting on a watch stream since this package may not import a higher level report. +type Reporter interface { + // AsObject must convert err into a valid runtime.Object for the watch stream. + AsObject(err error) runtime.Object +} + // StreamWatcher turns any stream for which you can write a Decoder interface // into a watch.Interface. type StreamWatcher struct { sync.Mutex - source Decoder - result chan Event - stopped bool + source Decoder + reporter Reporter + result chan Event + stopped bool } // NewStreamWatcher creates a StreamWatcher from the given decoder. -func NewStreamWatcher(d Decoder) *StreamWatcher { +func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher { sw := &StreamWatcher{ - source: d, + source: d, + reporter: r, // It's easy for a consumer to add buffering via an extra // goroutine/channel, but impossible for them to remove it, // so nonbuffered is better. @@ -102,11 +113,13 @@ func (sw *StreamWatcher) receive() { case io.ErrUnexpectedEOF: klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err) default: - msg := "Unable to decode an event from the watch stream: %v" if net.IsProbableEOF(err) { - klog.V(5).Infof(msg, err) + klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) } else { - klog.Errorf(msg, err) + sw.result <- Event{ + Type: Error, + Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)), + } } } return diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go index 1e3029115f4..685a0f13a90 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go @@ -17,6 +17,7 @@ limitations under the License. package watch_test import ( + "fmt" "io" "reflect" "testing" @@ -27,9 +28,13 @@ import ( type fakeDecoder struct { items chan Event + err error } func (f fakeDecoder) Decode() (action EventType, object runtime.Object, err error) { + if f.err != nil { + return "", nil, f.err + } item, open := <-f.items if !open { return action, nil, io.EOF @@ -38,7 +43,18 @@ func (f fakeDecoder) Decode() (action EventType, object runtime.Object, err erro } func (f fakeDecoder) Close() { - close(f.items) + if f.items != nil { + close(f.items) + } +} + +type fakeReporter struct { + err error +} + +func (f *fakeReporter) AsObject(err error) runtime.Object { + f.err = err + return runtime.Unstructured(nil) } func TestStreamWatcher(t *testing.T) { @@ -46,8 +62,8 @@ func TestStreamWatcher(t *testing.T) { {Type: Added, Object: testType("foo")}, } - fd := fakeDecoder{make(chan Event, 5)} - sw := NewStreamWatcher(fd) + fd := fakeDecoder{items: make(chan Event, 5)} + sw := NewStreamWatcher(fd, nil) for _, item := range table { fd.items <- item @@ -66,3 +82,26 @@ func TestStreamWatcher(t *testing.T) { t.Errorf("Unexpected failure to close") } } + +func TestStreamWatcherError(t *testing.T) { + fd := fakeDecoder{err: fmt.Errorf("test error")} + fr := &fakeReporter{} + sw := NewStreamWatcher(fd, fr) + evt, ok := <-sw.ResultChan() + if !ok { + t.Fatalf("unexpected close") + } + if evt.Type != Error || evt.Object != runtime.Unstructured(nil) { + t.Fatalf("unexpected object: %#v", evt) + } + _, ok = <-sw.ResultChan() + if ok { + t.Fatalf("unexpected open channel") + } + + sw.Stop() + _, ok = <-sw.ResultChan() + if ok { + t.Fatalf("unexpected open channel") + } +} diff --git a/staging/src/k8s.io/client-go/rest/request.go b/staging/src/k8s.io/client-go/rest/request.go index dd0630387ab..7f9817edb61 100644 --- a/staging/src/k8s.io/client-go/rest/request.go +++ b/staging/src/k8s.io/client-go/rest/request.go @@ -595,7 +595,12 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode) } wrapperDecoder := wrapperDecoderFn(resp.Body) - return watch.NewStreamWatcher(restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder)), nil + return watch.NewStreamWatcher( + restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder), + // use 500 to indicate that the cause of the error is unknown - other error codes + // are more specific to HTTP interactions, and set a reason + errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"), + ), nil } // updateURLMetrics is a convenience function for pushing metrics. diff --git a/staging/src/k8s.io/client-go/rest/request_test.go b/staging/src/k8s.io/client-go/rest/request_test.go index e70770dcd6e..99ba7069dd4 100755 --- a/staging/src/k8s.io/client-go/rest/request_test.go +++ b/staging/src/k8s.io/client-go/rest/request_test.go @@ -37,7 +37,7 @@ import ( "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -879,9 +879,17 @@ func TestTransformUnstructuredError(t *testing.T) { } } +type errorReader struct { + err error +} + +func (r errorReader) Read(data []byte) (int, error) { return 0, r.err } +func (r errorReader) Close() error { return nil } + func TestRequestWatch(t *testing.T) { testCases := []struct { Request *Request + Expect []watch.Event Err bool ErrFn func(error) bool Empty bool @@ -903,6 +911,40 @@ func TestRequestWatch(t *testing.T) { }, Err: true, }, + { + Request: &Request{ + content: defaultContentConfig(), + serializers: defaultSerializers(t), + client: clientFunc(func(req *http.Request) (*http.Response, error) { + resp := &http.Response{StatusCode: http.StatusOK, Body: errorReader{err: errors.New("test error")}} + return resp, nil + }), + baseURL: &url.URL{}, + }, + Expect: []watch.Event{ + { + Type: watch.Error, + Object: &metav1.Status{ + Status: "Failure", + Code: 500, + Reason: "InternalError", + Message: `an error on the server ("unable to decode an event from the watch stream: test error") has prevented the request from succeeding`, + Details: &metav1.StatusDetails{ + Causes: []metav1.StatusCause{ + { + Type: "UnexpectedServerResponse", + Message: "unable to decode an event from the watch stream: test error", + }, + { + Type: "ClientWatchDecoding", + Message: "unable to decode an event from the watch stream: test error", + }, + }, + }, + }, + }, + }, + }, { Request: &Request{ content: defaultContentConfig(), @@ -999,27 +1041,37 @@ func TestRequestWatch(t *testing.T) { }, } for i, testCase := range testCases { - t.Logf("testcase %v", testCase.Request) - testCase.Request.backoffMgr = &NoBackoff{} - watch, err := testCase.Request.Watch() - hasErr := err != nil - if hasErr != testCase.Err { - t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err) - continue - } - if testCase.ErrFn != nil && !testCase.ErrFn(err) { - t.Errorf("%d: error not valid: %v", i, err) - } - if hasErr && watch != nil { - t.Errorf("%d: watch should be nil when error is returned", i) - continue - } - if testCase.Empty { - _, ok := <-watch.ResultChan() - if ok { - t.Errorf("%d: expected the watch to be empty: %#v", i, watch) + t.Run("", func(t *testing.T) { + testCase.Request.backoffMgr = &NoBackoff{} + watch, err := testCase.Request.Watch() + hasErr := err != nil + if hasErr != testCase.Err { + t.Fatalf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err) } - } + if testCase.ErrFn != nil && !testCase.ErrFn(err) { + t.Errorf("%d: error not valid: %v", i, err) + } + if hasErr && watch != nil { + t.Fatalf("%d: watch should be nil when error is returned", i) + } + if testCase.Empty { + _, ok := <-watch.ResultChan() + if ok { + t.Errorf("%d: expected the watch to be empty: %#v", i, watch) + } + } + if testCase.Expect != nil { + for i, evt := range testCase.Expect { + out, ok := <-watch.ResultChan() + if !ok { + t.Fatalf("Watch closed early, %d/%d read", i, len(testCase.Expect)) + } + if !reflect.DeepEqual(evt, out) { + t.Fatalf("Event %d does not match: %s", i, diff.ObjectReflectDiff(evt, out)) + } + } + } + }) } }