Use SeatSeconds

This commit is contained in:
Mike Spreitzer 2021-10-01 15:33:37 -07:00
parent d866f94458
commit 4b5e139819
3 changed files with 133 additions and 75 deletions

View File

@ -76,9 +76,9 @@ type queueSetCompleter struct {
// not end in "Locked" either acquires the lock or does not care about // not end in "Locked" either acquires the lock or does not care about
// locking. // locking.
type queueSet struct { type queueSet struct {
clock eventclock.Interface clock eventclock.Interface
estimatedServiceSeconds float64 estimatedServiceDuration time.Duration
obsPair metrics.TimedObserverPair obsPair metrics.TimedObserverPair
promiseFactory promiseFactory promiseFactory promiseFactory
@ -102,9 +102,9 @@ type queueSet struct {
// queues are still draining. // queues are still draining.
queues []*queue queues []*queue
// virtualTime is the amount of seat-seconds allocated per queue since process startup. // currentR is the amount of seat-seconds allocated per queue since process startup.
// This is our generalization of the progress meter named R in the original fair queuing work. // This is our generalization of the progress meter named R in the original fair queuing work.
virtualTime float64 currentR SeatSeconds
// lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated // lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated
lastRealTime time.Time lastRealTime time.Time
@ -173,12 +173,12 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
qs := qsc.theSet qs := qsc.theSet
if qs == nil { if qs == nil {
qs = &queueSet{ qs = &queueSet{
clock: qsc.factory.clock, clock: qsc.factory.clock,
estimatedServiceSeconds: 0.003, estimatedServiceDuration: 3 * time.Millisecond,
obsPair: qsc.obsPair, obsPair: qsc.obsPair,
qCfg: qsc.qCfg, qCfg: qsc.qCfg,
virtualTime: 0, currentR: 0,
lastRealTime: qsc.factory.clock.Now(), lastRealTime: qsc.factory.clock.Now(),
} }
qs.promiseFactory = qsc.factory.promiseFactoryFactory(qs) qs.promiseFactory = qsc.factory.promiseFactoryFactory(qs)
} }
@ -407,10 +407,14 @@ func (qs *queueSet) lockAndSyncTime() {
// lock and before modifying the state of any queue. // lock and before modifying the state of any queue.
func (qs *queueSet) syncTimeLocked() { func (qs *queueSet) syncTimeLocked() {
realNow := qs.clock.Now() realNow := qs.clock.Now()
timeSinceLast := realNow.Sub(qs.lastRealTime).Seconds() timeSinceLast := realNow.Sub(qs.lastRealTime)
qs.lastRealTime = realNow qs.lastRealTime = realNow
qs.virtualTime += timeSinceLast * qs.getVirtualTimeRatioLocked() prevR := qs.currentR
metrics.SetCurrentR(qs.qCfg.Name, qs.virtualTime) qs.currentR += SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast)
if qs.currentR < prevR {
klog.ErrorS(errors.New("progress meter wrapped around"), "Wrap", "QS", qs.qCfg.Name, "prevR", prevR, "currentR", qs.currentR)
}
metrics.SetCurrentR(qs.qCfg.Name, qs.currentR.ToFloat())
} }
// getVirtualTimeRatio calculates the rate at which virtual time has // getVirtualTimeRatio calculates the rate at which virtual time has
@ -460,7 +464,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
ctx: ctx, ctx: ctx,
decision: qs.promiseFactory(nil, ctx.Done(), decisionCancel), decision: qs.promiseFactory(nil, ctx.Done(), decisionCancel),
arrivalTime: qs.clock.Now(), arrivalTime: qs.clock.Now(),
arrivalR: qs.virtualTime, arrivalR: qs.currentR,
queue: queue, queue: queue,
descr1: descr1, descr1: descr1,
descr2: descr2, descr2: descr2,
@ -495,7 +499,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
// Ideally, this should be based on projected completion time in the // Ideally, this should be based on projected completion time in the
// virtual world of the youngest request in the queue. // virtual world of the youngest request in the queue.
thisSeatsSum := waitingSeats + queue.seatsInUse thisSeatsSum := waitingSeats + queue.seatsInUse
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing, virtualStart=%vss", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse, queue.virtualStart) klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing, nextDispatchR=%v", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse, queue.nextDispatchR)
if thisSeatsSum < bestQueueSeatsSum { if thisSeatsSum < bestQueueSeatsSum {
bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum
} }
@ -503,7 +507,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
} }
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
chosenQueue := qs.queues[bestQueueIdx] chosenQueue := qs.queues[bestQueueIdx]
klog.V(6).Infof("QS(%s) at r=%s v=%.9fss: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing & virtualStart=%vss", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, chosenQueue.requests.SeatsSum(), chosenQueue.requestsExecuting, chosenQueue.virtualStart) klog.V(6).Infof("QS(%s) at t=%s R=%v: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing & virtualStart=%v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.currentR, descr1, descr2, bestQueueIdx, chosenQueue.requests.SeatsSum(), chosenQueue.requestsExecuting, chosenQueue.nextDispatchR)
} }
return bestQueueIdx return bestQueueIdx
} }
@ -571,9 +575,9 @@ func (qs *queueSet) enqueueLocked(request *request) {
now := qs.clock.Now() now := qs.clock.Now()
if queue.requests.Length() == 0 && queue.requestsExecuting == 0 { if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
// the queues start R is set to the virtual time. // the queues start R is set to the virtual time.
queue.virtualStart = qs.virtualTime queue.nextDispatchR = qs.currentR
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fss: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) klog.Infof("QS(%s) at t=%s R=%v: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.nextDispatchR, queue.index, request.descr1, request.descr2)
} }
} }
queue.Enqueue(request) queue.Enqueue(request)
@ -609,7 +613,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
startTime: now, startTime: now,
decision: qs.promiseFactory(decisionExecute, ctx.Done(), decisionCancel), decision: qs.promiseFactory(decisionExecute, ctx.Done(), decisionCancel),
arrivalTime: now, arrivalTime: now,
arrivalR: qs.virtualTime, arrivalR: qs.currentR,
descr1: descr1, descr1: descr1,
descr2: descr2, descr2: descr2,
workEstimate: *workEstimate, workEstimate: *workEstimate,
@ -620,7 +624,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats()) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats())
qs.obsPair.RequestsExecuting.Add(1) qs.obsPair.RequestsExecuting.Add(1)
if klog.V(5).Enabled() { if klog.V(5).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fss: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) klog.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting)
} }
return req return req
} }
@ -656,12 +660,12 @@ func (qs *queueSet) dispatchLocked() bool {
qs.obsPair.RequestsWaiting.Add(-1) qs.obsPair.RequestsWaiting.Add(-1)
qs.obsPair.RequestsExecuting.Add(1) qs.obsPair.RequestsExecuting.Add(1)
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fss: dispatching request %#+v %#+v work %v from queue %d with start R %.9fss, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied", klog.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.currentR, request.descr1, request.descr2,
request.workEstimate, queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse) request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
} }
// When a request is dequeued for service -> qs.virtualStart += G * width // When a request is dequeued for service -> qs.virtualStart += G * width
queue.virtualStart += qs.estimatedServiceSeconds * float64(request.Seats()) queue.nextDispatchR += SeatsTimesDuration(float64(request.Seats()), qs.estimatedServiceDuration)
qs.boundNextDispatch(queue) qs.boundNextDispatch(queue)
request.decision.Set(decisionExecute) request.decision.Set(decisionExecute)
return ok return ok
@ -694,11 +698,11 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
// returns the first one of those for which the virtual finish time of // returns the first one of those for which the virtual finish time of
// the oldest waiting request is minimal. // the oldest waiting request is minimal.
func (qs *queueSet) findDispatchQueueLocked() *queue { func (qs *queueSet) findDispatchQueueLocked() *queue {
minVirtualFinish := math.Inf(1) minVirtualFinish := MaxSeatSeconds
sMin := math.Inf(1) sMin := MaxSeatSeconds
dsMin := math.Inf(1) dsMin := MaxSeatSeconds
sMax := math.Inf(-1) sMax := MinSeatSeconds
dsMax := math.Inf(-1) dsMax := MinSeatSeconds
var minQueue *queue var minQueue *queue
var minIndex int var minIndex int
nq := len(qs.queues) nq := len(qs.queues)
@ -707,12 +711,12 @@ func (qs *queueSet) findDispatchQueueLocked() *queue {
queue := qs.queues[qs.robinIndex] queue := qs.queues[qs.robinIndex]
oldestWaiting, _ := queue.requests.Peek() oldestWaiting, _ := queue.requests.Peek()
if oldestWaiting != nil { if oldestWaiting != nil {
sMin = math.Min(sMin, queue.virtualStart) sMin = ssMin(sMin, queue.nextDispatchR)
sMax = math.Max(sMax, queue.virtualStart) sMax = ssMax(sMax, queue.nextDispatchR)
estimatedWorkInProgress := qs.estimatedServiceSeconds * float64(queue.seatsInUse) estimatedWorkInProgress := SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration)
dsMin = math.Min(dsMin, queue.virtualStart-estimatedWorkInProgress) dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress)
dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress) dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress)
currentVirtualFinish := queue.virtualStart + qs.estimatedServiceSeconds*float64(oldestWaiting.Seats()) currentVirtualFinish := queue.nextDispatchR + SeatsTimesDuration(float64(oldestWaiting.Seats()), qs.estimatedServiceDuration)
klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish) klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish)
if currentVirtualFinish < minVirtualFinish { if currentVirtualFinish < minVirtualFinish {
minVirtualFinish = currentVirtualFinish minVirtualFinish = currentVirtualFinish
@ -743,13 +747,27 @@ func (qs *queueSet) findDispatchQueueLocked() *queue {
// win in the case that the virtual finish times are the same // win in the case that the virtual finish times are the same
qs.robinIndex = minIndex qs.robinIndex = minIndex
if minQueue.virtualStart < oldestReqFromMinQueue.arrivalR { if minQueue.nextDispatchR < oldestReqFromMinQueue.arrivalR {
klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.virtualStart, "request", oldestReqFromMinQueue) klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.nextDispatchR, "request", oldestReqFromMinQueue)
} }
metrics.SetDispatchMetrics(qs.qCfg.Name, qs.virtualTime, minQueue.virtualStart, sMin, sMax, dsMin, dsMax) metrics.SetDispatchMetrics(qs.qCfg.Name, qs.currentR.ToFloat(), minQueue.nextDispatchR.ToFloat(), sMin.ToFloat(), sMax.ToFloat(), dsMin.ToFloat(), dsMax.ToFloat())
return minQueue return minQueue
} }
func ssMin(a, b SeatSeconds) SeatSeconds {
if a > b {
return b
}
return a
}
func ssMax(a, b SeatSeconds) SeatSeconds {
if a < b {
return b
}
return a
}
// finishRequestAndDispatchAsMuchAsPossible is a convenience method // finishRequestAndDispatchAsMuchAsPossible is a convenience method
// which calls finishRequest for a given request and then dispatches // which calls finishRequest for a given request and then dispatches
// as many requests as possible. This is all of what needs to be done // as many requests as possible. This is all of what needs to be done
@ -773,7 +791,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1) metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
qs.obsPair.RequestsExecuting.Add(-1) qs.obsPair.RequestsExecuting.Add(-1)
S := now.Sub(r.startTime).Seconds() actualServiceDuration := now.Sub(r.startTime)
// TODO: for now we keep the logic localized so it is easier to see // TODO: for now we keep the logic localized so it is easier to see
// how the counters are tracked for queueset and queue, in future we // how the counters are tracked for queueset and queue, in future we
@ -794,11 +812,11 @@ func (qs *queueSet) finishRequestLocked(r *request) {
releaseSeatsLocked() releaseSeatsLocked()
if !klog.V(6).Enabled() { if !klog.V(6).Enabled() {
} else if r.queue != nil { } else if r.queue != nil {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats", klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index, qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index,
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
} else { } else {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse) klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse)
} }
return return
} }
@ -806,11 +824,11 @@ func (qs *queueSet) finishRequestLocked(r *request) {
additionalLatency := r.workEstimate.AdditionalLatency additionalLatency := r.workEstimate.AdditionalLatency
if !klog.V(6).Enabled() { if !klog.V(6).Enabled() {
} else if r.queue != nil { } else if r.queue != nil {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing", klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), r.queue.index, qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), r.queue.index,
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.SeatsSum(), r.queue.seatsInUse) r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.SeatsSum(), r.queue.seatsInUse)
} else { } else {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse) klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse)
} }
// EventAfterDuration will execute the event func in a new goroutine, // EventAfterDuration will execute the event func in a new goroutine,
// so the seats allocated to this request will be released after // so the seats allocated to this request will be released after
@ -823,11 +841,11 @@ func (qs *queueSet) finishRequestLocked(r *request) {
releaseSeatsLocked() releaseSeatsLocked()
if !klog.V(6).Enabled() { if !klog.V(6).Enabled() {
} else if r.queue != nil { } else if r.queue != nil {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats", klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index, qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index,
r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
} else { } else {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse) klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse)
} }
qs.dispatchAsMuchAsPossibleLocked() qs.dispatchAsMuchAsPossibleLocked()
}, additionalLatency) }, additionalLatency)
@ -839,7 +857,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
// When a request finishes being served, and the actual service time was S, // When a request finishes being served, and the actual service time was S,
// the queues start R is decremented by (G - S)*width. // the queues start R is decremented by (G - S)*width.
r.queue.virtualStart -= (qs.estimatedServiceSeconds - S) * float64(r.Seats()) r.queue.nextDispatchR -= SeatsTimesDuration(float64(r.Seats()), qs.estimatedServiceDuration-actualServiceDuration)
qs.boundNextDispatch(r.queue) qs.boundNextDispatch(r.queue)
} }
} }
@ -857,11 +875,11 @@ func (qs *queueSet) boundNextDispatch(queue *queue) {
return return
} }
var virtualStartBound = oldestReqFromMinQueue.arrivalR var virtualStartBound = oldestReqFromMinQueue.arrivalR
if queue.virtualStart < virtualStartBound { if queue.nextDispatchR < virtualStartBound {
if klog.V(4).Enabled() { if klog.V(4).Enabled() {
klog.InfoS("AntiWindup tweaked queue", "QS", qs.qCfg.Name, "queue", queue.index, "time", qs.clock.Now().Format(nsTimeFmt), "requestDescr1", oldestReqFromMinQueue.descr1, "requestDescr2", oldestReqFromMinQueue.descr2, "newVirtualStart", virtualStartBound, "deltaVirtualStart", (virtualStartBound - queue.virtualStart)) klog.InfoS("AntiWindup tweaked queue", "QS", qs.qCfg.Name, "queue", queue.index, "time", qs.clock.Now().Format(nsTimeFmt), "requestDescr1", oldestReqFromMinQueue.descr1, "requestDescr2", oldestReqFromMinQueue.descr2, "newVirtualStart", virtualStartBound, "deltaVirtualStart", (virtualStartBound - queue.nextDispatchR))
} }
queue.virtualStart = virtualStartBound queue.nextDispatchR = virtualStartBound
} }
} }

View File

@ -247,7 +247,7 @@ type uniformScenarioThread struct {
} }
func (ust *uniformScenarioThread) start() { func (ust *uniformScenarioThread) start() {
initialDelay := time.Duration(11*ust.j + 2*ust.i) initialDelay := time.Duration(90*ust.j + 20*ust.i)
if ust.uc.split && ust.j >= ust.uc.nThreads/2 { if ust.uc.split && ust.j >= ust.uc.nThreads/2 {
initialDelay += ust.uss.evalDuration / 2 initialDelay += ust.uss.evalDuration / 2
ust.nCalls = ust.nCalls / 2 ust.nCalls = ust.nCalls / 2
@ -601,7 +601,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
concurrencyLimit: 4, concurrencyLimit: 4,
evalDuration: time.Second * 60, evalDuration: time.Second * 60,
expectedFair: []bool{true}, expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01}, expectedFairnessMargin: []float64{0.03},
expectAllRequests: true, expectAllRequests: true,
evalInqueueMetrics: true, evalInqueueMetrics: true,
evalExecutingMetrics: true, evalExecutingMetrics: true,
@ -647,6 +647,46 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
}.exercise(t) }.exercise(t)
} }
// TestSeatSecondsRollover demonstrates that SeatSeconds overflow can cause bad stuff to happen.
func TestSeatSecondsRollover(t *testing.T) {
metrics.Register()
now := time.Now()
const Quarter = 91 * 24 * time.Hour
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "TestSeatSecondsRollover",
DesiredNumQueues: 9,
QueueLengthLimit: 8,
HandSize: 1,
RequestWaitLimit: 40 * Quarter,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 2000})
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
newUniformClient(1001001001, 8, 20, Quarter, Quarter).seats(500),
newUniformClient(2002002002, 7, 30, Quarter, Quarter/2).seats(500),
},
concurrencyLimit: 2000,
evalDuration: Quarter * 40,
expectedFair: []bool{false},
expectedFairnessMargin: []float64{0.01},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}
func TestDifferentFlowsExpectUnequal(t *testing.T) { func TestDifferentFlowsExpectUnequal(t *testing.T) {
metrics.Register() metrics.Register()
now := time.Now() now := time.Now()
@ -1073,7 +1113,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
} }
func TestFindDispatchQueueLocked(t *testing.T) { func TestFindDispatchQueueLocked(t *testing.T) {
var G float64 = 0.003 const G = 3 * time.Millisecond
tests := []struct { tests := []struct {
name string name string
robinIndex int robinIndex int
@ -1092,13 +1132,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
robinIndex: -1, robinIndex: -1,
queues: []*queue{ queues: []*queue{
{ {
virtualStart: 200, nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}}, &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}},
), ),
}, },
{ {
virtualStart: 100, nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}}, &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}},
), ),
@ -1115,7 +1155,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
robinIndex: -1, robinIndex: -1,
queues: []*queue{ queues: []*queue{
{ {
virtualStart: 200, nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}}, &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}},
), ),
@ -1132,13 +1172,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
robinIndex: -1, robinIndex: -1,
queues: []*queue{ queues: []*queue{
{ {
virtualStart: 200, nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 50}}, &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 50}},
), ),
}, },
{ {
virtualStart: 100, nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}}, &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}},
), ),
@ -1155,13 +1195,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
robinIndex: -1, robinIndex: -1,
queues: []*queue{ queues: []*queue{
{ {
virtualStart: 200, nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 10}}, &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 10}},
), ),
}, },
{ {
virtualStart: 100, nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}}, &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}},
), ),
@ -1178,13 +1218,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
robinIndex: -1, robinIndex: -1,
queues: []*queue{ queues: []*queue{
{ {
virtualStart: 200, nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 10}}, &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 10}},
), ),
}, },
{ {
virtualStart: 100, nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}}, &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}},
), ),
@ -1204,10 +1244,10 @@ func TestFindDispatchQueueLocked(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
qs := &queueSet{ qs := &queueSet{
estimatedServiceSeconds: G, estimatedServiceDuration: G,
robinIndex: test.robinIndex, robinIndex: test.robinIndex,
totSeatsInUse: test.totSeatsInUse, totSeatsInUse: test.totSeatsInUse,
qCfg: fq.QueuingConfig{Name: "TestSelectQueueLocked/" + test.name}, qCfg: fq.QueuingConfig{Name: "TestSelectQueueLocked/" + test.name},
dCfg: fq.DispatchingConfig{ dCfg: fq.DispatchingConfig{
ConcurrencyLimit: test.concurrencyLimit, ConcurrencyLimit: test.concurrencyLimit,
}, },

View File

@ -62,7 +62,7 @@ type request struct {
arrivalTime time.Time arrivalTime time.Time
// arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time". // arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time".
arrivalR float64 arrivalR SeatSeconds
// descr1 and descr2 are not used in any logic but they appear in // descr1 and descr2 are not used in any logic but they appear in
// log messages // log messages
@ -84,9 +84,9 @@ type queue struct {
// The requests not yet executing in the real world are stored in a FIFO list. // The requests not yet executing in the real world are stored in a FIFO list.
requests fifo requests fifo
// virtualStart is the "virtual time" (R progress meter reading) at // nextDispatchR is the R progress meter reading at
// which the next request will be dispatched in the virtual world. // which the next request will be dispatched in the virtual world.
virtualStart float64 nextDispatchR SeatSeconds
// requestsExecuting is the count in the real world // requestsExecuting is the count in the real world
requestsExecuting int requestsExecuting int
@ -130,7 +130,7 @@ func (q *queue) dump(includeDetails bool) debug.QueueDump {
return true return true
}) })
return debug.QueueDump{ return debug.QueueDump{
VirtualStart: q.virtualStart, VirtualStart: q.nextDispatchR.ToFloat(), // TODO: change QueueDump to use SeatSeconds
Requests: digest, Requests: digest,
ExecutingRequests: q.requestsExecuting, ExecutingRequests: q.requestsExecuting,
SeatsInUse: q.seatsInUse, SeatsInUse: q.seatsInUse,