Make QueueSet support exempt behavior; use it

Signed-off-by: Mike Spreitzer <mspreitz@us.ibm.com>
This commit is contained in:
Mike Spreitzer 2023-06-28 22:55:30 -04:00
parent 8ffbbe455b
commit f269acd12b
6 changed files with 166 additions and 157 deletions

View File

@ -197,16 +197,15 @@ type priorityLevelState struct {
pl *flowcontrol.PriorityLevelConfiguration
// qsCompleter holds the QueueSetCompleter derived from `config`
// and `queues` if config is not exempt, nil otherwise.
// and `queues`.
qsCompleter fq.QueueSetCompleter
// The QueueSet for this priority level. This is nil if and only
// if the priority level is exempt.
// The QueueSet for this priority level.
// Never nil.
queues fq.QueueSet
// quiescing==true indicates that this priority level should be
// removed when its queues have all drained. May be true only if
// queues is non-nil.
// removed when its queues have all drained.
quiescing bool
// number of goroutines between Controller::Match and calling the
@ -384,9 +383,6 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
items := make([]allocProblemItem, 0, len(plStates))
plNames := make([]string, 0, len(plStates))
for plName, plState := range plStates {
if plState.pl.Spec.Limited == nil {
continue
}
obs := plState.seatDemandIntegrator.Reset()
plState.seatDemandStats.update(obs)
// Lower bound on this priority level's adjusted concurreny limit is the lesser of:
@ -403,7 +399,7 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
})
}
if len(items) == 0 && cfgCtlr.nominalCLSum > 0 {
klog.ErrorS(nil, "Impossible: no non-exempt priority levels", "plStates", cfgCtlr.priorityLevelStates)
klog.ErrorS(nil, "Impossible: no priority levels", "plStates", cfgCtlr.priorityLevelStates)
return
}
allocs, fairFrac, err := computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items)
@ -412,17 +408,11 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
allocs = make([]float64, len(items))
for idx, plName := range plNames {
plState := plStates[plName]
if plState.pl.Spec.Limited == nil {
continue
}
allocs[idx] = float64(plState.currentCL)
}
}
for idx, plName := range plNames {
plState := plStates[plName]
if plState.pl.Spec.Limited == nil {
continue
}
if setCompleters {
qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues,
plState.pl, cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs,
@ -441,8 +431,15 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
if relChange >= 0.05 {
logLevel = 2
}
klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "backstop", err != nil)
plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL})
var concurrencyDenominator int
if currentCL > 0 {
concurrencyDenominator = currentCL
} else {
concurrencyDenominator = int(math.Max(1, math.Round(float64(cfgCtlr.serverConcurrencyLimit)/10)))
}
plState.seatDemandRatioedGauge.SetDenominator(float64(concurrencyDenominator))
klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "concurrencyDenominator", concurrencyDenominator, "backstop", err != nil)
plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL, ConcurrencyDenominator: concurrencyDenominator})
}
metrics.SetFairFrac(float64(fairFrac))
}
@ -690,9 +687,8 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
klog.V(3).Infof("Priority level %q was undesired and has become desired again", pl.Name)
state.quiescing = false
}
if state.pl.Spec.Limited != nil {
meal.shareSum += float64(state.pl.Spec.Limited.NominalConcurrencyShares)
}
nominalConcurrencyShares, _, _ := plSpecCommons(state.pl)
meal.shareSum += float64(nominalConcurrencyShares)
meal.haveExemptPL = meal.haveExemptPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt
meal.haveCatchAllPL = meal.haveCatchAllPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameCatchAll
}
@ -765,15 +761,15 @@ func (meal *cfgMeal) processOldPLsLocked() {
continue
}
if plName == flowcontrol.PriorityLevelConfigurationNameExempt && !meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll && !meal.haveCatchAllPL {
// BTW, we know the Spec has not changed because the
// mandatory objects have immutable Specs
// BTW, we know the Spec has not changed what is says about queuing because the
// mandatory objects have immutable Specs as far as queuing is concerned.
klog.V(3).Infof("Retaining mandatory priority level %q despite lack of API object", plName)
} else {
if plState.queues == nil || plState.numPending == 0 && plState.queues.IsIdle() {
// Either there are no queues or they are done
if plState.numPending == 0 && plState.queues.IsIdle() {
// The QueueSet is done
// draining and no use is coming from another
// goroutine
klog.V(3).Infof("Removing undesired priority level %q (nilQueues=%v), Type=%v", plName, plState.queues == nil, plState.pl.Spec.Type)
klog.V(3).Infof("Removing undesired priority level %q, Type=%v", plName, plState.pl.Spec.Type)
continue
}
if !plState.quiescing {
@ -789,15 +785,14 @@ func (meal *cfgMeal) processOldPLsLocked() {
// This can not happen because queueSetCompleterForPL already approved this config
panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
}
if plState.pl.Spec.Limited != nil {
// We deliberately include the lingering priority levels
// here so that their queues get some concurrency and they
// continue to drain. During this interim a lingering
// priority level continues to get a concurrency
// allocation determined by all the share values in the
// regular way.
meal.shareSum += float64(plState.pl.Spec.Limited.NominalConcurrencyShares)
}
// We deliberately include the lingering priority levels
// here so that their queues get some concurrency and they
// continue to drain. During this interim a lingering
// priority level continues to get a concurrency
// allocation determined by all the share values in the
// regular way.
nominalConcurrencyShares, _, _ := plSpecCommons(plState.pl)
meal.shareSum += float64(nominalConcurrencyShares)
meal.haveExemptPL = meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameExempt
meal.haveCatchAllPL = meal.haveCatchAllPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll
meal.newPLStates[plName] = plState
@ -809,41 +804,35 @@ func (meal *cfgMeal) processOldPLsLocked() {
// QueueSets.
func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
for plName, plState := range meal.newPLStates {
if plState.pl.Spec.Limited == nil {
klog.V(5).Infof("Using exempt priority level %q: quiescing=%v", plName, plState.quiescing)
continue
}
limited := plState.pl.Spec.Limited
nominalConcurrencyShares, lendablePercent, borrowingLimitPercent := plSpecCommons(plState.pl)
// The use of math.Ceil here means that the results might sum
// to a little more than serverConcurrencyLimit but the
// difference will be negligible.
concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(limited.NominalConcurrencyShares) / meal.shareSum))
concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(nominalConcurrencyShares) / meal.shareSum))
var lendableCL, borrowingCL int
if limited.LendablePercent != nil {
lendableCL = int(math.Round(float64(concurrencyLimit) * float64(*limited.LendablePercent) / 100))
if lendablePercent != nil {
lendableCL = int(math.Round(float64(concurrencyLimit) * float64(*lendablePercent) / 100))
}
if limited.BorrowingLimitPercent != nil {
borrowingCL = int(math.Round(float64(concurrencyLimit) * float64(*limited.BorrowingLimitPercent) / 100))
if borrowingLimitPercent != nil {
borrowingCL = int(math.Round(float64(concurrencyLimit) * float64(*borrowingLimitPercent) / 100))
} else {
borrowingCL = meal.cfgCtlr.serverConcurrencyLimit
}
metrics.SetPriorityLevelConfiguration(plName, concurrencyLimit, concurrencyLimit-lendableCL, concurrencyLimit+borrowingCL)
plState.seatDemandRatioedGauge.SetDenominator(float64(concurrencyLimit))
cfgChanged := plState.nominalCL != concurrencyLimit || plState.minCL != concurrencyLimit-lendableCL || plState.maxCL != concurrencyLimit+borrowingCL
plState.nominalCL = concurrencyLimit
plState.minCL = concurrencyLimit - lendableCL
plState.maxCL = concurrencyLimit + borrowingCL
meal.maxExecutingRequests += concurrencyLimit
var waitLimit int
if qCfg := limited.LimitResponse.Queuing; qCfg != nil {
waitLimit = int(qCfg.Queues * qCfg.QueueLengthLimit)
if limited := plState.pl.Spec.Limited; limited != nil {
if qCfg := limited.LimitResponse.Queuing; qCfg != nil {
meal.maxWaitingRequests += int(qCfg.Queues * qCfg.QueueLengthLimit)
}
}
meal.maxWaitingRequests += waitLimit
if plState.queues == nil {
initialCL := concurrencyLimit - lendableCL/2
klog.V(2).Infof("Introducing queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, initialCL, plState.quiescing, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
klog.V(2).Infof("Introducing queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, initialCL, plState.quiescing, nominalConcurrencyShares, meal.shareSum)
plState.seatDemandStats = seatDemandStats{}
plState.currentCL = initialCL
} else {
@ -851,7 +840,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
if cfgChanged {
logLevel = 2
}
klog.V(logLevel).Infof("Retaining queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, plState.currentCL, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
klog.V(logLevel).Infof("Retaining queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, plState.currentCL, plState.quiescing, plState.numPending, nominalConcurrencyShares, meal.shareSum)
}
}
meal.cfgCtlr.nominalCLSum = meal.maxExecutingRequests
@ -859,32 +848,32 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
}
// queueSetCompleterForPL returns an appropriate QueueSetCompleter for the
// given priority level configuration. Returns nil if that config
// does not call for limiting. Returns nil and an error if the given
// given priority level configuration. Returns nil and an error if the given
// object is malformed in a way that is a problem for this package.
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) {
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
return nil, errors.New("broken union structure at the top")
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementLimited) != (pl.Spec.Limited != nil) {
return nil, errors.New("broken union structure at the top, for Limited")
}
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt) {
// This package does not attempt to cope with a priority level dynamically switching between exempt and not.
return nil, errors.New("non-alignment between name and type")
}
if pl.Spec.Limited == nil {
return nil, nil
}
if (pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeReject) != (pl.Spec.Limited.LimitResponse.Queuing == nil) {
return nil, errors.New("broken union structure for limit response")
}
qcAPI := pl.Spec.Limited.LimitResponse.Queuing
qcQS := fq.QueuingConfig{Name: pl.Name}
if qcAPI != nil {
qcQS = fq.QueuingConfig{Name: pl.Name,
DesiredNumQueues: int(qcAPI.Queues),
QueueLengthLimit: int(qcAPI.QueueLengthLimit),
HandSize: int(qcAPI.HandSize),
RequestWaitLimit: requestWaitLimit,
if pl.Spec.Limited != nil {
if (pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeReject) != (pl.Spec.Limited.LimitResponse.Queuing == nil) {
return nil, errors.New("broken union structure for limit response")
}
qcAPI := pl.Spec.Limited.LimitResponse.Queuing
if qcAPI != nil {
qcQS = fq.QueuingConfig{Name: pl.Name,
DesiredNumQueues: int(qcAPI.Queues),
QueueLengthLimit: int(qcAPI.QueueLengthLimit),
HandSize: int(qcAPI.HandSize),
RequestWaitLimit: requestWaitLimit,
}
}
} else {
qcQS = fq.QueuingConfig{Name: pl.Name, DesiredNumQueues: -1}
}
var qsc fq.QueueSetCompleter
var err error
@ -894,7 +883,7 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow
qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs, seatDemandGauge)
}
if err != nil {
err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err)
err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcQS, err)
}
return qsc, err
}
@ -962,13 +951,6 @@ func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, re
}
}
type immediateRequest struct{}
func (immediateRequest) Finish(execute func()) bool {
execute()
return false
}
// startRequest classifies and, if appropriate, enqueues the request.
// Returns a nil Request if and only if the request is to be rejected.
// The returned bool indicates whether the request is exempt from
@ -1007,32 +989,31 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
}
plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name
plState := cfgCtlr.priorityLevelStates[plName]
if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
noteFn(selectedFlowSchema, plState.pl, "")
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
}
var numQueues int32
if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
}
var flowDistinguisher string
var hashValue uint64
if numQueues > 1 {
flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
var flowDistinguisher string
if plState.pl.Spec.Type != flowcontrol.PriorityLevelEnablementExempt {
if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
}
if numQueues > 1 {
flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
}
}
noteFn(selectedFlowSchema, plState.pl, flowDistinguisher)
workEstimate := workEstimator()
startWaitingTime = cfgCtlr.clock.Now()
if plState.pl.Spec.Type != flowcontrol.PriorityLevelEnablementExempt {
startWaitingTime = cfgCtlr.clock.Now()
}
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
if idle {
cfgCtlr.maybeReapReadLocked(plName, plState)
}
return selectedFlowSchema, plState.pl, false, req, startWaitingTime
return selectedFlowSchema, plState.pl, plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt, req, startWaitingTime
}
// maybeReap will remove the last internal traces of the named
@ -1046,10 +1027,6 @@ func (cfgCtlr *configController) maybeReap(plName string) {
klog.V(7).Infof("plName=%s, plState==nil", plName)
return
}
if plState.queues == nil {
klog.V(7).Infof("plName=%s, plState.queues==nil", plName)
return
}
useless := plState.quiescing && plState.numPending == 0 && plState.queues.IsIdle()
klog.V(7).Infof("plState.quiescing=%v, plState.numPending=%d, useless=%v", plState.quiescing, plState.numPending, useless)
if !useless {
@ -1107,3 +1084,12 @@ func relDiff(x, y float64) float64 {
}
return diff / den
}
// plSpecCommons returns the (NominalConcurrencyShares, LendablePercent, BorrowingLimitPercent) of the given priority level config
func plSpecCommons(pl *flowcontrol.PriorityLevelConfiguration) (int32, *int32, *int32) {
if limiter := pl.Spec.Limited; limiter != nil {
return limiter.NominalConcurrencyShares, limiter.LendablePercent, limiter.BorrowingLimitPercent
}
var zero int32
return 0, &zero, &zero
}

View File

@ -75,22 +75,6 @@ func (cfgCtlr *configController) dumpPriorityLevels(w http.ResponseWriter, r *ht
continue
}
if plState.queues == nil {
tabPrint(tabWriter, row(
plState.pl.Name, // 1
"<none>", // 2
"<none>", // 3
"<none>", // 4
"<none>", // 5
"<none>", // 6
"<none>", // 7
"<none>", // 8
"<none>", // 9
"<none>", // 10
))
endLine(tabWriter)
continue
}
queueSetDigest := plState.queues.Dump(false)
activeQueueNum := 0
for _, q := range queueSetDigest.Queues {
@ -134,21 +118,6 @@ func (cfgCtlr *configController) dumpQueues(w http.ResponseWriter, r *http.Reque
tabPrint(tabWriter, rowForHeaders(columnHeaders))
endLine(tabWriter)
for _, plState := range cfgCtlr.priorityLevelStates {
if plState.queues == nil {
tabPrint(tabWriter, row(
plState.pl.Name, // 1
"<none>", // 2
"<none>", // 3
"<none>", // 4
"<none>", // 5
"<none>", // 6
"<none>", // 7
"<none>", // 8
"<none>", // 9
))
endLine(tabWriter)
continue
}
queueSetDigest := plState.queues.Dump(false)
for i, q := range queueSetDigest.Queues {
tabPrint(tabWriter, row(
@ -201,9 +170,6 @@ func (cfgCtlr *configController) dumpRequests(w http.ResponseWriter, r *http.Req
}
endLine(tabWriter)
for _, plState := range cfgCtlr.priorityLevelStates {
if plState.queues == nil {
continue
}
queueSetDigest := plState.queues.Dump(includeRequestDetails)
for iq, q := range queueSetDigest.Queues {
for ir, r := range q.Requests {

View File

@ -221,17 +221,12 @@ func (cts *ctlrTestState) popHeldRequest() (plName string, hr *heldRequest, nCou
}
}
var mandQueueSetNames, exclQueueSetNames = func() (sets.String, sets.String) {
var mandQueueSetNames = func() sets.String {
mandQueueSetNames := sets.NewString()
exclQueueSetNames := sets.NewString()
for _, mpl := range fcboot.MandatoryPriorityLevelConfigurations {
if mpl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
exclQueueSetNames.Insert(mpl.Name)
} else {
mandQueueSetNames.Insert(mpl.Name)
}
mandQueueSetNames.Insert(mpl.Name)
}
return mandQueueSetNames, exclQueueSetNames
return mandQueueSetNames
}()
func TestConfigConsumer(t *testing.T) {
@ -280,7 +275,7 @@ func TestConfigConsumer(t *testing.T) {
}
}
persistingPLNames = nextPLNames.Union(desiredPLNames)
expectedQueueSetNames := persistingPLNames.Union(mandQueueSetNames).Difference(exclQueueSetNames)
expectedQueueSetNames := persistingPLNames.Union(mandQueueSetNames)
allQueueSetNames := cts.getQueueSetNames()
missingQueueSetNames := expectedQueueSetNames.Difference(allQueueSetNames)
if len(missingQueueSetNames) > 0 {

View File

@ -34,7 +34,10 @@ type QueueSetFactory interface {
// BeginConstruction does the first phase of creating a QueueSet.
// The RatioedGaugePair observes number of requests,
// execution covering just the regular phase.
// The denominator for the waiting phase is
// max(1, QueuingConfig.QueueLengthLimit) X max(1, QueuingConfig.DesiredNumQueues).
// The RatioedGauge observes number of seats occupied through all phases of execution.
// The denominator for all the ratioed concurrency gauges is supplied later in the DispatchingConfig.
// The Gauge observes the seat demand (executing + queued seats).
BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, metrics.Gauge) (QueueSetCompleter, error)
}
@ -113,8 +116,11 @@ type QueuingConfig struct {
Name string
// DesiredNumQueues is the number of queues that the API says
// should exist now. This may be zero, in which case
// should exist now. This may be non-positive, in which case
// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
// A value of zero means to respect the ConcurrencyLimit of the DispatchingConfig.
// A negative value means to always dispatch immediately upon arrival
// (i.e., the requests are "exempt" from limitation).
DesiredNumQueues int
// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
@ -133,4 +139,8 @@ type QueuingConfig struct {
type DispatchingConfig struct {
// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
ConcurrencyLimit int
// ConcurrencyDenominator is used in relative metrics of concurrency.
// It equals ConcurrencyLimit except when that is zero.
ConcurrencyDenominator int
}

View File

@ -197,7 +197,7 @@ func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePa
// calls for one, and returns a non-nil error if the given config is
// invalid.
func checkConfig(qCfg fq.QueuingConfig) (*shufflesharding.Dealer, error) {
if qCfg.DesiredNumQueues == 0 {
if qCfg.DesiredNumQueues <= 0 {
return nil, nil
}
dealer, err := shufflesharding.NewDealer(qCfg.DesiredNumQueues, qCfg.HandSize)
@ -280,8 +280,8 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
qll *= qCfg.DesiredNumQueues
}
qs.reqsGaugePair.RequestsWaiting.SetDenominator(float64(qll))
qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit))
qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyLimit))
qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyDenominator))
qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyDenominator))
qs.dispatchAsMuchAsPossibleLocked()
}
@ -796,6 +796,9 @@ func (qs *queueSet) dispatchLocked() bool {
// otherwise it returns false.
func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
switch {
case qs.qCfg.DesiredNumQueues < 0:
// This is code for exemption from limitation
return true
case seats > qs.dCfg.ConcurrencyLimit:
// we have picked the queue with the minimum virtual finish time, but
// the number of seats this request asks for exceeds the concurrency limit.

View File

@ -514,7 +514,7 @@ func TestBaseline(t *testing.T) {
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
qs := qsComplete(qsc, 1)
uniformScenario{name: qCfg.Name,
qs: qs,
@ -534,6 +534,47 @@ func TestBaseline(t *testing.T) {
}.exercise(t)
}
func TestExampt(t *testing.T) {
metrics.Register()
for concurrencyLimit := 0; concurrencyLimit <= 2; concurrencyLimit += 2 {
t.Run(fmt.Sprintf("concurrency=%d", concurrencyLimit), func(t *testing.T) {
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "TestBaseline",
DesiredNumQueues: -1,
QueueLengthLimit: 2,
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject")
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil {
t.Fatal(err)
}
qs := qsComplete(qsc, concurrencyLimit)
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
newUniformClient(1001001001, 5, 20, time.Second, time.Second).setInitWidth(3),
},
concurrencyLimit: 1,
evalDuration: time.Second * 40,
expectedFair: []bool{true}, // "fair" is a bit odd-sounding here, but it "expectFair" here means expect `expectedAverages`
expectedAverages: []float64{7.5},
expectedFairnessMargin: []float64{0.00000001},
expectAllRequests: true,
evalInqueueMetrics: false,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t)
})
}
}
func TestSeparations(t *testing.T) {
flts := func(avgs ...float64) []float64 { return avgs }
for _, seps := range []struct {
@ -585,7 +626,7 @@ func TestSeparations(t *testing.T) {
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: seps.conc})
qs := qsComplete(qsc, seps.conc)
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
@ -626,7 +667,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
qs := qsComplete(qsc, 4)
uniformScenario{name: qCfg.Name,
qs: qs,
@ -665,7 +706,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
qs := qsComplete(qsc, 4)
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
@ -703,7 +744,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
qs := qsComplete(qsc, 4)
uniformScenario{name: qCfg.Name,
qs: qs,
@ -745,7 +786,7 @@ func TestSeatSecondsRollover(t *testing.T) {
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 2000})
qs := qsComplete(qsc, 2000)
uniformScenario{name: qCfg.Name,
qs: qs,
@ -785,7 +826,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 3})
qs := qsComplete(qsc, 3)
uniformScenario{name: qCfg.Name,
qs: qs,
@ -824,7 +865,7 @@ func TestDifferentWidths(t *testing.T) {
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 6})
qs := qsComplete(qsc, 6)
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
@ -862,7 +903,7 @@ func TestTooWide(t *testing.T) {
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 6})
qs := qsComplete(qsc, 6)
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
@ -925,7 +966,7 @@ func TestWindup(t *testing.T) {
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 3})
qs := qsComplete(qsc, 3)
uniformScenario{name: qCfg.Name, qs: qs,
clients: []uniformClient{
@ -962,7 +1003,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
qs := qsComplete(qsc, 4)
uniformScenario{name: qCfg.Name,
qs: qs,
@ -1000,7 +1041,7 @@ func TestTimeout(t *testing.T) {
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
qs := qsComplete(qsc, 1)
uniformScenario{name: qCfg.Name,
qs: qs,
@ -1053,7 +1094,7 @@ func TestContextCancel(t *testing.T) {
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
qs := qsComplete(qsc, 1)
counter.Add(1) // account for main activity of the goroutine running this test
ctx1 := context.Background()
pZero := func() *int32 { var zero int32; return &zero }
@ -1159,7 +1200,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
qs := qsComplete(qsc, 1)
counter.Add(1) // account for the goroutine running this test
queue, ok := qs.(*queueSet)
@ -1545,3 +1586,11 @@ func float64NaNTo0(x float64) float64 {
}
return x
}
func qsComplete(qsc fq.QueueSetCompleter, concurrencyLimit int) fq.QueueSet {
concurrencyDenominator := concurrencyLimit
if concurrencyDenominator <= 0 {
concurrencyDenominator = 1
}
return qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: concurrencyLimit, ConcurrencyDenominator: concurrencyDenominator})
}