mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Merge pull request #103830 from MikeSpreitzer/new-event-clock
Introduce event clocks based on k8s.io/utils/clock
This commit is contained in:
commit
5a92b78dd2
@ -0,0 +1,47 @@
|
||||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package clock
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
baseclock "k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
// EventFunc does some work that needs to be done at or after the
|
||||
// given time.
|
||||
type EventFunc func(time.Time)
|
||||
|
||||
// EventClock is an active clock abstraction for use in code that is
|
||||
// testable with a fake clock that itself determines how time may be
|
||||
// advanced. The timing paradigm is invoking EventFuncs rather than
|
||||
// synchronizing through channels, so that the fake clock has a handle
|
||||
// on when associated activity is done.
|
||||
type EventClock interface {
|
||||
baseclock.PassiveClock
|
||||
|
||||
// Sleep returns after the given duration (or more).
|
||||
Sleep(d time.Duration)
|
||||
|
||||
// EventAfterDuration invokes the given EventFunc after the given duration (or more),
|
||||
// passing the time when the invocation was launched.
|
||||
EventAfterDuration(f EventFunc, d time.Duration)
|
||||
|
||||
// EventAfterTime invokes the given EventFunc at the given time or later,
|
||||
// passing the time when the invocation was launched.
|
||||
EventAfterTime(f EventFunc, t time.Time)
|
||||
}
|
@ -0,0 +1,42 @@
|
||||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package clock
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
// RealEventClock fires event on real world time
|
||||
type RealEventClock struct {
|
||||
clock.RealClock
|
||||
}
|
||||
|
||||
// EventAfterDuration schedules an EventFunc
|
||||
func (RealEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
|
||||
ch := time.After(d)
|
||||
go func() {
|
||||
t := <-ch
|
||||
f(t)
|
||||
}()
|
||||
}
|
||||
|
||||
// EventAfterTime schedules an EventFunc
|
||||
func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) {
|
||||
r.EventAfterDuration(f, time.Until(t))
|
||||
}
|
@ -0,0 +1,68 @@
|
||||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package clock
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestRealEventClock(t *testing.T) {
|
||||
ec := RealEventClock{}
|
||||
var numDone int32
|
||||
now := ec.Now()
|
||||
const batchSize = 100
|
||||
times := make(chan time.Time, batchSize+1)
|
||||
try := func(abs bool, d time.Duration) {
|
||||
f := func(u time.Time) {
|
||||
realD := ec.Since(now)
|
||||
atomic.AddInt32(&numDone, 1)
|
||||
times <- u
|
||||
if realD < d {
|
||||
t.Errorf("Asked for %v, got %v", d, realD)
|
||||
}
|
||||
}
|
||||
if abs {
|
||||
ec.EventAfterTime(f, now.Add(d))
|
||||
} else {
|
||||
ec.EventAfterDuration(f, d)
|
||||
}
|
||||
}
|
||||
try(true, time.Millisecond*3300)
|
||||
for i := 0; i < batchSize; i++ {
|
||||
d := time.Duration(rand.Intn(30)-3) * time.Millisecond * 100
|
||||
try(i%2 == 0, d)
|
||||
}
|
||||
time.Sleep(time.Second * 4)
|
||||
if atomic.LoadInt32(&numDone) != batchSize+1 {
|
||||
t.Errorf("Got only %v events", numDone)
|
||||
}
|
||||
lastTime := now
|
||||
for i := 0; i <= batchSize; i++ {
|
||||
nextTime := <-times
|
||||
if nextTime.Before(now) {
|
||||
continue
|
||||
}
|
||||
dt := nextTime.Sub(lastTime) / (50 * time.Millisecond)
|
||||
if dt < 0 {
|
||||
t.Errorf("Got %s after %s", nextTime, lastTime)
|
||||
}
|
||||
lastTime = nextTime
|
||||
}
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package clock
|
||||
package testing
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
@ -24,50 +24,14 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
baseclocktest "k8s.io/utils/clock/testing"
|
||||
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// EventFunc does some work that needs to be done at or after the
|
||||
// given time. After this function returns, associated work may continue
|
||||
// on other goroutines only if they are counted by the GoRoutineCounter
|
||||
// of the FakeEventClock handling this EventFunc.
|
||||
type EventFunc func(time.Time)
|
||||
|
||||
// EventClock fires event on time
|
||||
type EventClock interface {
|
||||
clock.PassiveClock
|
||||
EventAfterDuration(f EventFunc, d time.Duration)
|
||||
EventAfterTime(f EventFunc, t time.Time)
|
||||
}
|
||||
|
||||
// RealEventClock fires event on real world time
|
||||
type RealEventClock struct {
|
||||
clock.RealClock
|
||||
}
|
||||
|
||||
// EventAfterDuration schedules an EventFunc
|
||||
func (RealEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
|
||||
ch := time.After(d)
|
||||
go func() {
|
||||
t := <-ch
|
||||
f(t)
|
||||
}()
|
||||
}
|
||||
|
||||
// EventAfterTime schedules an EventFunc
|
||||
func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) {
|
||||
now := time.Now()
|
||||
d := t.Sub(now)
|
||||
if d <= 0 {
|
||||
go f(now)
|
||||
} else {
|
||||
r.EventAfterDuration(f, d)
|
||||
}
|
||||
}
|
||||
|
||||
// waitGroupCounter is a wait group used for a GoRoutine Counter. This private
|
||||
// waitGroupCounter is a wait group used for a GoRoutineCounter. This private
|
||||
// type is used to disallow direct waitGroup access
|
||||
type waitGroupCounter struct {
|
||||
wg sync.WaitGroup
|
||||
@ -102,9 +66,20 @@ func (wgc *waitGroupCounter) Wait() {
|
||||
}
|
||||
|
||||
// FakeEventClock is one whose time does not pass implicitly but
|
||||
// rather is explicitly set by invocations of its SetTime method
|
||||
// rather is explicitly set by invocations of its SetTime method.
|
||||
// Each FakeEventClock has an associated GoRoutineCounter that is
|
||||
// used to track associated activity.
|
||||
// For the EventAfterDuration and EventAfterTime methods,
|
||||
// the clock itself counts the start and stop of the EventFunc
|
||||
// and the client is responsible for counting any suspend and
|
||||
// resume internal to the EventFunc.
|
||||
// The Sleep method must only be invoked from a goroutine that is
|
||||
// counted in that GoRoutineCounter.
|
||||
// The SetTime method does not return until all the triggered
|
||||
// EventFuncs return. Consequently, an EventFunc given to a method
|
||||
// of this clock must not wait for this clock to advance.
|
||||
type FakeEventClock struct {
|
||||
clock.FakePassiveClock
|
||||
baseclocktest.FakePassiveClock
|
||||
|
||||
// waiters is a heap of waiting work, sorted by time
|
||||
waiters eventWaiterHeap
|
||||
@ -131,7 +106,7 @@ var _ heap.Interface = (*eventWaiterHeap)(nil)
|
||||
|
||||
type eventWaiter struct {
|
||||
targetTime time.Time
|
||||
f EventFunc
|
||||
f clock.EventFunc
|
||||
}
|
||||
|
||||
// NewFakeEventClock constructor. The given `r *rand.Rand` must
|
||||
@ -149,7 +124,7 @@ func NewFakeEventClock(t time.Time, fuzz time.Duration, r *rand.Rand) (*FakeEven
|
||||
r.Uint64()
|
||||
}
|
||||
return &FakeEventClock{
|
||||
FakePassiveClock: *clock.NewFakePassiveClock(t),
|
||||
FakePassiveClock: *baseclocktest.NewFakePassiveClock(t),
|
||||
clientWG: grc,
|
||||
fuzz: fuzz,
|
||||
rand: r,
|
||||
@ -169,8 +144,9 @@ func (fec *FakeEventClock) GetNextTime() (time.Time, bool) {
|
||||
|
||||
// Run runs all the events scheduled, and all the events they
|
||||
// schedule, and so on, until there are none scheduled or the limit is not
|
||||
// nil and the next time would exceed the limit. The clientWG given in
|
||||
// the constructor gates each advance of time.
|
||||
// nil and the next time would exceed the limit. The associated
|
||||
// GoRoutineCounter gates the advancing of time. That is,
|
||||
// time is not advanced until all the associated work is finished.
|
||||
func (fec *FakeEventClock) Run(limit *time.Time) {
|
||||
for {
|
||||
fec.clientWG.Wait()
|
||||
@ -200,7 +176,7 @@ func (fec *FakeEventClock) SetTime(t time.Time) {
|
||||
for len(fec.waiters) > 0 && !now.Before(fec.waiters[0].targetTime) {
|
||||
ew := heap.Pop(&fec.waiters).(eventWaiter)
|
||||
wg.Add(1)
|
||||
go func(f EventFunc) { f(now); wg.Done() }(ew.f)
|
||||
go func(f clock.EventFunc) { f(now); wg.Done() }(ew.f)
|
||||
foundSome = true
|
||||
}
|
||||
wg.Wait()
|
||||
@ -211,9 +187,24 @@ func (fec *FakeEventClock) SetTime(t time.Time) {
|
||||
}
|
||||
}
|
||||
|
||||
// Sleep returns after the given duration has passed.
|
||||
// Sleep must only be invoked in a goroutine that is counted
|
||||
// in the FakeEventClock's associated GoRoutineCounter.
|
||||
// Unlike the base FakeClock's Sleep, this method does not itself advance the clock
|
||||
// but rather leaves that up to other actors (e.g., Run).
|
||||
func (fec *FakeEventClock) Sleep(duration time.Duration) {
|
||||
doneCh := make(chan struct{})
|
||||
fec.EventAfterDuration(func(time.Time) {
|
||||
fec.clientWG.Add(1)
|
||||
close(doneCh)
|
||||
}, duration)
|
||||
fec.clientWG.Add(-1)
|
||||
<-doneCh
|
||||
}
|
||||
|
||||
// EventAfterDuration schedules the given function to be invoked once
|
||||
// the given duration has passed.
|
||||
func (fec *FakeEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
|
||||
func (fec *FakeEventClock) EventAfterDuration(f clock.EventFunc, d time.Duration) {
|
||||
fec.waitersLock.Lock()
|
||||
defer fec.waitersLock.Unlock()
|
||||
now := fec.Now()
|
||||
@ -223,7 +214,7 @@ func (fec *FakeEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
|
||||
|
||||
// EventAfterTime schedules the given function to be invoked once
|
||||
// the given time has arrived.
|
||||
func (fec *FakeEventClock) EventAfterTime(f EventFunc, t time.Time) {
|
||||
func (fec *FakeEventClock) EventAfterTime(f clock.EventFunc, t time.Time) {
|
||||
fec.waitersLock.Lock()
|
||||
defer fec.waitersLock.Unlock()
|
||||
fd := time.Duration(float32(fec.fuzz) * fec.rand.Float32())
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package clock
|
||||
package testing
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
@ -22,23 +22,17 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock"
|
||||
)
|
||||
|
||||
type TestableEventClock interface {
|
||||
EventClock
|
||||
clock.EventClock
|
||||
SetTime(time.Time)
|
||||
Run(*time.Time)
|
||||
}
|
||||
|
||||
// settablePassiveClock allows setting current time of a passive clock
|
||||
type settablePassiveClock interface {
|
||||
clock.PassiveClock
|
||||
SetTime(time.Time)
|
||||
}
|
||||
|
||||
func exerciseTestableEventClock(t *testing.T, ec TestableEventClock, fuzz time.Duration) {
|
||||
exercisePassiveClock(t, ec)
|
||||
exerciseSettablePassiveClock(t, ec)
|
||||
var numDone int32
|
||||
now := ec.Now()
|
||||
strictable := true
|
||||
@ -104,7 +98,8 @@ func exerciseTestableEventClock(t *testing.T, ec TestableEventClock, fuzz time.D
|
||||
}
|
||||
}
|
||||
|
||||
func exercisePassiveClock(t *testing.T, pc settablePassiveClock) {
|
||||
// copied from baseclocktest, because it is not public
|
||||
func exerciseSettablePassiveClock(t *testing.T, pc TestableEventClock) {
|
||||
t1 := time.Now()
|
||||
t2 := t1.Add(time.Hour)
|
||||
pc.SetTime(t1)
|
||||
@ -134,50 +129,3 @@ func TestFakeEventClock(t *testing.T) {
|
||||
fec, _ = NewFakeEventClock(startTime, time.Second, nil)
|
||||
exerciseTestableEventClock(t, fec, time.Second)
|
||||
}
|
||||
|
||||
func exerciseEventClock(t *testing.T, ec EventClock, relax func(time.Duration)) {
|
||||
var numDone int32
|
||||
now := ec.Now()
|
||||
const batchSize = 100
|
||||
times := make(chan time.Time, batchSize+1)
|
||||
try := func(abs bool, d time.Duration) {
|
||||
f := func(u time.Time) {
|
||||
realD := ec.Since(now)
|
||||
atomic.AddInt32(&numDone, 1)
|
||||
times <- u
|
||||
if realD < d {
|
||||
t.Errorf("Asked for %v, got %v", d, realD)
|
||||
}
|
||||
}
|
||||
if abs {
|
||||
ec.EventAfterTime(f, now.Add(d))
|
||||
} else {
|
||||
ec.EventAfterDuration(f, d)
|
||||
}
|
||||
}
|
||||
try(true, time.Millisecond*3300)
|
||||
for i := 0; i < batchSize; i++ {
|
||||
d := time.Duration(rand.Intn(30)-3) * time.Millisecond * 100
|
||||
try(i%2 == 0, d)
|
||||
}
|
||||
relax(time.Second * 4)
|
||||
if atomic.LoadInt32(&numDone) != batchSize+1 {
|
||||
t.Errorf("Got only %v events", numDone)
|
||||
}
|
||||
lastTime := now
|
||||
for i := 0; i <= batchSize; i++ {
|
||||
nextTime := <-times
|
||||
if nextTime.Before(now) {
|
||||
continue
|
||||
}
|
||||
dt := nextTime.Sub(lastTime) / (50 * time.Millisecond)
|
||||
if dt < 0 {
|
||||
t.Errorf("Got %s after %s", nextTime, lastTime)
|
||||
}
|
||||
lastTime = nextTime
|
||||
}
|
||||
}
|
||||
|
||||
func TestRealEventClock(t *testing.T) {
|
||||
exerciseEventClock(t, RealEventClock{}, func(d time.Duration) { time.Sleep(d) })
|
||||
}
|
@ -22,12 +22,12 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
|
||||
testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing"
|
||||
)
|
||||
|
||||
func TestLockingWriteOnce(t *testing.T) {
|
||||
now := time.Now()
|
||||
clock, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||
clock, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
var lock sync.Mutex
|
||||
wr := NewWriteOnce(&lock, counter)
|
||||
var gots int32
|
||||
|
@ -23,7 +23,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/utils/clock"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||
@ -33,6 +34,12 @@ import (
|
||||
fqrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||
"k8s.io/apiserver/pkg/util/shufflesharding"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
// The following hack is needed to work around a tooling deficiency.
|
||||
// Packages imported only for test code are not included in vendor.
|
||||
// See https://kubernetes.slack.com/archives/C0EG7JC6T/p1626985671458800?thread_ts=1626983387.450800&cid=C0EG7JC6T
|
||||
// The need for this hack will be removed when we make queueset use an EventClock rather than a PassiveClock.
|
||||
_ "k8s.io/utils/clock/testing"
|
||||
)
|
||||
|
||||
const nsTimeFmt = "2006-01-02 15:04:05.000000000"
|
||||
@ -299,7 +306,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.Wo
|
||||
go func() {
|
||||
defer runtime.HandleCrash()
|
||||
qs.goroutineDoneOrBlocked()
|
||||
_ = <-doneCh
|
||||
<-doneCh
|
||||
// Whatever goroutine unblocked the preceding receive MUST
|
||||
// have already either (a) incremented qs.counter or (b)
|
||||
// known that said counter is not actually counting or (c)
|
||||
|
@ -27,11 +27,12 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/utils/clock"
|
||||
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||
fqclocktest "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing"
|
||||
test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
|
||||
testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||
"k8s.io/klog/v2"
|
||||
@ -132,7 +133,7 @@ type uniformScenario struct {
|
||||
expectAllRequests bool
|
||||
evalInqueueMetrics, evalExecutingMetrics bool
|
||||
rejectReason string
|
||||
clk *testclock.FakeEventClock
|
||||
clk *fqclocktest.FakeEventClock
|
||||
counter counter.GoRoutineCounter
|
||||
}
|
||||
|
||||
@ -245,7 +246,7 @@ func (ust *uniformScenarioThread) callK(k int) {
|
||||
atomic.AddInt32(&ust.uss.executions[ust.i], 1)
|
||||
ust.igr.Add(1)
|
||||
ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration)
|
||||
ClockWait(ust.uss.clk, ust.uss.counter, ust.uc.execDuration)
|
||||
ust.uss.clk.Sleep(ust.uc.execDuration)
|
||||
ust.igr.Add(-1)
|
||||
})
|
||||
ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2)
|
||||
@ -358,16 +359,6 @@ func (uss *uniformScenarioState) finalReview() {
|
||||
}
|
||||
}
|
||||
|
||||
func ClockWait(clk *testclock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) {
|
||||
dunch := make(chan struct{})
|
||||
clk.EventAfterDuration(func(time.Time) {
|
||||
counter.Add(1)
|
||||
close(dunch)
|
||||
}, duration)
|
||||
counter.Add(-1)
|
||||
<-dunch
|
||||
}
|
||||
|
||||
func init() {
|
||||
klog.InitFlags(nil)
|
||||
}
|
||||
@ -376,7 +367,7 @@ func init() {
|
||||
func TestNoRestraint(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -402,7 +393,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestUniformFlowsHandSize1",
|
||||
@ -439,7 +430,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestUniformFlowsHandSize3",
|
||||
@ -475,7 +466,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "DiffFlowsExpectEqual",
|
||||
@ -512,7 +503,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "DiffFlowsExpectUnequal",
|
||||
@ -549,7 +540,7 @@ func TestWindup(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestWindup",
|
||||
@ -585,7 +576,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestDifferentFlowsWithoutQueuing",
|
||||
@ -618,7 +609,7 @@ func TestTimeout(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestTimeout",
|
||||
@ -654,7 +645,7 @@ func TestContextCancel(t *testing.T) {
|
||||
metrics.Register()
|
||||
metrics.Reset()
|
||||
now := time.Now()
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestContextCancel",
|
||||
@ -733,7 +724,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
|
||||
metrics.Register()
|
||||
metrics.Reset()
|
||||
now := time.Now()
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestTotalRequestsExecutingWithPanic",
|
||||
|
294
vendor/k8s.io/utils/clock/testing/fake_clock.go
generated
vendored
Normal file
294
vendor/k8s.io/utils/clock/testing/fake_clock.go
generated
vendored
Normal file
@ -0,0 +1,294 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package testing
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
var (
|
||||
_ = clock.PassiveClock(&FakePassiveClock{})
|
||||
_ = clock.Clock(&FakeClock{})
|
||||
_ = clock.Clock(&IntervalClock{})
|
||||
)
|
||||
|
||||
// FakePassiveClock implements PassiveClock, but returns an arbitrary time.
|
||||
type FakePassiveClock struct {
|
||||
lock sync.RWMutex
|
||||
time time.Time
|
||||
}
|
||||
|
||||
// FakeClock implements clock.Clock, but returns an arbitrary time.
|
||||
type FakeClock struct {
|
||||
FakePassiveClock
|
||||
|
||||
// waiters are waiting for the fake time to pass their specified time
|
||||
waiters []*fakeClockWaiter
|
||||
}
|
||||
|
||||
type fakeClockWaiter struct {
|
||||
targetTime time.Time
|
||||
stepInterval time.Duration
|
||||
skipIfBlocked bool
|
||||
destChan chan time.Time
|
||||
fired bool
|
||||
}
|
||||
|
||||
// NewFakePassiveClock returns a new FakePassiveClock.
|
||||
func NewFakePassiveClock(t time.Time) *FakePassiveClock {
|
||||
return &FakePassiveClock{
|
||||
time: t,
|
||||
}
|
||||
}
|
||||
|
||||
// NewFakeClock constructs a fake clock set to the provided time.
|
||||
func NewFakeClock(t time.Time) *FakeClock {
|
||||
return &FakeClock{
|
||||
FakePassiveClock: *NewFakePassiveClock(t),
|
||||
}
|
||||
}
|
||||
|
||||
// Now returns f's time.
|
||||
func (f *FakePassiveClock) Now() time.Time {
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
return f.time
|
||||
}
|
||||
|
||||
// Since returns time since the time in f.
|
||||
func (f *FakePassiveClock) Since(ts time.Time) time.Duration {
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
return f.time.Sub(ts)
|
||||
}
|
||||
|
||||
// SetTime sets the time on the FakePassiveClock.
|
||||
func (f *FakePassiveClock) SetTime(t time.Time) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
f.time = t
|
||||
}
|
||||
|
||||
// After is the fake version of time.After(d).
|
||||
func (f *FakeClock) After(d time.Duration) <-chan time.Time {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
stopTime := f.time.Add(d)
|
||||
ch := make(chan time.Time, 1) // Don't block!
|
||||
f.waiters = append(f.waiters, &fakeClockWaiter{
|
||||
targetTime: stopTime,
|
||||
destChan: ch,
|
||||
})
|
||||
return ch
|
||||
}
|
||||
|
||||
// NewTimer constructs a fake timer, akin to time.NewTimer(d).
|
||||
func (f *FakeClock) NewTimer(d time.Duration) clock.Timer {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
stopTime := f.time.Add(d)
|
||||
ch := make(chan time.Time, 1) // Don't block!
|
||||
timer := &fakeTimer{
|
||||
fakeClock: f,
|
||||
waiter: fakeClockWaiter{
|
||||
targetTime: stopTime,
|
||||
destChan: ch,
|
||||
},
|
||||
}
|
||||
f.waiters = append(f.waiters, &timer.waiter)
|
||||
return timer
|
||||
}
|
||||
|
||||
// Tick constructs a fake ticker, akin to time.Tick
|
||||
func (f *FakeClock) Tick(d time.Duration) <-chan time.Time {
|
||||
if d <= 0 {
|
||||
return nil
|
||||
}
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
tickTime := f.time.Add(d)
|
||||
ch := make(chan time.Time, 1) // hold one tick
|
||||
f.waiters = append(f.waiters, &fakeClockWaiter{
|
||||
targetTime: tickTime,
|
||||
stepInterval: d,
|
||||
skipIfBlocked: true,
|
||||
destChan: ch,
|
||||
})
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
// Step moves the clock by Duration and notifies anyone that's called After,
|
||||
// Tick, or NewTimer.
|
||||
func (f *FakeClock) Step(d time.Duration) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
f.setTimeLocked(f.time.Add(d))
|
||||
}
|
||||
|
||||
// SetTime sets the time.
|
||||
func (f *FakeClock) SetTime(t time.Time) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
f.setTimeLocked(t)
|
||||
}
|
||||
|
||||
// Actually changes the time and checks any waiters. f must be write-locked.
|
||||
func (f *FakeClock) setTimeLocked(t time.Time) {
|
||||
f.time = t
|
||||
newWaiters := make([]*fakeClockWaiter, 0, len(f.waiters))
|
||||
for i := range f.waiters {
|
||||
w := f.waiters[i]
|
||||
if !w.targetTime.After(t) {
|
||||
|
||||
if w.skipIfBlocked {
|
||||
select {
|
||||
case w.destChan <- t:
|
||||
w.fired = true
|
||||
default:
|
||||
}
|
||||
} else {
|
||||
w.destChan <- t
|
||||
w.fired = true
|
||||
}
|
||||
|
||||
if w.stepInterval > 0 {
|
||||
for !w.targetTime.After(t) {
|
||||
w.targetTime = w.targetTime.Add(w.stepInterval)
|
||||
}
|
||||
newWaiters = append(newWaiters, w)
|
||||
}
|
||||
|
||||
} else {
|
||||
newWaiters = append(newWaiters, f.waiters[i])
|
||||
}
|
||||
}
|
||||
f.waiters = newWaiters
|
||||
}
|
||||
|
||||
// HasWaiters returns true if After has been called on f but not yet satisfied (so you can
|
||||
// write race-free tests).
|
||||
func (f *FakeClock) HasWaiters() bool {
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
return len(f.waiters) > 0
|
||||
}
|
||||
|
||||
// Sleep is akin to time.Sleep
|
||||
func (f *FakeClock) Sleep(d time.Duration) {
|
||||
f.Step(d)
|
||||
}
|
||||
|
||||
// IntervalClock implements clock.Clock, but each invocation of Now steps the clock forward the specified duration
|
||||
type IntervalClock struct {
|
||||
Time time.Time
|
||||
Duration time.Duration
|
||||
}
|
||||
|
||||
// Now returns i's time.
|
||||
func (i *IntervalClock) Now() time.Time {
|
||||
i.Time = i.Time.Add(i.Duration)
|
||||
return i.Time
|
||||
}
|
||||
|
||||
// Since returns time since the time in i.
|
||||
func (i *IntervalClock) Since(ts time.Time) time.Duration {
|
||||
return i.Time.Sub(ts)
|
||||
}
|
||||
|
||||
// After is unimplemented, will panic.
|
||||
// TODO: make interval clock use FakeClock so this can be implemented.
|
||||
func (*IntervalClock) After(d time.Duration) <-chan time.Time {
|
||||
panic("IntervalClock doesn't implement After")
|
||||
}
|
||||
|
||||
// NewTimer is unimplemented, will panic.
|
||||
// TODO: make interval clock use FakeClock so this can be implemented.
|
||||
func (*IntervalClock) NewTimer(d time.Duration) clock.Timer {
|
||||
panic("IntervalClock doesn't implement NewTimer")
|
||||
}
|
||||
|
||||
// Tick is unimplemented, will panic.
|
||||
// TODO: make interval clock use FakeClock so this can be implemented.
|
||||
func (*IntervalClock) Tick(d time.Duration) <-chan time.Time {
|
||||
panic("IntervalClock doesn't implement Tick")
|
||||
}
|
||||
|
||||
// Sleep is unimplemented, will panic.
|
||||
func (*IntervalClock) Sleep(d time.Duration) {
|
||||
panic("IntervalClock doesn't implement Sleep")
|
||||
}
|
||||
|
||||
var _ = clock.Timer(&fakeTimer{})
|
||||
|
||||
// fakeTimer implements clock.Timer based on a FakeClock.
|
||||
type fakeTimer struct {
|
||||
fakeClock *FakeClock
|
||||
waiter fakeClockWaiter
|
||||
}
|
||||
|
||||
// C returns the channel that notifies when this timer has fired.
|
||||
func (f *fakeTimer) C() <-chan time.Time {
|
||||
return f.waiter.destChan
|
||||
}
|
||||
|
||||
// Stop stops the timer and returns true if the timer has not yet fired, or false otherwise.
|
||||
func (f *fakeTimer) Stop() bool {
|
||||
f.fakeClock.lock.Lock()
|
||||
defer f.fakeClock.lock.Unlock()
|
||||
|
||||
newWaiters := make([]*fakeClockWaiter, 0, len(f.fakeClock.waiters))
|
||||
for i := range f.fakeClock.waiters {
|
||||
w := f.fakeClock.waiters[i]
|
||||
if w != &f.waiter {
|
||||
newWaiters = append(newWaiters, w)
|
||||
}
|
||||
}
|
||||
|
||||
f.fakeClock.waiters = newWaiters
|
||||
|
||||
return !f.waiter.fired
|
||||
}
|
||||
|
||||
// Reset resets the timer to the fake clock's "now" + d. It returns true if the timer has not yet
|
||||
// fired, or false otherwise.
|
||||
func (f *fakeTimer) Reset(d time.Duration) bool {
|
||||
f.fakeClock.lock.Lock()
|
||||
defer f.fakeClock.lock.Unlock()
|
||||
|
||||
active := !f.waiter.fired
|
||||
|
||||
f.waiter.fired = false
|
||||
f.waiter.targetTime = f.fakeClock.time.Add(d)
|
||||
|
||||
var isWaiting bool
|
||||
for i := range f.fakeClock.waiters {
|
||||
w := f.fakeClock.waiters[i]
|
||||
if w == &f.waiter {
|
||||
isWaiting = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !isWaiting {
|
||||
f.fakeClock.waiters = append(f.fakeClock.waiters, &f.waiter)
|
||||
}
|
||||
|
||||
return active
|
||||
}
|
1
vendor/modules.txt
vendored
1
vendor/modules.txt
vendored
@ -2269,6 +2269,7 @@ k8s.io/system-validators/validators
|
||||
## explicit
|
||||
k8s.io/utils/buffer
|
||||
k8s.io/utils/clock
|
||||
k8s.io/utils/clock/testing
|
||||
k8s.io/utils/exec
|
||||
k8s.io/utils/exec/testing
|
||||
k8s.io/utils/inotify
|
||||
|
Loading…
Reference in New Issue
Block a user