Merge pull request #37195 from wojtek-t/fix_hanging_cacher

Automatic merge from submit-queue

Better waiting for watch event delivery in cacher

@lavalamp - I think we should do something simple for now (and merge for 1.5), and do something a bit more sophisticated right after 1.5, WDYT?
Authored by Kubernetes Submit Queue, committed via GitHub on 2016-11-29 11:25:21 -08:00
commit a2d5df40af
2 changed files with 30 additions and 17 deletions
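
The core of the change: instead of letting every slow watcher block event dispatch for a fixed 5 seconds, dispatchEvent now allocates a single 250 ms budget per event, and each cacheWatcher.add call subtracts the time it spent blocking from that shared budget. The sketch below is a minimal, self-contained illustration of that pattern, not the Cacher code itself; the channel and function names (sendWithBudget, slow, fast) are invented for the example.

package main

import (
    "fmt"
    "time"
)

// sendWithBudget tries a non-blocking send first, then blocks for at most the
// remaining *budget. Whatever time it spends blocking is subtracted from the
// budget, so later receivers of the same event get a smaller allowance.
func sendWithBudget(ch chan<- string, event string, budget *time.Duration) bool {
    select {
    case ch <- event:
        return true
    default:
    }

    startTime := time.Now()
    t := time.NewTimer(*budget)
    defer t.Stop()

    delivered := false
    select {
    case ch <- event:
        delivered = true
    case <-t.C:
        // Budget exhausted for this receiver; the real cacher terminates
        // the watcher at this point instead of waiting any longer.
    }

    if *budget = *budget - time.Since(startTime); *budget < 0 {
        *budget = 0
    }
    return delivered
}

func main() {
    slow := make(chan string)    // never read: simulates a stuck watcher
    fast := make(chan string, 1) // buffered: the non-blocking path succeeds

    budget := 250 * time.Millisecond                    // same per-event budget the PR introduces
    fmt.Println(sendWithBudget(slow, "event", &budget)) // false after ~250ms
    fmt.Println(sendWithBudget(fast, "event", &budget)) // true, costs no budget
    fmt.Println("budget left:", budget)
}
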


@@ -532,17 +532,23 @@ func (c *Cacher) dispatchEvents() {
 func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
     triggerValues, supported := c.triggerValues(event)
 
+    // TODO: For now we assume we have a given <timeout> budget for dispatching
+    // a single event. We should consider changing to the approach with:
+    // - budget has upper bound at <max_timeout>
+    // - we add <portion> to current timeout every second
+    timeout := time.Duration(250) * time.Millisecond
+
     c.Lock()
     defer c.Unlock()
     // Iterate over "allWatchers" no matter what the trigger function is.
     for _, watcher := range c.watchers.allWatchers {
-        watcher.add(event)
+        watcher.add(event, &timeout)
     }
     if supported {
         // Iterate over watchers interested in the given values of the trigger.
         for _, triggerValue := range triggerValues {
             for _, watcher := range c.watchers.valueWatchers[triggerValue] {
-                watcher.add(event)
+                watcher.add(event, &timeout)
             }
         }
     } else {
@@ -555,7 +561,7 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
         // Iterate over watchers interested in exact values for all values.
         for _, watchers := range c.watchers.valueWatchers {
             for _, watcher := range watchers {
-                watcher.add(event)
+                watcher.add(event, &timeout)
             }
         }
     }
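
The TODO in the hunk above hints at a more sophisticated scheme than a flat 250 ms per event: a budget that is refilled by a fixed portion every second and capped at an upper bound, so one expensive event does not dictate the allowance of the next one. A rough sketch of what such a refillable budget could look like; this is only the direction the TODO describes, not code in this PR, and the type and method names are invented:

package budget

import (
    "sync"
    "time"
)

// timeBudget sketches the "add <portion> every second, cap at <max_timeout>"
// idea from the TODO above.
type timeBudget struct {
    sync.Mutex
    budget    time.Duration // what dispatching may currently spend
    refresh   time.Duration // added back once per second
    maxBudget time.Duration // hard upper bound
}

func newTimeBudget(refresh, max time.Duration) *timeBudget {
    b := &timeBudget{budget: refresh, refresh: refresh, maxBudget: max}
    go func() {
        ticker := time.NewTicker(time.Second)
        for range ticker.C {
            b.Lock()
            if b.budget += b.refresh; b.budget > b.maxBudget {
                b.budget = b.maxBudget
            }
            b.Unlock()
        }
    }()
    return b
}

// takeAvailable hands the caller everything currently in the budget; the
// caller would pass it down as the timeout for dispatching a single event.
func (b *timeBudget) takeAvailable() time.Duration {
    b.Lock()
    defer b.Unlock()
    d := b.budget
    b.budget = 0
    return d
}

// returnUnused gives back whatever part of the allowance was not spent.
func (b *timeBudget) returnUnused(unused time.Duration) {
    b.Lock()
    defer b.Unlock()
    if b.budget += unused; b.budget > b.maxBudget {
        b.budget = b.maxBudget
    }
}
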
@@ -729,7 +735,7 @@ func (c *cacheWatcher) stop() {
 
 var timerPool sync.Pool
 
-func (c *cacheWatcher) add(event *watchCacheEvent) {
+func (c *cacheWatcher) add(event *watchCacheEvent, timeout *time.Duration) {
     // Try to send the event immediately, without blocking.
     select {
     case c.input <- *event:
@@ -737,20 +743,16 @@ func (c *cacheWatcher) add(event *watchCacheEvent) {
     default:
     }
 
-    // OK, block sending, but only for up to 5 seconds.
+    // OK, block sending, but only for up to <timeout>.
     // cacheWatcher.add is called very often, so arrange
     // to reuse timers instead of constantly allocating.
-    trace := util.NewTrace(
-        fmt.Sprintf("cacheWatcher %v: waiting for add (initial result size %v)",
-            reflect.TypeOf(event.Object).String(), len(c.result)))
-    defer trace.LogIfLong(50 * time.Millisecond)
-    const timeout = 5 * time.Second
+    startTime := time.Now()
     t, ok := timerPool.Get().(*time.Timer)
     if ok {
-        t.Reset(timeout)
+        t.Reset(*timeout)
     } else {
-        t = time.NewTimer(timeout)
+        t = time.NewTimer(*timeout)
     }
     defer timerPool.Put(t)
 
     select {
@@ -769,6 +771,10 @@ func (c *cacheWatcher) add(event *watchCacheEvent) {
         c.forget(false)
         c.stop()
     }
+
+    if *timeout = *timeout - time.Since(startTime); *timeout < 0 {
+        *timeout = 0
+    }
 }
 
 // NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
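
cacheWatcher.add keeps the existing optimization of recycling *time.Timer values through a sync.Pool (add runs for every event delivered to every watcher, so allocating a fresh timer each time would be wasteful); the only change is that the timer is now armed with the caller's remaining budget instead of a 5 second constant. Below is a standalone sketch of that reuse pattern, including the drain step a pooled timer needs; the function trySendFor is invented for illustration and is not the exact cacher code.

package timerpool

import (
    "sync"
    "time"
)

// timerPool caches *time.Timer values so a new timer does not have to be
// allocated for every send attempt.
var timerPool sync.Pool

// trySendFor blocks trying to send v on ch for at most d, reusing pooled
// timers, and reports whether the send succeeded.
func trySendFor(ch chan<- int, v int, d time.Duration) bool {
    t, ok := timerPool.Get().(*time.Timer)
    if ok {
        t.Reset(d)
    } else {
        t = time.NewTimer(d)
    }

    sent := false
    select {
    case ch <- v:
        sent = true
    case <-t.C:
        // Deadline hit; t.C has been consumed, so the timer is clean.
    }

    if sent && !t.Stop() {
        // The timer fired while we were sending; drain its channel so the
        // next user of this pooled timer does not see a stale expiration.
        <-t.C
    }
    timerPool.Put(t)
    return sent
}
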


@@ -297,12 +297,15 @@ func TestWatcherTimeout(t *testing.T) {
     }
     startVersion := strconv.Itoa(int(initialVersion))
 
-    // Create a watcher that will not be reading any result.
-    watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
-    if err != nil {
-        t.Fatalf("Unexpected error: %v", err)
-    }
-    defer watcher.Stop()
+    // Create a number of watchers that will not be reading any result.
+    nonReadingWatchers := 50
+    for i := 0; i < nonReadingWatchers; i++ {
+        watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
+        if err != nil {
+            t.Fatalf("Unexpected error: %v", err)
+        }
+        defer watcher.Stop()
+    }
 
     // Create a second watcher that will be reading result.
     readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
@@ -311,11 +314,15 @@ func TestWatcherTimeout(t *testing.T) {
     }
     defer readingWatcher.Stop()
 
+    startTime := time.Now()
     for i := 1; i <= 22; i++ {
         pod := makeTestPod(strconv.Itoa(i))
         _ = updatePod(t, etcdStorage, pod, nil)
         verifyWatchEvent(t, readingWatcher, watch.Added, pod)
     }
+    if time.Since(startTime) > time.Duration(250*nonReadingWatchers)*time.Millisecond {
+        t.Errorf("waiting for events took too long: %v", time.Since(startTime))
+    }
 }
 
 func TestFiltering(t *testing.T) {
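
A note on the numbers in the test: each of the 22 events now carries at most the shared 250 ms budget, no matter how many of the 50 non-reading watchers are blocked, so the delivery loop should finish in roughly 22 × 250 ms ≈ 5.5 s at worst, comfortably under the asserted ceiling of 250 ms × 50 = 12.5 s. With the previous fixed 5 second wait per stuck watcher, a single event could in principle block the dispatcher for several minutes once the non-reading watchers' input channels filled up, which is the hang this PR addresses.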