Merge pull request #113222 from MikeSpreitzer/add-borrowing-inputs

Add borrowing inputs
Kubernetes Prow Robot committed by GitHub on 2022-10-28 02:34:32 -07:00
commit e438ea02ec
10 changed files with 362 additions and 197 deletions

View File

@@ -71,21 +71,16 @@ const (
// Borrowing among priority levels will be accomplished by periodically
// adjusting the current concurrency limits (CurrentCLs);
// borrowingAdjustmentPeriod is that period.
borrowingAdjustmentPeriod = 10 * time.Second //nolint:golint,unused
borrowingAdjustmentPeriod = 10 * time.Second
// The input to the borrowing is smoothed seat demand figures.
// Every adjustment period, each priority level's smoothed demand is adjusted
// based on an envelope of that level's recent seat demand. The formula is:
// SmoothSeatDemand := max( EnvelopeSeatDemand,
// seatDemandSmoothingCoefficient * SmoothSeatDemand +
// (1-seatDemandSmoothingCoefficient) * EnvelopeSeatDemand ).
// Qualitatively: this parameter controls the rate at which the smoothed seat demand drifts
// down toward the envelope of seat demand while that is lower.
// The particular number appearing here has the property that half of the
// current smoothed value comes from the smoothed value of 5 minutes ago.
// The input to the seat borrowing is smoothed seat demand figures.
// This constant controls the decay rate of that smoothing,
// as described in the comment on the `seatDemandStats` field of `priorityLevelState`.
// The particular number appearing here has the property that the half-life
// of that decay is 5 minutes.
// This is a very preliminary guess at a good value and is likely to be tweaked
// once we get some experience with borrowing.
seatDemandSmoothingCoefficient = 0.977 //nolint:golint,unused
seatDemandSmoothingCoefficient = 0.977
)
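A quick arithmetic check of the half-life claim, as a standalone sketch (not part of this commit): with borrowingAdjustmentPeriod = 10s, 5 minutes spans 30 adjustment periods, and 0.977^30 ≈ 0.497.

package main

import (
	"fmt"
	"math"
)

func main() {
	const coefficient = 0.977     // seatDemandSmoothingCoefficient
	const periods = (5 * 60) / 10 // number of 10-second adjustment periods in 5 minutes
	// Prints ~0.4975: about half of a smoothed value survives 5 minutes of decay.
	fmt.Println(math.Pow(coefficient, periods))
}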
// The funcs in this package follow the naming convention that the suffix
@@ -217,6 +212,43 @@ type priorityLevelState struct {
// Observer of number of seats occupied throughout execution
execSeatsObs metrics.RatioedGauge
// Integrator of seat demand, reset every CurrentCL adjustment period
seatDemandIntegrator fq.Integrator
// seatDemandStats is derived from periodically examining the seatDemandIntegrator.
// The average, standard deviation, and high watermark come directly from the integrator.
// envelope = avg + stdDev.
// Periodically smoothed gets replaced with `max(envelope, A*smoothed + (1-A)*envelope)`,
// where A is seatDemandSmoothingCoefficient.
seatDemandStats seatDemandStats
}
type seatDemandStats struct {
avg float64
stdDev float64
highWatermark float64
envelope float64
smoothed float64
}
func newSeatDemandStats(val float64) seatDemandStats {
return seatDemandStats{
avg: val,
stdDev: 0,
highWatermark: val,
envelope: val,
smoothed: val,
}
}
func (stats *seatDemandStats) update(obs fq.IntegratorResults) {
stats.avg = obs.Average
stats.stdDev = obs.Deviation
stats.highWatermark = obs.Max
envelope := obs.Average + obs.Deviation
stats.envelope = envelope
stats.smoothed = math.Max(envelope, seatDemandSmoothingCoefficient*stats.smoothed+(1-seatDemandSmoothingCoefficient)*envelope)
}
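The max() makes this smoothing one-sided: smoothed follows the envelope upward immediately, but decays toward a lower envelope only gradually. A minimal standalone sketch of the same rule (mirroring update above, outside the real types):

package main

import (
	"fmt"
	"math"
)

const a = 0.977 // seatDemandSmoothingCoefficient

func smooth(smoothed, envelope float64) float64 {
	return math.Max(envelope, a*smoothed+(1-a)*envelope)
}

func main() {
	s := 10.0
	s = smooth(s, 40) // demand spike: smoothed jumps straight to the envelope
	fmt.Println(s)    // 40
	for i := 0; i < 30; i++ { // 30 quiet periods (envelope 0) = 5 minutes
		s = smooth(s, 0)
	}
	fmt.Printf("%.1f\n", s) // ~19.9, about half of the spike after one half-life
}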
// NewTestableController is extra flexible to facilitate testing
@@ -325,11 +357,25 @@ func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
klog.Info("Running API Priority and Fairness config worker")
go wait.Until(cfgCtlr.runWorker, time.Second, stopCh)
klog.Info("Running API Priority and Fairness periodic rebalancing process")
go wait.Until(cfgCtlr.updateBorrowing, borrowingAdjustmentPeriod, stopCh)
<-stopCh
klog.Info("Shutting down API Priority and Fairness config worker")
return nil
}
func (cfgCtlr *configController) updateBorrowing() {
cfgCtlr.lock.Lock()
defer cfgCtlr.lock.Unlock()
for _, plState := range cfgCtlr.priorityLevelStates {
obs := plState.seatDemandIntegrator.Reset()
plState.seatDemandStats.update(obs)
// TODO: set borrowing metrics introduced in https://github.com/kubernetes/enhancements/pull/3391
// TODO: update CurrentCL as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/1040-priority-and-fairness#dispatching
}
}
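Note the division of labor around each Integrator: the queueset only writes it (via Set, on every change to executing or waiting seats), while this method is the only reader, draining one period's observations with Reset. A standalone sketch of that read-and-reset contract, using the fairqueuing package as imported elsewhere in this commit (names and timings illustrative):

package main

import (
	"fmt"
	"time"

	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
	"k8s.io/utils/clock"
)

func main() {
	igr := fq.NewNamedIntegrator(clock.RealClock{}, "demo")
	igr.Set(3) // writer side: seat demand (executing + waiting seats) is now 3
	time.Sleep(50 * time.Millisecond)
	igr.Set(5) // seat demand rises to 5
	time.Sleep(50 * time.Millisecond)
	obs := igr.Reset() // reader side: consume this period and start a fresh one
	// Time-weighted results: Average ~4, Deviation ~1, Max 5.
	fmt.Println(obs.Average, obs.Deviation, obs.Max)
}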
// runWorker is the logic of the one and only worker goroutine. We
// limit the number to one in order to obviate explicit
// synchronization around access to `cfgCtlr.mostRecentUpdates`.
@@ -552,9 +598,13 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
state := meal.cfgCtlr.priorityLevelStates[pl.Name]
if state == nil {
labelValues := []string{pl.Name}
state = &priorityLevelState{reqsGaugePair: metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues), execSeatsObs: meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)}
state = &priorityLevelState{
reqsGaugePair: metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues),
execSeatsObs: meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues),
seatDemandIntegrator: fq.NewNamedIntegrator(meal.cfgCtlr.clock, pl.Name),
}
}
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs)
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs, state.seatDemandIntegrator)
if err != nil {
klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
continue
@@ -658,7 +708,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
}
}
var err error
plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs)
plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs, plState.seatDemandIntegrator)
if err != nil {
// This can not happen because queueSetCompleterForPL already approved this config
panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
@@ -702,6 +752,8 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
if plState.queues == nil {
klog.V(5).Infof("Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
plState.seatDemandStats = newSeatDemandStats(float64(concurrencyLimit))
// TODO: initialize as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/1040-priority-and-fairness#dispatching once NominalCL values are implemented
} else {
klog.V(5).Infof("Retaining queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
}
@@ -713,7 +765,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
// given priority level configuration. Returns nil if that config
// does not call for limiting. Returns nil and an error if the given
// object is malformed in a way that is a problem for this package.
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandIntegrator fq.Integrator) (fq.QueueSetCompleter, error) {
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
return nil, errors.New("broken union structure at the top")
}
@@ -742,7 +794,7 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow
if queues != nil {
qsc, err = queues.BeginConfigChange(qcQS)
} else {
qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs)
qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs, seatDemandIntegrator)
}
if err != nil {
err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err)
@@ -790,17 +842,19 @@ func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, re
labelValues := []string{proto.Name}
reqsGaugePair := metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues)
execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsGaugePair, execSeatsObs)
seatDemandIntegrator := fq.NewNamedIntegrator(meal.cfgCtlr.clock, proto.Name)
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsGaugePair, execSeatsObs, seatDemandIntegrator)
if err != nil {
// This can not happen because proto is one of the mandatory
// objects and these are not erroneous
panic(err)
}
meal.newPLStates[proto.Name] = &priorityLevelState{
pl: proto,
qsCompleter: qsCompleter,
reqsGaugePair: reqsGaugePair,
execSeatsObs: execSeatsObs,
pl: proto,
qsCompleter: qsCompleter,
reqsGaugePair: reqsGaugePair,
execSeatsObs: execSeatsObs,
seatDemandIntegrator: seatDemandIntegrator,
}
if proto.Spec.Limited != nil {
meal.shareSum += float64(proto.Spec.Limited.NominalConcurrencyShares)

View File

@@ -105,7 +105,7 @@ type ctlrTestRequest struct {
descr1, descr2 interface{}
}
func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedGaugePair, eso metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedGaugePair, eso metrics.RatioedGauge, sdi fq.Integrator) (fq.QueueSetCompleter, error) {
return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
}

View File

@@ -25,10 +25,11 @@ import (
// QueueSetDump is an instant dump of a queue-set.
type QueueSetDump struct {
Queues []QueueDump
Waiting int
Executing int
SeatsInUse int
Queues []QueueDump
Waiting int
Executing int
SeatsInUse int
SeatsWaiting int
}
// QueueDump is an instant dump of one queue in a queue-set.

View File

@@ -53,6 +53,7 @@ func (x *IntegratorResults) Equal(y *IntegratorResults) bool {
}
type integrator struct {
name string
clock clock.PassiveClock
sync.Mutex
lastTime time.Time
@@ -61,9 +62,10 @@ type integrator struct {
min, max float64
}
// NewIntegrator makes one that uses the given clock
func NewIntegrator(clock clock.PassiveClock) Integrator {
// NewNamedIntegrator makes one that uses the given clock and name
func NewNamedIntegrator(clock clock.PassiveClock, name string) Integrator {
return &integrator{
name: name,
clock: clock,
lastTime: clock.Now(),
}

View File

@@ -27,7 +27,7 @@ import (
func TestIntegrator(t *testing.T) {
now := time.Now()
clk := testclock.NewFakeClock(now)
igr := NewIntegrator(clk)
igr := NewNamedIntegrator(clk, "testee")
igr.Add(3)
clk.Step(time.Second)
results := igr.GetResults()

View File

@@ -35,7 +35,9 @@ type QueueSetFactory interface {
// The RatioedGaugePair observes the number of requests waiting and executing,
// with execution covering just the regular phase.
// The RatioedGauge observes number of seats occupied through all phases of execution.
BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge) (QueueSetCompleter, error)
// The Integrator observes the seat demand (executing + queued seats), and
// the queueset does not read or reset the Integrator.
BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, Integrator) (QueueSetCompleter, error)
}
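A sketch of the two-step construction described below, with the new fourth argument, following the pattern the tests in this commit use; the factory and gauge constructors are the real ones referenced elsewhere in this diff, while the names and config values are illustrative:

package main

import (
	"time"

	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
	"k8s.io/utils/clock"
)

func main() {
	qsf := queueset.NewQueueSetFactory(eventclock.Real{})
	labels := []string{"demo"}
	qCfg := fq.QueuingConfig{Name: "demo", DesiredNumQueues: 8, QueueLengthLimit: 4, HandSize: 3, RequestWaitLimit: time.Minute}
	sdi := fq.NewNamedIntegrator(clock.RealClock{}, "demo") // the new Integrator argument
	qsc, err := qsf.BeginConstruction(qCfg,
		metrics.RatioedGaugeVecPhasedElementPair(metrics.PriorityLevelConcurrencyGaugeVec, 1, 1, labels),
		metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labels),
		sdi)
	if err != nil {
		panic(err)
	}
	_ = qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 10}) // second step yields the QueueSet
}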
// QueueSetCompleter finishes the two-step process of creating or

View File

@@ -60,12 +60,13 @@ type promiseFactoryFactory func(*queueSet) promiseFactory
// `*queueSetCompleter` implements QueueSetCompleter. Exactly one of
// the fields `factory` and `theSet` is non-nil.
type queueSetCompleter struct {
factory *queueSetFactory
reqsGaugePair metrics.RatioedGaugePair
execSeatsGauge metrics.RatioedGauge
theSet *queueSet
qCfg fq.QueuingConfig
dealer *shufflesharding.Dealer
factory *queueSetFactory
reqsGaugePair metrics.RatioedGaugePair
execSeatsGauge metrics.RatioedGauge
seatDemandIntegrator fq.Integrator
theSet *queueSet
qCfg fq.QueuingConfig
dealer *shufflesharding.Dealer
}
// queueSet implements the Fair Queuing for Server Requests technique
@@ -93,6 +94,8 @@ type queueSet struct {
execSeatsGauge metrics.RatioedGauge // for all phases of execution
seatDemandIntegrator fq.Integrator
promiseFactory promiseFactory
lock sync.Mutex
@@ -139,6 +142,10 @@ type queueSet struct {
// request(s) that are currently executing in this queueset.
totSeatsInUse int
// totSeatsWaiting is the sum, over all the waiting requests, of their
// max width.
totSeatsWaiting int
// enqueues is the number of requests that have ever been enqueued
enqueues int
}
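Every mutation of totSeatsInUse or totSeatsWaiting in the hunks below is immediately followed by the same seatDemandIntegrator.Set call, keeping the integrator equal to the current seat demand. Each of those call sites is equivalent to this hypothetical helper (not in the commit):

// publishSeatDemandLocked is a hypothetical refactoring, not part of this change:
// seat demand = seats of executing requests + max seats of waiting requests.
func (qs *queueSet) publishSeatDemandLocked() {
	qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
}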
@@ -156,17 +163,18 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr
}
}
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge, seatDemandIntegrator fq.Integrator) (fq.QueueSetCompleter, error) {
dealer, err := checkConfig(qCfg)
if err != nil {
return nil, err
}
return &queueSetCompleter{
factory: qsf,
reqsGaugePair: reqsGaugePair,
execSeatsGauge: execSeatsGauge,
qCfg: qCfg,
dealer: dealer}, nil
factory: qsf,
reqsGaugePair: reqsGaugePair,
execSeatsGauge: execSeatsGauge,
seatDemandIntegrator: seatDemandIntegrator,
qCfg: qCfg,
dealer: dealer}, nil
}
// checkConfig returns a non-nil Dealer if the config is valid and
@@ -191,6 +199,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
estimatedServiceDuration: 3 * time.Millisecond,
reqsGaugePair: qsc.reqsGaugePair,
execSeatsGauge: qsc.execSeatsGauge,
seatDemandIntegrator: qsc.seatDemandIntegrator,
qCfg: qsc.qCfg,
currentR: 0,
lastRealTime: qsc.factory.clock.Now(),
@@ -408,10 +417,12 @@ func (req *request) wait() (bool, bool) {
if req.removeFromQueueLocked() != nil {
defer qs.boundNextDispatchLocked(queue)
qs.totRequestsWaiting--
qs.totSeatsWaiting -= req.MaxSeats()
metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false)
qs.reqsGaugePair.RequestsWaiting.Add(-1)
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
}
return false, qs.isIdleLocked()
}
@@ -597,6 +608,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
// past the requestWaitLimit
func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, fsName string) {
timeoutCount := 0
disqueueSeats := 0
now := qs.clock.Now()
reqs := queue.requests
// reqs are sorted oldest -> newest
@@ -609,6 +621,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, f
if arrivalLimit.After(req.arrivalTime) {
if req.decision.Set(decisionReject) && req.removeFromQueueLocked() != nil {
timeoutCount++
disqueueSeats += req.MaxSeats()
req.NoteQueued(false)
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
}
@@ -622,7 +635,9 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, f
// remove timed out requests from queue
if timeoutCount > 0 {
qs.totRequestsWaiting -= timeoutCount
qs.totSeatsWaiting -= disqueueSeats
qs.reqsGaugePair.RequestsWaiting.Add(float64(-timeoutCount))
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
}
}
@@ -657,9 +672,11 @@ func (qs *queueSet) enqueueToBoundLocked(request *request) {
}
request.removeFromQueueLocked = queue.requests.Enqueue(request)
qs.totRequestsWaiting++
qs.totSeatsWaiting += request.MaxSeats()
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
request.NoteQueued(true)
qs.reqsGaugePair.RequestsWaiting.Add(1)
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
}
// dispatchAsMuchAsPossibleLocked does as many dispatches as possible now.
@@ -690,6 +707,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats())
qs.reqsGaugePair.RequestsExecuting.Add(1)
qs.execSeatsGauge.Add(float64(req.MaxSeats()))
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
klogV := klog.V(5)
if klogV.Enabled() {
klogV.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting)
@@ -711,11 +729,13 @@ func (qs *queueSet) dispatchLocked() bool {
return false
}
qs.totRequestsWaiting--
qs.totSeatsWaiting -= request.MaxSeats()
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
request.NoteQueued(false)
qs.reqsGaugePair.RequestsWaiting.Add(-1)
defer qs.boundNextDispatchLocked(queue)
if !request.decision.Set(decisionExecute) {
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
return true
}
request.startTime = qs.clock.Now()
@@ -732,6 +752,7 @@ func (qs *queueSet) dispatchLocked() bool {
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats())
qs.reqsGaugePair.RequestsExecuting.Add(1)
qs.execSeatsGauge.Add(float64(request.MaxSeats()))
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
klogV := klog.V(6)
if klogV.Enabled() {
klogV.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
@@ -894,6 +915,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
qs.totSeatsInUse -= r.MaxSeats()
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats())
qs.execSeatsGauge.Add(-float64(r.MaxSeats()))
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
if r.queue != nil {
r.queue.seatsInUse -= r.MaxSeats()
}
@@ -1011,10 +1033,11 @@ func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
qs.lock.Lock()
defer qs.lock.Unlock()
d := debug.QueueSetDump{
Queues: make([]debug.QueueDump, len(qs.queues)),
Waiting: qs.totRequestsWaiting,
Executing: qs.totRequestsExecuting,
SeatsInUse: qs.totSeatsInUse,
Queues: make([]debug.QueueDump, len(qs.queues)),
Waiting: qs.totRequestsWaiting,
Executing: qs.totRequestsExecuting,
SeatsInUse: qs.totSeatsInUse,
SeatsWaiting: qs.totSeatsWaiting,
}
for i, q := range qs.queues {
d.Queues[i] = q.dumpLocked(includeRequestDetails)

View File

@@ -182,16 +182,18 @@ type uniformScenario struct {
counter counter.GoRoutineCounter
expectedAverages []float64
expectedEpochAdvances int
seatDemandIntegratorSubject fq.Integrator
}
func (us uniformScenario) exercise(t *testing.T) {
uss := uniformScenarioState{
t: t,
uniformScenario: us,
startTime: us.clk.Now(),
integrators: make([]fq.Integrator, len(us.clients)),
executions: make([]int32, len(us.clients)),
rejects: make([]int32, len(us.clients)),
t: t,
uniformScenario: us,
startTime: us.clk.Now(),
execSeatsIntegrators: make([]fq.Integrator, len(us.clients)),
seatDemandIntegratorCheck: fq.NewNamedIntegrator(us.clk, us.name+"-seatDemandCheck"),
executions: make([]int32, len(us.clients)),
rejects: make([]int32, len(us.clients)),
}
for _, uc := range us.clients {
uss.doSplit = uss.doSplit || uc.split
@@ -204,7 +206,8 @@ type uniformScenarioState struct {
uniformScenario
startTime time.Time
doSplit bool
integrators []fq.Integrator
execSeatsIntegrators []fq.Integrator
seatDemandIntegratorCheck fq.Integrator
failedCount uint64
expectedInqueue, expectedExecuting, expectedConcurrencyInUse string
executions, rejects []int32
@@ -216,18 +219,18 @@ func (uss *uniformScenarioState) exercise() {
metrics.Reset()
}
for i, uc := range uss.clients {
uss.integrators[i] = fq.NewIntegrator(uss.clk)
uss.execSeatsIntegrators[i] = fq.NewNamedIntegrator(uss.clk, fmt.Sprintf("%s client %d execSeats", uss.name, i))
fsName := fmt.Sprintf("client%d", i)
uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n")
for j := 0; j < uc.nThreads; j++ {
ust := uniformScenarioThread{
uss: uss,
i: i,
j: j,
nCalls: uc.nCalls,
uc: uc,
igr: uss.integrators[i],
fsName: fsName,
uss: uss,
i: i,
j: j,
nCalls: uc.nCalls,
uc: uc,
execSeatsIntegrator: uss.execSeatsIntegrators[i],
fsName: fsName,
}
ust.start()
}
@@ -241,12 +244,12 @@ }
}
type uniformScenarioThread struct {
uss *uniformScenarioState
i, j int
nCalls int
uc uniformClient
igr fq.Integrator
fsName string
uss *uniformScenarioState
i, j int
nCalls int
uc uniformClient
execSeatsIntegrator fq.Integrator
fsName string
}
func (ust *uniformScenarioThread) start() {
@@ -269,11 +272,15 @@ func (ust *uniformScenarioThread) callK(k int) {
if k >= ust.nCalls {
return
}
maxWidth := float64(uint64max(ust.uc.initialSeats, ust.uc.finalSeats))
ust.uss.seatDemandIntegratorCheck.Add(maxWidth)
returnSeatDemand := func(time.Time) { ust.uss.seatDemandIntegratorCheck.Add(-maxWidth) }
req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{InitialSeats: ust.uc.initialSeats, FinalSeats: ust.uc.finalSeats, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
if req == nil {
atomic.AddUint64(&ust.uss.failedCount, 1)
atomic.AddInt32(&ust.uss.rejects[ust.i], 1)
returnSeatDemand(ust.uss.clk.Now())
return
}
if idle {
@@ -285,11 +292,12 @@ func (ust *uniformScenarioThread) callK(k int) {
executed = true
execStart := ust.uss.clk.Now()
atomic.AddInt32(&ust.uss.executions[ust.i], 1)
ust.igr.Add(float64(ust.uc.initialSeats))
ust.execSeatsIntegrator.Add(float64(ust.uc.initialSeats))
ust.uss.t.Logf("%s: %d, %d, %d executing; width1=%d", execStart.Format(nsTimeFmt), ust.i, ust.j, k, ust.uc.initialSeats)
ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration)
ust.uss.clk.Sleep(ust.uc.execDuration)
ust.igr.Add(-float64(ust.uc.initialSeats))
ust.execSeatsIntegrator.Add(-float64(ust.uc.initialSeats))
ust.uss.clk.EventAfterDuration(returnSeatDemand, ust.uc.padDuration)
returnTime = ust.uss.clk.Now()
})
now := ust.uss.clk.Now()
@@ -297,6 +305,7 @@ func (ust *uniformScenarioThread) callK(k int) {
if !executed {
atomic.AddUint64(&ust.uss.failedCount, 1)
atomic.AddInt32(&ust.uss.rejects[ust.i], 1)
returnSeatDemand(ust.uss.clk.Now())
} else if now != returnTime {
ust.uss.t.Errorf("%s: %d, %d, %d returnTime=%s", now.Format(nsTimeFmt), ust.i, ust.j, k, returnTime.Format(nsTimeFmt))
}
@@ -319,7 +328,7 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma
}
sep := uc.thinkDuration
demands[i] = float64(nThreads) * float64(uc.initialSeats) * float64(uc.execDuration) / float64(sep+uc.execDuration)
averages[i] = uss.integrators[i].Reset().Average
averages[i] = uss.execSeatsIntegrators[i].Reset().Average
}
fairAverages := uss.expectedAverages
if fairAverages == nil {
@@ -341,6 +350,25 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma
uss.t.Logf("%s client %d last=%v expectFair=%v margin=%v got an Average of %v and the expected average was %v", uss.name, i, last, expectFair, margin, averages[i], expectedAverage)
}
}
if uss.seatDemandIntegratorSubject != nil {
checkResults := uss.seatDemandIntegratorCheck.GetResults()
subjectResults := uss.seatDemandIntegratorSubject.GetResults()
if float64close(subjectResults.Duration, checkResults.Duration) {
uss.t.Logf("%s last=%v got duration of %v and expected %v", uss.name, last, subjectResults.Duration, checkResults.Duration)
} else {
uss.t.Errorf("%s last=%v got duration of %v but expected %v", uss.name, last, subjectResults.Duration, checkResults.Duration)
}
if got, expected := float64NaNTo0(subjectResults.Average), float64NaNTo0(checkResults.Average); float64close(got, expected) {
uss.t.Logf("%s last=%v got SeatDemand average of %v and expected %v", uss.name, last, got, expected)
} else {
uss.t.Errorf("%s last=%v got SeatDemand average of %v but expected %v", uss.name, last, got, expected)
}
if got, expected := float64NaNTo0(subjectResults.Deviation), float64NaNTo0(checkResults.Deviation); float64close(got, expected) {
uss.t.Logf("%s last=%v got SeatDemand standard deviation of %v and expected %v", uss.name, last, got, expected)
} else {
uss.t.Errorf("%s last=%v got SeatDemand standard deviation of %v but expected %v", uss.name, last, got, expected)
}
}
}
func (uss *uniformScenarioState) finalReview() {
@@ -445,7 +473,7 @@ func TestNoRestraint(t *testing.T) {
t.Run(testCase.name, func(t *testing.T) {
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newGaugePair(clk), newExecSeatsGauge(clk))
nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newGaugePair(clk), newExecSeatsGauge(clk), fq.NewNamedIntegrator(clk, "TestNoRestraint"))
if err != nil {
t.Fatal(err)
}
@@ -481,7 +509,8 @@ func TestBaseline(t *testing.T) {
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject")
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil {
t.Fatal(err)
}
@@ -492,15 +521,16 @@ func TestBaseline(t *testing.T) {
clients: []uniformClient{
newUniformClient(1001001001, 1, 21, time.Second, 0),
},
concurrencyLimit: 1,
evalDuration: time.Second * 20,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
concurrencyLimit: 1,
evalDuration: time.Second * 20,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t)
}
@@ -550,7 +580,8 @@ func TestSeparations(t *testing.T) {
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, caseName+" seatDemandSubject")
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil {
t.Fatal(err)
}
@@ -561,16 +592,17 @@ func TestSeparations(t *testing.T) {
newUniformClient(1001001001, 1, 25, time.Second, seps.think).pad(seps.finalSeats, seps.pad),
newUniformClient(2002002002, 1, 25, time.Second, seps.think).pad(seps.finalSeats, seps.pad),
}[:seps.nClients],
concurrencyLimit: seps.conc,
evalDuration: time.Second * 24, // multiple of every period involved, so that margin can be 0 below
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
expectedAverages: seps.exp,
concurrencyLimit: seps.conc,
evalDuration: time.Second * 24, // multiple of every period involved, so that margin can be 0 below
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
expectedAverages: seps.exp,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t)
})
}
@@ -589,7 +621,8 @@ func TestUniformFlowsHandSize1(t *testing.T) {
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject")
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil {
t.Fatal(err)
}
@@ -601,15 +634,16 @@ func TestUniformFlowsHandSize1(t *testing.T) {
newUniformClient(1001001001, 8, 20, time.Second, time.Second-1),
newUniformClient(2002002002, 8, 20, time.Second, time.Second-1),
},
concurrencyLimit: 4,
evalDuration: time.Second * 50,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
concurrencyLimit: 4,
evalDuration: time.Second * 50,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t)
}
@@ -626,7 +660,8 @@ func TestUniformFlowsHandSize3(t *testing.T) {
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil {
t.Fatal(err)
}
@@ -637,15 +672,16 @@ func TestUniformFlowsHandSize3(t *testing.T) {
newUniformClient(400900100100, 8, 30, time.Second, time.Second-1),
newUniformClient(300900200200, 8, 30, time.Second, time.Second-1),
},
concurrencyLimit: 4,
evalDuration: time.Second * 60,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.03},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
concurrencyLimit: 4,
evalDuration: time.Second * 60,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.03},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t)
}
@@ -662,7 +698,8 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil {
t.Fatal(err)
}
@@ -674,15 +711,16 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
newUniformClient(1001001001, 8, 20, time.Second, time.Second),
newUniformClient(2002002002, 7, 30, time.Second, time.Second/2),
},
concurrencyLimit: 4,
evalDuration: time.Second * 40,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
concurrencyLimit: 4,
evalDuration: time.Second * 40,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t)
}
@@ -702,7 +740,8 @@ func TestSeatSecondsRollover(t *testing.T) {
HandSize: 1,
RequestWaitLimit: 40 * Quarter,
}
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil {
t.Fatal(err)
}
@@ -714,16 +753,17 @@ func TestSeatSecondsRollover(t *testing.T) {
newUniformClient(1001001001, 8, 20, Quarter, Quarter).setInitWidth(500),
newUniformClient(2002002002, 7, 30, Quarter, Quarter/2).setInitWidth(500),
},
concurrencyLimit: 2000,
evalDuration: Quarter * 40,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
expectedEpochAdvances: 8,
concurrencyLimit: 2000,
evalDuration: Quarter * 40,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
expectedEpochAdvances: 8,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t)
}
@@ -740,7 +780,8 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil {
t.Fatal(err)
}
@@ -752,15 +793,16 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
newUniformClient(1001001001, 4, 20, time.Second, time.Second-1),
newUniformClient(2002002002, 2, 20, time.Second, time.Second-1),
},
concurrencyLimit: 3,
evalDuration: time.Second * 20,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
concurrencyLimit: 3,
evalDuration: time.Second * 20,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t)
}
@@ -777,7 +819,8 @@ func TestDifferentWidths(t *testing.T) {
HandSize: 7,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil {
t.Fatal(err)
}
@@ -788,15 +831,16 @@ func TestDifferentWidths(t *testing.T) {
newUniformClient(10010010010010, 13, 10, time.Second, time.Second-1),
newUniformClient(20020020020020, 7, 10, time.Second, time.Second-1).setInitWidth(2),
},
concurrencyLimit: 6,
evalDuration: time.Second * 20,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.15},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
concurrencyLimit: 6,
evalDuration: time.Second * 20,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.15},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t)
}
@@ -813,7 +857,8 @@ func TestTooWide(t *testing.T) {
HandSize: 7,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil {
t.Fatal(err)
}
@@ -827,15 +872,16 @@ func TestTooWide(t *testing.T) {
newUniformClient(70070070070070, 15, 21, time.Second, time.Second-1).setInitWidth(2),
newUniformClient(90090090090090, 15, 21, time.Second, time.Second-1).setInitWidth(7),
},
concurrencyLimit: 6,
evalDuration: time.Second * 225,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.33},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
concurrencyLimit: 6,
evalDuration: time.Second * 225,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.33},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t)
}
@@ -874,7 +920,8 @@ func TestWindup(t *testing.T) {
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil {
t.Fatal(err)
}
@@ -885,15 +932,16 @@ func TestWindup(t *testing.T) {
newUniformClient(1001001001, 2, 40, time.Second, -1),
newUniformClient(2002002002, 2, 40, time.Second, -1).setSplit(),
},
concurrencyLimit: 3,
evalDuration: time.Second * 40,
expectedFair: []bool{true, testCase.expectFair2},
expectedFairnessMargin: []float64{0.01, testCase.margin2},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
concurrencyLimit: 3,
evalDuration: time.Second * 40,
expectedFair: []bool{true, testCase.expectFair2},
expectedFairnessMargin: []float64{0.01, testCase.margin2},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t)
})
}
@@ -909,7 +957,8 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
Name: "TestDifferentFlowsWithoutQueuing",
DesiredNumQueues: 0,
}
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject")
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil {
t.Fatal(err)
}
@@ -921,14 +970,15 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
newUniformClient(1001001001, 6, 10, time.Second, 57*time.Millisecond),
newUniformClient(2002002002, 4, 15, time.Second, 750*time.Millisecond),
},
concurrencyLimit: 4,
evalDuration: time.Second * 13,
expectedFair: []bool{false},
expectedFairnessMargin: []float64{0.20},
evalExecutingMetrics: true,
rejectReason: "concurrency-limit",
clk: clk,
counter: counter,
concurrencyLimit: 4,
evalDuration: time.Second * 13,
expectedFair: []bool{false},
expectedFairnessMargin: []float64{0.20},
evalExecutingMetrics: true,
rejectReason: "concurrency-limit",
clk: clk,
counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t)
}
@@ -945,7 +995,8 @@ func TestTimeout(t *testing.T) {
HandSize: 1,
RequestWaitLimit: 0,
}
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil {
t.Fatal(err)
}
@@ -956,15 +1007,16 @@ func TestTimeout(t *testing.T) {
clients: []uniformClient{
newUniformClient(1001001001, 5, 100, time.Second, time.Second),
},
concurrencyLimit: 1,
evalDuration: time.Second * 10,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01},
evalInqueueMetrics: true,
evalExecutingMetrics: true,
rejectReason: "time-out",
clk: clk,
counter: counter,
concurrencyLimit: 1,
evalDuration: time.Second * 10,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01},
evalInqueueMetrics: true,
evalExecutingMetrics: true,
rejectReason: "time-out",
clk: clk,
counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t)
}
@@ -996,7 +1048,8 @@ func TestContextCancel(t *testing.T) {
HandSize: 1,
RequestWaitLimit: 15 * time.Second,
}
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil {
t.Fatal(err)
}
@@ -1102,7 +1155,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
DesiredNumQueues: 0,
RequestWaitLimit: 15 * time.Second,
}
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), fq.NewNamedIntegrator(clk, qCfg.Name))
if err != nil {
t.Fatal(err)
}
@@ -1286,6 +1339,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
qs := &queueSet{
estimatedServiceDuration: G,
seatDemandIntegrator: fq.NewNamedIntegrator(clock.RealClock{}, "seatDemandSubject"),
robinIndex: test.robinIndex,
totSeatsInUse: test.totSeatsInUse,
qCfg: fq.QueuingConfig{Name: "TestSelectQueueLocked/" + test.name},
@@ -1359,6 +1413,7 @@ func TestFinishRequestLocked(t *testing.T) {
estimatedServiceDuration: time.Second,
reqsGaugePair: newGaugePair(clk),
execSeatsGauge: newExecSeatsGauge(clk),
seatDemandIntegrator: fq.NewNamedIntegrator(clk, "seatDemandSubject"),
}
queue := &queue{
requests: newRequestFIFO(),
@@ -1468,3 +1523,28 @@ func newGaugePair(clk clock.PassiveClock) metrics.RatioedGaugePair {
func newExecSeatsGauge(clk clock.PassiveClock) metrics.RatioedGauge {
return metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, []string{"test"})
}
func float64close(x, y float64) bool {
x0 := float64NaNTo0(x)
y0 := float64NaNTo0(y)
diff := math.Abs(x - y)
den := math.Max(math.Abs(x0), math.Abs(y0))
if den == 0 {
return diff < 1e-10
}
return diff/den < 1e-10
}
func uint64max(a, b uint64) uint64 {
if b > a {
return b
}
return a
}
func float64NaNTo0(x float64) float64 {
if math.IsNaN(x) {
return 0
}
return x
}

View File

@@ -40,7 +40,7 @@ type noRestraint struct{}
type noRestraintRequest struct{}
func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, fq.Integrator) (fq.QueueSetCompleter, error) {
return noRestraintCompleter{}, nil
}

View File

@@ -23,12 +23,15 @@ import (
"testing"
"time"
"k8s.io/utils/clock"
flowcontrol "k8s.io/api/flowcontrol/v1beta3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
@@ -57,7 +60,7 @@ func genPL(rng *rand.Rand, name string) *flowcontrol.PriorityLevelConfiguration
QueueLengthLimit: 5}
}
labelVals := []string{"test"}
_, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.RatioedGaugeVecPhasedElementPair(metrics.PriorityLevelConcurrencyGaugeVec, 1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals))
_, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.RatioedGaugeVecPhasedElementPair(metrics.PriorityLevelConcurrencyGaugeVec, 1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals), fq.NewNamedIntegrator(clock.RealClock{}, name))
if err != nil {
panic(err)
}