apf: remove timeoutOldRequestsAndRejectOrEnqueueLocked function

This commit is contained in:
Abu Kashem 2023-08-28 17:26:11 -04:00
parent f39213a7e4
commit da8a472206
No known key found for this signature in database
GPG Key ID: E5ECC1124B5F9C68
2 changed files with 9 additions and 100 deletions

View File

@ -300,9 +300,6 @@ const (
// Serve this one
decisionExecute requestDecision = iota
// Reject this one due to APF queuing considerations
decisionReject
// This one's context timed out / was canceled
decisionCancel
)
@ -337,11 +334,10 @@ func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.Wo
// ========================================================================
// Step 1:
// 1) Start with shuffle sharding, to pick a queue.
// 2) Reject old requests that have been waiting too long
// 3) Reject current request if there is not enough concurrency shares and
// 2) Reject current request if there is not enough concurrency shares and
// we are at max queue length
// 4) If not rejected, create a request and enqueue
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, workEstimate, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
// 3) If not rejected, create a request and enqueue
req = qs.shuffleShardAndRejectOrEnqueueLocked(ctx, workEstimate, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
// req == nil means that the request was rejected - no remaining
// concurrency shares and at max queue length already
if req == nil {
@ -422,13 +418,8 @@ func (req *request) wait() (bool, bool) {
}
req.waitStarted = true
switch decisionAny {
case decisionReject:
klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2)
qs.totRequestsRejected++
qs.totRequestsTimedout++
metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out")
return false, qs.isIdleLocked()
case decisionCancel:
klog.V(5).Infof("QS(%s): request %#+v %#+v queue wait time exceeded after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2)
case decisionExecute:
klog.V(5).Infof("QS(%s): Dispatching request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
return true, false
@ -438,7 +429,7 @@ func (req *request) wait() (bool, bool) {
}
// TODO(aaron-prindle) add metrics for this case
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
// remove the request from the queue as it has timed out
// remove the request from the queue as its queue wait time has exceeded
queue := req.queue
if req.removeFromQueueLocked() != nil {
defer qs.boundNextDispatchLocked(queue)
@ -556,25 +547,19 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
return math.Min(float64(seatsRequested), float64(qs.dCfg.ConcurrencyLimit)) / float64(activeQueues)
}
// timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required
// shuffleShardAndRejectOrEnqueueLocked encapsulates the logic required
// to validate and enqueue a request for the queueSet/QueueSet:
// 1) Start with shuffle sharding, to pick a queue.
// 2) Reject old requests that have been waiting too long
// 3) Reject current request if there is not enough concurrency shares and
// 2) Reject current request if there is not enough concurrency shares and
// we are at max queue length
// 4) If not rejected, create a request and enqueue
// 3) If not rejected, create a request and enqueue
// returns the enqueud request on a successful enqueue
// returns nil in the case that there is no available concurrency or
// the queuelengthlimit has been reached
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
func (qs *queueSet) shuffleShardAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
// Start with the shuffle sharding, to pick a queue.
queueIdx := qs.shuffleShardLocked(hashValue, descr1, descr2)
queue := qs.queues[queueIdx]
// The next step is the logic to reject requests that have been waiting too long
qs.removeTimedOutRequestsFromQueueToBoundLocked(queue, fsName)
// NOTE: currently timeout is only checked for each new request. This means that there can be
// requests that are in the queue longer than the timeout if there are no new requests
// We prefer the simplicity over the promptness, at least for now.
defer qs.boundNextDispatchLocked(queue)
@ -633,44 +618,6 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
return bestQueueIdx
}
// removeTimedOutRequestsFromQueueToBoundLocked rejects old requests that have been enqueued
// past the requestWaitLimit
func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, fsName string) {
timeoutCount := 0
disqueueSeats := 0
now := qs.clock.Now()
reqs := queue.requestsWaiting
// reqs are sorted oldest -> newest
// can short circuit loop (break) if oldest requests are not timing out
// as newer requests also will not have timed out
// now - requestWaitLimit = arrivalLimit
arrivalLimit := now.Add(-qs.qCfg.RequestWaitLimit)
reqs.Walk(func(req *request) bool {
if arrivalLimit.After(req.arrivalTime) {
if req.decision.Set(decisionReject) && req.removeFromQueueLocked() != nil {
timeoutCount++
disqueueSeats += req.MaxSeats()
req.NoteQueued(false)
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
metrics.AddSeatsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -req.MaxSeats())
}
// we need to check if the next request has timed out.
return true
}
// since reqs are sorted oldest -> newest, we are done here.
return false
})
// remove timed out requests from queue
if timeoutCount > 0 {
qs.totRequestsWaiting -= timeoutCount
qs.totSeatsWaiting -= disqueueSeats
qs.reqsGaugePair.RequestsWaiting.Add(float64(-timeoutCount))
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
}
}
// rejectOrEnqueueToBoundLocked rejects or enqueues the newly arrived
// request, which has been assigned to a queue. If up against the
// queue length limit and the concurrency limit then returns false.

View File

@ -1067,44 +1067,6 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
}.exercise(t)
}
func TestTimeout(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "TestTimeout",
DesiredNumQueues: 128,
QueueLengthLimit: 128,
HandSize: 1,
RequestWaitLimit: 0,
}
seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
if err != nil {
t.Fatal(err)
}
qs := qsComplete(qsc, 1)
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
newUniformClient(1001001001, 5, 100, time.Second, time.Second),
},
concurrencyLimit: 1,
evalDuration: time.Second * 10,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.01},
evalInqueueMetrics: true,
evalExecutingMetrics: true,
rejectReason: "time-out",
clk: clk,
counter: counter,
seatDemandIntegratorSubject: seatDemandIntegratorSubject,
}.exercise(t)
}
// TestContextCancel tests cancellation of a request's context.
// The outline is:
// 1. Use a concurrency limit of 1.