Refinements to pick queue logic in P&F

This commit is contained in:
wojtekt 2021-07-07 08:58:49 +02:00
parent 656d00e894
commit 7f1c4977d7
3 changed files with 33 additions and 31 deletions

View File

@ -48,9 +48,9 @@ type fifo interface {
// Length returns the number of requests in the list. // Length returns the number of requests in the list.
Length() int Length() int
// Width returns the total width (number of seats) of requests // SeatsSum returns the total number of seats of all requests
// in this list. // in this list.
Width() int SeatsSum() int
// Walk iterates through the list in order of oldest -> newest // Walk iterates through the list in order of oldest -> newest
// and executes the specified walkFunc for each request in that order. // and executes the specified walkFunc for each request in that order.
@ -65,7 +65,7 @@ type fifo interface {
type requestFIFO struct { type requestFIFO struct {
*list.List *list.List
width int seatsSum int
} }
func newRequestFIFO() fifo { func newRequestFIFO() fifo {
@ -78,19 +78,19 @@ func (l *requestFIFO) Length() int {
return l.Len() return l.Len()
} }
func (l *requestFIFO) Width() int { func (l *requestFIFO) SeatsSum() int {
return l.width return l.seatsSum
} }
func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc { func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc {
e := l.PushBack(req) e := l.PushBack(req)
l.width += req.Seats() l.seatsSum += req.Seats()
return func() *request { return func() *request {
if e.Value != nil { if e.Value != nil {
l.Remove(e) l.Remove(e)
e.Value = nil e.Value = nil
l.width -= req.Seats() l.seatsSum -= req.Seats()
} }
return req return req
} }
@ -109,7 +109,7 @@ func (l *requestFIFO) Dequeue() (*request, bool) {
request, ok := e.Value.(*request) request, ok := e.Value.(*request)
if ok { if ok {
l.width -= request.Seats() l.seatsSum -= request.Seats()
} }
return request, ok return request, ok
} }

View File

@ -150,7 +150,7 @@ func TestFIFOWithRemoveIsIdempotent(t *testing.T) {
verifyOrder(t, orderExpected, remainingRequests) verifyOrder(t, orderExpected, remainingRequests)
} }
func TestFIFOWidth(t *testing.T) { func TestFIFOSeatsSum(t *testing.T) {
list := newRequestFIFO() list := newRequestFIFO()
newRequest := func(width uint) *request { newRequest := func(width uint) *request {
@ -159,49 +159,49 @@ func TestFIFOWidth(t *testing.T) {
arrival := []*request{newRequest(1), newRequest(2), newRequest(3)} arrival := []*request{newRequest(1), newRequest(2), newRequest(3)}
removeFn := make([]removeFromFIFOFunc, 0) removeFn := make([]removeFromFIFOFunc, 0)
width := 0 seatsSum := 0
for i := range arrival { for i := range arrival {
removeFn = append(removeFn, list.Enqueue(arrival[i])) removeFn = append(removeFn, list.Enqueue(arrival[i]))
width += i + 1 seatsSum += i + 1
if list.Width() != width { if list.SeatsSum() != seatsSum {
t.Errorf("Expected width: %d, but got: %d", width, list.Width()) t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum())
} }
} }
for i := range removeFn { for i := range removeFn {
removeFn[i]() removeFn[i]()
width -= i + 1 seatsSum -= i + 1
if list.Width() != width { if list.SeatsSum() != seatsSum {
t.Errorf("Expected width: %d, but got: %d", width, list.Width()) t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum())
} }
// check idempotency // check idempotency
removeFn[i]() removeFn[i]()
if list.Width() != width { if list.SeatsSum() != seatsSum {
t.Errorf("Expected width: %d, but got: %d", width, list.Width()) t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum())
} }
} }
// Check second type of idempotency: Dequeue + removeFn. // Check second type of idempotency: Dequeue + removeFn.
for i := range arrival { for i := range arrival {
removeFn[i] = list.Enqueue(arrival[i]) removeFn[i] = list.Enqueue(arrival[i])
width += i + 1 seatsSum += i + 1
} }
for i := range arrival { for i := range arrival {
if _, ok := list.Dequeue(); !ok { if _, ok := list.Dequeue(); !ok {
t.Errorf("Unexpected failed dequeue: %d", i) t.Errorf("Unexpected failed dequeue: %d", i)
} }
width -= i + 1 seatsSum -= i + 1
if list.Width() != width { if list.SeatsSum() != seatsSum {
t.Errorf("Expected width: %d, but got: %d", width, list.Width()) t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum())
} }
removeFn[i]() removeFn[i]()
if list.Width() != width { if list.SeatsSum() != seatsSum {
t.Errorf("Expected width: %d, but got: %d", width, list.Width()) t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum())
} }
} }
} }

View File

@ -472,18 +472,20 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
// using the given hashValue and the shuffle sharding parameters of the queueSet. // using the given hashValue and the shuffle sharding parameters of the queueSet.
func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int { func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int {
bestQueueIdx := -1 bestQueueIdx := -1
bestQueueWidth := int(math.MaxInt32) bestQueueSeatsSum := int(math.MaxInt32)
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`. // the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
qs.dealer.Deal(hashValue, func(queueIdx int) { qs.dealer.Deal(hashValue, func(queueIdx int) {
// TODO: Consider taking into account `additional latency` of requests // TODO: Consider taking into account `additional latency` of requests
// in addition to their widths. // in addition to their widths.
thisWidth := qs.queues[queueIdx].requests.Width() // Ideally, this should be based on projected completion time in the
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of width %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisWidth) // virtual world of the youngest request in the queue.
if thisWidth < bestQueueWidth { thisSeatsSum := qs.queues[queueIdx].requests.SeatsSum()
bestQueueIdx, bestQueueWidth = queueIdx, thisWidth klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of seatsSum %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisSeatsSum)
if thisSeatsSum < bestQueueSeatsSum {
bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum
} }
}) })
klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueWidth, qs.queues[bestQueueIdx].requestsExecuting) klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting)
return bestQueueIdx return bestQueueIdx
} }
@ -533,7 +535,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
// Otherwise enqueues and returns true. // Otherwise enqueues and returns true.
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
queue := request.queue queue := request.queue
curQueueLength := queue.requests.Width() curQueueLength := queue.requests.Length()
// rejects the newly arrived request if resource criteria not met // rejects the newly arrived request if resource criteria not met
if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit && if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit &&
curQueueLength >= qs.qCfg.QueueLengthLimit { curQueueLength >= qs.qCfg.QueueLengthLimit {