mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-06 03:36:26 +00:00
Report a watch error instead of eating it when we can't decode
Clients are required to handle watch events of type ERROR, so instead of eating the decoding error we should pass it on to the client. Use NewGenericServerError with isUnexpectedResponse to indicate that we didn't get the bytes from the server we were expecting. For watch, the 415 error code is roughly correct and we will return an error to the client that makes debugging a failure in either server watch or client machinery much easier. We do not alter the behavior when it appears the response is an EOF or other disconnection. Kubernetes-commit: 89620d5667adec6c132b2713b79efb1dd2391723
This commit is contained in:
parent
d41af2f7d3
commit
cd12199def
@ -595,7 +595,12 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser)
|
||||
return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
|
||||
}
|
||||
wrapperDecoder := wrapperDecoderFn(resp.Body)
|
||||
return watch.NewStreamWatcher(restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder)), nil
|
||||
return watch.NewStreamWatcher(
|
||||
restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder),
|
||||
// use 500 to indicate that the cause of the error is unknown - other error codes
|
||||
// are more specific to HTTP interactions, and set a reason
|
||||
errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
|
||||
), nil
|
||||
}
|
||||
|
||||
// updateURLMetrics is a convenience function for pushing metrics.
|
||||
|
@ -37,7 +37,7 @@ import (
|
||||
|
||||
"k8s.io/klog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -879,9 +879,17 @@ func TestTransformUnstructuredError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
type errorReader struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (r errorReader) Read(data []byte) (int, error) { return 0, r.err }
|
||||
func (r errorReader) Close() error { return nil }
|
||||
|
||||
func TestRequestWatch(t *testing.T) {
|
||||
testCases := []struct {
|
||||
Request *Request
|
||||
Expect []watch.Event
|
||||
Err bool
|
||||
ErrFn func(error) bool
|
||||
Empty bool
|
||||
@ -903,6 +911,40 @@ func TestRequestWatch(t *testing.T) {
|
||||
},
|
||||
Err: true,
|
||||
},
|
||||
{
|
||||
Request: &Request{
|
||||
content: defaultContentConfig(),
|
||||
serializers: defaultSerializers(t),
|
||||
client: clientFunc(func(req *http.Request) (*http.Response, error) {
|
||||
resp := &http.Response{StatusCode: http.StatusOK, Body: errorReader{err: errors.New("test error")}}
|
||||
return resp, nil
|
||||
}),
|
||||
baseURL: &url.URL{},
|
||||
},
|
||||
Expect: []watch.Event{
|
||||
{
|
||||
Type: watch.Error,
|
||||
Object: &metav1.Status{
|
||||
Status: "Failure",
|
||||
Code: 500,
|
||||
Reason: "InternalError",
|
||||
Message: `an error on the server ("unable to decode an event from the watch stream: test error") has prevented the request from succeeding`,
|
||||
Details: &metav1.StatusDetails{
|
||||
Causes: []metav1.StatusCause{
|
||||
{
|
||||
Type: "UnexpectedServerResponse",
|
||||
Message: "unable to decode an event from the watch stream: test error",
|
||||
},
|
||||
{
|
||||
Type: "ClientWatchDecoding",
|
||||
Message: "unable to decode an event from the watch stream: test error",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Request: &Request{
|
||||
content: defaultContentConfig(),
|
||||
@ -999,27 +1041,37 @@ func TestRequestWatch(t *testing.T) {
|
||||
},
|
||||
}
|
||||
for i, testCase := range testCases {
|
||||
t.Logf("testcase %v", testCase.Request)
|
||||
testCase.Request.backoffMgr = &NoBackoff{}
|
||||
watch, err := testCase.Request.Watch()
|
||||
hasErr := err != nil
|
||||
if hasErr != testCase.Err {
|
||||
t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
|
||||
continue
|
||||
}
|
||||
if testCase.ErrFn != nil && !testCase.ErrFn(err) {
|
||||
t.Errorf("%d: error not valid: %v", i, err)
|
||||
}
|
||||
if hasErr && watch != nil {
|
||||
t.Errorf("%d: watch should be nil when error is returned", i)
|
||||
continue
|
||||
}
|
||||
if testCase.Empty {
|
||||
_, ok := <-watch.ResultChan()
|
||||
if ok {
|
||||
t.Errorf("%d: expected the watch to be empty: %#v", i, watch)
|
||||
t.Run("", func(t *testing.T) {
|
||||
testCase.Request.backoffMgr = &NoBackoff{}
|
||||
watch, err := testCase.Request.Watch()
|
||||
hasErr := err != nil
|
||||
if hasErr != testCase.Err {
|
||||
t.Fatalf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
|
||||
}
|
||||
}
|
||||
if testCase.ErrFn != nil && !testCase.ErrFn(err) {
|
||||
t.Errorf("%d: error not valid: %v", i, err)
|
||||
}
|
||||
if hasErr && watch != nil {
|
||||
t.Fatalf("%d: watch should be nil when error is returned", i)
|
||||
}
|
||||
if testCase.Empty {
|
||||
_, ok := <-watch.ResultChan()
|
||||
if ok {
|
||||
t.Errorf("%d: expected the watch to be empty: %#v", i, watch)
|
||||
}
|
||||
}
|
||||
if testCase.Expect != nil {
|
||||
for i, evt := range testCase.Expect {
|
||||
out, ok := <-watch.ResultChan()
|
||||
if !ok {
|
||||
t.Fatalf("Watch closed early, %d/%d read", i, len(testCase.Expect))
|
||||
}
|
||||
if !reflect.DeepEqual(evt, out) {
|
||||
t.Fatalf("Event %d does not match: %s", i, diff.ObjectReflectDiff(evt, out))
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user