Fix extra latency and add tests for that and width
Added missing dispatching after delayed release of seats. Updated logging for all six situations of execution completion and seat release. Added behavioral tests for non-zero extra latency and for non-unit width, plus two tests of baseline functionality. Also improved some comments and other logging in `queueset.go`.
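The heart of the latency fix shows up in `finishRequestLocked` below: when a request's work estimate carries a non-zero `AdditionalLatency`, its seats are released on a timer, and that delayed release must be followed by a dispatch pass, or the freed seats sit idle until some unrelated event arrives. A condensed excerpt of the pattern (logging elided; all names appear in the diff below):

```go
// Condensed from finishRequestLocked: release the lingering seats once the
// extra latency elapses, then immediately try to dispatch waiting requests.
qs.clock.EventAfterDuration(func(_ time.Time) {
	qs.lockAndSyncTime() // replaces qs.lock.Lock(); also advances the virtual clock
	defer qs.lock.Unlock()
	releaseSeatsLocked()
	// The previously missing step: hand the freed seats to waiting requests.
	qs.dispatchAsMuchAsPossibleLocked()
}, additionalLatency)
```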
Commit d2a27a58f0 (parent 3c72622a1f)
@@ -18,6 +18,7 @@ package queueset
 import (
 	"context"
+	"errors"
 	"fmt"
 	"math"
 	"sync"
@@ -101,7 +102,8 @@ type queueSet struct {
 	// queues are still draining.
 	queues []*queue

-	// virtualTime is the number of virtual seconds since process startup
+	// virtualTime is the amount of seat-seconds allocated per queue since process startup.
+	// This is our generalization of the progress meter named R in the original fair queuing work.
 	virtualTime float64

 	// lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated
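Two bookkeeping rules later in this diff keep that seat-seconds meter consistent: a queue's start R is charged G seat-seconds per seat when a request is dispatched, and refunded (G − S) per seat when it finishes, where G is the estimated and S the actual service time. A minimal sketch of the arithmetic (the function names here are illustrative, not from the source):

```go
// Illustrative only: the start-R updates this queue set performs.
// g is the estimated service time in seconds, s the actual one.
func onDispatch(startR, g float64, seats int) float64 {
	return startR + g*float64(seats) // charge the estimate up front
}

func onFinish(startR, g, s float64, seats int) float64 {
	return startR - (g-s)*float64(seats) // net effect per seat: +s
}
```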
@@ -477,13 +479,15 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
 		// in addition to their seats.
 		// Ideally, this should be based on projected completion time in the
 		// virtual world of the youngest request in the queue.
-		thisSeatsSum := qs.queues[queueIdx].requests.SeatsSum()
-		klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of seatsSum %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisSeatsSum)
+		queue := qs.queues[queueIdx]
+		waitingSeats := queue.requests.SeatsSum()
+		thisSeatsSum := waitingSeats // + queue.seatsInUse
+		klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse)
 		if thisSeatsSum < bestQueueSeatsSum {
 			bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum
 		}
 	})
-	klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting)
+	klog.V(6).Infof("QS(%s) at r=%s v=%.9fss: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting)
 	return bestQueueIdx
 }

@@ -549,10 +553,10 @@ func (qs *queueSet) enqueueLocked(request *request) {
 	queue := request.queue
 	now := qs.clock.Now()
 	if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
-		// the queue’s virtual start time is set to the virtual time.
+		// the queue’s start R is set to the virtual time.
 		queue.virtualStart = qs.virtualTime
 		if klog.V(6).Enabled() {
-			klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
+			klog.Infof("QS(%s) at r=%s v=%.9fss: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
 		}
 	}
 	queue.Enqueue(request)
@@ -598,7 +602,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
 	metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats())
 	qs.obsPair.RequestsExecuting.Add(1)
 	if klog.V(5).Enabled() {
-		klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
+		klog.Infof("QS(%s) at r=%s v=%.9fss: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
 	}
 	return req
 }
@@ -634,9 +638,9 @@ func (qs *queueSet) dispatchLocked() bool {
 	qs.obsPair.RequestsWaiting.Add(-1)
 	qs.obsPair.RequestsExecuting.Add(1)
 	if klog.V(6).Enabled() {
-		klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing",
-			qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2,
-			queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting)
+		klog.Infof("QS(%s) at r=%s v=%.9fss: dispatching request %#+v %#+v work %v from queue %d with start R %.9fss, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
+			qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2,
+			request.workEstimate, queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
 	}
 	// When a request is dequeued for service -> qs.virtualStart += G
 	queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats())
@@ -659,10 +663,6 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
 		}
 		// wait for all "currently" executing requests in this queueSet
 		// to finish before we can execute this request.
-		if klog.V(4).Enabled() {
-			klog.Infof("QS(%s): seats (%d) asked for exceeds concurrency limit, waiting for currently executing requests to complete, %d seats are in use (%d are executing) and the limit is %d",
-				qs.qCfg.Name, seats, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
-		}
 		return false
 	case qs.totSeatsInUse+seats > qs.dCfg.ConcurrencyLimit:
 		return false
@@ -692,8 +692,8 @@ func (qs *queueSet) selectQueueLocked() *queue {
 		estimatedWorkInProgress := qs.estimatedServiceTime * float64(queue.seatsInUse)
 		dsMin = math.Min(dsMin, queue.virtualStart-estimatedWorkInProgress)
 		dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress)
-		// the virtual finish time of the oldest request is:
-		// virtual start time + G
+		// the finish R of the oldest request is:
+		// start R + G
 		// we are not taking the width of the request into account when
 		// we calculate the virtual finish time of the request because
 		// it can starve requests with smaller wdith in other queues.
@@ -704,12 +704,12 @@ func (qs *queueSet) selectQueueLocked() *queue {
 		// - we have two queues, q1 and q2
 		// - q1 has an infinite supply of requests with width W=1
 		// - q2 has one request waiting in the queue with width W=2
-		// - virtual start time for both q1 and q2 are at t0
+		// - start R for both q1 and q2 are at t0
 		// - requests complete really fast, S=1ms on q1
 		// in this scenario we will execute roughly 60,000 requests
 		// from q1 before we pick the request from q2.
 		currentVirtualFinish := queue.virtualStart + qs.estimatedServiceTime
+		klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish)
 		if currentVirtualFinish < minVirtualFinish {
 			minVirtualFinish = currentVirtualFinish
 			minQueue = queue
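To unpack the 60,000 figure in that comment: if width were charged in the finish R, q2's request would finish in the virtual world at t0 + 2G, while q1's k-th request finishes at roughly t0 + kS + G, since each completion nets q1's start R only +S. q1 would then keep winning while kS < G, i.e. for about G/S requests — roughly 60,000 with S = 1ms and G on the order of a minute (the value of G is inferred from the comment, not stated in the diff).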
@@ -724,9 +724,18 @@ func (qs *queueSet) selectQueueLocked() *queue {
 		oldestReqFromMinQueue = r
 		return false
 	})
-	if oldestReqFromMinQueue == nil || !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) {
+	if oldestReqFromMinQueue == nil {
+		// This cannot happen
+		klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name)
+		return nil
+	}
+	if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) {
 		// since we have not picked the queue with the minimum virtual finish
 		// time, we are not going to advance the round robin index here.
+		if klog.V(4).Enabled() {
+			klog.Infof("QS(%s): request %v %v seats %d cannot be dispatched from queue %d, waiting for currently executing requests to complete, %d requests are occupying %d seats and the limit is %d",
+				qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.Seats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit)
+		}
 		return nil
 	}

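Splitting the old combined check separates a genuine bug from ordinary backpressure: an empty queue selected as minQueue is now reported as an impossible condition, while an oldest request whose seats do not currently fit is logged at V(4) with the occupancy counts that the log deleted from `canAccommodateSeatsLocked` above used to report.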
@@ -743,7 +752,7 @@ func (qs *queueSet) selectQueueLocked() *queue {
 	// time.
 	//
 	// hence we're refreshing the per-queue virtual time for the chosen
-	// queue here. if the last virtual start time (excluded estimated cost)
+	// queue here. if the last start R (excluded estimated cost)
 	// falls behind the global virtual time, we update the latest virtual
 	// start by: <latest global virtual time> + <previously estimated cost>
 	previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceTime
@@ -790,12 +799,6 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 		metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats())
 		if r.queue != nil {
 			r.queue.seatsInUse -= r.Seats()
-
-			if klog.V(6).Enabled() {
-				klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing",
-					qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index,
-					r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting)
-			}
 		}
 	}

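The message removed here is not lost: the next hunk gives `finishRequestLocked` situation-specific logging covering all six combinations the commit message mentions — immediate release, start of lingering, and end of lingering, each for queued and queueless requests.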
@@ -803,34 +806,55 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 		if r.workEstimate.AdditionalLatency <= 0 {
 			// release the seats allocated to this request immediately
 			releaseSeatsLocked()
+			if !klog.V(6).Enabled() {
+			} else if r.queue != nil {
+				klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats",
+					qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, r.queue.index,
+					r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
+			} else {
+				klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, qs.totRequestsExecuting, qs.totSeatsInUse)
+			}
 			return
 		}

 		additionalLatency := r.workEstimate.AdditionalLatency
+		if !klog.V(6).Enabled() {
+		} else if r.queue != nil {
+			klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing",
+				qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, additionalLatency.Seconds(), r.queue.index,
+				r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.SeatsSum(), r.queue.seatsInUse)
+		} else {
+			klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse)
+		}
 		// EventAfterDuration will execute the event func in a new goroutine,
 		// so the seats allocated to this request will be released after
 		// AdditionalLatency elapses, this ensures that the additional
 		// latency has no impact on the user experience.
 		qs.clock.EventAfterDuration(func(_ time.Time) {
-			qs.lock.Lock()
+			qs.lockAndSyncTime()
 			defer qs.lock.Unlock()
+			now := qs.clock.Now()
 			releaseSeatsLocked()
+			if !klog.V(6).Enabled() {
+			} else if r.queue != nil {
+				klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats",
+					qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, r.queue.index,
+					r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
+			} else {
+				klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, qs.totRequestsExecuting, qs.totSeatsInUse)
+			}
+			qs.dispatchAsMuchAsPossibleLocked()
 		}, additionalLatency)
 	}()

-	if r.queue == nil {
-		if klog.V(6).Enabled() {
-			klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting)
-		}
-		return
-	}
-
-	// request has finished, remove from requests executing
-	r.queue.requestsExecuting--
+	if r.queue != nil {
+		// request has finished, remove from requests executing
+		r.queue.requestsExecuting--

-	// When a request finishes being served, and the actual service time was S,
-	// the queue’s virtual start time is decremented by (G - S)*width.
-	r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats())
+		// When a request finishes being served, and the actual service time was S,
+		// the queue’s start R is decremented by (G - S)*width.
+		r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats())
+	}
 }

 func (qs *queueSet) removeQueueIfEmptyLocked(r *request) {
@@ -108,10 +108,46 @@ type uniformClient struct {
 	// causing a request to be launched a certain amount of time
 	// before the previous one finishes.
 	thinkDuration time.Duration
+	// padDuration is additional time during which this request occupies its seats.
+	// This comes at the end of execution, after the reply has been released toward
+	// the client.
+	// The evaluation code below can only handle two cases:
+	// - this padding always keeps another request out of the seats, or
+	// - this padding never keeps another request out of the seats.
+	// Set the `padConstrains` field of the scenario accordingly.
+	padDuration time.Duration
 	// When true indicates that only half the specified number of
 	// threads should run during the first half of the evaluation
 	// period
 	split bool
+	// width is the number of seats this request occupies while executing
+	width uint
+}
+
+func newUniformClient(hash uint64, nThreads, nCalls int, execDuration, thinkDuration time.Duration) uniformClient {
+	return uniformClient{
+		hash: hash,
+		nThreads: nThreads,
+		nCalls: nCalls,
+		execDuration: execDuration,
+		thinkDuration: thinkDuration,
+		width: 1,
+	}
+}
+
+func (uc uniformClient) setSplit() uniformClient {
+	uc.split = true
+	return uc
+}
+
+func (uc uniformClient) seats(width uint) uniformClient {
+	uc.width = width
+	return uc
+}
+
+func (uc uniformClient) pad(duration time.Duration) uniformClient {
+	uc.padDuration = duration
+	return uc
 }

 // uniformScenario describes a scenario based on the given set of uniform clients.
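These helpers give the scenarios a small builder-style vocabulary: `width` defaults to 1, and the with-style methods chain because they take and return `uniformClient` by value. A hypothetical combination, with values in the style of the tests below:

```go
clients := []uniformClient{
	// plain width-1 client
	newUniformClient(1001001001, 8, 20, time.Second, time.Second-1),
	// a 2-seat client that lingers on its seats for an extra half second
	// after each reply and runs split across the two evaluation halves
	newUniformClient(2002002002, 7, 10, time.Second, time.Second-1).
		seats(2).pad(time.Second / 2).setSplit(),
}
```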
@@ -127,6 +163,8 @@ type uniformClient struct {
 // fair in the respective halves of a split scenario;
 // in a non-split scenario this is a singleton with one expectation.
 // expectAllRequests indicates whether all requests are expected to get dispatched.
+// padConstrains indicates whether the execution duration padding, if any,
+// is expected to hold up dispatching.
 type uniformScenario struct {
 	name string
 	qs fq.QueueSet
@@ -140,13 +178,14 @@ type uniformScenario struct {
 	rejectReason string
 	clk *testeventclock.Fake
 	counter counter.GoRoutineCounter
+	padConstrains bool
 }

 func (us uniformScenario) exercise(t *testing.T) {
 	uss := uniformScenarioState{
 		t: t,
 		uniformScenario: us,
-		startTime: time.Now(),
+		startTime: us.clk.Now(),
 		integrators: make([]fq.Integrator, len(us.clients)),
 		executions: make([]int32, len(us.clients)),
 		rejects: make([]int32, len(us.clients)),
@@ -227,7 +266,7 @@ func (ust *uniformScenarioThread) callK(k int) {
 	if k >= ust.nCalls {
 		return
 	}
-	req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{Seats: 1}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
+	req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{Seats: ust.uc.width, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
 	ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
 	if req == nil {
 		atomic.AddUint64(&ust.uss.failedCount, 1)
@@ -238,20 +277,25 @@ func (ust *uniformScenarioThread) callK(k int) {
 		ust.uss.t.Error("got request but QueueSet reported idle")
 	}
 	var executed bool
+	var returnTime time.Time
 	idle2 := req.Finish(func() {
 		executed = true
 		execStart := ust.uss.clk.Now()
-		ust.uss.t.Logf("%s: %d, %d, %d executing", execStart.Format(nsTimeFmt), ust.i, ust.j, k)
 		atomic.AddInt32(&ust.uss.executions[ust.i], 1)
-		ust.igr.Add(1)
+		ust.igr.Add(float64(ust.uc.width))
+		ust.uss.t.Logf("%s: %d, %d, %d executing; seats=%d", execStart.Format(nsTimeFmt), ust.i, ust.j, k, ust.uc.width)
 		ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration)
 		ust.uss.clk.Sleep(ust.uc.execDuration)
-		ust.igr.Add(-1)
+		ust.igr.Add(-float64(ust.uc.width))
+		returnTime = ust.uss.clk.Now()
 	})
-	ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2)
+	now := ust.uss.clk.Now()
+	ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", now.Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2)
 	if !executed {
 		atomic.AddUint64(&ust.uss.failedCount, 1)
 		atomic.AddInt32(&ust.uss.rejects[ust.i], 1)
+	} else if now != returnTime {
+		ust.uss.t.Errorf("%s: %d, %d, %d returnTime=%s", now.Format(nsTimeFmt), ust.i, ust.j, k, returnTime.Format(nsTimeFmt))
 	}
 }

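The new `returnTime` bookkeeping checks that `req.Finish` returns exactly when the simulated execution ends: under the fake clock, the time observed right after `Finish` must equal the time recorded at the end of the callback, so any extra latency the queue set imposed on the caller inside `Finish` would surface as a test failure.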
@@ -270,23 +314,28 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma
 		if uc.split && !last {
 			nThreads = nThreads / 2
 		}
-		demands[i] = float64(nThreads) * float64(uc.execDuration) / float64(uc.thinkDuration+uc.execDuration)
+		sep := uc.thinkDuration
+		if uss.padConstrains && uc.padDuration > sep {
+			sep = uc.padDuration
+		}
+		demands[i] = float64(nThreads) * float64(uc.width) * float64(uc.execDuration) / float64(sep+uc.execDuration)
 		averages[i] = uss.integrators[i].Reset().Average
 	}
 	fairAverages := fairAlloc(demands, float64(uss.concurrencyLimit))
 	for i := range uss.clients {
+		expectedAverage := fairAverages[i]
 		var gotFair bool
-		if fairAverages[i] > 0 {
-			relDiff := (averages[i] - fairAverages[i]) / fairAverages[i]
+		if expectedAverage > 0 {
+			relDiff := (averages[i] - expectedAverage) / expectedAverage
 			gotFair = math.Abs(relDiff) <= margin
 		} else {
 			gotFair = math.Abs(averages[i]) <= margin
 		}

 		if gotFair != expectFair {
-			uss.t.Errorf("%s client %d last=%v got an Average of %v but the fair average was %v", uss.name, i, last, averages[i], fairAverages[i])
+			uss.t.Errorf("%s client %d last=%v got an Average of %v but the expected average was %v", uss.name, i, last, averages[i], expectedAverage)
 		} else {
-			uss.t.Logf("%s client %d last=%v got an Average of %v and the fair average was %v", uss.name, i, last, averages[i], fairAverages[i])
+			uss.t.Logf("%s client %d last=%v got an Average of %v and the expected average was %v", uss.name, i, last, averages[i], expectedAverage)
 		}
 	}
 }
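A worked instance of the new demand formula, using the TestSeparations case {think: 0, pad: time.Second} at concurrency 1 (where padConstrains is true): sep = max(0s, 1s) = 1s, so demands[i] = 1 thread × width 1 × 1s / (1s + 1s) = 0.5 — the client can spend only half its time actually executing, because each one-second execution is followed by one second of padded seat-holding.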
@@ -376,8 +425,8 @@ func TestNoRestraint(t *testing.T) {
 	uniformScenario{name: "NoRestraint",
 		qs: nr,
 		clients: []uniformClient{
-			{1001001001, 5, 10, time.Second, time.Second, false},
-			{2002002002, 2, 10, time.Second, time.Second / 2, false},
+			newUniformClient(1001001001, 5, 10, time.Second, time.Second),
+			newUniformClient(2002002002, 2, 10, time.Second, time.Second/2),
 		},
 		concurrencyLimit: 10,
 		evalDuration: time.Second * 15,
@@ -389,6 +438,90 @@ func TestNoRestraint(t *testing.T) {
 	}.exercise(t)
 }

+func TestBaseline(t *testing.T) {
+	metrics.Register()
+	now := time.Now()
+
+	clk, counter := testeventclock.NewFake(now, 0, nil)
+	qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
+	qCfg := fq.QueuingConfig{
+		Name: "TestBaseline",
+		DesiredNumQueues: 9,
+		QueueLengthLimit: 8,
+		HandSize: 3,
+		RequestWaitLimit: 10 * time.Minute,
+	}
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	if err != nil {
+		t.Fatal(err)
+	}
+	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
+
+	uniformScenario{name: qCfg.Name,
+		qs: qs,
+		clients: []uniformClient{
+			newUniformClient(1001001001, 1, 21, time.Second, 0),
+		},
+		concurrencyLimit: 1,
+		evalDuration: time.Second * 20,
+		expectedFair: []bool{true},
+		expectedFairnessMargin: []float64{0},
+		expectAllRequests: true,
+		evalInqueueMetrics: true,
+		evalExecutingMetrics: true,
+		clk: clk,
+		counter: counter,
+	}.exercise(t)
+}
+
+func TestSeparations(t *testing.T) {
+	for _, seps := range []struct{ think, pad time.Duration }{
+		{think: time.Second, pad: 0},
+		{think: 0, pad: time.Second},
+		{think: time.Second, pad: time.Second / 2},
+		{think: time.Second / 2, pad: time.Second},
+	} {
+		for conc := 1; conc <= 2; conc++ {
+			caseName := fmt.Sprintf("seps%v,%v,%v", seps.think, seps.pad, conc)
+			t.Run(caseName, func(t *testing.T) {
+				metrics.Register()
+				now := time.Now()
+
+				clk, counter := testeventclock.NewFake(now, 0, nil)
+				qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
+				qCfg := fq.QueuingConfig{
+					Name: caseName,
+					DesiredNumQueues: 9,
+					QueueLengthLimit: 8,
+					HandSize: 3,
+					RequestWaitLimit: 10 * time.Minute,
+				}
+				qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+				if err != nil {
+					t.Fatal(err)
+				}
+				qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: conc})
+				uniformScenario{name: qCfg.Name,
+					qs: qs,
+					clients: []uniformClient{
+						newUniformClient(1001001001, 1, 19, time.Second, seps.think).pad(seps.pad),
+					},
+					concurrencyLimit: conc,
+					evalDuration: time.Second * 18, // multiple of every period involved, so that margin can be 0 below
+					expectedFair: []bool{true},
+					expectedFairnessMargin: []float64{0},
+					expectAllRequests: true,
+					evalInqueueMetrics: true,
+					evalExecutingMetrics: true,
+					clk: clk,
+					counter: counter,
+					padConstrains: conc == 1,
+				}.exercise(t)
+			})
+		}
+	}
+}
+
 func TestUniformFlowsHandSize1(t *testing.T) {
 	metrics.Register()
 	now := time.Now()
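Note the `padConstrains: conc == 1` line: with a single seat, a request lingering in its seat during padding keeps the next request out, so the padding extends the effective separation between executions; with two seats, the lone width-1 client's next request can run alongside the lingering seat, and think time alone governs demand. That is exactly the two-case limitation documented on `padDuration` above.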
@@ -411,8 +544,8 @@ func TestUniformFlowsHandSize1(t *testing.T) {
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
 		clients: []uniformClient{
-			{1001001001, 8, 20, time.Second, time.Second - 1, false},
-			{2002002002, 8, 20, time.Second, time.Second - 1, false},
+			newUniformClient(1001001001, 8, 20, time.Second, time.Second-1),
+			newUniformClient(2002002002, 8, 20, time.Second, time.Second-1),
 		},
 		concurrencyLimit: 4,
 		evalDuration: time.Second * 50,
@@ -447,8 +580,8 @@ func TestUniformFlowsHandSize3(t *testing.T) {
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
 		clients: []uniformClient{
-			{1001001001, 8, 30, time.Second, time.Second - 1, false},
-			{2002002002, 8, 30, time.Second, time.Second - 1, false},
+			newUniformClient(1001001001, 8, 30, time.Second, time.Second-1),
+			newUniformClient(2002002002, 8, 30, time.Second, time.Second-1),
 		},
 		concurrencyLimit: 4,
 		evalDuration: time.Second * 60,
@@ -484,8 +617,8 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
 		clients: []uniformClient{
-			{1001001001, 8, 20, time.Second, time.Second, false},
-			{2002002002, 7, 30, time.Second, time.Second / 2, false},
+			newUniformClient(1001001001, 8, 20, time.Second, time.Second),
+			newUniformClient(2002002002, 7, 30, time.Second, time.Second/2),
 		},
 		concurrencyLimit: 4,
 		evalDuration: time.Second * 40,
@@ -521,8 +654,8 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
 		clients: []uniformClient{
-			{1001001001, 4, 20, time.Second, time.Second - 1, false},
-			{2002002002, 2, 20, time.Second, time.Second - 1, false},
+			newUniformClient(1001001001, 4, 20, time.Second, time.Second-1),
+			newUniformClient(2002002002, 2, 20, time.Second, time.Second-1),
 		},
 		concurrencyLimit: 3,
 		evalDuration: time.Second * 20,
@@ -536,6 +669,81 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
 	}.exercise(t)
 }

+func TestDifferentWidths(t *testing.T) {
+	metrics.Register()
+	now := time.Now()
+
+	clk, counter := testeventclock.NewFake(now, 0, nil)
+	qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
+	qCfg := fq.QueuingConfig{
+		Name: "TestDifferentWidths",
+		DesiredNumQueues: 64,
+		QueueLengthLimit: 4,
+		HandSize: 7,
+		RequestWaitLimit: 10 * time.Minute,
+	}
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	if err != nil {
+		t.Fatal(err)
+	}
+	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 6})
+	uniformScenario{name: qCfg.Name,
+		qs: qs,
+		clients: []uniformClient{
+			newUniformClient(10010010010010, 13, 10, time.Second, time.Second-1),
+			newUniformClient(20020020020020, 7, 10, time.Second, time.Second-1).seats(2),
+		},
+		concurrencyLimit: 6,
+		evalDuration: time.Second * 20,
+		expectedFair: []bool{true},
+		expectedFairnessMargin: []float64{0.1},
+		expectAllRequests: true,
+		evalInqueueMetrics: true,
+		evalExecutingMetrics: true,
+		clk: clk,
+		counter: counter,
+	}.exercise(t)
+}
+
+func TestTooWide(t *testing.T) {
+	metrics.Register()
+	now := time.Now()
+
+	clk, counter := testeventclock.NewFake(now, 0, nil)
+	qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
+	qCfg := fq.QueuingConfig{
+		Name: "TestTooWide",
+		DesiredNumQueues: 64,
+		QueueLengthLimit: 7,
+		HandSize: 7,
+		RequestWaitLimit: 10 * time.Minute,
+	}
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	if err != nil {
+		t.Fatal(err)
+	}
+	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 6})
+	uniformScenario{name: qCfg.Name,
+		qs: qs,
+		clients: []uniformClient{
+			newUniformClient(40040040040040, 15, 21, time.Second, time.Second-1).seats(2),
+			newUniformClient(50050050050050, 15, 21, time.Second, time.Second-1).seats(2),
+			newUniformClient(60060060060060, 15, 21, time.Second, time.Second-1).seats(2),
+			newUniformClient(70070070070070, 15, 21, time.Second, time.Second-1).seats(2),
+			newUniformClient(90090090090090, 15, 21, time.Second, time.Second-1).seats(7),
+		},
+		concurrencyLimit: 6,
+		evalDuration: time.Second * 40,
+		expectedFair: []bool{true},
+		expectedFairnessMargin: []float64{0.35},
+		expectAllRequests: true,
+		evalInqueueMetrics: true,
+		evalExecutingMetrics: true,
+		clk: clk,
+		counter: counter,
+	}.exercise(t)
+}
+
 func TestWindup(t *testing.T) {
 	metrics.Register()
 	now := time.Now()
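TestTooWide's last client asks for 7 seats against a concurrency limit of 6, exercising the branch in `canAccommodateSeatsLocked` above that makes an over-wide request wait for all currently executing requests to finish; the looser 0.35 fairness margin presumably allows for the slack introduced by that periodic draining.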
@@ -557,8 +765,8 @@ func TestWindup(t *testing.T) {

 	uniformScenario{name: qCfg.Name, qs: qs,
 		clients: []uniformClient{
-			{1001001001, 2, 40, time.Second, -1, false},
-			{2002002002, 2, 40, time.Second, -1, true},
+			newUniformClient(1001001001, 2, 40, time.Second, -1),
+			newUniformClient(2002002002, 2, 40, time.Second, -1).setSplit(),
 		},
 		concurrencyLimit: 3,
 		evalDuration: time.Second * 40,
@@ -591,8 +799,8 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
 		clients: []uniformClient{
-			{1001001001, 6, 10, time.Second, 57 * time.Millisecond, false},
-			{2002002002, 4, 15, time.Second, 750 * time.Millisecond, false},
+			newUniformClient(1001001001, 6, 10, time.Second, 57*time.Millisecond),
+			newUniformClient(2002002002, 4, 15, time.Second, 750*time.Millisecond),
 		},
 		concurrencyLimit: 4,
 		evalDuration: time.Second * 13,
@@ -627,7 +835,7 @@ func TestTimeout(t *testing.T) {
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
 		clients: []uniformClient{
-			{1001001001, 5, 100, time.Second, time.Second, false},
+			newUniformClient(1001001001, 5, 100, time.Second, time.Second),
 		},
 		concurrencyLimit: 1,
 		evalDuration: time.Second * 10,
@@ -79,9 +79,8 @@ type queue struct {
 	// The requests are stored in a FIFO list.
 	requests fifo

-	// virtualStart is the virtual time (virtual seconds since process
-	// startup) when the oldest request in the queue (if there is any)
-	// started virtually executing
+	// virtualStart is the "virtual time" (R progress meter reading) at
+	// which the next request will be dispatched in the virtual world.
 	virtualStart float64

 	requestsExecuting int