Merge pull request #10427 from lavalamp/etcdChanFix

Don't block etcd client
This commit is contained in:
Alex Robinson 2015-06-29 13:28:48 -07:00
commit 83f0bd668c

View File

@ -156,13 +156,22 @@ const watchWaitDuration = 100 * time.Millisecond
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
include: include,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
include: include,
filter: filter,
// Buffer this channel, so that the etcd client is not forced
// to context switch with every object it gets, and so that a
// long time spent decoding an object won't block the *next*
// object. Basically, we see a lot of "401 window exceeded"
// errors from etcd, and that's due to the client not streaming
// results but rather getting them one at a time. So we really
// want to never block the etcd client, if possible. The 50 is
// arbitrary; there's a V(4) log message that prints the length
// so we can monitor how much of this buffer is actually used.
etcdIncoming: make(chan *etcd.Response, 50),
etcdError: make(chan error, 1),
etcdStop: make(chan bool),
outgoing: make(chan watch.Event),
@ -250,6 +259,10 @@ func (w *etcdWatcher) translate() {
return
case res, ok := <-w.etcdIncoming:
if ok {
if curLen := len(w.etcdIncoming); curLen > 0 {
// Monitor if this gets backed up, and how much.
glog.V(4).Infof("watch: %v objects queued in channel.", curLen)
}
w.sendResult(res)
}
// If !ok, don't return here-- must wait for etcdError channel