diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/interface.go new file mode 100644 index 00000000000..9dbb658a835 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/interface.go @@ -0,0 +1,47 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clock + +import ( + "time" + + baseclock "k8s.io/utils/clock" +) + +// EventFunc does some work that needs to be done at or after the +// given time. +type EventFunc func(time.Time) + +// EventClock is an active clock abstraction for use in code that is +// testable with a fake clock that itself determines how time may be +// advanced. The timing paradigm is invoking EventFuncs rather than +// synchronizing through channels, so that the fake clock has a handle +// on when associated activity is done. +type EventClock interface { + baseclock.PassiveClock + + // Sleep returns after the given duration (or more). + Sleep(d time.Duration) + + // EventAfterDuration invokes the given EventFunc after the given duration (or more), + // passing the time when the invocation was launched. + EventAfterDuration(f EventFunc, d time.Duration) + + // EventAfterTime invokes the given EventFunc at the given time or later, + // passing the time when the invocation was launched. + EventAfterTime(f EventFunc, t time.Time) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/real.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/real.go new file mode 100644 index 00000000000..1ad1a34d4b3 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/real.go @@ -0,0 +1,42 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clock + +import ( + "time" + + "k8s.io/utils/clock" +) + +// RealEventClock fires event on real world time +type RealEventClock struct { + clock.RealClock +} + +// EventAfterDuration schedules an EventFunc +func (RealEventClock) EventAfterDuration(f EventFunc, d time.Duration) { + ch := time.After(d) + go func() { + t := <-ch + f(t) + }() +} + +// EventAfterTime schedules an EventFunc +func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) { + r.EventAfterDuration(f, time.Until(t)) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/real_event_clock_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/real_event_clock_test.go new file mode 100644 index 00000000000..8e9331a4254 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/real_event_clock_test.go @@ -0,0 +1,68 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clock + +import ( + "math/rand" + "sync/atomic" + "testing" + "time" +) + +func TestRealEventClock(t *testing.T) { + ec := RealEventClock{} + var numDone int32 + now := ec.Now() + const batchSize = 100 + times := make(chan time.Time, batchSize+1) + try := func(abs bool, d time.Duration) { + f := func(u time.Time) { + realD := ec.Since(now) + atomic.AddInt32(&numDone, 1) + times <- u + if realD < d { + t.Errorf("Asked for %v, got %v", d, realD) + } + } + if abs { + ec.EventAfterTime(f, now.Add(d)) + } else { + ec.EventAfterDuration(f, d) + } + } + try(true, time.Millisecond*3300) + for i := 0; i < batchSize; i++ { + d := time.Duration(rand.Intn(30)-3) * time.Millisecond * 100 + try(i%2 == 0, d) + } + time.Sleep(time.Second * 4) + if atomic.LoadInt32(&numDone) != batchSize+1 { + t.Errorf("Got only %v events", numDone) + } + lastTime := now + for i := 0; i <= batchSize; i++ { + nextTime := <-times + if nextTime.Before(now) { + continue + } + dt := nextTime.Sub(lastTime) / (50 * time.Millisecond) + if dt < 0 { + t.Errorf("Got %s after %s", nextTime, lastTime) + } + lastTime = nextTime + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing/fake.go similarity index 75% rename from staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock.go rename to staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing/fake.go index c5464986f48..5ef43a2d898 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing/fake.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package clock +package testing import ( "container/heap" @@ -24,50 +24,14 @@ import ( "sync" "time" - "k8s.io/apimachinery/pkg/util/clock" + baseclocktest "k8s.io/utils/clock/testing" + "k8s.io/apiserver/pkg/util/flowcontrol/counter" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock" "k8s.io/klog/v2" ) -// EventFunc does some work that needs to be done at or after the -// given time. After this function returns, associated work may continue -// on other goroutines only if they are counted by the GoRoutineCounter -// of the FakeEventClock handling this EventFunc. -type EventFunc func(time.Time) - -// EventClock fires event on time -type EventClock interface { - clock.PassiveClock - EventAfterDuration(f EventFunc, d time.Duration) - EventAfterTime(f EventFunc, t time.Time) -} - -// RealEventClock fires event on real world time -type RealEventClock struct { - clock.RealClock -} - -// EventAfterDuration schedules an EventFunc -func (RealEventClock) EventAfterDuration(f EventFunc, d time.Duration) { - ch := time.After(d) - go func() { - t := <-ch - f(t) - }() -} - -// EventAfterTime schedules an EventFunc -func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) { - now := time.Now() - d := t.Sub(now) - if d <= 0 { - go f(now) - } else { - r.EventAfterDuration(f, d) - } -} - -// waitGroupCounter is a wait group used for a GoRoutine Counter. This private +// waitGroupCounter is a wait group used for a GoRoutineCounter. This private // type is used to disallow direct waitGroup access type waitGroupCounter struct { wg sync.WaitGroup @@ -102,9 +66,20 @@ func (wgc *waitGroupCounter) Wait() { } // FakeEventClock is one whose time does not pass implicitly but -// rather is explicitly set by invocations of its SetTime method +// rather is explicitly set by invocations of its SetTime method. +// Each FakeEventClock has an associated GoRoutineCounter that is +// used to track associated activity. +// For the EventAfterDuration and EventAfterTime methods, +// the clock itself counts the start and stop of the EventFunc +// and the client is responsible for counting any suspend and +// resume internal to the EventFunc. +// The Sleep method must only be invoked from a goroutine that is +// counted in that GoRoutineCounter. +// The SetTime method does not return until all the triggered +// EventFuncs return. Consequently, an EventFunc given to a method +// of this clock must not wait for this clock to advance. type FakeEventClock struct { - clock.FakePassiveClock + baseclocktest.FakePassiveClock // waiters is a heap of waiting work, sorted by time waiters eventWaiterHeap @@ -131,7 +106,7 @@ var _ heap.Interface = (*eventWaiterHeap)(nil) type eventWaiter struct { targetTime time.Time - f EventFunc + f clock.EventFunc } // NewFakeEventClock constructor. The given `r *rand.Rand` must @@ -149,7 +124,7 @@ func NewFakeEventClock(t time.Time, fuzz time.Duration, r *rand.Rand) (*FakeEven r.Uint64() } return &FakeEventClock{ - FakePassiveClock: *clock.NewFakePassiveClock(t), + FakePassiveClock: *baseclocktest.NewFakePassiveClock(t), clientWG: grc, fuzz: fuzz, rand: r, @@ -169,8 +144,9 @@ func (fec *FakeEventClock) GetNextTime() (time.Time, bool) { // Run runs all the events scheduled, and all the events they // schedule, and so on, until there are none scheduled or the limit is not -// nil and the next time would exceed the limit. The clientWG given in -// the constructor gates each advance of time. +// nil and the next time would exceed the limit. The associated +// GoRoutineCounter gates the advancing of time. That is, +// time is not advanced until all the associated work is finished. func (fec *FakeEventClock) Run(limit *time.Time) { for { fec.clientWG.Wait() @@ -200,7 +176,7 @@ func (fec *FakeEventClock) SetTime(t time.Time) { for len(fec.waiters) > 0 && !now.Before(fec.waiters[0].targetTime) { ew := heap.Pop(&fec.waiters).(eventWaiter) wg.Add(1) - go func(f EventFunc) { f(now); wg.Done() }(ew.f) + go func(f clock.EventFunc) { f(now); wg.Done() }(ew.f) foundSome = true } wg.Wait() @@ -211,9 +187,24 @@ func (fec *FakeEventClock) SetTime(t time.Time) { } } +// Sleep returns after the given duration has passed. +// Sleep must only be invoked in a goroutine that is counted +// in the FakeEventClock's associated GoRoutineCounter. +// Unlike the base FakeClock's Sleep, this method does not itself advance the clock +// but rather leaves that up to other actors (e.g., Run). +func (fec *FakeEventClock) Sleep(duration time.Duration) { + doneCh := make(chan struct{}) + fec.EventAfterDuration(func(time.Time) { + fec.clientWG.Add(1) + close(doneCh) + }, duration) + fec.clientWG.Add(-1) + <-doneCh +} + // EventAfterDuration schedules the given function to be invoked once // the given duration has passed. -func (fec *FakeEventClock) EventAfterDuration(f EventFunc, d time.Duration) { +func (fec *FakeEventClock) EventAfterDuration(f clock.EventFunc, d time.Duration) { fec.waitersLock.Lock() defer fec.waitersLock.Unlock() now := fec.Now() @@ -223,7 +214,7 @@ func (fec *FakeEventClock) EventAfterDuration(f EventFunc, d time.Duration) { // EventAfterTime schedules the given function to be invoked once // the given time has arrived. -func (fec *FakeEventClock) EventAfterTime(f EventFunc, t time.Time) { +func (fec *FakeEventClock) EventAfterTime(f clock.EventFunc, t time.Time) { fec.waitersLock.Lock() defer fec.waitersLock.Unlock() fd := time.Duration(float32(fec.fuzz) * fec.rand.Float32()) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing/fake_event_clock_test.go similarity index 66% rename from staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock_test.go rename to staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing/fake_event_clock_test.go index 2cd58ef574c..e6fd9b5f804 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing/fake_event_clock_test.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package clock +package testing import ( "math/rand" @@ -22,23 +22,17 @@ import ( "testing" "time" - "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock" ) type TestableEventClock interface { - EventClock + clock.EventClock SetTime(time.Time) Run(*time.Time) } -// settablePassiveClock allows setting current time of a passive clock -type settablePassiveClock interface { - clock.PassiveClock - SetTime(time.Time) -} - func exerciseTestableEventClock(t *testing.T, ec TestableEventClock, fuzz time.Duration) { - exercisePassiveClock(t, ec) + exerciseSettablePassiveClock(t, ec) var numDone int32 now := ec.Now() strictable := true @@ -104,7 +98,8 @@ func exerciseTestableEventClock(t *testing.T, ec TestableEventClock, fuzz time.D } } -func exercisePassiveClock(t *testing.T, pc settablePassiveClock) { +// copied from baseclocktest, because it is not public +func exerciseSettablePassiveClock(t *testing.T, pc TestableEventClock) { t1 := time.Now() t2 := t1.Add(time.Hour) pc.SetTime(t1) @@ -134,50 +129,3 @@ func TestFakeEventClock(t *testing.T) { fec, _ = NewFakeEventClock(startTime, time.Second, nil) exerciseTestableEventClock(t, fec, time.Second) } - -func exerciseEventClock(t *testing.T, ec EventClock, relax func(time.Duration)) { - var numDone int32 - now := ec.Now() - const batchSize = 100 - times := make(chan time.Time, batchSize+1) - try := func(abs bool, d time.Duration) { - f := func(u time.Time) { - realD := ec.Since(now) - atomic.AddInt32(&numDone, 1) - times <- u - if realD < d { - t.Errorf("Asked for %v, got %v", d, realD) - } - } - if abs { - ec.EventAfterTime(f, now.Add(d)) - } else { - ec.EventAfterDuration(f, d) - } - } - try(true, time.Millisecond*3300) - for i := 0; i < batchSize; i++ { - d := time.Duration(rand.Intn(30)-3) * time.Millisecond * 100 - try(i%2 == 0, d) - } - relax(time.Second * 4) - if atomic.LoadInt32(&numDone) != batchSize+1 { - t.Errorf("Got only %v events", numDone) - } - lastTime := now - for i := 0; i <= batchSize; i++ { - nextTime := <-times - if nextTime.Before(now) { - continue - } - dt := nextTime.Sub(lastTime) / (50 * time.Millisecond) - if dt < 0 { - t.Errorf("Got %s after %s", nextTime, lastTime) - } - lastTime = nextTime - } -} - -func TestRealEventClock(t *testing.T) { - exerciseEventClock(t, RealEventClock{}, func(d time.Duration) { time.Sleep(d) }) -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go index ca825560f1d..90eed0a9f92 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go @@ -22,12 +22,12 @@ import ( "testing" "time" - "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" + testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing" ) func TestLockingWriteOnce(t *testing.T) { now := time.Now() - clock, counter := clock.NewFakeEventClock(now, 0, nil) + clock, counter := testclock.NewFakeEventClock(now, 0, nil) var lock sync.Mutex wr := NewWriteOnce(&lock, counter) var gots int32 diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index edf3c18ae07..957417d6124 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -23,7 +23,8 @@ import ( "sync" "time" - "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/utils/clock" + "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/util/flowcontrol/counter" "k8s.io/apiserver/pkg/util/flowcontrol/debug" @@ -33,6 +34,12 @@ import ( fqrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/apiserver/pkg/util/shufflesharding" "k8s.io/klog/v2" + + // The following hack is needed to work around a tooling deficiency. + // Packages imported only for test code are not included in vendor. + // See https://kubernetes.slack.com/archives/C0EG7JC6T/p1626985671458800?thread_ts=1626983387.450800&cid=C0EG7JC6T + // The need for this hack will be removed when we make queueset use an EventClock rather than a PassiveClock. + _ "k8s.io/utils/clock/testing" ) const nsTimeFmt = "2006-01-02 15:04:05.000000000" @@ -299,7 +306,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.Wo go func() { defer runtime.HandleCrash() qs.goroutineDoneOrBlocked() - _ = <-doneCh + <-doneCh // Whatever goroutine unblocked the preceding receive MUST // have already either (a) incremented qs.counter or (b) // known that said counter is not actually counting or (c) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 84442abc362..6ae72c43cbd 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -27,11 +27,12 @@ import ( "testing" "time" - "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/utils/clock" + "k8s.io/apiserver/pkg/util/flowcontrol/counter" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" + fqclocktest "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing" test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing" - testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/klog/v2" @@ -132,7 +133,7 @@ type uniformScenario struct { expectAllRequests bool evalInqueueMetrics, evalExecutingMetrics bool rejectReason string - clk *testclock.FakeEventClock + clk *fqclocktest.FakeEventClock counter counter.GoRoutineCounter } @@ -245,7 +246,7 @@ func (ust *uniformScenarioThread) callK(k int) { atomic.AddInt32(&ust.uss.executions[ust.i], 1) ust.igr.Add(1) ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration) - ClockWait(ust.uss.clk, ust.uss.counter, ust.uc.execDuration) + ust.uss.clk.Sleep(ust.uc.execDuration) ust.igr.Add(-1) }) ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2) @@ -358,16 +359,6 @@ func (uss *uniformScenarioState) finalReview() { } } -func ClockWait(clk *testclock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) { - dunch := make(chan struct{}) - clk.EventAfterDuration(func(time.Time) { - counter.Add(1) - close(dunch) - }, duration) - counter.Add(-1) - <-dunch -} - func init() { klog.InitFlags(nil) } @@ -376,7 +367,7 @@ func init() { func TestNoRestraint(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := testclock.NewFakeEventClock(now, 0, nil) + clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil) nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk)) if err != nil { t.Fatal(err) @@ -402,7 +393,7 @@ func TestUniformFlowsHandSize1(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := testclock.NewFakeEventClock(now, 0, nil) + clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "TestUniformFlowsHandSize1", @@ -439,7 +430,7 @@ func TestUniformFlowsHandSize3(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := testclock.NewFakeEventClock(now, 0, nil) + clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "TestUniformFlowsHandSize3", @@ -475,7 +466,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := testclock.NewFakeEventClock(now, 0, nil) + clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "DiffFlowsExpectEqual", @@ -512,7 +503,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := testclock.NewFakeEventClock(now, 0, nil) + clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "DiffFlowsExpectUnequal", @@ -549,7 +540,7 @@ func TestWindup(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := testclock.NewFakeEventClock(now, 0, nil) + clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "TestWindup", @@ -585,7 +576,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := testclock.NewFakeEventClock(now, 0, nil) + clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "TestDifferentFlowsWithoutQueuing", @@ -618,7 +609,7 @@ func TestTimeout(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := testclock.NewFakeEventClock(now, 0, nil) + clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "TestTimeout", @@ -654,7 +645,7 @@ func TestContextCancel(t *testing.T) { metrics.Register() metrics.Reset() now := time.Now() - clk, counter := testclock.NewFakeEventClock(now, 0, nil) + clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "TestContextCancel", @@ -733,7 +724,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) { metrics.Register() metrics.Reset() now := time.Now() - clk, counter := testclock.NewFakeEventClock(now, 0, nil) + clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "TestTotalRequestsExecutingWithPanic", diff --git a/vendor/k8s.io/utils/clock/testing/fake_clock.go b/vendor/k8s.io/utils/clock/testing/fake_clock.go new file mode 100644 index 00000000000..aedb9aa838c --- /dev/null +++ b/vendor/k8s.io/utils/clock/testing/fake_clock.go @@ -0,0 +1,294 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "sync" + "time" + + "k8s.io/utils/clock" +) + +var ( + _ = clock.PassiveClock(&FakePassiveClock{}) + _ = clock.Clock(&FakeClock{}) + _ = clock.Clock(&IntervalClock{}) +) + +// FakePassiveClock implements PassiveClock, but returns an arbitrary time. +type FakePassiveClock struct { + lock sync.RWMutex + time time.Time +} + +// FakeClock implements clock.Clock, but returns an arbitrary time. +type FakeClock struct { + FakePassiveClock + + // waiters are waiting for the fake time to pass their specified time + waiters []*fakeClockWaiter +} + +type fakeClockWaiter struct { + targetTime time.Time + stepInterval time.Duration + skipIfBlocked bool + destChan chan time.Time + fired bool +} + +// NewFakePassiveClock returns a new FakePassiveClock. +func NewFakePassiveClock(t time.Time) *FakePassiveClock { + return &FakePassiveClock{ + time: t, + } +} + +// NewFakeClock constructs a fake clock set to the provided time. +func NewFakeClock(t time.Time) *FakeClock { + return &FakeClock{ + FakePassiveClock: *NewFakePassiveClock(t), + } +} + +// Now returns f's time. +func (f *FakePassiveClock) Now() time.Time { + f.lock.RLock() + defer f.lock.RUnlock() + return f.time +} + +// Since returns time since the time in f. +func (f *FakePassiveClock) Since(ts time.Time) time.Duration { + f.lock.RLock() + defer f.lock.RUnlock() + return f.time.Sub(ts) +} + +// SetTime sets the time on the FakePassiveClock. +func (f *FakePassiveClock) SetTime(t time.Time) { + f.lock.Lock() + defer f.lock.Unlock() + f.time = t +} + +// After is the fake version of time.After(d). +func (f *FakeClock) After(d time.Duration) <-chan time.Time { + f.lock.Lock() + defer f.lock.Unlock() + stopTime := f.time.Add(d) + ch := make(chan time.Time, 1) // Don't block! + f.waiters = append(f.waiters, &fakeClockWaiter{ + targetTime: stopTime, + destChan: ch, + }) + return ch +} + +// NewTimer constructs a fake timer, akin to time.NewTimer(d). +func (f *FakeClock) NewTimer(d time.Duration) clock.Timer { + f.lock.Lock() + defer f.lock.Unlock() + stopTime := f.time.Add(d) + ch := make(chan time.Time, 1) // Don't block! + timer := &fakeTimer{ + fakeClock: f, + waiter: fakeClockWaiter{ + targetTime: stopTime, + destChan: ch, + }, + } + f.waiters = append(f.waiters, &timer.waiter) + return timer +} + +// Tick constructs a fake ticker, akin to time.Tick +func (f *FakeClock) Tick(d time.Duration) <-chan time.Time { + if d <= 0 { + return nil + } + f.lock.Lock() + defer f.lock.Unlock() + tickTime := f.time.Add(d) + ch := make(chan time.Time, 1) // hold one tick + f.waiters = append(f.waiters, &fakeClockWaiter{ + targetTime: tickTime, + stepInterval: d, + skipIfBlocked: true, + destChan: ch, + }) + + return ch +} + +// Step moves the clock by Duration and notifies anyone that's called After, +// Tick, or NewTimer. +func (f *FakeClock) Step(d time.Duration) { + f.lock.Lock() + defer f.lock.Unlock() + f.setTimeLocked(f.time.Add(d)) +} + +// SetTime sets the time. +func (f *FakeClock) SetTime(t time.Time) { + f.lock.Lock() + defer f.lock.Unlock() + f.setTimeLocked(t) +} + +// Actually changes the time and checks any waiters. f must be write-locked. +func (f *FakeClock) setTimeLocked(t time.Time) { + f.time = t + newWaiters := make([]*fakeClockWaiter, 0, len(f.waiters)) + for i := range f.waiters { + w := f.waiters[i] + if !w.targetTime.After(t) { + + if w.skipIfBlocked { + select { + case w.destChan <- t: + w.fired = true + default: + } + } else { + w.destChan <- t + w.fired = true + } + + if w.stepInterval > 0 { + for !w.targetTime.After(t) { + w.targetTime = w.targetTime.Add(w.stepInterval) + } + newWaiters = append(newWaiters, w) + } + + } else { + newWaiters = append(newWaiters, f.waiters[i]) + } + } + f.waiters = newWaiters +} + +// HasWaiters returns true if After has been called on f but not yet satisfied (so you can +// write race-free tests). +func (f *FakeClock) HasWaiters() bool { + f.lock.RLock() + defer f.lock.RUnlock() + return len(f.waiters) > 0 +} + +// Sleep is akin to time.Sleep +func (f *FakeClock) Sleep(d time.Duration) { + f.Step(d) +} + +// IntervalClock implements clock.Clock, but each invocation of Now steps the clock forward the specified duration +type IntervalClock struct { + Time time.Time + Duration time.Duration +} + +// Now returns i's time. +func (i *IntervalClock) Now() time.Time { + i.Time = i.Time.Add(i.Duration) + return i.Time +} + +// Since returns time since the time in i. +func (i *IntervalClock) Since(ts time.Time) time.Duration { + return i.Time.Sub(ts) +} + +// After is unimplemented, will panic. +// TODO: make interval clock use FakeClock so this can be implemented. +func (*IntervalClock) After(d time.Duration) <-chan time.Time { + panic("IntervalClock doesn't implement After") +} + +// NewTimer is unimplemented, will panic. +// TODO: make interval clock use FakeClock so this can be implemented. +func (*IntervalClock) NewTimer(d time.Duration) clock.Timer { + panic("IntervalClock doesn't implement NewTimer") +} + +// Tick is unimplemented, will panic. +// TODO: make interval clock use FakeClock so this can be implemented. +func (*IntervalClock) Tick(d time.Duration) <-chan time.Time { + panic("IntervalClock doesn't implement Tick") +} + +// Sleep is unimplemented, will panic. +func (*IntervalClock) Sleep(d time.Duration) { + panic("IntervalClock doesn't implement Sleep") +} + +var _ = clock.Timer(&fakeTimer{}) + +// fakeTimer implements clock.Timer based on a FakeClock. +type fakeTimer struct { + fakeClock *FakeClock + waiter fakeClockWaiter +} + +// C returns the channel that notifies when this timer has fired. +func (f *fakeTimer) C() <-chan time.Time { + return f.waiter.destChan +} + +// Stop stops the timer and returns true if the timer has not yet fired, or false otherwise. +func (f *fakeTimer) Stop() bool { + f.fakeClock.lock.Lock() + defer f.fakeClock.lock.Unlock() + + newWaiters := make([]*fakeClockWaiter, 0, len(f.fakeClock.waiters)) + for i := range f.fakeClock.waiters { + w := f.fakeClock.waiters[i] + if w != &f.waiter { + newWaiters = append(newWaiters, w) + } + } + + f.fakeClock.waiters = newWaiters + + return !f.waiter.fired +} + +// Reset resets the timer to the fake clock's "now" + d. It returns true if the timer has not yet +// fired, or false otherwise. +func (f *fakeTimer) Reset(d time.Duration) bool { + f.fakeClock.lock.Lock() + defer f.fakeClock.lock.Unlock() + + active := !f.waiter.fired + + f.waiter.fired = false + f.waiter.targetTime = f.fakeClock.time.Add(d) + + var isWaiting bool + for i := range f.fakeClock.waiters { + w := f.fakeClock.waiters[i] + if w == &f.waiter { + isWaiting = true + break + } + } + if !isWaiting { + f.fakeClock.waiters = append(f.fakeClock.waiters, &f.waiter) + } + + return active +} diff --git a/vendor/modules.txt b/vendor/modules.txt index d21f2055617..1ee23acd60c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -2270,6 +2270,7 @@ k8s.io/system-validators/validators ## explicit k8s.io/utils/buffer k8s.io/utils/clock +k8s.io/utils/clock/testing k8s.io/utils/exec k8s.io/utils/exec/testing k8s.io/utils/inotify