Add instrumentation for seat borrowing

This commit is contained in:
Mike Spreitzer 2022-10-20 15:21:09 -04:00
parent 256ade5aa4
commit 9b684579e2
10 changed files with 362 additions and 197 deletions

View File

@ -71,21 +71,16 @@ const (
// Borrowing among priority levels will be accomplished by periodically
// adjusting the current concurrency limits (CurrentCLs);
// borrowingAdjustmentPeriod is that period.
- borrowingAdjustmentPeriod = 10 * time.Second //nolint:golint,unused
borrowingAdjustmentPeriod = 10 * time.Second

- // The input to the borrowing is smoothed seat demand figures.
- // Every adjustment period, each priority level's smoothed demand is adjusted
- // based on an envelope of that level's recent seat demand. The formula is:
- // SmoothSeatDemand := max( EnvelopeSeatDemand,
- //                          seatDemandSmoothingCoefficient * SmoothSeatDemand +
- //                          (1-seatDemandSmoothingCoefficient) * EnvelopeSeatDemand ).
- // Qualitatively: this parameter controls the rate at which the smoothed seat demand drifts
- // down toward the envelope of seat demand while that is lower.
- // The particular number appearing here has the property that half of the
- // current smoothed value comes from the smoothed value of 5 minutes ago.
// The input to the seat borrowing is smoothed seat demand figures.
// This constant controls the decay rate of that smoothing,
// as described in the comment on the `seatDemandStats` field of `priorityLevelState`.
// The particular number appearing here has the property that half-life
// of that decay is 5 minutes.
// This is a very preliminary guess at a good value and is likely to be tweaked
// once we get some experience with borrowing.
- seatDemandSmoothingCoefficient = 0.977 //nolint:golint,unused
seatDemandSmoothingCoefficient = 0.977
)
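As a sanity check on the two constants above (not part of this commit): applying a per-period coefficient of 0.977 once every 10-second borrowingAdjustmentPeriod does give a decay half-life of roughly 5 minutes. A minimal standalone sketch, assuming only the relationship A^(halfLife/period) = 1/2:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	const period = 10.0    // seconds, borrowingAdjustmentPeriod
	const halfLife = 300.0 // seconds, the intended 5-minute half-life
	// Coefficient whose repeated application halves a value over halfLife:
	// A^(halfLife/period) = 1/2  =>  A = 2^(-period/halfLife)
	a := math.Pow(2, -period/halfLife)
	fmt.Printf("derived coefficient: %.3f\n", a) // 0.977
	// Conversely, 0.977 applied once per period decays to about 0.5
	// after 30 periods, i.e. 5 minutes.
	fmt.Printf("0.977^30 = %.3f\n", math.Pow(0.977, 30)) // 0.497
}
```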
// The funcs in this package follow the naming convention that the suffix // The funcs in this package follow the naming convention that the suffix
@ -217,6 +212,43 @@ type priorityLevelState struct {
// Observer of number of seats occupied throughout execution // Observer of number of seats occupied throughout execution
execSeatsObs metrics.RatioedGauge execSeatsObs metrics.RatioedGauge
// Integrator of seat demand, reset every CurrentCL adjustment period
seatDemandIntegrator fq.Integrator
// seatDemandStats is derived from periodically examining the seatDemandIntegrator.
// The average, standard deviation, and high watermark come directly from the integrator.
// envelope = avg + stdDev.
// Periodically smoothed gets replaced with `max(envelope, A*smoothed + (1-A)*envelope)`,
// where A is seatDemandSmoothingCoefficient.
seatDemandStats seatDemandStats
}
type seatDemandStats struct {
avg float64
stdDev float64
highWatermark float64
envelope float64
smoothed float64
}
func newSeatDemandStats(val float64) seatDemandStats {
return seatDemandStats{
avg: val,
stdDev: 0,
highWatermark: val,
envelope: val,
smoothed: val,
}
}
func (stats *seatDemandStats) update(obs fq.IntegratorResults) {
stats.avg = obs.Average
stats.stdDev = obs.Deviation
stats.highWatermark = obs.Max
envelope := obs.Average + obs.Deviation
stats.envelope = envelope
stats.smoothed = math.Max(envelope, seatDemandSmoothingCoefficient*stats.smoothed+(1-seatDemandSmoothingCoefficient)*envelope)
} }
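A minimal standalone sketch (not part of this commit) of how the smoothed figure defined above behaves across adjustment periods: it follows the envelope upward immediately, and drifts back down toward a lower envelope at the half-life implied by the coefficient.

```go
package main

import (
	"fmt"
	"math"
)

const seatDemandSmoothingCoefficient = 0.977

// smoothOnce applies one adjustment period of the recurrence used in update():
// smoothed := max(envelope, A*smoothed + (1-A)*envelope)
func smoothOnce(smoothed, envelope float64) float64 {
	return math.Max(envelope,
		seatDemandSmoothingCoefficient*smoothed+(1-seatDemandSmoothingCoefficient)*envelope)
}

func main() {
	smoothed := 10.0
	// A burst of seat demand is reflected immediately.
	smoothed = smoothOnce(smoothed, 50)
	fmt.Println(smoothed) // 50
	// After the burst subsides, smoothed drifts down only gradually.
	for i := 0; i < 30; i++ { // 30 periods of 10s = 5 minutes
		smoothed = smoothOnce(smoothed, 10)
	}
	fmt.Printf("%.1f\n", smoothed) // 29.9, about halfway back down after one half-life
}
```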
// NewTestableController is extra flexible to facilitate testing // NewTestableController is extra flexible to facilitate testing
@ -325,11 +357,25 @@ func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
klog.Info("Running API Priority and Fairness config worker") klog.Info("Running API Priority and Fairness config worker")
go wait.Until(cfgCtlr.runWorker, time.Second, stopCh) go wait.Until(cfgCtlr.runWorker, time.Second, stopCh)
klog.Info("Running API Priority and Fairness periodic rebalancing process")
go wait.Until(cfgCtlr.updateBorrowing, borrowingAdjustmentPeriod, stopCh)
<-stopCh <-stopCh
klog.Info("Shutting down API Priority and Fairness config worker") klog.Info("Shutting down API Priority and Fairness config worker")
return nil return nil
} }
func (cfgCtlr *configController) updateBorrowing() {
cfgCtlr.lock.Lock()
defer cfgCtlr.lock.Unlock()
for _, plState := range cfgCtlr.priorityLevelStates {
obs := plState.seatDemandIntegrator.Reset()
plState.seatDemandStats.update(obs)
// TODO: set borrowing metrics introduced in https://github.com/kubernetes/enhancements/pull/3391
// TODO: update the CurrentCL as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/1040-priority-and-fairness#dispatching
}
}
// runWorker is the logic of the one and only worker goroutine. We // runWorker is the logic of the one and only worker goroutine. We
// limit the number to one in order to obviate explicit // limit the number to one in order to obviate explicit
// synchronization around access to `cfgCtlr.mostRecentUpdates`. // synchronization around access to `cfgCtlr.mostRecentUpdates`.
@ -552,9 +598,13 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
state := meal.cfgCtlr.priorityLevelStates[pl.Name] state := meal.cfgCtlr.priorityLevelStates[pl.Name]
if state == nil { if state == nil {
labelValues := []string{pl.Name} labelValues := []string{pl.Name}
- state = &priorityLevelState{reqsGaugePair: metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues), execSeatsObs: meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)}
state = &priorityLevelState{
reqsGaugePair: metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues),
execSeatsObs: meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues),
seatDemandIntegrator: fq.NewNamedIntegrator(meal.cfgCtlr.clock, pl.Name),
}
} }
- qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs)
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs, state.seatDemandIntegrator)
if err != nil { if err != nil {
klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err) klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
continue continue
@ -658,7 +708,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
} }
} }
var err error var err error
- plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs)
plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs, plState.seatDemandIntegrator)
if err != nil { if err != nil {
// This can not happen because queueSetCompleterForPL already approved this config // This can not happen because queueSetCompleterForPL already approved this config
panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec))) panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
@ -702,6 +752,8 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
if plState.queues == nil { if plState.queues == nil {
klog.V(5).Infof("Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum) klog.V(5).Infof("Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
plState.seatDemandStats = newSeatDemandStats(float64(concurrencyLimit))
// TODO: initialize as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/1040-priority-and-fairness#dispatching once NominalCL values are implemented
} else { } else {
klog.V(5).Infof("Retaining queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum) klog.V(5).Infof("Retaining queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
} }
@ -713,7 +765,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
// given priority level configuration. Returns nil if that config // given priority level configuration. Returns nil if that config
// does not call for limiting. Returns nil and an error if the given // does not call for limiting. Returns nil and an error if the given
// object is malformed in a way that is a problem for this package. // object is malformed in a way that is a problem for this package.
- func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandIntgrator fq.Integrator) (fq.QueueSetCompleter, error) {
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) { if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
return nil, errors.New("broken union structure at the top") return nil, errors.New("broken union structure at the top")
} }
@ -742,7 +794,7 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow
if queues != nil { if queues != nil {
qsc, err = queues.BeginConfigChange(qcQS) qsc, err = queues.BeginConfigChange(qcQS)
} else { } else {
- qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs)
qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs, seatDemandIntgrator)
} }
if err != nil { if err != nil {
err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err) err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err)
@ -790,17 +842,19 @@ func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, re
labelValues := []string{proto.Name} labelValues := []string{proto.Name}
reqsGaugePair := metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues) reqsGaugePair := metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues)
execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues) execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)
- qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsGaugePair, execSeatsObs)
seatDemandIntegrator := fq.NewNamedIntegrator(meal.cfgCtlr.clock, proto.Name)
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsGaugePair, execSeatsObs, seatDemandIntegrator)
if err != nil { if err != nil {
// This can not happen because proto is one of the mandatory // This can not happen because proto is one of the mandatory
// objects and these are not erroneous // objects and these are not erroneous
panic(err) panic(err)
} }
meal.newPLStates[proto.Name] = &priorityLevelState{ meal.newPLStates[proto.Name] = &priorityLevelState{
pl: proto, pl: proto,
qsCompleter: qsCompleter, qsCompleter: qsCompleter,
reqsGaugePair: reqsGaugePair, reqsGaugePair: reqsGaugePair,
execSeatsObs: execSeatsObs, execSeatsObs: execSeatsObs,
seatDemandIntegrator: seatDemandIntegrator,
} }
if proto.Spec.Limited != nil { if proto.Spec.Limited != nil {
meal.shareSum += float64(proto.Spec.Limited.NominalConcurrencyShares) meal.shareSum += float64(proto.Spec.Limited.NominalConcurrencyShares)

View File

@ -105,7 +105,7 @@ type ctlrTestRequest struct {
descr1, descr2 interface{} descr1, descr2 interface{}
} }
- func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedGaugePair, eso metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedGaugePair, eso metrics.RatioedGauge, sdi fq.Integrator) (fq.QueueSetCompleter, error) {
return ctlrTestQueueSetCompleter{cts, nil, qc}, nil return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
} }

View File

@ -25,10 +25,11 @@ import (
// QueueSetDump is an instant dump of queue-set. // QueueSetDump is an instant dump of queue-set.
type QueueSetDump struct { type QueueSetDump struct {
Queues []QueueDump Queues []QueueDump
Waiting int Waiting int
Executing int Executing int
SeatsInUse int SeatsInUse int
SeatsWaiting int
} }
// QueueDump is an instant dump of one queue in a queue-set. // QueueDump is an instant dump of one queue in a queue-set.

View File

@ -53,6 +53,7 @@ func (x *IntegratorResults) Equal(y *IntegratorResults) bool {
} }
type integrator struct { type integrator struct {
name string
clock clock.PassiveClock clock clock.PassiveClock
sync.Mutex sync.Mutex
lastTime time.Time lastTime time.Time
@ -61,9 +62,10 @@ type integrator struct {
min, max float64 min, max float64
} }
- // NewIntegrator makes one that uses the given clock
- func NewIntegrator(clock clock.PassiveClock) Integrator {
// NewNamedIntegrator makes one that uses the given clock and name
func NewNamedIntegrator(clock clock.PassiveClock, name string) Integrator {
return &integrator{ return &integrator{
name: name,
clock: clock, clock: clock,
lastTime: clock.Now(), lastTime: clock.Now(),
} }

View File

@ -27,7 +27,7 @@ import (
func TestIntegrator(t *testing.T) { func TestIntegrator(t *testing.T) {
now := time.Now() now := time.Now()
clk := testclock.NewFakeClock(now) clk := testclock.NewFakeClock(now)
- igr := NewIntegrator(clk)
igr := NewNamedIntegrator(clk, "testee")
igr.Add(3) igr.Add(3)
clk.Step(time.Second) clk.Step(time.Second)
results := igr.GetResults() results := igr.GetResults()

View File

@ -35,7 +35,9 @@ type QueueSetFactory interface {
// The RatioedGaugePair observes number of requests, // The RatioedGaugePair observes number of requests,
// execution covering just the regular phase. // execution covering just the regular phase.
// The RatioedGauge observes number of seats occupied through all phases of execution. // The RatioedGauge observes number of seats occupied through all phases of execution.
- BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge) (QueueSetCompleter, error)
// The Integrator observes the seat demand (executing + queued seats), and
// the queueset does not read or reset the Integrator.
BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, Integrator) (QueueSetCompleter, error)
} }
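To make the ownership contract in the comment above concrete: the queue-set only writes the current seat demand, and the config controller alone reads and resets the statistics once per borrowing adjustment period. The sketch below is a toy stand-in (hypothetical, not the fq.Integrator implementation, which also weights its statistics by elapsed clock time):

```go
package main

import "fmt"

// toyIntegrator is a hypothetical stand-in for the Integrator passed to
// BeginConstruction; it only records a plain average of the Set() values.
type toyIntegrator struct {
	sum float64
	n   int
}

// Set is the writer side: the queue-set calls it whenever seat demand
// (executing + queued seats) changes.
func (t *toyIntegrator) Set(x float64) { t.sum += x; t.n++ }

// Reset is the reader side: only the config controller calls it, once per
// borrowing adjustment period, to harvest and clear the accumulated stats.
func (t *toyIntegrator) Reset() (avg float64) {
	if t.n > 0 {
		avg = t.sum / float64(t.n)
	}
	t.sum, t.n = 0, 0
	return
}

func main() {
	igr := &toyIntegrator{}
	for _, demand := range []float64{3, 5, 4} {
		igr.Set(demand) // queue-set side
	}
	fmt.Printf("average seat demand this period: %.1f\n", igr.Reset()) // 4.0, controller side
}
```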
// QueueSetCompleter finishes the two-step process of creating or // QueueSetCompleter finishes the two-step process of creating or

View File

@ -60,12 +60,13 @@ type promiseFactoryFactory func(*queueSet) promiseFactory
// `*queueSetCompleter` implements QueueSetCompleter. Exactly one of // `*queueSetCompleter` implements QueueSetCompleter. Exactly one of
// the fields `factory` and `theSet` is non-nil. // the fields `factory` and `theSet` is non-nil.
type queueSetCompleter struct {
factory *queueSetFactory
reqsGaugePair metrics.RatioedGaugePair
execSeatsGauge metrics.RatioedGauge
seatDemandIntegrator fq.Integrator
theSet *queueSet
qCfg fq.QueuingConfig
dealer *shufflesharding.Dealer
}
// queueSet implements the Fair Queuing for Server Requests technique // queueSet implements the Fair Queuing for Server Requests technique
@ -93,6 +94,8 @@ type queueSet struct {
execSeatsGauge metrics.RatioedGauge // for all phases of execution execSeatsGauge metrics.RatioedGauge // for all phases of execution
seatDemandIntegrator fq.Integrator
promiseFactory promiseFactory promiseFactory promiseFactory
lock sync.Mutex lock sync.Mutex
@ -139,6 +142,10 @@ type queueSet struct {
// request(s) that are currently executing in this queueset. // request(s) that are currently executing in this queueset.
totSeatsInUse int totSeatsInUse int
// totSeatsWaiting is the sum, over all the waiting requests, of their
// max width.
totSeatsWaiting int
// enqueues is the number of requests that have ever been enqueued // enqueues is the number of requests that have ever been enqueued
enqueues int enqueues int
} }
@ -156,17 +163,18 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr
} }
} }
- func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge, seatDemandIntegrator fq.Integrator) (fq.QueueSetCompleter, error) {
dealer, err := checkConfig(qCfg) dealer, err := checkConfig(qCfg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &queueSetCompleter{
factory: qsf,
reqsGaugePair: reqsGaugePair,
execSeatsGauge: execSeatsGauge,
seatDemandIntegrator: seatDemandIntegrator,
qCfg: qCfg,
dealer: dealer}, nil
} }
// checkConfig returns a non-nil Dealer if the config is valid and // checkConfig returns a non-nil Dealer if the config is valid and
@ -191,6 +199,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
estimatedServiceDuration: 3 * time.Millisecond, estimatedServiceDuration: 3 * time.Millisecond,
reqsGaugePair: qsc.reqsGaugePair, reqsGaugePair: qsc.reqsGaugePair,
execSeatsGauge: qsc.execSeatsGauge, execSeatsGauge: qsc.execSeatsGauge,
seatDemandIntegrator: qsc.seatDemandIntegrator,
qCfg: qsc.qCfg, qCfg: qsc.qCfg,
currentR: 0, currentR: 0,
lastRealTime: qsc.factory.clock.Now(), lastRealTime: qsc.factory.clock.Now(),
@ -408,10 +417,12 @@ func (req *request) wait() (bool, bool) {
if req.removeFromQueueLocked() != nil { if req.removeFromQueueLocked() != nil {
defer qs.boundNextDispatchLocked(queue) defer qs.boundNextDispatchLocked(queue)
qs.totRequestsWaiting-- qs.totRequestsWaiting--
qs.totSeatsWaiting -= req.MaxSeats()
metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled") metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false) req.NoteQueued(false)
qs.reqsGaugePair.RequestsWaiting.Add(-1) qs.reqsGaugePair.RequestsWaiting.Add(-1)
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
} }
return false, qs.isIdleLocked() return false, qs.isIdleLocked()
} }
@ -597,6 +608,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
// past the requestWaitLimit // past the requestWaitLimit
func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, fsName string) { func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, fsName string) {
timeoutCount := 0 timeoutCount := 0
disqueueSeats := 0
now := qs.clock.Now() now := qs.clock.Now()
reqs := queue.requests reqs := queue.requests
// reqs are sorted oldest -> newest // reqs are sorted oldest -> newest
@ -609,6 +621,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, f
if arrivalLimit.After(req.arrivalTime) { if arrivalLimit.After(req.arrivalTime) {
if req.decision.Set(decisionReject) && req.removeFromQueueLocked() != nil { if req.decision.Set(decisionReject) && req.removeFromQueueLocked() != nil {
timeoutCount++ timeoutCount++
disqueueSeats += req.MaxSeats()
req.NoteQueued(false) req.NoteQueued(false)
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
} }
@ -622,7 +635,9 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, f
// remove timed out requests from queue // remove timed out requests from queue
if timeoutCount > 0 { if timeoutCount > 0 {
qs.totRequestsWaiting -= timeoutCount qs.totRequestsWaiting -= timeoutCount
qs.totSeatsWaiting -= disqueueSeats
qs.reqsGaugePair.RequestsWaiting.Add(float64(-timeoutCount)) qs.reqsGaugePair.RequestsWaiting.Add(float64(-timeoutCount))
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
} }
} }
@ -657,9 +672,11 @@ func (qs *queueSet) enqueueToBoundLocked(request *request) {
} }
request.removeFromQueueLocked = queue.requests.Enqueue(request) request.removeFromQueueLocked = queue.requests.Enqueue(request)
qs.totRequestsWaiting++ qs.totRequestsWaiting++
qs.totSeatsWaiting += request.MaxSeats()
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1) metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
request.NoteQueued(true) request.NoteQueued(true)
qs.reqsGaugePair.RequestsWaiting.Add(1) qs.reqsGaugePair.RequestsWaiting.Add(1)
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
} }
// dispatchAsMuchAsPossibleLocked does as many dispatches as possible now. // dispatchAsMuchAsPossibleLocked does as many dispatches as possible now.
@ -690,6 +707,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats()) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats())
qs.reqsGaugePair.RequestsExecuting.Add(1) qs.reqsGaugePair.RequestsExecuting.Add(1)
qs.execSeatsGauge.Add(float64(req.MaxSeats())) qs.execSeatsGauge.Add(float64(req.MaxSeats()))
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
klogV := klog.V(5) klogV := klog.V(5)
if klogV.Enabled() { if klogV.Enabled() {
klogV.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting) klogV.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting)
@ -711,11 +729,13 @@ func (qs *queueSet) dispatchLocked() bool {
return false return false
} }
qs.totRequestsWaiting-- qs.totRequestsWaiting--
qs.totSeatsWaiting -= request.MaxSeats()
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
request.NoteQueued(false) request.NoteQueued(false)
qs.reqsGaugePair.RequestsWaiting.Add(-1) qs.reqsGaugePair.RequestsWaiting.Add(-1)
defer qs.boundNextDispatchLocked(queue) defer qs.boundNextDispatchLocked(queue)
if !request.decision.Set(decisionExecute) { if !request.decision.Set(decisionExecute) {
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
return true return true
} }
request.startTime = qs.clock.Now() request.startTime = qs.clock.Now()
@ -732,6 +752,7 @@ func (qs *queueSet) dispatchLocked() bool {
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats()) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats())
qs.reqsGaugePair.RequestsExecuting.Add(1) qs.reqsGaugePair.RequestsExecuting.Add(1)
qs.execSeatsGauge.Add(float64(request.MaxSeats())) qs.execSeatsGauge.Add(float64(request.MaxSeats()))
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
klogV := klog.V(6) klogV := klog.V(6)
if klogV.Enabled() { if klogV.Enabled() {
klogV.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied", klogV.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
@ -894,6 +915,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
qs.totSeatsInUse -= r.MaxSeats() qs.totSeatsInUse -= r.MaxSeats()
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats()) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats())
qs.execSeatsGauge.Add(-float64(r.MaxSeats())) qs.execSeatsGauge.Add(-float64(r.MaxSeats()))
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
if r.queue != nil { if r.queue != nil {
r.queue.seatsInUse -= r.MaxSeats() r.queue.seatsInUse -= r.MaxSeats()
} }
@ -1011,10 +1033,11 @@ func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
qs.lock.Lock() qs.lock.Lock()
defer qs.lock.Unlock() defer qs.lock.Unlock()
d := debug.QueueSetDump{ d := debug.QueueSetDump{
Queues: make([]debug.QueueDump, len(qs.queues)), Queues: make([]debug.QueueDump, len(qs.queues)),
Waiting: qs.totRequestsWaiting, Waiting: qs.totRequestsWaiting,
Executing: qs.totRequestsExecuting, Executing: qs.totRequestsExecuting,
SeatsInUse: qs.totSeatsInUse, SeatsInUse: qs.totSeatsInUse,
SeatsWaiting: qs.totSeatsWaiting,
} }
for i, q := range qs.queues { for i, q := range qs.queues {
d.Queues[i] = q.dumpLocked(includeRequestDetails) d.Queues[i] = q.dumpLocked(includeRequestDetails)

View File

@ -182,16 +182,18 @@ type uniformScenario struct {
counter counter.GoRoutineCounter counter counter.GoRoutineCounter
expectedAverages []float64 expectedAverages []float64
expectedEpochAdvances int expectedEpochAdvances int
seatDemandIntegratorSubject fq.Integrator
} }
func (us uniformScenario) exercise(t *testing.T) { func (us uniformScenario) exercise(t *testing.T) {
uss := uniformScenarioState{
t: t,
uniformScenario: us,
startTime: us.clk.Now(),
- integrators: make([]fq.Integrator, len(us.clients)),
execSeatsIntegrators: make([]fq.Integrator, len(us.clients)),
seatDemandIntegratorCheck: fq.NewNamedIntegrator(us.clk, us.name+"-seatDemandCheck"),
executions: make([]int32, len(us.clients)),
rejects: make([]int32, len(us.clients)),
}
for _, uc := range us.clients { for _, uc := range us.clients {
uss.doSplit = uss.doSplit || uc.split uss.doSplit = uss.doSplit || uc.split
@ -204,7 +206,8 @@ type uniformScenarioState struct {
uniformScenario uniformScenario
startTime time.Time startTime time.Time
doSplit bool doSplit bool
- integrators []fq.Integrator
execSeatsIntegrators []fq.Integrator
seatDemandIntegratorCheck fq.Integrator
failedCount uint64 failedCount uint64
expectedInqueue, expectedExecuting, expectedConcurrencyInUse string expectedInqueue, expectedExecuting, expectedConcurrencyInUse string
executions, rejects []int32 executions, rejects []int32
@ -216,18 +219,18 @@ func (uss *uniformScenarioState) exercise() {
metrics.Reset() metrics.Reset()
} }
for i, uc := range uss.clients { for i, uc := range uss.clients {
- uss.integrators[i] = fq.NewIntegrator(uss.clk)
uss.execSeatsIntegrators[i] = fq.NewNamedIntegrator(uss.clk, fmt.Sprintf("%s client %d execSeats", uss.name, i))
fsName := fmt.Sprintf("client%d", i) fsName := fmt.Sprintf("client%d", i)
uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n") uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n")
for j := 0; j < uc.nThreads; j++ { for j := 0; j < uc.nThreads; j++ {
ust := uniformScenarioThread{ ust := uniformScenarioThread{
uss: uss, uss: uss,
i: i, i: i,
j: j, j: j,
nCalls: uc.nCalls, nCalls: uc.nCalls,
uc: uc, uc: uc,
- igr: uss.integrators[i],
execSeatsIntegrator: uss.execSeatsIntegrators[i],
fsName: fsName, fsName: fsName,
} }
ust.start() ust.start()
} }
@ -241,12 +244,12 @@ func (uss *uniformScenarioState) exercise() {
} }
type uniformScenarioThread struct { type uniformScenarioThread struct {
uss *uniformScenarioState uss *uniformScenarioState
i, j int i, j int
nCalls int nCalls int
uc uniformClient uc uniformClient
- igr fq.Integrator
execSeatsIntegrator fq.Integrator
fsName string fsName string
} }
func (ust *uniformScenarioThread) start() { func (ust *uniformScenarioThread) start() {
@ -269,11 +272,15 @@ func (ust *uniformScenarioThread) callK(k int) {
if k >= ust.nCalls { if k >= ust.nCalls {
return return
} }
maxWidth := float64(uint64max(ust.uc.initialSeats, ust.uc.finalSeats))
ust.uss.seatDemandIntegratorCheck.Add(maxWidth)
returnSeatDemand := func(time.Time) { ust.uss.seatDemandIntegratorCheck.Add(-maxWidth) }
req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{InitialSeats: ust.uc.initialSeats, FinalSeats: ust.uc.finalSeats, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{InitialSeats: ust.uc.initialSeats, FinalSeats: ust.uc.finalSeats, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle) ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
if req == nil { if req == nil {
atomic.AddUint64(&ust.uss.failedCount, 1) atomic.AddUint64(&ust.uss.failedCount, 1)
atomic.AddInt32(&ust.uss.rejects[ust.i], 1) atomic.AddInt32(&ust.uss.rejects[ust.i], 1)
returnSeatDemand(ust.uss.clk.Now())
return return
} }
if idle { if idle {
@ -285,11 +292,12 @@ func (ust *uniformScenarioThread) callK(k int) {
executed = true executed = true
execStart := ust.uss.clk.Now() execStart := ust.uss.clk.Now()
atomic.AddInt32(&ust.uss.executions[ust.i], 1) atomic.AddInt32(&ust.uss.executions[ust.i], 1)
- ust.igr.Add(float64(ust.uc.initialSeats))
ust.execSeatsIntegrator.Add(float64(ust.uc.initialSeats))
ust.uss.t.Logf("%s: %d, %d, %d executing; width1=%d", execStart.Format(nsTimeFmt), ust.i, ust.j, k, ust.uc.initialSeats) ust.uss.t.Logf("%s: %d, %d, %d executing; width1=%d", execStart.Format(nsTimeFmt), ust.i, ust.j, k, ust.uc.initialSeats)
ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration) ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration)
ust.uss.clk.Sleep(ust.uc.execDuration) ust.uss.clk.Sleep(ust.uc.execDuration)
- ust.igr.Add(-float64(ust.uc.initialSeats))
ust.execSeatsIntegrator.Add(-float64(ust.uc.initialSeats))
ust.uss.clk.EventAfterDuration(returnSeatDemand, ust.uc.padDuration)
returnTime = ust.uss.clk.Now() returnTime = ust.uss.clk.Now()
}) })
now := ust.uss.clk.Now() now := ust.uss.clk.Now()
@ -297,6 +305,7 @@ func (ust *uniformScenarioThread) callK(k int) {
if !executed { if !executed {
atomic.AddUint64(&ust.uss.failedCount, 1) atomic.AddUint64(&ust.uss.failedCount, 1)
atomic.AddInt32(&ust.uss.rejects[ust.i], 1) atomic.AddInt32(&ust.uss.rejects[ust.i], 1)
returnSeatDemand(ust.uss.clk.Now())
} else if now != returnTime { } else if now != returnTime {
ust.uss.t.Errorf("%s: %d, %d, %d returnTime=%s", now.Format(nsTimeFmt), ust.i, ust.j, k, returnTime.Format(nsTimeFmt)) ust.uss.t.Errorf("%s: %d, %d, %d returnTime=%s", now.Format(nsTimeFmt), ust.i, ust.j, k, returnTime.Format(nsTimeFmt))
} }
@ -319,7 +328,7 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma
} }
sep := uc.thinkDuration sep := uc.thinkDuration
demands[i] = float64(nThreads) * float64(uc.initialSeats) * float64(uc.execDuration) / float64(sep+uc.execDuration) demands[i] = float64(nThreads) * float64(uc.initialSeats) * float64(uc.execDuration) / float64(sep+uc.execDuration)
- averages[i] = uss.integrators[i].Reset().Average
averages[i] = uss.execSeatsIntegrators[i].Reset().Average
} }
fairAverages := uss.expectedAverages fairAverages := uss.expectedAverages
if fairAverages == nil { if fairAverages == nil {
@ -341,6 +350,25 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma
uss.t.Logf("%s client %d last=%v expectFair=%v margin=%v got an Average of %v and the expected average was %v", uss.name, i, last, expectFair, margin, averages[i], expectedAverage) uss.t.Logf("%s client %d last=%v expectFair=%v margin=%v got an Average of %v and the expected average was %v", uss.name, i, last, expectFair, margin, averages[i], expectedAverage)
} }
} }
if uss.seatDemandIntegratorSubject != nil {
checkResults := uss.seatDemandIntegratorCheck.GetResults()
subjectResults := uss.seatDemandIntegratorSubject.GetResults()
if float64close(subjectResults.Duration, checkResults.Duration) {
uss.t.Logf("%s last=%v got duration of %v and expected %v", uss.name, last, subjectResults.Duration, checkResults.Duration)
} else {
uss.t.Errorf("%s last=%v got duration of %v but expected %v", uss.name, last, subjectResults.Duration, checkResults.Duration)
}
if got, expected := float64NaNTo0(subjectResults.Average), float64NaNTo0(checkResults.Average); float64close(got, expected) {
uss.t.Logf("%s last=%v got SeatDemand average of %v and expected %v", uss.name, last, got, expected)
} else {
uss.t.Errorf("%s last=%v got SeatDemand average of %v but expected %v", uss.name, last, got, expected)
}
if got, expected := float64NaNTo0(subjectResults.Deviation), float64NaNTo0(checkResults.Deviation); float64close(got, expected) {
uss.t.Logf("%s last=%v got SeatDemand standard deviation of %v and expected %v", uss.name, last, got, expected)
} else {
uss.t.Errorf("%s last=%v got SeatDemand standard deviation of %v but expected %v", uss.name, last, got, expected)
}
}
} }
func (uss *uniformScenarioState) finalReview() { func (uss *uniformScenarioState) finalReview() {
@ -445,7 +473,7 @@ func TestNoRestraint(t *testing.T) {
t.Run(testCase.name, func(t *testing.T) { t.Run(testCase.name, func(t *testing.T) {
now := time.Now() now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil) clk, counter := testeventclock.NewFake(now, 0, nil)
- nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newGaugePair(clk), newExecSeatsGauge(clk))
nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newGaugePair(clk), newExecSeatsGauge(clk), fq.NewNamedIntegrator(clk, "TestNoRestraint"))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -481,7 +509,8 @@ func TestBaseline(t *testing.T) {
HandSize: 3, HandSize: 3,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
- qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject")
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -492,15 +521,16 @@ func TestBaseline(t *testing.T) {
clients: []uniformClient{ clients: []uniformClient{
newUniformClient(1001001001, 1, 21, time.Second, 0), newUniformClient(1001001001, 1, 21, time.Second, 0),
}, },
concurrencyLimit: 1, concurrencyLimit: 1,
evalDuration: time.Second * 20, evalDuration: time.Second * 20,
expectedFair: []bool{true}, expectedFair: []bool{true},
expectedFairnessMargin: []float64{0}, expectedFairnessMargin: []float64{0},
expectAllRequests: true, expectAllRequests: true,
evalInqueueMetrics: true, evalInqueueMetrics: true,
evalExecutingMetrics: true, evalExecutingMetrics: true,
clk: clk, clk: clk,
counter: counter, counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t) }.exercise(t)
} }
@ -550,7 +580,8 @@ func TestSeparations(t *testing.T) {
HandSize: 3, HandSize: 3,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
- qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, caseName+" seatDemandSubject")
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -561,16 +592,17 @@ func TestSeparations(t *testing.T) {
newUniformClient(1001001001, 1, 25, time.Second, seps.think).pad(seps.finalSeats, seps.pad), newUniformClient(1001001001, 1, 25, time.Second, seps.think).pad(seps.finalSeats, seps.pad),
newUniformClient(2002002002, 1, 25, time.Second, seps.think).pad(seps.finalSeats, seps.pad), newUniformClient(2002002002, 1, 25, time.Second, seps.think).pad(seps.finalSeats, seps.pad),
}[:seps.nClients], }[:seps.nClients],
concurrencyLimit: seps.conc, concurrencyLimit: seps.conc,
evalDuration: time.Second * 24, // multiple of every period involved, so that margin can be 0 below evalDuration: time.Second * 24, // multiple of every period involved, so that margin can be 0 below
expectedFair: []bool{true}, expectedFair: []bool{true},
expectedFairnessMargin: []float64{0}, expectedFairnessMargin: []float64{0},
expectAllRequests: true, expectAllRequests: true,
evalInqueueMetrics: true, evalInqueueMetrics: true,
evalExecutingMetrics: true, evalExecutingMetrics: true,
clk: clk, clk: clk,
counter: counter, counter: counter,
expectedAverages: seps.exp, expectedAverages: seps.exp,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t) }.exercise(t)
}) })
} }
@ -589,7 +621,8 @@ func TestUniformFlowsHandSize1(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
- qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject")
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -601,15 +634,16 @@ func TestUniformFlowsHandSize1(t *testing.T) {
newUniformClient(1001001001, 8, 20, time.Second, time.Second-1), newUniformClient(1001001001, 8, 20, time.Second, time.Second-1),
newUniformClient(2002002002, 8, 20, time.Second, time.Second-1), newUniformClient(2002002002, 8, 20, time.Second, time.Second-1),
}, },
concurrencyLimit: 4, concurrencyLimit: 4,
evalDuration: time.Second * 50, evalDuration: time.Second * 50,
expectedFair: []bool{true}, expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01}, expectedFairnessMargin: []float64{0.01},
expectAllRequests: true, expectAllRequests: true,
evalInqueueMetrics: true, evalInqueueMetrics: true,
evalExecutingMetrics: true, evalExecutingMetrics: true,
clk: clk, clk: clk,
counter: counter, counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t) }.exercise(t)
} }
@ -626,7 +660,8 @@ func TestUniformFlowsHandSize3(t *testing.T) {
HandSize: 3, HandSize: 3,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
- qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -637,15 +672,16 @@ func TestUniformFlowsHandSize3(t *testing.T) {
newUniformClient(400900100100, 8, 30, time.Second, time.Second-1), newUniformClient(400900100100, 8, 30, time.Second, time.Second-1),
newUniformClient(300900200200, 8, 30, time.Second, time.Second-1), newUniformClient(300900200200, 8, 30, time.Second, time.Second-1),
}, },
concurrencyLimit: 4, concurrencyLimit: 4,
evalDuration: time.Second * 60, evalDuration: time.Second * 60,
expectedFair: []bool{true}, expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.03}, expectedFairnessMargin: []float64{0.03},
expectAllRequests: true, expectAllRequests: true,
evalInqueueMetrics: true, evalInqueueMetrics: true,
evalExecutingMetrics: true, evalExecutingMetrics: true,
clk: clk, clk: clk,
counter: counter, counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t) }.exercise(t)
} }
@ -662,7 +698,8 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
- qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -674,15 +711,16 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
newUniformClient(1001001001, 8, 20, time.Second, time.Second), newUniformClient(1001001001, 8, 20, time.Second, time.Second),
newUniformClient(2002002002, 7, 30, time.Second, time.Second/2), newUniformClient(2002002002, 7, 30, time.Second, time.Second/2),
}, },
concurrencyLimit: 4, concurrencyLimit: 4,
evalDuration: time.Second * 40, evalDuration: time.Second * 40,
expectedFair: []bool{true}, expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01}, expectedFairnessMargin: []float64{0.01},
expectAllRequests: true, expectAllRequests: true,
evalInqueueMetrics: true, evalInqueueMetrics: true,
evalExecutingMetrics: true, evalExecutingMetrics: true,
clk: clk, clk: clk,
counter: counter, counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t) }.exercise(t)
} }
@ -702,7 +740,8 @@ func TestSeatSecondsRollover(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 40 * Quarter, RequestWaitLimit: 40 * Quarter,
} }
- qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -714,16 +753,17 @@ func TestSeatSecondsRollover(t *testing.T) {
newUniformClient(1001001001, 8, 20, Quarter, Quarter).setInitWidth(500), newUniformClient(1001001001, 8, 20, Quarter, Quarter).setInitWidth(500),
newUniformClient(2002002002, 7, 30, Quarter, Quarter/2).setInitWidth(500), newUniformClient(2002002002, 7, 30, Quarter, Quarter/2).setInitWidth(500),
}, },
concurrencyLimit: 2000, concurrencyLimit: 2000,
evalDuration: Quarter * 40, evalDuration: Quarter * 40,
expectedFair: []bool{true}, expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01}, expectedFairnessMargin: []float64{0.01},
expectAllRequests: true, expectAllRequests: true,
evalInqueueMetrics: true, evalInqueueMetrics: true,
evalExecutingMetrics: true, evalExecutingMetrics: true,
clk: clk, clk: clk,
counter: counter, counter: counter,
expectedEpochAdvances: 8, expectedEpochAdvances: 8,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t) }.exercise(t)
} }
@ -740,7 +780,8 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
- qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -752,15 +793,16 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
newUniformClient(1001001001, 4, 20, time.Second, time.Second-1), newUniformClient(1001001001, 4, 20, time.Second, time.Second-1),
newUniformClient(2002002002, 2, 20, time.Second, time.Second-1), newUniformClient(2002002002, 2, 20, time.Second, time.Second-1),
}, },
concurrencyLimit: 3, concurrencyLimit: 3,
evalDuration: time.Second * 20, evalDuration: time.Second * 20,
expectedFair: []bool{true}, expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01}, expectedFairnessMargin: []float64{0.01},
expectAllRequests: true, expectAllRequests: true,
evalInqueueMetrics: true, evalInqueueMetrics: true,
evalExecutingMetrics: true, evalExecutingMetrics: true,
clk: clk, clk: clk,
counter: counter, counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t) }.exercise(t)
} }
@ -777,7 +819,8 @@ func TestDifferentWidths(t *testing.T) {
HandSize: 7, HandSize: 7,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
- qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -788,15 +831,16 @@ func TestDifferentWidths(t *testing.T) {
newUniformClient(10010010010010, 13, 10, time.Second, time.Second-1), newUniformClient(10010010010010, 13, 10, time.Second, time.Second-1),
newUniformClient(20020020020020, 7, 10, time.Second, time.Second-1).setInitWidth(2), newUniformClient(20020020020020, 7, 10, time.Second, time.Second-1).setInitWidth(2),
}, },
concurrencyLimit: 6, concurrencyLimit: 6,
evalDuration: time.Second * 20, evalDuration: time.Second * 20,
expectedFair: []bool{true}, expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.15}, expectedFairnessMargin: []float64{0.15},
expectAllRequests: true, expectAllRequests: true,
evalInqueueMetrics: true, evalInqueueMetrics: true,
evalExecutingMetrics: true, evalExecutingMetrics: true,
clk: clk, clk: clk,
counter: counter, counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t) }.exercise(t)
} }
@ -813,7 +857,8 @@ func TestTooWide(t *testing.T) {
HandSize: 7, HandSize: 7,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
- qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -827,15 +872,16 @@ func TestTooWide(t *testing.T) {
newUniformClient(70070070070070, 15, 21, time.Second, time.Second-1).setInitWidth(2), newUniformClient(70070070070070, 15, 21, time.Second, time.Second-1).setInitWidth(2),
newUniformClient(90090090090090, 15, 21, time.Second, time.Second-1).setInitWidth(7), newUniformClient(90090090090090, 15, 21, time.Second, time.Second-1).setInitWidth(7),
}, },
concurrencyLimit: 6, concurrencyLimit: 6,
evalDuration: time.Second * 225, evalDuration: time.Second * 225,
expectedFair: []bool{true}, expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.33}, expectedFairnessMargin: []float64{0.33},
expectAllRequests: true, expectAllRequests: true,
evalInqueueMetrics: true, evalInqueueMetrics: true,
evalExecutingMetrics: true, evalExecutingMetrics: true,
clk: clk, clk: clk,
counter: counter, counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t) }.exercise(t)
} }
@ -874,7 +920,8 @@ func TestWindup(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
- qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -885,15 +932,16 @@ func TestWindup(t *testing.T) {
newUniformClient(1001001001, 2, 40, time.Second, -1), newUniformClient(1001001001, 2, 40, time.Second, -1),
newUniformClient(2002002002, 2, 40, time.Second, -1).setSplit(), newUniformClient(2002002002, 2, 40, time.Second, -1).setSplit(),
}, },
concurrencyLimit: 3, concurrencyLimit: 3,
evalDuration: time.Second * 40, evalDuration: time.Second * 40,
expectedFair: []bool{true, testCase.expectFair2}, expectedFair: []bool{true, testCase.expectFair2},
expectedFairnessMargin: []float64{0.01, testCase.margin2}, expectedFairnessMargin: []float64{0.01, testCase.margin2},
expectAllRequests: true, expectAllRequests: true,
evalInqueueMetrics: true, evalInqueueMetrics: true,
evalExecutingMetrics: true, evalExecutingMetrics: true,
clk: clk, clk: clk,
counter: counter, counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t) }.exercise(t)
}) })
} }
@ -909,7 +957,8 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
Name: "TestDifferentFlowsWithoutQueuing", Name: "TestDifferentFlowsWithoutQueuing",
DesiredNumQueues: 0, DesiredNumQueues: 0,
} }
- qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject")
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -921,14 +970,15 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
newUniformClient(1001001001, 6, 10, time.Second, 57*time.Millisecond), newUniformClient(1001001001, 6, 10, time.Second, 57*time.Millisecond),
newUniformClient(2002002002, 4, 15, time.Second, 750*time.Millisecond), newUniformClient(2002002002, 4, 15, time.Second, 750*time.Millisecond),
}, },
concurrencyLimit: 4, concurrencyLimit: 4,
evalDuration: time.Second * 13, evalDuration: time.Second * 13,
expectedFair: []bool{false}, expectedFair: []bool{false},
expectedFairnessMargin: []float64{0.20}, expectedFairnessMargin: []float64{0.20},
evalExecutingMetrics: true, evalExecutingMetrics: true,
rejectReason: "concurrency-limit", rejectReason: "concurrency-limit",
clk: clk, clk: clk,
counter: counter, counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t) }.exercise(t)
} }
@ -945,7 +995,8 @@ func TestTimeout(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 0, RequestWaitLimit: 0,
} }
- qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -956,15 +1007,16 @@ func TestTimeout(t *testing.T) {
clients: []uniformClient{ clients: []uniformClient{
newUniformClient(1001001001, 5, 100, time.Second, time.Second), newUniformClient(1001001001, 5, 100, time.Second, time.Second),
}, },
concurrencyLimit: 1, concurrencyLimit: 1,
evalDuration: time.Second * 10, evalDuration: time.Second * 10,
expectedFair: []bool{true}, expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01}, expectedFairnessMargin: []float64{0.01},
evalInqueueMetrics: true, evalInqueueMetrics: true,
evalExecutingMetrics: true, evalExecutingMetrics: true,
rejectReason: "time-out", rejectReason: "time-out",
clk: clk, clk: clk,
counter: counter, counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t) }.exercise(t)
} }
@ -996,7 +1048,8 @@ func TestContextCancel(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 15 * time.Second, RequestWaitLimit: 15 * time.Second,
} }
- qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1102,7 +1155,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
DesiredNumQueues: 0, DesiredNumQueues: 0,
RequestWaitLimit: 15 * time.Second, RequestWaitLimit: 15 * time.Second,
} }
- qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk))
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), fq.NewNamedIntegrator(clk, qCfg.Name))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1286,6 +1339,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
qs := &queueSet{ qs := &queueSet{
estimatedServiceDuration: G, estimatedServiceDuration: G,
seatDemandIntegrator: fq.NewNamedIntegrator(clock.RealClock{}, "seatDemandSubject"),
robinIndex: test.robinIndex, robinIndex: test.robinIndex,
totSeatsInUse: test.totSeatsInUse, totSeatsInUse: test.totSeatsInUse,
qCfg: fq.QueuingConfig{Name: "TestSelectQueueLocked/" + test.name}, qCfg: fq.QueuingConfig{Name: "TestSelectQueueLocked/" + test.name},
@ -1359,6 +1413,7 @@ func TestFinishRequestLocked(t *testing.T) {
estimatedServiceDuration: time.Second, estimatedServiceDuration: time.Second,
reqsGaugePair: newGaugePair(clk), reqsGaugePair: newGaugePair(clk),
execSeatsGauge: newExecSeatsGauge(clk), execSeatsGauge: newExecSeatsGauge(clk),
seatDemandIntegrator: fq.NewNamedIntegrator(clk, "seatDemandSubject"),
} }
queue := &queue{ queue := &queue{
requests: newRequestFIFO(), requests: newRequestFIFO(),
@ -1468,3 +1523,28 @@ func newGaugePair(clk clock.PassiveClock) metrics.RatioedGaugePair {
func newExecSeatsGauge(clk clock.PassiveClock) metrics.RatioedGauge { func newExecSeatsGauge(clk clock.PassiveClock) metrics.RatioedGauge {
return metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, []string{"test"}) return metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, []string{"test"})
} }
func float64close(x, y float64) bool {
x0 := float64NaNTo0(x)
y0 := float64NaNTo0(y)
diff := math.Abs(x - y)
den := math.Max(math.Abs(x0), math.Abs(y0))
if den == 0 {
return diff < 1e-10
}
return diff/den < 1e-10
}
func uint64max(a, b uint64) uint64 {
if b > a {
return b
}
return a
}
func float64NaNTo0(x float64) float64 {
if math.IsNaN(x) {
return 0
}
return x
}

View File

@ -40,7 +40,7 @@ type noRestraint struct{}
type noRestraintRequest struct{} type noRestraintRequest struct{}
- func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge) (fq.QueueSetCompleter, error) {
func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, fq.Integrator) (fq.QueueSetCompleter, error) {
return noRestraintCompleter{}, nil return noRestraintCompleter{}, nil
} }

View File

@ -23,12 +23,15 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/utils/clock"
flowcontrol "k8s.io/api/flowcontrol/v1beta3" flowcontrol "k8s.io/api/flowcontrol/v1beta3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/endpoints/request"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing" fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics" "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
@ -57,7 +60,7 @@ func genPL(rng *rand.Rand, name string) *flowcontrol.PriorityLevelConfiguration
QueueLengthLimit: 5} QueueLengthLimit: 5}
} }
labelVals := []string{"test"} labelVals := []string{"test"}
- _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.RatioedGaugeVecPhasedElementPair(metrics.PriorityLevelConcurrencyGaugeVec, 1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals))
_, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.RatioedGaugeVecPhasedElementPair(metrics.PriorityLevelConcurrencyGaugeVec, 1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals), fq.NewNamedIntegrator(clock.RealClock{}, name))
if err != nil { if err != nil {
panic(err) panic(err)
} }