diff --git a/pkg/storage/BUILD b/pkg/storage/BUILD index 0358760bc12..fc2fa17f804 100644 --- a/pkg/storage/BUILD +++ b/pkg/storage/BUILD @@ -18,6 +18,7 @@ go_library( "errors.go", "interfaces.go", "selection_predicate.go", + "time_budget.go", "util.go", "watch_cache.go", ], @@ -50,6 +51,7 @@ go_test( name = "go_default_test", srcs = [ "selection_predicate_test.go", + "time_budget_test.go", "util_test.go", "watch_cache_test.go", ], diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index d495d90c4e6..6febfac2b00 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -177,6 +177,10 @@ type Cacher struct { watcherIdx int watchers indexedWatchers + // Defines a time budget that can be spend on waiting for not-ready watchers + // while dispatching event before shutting them down. + dispatchTimeoutBudget *timeBudget + // Handling graceful termination. stopLock sync.RWMutex stopped bool @@ -199,6 +203,7 @@ func NewCacherFromConfig(config CacherConfig) *Cacher { } } + stopCh := make(chan struct{}) cacher := &Cacher{ ready: newReady(), storage: config.Storage, @@ -213,18 +218,18 @@ func NewCacherFromConfig(config CacherConfig) *Cacher { valueWatchers: make(map[string]watchersMap), }, // TODO: Figure out the correct value for the buffer size. - incoming: make(chan watchCacheEvent, 100), + incoming: make(chan watchCacheEvent, 100), + dispatchTimeoutBudget: newTimeBudget(stopCh), // We need to (potentially) stop both: // - wait.Until go-routine // - reflector.ListAndWatch // and there are no guarantees on the order that they will stop. // So we will be simply closing the channel, and synchronizing on the WaitGroup. - stopCh: make(chan struct{}), + stopCh: stopCh, } watchCache.SetOnEvent(cacher.processEvent) go cacher.dispatchEvents() - stopCh := cacher.stopCh cacher.stopWg.Add(1) go func() { defer cacher.stopWg.Done() @@ -577,23 +582,17 @@ func (c *Cacher) dispatchEvents() { func (c *Cacher) dispatchEvent(event *watchCacheEvent) { triggerValues, supported := c.triggerValues(event) - // TODO: For now we assume we have a given budget for dispatching - // a single event. We should consider changing to the approach with: - // - budget has upper bound at - // - we add to current timeout every second - timeout := time.Duration(250) * time.Millisecond - c.Lock() defer c.Unlock() // Iterate over "allWatchers" no matter what the trigger function is. for _, watcher := range c.watchers.allWatchers { - watcher.add(event, &timeout) + watcher.add(event, c.dispatchTimeoutBudget) } if supported { // Iterate over watchers interested in the given values of the trigger. for _, triggerValue := range triggerValues { for _, watcher := range c.watchers.valueWatchers[triggerValue] { - watcher.add(event, &timeout) + watcher.add(event, c.dispatchTimeoutBudget) } } } else { @@ -606,7 +605,7 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) { // Iterate over watchers interested in exact values for all values. for _, watchers := range c.watchers.valueWatchers { for _, watcher := range watchers { - watcher.add(event, &timeout) + watcher.add(event, c.dispatchTimeoutBudget) } } } @@ -790,7 +789,7 @@ func (c *cacheWatcher) stop() { var timerPool sync.Pool -func (c *cacheWatcher) add(event *watchCacheEvent, timeout *time.Duration) { +func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) { // Try to send the event immediately, without blocking. select { case c.input <- *event: @@ -802,12 +801,13 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timeout *time.Duration) { // cacheWatcher.add is called very often, so arrange // to reuse timers instead of constantly allocating. startTime := time.Now() + timeout := budget.takeAvailable() t, ok := timerPool.Get().(*time.Timer) if ok { - t.Reset(*timeout) + t.Reset(timeout) } else { - t = time.NewTimer(*timeout) + t = time.NewTimer(timeout) } defer timerPool.Put(t) @@ -827,9 +827,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timeout *time.Duration) { c.stop() } - if *timeout = *timeout - time.Since(startTime); *timeout < 0 { - *timeout = 0 - } + budget.returnUnused(timeout - time.Since(startTime)) } // NOTE: sendWatchCacheEvent is assumed to not modify !!! diff --git a/pkg/storage/time_budget.go b/pkg/storage/time_budget.go new file mode 100644 index 00000000000..0febb794b90 --- /dev/null +++ b/pkg/storage/time_budget.go @@ -0,0 +1,95 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "sync" + "time" +) + +const ( + refreshPerSecond = 50 * time.Millisecond + maxBudget = 250 * time.Millisecond +) + +// timeBudget implements a budget of time that you can use and is +// periodically being refreshed. The pattern to use it is: +// budget := newTimeBudget(...) +// ... +// timeout := budget.takeAvailable() +// // Now you can spend at most timeout on doing stuff +// ... +// // If you didn't use all timeout, return what you didn't use +// budget.returnUnused() +// +// NOTE: It's not recommended to be used concurrently from multiple threads - +// if first user takes the whole timeout, the second one will get 0 timeout +// even though the first one may return something later. +type timeBudget struct { + sync.Mutex + budget time.Duration + + refresh time.Duration + maxBudget time.Duration +} + +func newTimeBudget(stopCh <-chan struct{}) *timeBudget { + result := &timeBudget{ + budget: time.Duration(0), + refresh: refreshPerSecond, + maxBudget: maxBudget, + } + go result.periodicallyRefresh(stopCh) + return result +} + +func (t *timeBudget) periodicallyRefresh(stopCh <-chan struct{}) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + t.Lock() + if t.budget = t.budget + t.refresh; t.budget > t.maxBudget { + t.budget = t.maxBudget + } + t.Unlock() + case <-stopCh: + return + } + } +} + +func (t *timeBudget) takeAvailable() time.Duration { + t.Lock() + defer t.Unlock() + result := t.budget + t.budget = time.Duration(0) + return result +} + +func (t *timeBudget) returnUnused(unused time.Duration) { + t.Lock() + defer t.Unlock() + if unused < 0 { + // We used more than allowed. + return + } + if t.budget = t.budget + unused; t.budget > t.maxBudget { + t.budget = t.maxBudget + } +} diff --git a/pkg/storage/time_budget_test.go b/pkg/storage/time_budget_test.go new file mode 100644 index 00000000000..99ba19bd593 --- /dev/null +++ b/pkg/storage/time_budget_test.go @@ -0,0 +1,53 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "testing" + "time" +) + +func TestTimeBudget(t *testing.T) { + budget := &timeBudget{ + budget: time.Duration(0), + maxBudget: time.Duration(200), + } + if res := budget.takeAvailable(); res != time.Duration(0) { + t.Errorf("Expected: %v, got: %v", time.Duration(0), res) + } + budget.budget = time.Duration(100) + if res := budget.takeAvailable(); res != time.Duration(100) { + t.Errorf("Expected: %v, got: %v", time.Duration(100), res) + } + if res := budget.takeAvailable(); res != time.Duration(0) { + t.Errorf("Expected: %v, got: %v", time.Duration(0), res) + } + budget.returnUnused(time.Duration(50)) + if res := budget.takeAvailable(); res != time.Duration(50) { + t.Errorf("Expected: %v, got: %v", time.Duration(50), res) + } + budget.budget = time.Duration(100) + budget.returnUnused(-time.Duration(50)) + if res := budget.takeAvailable(); res != time.Duration(100) { + t.Errorf("Expected: %v, got: %v", time.Duration(100), res) + } + // test overflow. + budget.returnUnused(time.Duration(500)) + if res := budget.takeAvailable(); res != time.Duration(200) { + t.Errorf("Expected: %v, got: %v", time.Duration(200), res) + } +}