Merge pull request #104345 from MikeSpreitzer/test-width

Fix extra latency and add tests for that and non-unit width
This commit is contained in:
Kubernetes Prow Robot 2021-08-23 02:32:00 -07:00 committed by GitHub
commit b9565beef0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 298 additions and 67 deletions

View File

@ -18,6 +18,7 @@ package queueset
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"math" "math"
"sync" "sync"
@ -101,7 +102,8 @@ type queueSet struct {
// queues are still draining. // queues are still draining.
queues []*queue queues []*queue
// virtualTime is the number of virtual seconds since process startup // virtualTime is the amount of seat-seconds allocated per queue since process startup.
// This is our generalization of the progress meter named R in the original fair queuing work.
virtualTime float64 virtualTime float64
// lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated // lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated
@ -477,13 +479,15 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
// in addition to their seats. // in addition to their seats.
// Ideally, this should be based on projected completion time in the // Ideally, this should be based on projected completion time in the
// virtual world of the youngest request in the queue. // virtual world of the youngest request in the queue.
thisSeatsSum := qs.queues[queueIdx].requests.SeatsSum() queue := qs.queues[queueIdx]
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of seatsSum %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisSeatsSum) waitingSeats := queue.requests.SeatsSum()
thisSeatsSum := waitingSeats // + queue.seatsInUse
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse)
if thisSeatsSum < bestQueueSeatsSum { if thisSeatsSum < bestQueueSeatsSum {
bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum
} }
}) })
klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting) klog.V(6).Infof("QS(%s) at r=%s v=%.9fss: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting)
return bestQueueIdx return bestQueueIdx
} }
@ -549,10 +553,10 @@ func (qs *queueSet) enqueueLocked(request *request) {
queue := request.queue queue := request.queue
now := qs.clock.Now() now := qs.clock.Now()
if queue.requests.Length() == 0 && queue.requestsExecuting == 0 { if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
// the queues virtual start time is set to the virtual time. // the queues start R is set to the virtual time.
queue.virtualStart = qs.virtualTime queue.virtualStart = qs.virtualTime
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) klog.Infof("QS(%s) at r=%s v=%.9fss: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
} }
} }
queue.Enqueue(request) queue.Enqueue(request)
@ -598,7 +602,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats()) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats())
qs.obsPair.RequestsExecuting.Add(1) qs.obsPair.RequestsExecuting.Add(1)
if klog.V(5).Enabled() { if klog.V(5).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) klog.Infof("QS(%s) at r=%s v=%.9fss: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
} }
return req return req
} }
@ -634,9 +638,9 @@ func (qs *queueSet) dispatchLocked() bool {
qs.obsPair.RequestsWaiting.Add(-1) qs.obsPair.RequestsWaiting.Add(-1)
qs.obsPair.RequestsExecuting.Add(1) qs.obsPair.RequestsExecuting.Add(1)
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", klog.Infof("QS(%s) at r=%s v=%.9fss: dispatching request %#+v %#+v work %v from queue %d with start R %.9fss, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2,
queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting) request.workEstimate, queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
} }
// When a request is dequeued for service -> qs.virtualStart += G // When a request is dequeued for service -> qs.virtualStart += G
queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats()) queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats())
@ -659,10 +663,6 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
} }
// wait for all "currently" executing requests in this queueSet // wait for all "currently" executing requests in this queueSet
// to finish before we can execute this request. // to finish before we can execute this request.
if klog.V(4).Enabled() {
klog.Infof("QS(%s): seats (%d) asked for exceeds concurrency limit, waiting for currently executing requests to complete, %d seats are in use (%d are executing) and the limit is %d",
qs.qCfg.Name, seats, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
}
return false return false
case qs.totSeatsInUse+seats > qs.dCfg.ConcurrencyLimit: case qs.totSeatsInUse+seats > qs.dCfg.ConcurrencyLimit:
return false return false
@ -692,8 +692,8 @@ func (qs *queueSet) selectQueueLocked() *queue {
estimatedWorkInProgress := qs.estimatedServiceTime * float64(queue.seatsInUse) estimatedWorkInProgress := qs.estimatedServiceTime * float64(queue.seatsInUse)
dsMin = math.Min(dsMin, queue.virtualStart-estimatedWorkInProgress) dsMin = math.Min(dsMin, queue.virtualStart-estimatedWorkInProgress)
dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress) dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress)
// the virtual finish time of the oldest request is: // the finish R of the oldest request is:
// virtual start time + G // start R + G
// we are not taking the width of the request into account when // we are not taking the width of the request into account when
// we calculate the virtual finish time of the request because // we calculate the virtual finish time of the request because
// it can starve requests with smaller wdith in other queues. // it can starve requests with smaller wdith in other queues.
@ -704,12 +704,12 @@ func (qs *queueSet) selectQueueLocked() *queue {
// - we have two queues, q1 and q2 // - we have two queues, q1 and q2
// - q1 has an infinite supply of requests with width W=1 // - q1 has an infinite supply of requests with width W=1
// - q2 has one request waiting in the queue with width W=2 // - q2 has one request waiting in the queue with width W=2
// - virtual start time for both q1 and q2 are at t0 // - start R for both q1 and q2 are at t0
// - requests complete really fast, S=1ms on q1 // - requests complete really fast, S=1ms on q1
// in this scenario we will execute roughly 60,000 requests // in this scenario we will execute roughly 60,000 requests
// from q1 before we pick the request from q2. // from q1 before we pick the request from q2.
currentVirtualFinish := queue.virtualStart + qs.estimatedServiceTime currentVirtualFinish := queue.virtualStart + qs.estimatedServiceTime
klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish)
if currentVirtualFinish < minVirtualFinish { if currentVirtualFinish < minVirtualFinish {
minVirtualFinish = currentVirtualFinish minVirtualFinish = currentVirtualFinish
minQueue = queue minQueue = queue
@ -724,9 +724,18 @@ func (qs *queueSet) selectQueueLocked() *queue {
oldestReqFromMinQueue = r oldestReqFromMinQueue = r
return false return false
}) })
if oldestReqFromMinQueue == nil || !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) { if oldestReqFromMinQueue == nil {
// This cannot happen
klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name)
return nil
}
if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) {
// since we have not picked the queue with the minimum virtual finish // since we have not picked the queue with the minimum virtual finish
// time, we are not going to advance the round robin index here. // time, we are not going to advance the round robin index here.
if klog.V(4).Enabled() {
klog.Infof("QS(%s): request %v %v seats %d cannot be dispatched from queue %d, waiting for currently executing requests to complete, %d requests are occupying %d seats and the limit is %d",
qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.Seats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit)
}
return nil return nil
} }
@ -743,7 +752,7 @@ func (qs *queueSet) selectQueueLocked() *queue {
// time. // time.
// //
// hence we're refreshing the per-queue virtual time for the chosen // hence we're refreshing the per-queue virtual time for the chosen
// queue here. if the last virtual start time (excluded estimated cost) // queue here. if the last start R (excluded estimated cost)
// falls behind the global virtual time, we update the latest virtual // falls behind the global virtual time, we update the latest virtual
// start by: <latest global virtual time> + <previously estimated cost> // start by: <latest global virtual time> + <previously estimated cost>
previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceTime previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceTime
@ -790,12 +799,6 @@ func (qs *queueSet) finishRequestLocked(r *request) {
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats()) metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats())
if r.queue != nil { if r.queue != nil {
r.queue.seatsInUse -= r.Seats() r.queue.seatsInUse -= r.Seats()
if klog.V(6).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index,
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting)
}
} }
} }
@ -803,34 +806,55 @@ func (qs *queueSet) finishRequestLocked(r *request) {
if r.workEstimate.AdditionalLatency <= 0 { if r.workEstimate.AdditionalLatency <= 0 {
// release the seats allocated to this request immediately // release the seats allocated to this request immediately
releaseSeatsLocked() releaseSeatsLocked()
if !klog.V(6).Enabled() {
} else if r.queue != nil {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, r.queue.index,
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
} else {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, qs.totRequestsExecuting, qs.totSeatsInUse)
}
return return
} }
additionalLatency := r.workEstimate.AdditionalLatency additionalLatency := r.workEstimate.AdditionalLatency
if !klog.V(6).Enabled() {
} else if r.queue != nil {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, additionalLatency.Seconds(), r.queue.index,
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.SeatsSum(), r.queue.seatsInUse)
} else {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse)
}
// EventAfterDuration will execute the event func in a new goroutine, // EventAfterDuration will execute the event func in a new goroutine,
// so the seats allocated to this request will be released after // so the seats allocated to this request will be released after
// AdditionalLatency elapses, this ensures that the additional // AdditionalLatency elapses, this ensures that the additional
// latency has no impact on the user experience. // latency has no impact on the user experience.
qs.clock.EventAfterDuration(func(_ time.Time) { qs.clock.EventAfterDuration(func(_ time.Time) {
qs.lock.Lock() qs.lockAndSyncTime()
defer qs.lock.Unlock() defer qs.lock.Unlock()
now := qs.clock.Now()
releaseSeatsLocked() releaseSeatsLocked()
if !klog.V(6).Enabled() {
} else if r.queue != nil {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, r.queue.index,
r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
} else {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, qs.totRequestsExecuting, qs.totSeatsInUse)
}
qs.dispatchAsMuchAsPossibleLocked()
}, additionalLatency) }, additionalLatency)
}() }()
if r.queue == nil { if r.queue != nil {
if klog.V(6).Enabled() { // request has finished, remove from requests executing
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) r.queue.requestsExecuting--
}
return // When a request finishes being served, and the actual service time was S,
// the queues start R is decremented by (G - S)*width.
r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats())
} }
// request has finished, remove from requests executing
r.queue.requestsExecuting--
// When a request finishes being served, and the actual service time was S,
// the queues virtual start time is decremented by (G - S)*width.
r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats())
} }
func (qs *queueSet) removeQueueIfEmptyLocked(r *request) { func (qs *queueSet) removeQueueIfEmptyLocked(r *request) {

View File

@ -108,10 +108,46 @@ type uniformClient struct {
// causing a request to be launched a certain amount of time // causing a request to be launched a certain amount of time
// before the previous one finishes. // before the previous one finishes.
thinkDuration time.Duration thinkDuration time.Duration
// padDuration is additional time during which this request occupies its seats.
// This comes at the end of execution, after the reply has been released toward
// the client.
// The evaluation code below can only handle two cases:
// - this padding always keeps another request out of the seats, or
// - this padding never keeps another request out of the seats.
// Set the `padConstrains` field of the scenario accordingly.
padDuration time.Duration
// When true indicates that only half the specified number of // When true indicates that only half the specified number of
// threads should run during the first half of the evaluation // threads should run during the first half of the evaluation
// period // period
split bool split bool
// width is the number of seats this request occupies while executing
width uint
}
func newUniformClient(hash uint64, nThreads, nCalls int, execDuration, thinkDuration time.Duration) uniformClient {
return uniformClient{
hash: hash,
nThreads: nThreads,
nCalls: nCalls,
execDuration: execDuration,
thinkDuration: thinkDuration,
width: 1,
}
}
func (uc uniformClient) setSplit() uniformClient {
uc.split = true
return uc
}
func (uc uniformClient) seats(width uint) uniformClient {
uc.width = width
return uc
}
func (uc uniformClient) pad(duration time.Duration) uniformClient {
uc.padDuration = duration
return uc
} }
// uniformScenario describes a scenario based on the given set of uniform clients. // uniformScenario describes a scenario based on the given set of uniform clients.
@ -127,6 +163,8 @@ type uniformClient struct {
// fair in the respective halves of a split scenario; // fair in the respective halves of a split scenario;
// in a non-split scenario this is a singleton with one expectation. // in a non-split scenario this is a singleton with one expectation.
// expectAllRequests indicates whether all requests are expected to get dispatched. // expectAllRequests indicates whether all requests are expected to get dispatched.
// padConstrains indicates whether the execution duration padding, if any,
// is expected to hold up dispatching.
type uniformScenario struct { type uniformScenario struct {
name string name string
qs fq.QueueSet qs fq.QueueSet
@ -140,13 +178,14 @@ type uniformScenario struct {
rejectReason string rejectReason string
clk *testeventclock.Fake clk *testeventclock.Fake
counter counter.GoRoutineCounter counter counter.GoRoutineCounter
padConstrains bool
} }
func (us uniformScenario) exercise(t *testing.T) { func (us uniformScenario) exercise(t *testing.T) {
uss := uniformScenarioState{ uss := uniformScenarioState{
t: t, t: t,
uniformScenario: us, uniformScenario: us,
startTime: time.Now(), startTime: us.clk.Now(),
integrators: make([]fq.Integrator, len(us.clients)), integrators: make([]fq.Integrator, len(us.clients)),
executions: make([]int32, len(us.clients)), executions: make([]int32, len(us.clients)),
rejects: make([]int32, len(us.clients)), rejects: make([]int32, len(us.clients)),
@ -227,7 +266,7 @@ func (ust *uniformScenarioThread) callK(k int) {
if k >= ust.nCalls { if k >= ust.nCalls {
return return
} }
req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{Seats: 1}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{Seats: ust.uc.width, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle) ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
if req == nil { if req == nil {
atomic.AddUint64(&ust.uss.failedCount, 1) atomic.AddUint64(&ust.uss.failedCount, 1)
@ -238,20 +277,25 @@ func (ust *uniformScenarioThread) callK(k int) {
ust.uss.t.Error("got request but QueueSet reported idle") ust.uss.t.Error("got request but QueueSet reported idle")
} }
var executed bool var executed bool
var returnTime time.Time
idle2 := req.Finish(func() { idle2 := req.Finish(func() {
executed = true executed = true
execStart := ust.uss.clk.Now() execStart := ust.uss.clk.Now()
ust.uss.t.Logf("%s: %d, %d, %d executing", execStart.Format(nsTimeFmt), ust.i, ust.j, k)
atomic.AddInt32(&ust.uss.executions[ust.i], 1) atomic.AddInt32(&ust.uss.executions[ust.i], 1)
ust.igr.Add(1) ust.igr.Add(float64(ust.uc.width))
ust.uss.t.Logf("%s: %d, %d, %d executing; seats=%d", execStart.Format(nsTimeFmt), ust.i, ust.j, k, ust.uc.width)
ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration) ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration)
ust.uss.clk.Sleep(ust.uc.execDuration) ust.uss.clk.Sleep(ust.uc.execDuration)
ust.igr.Add(-1) ust.igr.Add(-float64(ust.uc.width))
returnTime = ust.uss.clk.Now()
}) })
ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2) now := ust.uss.clk.Now()
ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", now.Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2)
if !executed { if !executed {
atomic.AddUint64(&ust.uss.failedCount, 1) atomic.AddUint64(&ust.uss.failedCount, 1)
atomic.AddInt32(&ust.uss.rejects[ust.i], 1) atomic.AddInt32(&ust.uss.rejects[ust.i], 1)
} else if now != returnTime {
ust.uss.t.Errorf("%s: %d, %d, %d returnTime=%s", now.Format(nsTimeFmt), ust.i, ust.j, k, returnTime.Format(nsTimeFmt))
} }
} }
@ -270,23 +314,28 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma
if uc.split && !last { if uc.split && !last {
nThreads = nThreads / 2 nThreads = nThreads / 2
} }
demands[i] = float64(nThreads) * float64(uc.execDuration) / float64(uc.thinkDuration+uc.execDuration) sep := uc.thinkDuration
if uss.padConstrains && uc.padDuration > sep {
sep = uc.padDuration
}
demands[i] = float64(nThreads) * float64(uc.width) * float64(uc.execDuration) / float64(sep+uc.execDuration)
averages[i] = uss.integrators[i].Reset().Average averages[i] = uss.integrators[i].Reset().Average
} }
fairAverages := fairAlloc(demands, float64(uss.concurrencyLimit)) fairAverages := fairAlloc(demands, float64(uss.concurrencyLimit))
for i := range uss.clients { for i := range uss.clients {
expectedAverage := fairAverages[i]
var gotFair bool var gotFair bool
if fairAverages[i] > 0 { if expectedAverage > 0 {
relDiff := (averages[i] - fairAverages[i]) / fairAverages[i] relDiff := (averages[i] - expectedAverage) / expectedAverage
gotFair = math.Abs(relDiff) <= margin gotFair = math.Abs(relDiff) <= margin
} else { } else {
gotFair = math.Abs(averages[i]) <= margin gotFair = math.Abs(averages[i]) <= margin
} }
if gotFair != expectFair { if gotFair != expectFair {
uss.t.Errorf("%s client %d last=%v got an Average of %v but the fair average was %v", uss.name, i, last, averages[i], fairAverages[i]) uss.t.Errorf("%s client %d last=%v got an Average of %v but the expected average was %v", uss.name, i, last, averages[i], expectedAverage)
} else { } else {
uss.t.Logf("%s client %d last=%v got an Average of %v and the fair average was %v", uss.name, i, last, averages[i], fairAverages[i]) uss.t.Logf("%s client %d last=%v got an Average of %v and the expected average was %v", uss.name, i, last, averages[i], expectedAverage)
} }
} }
} }
@ -376,8 +425,8 @@ func TestNoRestraint(t *testing.T) {
uniformScenario{name: "NoRestraint", uniformScenario{name: "NoRestraint",
qs: nr, qs: nr,
clients: []uniformClient{ clients: []uniformClient{
{1001001001, 5, 10, time.Second, time.Second, false}, newUniformClient(1001001001, 5, 10, time.Second, time.Second),
{2002002002, 2, 10, time.Second, time.Second / 2, false}, newUniformClient(2002002002, 2, 10, time.Second, time.Second/2),
}, },
concurrencyLimit: 10, concurrencyLimit: 10,
evalDuration: time.Second * 15, evalDuration: time.Second * 15,
@ -389,6 +438,90 @@ func TestNoRestraint(t *testing.T) {
}.exercise(t) }.exercise(t)
} }
func TestBaseline(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "TestBaseline",
DesiredNumQueues: 9,
QueueLengthLimit: 8,
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
newUniformClient(1001001001, 1, 21, time.Second, 0),
},
concurrencyLimit: 1,
evalDuration: time.Second * 20,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}
func TestSeparations(t *testing.T) {
for _, seps := range []struct{ think, pad time.Duration }{
{think: time.Second, pad: 0},
{think: 0, pad: time.Second},
{think: time.Second, pad: time.Second / 2},
{think: time.Second / 2, pad: time.Second},
} {
for conc := 1; conc <= 2; conc++ {
caseName := fmt.Sprintf("seps%v,%v,%v", seps.think, seps.pad, conc)
t.Run(caseName, func(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: caseName,
DesiredNumQueues: 9,
QueueLengthLimit: 8,
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: conc})
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
newUniformClient(1001001001, 1, 19, time.Second, seps.think).pad(seps.pad),
},
concurrencyLimit: conc,
evalDuration: time.Second * 18, // multiple of every period involved, so that margin can be 0 below
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
padConstrains: conc == 1,
}.exercise(t)
})
}
}
}
func TestUniformFlowsHandSize1(t *testing.T) { func TestUniformFlowsHandSize1(t *testing.T) {
metrics.Register() metrics.Register()
now := time.Now() now := time.Now()
@ -411,8 +544,8 @@ func TestUniformFlowsHandSize1(t *testing.T) {
uniformScenario{name: qCfg.Name, uniformScenario{name: qCfg.Name,
qs: qs, qs: qs,
clients: []uniformClient{ clients: []uniformClient{
{1001001001, 8, 20, time.Second, time.Second - 1, false}, newUniformClient(1001001001, 8, 20, time.Second, time.Second-1),
{2002002002, 8, 20, time.Second, time.Second - 1, false}, newUniformClient(2002002002, 8, 20, time.Second, time.Second-1),
}, },
concurrencyLimit: 4, concurrencyLimit: 4,
evalDuration: time.Second * 50, evalDuration: time.Second * 50,
@ -447,8 +580,8 @@ func TestUniformFlowsHandSize3(t *testing.T) {
uniformScenario{name: qCfg.Name, uniformScenario{name: qCfg.Name,
qs: qs, qs: qs,
clients: []uniformClient{ clients: []uniformClient{
{1001001001, 8, 30, time.Second, time.Second - 1, false}, newUniformClient(1001001001, 8, 30, time.Second, time.Second-1),
{2002002002, 8, 30, time.Second, time.Second - 1, false}, newUniformClient(2002002002, 8, 30, time.Second, time.Second-1),
}, },
concurrencyLimit: 4, concurrencyLimit: 4,
evalDuration: time.Second * 60, evalDuration: time.Second * 60,
@ -484,8 +617,8 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
uniformScenario{name: qCfg.Name, uniformScenario{name: qCfg.Name,
qs: qs, qs: qs,
clients: []uniformClient{ clients: []uniformClient{
{1001001001, 8, 20, time.Second, time.Second, false}, newUniformClient(1001001001, 8, 20, time.Second, time.Second),
{2002002002, 7, 30, time.Second, time.Second / 2, false}, newUniformClient(2002002002, 7, 30, time.Second, time.Second/2),
}, },
concurrencyLimit: 4, concurrencyLimit: 4,
evalDuration: time.Second * 40, evalDuration: time.Second * 40,
@ -521,8 +654,8 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
uniformScenario{name: qCfg.Name, uniformScenario{name: qCfg.Name,
qs: qs, qs: qs,
clients: []uniformClient{ clients: []uniformClient{
{1001001001, 4, 20, time.Second, time.Second - 1, false}, newUniformClient(1001001001, 4, 20, time.Second, time.Second-1),
{2002002002, 2, 20, time.Second, time.Second - 1, false}, newUniformClient(2002002002, 2, 20, time.Second, time.Second-1),
}, },
concurrencyLimit: 3, concurrencyLimit: 3,
evalDuration: time.Second * 20, evalDuration: time.Second * 20,
@ -536,6 +669,81 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
}.exercise(t) }.exercise(t)
} }
func TestDifferentWidths(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "TestDifferentWidths",
DesiredNumQueues: 64,
QueueLengthLimit: 4,
HandSize: 7,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 6})
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
newUniformClient(10010010010010, 13, 10, time.Second, time.Second-1),
newUniformClient(20020020020020, 7, 10, time.Second, time.Second-1).seats(2),
},
concurrencyLimit: 6,
evalDuration: time.Second * 20,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.1},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}
func TestTooWide(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "TestTooWide",
DesiredNumQueues: 64,
QueueLengthLimit: 7,
HandSize: 7,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 6})
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
newUniformClient(40040040040040, 15, 21, time.Second, time.Second-1).seats(2),
newUniformClient(50050050050050, 15, 21, time.Second, time.Second-1).seats(2),
newUniformClient(60060060060060, 15, 21, time.Second, time.Second-1).seats(2),
newUniformClient(70070070070070, 15, 21, time.Second, time.Second-1).seats(2),
newUniformClient(90090090090090, 15, 21, time.Second, time.Second-1).seats(7),
},
concurrencyLimit: 6,
evalDuration: time.Second * 40,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.35},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}
func TestWindup(t *testing.T) { func TestWindup(t *testing.T) {
metrics.Register() metrics.Register()
now := time.Now() now := time.Now()
@ -557,8 +765,8 @@ func TestWindup(t *testing.T) {
uniformScenario{name: qCfg.Name, qs: qs, uniformScenario{name: qCfg.Name, qs: qs,
clients: []uniformClient{ clients: []uniformClient{
{1001001001, 2, 40, time.Second, -1, false}, newUniformClient(1001001001, 2, 40, time.Second, -1),
{2002002002, 2, 40, time.Second, -1, true}, newUniformClient(2002002002, 2, 40, time.Second, -1).setSplit(),
}, },
concurrencyLimit: 3, concurrencyLimit: 3,
evalDuration: time.Second * 40, evalDuration: time.Second * 40,
@ -591,8 +799,8 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
uniformScenario{name: qCfg.Name, uniformScenario{name: qCfg.Name,
qs: qs, qs: qs,
clients: []uniformClient{ clients: []uniformClient{
{1001001001, 6, 10, time.Second, 57 * time.Millisecond, false}, newUniformClient(1001001001, 6, 10, time.Second, 57*time.Millisecond),
{2002002002, 4, 15, time.Second, 750 * time.Millisecond, false}, newUniformClient(2002002002, 4, 15, time.Second, 750*time.Millisecond),
}, },
concurrencyLimit: 4, concurrencyLimit: 4,
evalDuration: time.Second * 13, evalDuration: time.Second * 13,
@ -627,7 +835,7 @@ func TestTimeout(t *testing.T) {
uniformScenario{name: qCfg.Name, uniformScenario{name: qCfg.Name,
qs: qs, qs: qs,
clients: []uniformClient{ clients: []uniformClient{
{1001001001, 5, 100, time.Second, time.Second, false}, newUniformClient(1001001001, 5, 100, time.Second, time.Second),
}, },
concurrencyLimit: 1, concurrencyLimit: 1,
evalDuration: time.Second * 10, evalDuration: time.Second * 10,

View File

@ -79,9 +79,8 @@ type queue struct {
// The requests are stored in a FIFO list. // The requests are stored in a FIFO list.
requests fifo requests fifo
// virtualStart is the virtual time (virtual seconds since process // virtualStart is the "virtual time" (R progress meter reading) at
// startup) when the oldest request in the queue (if there is any) // which the next request will be dispatched in the virtual world.
// started virtually executing
virtualStart float64 virtualStart float64
requestsExecuting int requestsExecuting int