From bbb80c252b77dad56ea0d7b9c156c3d1fdde248e Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Fri, 12 May 2017 08:30:45 -0700 Subject: [PATCH] Add bounded frequency runner This lib manages runs of a function to have min and max frequencies. --- pkg/util/async/BUILD | 14 +- pkg/util/async/bounded_frequency_runner.go | 229 ++++++++++++ .../async/bounded_frequency_runner_test.go | 332 ++++++++++++++++++ 3 files changed, 573 insertions(+), 2 deletions(-) create mode 100644 pkg/util/async/bounded_frequency_runner.go create mode 100644 pkg/util/async/bounded_frequency_runner_test.go diff --git a/pkg/util/async/BUILD b/pkg/util/async/BUILD index 0992f20245c..67ba72fe84b 100644 --- a/pkg/util/async/BUILD +++ b/pkg/util/async/BUILD @@ -10,13 +10,23 @@ load( go_library( name = "go_default_library", - srcs = ["runner.go"], + srcs = [ + "bounded_frequency_runner.go", + "runner.go", + ], tags = ["automanaged"], + deps = [ + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", + ], ) go_test( name = "go_default_test", - srcs = ["runner_test.go"], + srcs = [ + "bounded_frequency_runner_test.go", + "runner_test.go", + ], library = ":go_default_library", tags = ["automanaged"], ) diff --git a/pkg/util/async/bounded_frequency_runner.go b/pkg/util/async/bounded_frequency_runner.go new file mode 100644 index 00000000000..531ac2cfee6 --- /dev/null +++ b/pkg/util/async/bounded_frequency_runner.go @@ -0,0 +1,229 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package async + +import ( + "fmt" + "sync" + "time" + + "k8s.io/client-go/util/flowcontrol" + + "github.com/golang/glog" +) + +// BoundedFrequencyRunner manages runs of a user-provided function. +// See NewBoundedFrequencyRunner for examples. +type BoundedFrequencyRunner struct { + name string // the name of this instance + minInterval time.Duration // the min time between runs, modulo bursts + maxInterval time.Duration // the max time between runs + + run chan struct{} // try an async run + + mu sync.Mutex // guards runs of fn and all mutations + fn func() // function to run + lastRun time.Time // time of last run + timer timer // timer for deferred runs + limiter rateLimiter // rate limiter for on-demand runs +} + +// designed so that flowcontrol.RateLimiter satisfies +type rateLimiter interface { + TryAccept() bool + Stop() +} + +type nullLimiter struct{} + +func (nullLimiter) TryAccept() bool { + return true +} + +func (nullLimiter) Stop() {} + +var _ rateLimiter = nullLimiter{} + +// for testing +type timer interface { + // C returns the timer's selectable channel. + C() <-chan time.Time + + // See time.Timer.Reset. + Reset(d time.Duration) bool + + // See time.Timer.Stop. + Stop() bool + + // See time.Now. + Now() time.Time + + // See time.Since. + Since(t time.Time) time.Duration + + // See time.Sleep. + Sleep(d time.Duration) +} + +// implement our timer in terms of std time.Timer. +type realTimer struct { + *time.Timer +} + +func (rt realTimer) C() <-chan time.Time { + return rt.Timer.C +} + +func (rt realTimer) Now() time.Time { + return time.Now() +} + +func (rt realTimer) Since(t time.Time) time.Duration { + return time.Since(t) +} + +func (rt realTimer) Sleep(d time.Duration) { + time.Sleep(d) +} + +var _ timer = realTimer{} + +// NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance, +// which will manage runs of the specified function. +// +// All runs will be async to the caller of BoundedFrequencyRunner.Run, but +// multiple runs are serialized. If the function needs to hold locks, it must +// take them internally. +// +// Runs of the funtion will have at least minInterval between them (from +// completion to next start), except that up to bursts may be allowed. Burst +// runs are "accumulated" over time, one per minInterval up to burstRuns total. +// This can be used, for example, to mitigate the impact of expensive operations +// being called in response to user-initiated operations. Run requests that +// would violate the minInterval are coallesced and run at the next opportunity. +// +// The function will be run at least once per maxInterval. For example, this can +// force periodic refreshes of state in the absence of anyone calling Run. +// +// Examples: +// +// NewBoundedFrequencyRunner("name", fn, time.Second, 5*time.Second, 1) +// - fn will have at least 1 second between runs +// - fn will have no more than 5 seconds between runs +// +// NewBoundedFrequencyRunner("name", fn, 3*time.Second, 10*time.Second, 3) +// - fn will have at least 3 seconds between runs, with up to 3 burst runs +// - fn will have no more than 10 seconds between runs +// +// The maxInterval must be greater than or equal to the minInterval, If the +// caller passes a maxInterval less than minInterval, this function will panic. +func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner { + timer := realTimer{Timer: time.NewTimer(0)} // will tick immediately + <-timer.C() // consume the first tick + return construct(name, fn, minInterval, maxInterval, burstRuns, timer) +} + +// Make an instance with dependencies injected. +func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner { + if maxInterval < minInterval { + panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, minInterval, maxInterval)) + } + if timer == nil { + panic(fmt.Sprintf("%s: timer must be non-nil", name)) + } + + bfr := &BoundedFrequencyRunner{ + name: name, + fn: fn, + minInterval: minInterval, + maxInterval: maxInterval, + run: make(chan struct{}, 16), + timer: timer, + } + if minInterval == 0 { + bfr.limiter = nullLimiter{} + } else { + // allow burst updates in short succession + qps := float32(time.Second) / float32(minInterval) + bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer) + } + return bfr +} + +// Loop handles the periodic timer and run requests. This is expected to be +// called as a goroutine. +func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) { + glog.V(3).Infof("%s Loop running", bfr.name) + bfr.timer.Reset(bfr.maxInterval) + for { + select { + case <-stop: + bfr.stop() + glog.V(3).Infof("%s Loop stopping", bfr.name) + return + case <-bfr.timer.C(): + bfr.tryRun() + case <-bfr.run: + bfr.tryRun() + } + } +} + +// Run the function as soon as possible. If this is called while Loop is not +// running, the call may be deferred indefinitely. +func (bfr *BoundedFrequencyRunner) Run() { + bfr.run <- struct{}{} +} + +// assumes the lock is not held +func (bfr *BoundedFrequencyRunner) stop() { + bfr.mu.Lock() + defer bfr.mu.Unlock() + bfr.limiter.Stop() + bfr.timer.Stop() +} + +// assumes the lock is not held +func (bfr *BoundedFrequencyRunner) tryRun() { + bfr.mu.Lock() + defer bfr.mu.Unlock() + + if bfr.limiter.TryAccept() { + // We're allowed to run the function right now. + bfr.fn() + bfr.lastRun = bfr.timer.Now() + bfr.timer.Stop() + bfr.timer.Reset(bfr.maxInterval) + glog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval) + return + } + + // It can't run right now, figure out when it can run next. + + elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run + nextPossible := bfr.minInterval - elapsed // time to next possible run + nextScheduled := bfr.maxInterval - elapsed // time to next periodic run + glog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled) + + if nextPossible < nextScheduled { + // Set the timer for ASAP, but don't drain here. Assuming Loop is running, + // it might get a delivery in the mean time, but that is OK. + bfr.timer.Stop() + bfr.timer.Reset(nextPossible) + glog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible) + } +} diff --git a/pkg/util/async/bounded_frequency_runner_test.go b/pkg/util/async/bounded_frequency_runner_test.go new file mode 100644 index 00000000000..234ffb2a3f8 --- /dev/null +++ b/pkg/util/async/bounded_frequency_runner_test.go @@ -0,0 +1,332 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package async + +import ( + "sync" + "testing" + "time" +) + +// Track calls to the managed function. +type receiver struct { + lock sync.Mutex + run bool +} + +func (r *receiver) F() { + r.lock.Lock() + defer r.lock.Unlock() + r.run = true +} + +func (r *receiver) reset() bool { + r.lock.Lock() + defer r.lock.Unlock() + was := r.run + r.run = false + return was +} + +// A single change event in the fake timer. +type timerUpdate struct { + active bool + next time.Duration // iff active == true +} + +// Fake time. +type fakeTimer struct { + c chan time.Time + + lock sync.Mutex + now time.Time + active bool + + updated chan timerUpdate +} + +func newFakeTimer() *fakeTimer { + ft := &fakeTimer{ + c: make(chan time.Time), + updated: make(chan timerUpdate), + } + return ft +} + +func (ft *fakeTimer) C() <-chan time.Time { + return ft.c +} + +func (ft *fakeTimer) Reset(in time.Duration) bool { + ft.lock.Lock() + defer ft.lock.Unlock() + + was := ft.active + ft.active = true + ft.updated <- timerUpdate{ + active: true, + next: in, + } + return was +} + +func (ft *fakeTimer) Stop() bool { + ft.lock.Lock() + defer ft.lock.Unlock() + + was := ft.active + ft.active = false + ft.updated <- timerUpdate{ + active: false, + } + return was +} + +func (ft *fakeTimer) Now() time.Time { + ft.lock.Lock() + defer ft.lock.Unlock() + + return ft.now +} + +func (ft *fakeTimer) Since(t time.Time) time.Duration { + ft.lock.Lock() + defer ft.lock.Unlock() + + return ft.now.Sub(t) +} + +func (ft *fakeTimer) Sleep(d time.Duration) { + ft.lock.Lock() + defer ft.lock.Unlock() + + ft.advance(d) +} + +// advance the current time. +func (ft *fakeTimer) advance(d time.Duration) { + ft.lock.Lock() + defer ft.lock.Unlock() + + ft.now = ft.now.Add(d) +} + +// send a timer tick. +func (ft *fakeTimer) tick() { + ft.lock.Lock() + defer ft.lock.Unlock() + + ft.active = false + ft.c <- ft.now +} + +// return the calling line number (for printing) +// test the timer's state +func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) { + if upd.active != active { + t.Fatalf("%s: expected timer active=%v", name, active) + } + if active && upd.next != next { + t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next) + } +} + +// test and reset the receiver's state +func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) { + triggered := receiver.reset() + if expected && !triggered { + t.Fatalf("%s: function should have been called", name) + } else if !expected && triggered { + t.Fatalf("%s: function should not have been called", name) + } +} + +// Durations embedded in test cases depend on these. +var minInterval = 1 * time.Second +var maxInterval = 10 * time.Second + +func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) { + upd := <-timer.updated // wait for stop + checkReceiver(name, t, obj, expectCall) + checkReceiver(name, t, obj, false) // prove post-condition + checkTimer(name, t, upd, false, 0) + upd = <-timer.updated // wait for reset + checkTimer(name, t, upd, true, expectNext) +} + +func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) { + waitForReset(name, t, timer, obj, true, maxInterval) +} + +func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) { + waitForReset(name, t, timer, obj, false, expectNext) +} + +func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) { + obj := &receiver{} + timer := newFakeTimer() + runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer) + stop := make(chan struct{}) + + var upd timerUpdate + + // Start. + go runner.Loop(stop) + upd = <-timer.updated // wait for initial time to be set to max + checkTimer("init", t, upd, true, maxInterval) + checkReceiver("init", t, obj, false) + + // Run once, immediately. + // rel=0ms + runner.Run() + waitForRun("first run", t, timer, obj) + + // Run again, before minInterval expires. + timer.advance(500 * time.Millisecond) // rel=500ms + runner.Run() + waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond) + + // Run again, before minInterval expires. + timer.advance(499 * time.Millisecond) // rel=999ms + runner.Run() + waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond) + + // Run again, once minInterval has passed (race with timer). + timer.advance(1 * time.Millisecond) // rel=1000ms + runner.Run() + waitForRun("second run", t, timer, obj) + + // Run again, before minInterval expires. + // rel=0ms + runner.Run() + waitForDefer("too soon after second", t, timer, obj, 1*time.Second) + + // Run again, before minInterval expires. + timer.advance(1 * time.Millisecond) // rel=1ms + runner.Run() + waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond) + + // Let the timer tick prematurely. + timer.advance(998 * time.Millisecond) // rel=999ms + timer.tick() + waitForDefer("premature tick", t, timer, obj, 1*time.Millisecond) + + // Let the timer tick. + timer.advance(1 * time.Millisecond) // rel=1000ms + timer.tick() + waitForRun("first tick", t, timer, obj) + + // Let the timer tick. + timer.advance(10 * time.Second) // rel=10000ms + timer.tick() + waitForRun("second tick", t, timer, obj) + + // Run again, before minInterval expires. + timer.advance(1 * time.Millisecond) // rel=1ms + runner.Run() + waitForDefer("too soon after tick", t, timer, obj, 999*time.Millisecond) + + // Let the timer tick. + timer.advance(999 * time.Millisecond) // rel=1000ms + timer.tick() + waitForRun("third tick", t, timer, obj) + + // Clean up. + stop <- struct{}{} +} + +func Test_BoundedFrequencyRunnerBurst(t *testing.T) { + obj := &receiver{} + timer := newFakeTimer() + runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer) + stop := make(chan struct{}) + + var upd timerUpdate + + // Start. + go runner.Loop(stop) + upd = <-timer.updated // wait for initial time to be set to max + checkTimer("init", t, upd, true, maxInterval) + checkReceiver("init", t, obj, false) + + // Run once, immediately. + // abs=0ms, rel=0ms + runner.Run() + waitForRun("first run", t, timer, obj) + + // Run again, before minInterval expires, with burst. + timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms + runner.Run() + waitForRun("second run", t, timer, obj) + + // Run again, before minInterval expires. + timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms + runner.Run() + waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond) + + // Run again, before minInterval expires. + timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms + runner.Run() + waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond) + + // Run again, before minInterval expires. + timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms + runner.Run() + waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond) + + // Run again, once burst has replenished. + timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms + runner.Run() + waitForRun("third run", t, timer, obj) + + // Run again, before minInterval expires. + timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms + runner.Run() + waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond) + + // Run again, before minInterval expires. + timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms + runner.Run() + waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond) + + // Run again, once burst has replenished. + timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms + runner.Run() + waitForRun("fourth run", t, timer, obj) + + // Run again, once burst has fully replenished. + timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms + runner.Run() + waitForRun("fifth run", t, timer, obj) + runner.Run() + waitForRun("sixth run", t, timer, obj) + runner.Run() + waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second) + + // Let the timer tick. + timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms + timer.tick() + waitForRun("first tick", t, timer, obj) + + // Let the timer tick. + timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms + timer.tick() + waitForRun("second tick", t, timer, obj) + + // Clean up. + stop <- struct{}{} +}