mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-20 01:23:48 +00:00
sped up scheduler tests by using fake clock
This commit is contained in:
@@ -21,6 +21,7 @@ import (
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
@@ -45,17 +46,17 @@ type TimedWorker struct {
|
||||
WorkItem *WorkArgs
|
||||
CreatedAt time.Time
|
||||
FireAt time.Time
|
||||
Timer *time.Timer
|
||||
Timer clock.Timer
|
||||
}
|
||||
|
||||
// CreateWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`.
|
||||
func CreateWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error) *TimedWorker {
|
||||
// createWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`.
|
||||
func createWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error, clock clock.Clock) *TimedWorker {
|
||||
delay := fireAt.Sub(createdAt)
|
||||
if delay <= 0 {
|
||||
go f(args)
|
||||
return nil
|
||||
}
|
||||
timer := time.AfterFunc(delay, func() { f(args) })
|
||||
timer := clock.AfterFunc(delay, func() { f(args) })
|
||||
return &TimedWorker{
|
||||
WorkItem: args,
|
||||
CreatedAt: createdAt,
|
||||
@@ -77,6 +78,7 @@ type TimedWorkerQueue struct {
|
||||
// map of workers keyed by string returned by 'KeyFromWorkArgs' from the given worker.
|
||||
workers map[string]*TimedWorker
|
||||
workFunc func(args *WorkArgs) error
|
||||
clock clock.Clock
|
||||
}
|
||||
|
||||
// CreateWorkerQueue creates a new TimedWorkerQueue for workers that will execute
|
||||
@@ -85,6 +87,7 @@ func CreateWorkerQueue(f func(args *WorkArgs) error) *TimedWorkerQueue {
|
||||
return &TimedWorkerQueue{
|
||||
workers: make(map[string]*TimedWorker),
|
||||
workFunc: f,
|
||||
clock: clock.RealClock{},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,7 +118,7 @@ func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt t
|
||||
klog.Warningf("Trying to add already existing work for %+v. Skipping.", args)
|
||||
return
|
||||
}
|
||||
worker := CreateWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key))
|
||||
worker := createWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key), q.clock)
|
||||
q.workers[key] = worker
|
||||
}
|
||||
|
||||
|
@@ -21,6 +21,8 @@ import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
)
|
||||
|
||||
func TestExecute(t *testing.T) {
|
||||
@@ -62,6 +64,8 @@ func TestExecuteDelayed(t *testing.T) {
|
||||
})
|
||||
now := time.Now()
|
||||
then := now.Add(10 * time.Second)
|
||||
fakeClock := clock.NewFakeClock(now)
|
||||
queue.clock = fakeClock
|
||||
queue.AddWork(NewWorkArgs("1", "1"), now, then)
|
||||
queue.AddWork(NewWorkArgs("2", "2"), now, then)
|
||||
queue.AddWork(NewWorkArgs("3", "3"), now, then)
|
||||
@@ -72,6 +76,7 @@ func TestExecuteDelayed(t *testing.T) {
|
||||
queue.AddWork(NewWorkArgs("3", "3"), now, then)
|
||||
queue.AddWork(NewWorkArgs("4", "4"), now, then)
|
||||
queue.AddWork(NewWorkArgs("5", "5"), now, then)
|
||||
fakeClock.Step(11 * time.Second)
|
||||
wg.Wait()
|
||||
lastVal := atomic.LoadInt32(&testVal)
|
||||
if lastVal != 5 {
|
||||
@@ -90,6 +95,8 @@ func TestCancel(t *testing.T) {
|
||||
})
|
||||
now := time.Now()
|
||||
then := now.Add(10 * time.Second)
|
||||
fakeClock := clock.NewFakeClock(now)
|
||||
queue.clock = fakeClock
|
||||
queue.AddWork(NewWorkArgs("1", "1"), now, then)
|
||||
queue.AddWork(NewWorkArgs("2", "2"), now, then)
|
||||
queue.AddWork(NewWorkArgs("3", "3"), now, then)
|
||||
@@ -102,6 +109,7 @@ func TestCancel(t *testing.T) {
|
||||
queue.AddWork(NewWorkArgs("5", "5"), now, then)
|
||||
queue.CancelWork(NewWorkArgs("2", "2").KeyFromWorkArgs())
|
||||
queue.CancelWork(NewWorkArgs("4", "4").KeyFromWorkArgs())
|
||||
fakeClock.Step(11 * time.Second)
|
||||
wg.Wait()
|
||||
lastVal := atomic.LoadInt32(&testVal)
|
||||
if lastVal != 3 {
|
||||
@@ -120,6 +128,8 @@ func TestCancelAndReadd(t *testing.T) {
|
||||
})
|
||||
now := time.Now()
|
||||
then := now.Add(10 * time.Second)
|
||||
fakeClock := clock.NewFakeClock(now)
|
||||
queue.clock = fakeClock
|
||||
queue.AddWork(NewWorkArgs("1", "1"), now, then)
|
||||
queue.AddWork(NewWorkArgs("2", "2"), now, then)
|
||||
queue.AddWork(NewWorkArgs("3", "3"), now, then)
|
||||
@@ -133,6 +143,7 @@ func TestCancelAndReadd(t *testing.T) {
|
||||
queue.CancelWork(NewWorkArgs("2", "2").KeyFromWorkArgs())
|
||||
queue.CancelWork(NewWorkArgs("4", "4").KeyFromWorkArgs())
|
||||
queue.AddWork(NewWorkArgs("2", "2"), now, then)
|
||||
fakeClock.Step(11 * time.Second)
|
||||
wg.Wait()
|
||||
lastVal := atomic.LoadInt32(&testVal)
|
||||
if lastVal != 4 {
|
||||
|
Reference in New Issue
Block a user