mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
Merge pull request #3037 from smarterclayton/hide_spurious_watch_errors
When connections are broken on Watch, write fewer errors to logs
This commit is contained in:
commit
a50f8034c8
2
pkg/client/cache/reflector.go
vendored
2
pkg/client/cache/reflector.go
vendored
@ -172,7 +172,7 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string) err
|
||||
|
||||
watchDuration := time.Now().Sub(start)
|
||||
if watchDuration < 1*time.Second && eventCount == 0 {
|
||||
glog.Errorf("unexpected watch close - watch lasted less than a second and no items received")
|
||||
glog.V(4).Infof("Unexpected watch close - watch lasted less than a second and no items received")
|
||||
return errors.New("very short watch")
|
||||
}
|
||||
glog.V(4).Infof("Watch close - %v total %v items received", r.expectedType, eventCount)
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"net/url"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
@ -298,6 +299,9 @@ func (r *Request) Watch() (watch.Interface, error) {
|
||||
}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
if isProbableEOF(err) {
|
||||
return watch.NewEmptyWatch(), nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
@ -310,6 +314,25 @@ func (r *Request) Watch() (watch.Interface, error) {
|
||||
return watch.NewStreamWatcher(watchjson.NewDecoder(resp.Body, r.codec)), nil
|
||||
}
|
||||
|
||||
// isProbableEOF returns true if the given error resembles a connection termination
|
||||
// scenario that would justify assuming that the watch is empty. The watch stream
|
||||
// mechanism handles many common partial data errors, so closed connections can be
|
||||
// retried in many cases.
|
||||
func isProbableEOF(err error) bool {
|
||||
if uerr, ok := err.(*url.Error); ok {
|
||||
err = uerr.Err
|
||||
}
|
||||
switch {
|
||||
case err == io.EOF:
|
||||
return true
|
||||
case err.Error() == "http: can't write HTTP request on broken connection":
|
||||
return true
|
||||
case strings.Contains(err.Error(), "connection reset by peer"):
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Stream formats and executes the request, and offers streaming of the response.
|
||||
// Returns io.ReadCloser which could be used for streaming of the response, or an error
|
||||
func (r *Request) Stream() (io.ReadCloser, error) {
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@ -164,6 +165,7 @@ func TestRequestWatch(t *testing.T) {
|
||||
testCases := []struct {
|
||||
Request *Request
|
||||
Err bool
|
||||
Empty bool
|
||||
}{
|
||||
{
|
||||
Request: &Request{err: errors.New("bail")},
|
||||
@ -191,15 +193,59 @@ func TestRequestWatch(t *testing.T) {
|
||||
},
|
||||
Err: true,
|
||||
},
|
||||
{
|
||||
Request: &Request{
|
||||
client: clientFunc(func(req *http.Request) (*http.Response, error) {
|
||||
return nil, io.EOF
|
||||
}),
|
||||
baseURL: &url.URL{},
|
||||
},
|
||||
Empty: true,
|
||||
},
|
||||
{
|
||||
Request: &Request{
|
||||
client: clientFunc(func(req *http.Request) (*http.Response, error) {
|
||||
return nil, &url.Error{Err: io.EOF}
|
||||
}),
|
||||
baseURL: &url.URL{},
|
||||
},
|
||||
Empty: true,
|
||||
},
|
||||
{
|
||||
Request: &Request{
|
||||
client: clientFunc(func(req *http.Request) (*http.Response, error) {
|
||||
return nil, errors.New("http: can't write HTTP request on broken connection")
|
||||
}),
|
||||
baseURL: &url.URL{},
|
||||
},
|
||||
Empty: true,
|
||||
},
|
||||
{
|
||||
Request: &Request{
|
||||
client: clientFunc(func(req *http.Request) (*http.Response, error) {
|
||||
return nil, errors.New("foo: connection reset by peer")
|
||||
}),
|
||||
baseURL: &url.URL{},
|
||||
},
|
||||
Empty: true,
|
||||
},
|
||||
}
|
||||
for i, testCase := range testCases {
|
||||
watch, err := testCase.Request.Watch()
|
||||
hasErr := err != nil
|
||||
if hasErr != testCase.Err {
|
||||
t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
|
||||
continue
|
||||
}
|
||||
if hasErr && watch != nil {
|
||||
t.Errorf("%d: watch should be nil when error is returned", i)
|
||||
continue
|
||||
}
|
||||
if testCase.Empty {
|
||||
_, ok := <-watch.ResultChan()
|
||||
if ok {
|
||||
t.Errorf("%d: expected the watch to be empty: %#v", watch)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -56,6 +56,26 @@ type Event struct {
|
||||
Object runtime.Object
|
||||
}
|
||||
|
||||
type emptyWatch chan Event
|
||||
|
||||
// NewEmptyWatch returns a watch interface that returns no results and is closed.
|
||||
// May be used in certain error conditions where no information is available but
|
||||
// an error is not warranted.
|
||||
func NewEmptyWatch() Interface {
|
||||
ch := make(chan Event)
|
||||
close(ch)
|
||||
return emptyWatch(ch)
|
||||
}
|
||||
|
||||
// Stop implements Interface
|
||||
func (w emptyWatch) Stop() {
|
||||
}
|
||||
|
||||
// ResultChan implements Interface
|
||||
func (w emptyWatch) ResultChan() <-chan Event {
|
||||
return chan Event(w)
|
||||
}
|
||||
|
||||
// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
|
||||
type FakeWatcher struct {
|
||||
result chan Event
|
||||
|
@ -70,3 +70,16 @@ func TestFake(t *testing.T) {
|
||||
go sender()
|
||||
consumer(f)
|
||||
}
|
||||
|
||||
func TestEmpty(t *testing.T) {
|
||||
w := NewEmptyWatch()
|
||||
_, ok := <-w.ResultChan()
|
||||
if ok {
|
||||
t.Errorf("unexpected result channel result")
|
||||
}
|
||||
w.Stop()
|
||||
_, ok = <-w.ResultChan()
|
||||
if ok {
|
||||
t.Errorf("unexpected result channel result")
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user