diff --git a/pkg/controller/nodelifecycle/scheduler/timed_workers.go b/pkg/controller/nodelifecycle/scheduler/timed_workers.go index 2701e7ba23e..de4d7c29626 100644 --- a/pkg/controller/nodelifecycle/scheduler/timed_workers.go +++ b/pkg/controller/nodelifecycle/scheduler/timed_workers.go @@ -21,6 +21,7 @@ import ( "time" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/klog/v2" ) @@ -45,17 +46,17 @@ type TimedWorker struct { WorkItem *WorkArgs CreatedAt time.Time FireAt time.Time - Timer *time.Timer + Timer clock.Timer } -// CreateWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`. -func CreateWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error) *TimedWorker { +// createWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`. +func createWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error, clock clock.Clock) *TimedWorker { delay := fireAt.Sub(createdAt) if delay <= 0 { go f(args) return nil } - timer := time.AfterFunc(delay, func() { f(args) }) + timer := clock.AfterFunc(delay, func() { f(args) }) return &TimedWorker{ WorkItem: args, CreatedAt: createdAt, @@ -77,6 +78,7 @@ type TimedWorkerQueue struct { // map of workers keyed by string returned by 'KeyFromWorkArgs' from the given worker. workers map[string]*TimedWorker workFunc func(args *WorkArgs) error + clock clock.Clock } // CreateWorkerQueue creates a new TimedWorkerQueue for workers that will execute @@ -85,6 +87,7 @@ func CreateWorkerQueue(f func(args *WorkArgs) error) *TimedWorkerQueue { return &TimedWorkerQueue{ workers: make(map[string]*TimedWorker), workFunc: f, + clock: clock.RealClock{}, } } @@ -115,7 +118,7 @@ func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt t klog.Warningf("Trying to add already existing work for %+v. Skipping.", args) return } - worker := CreateWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key)) + worker := createWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key), q.clock) q.workers[key] = worker } diff --git a/pkg/controller/nodelifecycle/scheduler/timed_workers_test.go b/pkg/controller/nodelifecycle/scheduler/timed_workers_test.go index 0de8a9be5e6..a36f70ec548 100644 --- a/pkg/controller/nodelifecycle/scheduler/timed_workers_test.go +++ b/pkg/controller/nodelifecycle/scheduler/timed_workers_test.go @@ -21,6 +21,8 @@ import ( "sync/atomic" "testing" "time" + + "k8s.io/apimachinery/pkg/util/clock" ) func TestExecute(t *testing.T) { @@ -62,6 +64,8 @@ func TestExecuteDelayed(t *testing.T) { }) now := time.Now() then := now.Add(10 * time.Second) + fakeClock := clock.NewFakeClock(now) + queue.clock = fakeClock queue.AddWork(NewWorkArgs("1", "1"), now, then) queue.AddWork(NewWorkArgs("2", "2"), now, then) queue.AddWork(NewWorkArgs("3", "3"), now, then) @@ -72,6 +76,7 @@ func TestExecuteDelayed(t *testing.T) { queue.AddWork(NewWorkArgs("3", "3"), now, then) queue.AddWork(NewWorkArgs("4", "4"), now, then) queue.AddWork(NewWorkArgs("5", "5"), now, then) + fakeClock.Step(11 * time.Second) wg.Wait() lastVal := atomic.LoadInt32(&testVal) if lastVal != 5 { @@ -90,6 +95,8 @@ func TestCancel(t *testing.T) { }) now := time.Now() then := now.Add(10 * time.Second) + fakeClock := clock.NewFakeClock(now) + queue.clock = fakeClock queue.AddWork(NewWorkArgs("1", "1"), now, then) queue.AddWork(NewWorkArgs("2", "2"), now, then) queue.AddWork(NewWorkArgs("3", "3"), now, then) @@ -102,6 +109,7 @@ func TestCancel(t *testing.T) { queue.AddWork(NewWorkArgs("5", "5"), now, then) queue.CancelWork(NewWorkArgs("2", "2").KeyFromWorkArgs()) queue.CancelWork(NewWorkArgs("4", "4").KeyFromWorkArgs()) + fakeClock.Step(11 * time.Second) wg.Wait() lastVal := atomic.LoadInt32(&testVal) if lastVal != 3 { @@ -120,6 +128,8 @@ func TestCancelAndReadd(t *testing.T) { }) now := time.Now() then := now.Add(10 * time.Second) + fakeClock := clock.NewFakeClock(now) + queue.clock = fakeClock queue.AddWork(NewWorkArgs("1", "1"), now, then) queue.AddWork(NewWorkArgs("2", "2"), now, then) queue.AddWork(NewWorkArgs("3", "3"), now, then) @@ -133,6 +143,7 @@ func TestCancelAndReadd(t *testing.T) { queue.CancelWork(NewWorkArgs("2", "2").KeyFromWorkArgs()) queue.CancelWork(NewWorkArgs("4", "4").KeyFromWorkArgs()) queue.AddWork(NewWorkArgs("2", "2"), now, then) + fakeClock.Step(11 * time.Second) wg.Wait() lastVal := atomic.LoadInt32(&testVal) if lastVal != 4 { diff --git a/staging/src/k8s.io/apimachinery/pkg/util/clock/clock.go b/staging/src/k8s.io/apimachinery/pkg/util/clock/clock.go index 3e1e2517b42..1a544d3b2e4 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/clock/clock.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/clock/clock.go @@ -34,6 +34,7 @@ type PassiveClock interface { type Clock interface { PassiveClock After(time.Duration) <-chan time.Time + AfterFunc(time.Duration, func()) Timer NewTimer(time.Duration) Timer Sleep(time.Duration) NewTicker(time.Duration) Ticker @@ -57,6 +58,13 @@ func (RealClock) After(d time.Duration) <-chan time.Time { return time.After(d) } +// AfterFunc is the same as time.AfterFunc(d, f). +func (RealClock) AfterFunc(d time.Duration, f func()) Timer { + return &realTimer{ + timer: time.AfterFunc(d, f), + } +} + // NewTimer returns a new Timer. func (RealClock) NewTimer(d time.Duration) Timer { return &realTimer{ @@ -95,6 +103,7 @@ type fakeClockWaiter struct { stepInterval time.Duration skipIfBlocked bool destChan chan time.Time + afterFunc func() } // NewFakePassiveClock returns a new FakePassiveClock. @@ -145,6 +154,25 @@ func (f *FakeClock) After(d time.Duration) <-chan time.Time { return ch } +// AfterFunc is the Fake version of time.AfterFunc(d, callback). +func (f *FakeClock) AfterFunc(d time.Duration, cb func()) Timer { + f.lock.Lock() + defer f.lock.Unlock() + stopTime := f.time.Add(d) + ch := make(chan time.Time, 1) // Don't block! + + timer := &fakeTimer{ + fakeClock: f, + waiter: fakeClockWaiter{ + targetTime: stopTime, + destChan: ch, + afterFunc: cb, + }, + } + f.waiters = append(f.waiters, timer.waiter) + return timer +} + // NewTimer is the Fake version of time.NewTimer(d). func (f *FakeClock) NewTimer(d time.Duration) Timer { f.lock.Lock() @@ -211,6 +239,10 @@ func (f *FakeClock) setTimeLocked(t time.Time) { w.destChan <- t } + if w.afterFunc != nil { + w.afterFunc() + } + if w.stepInterval > 0 { for !w.targetTime.After(t) { w.targetTime = w.targetTime.Add(w.stepInterval) @@ -225,8 +257,8 @@ func (f *FakeClock) setTimeLocked(t time.Time) { f.waiters = newWaiters } -// HasWaiters returns true if After has been called on f but not yet satisfied (so you can -// write race-free tests). +// HasWaiters returns true if After or AfterFunc has been called on f but not yet satisfied +// (so you can write race-free tests). func (f *FakeClock) HasWaiters() bool { f.lock.RLock() defer f.lock.RUnlock() @@ -261,6 +293,12 @@ func (*IntervalClock) After(d time.Duration) <-chan time.Time { panic("IntervalClock doesn't implement After") } +// AfterFunc is currently unimplemented, will panic. +// TODO: make interval clock use FakeClock so this can be implemented. +func (*IntervalClock) AfterFunc(d time.Duration, cb func()) Timer { + panic("IntervalClock doesn't implement AfterFunc") +} + // NewTimer is currently unimplemented, will panic. // TODO: make interval clock use FakeClock so this can be implemented. func (*IntervalClock) NewTimer(d time.Duration) Timer { diff --git a/staging/src/k8s.io/apimachinery/pkg/util/clock/clock_test.go b/staging/src/k8s.io/apimachinery/pkg/util/clock/clock_test.go index 04870743ffc..700d15ee541 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/clock/clock_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/clock/clock_test.go @@ -147,6 +147,66 @@ func TestFakeAfter(t *testing.T) { } } +func TestFakeAfterFunc(t *testing.T) { + tc := NewFakeClock(time.Now()) + if tc.HasWaiters() { + t.Errorf("unexpected waiter?") + } + expectOneSecTimerFire := false + oneSecTimerFire := 0 + tc.AfterFunc(time.Second, func() { + if !expectOneSecTimerFire { + t.Errorf("oneSecTimer func fired") + } else { + oneSecTimerFire++ + } + }) + if !tc.HasWaiters() { + t.Errorf("unexpected lack of waiter?") + } + + expectOneOhOneSecTimerFire := false + oneOhOneSecTimerFire := 0 + tc.AfterFunc(time.Second+time.Millisecond, func() { + if !expectOneOhOneSecTimerFire { + t.Errorf("oneOhOneSecTimer func fired") + } else { + oneOhOneSecTimerFire++ + } + }) + + expectTwoSecTimerFire := false + twoSecTimerFire := 0 + twoSecTimer := tc.AfterFunc(2*time.Second, func() { + if !expectTwoSecTimerFire { + t.Errorf("twoSecTimer func fired") + } else { + twoSecTimerFire++ + } + }) + + tc.Step(999 * time.Millisecond) + + expectOneSecTimerFire = true + tc.Step(time.Millisecond) + if oneSecTimerFire != 1 { + t.Errorf("expected oneSecTimerFire=1, got %d", oneSecTimerFire) + } + expectOneSecTimerFire = false + + expectOneOhOneSecTimerFire = true + tc.Step(time.Millisecond) + if oneOhOneSecTimerFire != 1 { + // should not double-trigger! + t.Errorf("expected oneOhOneSecTimerFire=1, got %d", oneOhOneSecTimerFire) + } + expectOneOhOneSecTimerFire = false + + // ensure a canceled timer doesn't fire + twoSecTimer.Stop() + tc.Step(time.Second) +} + func TestFakeTimer(t *testing.T) { tc := NewFakeClock(time.Now()) if tc.HasWaiters() {