diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
index e5413dd94aa..115211a0f8b 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
@@ -125,6 +125,9 @@ type queueSet struct {
 	// totSeatsInUse is the number of total "seats" in use by all the
 	// request(s) that are currently executing in this queueset.
 	totSeatsInUse int
+
+	// enqueues is the number of requests that have ever been enqueued
+	enqueues int
 }
 
 // NewQueueSetFactory creates a new QueueSetFactory object
@@ -213,8 +216,8 @@ func (qs *queueSet) setConfiguration(qCfg fq.QueuingConfig, dealer *shuffleshard
 
 	if qCfg.DesiredNumQueues > 0 {
 		// Adding queues is the only thing that requires immediate action
-		// Removing queues is handled by omitting indexes >DesiredNum from
-		// chooseQueueIndexLocked
+		// Removing queues is handled by attrition, removing a queue when
+		// it goes empty and there are too many.
 		numQueues := len(qs.queues)
 		if qCfg.DesiredNumQueues > numQueues {
 			qs.queues = append(qs.queues,
@@ -439,7 +442,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
 // the queuelengthlimit has been reached
 func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
 	// Start with the shuffle sharding, to pick a queue.
-	queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
+	queueIdx := qs.shuffleShardLocked(hashValue, descr1, descr2)
 	queue := qs.queues[queueIdx]
 	// The next step is the logic to reject requests that have been waiting too long
 	qs.removeTimedOutRequestsFromQueueLocked(queue, fsName)
@@ -447,6 +450,8 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
 	// requests that are in the queue longer than the timeout if there are no new requests
 	// We prefer the simplicity over the promptness, at least for now.
 
+	defer qs.boundNextDispatch(queue)
+
 	// Create a request and enqueue
 	req := &request{
 		qs:          qs,
@@ -455,6 +460,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
 		ctx:         ctx,
 		decision:    qs.promiseFactory(nil, ctx.Done(), decisionCancel),
 		arrivalTime: qs.clock.Now(),
+		arrivalR:    qs.virtualTime,
 		queue:       queue,
 		descr1:      descr1,
 		descr2:      descr2,
@@ -468,26 +474,37 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
 	return req
 }
 
-// chooseQueueIndexLocked uses shuffle sharding to select a queue index
+// shuffleShardLocked uses shuffle sharding to select a queue index
 // using the given hashValue and the shuffle sharding parameters of the queueSet.
-func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int {
+func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interface{}) int {
+	var backHand [8]int
+	// Deal into a data structure, so that the order of visit below is not necessarily the order of the deal.
+	// This removes bias in the case of flows with overlapping hands.
+	hand := qs.dealer.DealIntoHand(hashValue, backHand[:])
+	handSize := len(hand)
+	offset := qs.enqueues % handSize
+	qs.enqueues++
 	bestQueueIdx := -1
 	bestQueueSeatsSum := int(math.MaxInt32)
-	// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
-	qs.dealer.Deal(hashValue, func(queueIdx int) {
+	for i := 0; i < handSize; i++ {
+		queueIdx := hand[(offset+i)%handSize]
+		queue := qs.queues[queueIdx]
+		waitingSeats := queue.requests.SeatsSum()
 		// TODO: Consider taking into account `additional latency` of requests
 		// in addition to their seats.
 		// Ideally, this should be based on projected completion time in the
 		// virtual world of the youngest request in the queue.
-		queue := qs.queues[queueIdx]
-		waitingSeats := queue.requests.SeatsSum()
-		thisSeatsSum := waitingSeats // + queue.seatsInUse
-		klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse)
+		thisSeatsSum := waitingSeats + queue.seatsInUse
+		klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing, virtualStart=%vss", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse, queue.virtualStart)
 		if thisSeatsSum < bestQueueSeatsSum {
 			bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum
 		}
-	})
-	klog.V(6).Infof("QS(%s) at r=%s v=%.9fss: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting)
+
+	}
+	if klog.V(6).Enabled() {
+		chosenQueue := qs.queues[bestQueueIdx]
+		klog.V(6).Infof("QS(%s) at r=%s v=%.9fss: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing & virtualStart=%vss", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, chosenQueue.requests.SeatsSum(), chosenQueue.requestsExecuting, chosenQueue.virtualStart)
+	}
 	return bestQueueIdx
 }
 
@@ -592,6 +609,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
 		startTime:   now,
 		decision:    qs.promiseFactory(decisionExecute, ctx.Done(), decisionCancel),
 		arrivalTime: now,
+		arrivalR:    qs.virtualTime,
 		descr1:      descr1,
 		descr2:      descr2,
 		workEstimate: *workEstimate,
@@ -612,7 +630,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
 // return value indicates whether a request was dispatched; this will
 // be false when there are no requests waiting in any queue.
 func (qs *queueSet) dispatchLocked() bool {
-	queue := qs.selectQueueLocked()
+	queue := qs.findDispatchQueueLocked()
 	if queue == nil {
 		return false
 	}
@@ -644,6 +662,7 @@ func (qs *queueSet) dispatchLocked() bool {
 	}
 	// When a request is dequeued for service -> qs.virtualStart += G * width
 	queue.virtualStart += qs.estimatedServiceSeconds * float64(request.Seats())
+	qs.boundNextDispatch(queue)
 	request.decision.Set(decisionExecute)
 	return ok
 }
@@ -671,10 +690,10 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
 	return true
 }
 
-// selectQueueLocked examines the queues in round robin order and
+// findDispatchQueueLocked examines the queues in round robin order and
 // returns the first one of those for which the virtual finish time of
 // the oldest waiting request is minimal.
-func (qs *queueSet) selectQueueLocked() *queue {
+func (qs *queueSet) findDispatchQueueLocked() *queue {
 	minVirtualFinish := math.Inf(1)
 	sMin := math.Inf(1)
 	dsMin := math.Inf(1)
@@ -723,22 +742,9 @@ func (qs *queueSet) selectQueueLocked() *queue {
 	// for the next round. This way the non-selected queues
 	// win in the case that the virtual finish times are the same
 	qs.robinIndex = minIndex
-	// according to the original FQ formula:
-	//
-	// Si = MAX(R(t), Fi-1)
-	//
-	// the virtual start (excluding the estimated cost) of the chose
-	// queue should always be greater or equal to the global virtual
-	// time.
-	//
-	// hence we're refreshing the per-queue virtual time for the chosen
-	// queue here. if the last start R (excluded estimated cost)
-	// falls behind the global virtual time, we update the latest virtual
-	// start by:
-	previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceSeconds
-	if qs.virtualTime > minQueue.virtualStart-previouslyEstimatedServiceTime {
-		// per-queue virtual time should not fall behind the global
-		minQueue.virtualStart = qs.virtualTime + previouslyEstimatedServiceTime
+
+	if minQueue.virtualStart < oldestReqFromMinQueue.arrivalR {
+		klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.virtualStart, "request", oldestReqFromMinQueue)
 	}
 	metrics.SetDispatchMetrics(qs.qCfg.Name, qs.virtualTime, minQueue.virtualStart, sMin, sMax, dsMin, dsMax)
 	return minQueue
@@ -834,6 +840,28 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 		// When a request finishes being served, and the actual service time was S,
 		// the queue’s start R is decremented by (G - S)*width.
 		r.queue.virtualStart -= (qs.estimatedServiceSeconds - S) * float64(r.Seats())
+		qs.boundNextDispatch(r.queue)
+	}
+}
+
+// boundNextDispatch applies the anti-windup hack.
+// We need a hack because all non-empty queues are allocated the same
+// number of seats. A queue that can not use all those seats and does
+// not go empty accumulates a progressively earlier `virtualStart` compared
+// to queues that are using more than they are allocated.
+// The following hack addresses the first side of that inequity,
+// by insisting that dispatch in the virtual world not precede arrival.
+func (qs *queueSet) boundNextDispatch(queue *queue) {
+	oldestReqFromMinQueue, _ := queue.requests.Peek()
+	if oldestReqFromMinQueue == nil {
+		return
+	}
+	var virtualStartBound = oldestReqFromMinQueue.arrivalR
+	if queue.virtualStart < virtualStartBound {
+		if klog.V(4).Enabled() {
+			klog.InfoS("AntiWindup tweaked queue", "QS", qs.qCfg.Name, "queue", queue.index, "time", qs.clock.Now().Format(nsTimeFmt), "requestDescr1", oldestReqFromMinQueue.descr1, "requestDescr2", oldestReqFromMinQueue.descr2, "newVirtualStart", virtualStartBound, "deltaVirtualStart", (virtualStartBound - queue.virtualStart))
+		}
+		queue.virtualStart = virtualStartBound
 	}
 }
 
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
index 46bcec39105..dedea7689f0 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
@@ -333,9 +333,9 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma
 		}
 
 		if gotFair != expectFair {
-			uss.t.Errorf("%s client %d last=%v got an Average of %v but the expected average was %v", uss.name, i, last, averages[i], expectedAverage)
+			uss.t.Errorf("%s client %d last=%v expectFair=%v margin=%v got an Average of %v but the expected average was %v", uss.name, i, last, expectFair, margin, averages[i], expectedAverage)
 		} else {
-			uss.t.Logf("%s client %d last=%v got an Average of %v and the expected average was %v", uss.name, i, last, averages[i], expectedAverage)
+			uss.t.Logf("%s client %d last=%v expectFair=%v margin=%v got an Average of %v and the expected average was %v", uss.name, i, last, expectFair, margin, averages[i], expectedAverage)
 		}
 	}
 }
@@ -414,18 +414,20 @@ func TestMain(m *testing.M) {
 
 // TestNoRestraint tests whether the no-restraint factory gives every client what it asks for
 // even though that is unfair.
+// Expects fairness when there is no competition, unfairness when there is competition.
 func TestNoRestraint(t *testing.T) {
 	metrics.Register()
 	testCases := []struct {
 		concurrency int
+		margin      float64
 		fair        bool
+		name        string
 	}{
-		{concurrency: 10, fair: true},
-		{concurrency: 2, fair: false},
+		{concurrency: 10, margin: 0.001, fair: true, name: "no-competition"},
+		{concurrency: 2, margin: 0.25, fair: false, name: "with-competition"},
 	}
 	for _, testCase := range testCases {
-		subName := fmt.Sprintf("concurrency=%v", testCase.concurrency)
-		t.Run(subName, func(t *testing.T) {
+		t.Run(testCase.name, func(t *testing.T) {
 			now := time.Now()
 			clk, counter := testeventclock.NewFake(now, 0, nil)
 			nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk))
@@ -433,16 +435,16 @@ func TestNoRestraint(t *testing.T) {
 				t.Fatal(err)
 			}
 			nr := nrc.Complete(fq.DispatchingConfig{})
-			uniformScenario{name: "NoRestraint/" + subName,
+			uniformScenario{name: "NoRestraint/" + testCase.name,
 				qs: nr,
 				clients: []uniformClient{
-					newUniformClient(1001001001, 5, 10, time.Second, time.Second),
-					newUniformClient(2002002002, 2, 10, time.Second, time.Second/2),
+					newUniformClient(1001001001, 5, 15, time.Second, time.Second),
+					newUniformClient(2002002002, 2, 15, time.Second, time.Second/2),
 				},
 				concurrencyLimit:       testCase.concurrency,
-				evalDuration:           time.Second * 15,
+				evalDuration:           time.Second * 18,
 				expectedFair:           []bool{testCase.fair},
-				expectedFairnessMargin: []float64{0.1},
+				expectedFairnessMargin: []float64{testCase.margin},
 				expectAllRequests:      true,
 				clk:                    clk,
 				counter:                counter,
@@ -563,7 +565,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
 		concurrencyLimit:       4,
 		evalDuration:           time.Second * 50,
 		expectedFair:           []bool{true},
-		expectedFairnessMargin: []float64{0.1},
+		expectedFairnessMargin: []float64{0.01},
 		expectAllRequests:      true,
 		evalInqueueMetrics:     true,
 		evalExecutingMetrics:   true,
@@ -581,7 +583,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
 	qCfg := fq.QueuingConfig{
 		Name:             "TestUniformFlowsHandSize3",
 		DesiredNumQueues: 8,
-		QueueLengthLimit: 4,
+		QueueLengthLimit: 16,
 		HandSize:         3,
 		RequestWaitLimit: 10 * time.Minute,
 	}
@@ -593,13 +595,13 @@ func TestUniformFlowsHandSize3(t *testing.T) {
 	uniformScenario{name: qCfg.Name,
 		qs: qs,
 		clients: []uniformClient{
-			newUniformClient(1001001001, 8, 30, time.Second, time.Second-1),
-			newUniformClient(2002002002, 8, 30, time.Second, time.Second-1),
+			newUniformClient(400900100100, 8, 30, time.Second, time.Second-1),
+			newUniformClient(300900200200, 8, 30, time.Second, time.Second-1),
 		},
 		concurrencyLimit:       4,
 		evalDuration:           time.Second * 60,
 		expectedFair:           []bool{true},
-		expectedFairnessMargin: []float64{0.1},
+		expectedFairnessMargin: []float64{0.01},
 		expectAllRequests:      true,
 		evalInqueueMetrics:     true,
 		evalExecutingMetrics:   true,
@@ -636,7 +638,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
 		concurrencyLimit:       4,
 		evalDuration:           time.Second * 40,
 		expectedFair:           []bool{true},
-		expectedFairnessMargin: []float64{0.1},
+		expectedFairnessMargin: []float64{0.01},
 		expectAllRequests:      true,
 		evalInqueueMetrics:     true,
 		evalExecutingMetrics:   true,
@@ -673,7 +675,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
 		concurrencyLimit:       3,
 		evalDuration:           time.Second * 20,
 		expectedFair:           []bool{true},
-		expectedFairnessMargin: []float64{0.1},
+		expectedFairnessMargin: []float64{0.01},
 		expectAllRequests:      true,
 		evalInqueueMetrics:     true,
 		evalExecutingMetrics:   true,
@@ -691,7 +693,7 @@ func TestDifferentWidths(t *testing.T) {
 	qCfg := fq.QueuingConfig{
 		Name:             "TestDifferentWidths",
 		DesiredNumQueues: 64,
-		QueueLengthLimit: 4,
+		QueueLengthLimit: 13,
 		HandSize:         7,
 		RequestWaitLimit: 10 * time.Minute,
 	}
@@ -709,7 +711,7 @@ func TestDifferentWidths(t *testing.T) {
 		concurrencyLimit:       6,
 		evalDuration:           time.Second * 20,
 		expectedFair:           []bool{true},
-		expectedFairnessMargin: []float64{0.16},
+		expectedFairnessMargin: []float64{0.125},
 		expectAllRequests:      true,
 		evalInqueueMetrics:     true,
 		evalExecutingMetrics:   true,
@@ -727,7 +729,7 @@ func TestTooWide(t *testing.T) {
 	qCfg := fq.QueuingConfig{
 		Name:             "TestTooWide",
 		DesiredNumQueues: 64,
-		QueueLengthLimit: 7,
+		QueueLengthLimit: 35,
 		HandSize:         7,
 		RequestWaitLimit: 10 * time.Minute,
 	}
@@ -746,9 +748,9 @@ func TestTooWide(t *testing.T) {
 			newUniformClient(90090090090090, 15, 21, time.Second, time.Second-1).seats(7),
 		},
 		concurrencyLimit:       6,
-		evalDuration:           time.Second * 435,
+		evalDuration:           time.Second * 225,
 		expectedFair:           []bool{true},
-		expectedFairnessMargin: []float64{0.375},
+		expectedFairnessMargin: []float64{0.33},
 		expectAllRequests:      true,
 		evalInqueueMetrics:     true,
 		evalExecutingMetrics:   true,
@@ -775,18 +777,18 @@ func TestWindup(t *testing.T) {
 	testCases := []struct {
 		margin2     float64
 		expectFair2 bool
+		name        string
 	}{
-		{margin2: 0.26, expectFair2: true},
-		{margin2: 0.1, expectFair2: false},
+		{margin2: 0.26, expectFair2: true, name: "upper-bound"},
+		{margin2: 0.1, expectFair2: false, name: "lower-bound"},
 	}
 	for _, testCase := range testCases {
-		subName := fmt.Sprintf("m2=%v", testCase.margin2)
-		t.Run(subName, func(t *testing.T) {
+		t.Run(testCase.name, func(t *testing.T) {
 			now := time.Now()
 			clk, counter := testeventclock.NewFake(now, 0, nil)
 			qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
 			qCfg := fq.QueuingConfig{
-				Name:             "TestWindup/" + subName,
+				Name:             "TestWindup/" + testCase.name,
 				DesiredNumQueues: 9,
 				QueueLengthLimit: 6,
 				HandSize:         1,
@@ -806,7 +808,7 @@ func TestWindup(t *testing.T) {
 				concurrencyLimit:       3,
 				evalDuration:           time.Second * 40,
 				expectedFair:           []bool{true, testCase.expectFair2},
-				expectedFairnessMargin: []float64{0.1, testCase.margin2},
+				expectedFairnessMargin: []float64{0.01, testCase.margin2},
 				expectAllRequests:      true,
 				evalInqueueMetrics:     true,
 				evalExecutingMetrics:   true,
@@ -842,7 +844,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
 		concurrencyLimit:       4,
 		evalDuration:           time.Second * 13,
 		expectedFair:           []bool{false},
-		expectedFairnessMargin: []float64{0.1},
+		expectedFairnessMargin: []float64{0.20},
 		evalExecutingMetrics:   true,
 		rejectReason:           "concurrency-limit",
 		clk:                    clk,
@@ -877,7 +879,7 @@ func TestTimeout(t *testing.T) {
 		concurrencyLimit:       1,
 		evalDuration:           time.Second * 10,
 		expectedFair:           []bool{true},
-		expectedFairnessMargin: []float64{0.1},
+		expectedFairnessMargin: []float64{0.01},
 		evalInqueueMetrics:     true,
 		evalExecutingMetrics:   true,
 		rejectReason:           "time-out",
@@ -1070,7 +1072,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
 	}
 }
 
-func TestSelectQueueLocked(t *testing.T) {
+func TestFindDispatchQueueLocked(t *testing.T) {
 	var G float64 = 0.003
 	tests := []struct {
 		name string
@@ -1225,7 +1227,7 @@ func TestSelectQueueLocked(t *testing.T) {
 				minQueueExpected = test.queues[queueIdx]
 			}
 
-			minQueueGot := qs.selectQueueLocked()
+			minQueueGot := qs.findDispatchQueueLocked()
 			if minQueueExpected != minQueueGot {
 				t.Errorf("Expected queue: %#v, but got: %#v", minQueueExpected, minQueueGot)
 			}
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go
index 77572314f9c..7213460bbfb 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go
@@ -59,6 +59,9 @@ type request struct {
 	// arrivalTime is the real time when the request entered this system
 	arrivalTime time.Time
 
+	// arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time".
+	arrivalR float64
+
 	// descr1 and descr2 are not used in any logic but they appear in
 	// log messages
 	descr1, descr2 interface{}
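
Not part of the patch: a minimal, self-contained Go sketch of the queue-selection idea behind the new shuffleShardLocked. The real code deals a hand with qs.dealer.DealIntoHand and measures load with queue.requests.SeatsSum() plus queue.seatsInUse; here the hand, the queues, and the helper pickQueue are simplified, hypothetical stand-ins. The point illustrated is the offset that rotates with the enqueue counter, so flows whose dealt hands overlap do not all probe the shared queues in the same order, combined with the "fewest waiting plus executing seats" choice.

package main

import "fmt"

// toyQueue is a hypothetical stand-in for the queueset's queue type.
type toyQueue struct {
	waitingSeats   int // seats held by requests waiting in the queue
	executingSeats int // seats held by requests dispatched from the queue
}

// pickQueue mirrors the selection loop: walk the dealt hand starting at a
// rotating offset and keep the queue with the smallest seat total.
func pickQueue(queues []toyQueue, hand []int, enqueues int) int {
	offset := enqueues % len(hand)
	bestIdx := -1
	bestSum := int(^uint(0) >> 1) // max int
	for i := 0; i < len(hand); i++ {
		queueIdx := hand[(offset+i)%len(hand)]
		sum := queues[queueIdx].waitingSeats + queues[queueIdx].executingSeats
		if sum < bestSum {
			bestIdx, bestSum = queueIdx, sum
		}
	}
	return bestIdx
}

func main() {
	queues := []toyQueue{{3, 1}, {0, 0}, {5, 2}, {0, 4}}
	hand := []int{2, 0, 3} // stand-in for what DealIntoHand would return for one flow
	for enqueues := 0; enqueues < 3; enqueues++ {
		fmt.Printf("enqueue #%d -> queue %d\n", enqueues, pickQueue(queues, hand, enqueues))
	}
}

Because ties on the seat total are broken by visit order, rotating the starting point changes which of the equally loaded queues wins from one enqueue to the next, which is the bias the patch's comment refers to.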
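Also not part of the patch: a toy model of the R ("virtual time") bookkeeping that boundNextDispatch constrains. Following the comments in the diff, dispatch advances a queue's virtualStart by G*seats, completion corrects it by (G - S)*seats, and the anti-windup bound keeps virtualStart from falling below the arrival R of the queue's oldest waiting request. The toyQ and toyReq types and the constant g are illustrative assumptions, not the queueset types.

package main

import "fmt"

type toyReq struct {
	arrivalR float64 // R value at which the request arrived
	seats    int
}

type toyQ struct {
	virtualStart float64  // R at which the queue's next request would be dispatched
	waiting      []toyReq // FIFO of queued requests
}

const g = 0.003 // estimated service seconds per seat (plays the role of G)

// dispatch pops the oldest request; per the patch, virtualStart += G * seats, then bound.
func (q *toyQ) dispatch() toyReq {
	r := q.waiting[0]
	q.waiting = q.waiting[1:]
	q.virtualStart += g * float64(r.seats)
	q.bound()
	return r
}

// finish corrects the estimate to the actual service time S: virtualStart -= (G - S) * seats.
func (q *toyQ) finish(r toyReq, actualSeconds float64) {
	q.virtualStart -= (g - actualSeconds) * float64(r.seats)
	q.bound()
}

// bound is the anti-windup clamp: dispatch in the virtual world must not precede arrival.
func (q *toyQ) bound() {
	if len(q.waiting) == 0 {
		return
	}
	if lowerBound := q.waiting[0].arrivalR; q.virtualStart < lowerBound {
		q.virtualStart = lowerBound
	}
}

func main() {
	q := &toyQ{
		virtualStart: 10.0,
		waiting: []toyReq{
			{arrivalR: 10.0, seats: 1},
			{arrivalR: 12.5, seats: 2},
		},
	}
	r := q.dispatch()  // 10.0 -> 10.003, then clamped up to 12.5 (arrival R of the next waiter)
	q.finish(r, 0.001) // 12.5 -> 12.498, then clamped back up to 12.5
	fmt.Printf("virtualStart=%.3f\n", q.virtualStart)
}

This is also why the patch records arrivalR on every request: the clamp needs the arrival R of the oldest request still waiting in the queue, which the previous code never stored.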