From ff716cef508f948b50e1026e980e6df5ee475538 Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Mon, 14 Jun 2021 12:19:06 -0400 Subject: [PATCH] apf: take seats into account when dispatching request --- .../fairqueuing/queueset/queueset.go | 65 ++++++- .../fairqueuing/queueset/queueset_test.go | 176 ++++++++++++++++++ .../flowcontrol/fairqueuing/queueset/types.go | 23 --- .../fairqueuing/queueset/types_test.go | 62 ------ 4 files changed, 237 insertions(+), 89 deletions(-) delete mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index d28d984469a..7a4c5185489 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -245,9 +245,9 @@ func (qs *queueSet) StartRequest(ctx context.Context, width uint, hashValue uint // Step 0: // Apply only concurrency limit, if zero queues desired if qs.qCfg.DesiredNumQueues < 1 { - if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit { - klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are in use (%d are executing) and the limit is %d", - qs.qCfg.Name, fsName, descr1, descr2, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) + if !qs.canAccommodateSeatsLocked(int(width)) { + klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d", + qs.qCfg.Name, fsName, descr1, descr2, width, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit") return nil, qs.isIdleLocked() } @@ -662,6 +662,33 @@ func (qs *queueSet) cancelWait(req *request) { qs.obsPair.RequestsWaiting.Add(-1) } +// 
canAccommodateSeatsLocked returns true if this queueSet has enough +// seats available to accommodate a request with the given number of seats, +// otherwise it returns false. +func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool { + switch { + case seats > qs.dCfg.ConcurrencyLimit: + // we have picked the queue with the minimum virtual finish time, but + // the number of seats this request asks for exceeds the concurrency limit. + // TODO: this is a quick fix for now, once we have borrowing in place we will not need it + if qs.totRequestsExecuting == 0 { + // TODO: apply additional latency associated with this request, as described in the KEP + return true + } + // wait for all "currently" executing requests in this queueSet + // to finish before we can execute this request. + if klog.V(4).Enabled() { + klog.Infof("QS(%s): seats (%d) asked for exceeds concurrency limit, waiting for currently executing requests to complete, %d seats are in use (%d are executing) and the limit is %d", + qs.qCfg.Name, seats, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) + } + return false + case qs.totSeatsInUse+seats > qs.dCfg.ConcurrencyLimit: + return false + } + + return true +} + // selectQueueLocked examines the queues in round robin order and // returns the first one of those for which the virtual finish time of // the oldest waiting request is minimal. @@ -674,7 +701,24 @@ func (qs *queueSet) selectQueueLocked() *queue { qs.robinIndex = (qs.robinIndex + 1) % nq queue := qs.queues[qs.robinIndex] if queue.requests.Length() != 0 { - currentVirtualFinish := queue.GetNextFinish(qs.estimatedServiceTime) + // the virtual finish time of the oldest request is: + // virtual start time + G + // we are not taking the width of the request into account when + // we calculate the virtual finish time of the request because + // it can starve requests with smaller width in other queues. 
+ // + // so let's draw an example of the starving scenario: + // - G=60 (estimated service time in seconds) + // - concurrency limit=2 + // - we have two queues, q1 and q2 + // - q1 has an infinite supply of requests with width W=1 + // - q2 has one request waiting in the queue with width W=2 + // - virtual start time for both q1 and q2 are at t0 + // - requests complete really fast, S=1ms on q1 + // in this scenario we will execute roughly 60,000 requests + // from q1 before we pick the request from q2. + currentVirtualFinish := queue.virtualStart + qs.estimatedServiceTime + if currentVirtualFinish < minVirtualFinish { minVirtualFinish = currentVirtualFinish minQueue = queue @@ -682,6 +726,19 @@ func (qs *queueSet) selectQueueLocked() *queue { } } } + + // TODO: add a method to fifo that lets us peek at the oldest request + var oldestReqFromMinQueue *request + minQueue.requests.Walk(func(r *request) bool { + oldestReqFromMinQueue = r + return false + }) + if oldestReqFromMinQueue == nil || !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) { + // since we have not picked the queue with the minimum virtual finish + // time, we are not going to advance the round robin index here. + return nil + } + // we set the round robin indexing to start at the chose queue // for the next round. 
This way the non-selected queues // win in the case that the virtual finish times are the same diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 7790179bd74..1edf8500056 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -789,6 +789,182 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) { } } +func TestSelectQueueLocked(t *testing.T) { + var G float64 = 60 + tests := []struct { + name string + robinIndex int + concurrencyLimit int + totSeatsInUse int + queues []*queue + attempts int + beforeSelectQueueLocked func(attempt int, qs *queueSet) + minQueueIndexExpected []int + robinIndexExpected []int + }{ + { + name: "width=1, seats are available, the queue with less virtual start time wins", + concurrencyLimit: 1, + totSeatsInUse: 0, + robinIndex: -1, + queues: []*queue{ + { + virtualStart: 200, + requests: newFIFO( + &request{width: 1}, + ), + }, + { + virtualStart: 100, + requests: newFIFO( + &request{width: 1}, + ), + }, + }, + attempts: 1, + minQueueIndexExpected: []int{1}, + robinIndexExpected: []int{1}, + }, + { + name: "width=1, all seats are occupied, no queue is picked", + concurrencyLimit: 1, + totSeatsInUse: 1, + robinIndex: -1, + queues: []*queue{ + { + virtualStart: 200, + requests: newFIFO( + &request{width: 1}, + ), + }, + }, + attempts: 1, + minQueueIndexExpected: []int{-1}, + robinIndexExpected: []int{0}, + }, + { + name: "width > 1, seats are available for request with the least finish time, queue is picked", + concurrencyLimit: 50, + totSeatsInUse: 25, + robinIndex: -1, + queues: []*queue{ + { + virtualStart: 200, + requests: newFIFO( + &request{width: 50}, + ), + }, + { + virtualStart: 100, + requests: newFIFO( + &request{width: 25}, + ), + }, + }, + 
attempts: 1, + minQueueIndexExpected: []int{1}, + robinIndexExpected: []int{1}, + }, + { + name: "width > 1, seats are not available for request with the least finish time, queue is not picked", + concurrencyLimit: 50, + totSeatsInUse: 26, + robinIndex: -1, + queues: []*queue{ + { + virtualStart: 200, + requests: newFIFO( + &request{width: 10}, + ), + }, + { + virtualStart: 100, + requests: newFIFO( + &request{width: 25}, + ), + }, + }, + attempts: 3, + minQueueIndexExpected: []int{-1, -1, -1}, + robinIndexExpected: []int{1, 1, 1}, + }, + { + name: "width > 1, seats become available before 3rd attempt, queue is picked", + concurrencyLimit: 50, + totSeatsInUse: 26, + robinIndex: -1, + queues: []*queue{ + { + virtualStart: 200, + requests: newFIFO( + &request{width: 10}, + ), + }, + { + virtualStart: 100, + requests: newFIFO( + &request{width: 25}, + ), + }, + }, + beforeSelectQueueLocked: func(attempt int, qs *queueSet) { + if attempt == 3 { + qs.totSeatsInUse = 25 + } + }, + attempts: 3, + minQueueIndexExpected: []int{-1, -1, 1}, + robinIndexExpected: []int{1, 1, 1}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + qs := &queueSet{ + estimatedServiceTime: G, + robinIndex: test.robinIndex, + totSeatsInUse: test.totSeatsInUse, + dCfg: fq.DispatchingConfig{ + ConcurrencyLimit: test.concurrencyLimit, + }, + queues: test.queues, + } + + t.Logf("QS: robin index=%d, seats in use=%d limit=%d", qs.robinIndex, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit) + + for i := 0; i < test.attempts; i++ { + attempt := i + 1 + if test.beforeSelectQueueLocked != nil { + test.beforeSelectQueueLocked(attempt, qs) + } + + var minQueueExpected *queue + if queueIdx := test.minQueueIndexExpected[i]; queueIdx >= 0 { + minQueueExpected = test.queues[queueIdx] + } + + minQueueGot := qs.selectQueueLocked() + if minQueueExpected != minQueueGot { + t.Errorf("Expected queue: %#v, but got: %#v", minQueueExpected, minQueueGot) + } + + robinIndexExpected := 
test.robinIndexExpected[i] + if robinIndexExpected != qs.robinIndex { + t.Errorf("Expected robin index: %d for attempt: %d, but got: %d", robinIndexExpected, attempt, qs.robinIndex) + } + } + }) + } +} + +func newFIFO(requests ...*request) fifo { + l := newRequestFIFO() + for i := range requests { + l.Enqueue(requests[i]) + } + return l +} + func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair { return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"}) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index c7cea0a004f..92365c63b53 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -101,29 +101,6 @@ func (q *queue) Dequeue() (*request, bool) { return request, ok } -// GetNextFinish returns the expected virtual finish time of the -// oldest request in the queue with estimated duration G -func (q *queue) GetNextFinish(G float64) float64 { - // TODO: if we decide to generalize this function to return virtual finish time - // for the Nth oldest request waiting in the queue, we need to carefully - // evaluate and potentially improve the performance here. - var oldestReq *request - q.requests.Walk(func(r *request) bool { - oldestReq = r - return false - }) - if oldestReq == nil { - // we should never be here, since the caller should ensure - // that this queue has request(s) waiting to be served before - // calling this function. 
- return q.virtualStart - } - - // the estimated service time of the oldest request is (G * request width) - estimatedServiceTime := float64(G) * float64(oldestReq.Seats()) - return q.virtualStart + estimatedServiceTime -} - func (q *queue) dump(includeDetails bool) debug.QueueDump { digest := make([]debug.RequestDump, q.requests.Length()) i := 0 diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types_test.go deleted file mode 100644 index ffcf7ca2e15..00000000000 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types_test.go +++ /dev/null @@ -1,62 +0,0 @@ -/* -Copyright 2021 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package queueset - -import ( - "testing" -) - -func TestGetNextFinish(t *testing.T) { - var virtualStart float64 = 100 - var G float64 = 60 - tests := []struct { - name string - requests []*request - virtualFinishExpected float64 - }{ - { - name: "for the oldest request", - requests: []*request{ - {width: 5}, - {width: 6}, - {width: 7}, - }, - virtualFinishExpected: virtualStart + (5 * G), - }, - { - name: "queue does not have any request waiting", - requests: []*request{}, - virtualFinishExpected: virtualStart, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - requests := newRequestFIFO() - for i := range test.requests { - requests.Enqueue(test.requests[i]) - } - - q := &queue{requests: requests} - q.virtualStart = virtualStart - virtualFinishGot := q.GetNextFinish(G) - if test.virtualFinishExpected != virtualFinishGot { - t.Errorf("Expected virtual finish time: %.9fs, but got: %.9fs", test.virtualFinishExpected, virtualFinishGot) - } - }) - } -}