mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 20:53:33 +00:00
Merge pull request #103484 from wojtek-t/pf_queue_picker
Update the logic to pick the best queue in P&F
This commit is contained in:
commit
ca0c8275b4
@ -48,6 +48,10 @@ type fifo interface {
|
|||||||
// Length returns the number of requests in the list.
|
// Length returns the number of requests in the list.
|
||||||
Length() int
|
Length() int
|
||||||
|
|
||||||
|
// Width returns the total width (number of seats) of requests
|
||||||
|
// in this list.
|
||||||
|
Width() int
|
||||||
|
|
||||||
// Walk iterates through the list in order of oldest -> newest
|
// Walk iterates through the list in order of oldest -> newest
|
||||||
// and executes the specified walkFunc for each request in that order.
|
// and executes the specified walkFunc for each request in that order.
|
||||||
//
|
//
|
||||||
@ -60,6 +64,8 @@ type fifo interface {
|
|||||||
// goroutines without additional locking or coordination.
|
// goroutines without additional locking or coordination.
|
||||||
type requestFIFO struct {
|
type requestFIFO struct {
|
||||||
*list.List
|
*list.List
|
||||||
|
|
||||||
|
width int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRequestFIFO() fifo {
|
func newRequestFIFO() fifo {
|
||||||
@ -72,10 +78,20 @@ func (l *requestFIFO) Length() int {
|
|||||||
return l.Len()
|
return l.Len()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *requestFIFO) Width() int {
|
||||||
|
return l.width
|
||||||
|
}
|
||||||
|
|
||||||
func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc {
|
func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc {
|
||||||
e := l.PushBack(req)
|
e := l.PushBack(req)
|
||||||
|
l.width += req.Seats()
|
||||||
|
|
||||||
return func() *request {
|
return func() *request {
|
||||||
|
if e.Value != nil {
|
||||||
l.Remove(e)
|
l.Remove(e)
|
||||||
|
e.Value = nil
|
||||||
|
l.width -= req.Seats()
|
||||||
|
}
|
||||||
return req
|
return req
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -85,9 +101,16 @@ func (l *requestFIFO) Dequeue() (*request, bool) {
|
|||||||
if e == nil {
|
if e == nil {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
defer l.Remove(e)
|
|
||||||
|
defer func() {
|
||||||
|
l.Remove(e)
|
||||||
|
e.Value = nil
|
||||||
|
}()
|
||||||
|
|
||||||
request, ok := e.Value.(*request)
|
request, ok := e.Value.(*request)
|
||||||
|
if ok {
|
||||||
|
l.width -= request.Seats()
|
||||||
|
}
|
||||||
return request, ok
|
return request, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,6 +20,8 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFIFOWithEnqueueDequeueSingleRequest(t *testing.T) {
|
func TestFIFOWithEnqueueDequeueSingleRequest(t *testing.T) {
|
||||||
@ -148,6 +150,62 @@ func TestFIFOWithRemoveIsIdempotent(t *testing.T) {
|
|||||||
verifyOrder(t, orderExpected, remainingRequests)
|
verifyOrder(t, orderExpected, remainingRequests)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFIFOWidth(t *testing.T) {
|
||||||
|
list := newRequestFIFO()
|
||||||
|
|
||||||
|
newRequest := func(width uint) *request {
|
||||||
|
return &request{width: fcrequest.Width{Seats: width}}
|
||||||
|
}
|
||||||
|
arrival := []*request{newRequest(1), newRequest(2), newRequest(3)}
|
||||||
|
removeFn := make([]removeFromFIFOFunc, 0)
|
||||||
|
|
||||||
|
width := 0
|
||||||
|
for i := range arrival {
|
||||||
|
removeFn = append(removeFn, list.Enqueue(arrival[i]))
|
||||||
|
|
||||||
|
width += i + 1
|
||||||
|
if list.Width() != width {
|
||||||
|
t.Errorf("Expected width: %d, but got: %d", width, list.Width())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range removeFn {
|
||||||
|
removeFn[i]()
|
||||||
|
|
||||||
|
width -= i + 1
|
||||||
|
if list.Width() != width {
|
||||||
|
t.Errorf("Expected width: %d, but got: %d", width, list.Width())
|
||||||
|
}
|
||||||
|
|
||||||
|
// check idempotency
|
||||||
|
removeFn[i]()
|
||||||
|
if list.Width() != width {
|
||||||
|
t.Errorf("Expected width: %d, but got: %d", width, list.Width())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check second type of idempotency: Dequeue + removeFn.
|
||||||
|
for i := range arrival {
|
||||||
|
removeFn[i] = list.Enqueue(arrival[i])
|
||||||
|
width += i + 1
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range arrival {
|
||||||
|
if _, ok := list.Dequeue(); !ok {
|
||||||
|
t.Errorf("Unexpected failed dequeue: %d", i)
|
||||||
|
}
|
||||||
|
width -= i + 1
|
||||||
|
if list.Width() != width {
|
||||||
|
t.Errorf("Expected width: %d, but got: %d", width, list.Width())
|
||||||
|
}
|
||||||
|
|
||||||
|
removeFn[i]()
|
||||||
|
if list.Width() != width {
|
||||||
|
t.Errorf("Expected width: %d, but got: %d", width, list.Width())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestFIFOWithWalk(t *testing.T) {
|
func TestFIFOWithWalk(t *testing.T) {
|
||||||
list := newRequestFIFO()
|
list := newRequestFIFO()
|
||||||
|
|
||||||
|
@ -472,16 +472,18 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
|
|||||||
// using the given hashValue and the shuffle sharding parameters of the queueSet.
|
// using the given hashValue and the shuffle sharding parameters of the queueSet.
|
||||||
func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int {
|
func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int {
|
||||||
bestQueueIdx := -1
|
bestQueueIdx := -1
|
||||||
bestQueueLen := int(math.MaxInt32)
|
bestQueueWidth := int(math.MaxInt32)
|
||||||
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
|
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
|
||||||
qs.dealer.Deal(hashValue, func(queueIdx int) {
|
qs.dealer.Deal(hashValue, func(queueIdx int) {
|
||||||
thisLen := qs.queues[queueIdx].requests.Length()
|
// TODO: Consider taking into account `additional latency` of requests
|
||||||
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisLen)
|
// in addition to their widths.
|
||||||
if thisLen < bestQueueLen {
|
thisWidth := qs.queues[queueIdx].requests.Width()
|
||||||
bestQueueIdx, bestQueueLen = queueIdx, thisLen
|
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of width %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisWidth)
|
||||||
|
if thisWidth < bestQueueWidth {
|
||||||
|
bestQueueIdx, bestQueueWidth = queueIdx, thisWidth
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting)
|
klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueWidth, qs.queues[bestQueueIdx].requestsExecuting)
|
||||||
return bestQueueIdx
|
return bestQueueIdx
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -531,7 +533,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
|
|||||||
// Otherwise enqueues and returns true.
|
// Otherwise enqueues and returns true.
|
||||||
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
|
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
|
||||||
queue := request.queue
|
queue := request.queue
|
||||||
curQueueLength := queue.requests.Length()
|
curQueueLength := queue.requests.Width()
|
||||||
// rejects the newly arrived request if resource criteria not met
|
// rejects the newly arrived request if resource criteria not met
|
||||||
if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit &&
|
if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit &&
|
||||||
curQueueLength >= qs.qCfg.QueueLengthLimit {
|
curQueueLength >= qs.qCfg.QueueLengthLimit {
|
||||||
|
Loading…
Reference in New Issue
Block a user