diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 772fe0b8e83..56ecdecc50e 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -172,7 +172,7 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string) err watchDuration := time.Now().Sub(start) if watchDuration < 1*time.Second && eventCount == 0 { - glog.Errorf("unexpected watch close - watch lasted less than a second and no items received") + glog.V(4).Infof("Unexpected watch close - watch lasted less than a second and no items received") return errors.New("very short watch") } glog.V(4).Infof("Watch close - %v total %v items received", r.expectedType, eventCount) diff --git a/pkg/client/request.go b/pkg/client/request.go index de16849f75e..fbc5dffe80c 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -25,6 +25,7 @@ import ( "net/url" "path" "strconv" + "strings" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -298,6 +299,9 @@ func (r *Request) Watch() (watch.Interface, error) { } resp, err := client.Do(req) if err != nil { + if isProbableEOF(err) { + return watch.NewEmptyWatch(), nil + } return nil, err } if resp.StatusCode != http.StatusOK { @@ -310,6 +314,25 @@ func (r *Request) Watch() (watch.Interface, error) { return watch.NewStreamWatcher(watchjson.NewDecoder(resp.Body, r.codec)), nil } +// isProbableEOF returns true if the given error resembles a connection termination +// scenario that would justify assuming that the watch is empty. The watch stream +// mechanism handles many common partial data errors, so closed connections can be +// retried in many cases. +func isProbableEOF(err error) bool { + if uerr, ok := err.(*url.Error); ok { + err = uerr.Err + } + switch { + case err == io.EOF: + return true + case err.Error() == "http: can't write HTTP request on broken connection": + return true + case strings.Contains(err.Error(), "connection reset by peer"): + return true + } + return false +} + // Stream formats and executes the request, and offers streaming of the response. // Returns io.ReadCloser which could be used for streaming of the response, or an error func (r *Request) Stream() (io.ReadCloser, error) { diff --git a/pkg/client/request_test.go b/pkg/client/request_test.go index 210552319d4..bf89182411c 100644 --- a/pkg/client/request_test.go +++ b/pkg/client/request_test.go @@ -20,6 +20,7 @@ import ( "bytes" "encoding/base64" "errors" + "io" "io/ioutil" "net/http" "net/http/httptest" @@ -164,6 +165,7 @@ func TestRequestWatch(t *testing.T) { testCases := []struct { Request *Request Err bool + Empty bool }{ { Request: &Request{err: errors.New("bail")}, @@ -191,15 +193,59 @@ func TestRequestWatch(t *testing.T) { }, Err: true, }, + { + Request: &Request{ + client: clientFunc(func(req *http.Request) (*http.Response, error) { + return nil, io.EOF + }), + baseURL: &url.URL{}, + }, + Empty: true, + }, + { + Request: &Request{ + client: clientFunc(func(req *http.Request) (*http.Response, error) { + return nil, &url.Error{Err: io.EOF} + }), + baseURL: &url.URL{}, + }, + Empty: true, + }, + { + Request: &Request{ + client: clientFunc(func(req *http.Request) (*http.Response, error) { + return nil, errors.New("http: can't write HTTP request on broken connection") + }), + baseURL: &url.URL{}, + }, + Empty: true, + }, + { + Request: &Request{ + client: clientFunc(func(req *http.Request) (*http.Response, error) { + return nil, errors.New("foo: connection reset by peer") + }), + baseURL: &url.URL{}, + }, + Empty: true, + }, } for i, testCase := range testCases { watch, err := testCase.Request.Watch() hasErr := err != nil if hasErr != testCase.Err { t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err) + continue } if hasErr && watch != nil { t.Errorf("%d: watch should be nil when error is returned", i) + continue + } + if testCase.Empty { + _, ok := <-watch.ResultChan() + if ok { + t.Errorf("%d: expected the watch to be empty: %#v", watch) + } } } } diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go index e70ae15e93f..df9828ccd4f 100644 --- a/pkg/watch/watch.go +++ b/pkg/watch/watch.go @@ -56,6 +56,26 @@ type Event struct { Object runtime.Object } +type emptyWatch chan Event + +// NewEmptyWatch returns a watch interface that returns no results and is closed. +// May be used in certain error conditions where no information is available but +// an error is not warranted. +func NewEmptyWatch() Interface { + ch := make(chan Event) + close(ch) + return emptyWatch(ch) +} + +// Stop implements Interface +func (w emptyWatch) Stop() { +} + +// ResultChan implements Interface +func (w emptyWatch) ResultChan() <-chan Event { + return chan Event(w) +} + // FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe. type FakeWatcher struct { result chan Event diff --git a/pkg/watch/watch_test.go b/pkg/watch/watch_test.go index 2ab8b4d2c5e..cd9979fbeed 100644 --- a/pkg/watch/watch_test.go +++ b/pkg/watch/watch_test.go @@ -70,3 +70,16 @@ func TestFake(t *testing.T) { go sender() consumer(f) } + +func TestEmpty(t *testing.T) { + w := NewEmptyWatch() + _, ok := <-w.ResultChan() + if ok { + t.Errorf("unexpected result channel result") + } + w.Stop() + _, ok = <-w.ResultChan() + if ok { + t.Errorf("unexpected result channel result") + } +}