From b2434de777254f52d1ebcf6ae490b00dcb2b6d78 Mon Sep 17 00:00:00 2001
From: Clayton Coleman
Date: Thu, 18 Dec 2014 15:38:24 -0500
Subject: [PATCH] When connections are broken on Watch, write fewer errors to logs

Watch depends on long running connections, which intervening proxies
may break without the control of the remote server. Specific errors
handled are io.EOF, io.EOF wrapped by *url.Error, http connection reset
errors (caused by race conditions in golang http code), and connection
reset by peer (simply tolerated).
---
Reviewer notes (not part of the commit message):
- isProbableEOF now unwraps a *url.Error only when its inner Err is
  non-nil. Previously a url.Error with a nil Err left a nil error in
  hand and the following err.Error() call would panic.
- TestRequestWatch now passes the case index to the final t.Errorf; its
  format string has two verbs ("%d" and "%#v") but only the watch was
  supplied.
- Both amendments edit existing added lines, so hunk sizes and the
  diffstat are unchanged. The "index" blob hashes and the From commit
  hash predate the amendments.
- Line breaks and indentation were restored from the hunk line counts
  and gofmt conventions; run `git apply --check` before applying.

 pkg/client/cache/reflector.go |  2 +-
 pkg/client/request.go         | 23 ++++++++++++++++++
 pkg/client/request_test.go    | 46 +++++++++++++++++++++++++++++++++++
 pkg/watch/watch.go            | 20 +++++++++++++++
 pkg/watch/watch_test.go       | 13 ++++++++++
 5 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go
index 772fe0b8e83..56ecdecc50e 100644
--- a/pkg/client/cache/reflector.go
+++ b/pkg/client/cache/reflector.go
@@ -172,7 +172,7 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string) err
 
 	watchDuration := time.Now().Sub(start)
 	if watchDuration < 1*time.Second && eventCount == 0 {
-		glog.Errorf("unexpected watch close - watch lasted less than a second and no items received")
+		glog.V(4).Infof("Unexpected watch close - watch lasted less than a second and no items received")
 		return errors.New("very short watch")
 	}
 	glog.V(4).Infof("Watch close - %v total %v items received", r.expectedType, eventCount)
diff --git a/pkg/client/request.go b/pkg/client/request.go
index de16849f75e..fbc5dffe80c 100644
--- a/pkg/client/request.go
+++ b/pkg/client/request.go
@@ -25,6 +25,7 @@ import (
 	"net/url"
 	"path"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -298,6 +299,9 @@ func (r *Request) Watch() (watch.Interface, error) {
 	}
 	resp, err := client.Do(req)
 	if err != nil {
+		if isProbableEOF(err) {
+			return watch.NewEmptyWatch(), nil
+		}
 		return nil, err
 	}
 	if resp.StatusCode != http.StatusOK {
@@ -310,6 +314,25 @@ func (r *Request) Watch() (watch.Interface, error) {
 	return watch.NewStreamWatcher(watchjson.NewDecoder(resp.Body, r.codec)), nil
 }
 
+// isProbableEOF returns true if the given error resembles a connection termination
+// scenario that would justify assuming that the watch is empty. The watch stream
+// mechanism handles many common partial data errors, so closed connections can be
+// retried in many cases.
+func isProbableEOF(err error) bool {
+	if uerr, ok := err.(*url.Error); ok && uerr.Err != nil {
+		err = uerr.Err
+	}
+	switch {
+	case err == io.EOF:
+		return true
+	case err.Error() == "http: can't write HTTP request on broken connection":
+		return true
+	case strings.Contains(err.Error(), "connection reset by peer"):
+		return true
+	}
+	return false
+}
+
 // Stream formats and executes the request, and offers streaming of the response.
 // Returns io.ReadCloser which could be used for streaming of the response, or an error
 func (r *Request) Stream() (io.ReadCloser, error) {
diff --git a/pkg/client/request_test.go b/pkg/client/request_test.go
index 210552319d4..bf89182411c 100644
--- a/pkg/client/request_test.go
+++ b/pkg/client/request_test.go
@@ -20,6 +20,7 @@ import (
 	"bytes"
 	"encoding/base64"
 	"errors"
+	"io"
 	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
@@ -164,6 +165,7 @@ func TestRequestWatch(t *testing.T) {
 	testCases := []struct {
 		Request *Request
 		Err     bool
+		Empty   bool
 	}{
 		{
 			Request: &Request{err: errors.New("bail")},
@@ -191,15 +193,59 @@ func TestRequestWatch(t *testing.T) {
 			},
 			Err: true,
 		},
+		{
+			Request: &Request{
+				client: clientFunc(func(req *http.Request) (*http.Response, error) {
+					return nil, io.EOF
+				}),
+				baseURL: &url.URL{},
+			},
+			Empty: true,
+		},
+		{
+			Request: &Request{
+				client: clientFunc(func(req *http.Request) (*http.Response, error) {
+					return nil, &url.Error{Err: io.EOF}
+				}),
+				baseURL: &url.URL{},
+			},
+			Empty: true,
+		},
+		{
+			Request: &Request{
+				client: clientFunc(func(req *http.Request) (*http.Response, error) {
+					return nil, errors.New("http: can't write HTTP request on broken connection")
+				}),
+				baseURL: &url.URL{},
+			},
+			Empty: true,
+		},
+		{
+			Request: &Request{
+				client: clientFunc(func(req *http.Request) (*http.Response, error) {
+					return nil, errors.New("foo: connection reset by peer")
+				}),
+				baseURL: &url.URL{},
+			},
+			Empty: true,
+		},
 	}
 	for i, testCase := range testCases {
 		watch, err := testCase.Request.Watch()
 		hasErr := err != nil
 		if hasErr != testCase.Err {
 			t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
+			continue
 		}
 		if hasErr && watch != nil {
 			t.Errorf("%d: watch should be nil when error is returned", i)
+			continue
+		}
+		if testCase.Empty {
+			_, ok := <-watch.ResultChan()
+			if ok {
+				t.Errorf("%d: expected the watch to be empty: %#v", i, watch)
+			}
 		}
 	}
 }
diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go
index e70ae15e93f..df9828ccd4f 100644
--- a/pkg/watch/watch.go
+++ b/pkg/watch/watch.go
@@ -56,6 +56,26 @@ type Event struct {
 	Object runtime.Object
 }
 
+type emptyWatch chan Event
+
+// NewEmptyWatch returns a watch interface that returns no results and is closed.
+// May be used in certain error conditions where no information is available but
+// an error is not warranted.
+func NewEmptyWatch() Interface {
+	ch := make(chan Event)
+	close(ch)
+	return emptyWatch(ch)
+}
+
+// Stop implements Interface
+func (w emptyWatch) Stop() {
+}
+
+// ResultChan implements Interface
+func (w emptyWatch) ResultChan() <-chan Event {
+	return chan Event(w)
+}
+
 // FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
 type FakeWatcher struct {
 	result chan Event
diff --git a/pkg/watch/watch_test.go b/pkg/watch/watch_test.go
index 2ab8b4d2c5e..cd9979fbeed 100644
--- a/pkg/watch/watch_test.go
+++ b/pkg/watch/watch_test.go
@@ -70,3 +70,16 @@ func TestFake(t *testing.T) {
 	go sender()
 	consumer(f)
 }
+
+func TestEmpty(t *testing.T) {
+	w := NewEmptyWatch()
+	_, ok := <-w.ResultChan()
+	if ok {
+		t.Errorf("unexpected result channel result")
+	}
+	w.Stop()
+	_, ok = <-w.ResultChan()
+	if ok {
+		t.Errorf("unexpected result channel result")
+	}
+}