diff --git a/pkg/tools/etcd_helper_watch.go b/pkg/tools/etcd_helper_watch.go index 356fbf9f57a..fb66e2af25b 100644 --- a/pkg/tools/etcd_helper_watch.go +++ b/pkg/tools/etcd_helper_watch.go @@ -168,10 +168,11 @@ func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding // object. Basically, we see a lot of "401 window exceeded" // errors from etcd, and that's due to the client not streaming // results but rather getting them one at a time. So we really - // want to never block the etcd client, if possible. The 50 is - // arbitrary; there's a V(4) log message that prints the length - // so we can monitor how much of this buffer is actually used. - etcdIncoming: make(chan *etcd.Response, 50), + // want to never block the etcd client, if possible. The 100 is + // mostly arbitrary--we know it goes as high as 50, though. + // There's a V(2) log message that prints the length so we can + // monitor how much of this buffer is actually used. + etcdIncoming: make(chan *etcd.Response, 100), etcdError: make(chan error, 1), etcdStop: make(chan bool), outgoing: make(chan watch.Event), @@ -235,6 +236,10 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming incoming <- &copied } +var ( + watchChannelHWM util.HighWaterMark +) + // translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be // called as a goroutine. func (w *etcdWatcher) translate() { @@ -259,9 +264,9 @@ func (w *etcdWatcher) translate() { return case res, ok := <-w.etcdIncoming: if ok { - if curLen := len(w.etcdIncoming); curLen > 0 { + if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Check(curLen) { // Monitor if this gets backed up, and how much. - glog.V(4).Infof("watch: %v objects queued in channel.", curLen) + glog.V(2).Infof("watch: %v objects queued in channel.", curLen) } w.sendResult(res) } diff --git a/pkg/util/atomic_value.go b/pkg/util/atomic_value.go index 3bb1a317601..718b712787e 100644 --- a/pkg/util/atomic_value.go +++ b/pkg/util/atomic_value.go @@ -18,6 +18,7 @@ package util import ( "sync" + "sync/atomic" ) // TODO(ArtfulCoder) @@ -40,3 +41,20 @@ func (at *AtomicValue) Load() interface{} { defer at.valueMutex.RUnlock() return at.value } + +// HighWaterMark is a thread-safe object for tracking the maximum value seen +// for some quantity. +type HighWaterMark int64 + +// Check returns true iff 'current' is the highest value ever seen. +func (hwm *HighWaterMark) Check(current int64) bool { + for { + old := atomic.LoadInt64((*int64)(hwm)) + if current <= old { + return false + } + if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) { + return true + } + } +} diff --git a/pkg/util/atomic_value_test.go b/pkg/util/atomic_value_test.go index 29b8bd076b6..0da8b848025 100644 --- a/pkg/util/atomic_value_test.go +++ b/pkg/util/atomic_value_test.go @@ -17,6 +17,8 @@ limitations under the License. package util import ( + "math/rand" + "sync" "testing" "time" ) @@ -48,3 +50,34 @@ func TestAtomicValue(t *testing.T) { atomicValue.Store(10) ExpectValue(t, atomicValue, 10) } + +func TestHighWaterMark(t *testing.T) { + var h HighWaterMark + + for i := int64(10); i < 20; i++ { + if !h.Check(i) { + t.Errorf("unexpected false for %v", i) + } + if h.Check(i - 1) { + t.Errorf("unexpected true for %v", i-1) + } + } + + m := int64(0) + wg := sync.WaitGroup{} + for i := 0; i < 300; i++ { + wg.Add(1) + v := rand.Int63() + go func(v int64) { + defer wg.Done() + h.Check(v) + }(v) + if v > m { + m = v + } + } + wg.Wait() + if m != int64(h) { + t.Errorf("unexpected value, wanted %v, got %v", m, int64(h)) + } +}