Merge pull request #103534 from wojtek-t/pf_queue_picker_refinements

Refinements to pick queue logic in P&F
This commit is contained in:
Kubernetes Prow Robot 2021-07-08 03:04:53 -07:00 committed by GitHub
commit 33431f542b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 31 deletions

View File

@ -48,9 +48,9 @@ type fifo interface {
// Length returns the number of requests in the list.
Length() int
// Width returns the total width (number of seats) of requests
// SeatsSum returns the total number of seats of all requests
// in this list.
Width() int
SeatsSum() int
// Walk iterates through the list in order of oldest -> newest
// and executes the specified walkFunc for each request in that order.
@ -65,7 +65,7 @@ type fifo interface {
type requestFIFO struct {
*list.List
width int
seatsSum int
}
func newRequestFIFO() fifo {
@ -78,19 +78,19 @@ func (l *requestFIFO) Length() int {
return l.Len()
}
func (l *requestFIFO) Width() int {
return l.width
func (l *requestFIFO) SeatsSum() int {
return l.seatsSum
}
func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc {
e := l.PushBack(req)
l.width += req.Seats()
l.seatsSum += req.Seats()
return func() *request {
if e.Value != nil {
l.Remove(e)
e.Value = nil
l.width -= req.Seats()
l.seatsSum -= req.Seats()
}
return req
}
@ -109,7 +109,7 @@ func (l *requestFIFO) Dequeue() (*request, bool) {
request, ok := e.Value.(*request)
if ok {
l.width -= request.Seats()
l.seatsSum -= request.Seats()
}
return request, ok
}

View File

@ -150,7 +150,7 @@ func TestFIFOWithRemoveIsIdempotent(t *testing.T) {
verifyOrder(t, orderExpected, remainingRequests)
}
func TestFIFOWidth(t *testing.T) {
func TestFIFOSeatsSum(t *testing.T) {
list := newRequestFIFO()
newRequest := func(width uint) *request {
@ -159,49 +159,49 @@ func TestFIFOWidth(t *testing.T) {
arrival := []*request{newRequest(1), newRequest(2), newRequest(3)}
removeFn := make([]removeFromFIFOFunc, 0)
width := 0
seatsSum := 0
for i := range arrival {
removeFn = append(removeFn, list.Enqueue(arrival[i]))
width += i + 1
if list.Width() != width {
t.Errorf("Expected width: %d, but got: %d", width, list.Width())
seatsSum += i + 1
if list.SeatsSum() != seatsSum {
t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum())
}
}
for i := range removeFn {
removeFn[i]()
width -= i + 1
if list.Width() != width {
t.Errorf("Expected width: %d, but got: %d", width, list.Width())
seatsSum -= i + 1
if list.SeatsSum() != seatsSum {
t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum())
}
// check idempotency
removeFn[i]()
if list.Width() != width {
t.Errorf("Expected width: %d, but got: %d", width, list.Width())
if list.SeatsSum() != seatsSum {
t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum())
}
}
// Check second type of idempotency: Dequeue + removeFn.
for i := range arrival {
removeFn[i] = list.Enqueue(arrival[i])
width += i + 1
seatsSum += i + 1
}
for i := range arrival {
if _, ok := list.Dequeue(); !ok {
t.Errorf("Unexpected failed dequeue: %d", i)
}
width -= i + 1
if list.Width() != width {
t.Errorf("Expected width: %d, but got: %d", width, list.Width())
seatsSum -= i + 1
if list.SeatsSum() != seatsSum {
t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum())
}
removeFn[i]()
if list.Width() != width {
t.Errorf("Expected width: %d, but got: %d", width, list.Width())
if list.SeatsSum() != seatsSum {
t.Errorf("Expected seatsSum: %d, but got: %d", seatsSum, list.SeatsSum())
}
}
}

View File

@ -472,18 +472,20 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
// using the given hashValue and the shuffle sharding parameters of the queueSet.
func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int {
bestQueueIdx := -1
bestQueueWidth := int(math.MaxInt32)
bestQueueSeatsSum := int(math.MaxInt32)
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
qs.dealer.Deal(hashValue, func(queueIdx int) {
// TODO: Consider taking into account `additional latency` of requests
// in addition to their widths.
thisWidth := qs.queues[queueIdx].requests.Width()
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of width %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisWidth)
if thisWidth < bestQueueWidth {
bestQueueIdx, bestQueueWidth = queueIdx, thisWidth
// Ideally, this should be based on projected completion time in the
// virtual world of the youngest request in the queue.
thisSeatsSum := qs.queues[queueIdx].requests.SeatsSum()
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of seatsSum %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisSeatsSum)
if thisSeatsSum < bestQueueSeatsSum {
bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum
}
})
klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueWidth, qs.queues[bestQueueIdx].requestsExecuting)
klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting)
return bestQueueIdx
}
@ -533,7 +535,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
// Otherwise enqueues and returns true.
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
queue := request.queue
curQueueLength := queue.requests.Width()
curQueueLength := queue.requests.Length()
// rejects the newly arrived request if resource criteria not met
if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit &&
curQueueLength >= qs.qCfg.QueueLengthLimit {