Introduce event clocks based on k8s.io/utils/clock

So we can move off of the apimachinery clock package.

Switch queueset to new clocks.

Removed event clocks based on apimachinery clocks,
because this PR introduces ones based on k8s.io/utils/clock .

Removed interface that is implemented by only one interesting type.

Simplify RealEventClock::EventAfterTime.
This commit is contained in:
Mike Spreitzer 2021-07-21 16:56:11 -04:00
parent 2b06890e12
commit dcb298c955
10 changed files with 527 additions and 138 deletions

View File

@ -0,0 +1,47 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package clock
import (
"time"
baseclock "k8s.io/utils/clock"
)
// EventFunc does some work that needs to be done at or after the
// given time.
type EventFunc func(time.Time)
// EventClock is an active clock abstraction for use in code that is
// testable with a fake clock that itself determines how time may be
// advanced. The timing paradigm is invoking EventFuncs rather than
// synchronizing through channels, so that the fake clock has a handle
// on when associated activity is done.
type EventClock interface {
baseclock.PassiveClock
// Sleep returns after the given duration (or more).
Sleep(d time.Duration)
// EventAfterDuration invokes the given EventFunc after the given duration (or more),
// passing the time when the invocation was launched.
EventAfterDuration(f EventFunc, d time.Duration)
// EventAfterTime invokes the given EventFunc at the given time or later,
// passing the time when the invocation was launched.
EventAfterTime(f EventFunc, t time.Time)
}

View File

@ -0,0 +1,42 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package clock
import (
"time"
"k8s.io/utils/clock"
)
// RealEventClock fires event on real world time
type RealEventClock struct {
clock.RealClock
}
// EventAfterDuration schedules an EventFunc
func (RealEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
ch := time.After(d)
go func() {
t := <-ch
f(t)
}()
}
// EventAfterTime schedules an EventFunc
func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) {
r.EventAfterDuration(f, time.Until(t))
}

View File

@ -0,0 +1,68 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package clock
import (
"math/rand"
"sync/atomic"
"testing"
"time"
)
func TestRealEventClock(t *testing.T) {
ec := RealEventClock{}
var numDone int32
now := ec.Now()
const batchSize = 100
times := make(chan time.Time, batchSize+1)
try := func(abs bool, d time.Duration) {
f := func(u time.Time) {
realD := ec.Since(now)
atomic.AddInt32(&numDone, 1)
times <- u
if realD < d {
t.Errorf("Asked for %v, got %v", d, realD)
}
}
if abs {
ec.EventAfterTime(f, now.Add(d))
} else {
ec.EventAfterDuration(f, d)
}
}
try(true, time.Millisecond*3300)
for i := 0; i < batchSize; i++ {
d := time.Duration(rand.Intn(30)-3) * time.Millisecond * 100
try(i%2 == 0, d)
}
time.Sleep(time.Second * 4)
if atomic.LoadInt32(&numDone) != batchSize+1 {
t.Errorf("Got only %v events", numDone)
}
lastTime := now
for i := 0; i <= batchSize; i++ {
nextTime := <-times
if nextTime.Before(now) {
continue
}
dt := nextTime.Sub(lastTime) / (50 * time.Millisecond)
if dt < 0 {
t.Errorf("Got %s after %s", nextTime, lastTime)
}
lastTime = nextTime
}
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2019 The Kubernetes Authors.
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package clock
package testing
import (
"container/heap"
@ -24,49 +24,13 @@ import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
baseclocktest "k8s.io/utils/clock/testing"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock"
"k8s.io/klog/v2"
)
// EventFunc does some work that needs to be done at or after the
// given time. After this function returns, associated work may continue
// on other goroutines only if they are counted by the GoRoutineCounter
// of the FakeEventClock handling this EventFunc.
type EventFunc func(time.Time)
// EventClock fires event on time
type EventClock interface {
clock.PassiveClock
EventAfterDuration(f EventFunc, d time.Duration)
EventAfterTime(f EventFunc, t time.Time)
}
// RealEventClock fires event on real world time
type RealEventClock struct {
clock.RealClock
}
// EventAfterDuration schedules an EventFunc
func (RealEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
ch := time.After(d)
go func() {
t := <-ch
f(t)
}()
}
// EventAfterTime schedules an EventFunc
func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) {
now := time.Now()
d := t.Sub(now)
if d <= 0 {
go f(now)
} else {
r.EventAfterDuration(f, d)
}
}
// waitGroupCounter is a wait group used for a GoRoutineCounter. This private
// type is used to disallow direct waitGroup access
type waitGroupCounter struct {
@ -102,9 +66,20 @@ func (wgc *waitGroupCounter) Wait() {
}
// FakeEventClock is one whose time does not pass implicitly but
// rather is explicitly set by invocations of its SetTime method
// rather is explicitly set by invocations of its SetTime method.
// Each FakeEventClock has an associated GoRoutineCounter that is
// used to track associated activity.
// For the EventAfterDuration and EventAfterTime methods,
// the clock itself counts the start and stop of the EventFunc
// and the client is responsible for counting any suspend and
// resume internal to the EventFunc.
// The Sleep method must only be invoked from a goroutine that is
// counted in that GoRoutineCounter.
// The SetTime method does not return until all the triggered
// EventFuncs return. Consequently, an EventFunc given to a method
// of this clock must not wait for this clock to advance.
type FakeEventClock struct {
clock.FakePassiveClock
baseclocktest.FakePassiveClock
// waiters is a heap of waiting work, sorted by time
waiters eventWaiterHeap
@ -131,7 +106,7 @@ var _ heap.Interface = (*eventWaiterHeap)(nil)
type eventWaiter struct {
targetTime time.Time
f EventFunc
f clock.EventFunc
}
// NewFakeEventClock constructor. The given `r *rand.Rand` must
@ -149,7 +124,7 @@ func NewFakeEventClock(t time.Time, fuzz time.Duration, r *rand.Rand) (*FakeEven
r.Uint64()
}
return &FakeEventClock{
FakePassiveClock: *clock.NewFakePassiveClock(t),
FakePassiveClock: *baseclocktest.NewFakePassiveClock(t),
clientWG: grc,
fuzz: fuzz,
rand: r,
@ -169,8 +144,9 @@ func (fec *FakeEventClock) GetNextTime() (time.Time, bool) {
// Run runs all the events scheduled, and all the events they
// schedule, and so on, until there are none scheduled or the limit is not
// nil and the next time would exceed the limit. The clientWG given in
// the constructor gates each advance of time.
// nil and the next time would exceed the limit. The associated
// GoRoutineCounter gates the advancing of time. That is,
// time is not advanced until all the associated work is finished.
func (fec *FakeEventClock) Run(limit *time.Time) {
for {
fec.clientWG.Wait()
@ -200,7 +176,7 @@ func (fec *FakeEventClock) SetTime(t time.Time) {
for len(fec.waiters) > 0 && !now.Before(fec.waiters[0].targetTime) {
ew := heap.Pop(&fec.waiters).(eventWaiter)
wg.Add(1)
go func(f EventFunc) { f(now); wg.Done() }(ew.f)
go func(f clock.EventFunc) { f(now); wg.Done() }(ew.f)
foundSome = true
}
wg.Wait()
@ -211,9 +187,24 @@ func (fec *FakeEventClock) SetTime(t time.Time) {
}
}
// Sleep returns after the given duration has passed.
// Sleep must only be invoked in a goroutine that is counted
// in the FakeEventClock's associated GoRoutineCounter.
// Unlike the base FakeClock's Sleep, this method does not itself advance the clock
// but rather leaves that up to other actors (e.g., Run).
func (fec *FakeEventClock) Sleep(duration time.Duration) {
doneCh := make(chan struct{})
fec.EventAfterDuration(func(time.Time) {
fec.clientWG.Add(1)
close(doneCh)
}, duration)
fec.clientWG.Add(-1)
<-doneCh
}
// EventAfterDuration schedules the given function to be invoked once
// the given duration has passed.
func (fec *FakeEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
func (fec *FakeEventClock) EventAfterDuration(f clock.EventFunc, d time.Duration) {
fec.waitersLock.Lock()
defer fec.waitersLock.Unlock()
now := fec.Now()
@ -223,7 +214,7 @@ func (fec *FakeEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
// EventAfterTime schedules the given function to be invoked once
// the given time has arrived.
func (fec *FakeEventClock) EventAfterTime(f EventFunc, t time.Time) {
func (fec *FakeEventClock) EventAfterTime(f clock.EventFunc, t time.Time) {
fec.waitersLock.Lock()
defer fec.waitersLock.Unlock()
fd := time.Duration(float32(fec.fuzz) * fec.rand.Float32())

View File

@ -1,5 +1,5 @@
/*
Copyright 2019 The Kubernetes Authors.
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package clock
package testing
import (
"math/rand"
@ -22,23 +22,17 @@ import (
"testing"
"time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock"
)
type TestableEventClock interface {
EventClock
clock.EventClock
SetTime(time.Time)
Run(*time.Time)
}
// settablePassiveClock allows setting current time of a passive clock
type settablePassiveClock interface {
clock.PassiveClock
SetTime(time.Time)
}
func exerciseTestableEventClock(t *testing.T, ec TestableEventClock, fuzz time.Duration) {
exercisePassiveClock(t, ec)
exerciseSettablePassiveClock(t, ec)
var numDone int32
now := ec.Now()
strictable := true
@ -104,7 +98,8 @@ func exerciseTestableEventClock(t *testing.T, ec TestableEventClock, fuzz time.D
}
}
func exercisePassiveClock(t *testing.T, pc settablePassiveClock) {
// copied from baseclocktest, because it is not public
func exerciseSettablePassiveClock(t *testing.T, pc TestableEventClock) {
t1 := time.Now()
t2 := t1.Add(time.Hour)
pc.SetTime(t1)
@ -134,50 +129,3 @@ func TestFakeEventClock(t *testing.T) {
fec, _ = NewFakeEventClock(startTime, time.Second, nil)
exerciseTestableEventClock(t, fec, time.Second)
}
func exerciseEventClock(t *testing.T, ec EventClock, relax func(time.Duration)) {
var numDone int32
now := ec.Now()
const batchSize = 100
times := make(chan time.Time, batchSize+1)
try := func(abs bool, d time.Duration) {
f := func(u time.Time) {
realD := ec.Since(now)
atomic.AddInt32(&numDone, 1)
times <- u
if realD < d {
t.Errorf("Asked for %v, got %v", d, realD)
}
}
if abs {
ec.EventAfterTime(f, now.Add(d))
} else {
ec.EventAfterDuration(f, d)
}
}
try(true, time.Millisecond*3300)
for i := 0; i < batchSize; i++ {
d := time.Duration(rand.Intn(30)-3) * time.Millisecond * 100
try(i%2 == 0, d)
}
relax(time.Second * 4)
if atomic.LoadInt32(&numDone) != batchSize+1 {
t.Errorf("Got only %v events", numDone)
}
lastTime := now
for i := 0; i <= batchSize; i++ {
nextTime := <-times
if nextTime.Before(now) {
continue
}
dt := nextTime.Sub(lastTime) / (50 * time.Millisecond)
if dt < 0 {
t.Errorf("Got %s after %s", nextTime, lastTime)
}
lastTime = nextTime
}
}
func TestRealEventClock(t *testing.T) {
exerciseEventClock(t, RealEventClock{}, func(d time.Duration) { time.Sleep(d) })
}

View File

@ -22,12 +22,12 @@ import (
"testing"
"time"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing"
)
func TestLockingWriteOnce(t *testing.T) {
now := time.Now()
clock, counter := clock.NewFakeEventClock(now, 0, nil)
clock, counter := testclock.NewFakeEventClock(now, 0, nil)
var lock sync.Mutex
wr := NewWriteOnce(&lock, counter)
var gots int32

View File

@ -23,7 +23,8 @@ import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/utils/clock"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
@ -33,6 +34,12 @@ import (
fqrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/apiserver/pkg/util/shufflesharding"
"k8s.io/klog/v2"
// The following hack is needed to work around a tooling deficiency.
// Packages imported only for test code are not included in vendor.
// See https://kubernetes.slack.com/archives/C0EG7JC6T/p1626985671458800?thread_ts=1626983387.450800&cid=C0EG7JC6T
// The need for this hack will be removed when we make queueset use an EventClock rather than a PassiveClock.
_ "k8s.io/utils/clock/testing"
)
const nsTimeFmt = "2006-01-02 15:04:05.000000000"
@ -299,7 +306,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.Wo
go func() {
defer runtime.HandleCrash()
qs.goroutineDoneOrBlocked()
_ = <-doneCh
<-doneCh
// Whatever goroutine unblocked the preceding receive MUST
// have already either (a) incremented qs.counter or (b)
// known that said counter is not actually counting or (c)

View File

@ -27,11 +27,12 @@ import (
"testing"
"time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/utils/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fqclocktest "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing"
test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/klog/v2"
@ -132,7 +133,7 @@ type uniformScenario struct {
expectAllRequests bool
evalInqueueMetrics, evalExecutingMetrics bool
rejectReason string
clk *testclock.FakeEventClock
clk *fqclocktest.FakeEventClock
counter counter.GoRoutineCounter
}
@ -245,7 +246,7 @@ func (ust *uniformScenarioThread) callK(k int) {
atomic.AddInt32(&ust.uss.executions[ust.i], 1)
ust.igr.Add(1)
ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration)
ClockWait(ust.uss.clk, ust.uss.counter, ust.uc.execDuration)
ust.uss.clk.Sleep(ust.uc.execDuration)
ust.igr.Add(-1)
})
ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2)
@ -358,16 +359,6 @@ func (uss *uniformScenarioState) finalReview() {
}
}
func ClockWait(clk *testclock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) {
dunch := make(chan struct{})
clk.EventAfterDuration(func(time.Time) {
counter.Add(1)
close(dunch)
}, duration)
counter.Add(-1)
<-dunch
}
func init() {
klog.InitFlags(nil)
}
@ -376,7 +367,7 @@ func init() {
func TestNoRestraint(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk))
if err != nil {
t.Fatal(err)
@ -402,7 +393,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestUniformFlowsHandSize1",
@ -439,7 +430,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestUniformFlowsHandSize3",
@ -475,7 +466,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "DiffFlowsExpectEqual",
@ -512,7 +503,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "DiffFlowsExpectUnequal",
@ -549,7 +540,7 @@ func TestWindup(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestWindup",
@ -585,7 +576,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestDifferentFlowsWithoutQueuing",
@ -618,7 +609,7 @@ func TestTimeout(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestTimeout",
@ -654,7 +645,7 @@ func TestContextCancel(t *testing.T) {
metrics.Register()
metrics.Reset()
now := time.Now()
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestContextCancel",
@ -733,7 +724,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
metrics.Register()
metrics.Reset()
now := time.Now()
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestTotalRequestsExecutingWithPanic",

294
vendor/k8s.io/utils/clock/testing/fake_clock.go generated vendored Normal file
View File

@ -0,0 +1,294 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"sync"
"time"
"k8s.io/utils/clock"
)
var (
_ = clock.PassiveClock(&FakePassiveClock{})
_ = clock.Clock(&FakeClock{})
_ = clock.Clock(&IntervalClock{})
)
// FakePassiveClock implements PassiveClock, but returns an arbitrary time.
type FakePassiveClock struct {
lock sync.RWMutex
time time.Time
}
// FakeClock implements clock.Clock, but returns an arbitrary time.
type FakeClock struct {
FakePassiveClock
// waiters are waiting for the fake time to pass their specified time
waiters []*fakeClockWaiter
}
type fakeClockWaiter struct {
targetTime time.Time
stepInterval time.Duration
skipIfBlocked bool
destChan chan time.Time
fired bool
}
// NewFakePassiveClock returns a new FakePassiveClock.
func NewFakePassiveClock(t time.Time) *FakePassiveClock {
return &FakePassiveClock{
time: t,
}
}
// NewFakeClock constructs a fake clock set to the provided time.
func NewFakeClock(t time.Time) *FakeClock {
return &FakeClock{
FakePassiveClock: *NewFakePassiveClock(t),
}
}
// Now returns f's time.
func (f *FakePassiveClock) Now() time.Time {
f.lock.RLock()
defer f.lock.RUnlock()
return f.time
}
// Since returns time since the time in f.
func (f *FakePassiveClock) Since(ts time.Time) time.Duration {
f.lock.RLock()
defer f.lock.RUnlock()
return f.time.Sub(ts)
}
// SetTime sets the time on the FakePassiveClock.
func (f *FakePassiveClock) SetTime(t time.Time) {
f.lock.Lock()
defer f.lock.Unlock()
f.time = t
}
// After is the fake version of time.After(d).
func (f *FakeClock) After(d time.Duration) <-chan time.Time {
f.lock.Lock()
defer f.lock.Unlock()
stopTime := f.time.Add(d)
ch := make(chan time.Time, 1) // Don't block!
f.waiters = append(f.waiters, &fakeClockWaiter{
targetTime: stopTime,
destChan: ch,
})
return ch
}
// NewTimer constructs a fake timer, akin to time.NewTimer(d).
func (f *FakeClock) NewTimer(d time.Duration) clock.Timer {
f.lock.Lock()
defer f.lock.Unlock()
stopTime := f.time.Add(d)
ch := make(chan time.Time, 1) // Don't block!
timer := &fakeTimer{
fakeClock: f,
waiter: fakeClockWaiter{
targetTime: stopTime,
destChan: ch,
},
}
f.waiters = append(f.waiters, &timer.waiter)
return timer
}
// Tick constructs a fake ticker, akin to time.Tick
func (f *FakeClock) Tick(d time.Duration) <-chan time.Time {
if d <= 0 {
return nil
}
f.lock.Lock()
defer f.lock.Unlock()
tickTime := f.time.Add(d)
ch := make(chan time.Time, 1) // hold one tick
f.waiters = append(f.waiters, &fakeClockWaiter{
targetTime: tickTime,
stepInterval: d,
skipIfBlocked: true,
destChan: ch,
})
return ch
}
// Step moves the clock by Duration and notifies anyone that's called After,
// Tick, or NewTimer.
func (f *FakeClock) Step(d time.Duration) {
f.lock.Lock()
defer f.lock.Unlock()
f.setTimeLocked(f.time.Add(d))
}
// SetTime sets the time.
func (f *FakeClock) SetTime(t time.Time) {
f.lock.Lock()
defer f.lock.Unlock()
f.setTimeLocked(t)
}
// Actually changes the time and checks any waiters. f must be write-locked.
func (f *FakeClock) setTimeLocked(t time.Time) {
f.time = t
newWaiters := make([]*fakeClockWaiter, 0, len(f.waiters))
for i := range f.waiters {
w := f.waiters[i]
if !w.targetTime.After(t) {
if w.skipIfBlocked {
select {
case w.destChan <- t:
w.fired = true
default:
}
} else {
w.destChan <- t
w.fired = true
}
if w.stepInterval > 0 {
for !w.targetTime.After(t) {
w.targetTime = w.targetTime.Add(w.stepInterval)
}
newWaiters = append(newWaiters, w)
}
} else {
newWaiters = append(newWaiters, f.waiters[i])
}
}
f.waiters = newWaiters
}
// HasWaiters returns true if After has been called on f but not yet satisfied (so you can
// write race-free tests).
func (f *FakeClock) HasWaiters() bool {
f.lock.RLock()
defer f.lock.RUnlock()
return len(f.waiters) > 0
}
// Sleep is akin to time.Sleep
func (f *FakeClock) Sleep(d time.Duration) {
f.Step(d)
}
// IntervalClock implements clock.Clock, but each invocation of Now steps the clock forward the specified duration
type IntervalClock struct {
Time time.Time
Duration time.Duration
}
// Now returns i's time.
func (i *IntervalClock) Now() time.Time {
i.Time = i.Time.Add(i.Duration)
return i.Time
}
// Since returns time since the time in i.
func (i *IntervalClock) Since(ts time.Time) time.Duration {
return i.Time.Sub(ts)
}
// After is unimplemented, will panic.
// TODO: make interval clock use FakeClock so this can be implemented.
func (*IntervalClock) After(d time.Duration) <-chan time.Time {
panic("IntervalClock doesn't implement After")
}
// NewTimer is unimplemented, will panic.
// TODO: make interval clock use FakeClock so this can be implemented.
func (*IntervalClock) NewTimer(d time.Duration) clock.Timer {
panic("IntervalClock doesn't implement NewTimer")
}
// Tick is unimplemented, will panic.
// TODO: make interval clock use FakeClock so this can be implemented.
func (*IntervalClock) Tick(d time.Duration) <-chan time.Time {
panic("IntervalClock doesn't implement Tick")
}
// Sleep is unimplemented, will panic.
func (*IntervalClock) Sleep(d time.Duration) {
panic("IntervalClock doesn't implement Sleep")
}
var _ = clock.Timer(&fakeTimer{})
// fakeTimer implements clock.Timer based on a FakeClock.
type fakeTimer struct {
fakeClock *FakeClock
waiter fakeClockWaiter
}
// C returns the channel that notifies when this timer has fired.
func (f *fakeTimer) C() <-chan time.Time {
return f.waiter.destChan
}
// Stop stops the timer and returns true if the timer has not yet fired, or false otherwise.
func (f *fakeTimer) Stop() bool {
f.fakeClock.lock.Lock()
defer f.fakeClock.lock.Unlock()
newWaiters := make([]*fakeClockWaiter, 0, len(f.fakeClock.waiters))
for i := range f.fakeClock.waiters {
w := f.fakeClock.waiters[i]
if w != &f.waiter {
newWaiters = append(newWaiters, w)
}
}
f.fakeClock.waiters = newWaiters
return !f.waiter.fired
}
// Reset resets the timer to the fake clock's "now" + d. It returns true if the timer has not yet
// fired, or false otherwise.
func (f *fakeTimer) Reset(d time.Duration) bool {
f.fakeClock.lock.Lock()
defer f.fakeClock.lock.Unlock()
active := !f.waiter.fired
f.waiter.fired = false
f.waiter.targetTime = f.fakeClock.time.Add(d)
var isWaiting bool
for i := range f.fakeClock.waiters {
w := f.fakeClock.waiters[i]
if w == &f.waiter {
isWaiting = true
break
}
}
if !isWaiting {
f.fakeClock.waiters = append(f.fakeClock.waiters, &f.waiter)
}
return active
}

1
vendor/modules.txt vendored
View File

@ -2270,6 +2270,7 @@ k8s.io/system-validators/validators
## explicit
k8s.io/utils/buffer
k8s.io/utils/clock
k8s.io/utils/clock/testing
k8s.io/utils/exec
k8s.io/utils/exec/testing
k8s.io/utils/inotify