apf: take seats into account when dispatching request

This commit is contained in:
Abu Kashem 2021-06-14 12:19:06 -04:00
parent 0bc75afcf2
commit ff716cef50
No known key found for this signature in database
GPG Key ID: 33A4FA7088DB68A9
4 changed files with 237 additions and 89 deletions

View File

@ -245,9 +245,9 @@ func (qs *queueSet) StartRequest(ctx context.Context, width uint, hashValue uint
// Step 0:
// Apply only concurrency limit, if zero queues desired
if qs.qCfg.DesiredNumQueues < 1 {
if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit {
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are in use (%d are executing) and the limit is %d",
qs.qCfg.Name, fsName, descr1, descr2, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
if !qs.canAccommodateSeatsLocked(int(width)) {
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d",
qs.qCfg.Name, fsName, descr1, descr2, width, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit")
return nil, qs.isIdleLocked()
}
@ -662,6 +662,33 @@ func (qs *queueSet) cancelWait(req *request) {
qs.obsPair.RequestsWaiting.Add(-1)
}
// canAccommodateSeatsLocked returns true if this queueSet has enough
// seats available to accommodate a request with the given number of seats,
// otherwise it returns false.
func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
switch {
case seats > qs.dCfg.ConcurrencyLimit:
// we have picked the queue with the minimum virtual finish time, but
// the number of seats this request asks for exceeds the concurrency limit.
// TODO: this is a quick fix for now, once we have borrowing in place we will not need it
if qs.totRequestsExecuting == 0 {
// TODO: apply additional lateny associated with this request, as described in the KEP
return true
}
// wait for all "currently" executing requests in this queueSet
// to finish before we can execute this request.
if klog.V(4).Enabled() {
klog.Infof("QS(%s): seats (%d) asked for exceeds concurrency limit, waiting for currently executing requests to complete, %d seats are in use (%d are executing) and the limit is %d",
qs.qCfg.Name, seats, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
}
return false
case qs.totSeatsInUse+seats > qs.dCfg.ConcurrencyLimit:
return false
}
return true
}
// selectQueueLocked examines the queues in round robin order and
// returns the first one of those for which the virtual finish time of
// the oldest waiting request is minimal.
@ -674,7 +701,24 @@ func (qs *queueSet) selectQueueLocked() *queue {
qs.robinIndex = (qs.robinIndex + 1) % nq
queue := qs.queues[qs.robinIndex]
if queue.requests.Length() != 0 {
currentVirtualFinish := queue.GetNextFinish(qs.estimatedServiceTime)
// the virtual finish time of the oldest request is:
// virtual start time + G
// we are not taking the width of the request into account when
// we calculate the virtual finish time of the request because
// it can starve requests with smaller wdith in other queues.
//
// so let's draw an example of the starving scenario:
// - G=60 (estimated service time in seconds)
// - concurrency limit=2
// - we have two queues, q1 and q2
// - q1 has an infinite supply of requests with width W=1
// - q2 has one request waiting in the queue with width W=2
// - virtual start time for both q1 and q2 are at t0
// - requests complete really fast, S=1ms on q1
// in this scenario we will execute roughly 60,000 requests
// from q1 before we pick the request from q2.
currentVirtualFinish := queue.virtualStart + qs.estimatedServiceTime
if currentVirtualFinish < minVirtualFinish {
minVirtualFinish = currentVirtualFinish
minQueue = queue
@ -682,6 +726,19 @@ func (qs *queueSet) selectQueueLocked() *queue {
}
}
}
// TODO: add a method to fifo that lets us peek at the oldest request
var oldestReqFromMinQueue *request
minQueue.requests.Walk(func(r *request) bool {
oldestReqFromMinQueue = r
return false
})
if oldestReqFromMinQueue == nil || !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) {
// since we have not picked the queue with the minimum virtual finish
// time, we are not going to advance the round robin index here.
return nil
}
// we set the round robin indexing to start at the chose queue
// for the next round. This way the non-selected queues
// win in the case that the virtual finish times are the same

View File

@ -789,6 +789,182 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
}
}
func TestSelectQueueLocked(t *testing.T) {
var G float64 = 60
tests := []struct {
name string
robinIndex int
concurrencyLimit int
totSeatsInUse int
queues []*queue
attempts int
beforeSelectQueueLocked func(attempt int, qs *queueSet)
minQueueIndexExpected []int
robinIndexExpected []int
}{
{
name: "width=1, seats are available, the queue with less virtual start time wins",
concurrencyLimit: 1,
totSeatsInUse: 0,
robinIndex: -1,
queues: []*queue{
{
virtualStart: 200,
requests: newFIFO(
&request{width: 1},
),
},
{
virtualStart: 100,
requests: newFIFO(
&request{width: 1},
),
},
},
attempts: 1,
minQueueIndexExpected: []int{1},
robinIndexExpected: []int{1},
},
{
name: "width=1, all seats are occupied, no queue is picked",
concurrencyLimit: 1,
totSeatsInUse: 1,
robinIndex: -1,
queues: []*queue{
{
virtualStart: 200,
requests: newFIFO(
&request{width: 1},
),
},
},
attempts: 1,
minQueueIndexExpected: []int{-1},
robinIndexExpected: []int{0},
},
{
name: "width > 1, seats are available for request with the least finish time, queue is picked",
concurrencyLimit: 50,
totSeatsInUse: 25,
robinIndex: -1,
queues: []*queue{
{
virtualStart: 200,
requests: newFIFO(
&request{width: 50},
),
},
{
virtualStart: 100,
requests: newFIFO(
&request{width: 25},
),
},
},
attempts: 1,
minQueueIndexExpected: []int{1},
robinIndexExpected: []int{1},
},
{
name: "width > 1, seats are not available for request with the least finish time, queue is not picked",
concurrencyLimit: 50,
totSeatsInUse: 26,
robinIndex: -1,
queues: []*queue{
{
virtualStart: 200,
requests: newFIFO(
&request{width: 10},
),
},
{
virtualStart: 100,
requests: newFIFO(
&request{width: 25},
),
},
},
attempts: 3,
minQueueIndexExpected: []int{-1, -1, -1},
robinIndexExpected: []int{1, 1, 1},
},
{
name: "width > 1, seats become available before 3rd attempt, queue is picked",
concurrencyLimit: 50,
totSeatsInUse: 26,
robinIndex: -1,
queues: []*queue{
{
virtualStart: 200,
requests: newFIFO(
&request{width: 10},
),
},
{
virtualStart: 100,
requests: newFIFO(
&request{width: 25},
),
},
},
beforeSelectQueueLocked: func(attempt int, qs *queueSet) {
if attempt == 3 {
qs.totSeatsInUse = 25
}
},
attempts: 3,
minQueueIndexExpected: []int{-1, -1, 1},
robinIndexExpected: []int{1, 1, 1},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
qs := &queueSet{
estimatedServiceTime: G,
robinIndex: test.robinIndex,
totSeatsInUse: test.totSeatsInUse,
dCfg: fq.DispatchingConfig{
ConcurrencyLimit: test.concurrencyLimit,
},
queues: test.queues,
}
t.Logf("QS: robin index=%d, seats in use=%d limit=%d", qs.robinIndex, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit)
for i := 0; i < test.attempts; i++ {
attempt := i + 1
if test.beforeSelectQueueLocked != nil {
test.beforeSelectQueueLocked(attempt, qs)
}
var minQueueExpected *queue
if queueIdx := test.minQueueIndexExpected[i]; queueIdx >= 0 {
minQueueExpected = test.queues[queueIdx]
}
minQueueGot := qs.selectQueueLocked()
if minQueueExpected != minQueueGot {
t.Errorf("Expected queue: %#v, but got: %#v", minQueueExpected, minQueueGot)
}
robinIndexExpected := test.robinIndexExpected[i]
if robinIndexExpected != qs.robinIndex {
t.Errorf("Expected robin index: %d for attempt: %d, but got: %d", robinIndexExpected, attempt, qs.robinIndex)
}
}
})
}
}
func newFIFO(requests ...*request) fifo {
l := newRequestFIFO()
for i := range requests {
l.Enqueue(requests[i])
}
return l
}
func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair {
return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"})
}

View File

@ -101,29 +101,6 @@ func (q *queue) Dequeue() (*request, bool) {
return request, ok
}
// GetNextFinish returns the expected virtual finish time of the
// oldest request in the queue with estimated duration G
func (q *queue) GetNextFinish(G float64) float64 {
// TODO: if we decide to generalize this function to return virtual finish time
// for the Nth oldest request waiting in the queue, we need to carefully
// evaluate and potentially improve the performance here.
var oldestReq *request
q.requests.Walk(func(r *request) bool {
oldestReq = r
return false
})
if oldestReq == nil {
// we should never be here, since the caller should ensure
// that this queue has request(s) waiting to be served before
// calling this function.
return q.virtualStart
}
// the estimated service time of the oldest request is (G * request width)
estimatedServiceTime := float64(G) * float64(oldestReq.Seats())
return q.virtualStart + estimatedServiceTime
}
func (q *queue) dump(includeDetails bool) debug.QueueDump {
digest := make([]debug.RequestDump, q.requests.Length())
i := 0

View File

@ -1,62 +0,0 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queueset
import (
"testing"
)
func TestGetNextFinish(t *testing.T) {
var virtualStart float64 = 100
var G float64 = 60
tests := []struct {
name string
requests []*request
virtualFinishExpected float64
}{
{
name: "for the oldest request",
requests: []*request{
{width: 5},
{width: 6},
{width: 7},
},
virtualFinishExpected: virtualStart + (5 * G),
},
{
name: "queue does not have any request waiting",
requests: []*request{},
virtualFinishExpected: virtualStart,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
requests := newRequestFIFO()
for i := range test.requests {
requests.Enqueue(test.requests[i])
}
q := &queue{requests: requests}
q.virtualStart = virtualStart
virtualFinishGot := q.GetNextFinish(G)
if test.virtualFinishExpected != virtualFinishGot {
t.Errorf("Expected virtual finish time: %.9fs, but got: %.9fs", test.virtualFinishExpected, virtualFinishGot)
}
})
}
}