From 03a23aed0900a7305c661d2993c0228bf79a98ad Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Thu, 8 Sep 2016 16:33:12 +0200 Subject: [PATCH] Log water mark for incoming queue in cacher --- pkg/storage/cacher.go | 7 ++++- pkg/storage/etcd/etcd_watcher.go | 22 ++-------------- pkg/storage/etcd/etcd_watcher_test.go | 33 ------------------------ pkg/storage/util.go | 18 +++++++++++++ pkg/storage/util_test.go | 37 ++++++++++++++++++++++++++- 5 files changed, 62 insertions(+), 55 deletions(-) diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index c4abbb0f0b6..7904cce70ca 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -162,7 +162,8 @@ type Cacher struct { watchers indexedWatchers // Incoming events that should be dispatched to watchers. - incoming chan watchCacheEvent + incoming chan watchCacheEvent + incomingHWM HighWaterMark // Handling graceful termination. stopLock sync.RWMutex @@ -410,6 +411,10 @@ func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) { } func (c *Cacher) processEvent(event watchCacheEvent) { + if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) { + // Monitor if this gets backed up, and how much. + glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen) + } c.incoming <- event } diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index b9ea1b3aec5..e7bb9adb905 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -21,7 +21,6 @@ import ( "net/http" "reflect" "sync" - "sync/atomic" "time" "k8s.io/kubernetes/pkg/api/unversioned" @@ -47,23 +46,6 @@ const ( EtcdExpire = "expire" ) -// HighWaterMark is a thread-safe object for tracking the maximum value seen -// for some quantity. -type HighWaterMark int64 - -// Update returns true if and only if 'current' is the highest value ever seen. -func (hwm *HighWaterMark) Update(current int64) bool { - for { - old := atomic.LoadInt64((*int64)(hwm)) - if current <= old { - return false - } - if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) { - return true - } - } -} - // TransformFunc attempts to convert an object to another object for use with a watcher. type TransformFunc func(runtime.Object) (runtime.Object, error) @@ -109,8 +91,8 @@ type etcdWatcher struct { emit func(watch.Event) // HighWaterMarks for performance debugging. - incomingHWM HighWaterMark - outgoingHWM HighWaterMark + incomingHWM storage.HighWaterMark + outgoingHWM storage.HighWaterMark cache etcdCache } diff --git a/pkg/storage/etcd/etcd_watcher_test.go b/pkg/storage/etcd/etcd_watcher_test.go index 5caa7eb7a71..e19202587d4 100644 --- a/pkg/storage/etcd/etcd_watcher_test.go +++ b/pkg/storage/etcd/etcd_watcher_test.go @@ -17,9 +17,7 @@ limitations under the License. package etcd import ( - "math/rand" rt "runtime" - "sync" "testing" "k8s.io/kubernetes/pkg/api" @@ -557,34 +555,3 @@ func TestWatchPurposefulShutdown(t *testing.T) { t.Errorf("Unexpected event from stopped watcher: %#v", event) } } - -func TestHighWaterMark(t *testing.T) { - var h HighWaterMark - - for i := int64(10); i < 20; i++ { - if !h.Update(i) { - t.Errorf("unexpected false for %v", i) - } - if h.Update(i - 1) { - t.Errorf("unexpected true for %v", i-1) - } - } - - m := int64(0) - wg := sync.WaitGroup{} - for i := 0; i < 300; i++ { - wg.Add(1) - v := rand.Int63() - go func(v int64) { - defer wg.Done() - h.Update(v) - }(v) - if v > m { - m = v - } - } - wg.Wait() - if m != int64(h) { - t.Errorf("unexpected value, wanted %v, got %v", m, int64(h)) - } -} diff --git a/pkg/storage/util.go b/pkg/storage/util.go index 1cceced6200..af5f2853e35 100644 --- a/pkg/storage/util.go +++ b/pkg/storage/util.go @@ -20,6 +20,7 @@ import ( "fmt" "strconv" "strings" + "sync/atomic" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/validation/path" @@ -149,3 +150,20 @@ func hasPathPrefix(s, pathPrefix string) bool { } return false } + +// HighWaterMark is a thread-safe object for tracking the maximum value seen +// for some quantity. +type HighWaterMark int64 + +// Update returns true if and only if 'current' is the highest value ever seen. +func (hwm *HighWaterMark) Update(current int64) bool { + for { + old := atomic.LoadInt64((*int64)(hwm)) + if current <= old { + return false + } + if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) { + return true + } + } +} diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 17dcac01437..6eba9b49935 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -16,7 +16,11 @@ limitations under the License. package storage -import "testing" +import ( + "math/rand" + "sync" + "testing" +) func TestEtcdParseWatchResourceVersion(t *testing.T) { testCases := []struct { @@ -99,3 +103,34 @@ func TestHasPathPrefix(t *testing.T) { } } } + +func TestHighWaterMark(t *testing.T) { + var h HighWaterMark + + for i := int64(10); i < 20; i++ { + if !h.Update(i) { + t.Errorf("unexpected false for %v", i) + } + if h.Update(i - 1) { + t.Errorf("unexpected true for %v", i-1) + } + } + + m := int64(0) + wg := sync.WaitGroup{} + for i := 0; i < 300; i++ { + wg.Add(1) + v := rand.Int63() + go func(v int64) { + defer wg.Done() + h.Update(v) + }(v) + if v > m { + m = v + } + } + wg.Wait() + if m != int64(h) { + t.Errorf("unexpected value, wanted %v, got %v", m, int64(h)) + } +}