mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 21:47:07 +00:00
454 lines
12 KiB
Go
454 lines
12 KiB
Go
/*
|
|
Copyright 2017 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package async
|
|
|
|
import (
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
// receiver stands in for the function managed by the runner. It remembers
// whether that function has been invoked since the last reset and can run an
// optional one-shot hook from inside the invocation.
type receiver struct {
	lock sync.Mutex // guards run and retryFn

	run     bool   // set by F, cleared by reset
	retryFn func() // when non-nil, invoked once by F and then cleared
}

// F is the managed function. It marks the receiver as having run and, if a
// retry hook is installed, calls it and then uninstalls it. The hook executes
// while r.lock is held.
func (r *receiver) F() {
	r.lock.Lock()
	defer r.lock.Unlock()

	r.run = true
	if r.retryFn == nil {
		return
	}
	r.retryFn()
	r.retryFn = nil
}

// reset clears the run flag and returns the value it held beforehand.
func (r *receiver) reset() bool {
	r.lock.Lock()
	defer r.lock.Unlock()

	ran := r.run
	r.run = false
	return ran
}

// setRetryFn installs the hook that the next call to F will invoke.
func (r *receiver) setRetryFn(retryFn func()) {
	r.lock.Lock()
	r.retryFn = retryFn
	r.lock.Unlock()
}
|
|
|
|
// timerUpdate describes one state change published by the fake timer.
type timerUpdate struct {
	active bool
	next   time.Duration // meaningful only when active is true
}

// fakeTimer is a manually driven clock and timer for tests. Time only moves
// when advance (or Sleep) is called, and every Reset/Stop is reported on the
// updated channel so a test can observe it.
type fakeTimer struct {
	c chan time.Time // unbuffered; advance sends the deadline here when it is reached

	lock    sync.Mutex // guards now, timeout and active
	now     time.Time
	timeout time.Time
	active  bool

	updated chan timerUpdate // unbuffered; one entry per Reset or Stop call
}

// newFakeTimer returns an inactive fake timer whose clock starts at
// 2000-01-01 00:00:00 UTC.
func newFakeTimer() *fakeTimer {
	return &fakeTimer{
		now:     time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC),
		c:       make(chan time.Time),
		updated: make(chan timerUpdate),
	}
}

// C returns the channel on which expirations are delivered.
func (ft *fakeTimer) C() <-chan time.Time {
	return ft.c
}

// Reset arms the timer to expire `in` after the current fake time and returns
// whether it was already active. It blocks, holding the lock, until the
// update has been received from ft.updated.
func (ft *fakeTimer) Reset(in time.Duration) bool {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	wasActive := ft.active
	ft.active, ft.timeout = true, ft.now.Add(in)
	ft.updated <- timerUpdate{active: true, next: in}
	return wasActive
}

// Stop disarms the timer and returns whether it was active. It blocks,
// holding the lock, until the update has been received from ft.updated.
func (ft *fakeTimer) Stop() bool {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	wasActive := ft.active
	ft.active = false
	ft.updated <- timerUpdate{active: false}
	return wasActive
}

// Now returns the current fake time.
func (ft *fakeTimer) Now() time.Time {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	return ft.now
}

// Remaining returns the span from the current fake time to the most recently
// set deadline; it does not consult whether the timer is active.
func (ft *fakeTimer) Remaining() time.Duration {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	return ft.timeout.Sub(ft.now)
}

// Since returns the fake time elapsed since t.
func (ft *fakeTimer) Since(t time.Time) time.Duration {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	return ft.now.Sub(t)
}

// Sleep moves the fake clock forward by d instead of actually sleeping.
func (ft *fakeTimer) Sleep(d time.Duration) {
	// advance takes ft.lock itself, so it must not be held here.
	ft.advance(d)
}

// advance moves the fake clock forward by d. If the timer is active and its
// deadline has been reached, the timer is deactivated and the deadline is sent
// on ft.c; that send blocks, with the lock held, until it is received.
func (ft *fakeTimer) advance(d time.Duration) {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	ft.now = ft.now.Add(d)
	if !ft.active || ft.now.Before(ft.timeout) {
		return
	}
	ft.active = false
	ft.c <- ft.timeout
}
|
|
|
|
// return the calling line number (for printing)
|
|
// test the timer's state
|
|
func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) {
|
|
if upd.active != active {
|
|
t.Fatalf("%s: expected timer active=%v", name, active)
|
|
}
|
|
if active && upd.next != next {
|
|
t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next)
|
|
}
|
|
}
|
|
|
|
// test and reset the receiver's state
|
|
func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) {
|
|
triggered := receiver.reset()
|
|
if expected && !triggered {
|
|
t.Fatalf("%s: function should have been called", name)
|
|
} else if !expected && triggered {
|
|
t.Fatalf("%s: function should not have been called", name)
|
|
}
|
|
}
|
|
|
|
// The literal durations hard-coded in the test cases below assume these
// values; change them together.
var (
	minInterval = time.Second
	maxInterval = 10 * time.Second
)
|
|
|
|
func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) {
|
|
upd := <-timer.updated // wait for stop
|
|
checkReceiver(name, t, obj, expectCall)
|
|
checkReceiver(name, t, obj, false) // prove post-condition
|
|
checkTimer(name, t, upd, false, 0)
|
|
upd = <-timer.updated // wait for reset
|
|
checkTimer(name, t, upd, true, expectNext)
|
|
}
|
|
|
|
func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
|
|
waitForReset(name, t, timer, obj, true, maxInterval)
|
|
}
|
|
|
|
func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
|
|
// It will first get reset as with a normal run, and then get set again
|
|
waitForRun(name, t, timer, obj)
|
|
waitForReset(name, t, timer, obj, false, expectNext)
|
|
}
|
|
|
|
func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
|
|
waitForReset(name, t, timer, obj, false, expectNext)
|
|
}
|
|
|
|
func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
|
|
select {
|
|
case <-timer.c:
|
|
t.Fatalf("%s: unexpected timer tick", name)
|
|
case upd := <-timer.updated:
|
|
t.Fatalf("%s: unexpected timer update %v", name, upd)
|
|
default:
|
|
}
|
|
checkReceiver(name, t, obj, false)
|
|
}
|
|
|
|
func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
|
|
obj := &receiver{}
|
|
timer := newFakeTimer()
|
|
runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
|
|
stop := make(chan struct{})
|
|
|
|
var upd timerUpdate
|
|
|
|
// Start.
|
|
go runner.Loop(stop)
|
|
upd = <-timer.updated // wait for initial time to be set to max
|
|
checkTimer("init", t, upd, true, maxInterval)
|
|
checkReceiver("init", t, obj, false)
|
|
|
|
// Run once, immediately.
|
|
// rel=0ms
|
|
runner.Run()
|
|
waitForRun("first run", t, timer, obj)
|
|
|
|
// Run again, before minInterval expires.
|
|
timer.advance(500 * time.Millisecond) // rel=500ms
|
|
runner.Run()
|
|
waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond)
|
|
|
|
// Run again, before minInterval expires.
|
|
timer.advance(499 * time.Millisecond) // rel=999ms
|
|
runner.Run()
|
|
waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond)
|
|
|
|
// Do the deferred run
|
|
timer.advance(1 * time.Millisecond) // rel=1000ms
|
|
waitForRun("second run", t, timer, obj)
|
|
|
|
// Try again immediately
|
|
runner.Run()
|
|
waitForDefer("too soon after second", t, timer, obj, 1*time.Second)
|
|
|
|
// Run again, before minInterval expires.
|
|
timer.advance(1 * time.Millisecond) // rel=1ms
|
|
runner.Run()
|
|
waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond)
|
|
|
|
// Ensure that we don't run again early
|
|
timer.advance(998 * time.Millisecond) // rel=999ms
|
|
waitForNothing("premature", t, timer, obj)
|
|
|
|
// Do the deferred run
|
|
timer.advance(1 * time.Millisecond) // rel=1000ms
|
|
waitForRun("third run", t, timer, obj)
|
|
|
|
// Let minInterval pass, but there are no runs queued
|
|
timer.advance(1 * time.Second) // rel=1000ms
|
|
waitForNothing("minInterval", t, timer, obj)
|
|
|
|
// Let maxInterval pass
|
|
timer.advance(9 * time.Second) // rel=10000ms
|
|
waitForRun("maxInterval", t, timer, obj)
|
|
|
|
// Run again, before minInterval expires.
|
|
timer.advance(1 * time.Millisecond) // rel=1ms
|
|
runner.Run()
|
|
waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond)
|
|
|
|
// Let minInterval pass
|
|
timer.advance(999 * time.Millisecond) // rel=1000ms
|
|
waitForRun("fifth run", t, timer, obj)
|
|
|
|
// Clean up.
|
|
stop <- struct{}{}
|
|
// a message is sent to time.updated in func Stop() at the end of the child goroutine
|
|
// to terminate the child, a receive on time.updated is needed here
|
|
<-timer.updated
|
|
}
|
|
|
|
func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
|
|
obj := &receiver{}
|
|
timer := newFakeTimer()
|
|
runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer)
|
|
stop := make(chan struct{})
|
|
|
|
var upd timerUpdate
|
|
|
|
// Start.
|
|
go runner.Loop(stop)
|
|
upd = <-timer.updated // wait for initial time to be set to max
|
|
checkTimer("init", t, upd, true, maxInterval)
|
|
checkReceiver("init", t, obj, false)
|
|
|
|
// Run once, immediately.
|
|
// abs=0ms, rel=0ms
|
|
runner.Run()
|
|
waitForRun("first run", t, timer, obj)
|
|
|
|
// Run again, before minInterval expires, with burst.
|
|
timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms
|
|
runner.Run()
|
|
waitForRun("second run", t, timer, obj)
|
|
|
|
// Run again, before minInterval expires.
|
|
timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms
|
|
runner.Run()
|
|
waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond)
|
|
|
|
// Run again, before minInterval expires.
|
|
timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms
|
|
runner.Run()
|
|
waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond)
|
|
|
|
// Run again, before minInterval expires.
|
|
timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms
|
|
runner.Run()
|
|
waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond)
|
|
|
|
// Advance timer enough to replenish bursts, but not enough to be minInterval
|
|
// after the last run
|
|
timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms
|
|
waitForNothing("not minInterval", t, timer, obj)
|
|
runner.Run()
|
|
waitForRun("third run", t, timer, obj)
|
|
|
|
// Run again, before minInterval expires.
|
|
timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms
|
|
runner.Run()
|
|
waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond)
|
|
|
|
// Run again, before minInterval expires.
|
|
timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms
|
|
runner.Run()
|
|
waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond)
|
|
|
|
// Advance and do the deferred run
|
|
timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms
|
|
waitForRun("fourth run", t, timer, obj)
|
|
|
|
// Run again, once burst has fully replenished.
|
|
timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms
|
|
runner.Run()
|
|
waitForRun("fifth run", t, timer, obj)
|
|
runner.Run()
|
|
waitForRun("sixth run", t, timer, obj)
|
|
runner.Run()
|
|
waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second)
|
|
|
|
// Wait until minInterval after the last run
|
|
timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms
|
|
waitForRun("seventh run", t, timer, obj)
|
|
|
|
// Wait for maxInterval
|
|
timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms
|
|
waitForRun("maxInterval", t, timer, obj)
|
|
|
|
// Clean up.
|
|
stop <- struct{}{}
|
|
// a message is sent to time.updated in func Stop() at the end of the child goroutine
|
|
// to terminate the child, a receive on time.updated is needed here
|
|
<-timer.updated
|
|
}
|
|
|
|
func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) {
|
|
obj := &receiver{}
|
|
timer := newFakeTimer()
|
|
runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
|
|
stop := make(chan struct{})
|
|
|
|
var upd timerUpdate
|
|
|
|
// Start.
|
|
go runner.Loop(stop)
|
|
upd = <-timer.updated // wait for initial time to be set to max
|
|
checkTimer("init", t, upd, true, maxInterval)
|
|
checkReceiver("init", t, obj, false)
|
|
|
|
// Run once, immediately, and queue a retry
|
|
// rel=0ms
|
|
obj.setRetryFn(func() { runner.RetryAfter(5 * time.Second) })
|
|
runner.Run()
|
|
waitForRunWithRetry("first run", t, timer, obj, 5*time.Second)
|
|
|
|
// Nothing happens...
|
|
timer.advance(time.Second) // rel=1000ms
|
|
waitForNothing("minInterval, nothing queued", t, timer, obj)
|
|
|
|
// After retryInterval, function is called
|
|
timer.advance(4 * time.Second) // rel=5000ms
|
|
waitForRun("retry", t, timer, obj)
|
|
|
|
// Run again, before minInterval expires.
|
|
timer.advance(499 * time.Millisecond) // rel=499ms
|
|
runner.Run()
|
|
waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond)
|
|
|
|
// Do the deferred run, queue another retry after it returns
|
|
timer.advance(501 * time.Millisecond) // rel=1000ms
|
|
runner.RetryAfter(5 * time.Second)
|
|
waitForRunWithRetry("second run", t, timer, obj, 5*time.Second)
|
|
|
|
// Wait for minInterval to pass
|
|
timer.advance(time.Second) // rel=1000ms
|
|
waitForNothing("minInterval, nothing queued", t, timer, obj)
|
|
|
|
// Now do another run
|
|
runner.Run()
|
|
waitForRun("third run", t, timer, obj)
|
|
|
|
// Retry was cancelled because we already ran
|
|
timer.advance(4 * time.Second)
|
|
waitForNothing("retry cancelled", t, timer, obj)
|
|
|
|
// Run, queue a retry from a goroutine
|
|
obj.setRetryFn(func() {
|
|
go func() {
|
|
time.Sleep(100 * time.Millisecond)
|
|
runner.RetryAfter(5 * time.Second)
|
|
}()
|
|
})
|
|
runner.Run()
|
|
waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second)
|
|
|
|
// Call Run again before minInterval passes
|
|
timer.advance(100 * time.Millisecond) // rel=100ms
|
|
runner.Run()
|
|
waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond)
|
|
|
|
// Deferred run will run after minInterval passes
|
|
timer.advance(900 * time.Millisecond) // rel=1000ms
|
|
waitForRun("fifth run", t, timer, obj)
|
|
|
|
// Retry was cancelled because we already ran
|
|
timer.advance(4 * time.Second) // rel=4s since run, 5s since RetryAfter
|
|
waitForNothing("retry cancelled", t, timer, obj)
|
|
|
|
// Rerun happens after maxInterval
|
|
timer.advance(5 * time.Second) // rel=9s since run, 10s since RetryAfter
|
|
waitForNothing("premature", t, timer, obj)
|
|
timer.advance(time.Second) // rel=10s since run
|
|
waitForRun("maxInterval", t, timer, obj)
|
|
|
|
// Clean up.
|
|
stop <- struct{}{}
|
|
// a message is sent to time.updated in func Stop() at the end of the child goroutine
|
|
// to terminate the child, a receive on time.updated is needed here
|
|
<-timer.updated
|
|
}
|