From a0c161f2f6908ee424ea888ff40f75ff071bd20a Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Tue, 7 Sep 2021 00:46:50 -0400 Subject: [PATCH] Change execution duration guess from 1 minute to 3 milliseconds So that the width estimate has some effect but not a grossly excessive one. Added the fifo::Peek method to simplify the fifo client code. Also renamed the queueSet::estimatedServiceTime field to estimatedServiceSeconds to make the units clear. --- .../fairqueuing/queueset/fifo_list.go | 23 ++++++-- .../fairqueuing/queueset/queueset.go | 56 ++++++------------- .../fairqueuing/queueset/queueset_test.go | 13 +++-- .../flowcontrol/fairqueuing/queueset/types.go | 3 +- 4 files changed, 45 insertions(+), 50 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go index 5c7e7acf5da..9f531b96442 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go @@ -45,6 +45,9 @@ type fifo interface { // Dequeue pulls out the oldest request from the list. Dequeue() (*request, bool) + // Peek returns the oldest request without removing it. + Peek() (*request, bool) + // Length returns the number of requests in the list. 
Length() int @@ -97,18 +100,28 @@ func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc { } func (l *requestFIFO) Dequeue() (*request, bool) { + return l.getFirst(true) +} + +func (l *requestFIFO) Peek() (*request, bool) { + return l.getFirst(false) +} + +func (l *requestFIFO) getFirst(remove bool) (*request, bool) { e := l.Front() if e == nil { return nil, false } - defer func() { - l.Remove(e) - e.Value = nil - }() + if remove { + defer func() { + l.Remove(e) + e.Value = nil + }() + } request, ok := e.Value.(*request) - if ok { + if remove && ok { l.seatsSum -= request.Seats() } return request, ok diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index ffb3549b8e0..e5413dd94aa 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -76,9 +76,9 @@ type queueSetCompleter struct { // not end in "Locked" either acquires the lock or does not care about // locking. 
type queueSet struct { - clock eventclock.Interface - estimatedServiceTime float64 - obsPair metrics.TimedObserverPair + clock eventclock.Interface + estimatedServiceSeconds float64 + obsPair metrics.TimedObserverPair promiseFactory promiseFactory @@ -170,12 +170,12 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet { qs := qsc.theSet if qs == nil { qs = &queueSet{ - clock: qsc.factory.clock, - estimatedServiceTime: 60, - obsPair: qsc.obsPair, - qCfg: qsc.qCfg, - virtualTime: 0, - lastRealTime: qsc.factory.clock.Now(), + clock: qsc.factory.clock, + estimatedServiceSeconds: 0.003, + obsPair: qsc.obsPair, + qCfg: qsc.qCfg, + virtualTime: 0, + lastRealTime: qsc.factory.clock.Now(), } qs.promiseFactory = qsc.factory.promiseFactoryFactory(qs) } @@ -642,8 +642,8 @@ func (qs *queueSet) dispatchLocked() bool { qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, request.workEstimate, queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse) } - // When a request is dequeued for service -> qs.virtualStart += G - queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats()) + // When a request is dequeued for service -> queue.virtualStart += G * width + queue.virtualStart += qs.estimatedServiceSeconds * float64(request.Seats()) request.decision.Set(decisionExecute) return ok } @@ -686,29 +686,14 @@ func (qs *queueSet) selectQueueLocked() *queue { for range qs.queues { qs.robinIndex = (qs.robinIndex + 1) % nq queue := qs.queues[qs.robinIndex] - if queue.requests.Length() != 0 { + oldestWaiting, _ := queue.requests.Peek() + if oldestWaiting != nil { sMin = math.Min(sMin, queue.virtualStart) sMax = math.Max(sMax, queue.virtualStart) - estimatedWorkInProgress := qs.estimatedServiceTime * float64(queue.seatsInUse) + estimatedWorkInProgress := qs.estimatedServiceSeconds * float64(queue.seatsInUse) dsMin = math.Min(dsMin, 
queue.virtualStart-estimatedWorkInProgress) dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress) - // the finish R of the oldest request is: - // start R + G - // we are not taking the width of the request into account when - // we calculate the virtual finish time of the request because - // it can starve requests with smaller wdith in other queues. - // - // so let's draw an example of the starving scenario: - // - G=60 (estimated service time in seconds) - // - concurrency limit=2 - // - we have two queues, q1 and q2 - // - q1 has an infinite supply of requests with width W=1 - // - q2 has one request waiting in the queue with width W=2 - // - start R for both q1 and q2 are at t0 - // - requests complete really fast, S=1ms on q1 - // in this scenario we will execute roughly 60,000 requests - // from q1 before we pick the request from q2. - currentVirtualFinish := queue.virtualStart + qs.estimatedServiceTime + currentVirtualFinish := queue.virtualStart + qs.estimatedServiceSeconds*float64(oldestWaiting.Seats()) klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish) if currentVirtualFinish < minVirtualFinish { minVirtualFinish = currentVirtualFinish @@ -718,12 +703,7 @@ func (qs *queueSet) selectQueueLocked() *queue { } } - // TODO: add a method to fifo that lets us peek at the oldest request - var oldestReqFromMinQueue *request - minQueue.requests.Walk(func(r *request) bool { - oldestReqFromMinQueue = r - return false - }) + oldestReqFromMinQueue, _ := minQueue.requests.Peek() if oldestReqFromMinQueue == nil { // This cannot happen klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name) @@ -755,7 +735,7 @@ func (qs *queueSet) selectQueueLocked() *queue { // queue here. 
if the last start R (excluded estimated cost) // falls behind the global virtual time, we update the latest virtual // start by: <latest global virtual time> + <previously estimated cost> - previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceTime + previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceSeconds if qs.virtualTime > minQueue.virtualStart-previouslyEstimatedServiceTime { // per-queue virtual time should not fall behind the global minQueue.virtualStart = qs.virtualTime + previouslyEstimatedServiceTime @@ -853,7 +833,7 @@ func (qs *queueSet) finishRequestLocked(r *request) { // When a request finishes being served, and the actual service time was S, // the queue’s start R is decremented by (G - S)*width. - r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats()) + r.queue.virtualStart -= (qs.estimatedServiceSeconds - S) * float64(r.Seats()) } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index bc2324dc884..3ff0d927676 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -1034,7 +1034,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) { } func TestSelectQueueLocked(t *testing.T) { - var G float64 = 60 + var G float64 = 0.003 tests := []struct { name string robinIndex int @@ -1087,7 +1087,7 @@ func TestSelectQueueLocked(t *testing.T) { robinIndexExpected: []int{0}, }, { - name: "width > 1, seats are available for request with the least finish time, queue is picked", + name: "width > 1, seats are available for request with the least finish R, queue is picked", concurrencyLimit: 50, totSeatsInUse: 25, robinIndex: -1, @@ -1110,7 +1110,7 @@ func TestSelectQueueLocked(t *testing.T) { robinIndexExpected: []int{1}, }, { - name: "width > 1, seats 
are not available for request with the least finish time, queue is not picked", + name: "width > 1, seats are not available for request with the least finish R, queue is not picked", concurrencyLimit: 50, totSeatsInUse: 26, robinIndex: -1, @@ -1165,9 +1165,10 @@ func TestSelectQueueLocked(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { qs := &queueSet{ - estimatedServiceTime: G, - robinIndex: test.robinIndex, - totSeatsInUse: test.totSeatsInUse, + estimatedServiceSeconds: G, + robinIndex: test.robinIndex, + totSeatsInUse: test.totSeatsInUse, + qCfg: fq.QueuingConfig{Name: "TestSelectQueueLocked/" + test.name}, dCfg: fq.DispatchingConfig{ ConcurrencyLimit: test.concurrencyLimit, }, diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 1363f24d781..77572314f9c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -76,13 +76,14 @@ type request struct { // queue is an array of requests with additional metadata required for // the FQScheduler type queue struct { - // The requests are stored in a FIFO list. + // The requests not yet executing in the real world are stored in a FIFO list. requests fifo // virtualStart is the "virtual time" (R progress meter reading) at // which the next request will be dispatched in the virtual world. virtualStart float64 + // requestsExecuting is the count in the real world requestsExecuting int index int