Merge pull request #104212 from MikeSpreitzer/event-clock-cleanup2

Some cleanup of the package for event clocks
This commit is contained in:
Kubernetes Prow Robot 2021-08-09 00:29:31 -07:00 committed by GitHub
commit 4023eb77a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 65 additions and 69 deletions

View File

@ -24,12 +24,12 @@ import (
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fairqueuingclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/klog/v2"
utilclock "k8s.io/utils/clock"
"k8s.io/utils/clock"
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1"
@ -84,7 +84,7 @@ func New(
requestWaitLimit time.Duration,
) Interface {
grc := counter.NoOp{}
clk := fairqueuingclock.RealEventClock{}
clk := eventclock.Real{}
return NewTestable(TestableConfig{
Name: "Controller",
Clock: clk,
@ -105,7 +105,7 @@ type TestableConfig struct {
Name string
// Clock to use in timing deliberate delays
Clock utilclock.PassiveClock
Clock clock.PassiveClock
// AsFieldManager is the string to use in the metadata for
// server-side apply. Normally this is

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package clock
package eventclock
import (
"time"
@ -31,7 +31,7 @@ type EventFunc func(time.Time)
// advanced. The timing paradigm is invoking EventFuncs rather than
// synchronizing through channels, so that the fake clock has a handle
// on when associated activity is done.
type EventClock interface {
type Interface interface {
baseclock.PassiveClock
// Sleep returns after the given duration (or more).

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package clock
package eventclock
import (
"time"
@ -23,12 +23,14 @@ import (
)
// RealEventClock fires event on real world time
type RealEventClock struct {
type Real struct {
clock.RealClock
}
var _ Interface = Real{}
// EventAfterDuration schedules an EventFunc
func (RealEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
func (Real) EventAfterDuration(f EventFunc, d time.Duration) {
ch := time.After(d)
go func() {
t := <-ch
@ -37,6 +39,6 @@ func (RealEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
}
// EventAfterTime schedules an EventFunc
func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) {
func (r Real) EventAfterTime(f EventFunc, t time.Time) {
r.EventAfterDuration(f, time.Until(t))
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package clock
package eventclock
import (
"math/rand"
@ -24,7 +24,7 @@ import (
)
func TestRealEventClock(t *testing.T) {
ec := RealEventClock{}
ec := Real{}
var numDone int32
now := ec.Now()
const batchSize = 100

View File

@ -22,12 +22,12 @@ import (
"testing"
"time"
testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing"
testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock"
)
func TestLockingWriteOnce(t *testing.T) {
now := time.Now()
clock, counter := testclock.NewFakeEventClock(now, 0, nil)
clock, counter := testclock.NewFake(now, 0, nil)
var lock sync.Mutex
wr := NewWriteOnce(&lock, counter)
var gots int32

View File

@ -27,7 +27,7 @@ import (
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fairqueuingclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
fqrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
@ -37,7 +37,6 @@ import (
// The following hack is needed to work around a tooling deficiency.
// Packages imported only for test code are not included in vendor.
// See https://kubernetes.slack.com/archives/C0EG7JC6T/p1626985671458800?thread_ts=1626983387.450800&cid=C0EG7JC6T
// The need for this hack will be removed when we make queueset use an EventClock rather than a PassiveClock.
_ "k8s.io/utils/clock/testing"
)
@ -47,7 +46,7 @@ const nsTimeFmt = "2006-01-02 15:04:05.000000000"
// queueSetFactory makes QueueSet objects.
type queueSetFactory struct {
counter counter.GoRoutineCounter
clock fairqueuingclock.EventClock
clock eventclock.Interface
}
// `*queueSetCompleter` implements QueueSetCompleter. Exactly one of
@ -70,7 +69,7 @@ type queueSetCompleter struct {
// not end in "Locked" either acquires the lock or does not care about
// locking.
type queueSet struct {
clock fairqueuingclock.EventClock
clock eventclock.Interface
counter counter.GoRoutineCounter
estimatedServiceTime float64
obsPair metrics.TimedObserverPair
@ -120,7 +119,7 @@ type queueSet struct {
}
// NewQueueSetFactory creates a new QueueSetFactory object
func NewQueueSetFactory(c fairqueuingclock.EventClock, counter counter.GoRoutineCounter) fq.QueueSetFactory {
func NewQueueSetFactory(c eventclock.Interface, counter counter.GoRoutineCounter) fq.QueueSetFactory {
return &queueSetFactory{
counter: counter,
clock: c,

View File

@ -31,8 +31,8 @@ import (
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fqclocktest "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing"
test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
testeventclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/klog/v2"
@ -133,7 +133,7 @@ type uniformScenario struct {
expectAllRequests bool
evalInqueueMetrics, evalExecutingMetrics bool
rejectReason string
clk *fqclocktest.FakeEventClock
clk *testeventclock.Fake
counter counter.GoRoutineCounter
}
@ -211,16 +211,10 @@ func (ust *uniformScenarioThread) start() {
ust.uss.clk.EventAfterDuration(ust.genCallK(0), initialDelay)
}
// generates an EventFunc that forks a goroutine to do call k
// generates an EventFunc that does call k
func (ust *uniformScenarioThread) genCallK(k int) func(time.Time) {
return func(time.Time) {
// As an EventFunc, this has to return without waiting
// for time to pass, and so cannot do callK(k) itself.
ust.uss.counter.Add(1)
go func() {
ust.callK(k)
ust.uss.counter.Add(-1)
}()
ust.callK(k)
}
}
@ -367,7 +361,7 @@ func init() {
func TestNoRestraint(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
clk, counter := testeventclock.NewFake(now, 0, nil)
nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk))
if err != nil {
t.Fatal(err)
@ -393,7 +387,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestUniformFlowsHandSize1",
@ -430,7 +424,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestUniformFlowsHandSize3",
@ -466,7 +460,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "DiffFlowsExpectEqual",
@ -503,7 +497,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "DiffFlowsExpectUnequal",
@ -540,7 +534,7 @@ func TestWindup(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestWindup",
@ -576,7 +570,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestDifferentFlowsWithoutQueuing",
@ -609,7 +603,7 @@ func TestTimeout(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestTimeout",
@ -645,7 +639,7 @@ func TestContextCancel(t *testing.T) {
metrics.Register()
metrics.Reset()
now := time.Now()
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestContextCancel",
@ -724,7 +718,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
metrics.Register()
metrics.Reset()
now := time.Now()
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestTotalRequestsExecutingWithPanic",

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
package eventclock
import (
"container/heap"
@ -27,7 +27,7 @@ import (
baseclocktest "k8s.io/utils/clock/testing"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
"k8s.io/klog/v2"
)
@ -65,9 +65,9 @@ func (wgc *waitGroupCounter) Wait() {
wgc.wg.Wait()
}
// FakeEventClock is one whose time does not pass implicitly but
// Fake is one whose time does not pass implicitly but
// rather is explicitly set by invocations of its SetTime method.
// Each FakeEventClock has an associated GoRoutineCounter that is
// Each Fake has an associated GoRoutineCounter that is
// used to track associated activity.
// For the EventAfterDuration and EventAfterTime methods,
// the clock itself counts the start and stop of the EventFunc
@ -75,10 +75,7 @@ func (wgc *waitGroupCounter) Wait() {
// resume internal to the EventFunc.
// The Sleep method must only be invoked from a goroutine that is
// counted in that GoRoutineCounter.
// The SetTime method does not return until all the triggered
// EventFuncs return. Consequently, an EventFunc given to a method
// of this clock must not wait for this clock to advance.
type FakeEventClock struct {
type Fake struct {
baseclocktest.FakePassiveClock
// waiters is a heap of waiting work, sorted by time
@ -100,21 +97,23 @@ type FakeEventClock struct {
rand *rand.Rand
}
var _ eventclock.Interface = &Fake{}
type eventWaiterHeap []eventWaiter
var _ heap.Interface = (*eventWaiterHeap)(nil)
type eventWaiter struct {
targetTime time.Time
f clock.EventFunc
f eventclock.EventFunc
}
// NewFakeEventClock constructor. The given `r *rand.Rand` must
// NewFake constructs a new fake event clock. The given `r *rand.Rand` must
// henceforth not be used for any other purpose. If `r` is nil then a
// fresh one will be constructed, seeded with the current real time.
// The clientWG can be `nil` and if not is used to let Run know about
// additional work that has to complete before time can advance.
func NewFakeEventClock(t time.Time, fuzz time.Duration, r *rand.Rand) (*FakeEventClock, counter.GoRoutineCounter) {
func NewFake(t time.Time, fuzz time.Duration, r *rand.Rand) (*Fake, counter.GoRoutineCounter) {
grc := &waitGroupCounter{}
if r == nil {
@ -123,7 +122,7 @@ func NewFakeEventClock(t time.Time, fuzz time.Duration, r *rand.Rand) (*FakeEven
r.Uint64()
r.Uint64()
}
return &FakeEventClock{
return &Fake{
FakePassiveClock: *baseclocktest.NewFakePassiveClock(t),
clientWG: grc,
fuzz: fuzz,
@ -133,7 +132,7 @@ func NewFakeEventClock(t time.Time, fuzz time.Duration, r *rand.Rand) (*FakeEven
// GetNextTime returns the next time at which there is work scheduled,
// and a bool indicating whether there is any such time
func (fec *FakeEventClock) GetNextTime() (time.Time, bool) {
func (fec *Fake) GetNextTime() (time.Time, bool) {
fec.waitersLock.RLock()
defer fec.waitersLock.RUnlock()
if len(fec.waiters) > 0 {
@ -147,7 +146,7 @@ func (fec *FakeEventClock) GetNextTime() (time.Time, bool) {
// nil and the next time would exceed the limit. The associated
// GoRoutineCounter gates the advancing of time. That is,
// time is not advanced until all the associated work is finished.
func (fec *FakeEventClock) Run(limit *time.Time) {
func (fec *Fake) Run(limit *time.Time) {
for {
fec.clientWG.Wait()
t, ok := fec.GetNextTime()
@ -161,7 +160,7 @@ func (fec *FakeEventClock) Run(limit *time.Time) {
// SetTime sets the time and runs to completion all events that should
// be started by the given time --- including any further events they
// schedule
func (fec *FakeEventClock) SetTime(t time.Time) {
func (fec *Fake) SetTime(t time.Time) {
fec.FakePassiveClock.SetTime(t)
for {
foundSome := false
@ -172,27 +171,29 @@ func (fec *FakeEventClock) SetTime(t time.Time) {
// events to run at that or an earlier time.
// Events should not advance the clock. But just in case they do...
now := fec.Now()
var wg sync.WaitGroup
for len(fec.waiters) > 0 && !now.Before(fec.waiters[0].targetTime) {
ew := heap.Pop(&fec.waiters).(eventWaiter)
wg.Add(1)
go func(f clock.EventFunc) { f(now); wg.Done() }(ew.f)
fec.clientWG.Add(1)
go func(f eventclock.EventFunc, now time.Time) {
f(now)
fec.clientWG.Add(-1)
}(ew.f, now)
foundSome = true
}
wg.Wait()
}()
if !foundSome {
break
}
fec.clientWG.Wait()
}
}
// Sleep returns after the given duration has passed.
// Sleep must only be invoked in a goroutine that is counted
// in the FakeEventClock's associated GoRoutineCounter.
// in the Fake's associated GoRoutineCounter.
// Unlike the base FakeClock's Sleep, this method does not itself advance the clock
// but rather leaves that up to other actors (e.g., Run).
func (fec *FakeEventClock) Sleep(duration time.Duration) {
func (fec *Fake) Sleep(duration time.Duration) {
doneCh := make(chan struct{})
fec.EventAfterDuration(func(time.Time) {
fec.clientWG.Add(1)
@ -204,7 +205,7 @@ func (fec *FakeEventClock) Sleep(duration time.Duration) {
// EventAfterDuration schedules the given function to be invoked once
// the given duration has passed.
func (fec *FakeEventClock) EventAfterDuration(f clock.EventFunc, d time.Duration) {
func (fec *Fake) EventAfterDuration(f eventclock.EventFunc, d time.Duration) {
fec.waitersLock.Lock()
defer fec.waitersLock.Unlock()
now := fec.Now()
@ -214,7 +215,7 @@ func (fec *FakeEventClock) EventAfterDuration(f clock.EventFunc, d time.Duration
// EventAfterTime schedules the given function to be invoked once
// the given time has arrived.
func (fec *FakeEventClock) EventAfterTime(f clock.EventFunc, t time.Time) {
func (fec *Fake) EventAfterTime(f eventclock.EventFunc, t time.Time) {
fec.waitersLock.Lock()
defer fec.waitersLock.Unlock()
fd := time.Duration(float32(fec.fuzz) * fec.rand.Float32())

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
package eventclock
import (
"math/rand"
@ -22,11 +22,11 @@ import (
"testing"
"time"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
)
type TestableEventClock interface {
clock.EventClock
eventclock.Interface
SetTime(time.Time)
Run(*time.Time)
}
@ -122,10 +122,10 @@ func exerciseSettablePassiveClock(t *testing.T, pc TestableEventClock) {
}
}
func TestFakeEventClock(t *testing.T) {
func TestFake(t *testing.T) {
startTime := time.Now()
fec, _ := NewFakeEventClock(startTime, 0, nil)
fec, _ := NewFake(startTime, 0, nil)
exerciseTestableEventClock(t, fec, 0)
fec, _ = NewFakeEventClock(startTime, time.Second, nil)
fec, _ = NewFake(startTime, time.Second, nil)
exerciseTestableEventClock(t, fec, time.Second)
}

2
vendor/modules.txt vendored
View File

@ -1548,7 +1548,7 @@ k8s.io/apiserver/pkg/util/flowcontrol
k8s.io/apiserver/pkg/util/flowcontrol/counter
k8s.io/apiserver/pkg/util/flowcontrol/debug
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing