diff --git a/pkg/tools/etcd_helper_watch.go b/pkg/tools/etcd_helper_watch.go
index 1a3fd1e74b3..356fbf9f57a 100644
--- a/pkg/tools/etcd_helper_watch.go
+++ b/pkg/tools/etcd_helper_watch.go
@@ -156,13 +156,22 @@ const watchWaitDuration = 100 * time.Millisecond
 // and a versioner, the versioner must be able to handle the objects that transform creates.
 func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
 	w := &etcdWatcher{
-		encoding:     encoding,
-		versioner:    versioner,
-		transform:    transform,
-		list:         list,
-		include:      include,
-		filter:       filter,
-		etcdIncoming: make(chan *etcd.Response),
+		encoding:  encoding,
+		versioner: versioner,
+		transform: transform,
+		list:      list,
+		include:   include,
+		filter:    filter,
+		// Buffer this channel, so that the etcd client is not forced
+		// to context switch with every object it gets, and so that a
+		// long time spent decoding an object won't block the *next*
+		// object. Basically, we see a lot of "401 window exceeded"
+		// errors from etcd, and that's due to the client not streaming
+		// results but rather getting them one at a time. So we really
+		// want to never block the etcd client, if possible. The 50 is
+		// arbitrary; there's a V(4) log message that prints the length
+		// so we can monitor how much of this buffer is actually used.
+		etcdIncoming: make(chan *etcd.Response, 50),
 		etcdError:    make(chan error, 1),
 		etcdStop:     make(chan bool),
 		outgoing:     make(chan watch.Event),
@@ -250,6 +259,10 @@ func (w *etcdWatcher) translate() {
 			return
 		case res, ok := <-w.etcdIncoming:
 			if ok {
+				if curLen := len(w.etcdIncoming); curLen > 0 {
+					// Monitor if this gets backed up, and how much.
+					glog.V(4).Infof("watch: %v objects queued in channel.", curLen)
+				}
 				w.sendResult(res)
 			}
 			// If !ok, don't return here-- must wait for etcdError channel