mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-23 10:32:03 +00:00
Merge pull request #73937 from smarterclayton/report_errors
Report a watch error instead of eating it when we can't decode
This commit is contained in:
commit
c28b3b1fdd
@ -617,3 +617,46 @@ func ReasonForError(err error) metav1.StatusReason {
|
|||||||
}
|
}
|
||||||
return metav1.StatusReasonUnknown
|
return metav1.StatusReasonUnknown
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ErrorReporter converts generic errors into runtime.Object errors without
|
||||||
|
// requiring the caller to take a dependency on meta/v1 (where Status lives).
|
||||||
|
// This prevents circular dependencies in core watch code.
|
||||||
|
type ErrorReporter struct {
|
||||||
|
code int
|
||||||
|
verb string
|
||||||
|
reason string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClientErrorReporter will respond with valid v1.Status objects that report
|
||||||
|
// unexpected server responses. Primarily used by watch to report errors when
|
||||||
|
// we attempt to decode a response from the server and it is not in the form
|
||||||
|
// we expect. Because watch is a dependency of the core api, we can't return
|
||||||
|
// meta/v1.Status in that package and so much inject this interface to convert a
|
||||||
|
// generic error as appropriate. The reason is passed as a unique status cause
|
||||||
|
// on the returned status, otherwise the generic "ClientError" is returned.
|
||||||
|
func NewClientErrorReporter(code int, verb string, reason string) *ErrorReporter {
|
||||||
|
return &ErrorReporter{
|
||||||
|
code: code,
|
||||||
|
verb: verb,
|
||||||
|
reason: reason,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// AsObject returns a valid error runtime.Object (a v1.Status) for the given
|
||||||
|
// error, using the code and verb of the reporter type. The error is set to
|
||||||
|
// indicate that this was an unexpected server response.
|
||||||
|
func (r *ErrorReporter) AsObject(err error) runtime.Object {
|
||||||
|
status := NewGenericServerResponse(r.code, r.verb, schema.GroupResource{}, "", err.Error(), 0, true)
|
||||||
|
if status.ErrStatus.Details == nil {
|
||||||
|
status.ErrStatus.Details = &metav1.StatusDetails{}
|
||||||
|
}
|
||||||
|
reason := r.reason
|
||||||
|
if len(reason) == 0 {
|
||||||
|
reason = "ClientError"
|
||||||
|
}
|
||||||
|
status.ErrStatus.Details.Causes = append(status.ErrStatus.Details.Causes, metav1.StatusCause{
|
||||||
|
Type: metav1.CauseType(reason),
|
||||||
|
Message: err.Error(),
|
||||||
|
})
|
||||||
|
return &status.ErrStatus
|
||||||
|
}
|
||||||
|
@ -17,13 +17,15 @@ limitations under the License.
|
|||||||
package watch
|
package watch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"k8s.io/klog"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/net"
|
"k8s.io/apimachinery/pkg/util/net"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/klog"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
|
// Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
|
||||||
@ -39,19 +41,28 @@ type Decoder interface {
|
|||||||
Close()
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reporter hides the details of how an error is turned into a runtime.Object for
|
||||||
|
// reporting on a watch stream since this package may not import a higher level report.
|
||||||
|
type Reporter interface {
|
||||||
|
// AsObject must convert err into a valid runtime.Object for the watch stream.
|
||||||
|
AsObject(err error) runtime.Object
|
||||||
|
}
|
||||||
|
|
||||||
// StreamWatcher turns any stream for which you can write a Decoder interface
|
// StreamWatcher turns any stream for which you can write a Decoder interface
|
||||||
// into a watch.Interface.
|
// into a watch.Interface.
|
||||||
type StreamWatcher struct {
|
type StreamWatcher struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
source Decoder
|
source Decoder
|
||||||
|
reporter Reporter
|
||||||
result chan Event
|
result chan Event
|
||||||
stopped bool
|
stopped bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStreamWatcher creates a StreamWatcher from the given decoder.
|
// NewStreamWatcher creates a StreamWatcher from the given decoder.
|
||||||
func NewStreamWatcher(d Decoder) *StreamWatcher {
|
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
|
||||||
sw := &StreamWatcher{
|
sw := &StreamWatcher{
|
||||||
source: d,
|
source: d,
|
||||||
|
reporter: r,
|
||||||
// It's easy for a consumer to add buffering via an extra
|
// It's easy for a consumer to add buffering via an extra
|
||||||
// goroutine/channel, but impossible for them to remove it,
|
// goroutine/channel, but impossible for them to remove it,
|
||||||
// so nonbuffered is better.
|
// so nonbuffered is better.
|
||||||
@ -102,11 +113,13 @@ func (sw *StreamWatcher) receive() {
|
|||||||
case io.ErrUnexpectedEOF:
|
case io.ErrUnexpectedEOF:
|
||||||
klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
|
klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
|
||||||
default:
|
default:
|
||||||
msg := "Unable to decode an event from the watch stream: %v"
|
|
||||||
if net.IsProbableEOF(err) {
|
if net.IsProbableEOF(err) {
|
||||||
klog.V(5).Infof(msg, err)
|
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
|
||||||
} else {
|
} else {
|
||||||
klog.Errorf(msg, err)
|
sw.result <- Event{
|
||||||
|
Type: Error,
|
||||||
|
Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package watch_test
|
package watch_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
@ -27,9 +28,13 @@ import (
|
|||||||
|
|
||||||
type fakeDecoder struct {
|
type fakeDecoder struct {
|
||||||
items chan Event
|
items chan Event
|
||||||
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f fakeDecoder) Decode() (action EventType, object runtime.Object, err error) {
|
func (f fakeDecoder) Decode() (action EventType, object runtime.Object, err error) {
|
||||||
|
if f.err != nil {
|
||||||
|
return "", nil, f.err
|
||||||
|
}
|
||||||
item, open := <-f.items
|
item, open := <-f.items
|
||||||
if !open {
|
if !open {
|
||||||
return action, nil, io.EOF
|
return action, nil, io.EOF
|
||||||
@ -38,16 +43,27 @@ func (f fakeDecoder) Decode() (action EventType, object runtime.Object, err erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f fakeDecoder) Close() {
|
func (f fakeDecoder) Close() {
|
||||||
|
if f.items != nil {
|
||||||
close(f.items)
|
close(f.items)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeReporter struct {
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeReporter) AsObject(err error) runtime.Object {
|
||||||
|
f.err = err
|
||||||
|
return runtime.Unstructured(nil)
|
||||||
|
}
|
||||||
|
|
||||||
func TestStreamWatcher(t *testing.T) {
|
func TestStreamWatcher(t *testing.T) {
|
||||||
table := []Event{
|
table := []Event{
|
||||||
{Type: Added, Object: testType("foo")},
|
{Type: Added, Object: testType("foo")},
|
||||||
}
|
}
|
||||||
|
|
||||||
fd := fakeDecoder{make(chan Event, 5)}
|
fd := fakeDecoder{items: make(chan Event, 5)}
|
||||||
sw := NewStreamWatcher(fd)
|
sw := NewStreamWatcher(fd, nil)
|
||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
fd.items <- item
|
fd.items <- item
|
||||||
@ -66,3 +82,26 @@ func TestStreamWatcher(t *testing.T) {
|
|||||||
t.Errorf("Unexpected failure to close")
|
t.Errorf("Unexpected failure to close")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStreamWatcherError(t *testing.T) {
|
||||||
|
fd := fakeDecoder{err: fmt.Errorf("test error")}
|
||||||
|
fr := &fakeReporter{}
|
||||||
|
sw := NewStreamWatcher(fd, fr)
|
||||||
|
evt, ok := <-sw.ResultChan()
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("unexpected close")
|
||||||
|
}
|
||||||
|
if evt.Type != Error || evt.Object != runtime.Unstructured(nil) {
|
||||||
|
t.Fatalf("unexpected object: %#v", evt)
|
||||||
|
}
|
||||||
|
_, ok = <-sw.ResultChan()
|
||||||
|
if ok {
|
||||||
|
t.Fatalf("unexpected open channel")
|
||||||
|
}
|
||||||
|
|
||||||
|
sw.Stop()
|
||||||
|
_, ok = <-sw.ResultChan()
|
||||||
|
if ok {
|
||||||
|
t.Fatalf("unexpected open channel")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -595,7 +595,12 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser)
|
|||||||
return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
|
return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
|
||||||
}
|
}
|
||||||
wrapperDecoder := wrapperDecoderFn(resp.Body)
|
wrapperDecoder := wrapperDecoderFn(resp.Body)
|
||||||
return watch.NewStreamWatcher(restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder)), nil
|
return watch.NewStreamWatcher(
|
||||||
|
restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder),
|
||||||
|
// use 500 to indicate that the cause of the error is unknown - other error codes
|
||||||
|
// are more specific to HTTP interactions, and set a reason
|
||||||
|
errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
|
||||||
|
), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateURLMetrics is a convenience function for pushing metrics.
|
// updateURLMetrics is a convenience function for pushing metrics.
|
||||||
|
@ -37,7 +37,7 @@ import (
|
|||||||
|
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
@ -879,9 +879,17 @@ func TestTransformUnstructuredError(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type errorReader struct {
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r errorReader) Read(data []byte) (int, error) { return 0, r.err }
|
||||||
|
func (r errorReader) Close() error { return nil }
|
||||||
|
|
||||||
func TestRequestWatch(t *testing.T) {
|
func TestRequestWatch(t *testing.T) {
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
Request *Request
|
Request *Request
|
||||||
|
Expect []watch.Event
|
||||||
Err bool
|
Err bool
|
||||||
ErrFn func(error) bool
|
ErrFn func(error) bool
|
||||||
Empty bool
|
Empty bool
|
||||||
@ -903,6 +911,40 @@ func TestRequestWatch(t *testing.T) {
|
|||||||
},
|
},
|
||||||
Err: true,
|
Err: true,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Request: &Request{
|
||||||
|
content: defaultContentConfig(),
|
||||||
|
serializers: defaultSerializers(t),
|
||||||
|
client: clientFunc(func(req *http.Request) (*http.Response, error) {
|
||||||
|
resp := &http.Response{StatusCode: http.StatusOK, Body: errorReader{err: errors.New("test error")}}
|
||||||
|
return resp, nil
|
||||||
|
}),
|
||||||
|
baseURL: &url.URL{},
|
||||||
|
},
|
||||||
|
Expect: []watch.Event{
|
||||||
|
{
|
||||||
|
Type: watch.Error,
|
||||||
|
Object: &metav1.Status{
|
||||||
|
Status: "Failure",
|
||||||
|
Code: 500,
|
||||||
|
Reason: "InternalError",
|
||||||
|
Message: `an error on the server ("unable to decode an event from the watch stream: test error") has prevented the request from succeeding`,
|
||||||
|
Details: &metav1.StatusDetails{
|
||||||
|
Causes: []metav1.StatusCause{
|
||||||
|
{
|
||||||
|
Type: "UnexpectedServerResponse",
|
||||||
|
Message: "unable to decode an event from the watch stream: test error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: "ClientWatchDecoding",
|
||||||
|
Message: "unable to decode an event from the watch stream: test error",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Request: &Request{
|
Request: &Request{
|
||||||
content: defaultContentConfig(),
|
content: defaultContentConfig(),
|
||||||
@ -999,20 +1041,18 @@ func TestRequestWatch(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
for i, testCase := range testCases {
|
for i, testCase := range testCases {
|
||||||
t.Logf("testcase %v", testCase.Request)
|
t.Run("", func(t *testing.T) {
|
||||||
testCase.Request.backoffMgr = &NoBackoff{}
|
testCase.Request.backoffMgr = &NoBackoff{}
|
||||||
watch, err := testCase.Request.Watch()
|
watch, err := testCase.Request.Watch()
|
||||||
hasErr := err != nil
|
hasErr := err != nil
|
||||||
if hasErr != testCase.Err {
|
if hasErr != testCase.Err {
|
||||||
t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
|
t.Fatalf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
if testCase.ErrFn != nil && !testCase.ErrFn(err) {
|
if testCase.ErrFn != nil && !testCase.ErrFn(err) {
|
||||||
t.Errorf("%d: error not valid: %v", i, err)
|
t.Errorf("%d: error not valid: %v", i, err)
|
||||||
}
|
}
|
||||||
if hasErr && watch != nil {
|
if hasErr && watch != nil {
|
||||||
t.Errorf("%d: watch should be nil when error is returned", i)
|
t.Fatalf("%d: watch should be nil when error is returned", i)
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
if testCase.Empty {
|
if testCase.Empty {
|
||||||
_, ok := <-watch.ResultChan()
|
_, ok := <-watch.ResultChan()
|
||||||
@ -1020,6 +1060,18 @@ func TestRequestWatch(t *testing.T) {
|
|||||||
t.Errorf("%d: expected the watch to be empty: %#v", i, watch)
|
t.Errorf("%d: expected the watch to be empty: %#v", i, watch)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if testCase.Expect != nil {
|
||||||
|
for i, evt := range testCase.Expect {
|
||||||
|
out, ok := <-watch.ResultChan()
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Watch closed early, %d/%d read", i, len(testCase.Expect))
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(evt, out) {
|
||||||
|
t.Fatalf("Event %d does not match: %s", i, diff.ObjectReflectDiff(evt, out))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user