diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index b469a4ac575..b61d8ce7d07 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -645,6 +645,7 @@ func (qs *queueSet) selectQueueLocked() *queue { qs.robinIndex = (qs.robinIndex + 1) % nq queue := qs.queues[qs.robinIndex] if len(queue.requests) != 0 { + currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime) if currentVirtualFinish < minVirtualFinish { minVirtualFinish = currentVirtualFinish @@ -657,6 +658,23 @@ func (qs *queueSet) selectQueueLocked() *queue { // for the next round. This way the non-selected queues // win in the case that the virtual finish times are the same qs.robinIndex = minIndex + // according to the original FQ formula: + // + // Si = MAX(R(t), Fi-1) + // + // the virtual start (excluding the estimated cost) of the chose + // queue should always be greater or equal to the global virtual + // time. + // + // hence we're refreshing the per-queue virtual time for the chosen + // queue here. if the last virtual start time (excluded estimated cost) + // falls behind the global virtual time, we update the latest virtual + // start by: + + previouslyEstimatedServiceTime := float64(minQueue.requestsExecuting) * qs.estimatedServiceTime + if qs.virtualTime > minQueue.virtualStart-previouslyEstimatedServiceTime { + // per-queue virtual time should not fall behind the global + minQueue.virtualStart = qs.virtualTime + previouslyEstimatedServiceTime + } return minQueue } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 7b26569924a..caf152814ea 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -125,7 +125,8 @@ type uniformScenario struct { clients []uniformClient concurrencyLimit int evalDuration time.Duration - expectFair []bool + expectedFair []bool + expectedFairnessMargin []float64 expectAllRequests bool evalInqueueMetrics, evalExecutingMetrics bool rejectReason string @@ -182,9 +183,9 @@ func (uss *uniformScenarioState) exercise() { } } if uss.doSplit { - uss.evalTo(uss.startTime.Add(uss.evalDuration/2), false, uss.expectFair[0]) + uss.evalTo(uss.startTime.Add(uss.evalDuration/2), false, uss.expectedFair[0], uss.expectedFairnessMargin[0]) } - uss.evalTo(uss.startTime.Add(uss.evalDuration), true, uss.expectFair[len(uss.expectFair)-1]) + uss.evalTo(uss.startTime.Add(uss.evalDuration), true, uss.expectedFair[len(uss.expectedFair)-1], uss.expectedFairnessMargin[len(uss.expectedFairnessMargin)-1]) uss.clk.Run(nil) uss.finalReview() } @@ -252,7 +253,7 @@ func (ust *uniformScenarioThread) callK(k int) { } } -func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool) { +func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, margin float64) { uss.clk.Run(&lim) uss.clk.SetTime(lim) if uss.doSplit && !last { @@ -275,10 +276,11 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool) { var gotFair bool if fairAverages[i] > 0 { relDiff := (averages[i] - fairAverages[i]) / fairAverages[i] - gotFair = math.Abs(relDiff) <= 0.1 + gotFair = math.Abs(relDiff) <= margin } else { - gotFair = math.Abs(averages[i]) <= 0.1 + gotFair = math.Abs(averages[i]) <= margin } + if gotFair != expectFair { uss.t.Errorf("%s client %d last=%v got an Average of %v but the fair average was %v", uss.name, i, last, averages[i], fairAverages[i]) } else { @@ -371,12 +373,13 @@ func TestNoRestraint(t *testing.T) { {1001001001, 5, 10, time.Second, time.Second, false}, {2002002002, 2, 10, time.Second, time.Second / 2, false}, }, - concurrencyLimit: 10, - evalDuration: time.Second * 15, - expectFair: []bool{true}, - expectAllRequests: true, - clk: clk, - counter: counter, + concurrencyLimit: 10, + evalDuration: time.Second * 15, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.1}, + expectAllRequests: true, + clk: clk, + counter: counter, }.exercise(t) } @@ -405,14 +408,15 @@ func TestUniformFlowsHandSize1(t *testing.T) { {1001001001, 8, 20, time.Second, time.Second - 1, false}, {2002002002, 8, 20, time.Second, time.Second - 1, false}, }, - concurrencyLimit: 4, - evalDuration: time.Second * 50, - expectFair: []bool{true}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, + concurrencyLimit: 4, + evalDuration: time.Second * 50, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.1}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, }.exercise(t) } @@ -440,14 +444,15 @@ func TestUniformFlowsHandSize3(t *testing.T) { {1001001001, 8, 30, time.Second, time.Second - 1, false}, {2002002002, 8, 30, time.Second, time.Second - 1, false}, }, - concurrencyLimit: 4, - evalDuration: time.Second * 60, - expectFair: []bool{true}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, + concurrencyLimit: 4, + evalDuration: time.Second * 60, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.1}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, }.exercise(t) } @@ -476,14 +481,15 @@ func TestDifferentFlowsExpectEqual(t *testing.T) { {1001001001, 8, 20, time.Second, time.Second, false}, {2002002002, 7, 30, time.Second, time.Second / 2, false}, }, - concurrencyLimit: 4, - evalDuration: time.Second * 40, - expectFair: []bool{true}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, + concurrencyLimit: 4, + evalDuration: time.Second * 40, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.1}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, }.exercise(t) } @@ -512,14 +518,15 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) { {1001001001, 4, 20, time.Second, time.Second - 1, false}, {2002002002, 2, 20, time.Second, time.Second - 1, false}, }, - concurrencyLimit: 3, - evalDuration: time.Second * 20, - expectFair: []bool{true}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, + concurrencyLimit: 3, + evalDuration: time.Second * 20, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.1}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, }.exercise(t) } @@ -547,14 +554,15 @@ func TestWindup(t *testing.T) { {1001001001, 2, 40, time.Second, -1, false}, {2002002002, 2, 40, time.Second, -1, true}, }, - concurrencyLimit: 3, - evalDuration: time.Second * 40, - expectFair: []bool{true, false}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, + concurrencyLimit: 3, + evalDuration: time.Second * 40, + expectedFair: []bool{true, true}, + expectedFairnessMargin: []float64{0.1, 0.26}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, }.exercise(t) } @@ -580,13 +588,14 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { {1001001001, 6, 10, time.Second, 57 * time.Millisecond, false}, {2002002002, 4, 15, time.Second, 750 * time.Millisecond, false}, }, - concurrencyLimit: 4, - evalDuration: time.Second * 13, - expectFair: []bool{false}, - evalExecutingMetrics: true, - rejectReason: "concurrency-limit", - clk: clk, - counter: counter, + concurrencyLimit: 4, + evalDuration: time.Second * 13, + expectedFair: []bool{false}, + expectedFairnessMargin: []float64{0.1}, + evalExecutingMetrics: true, + rejectReason: "concurrency-limit", + clk: clk, + counter: counter, }.exercise(t) } @@ -614,14 +623,15 @@ func TestTimeout(t *testing.T) { clients: []uniformClient{ {1001001001, 5, 100, time.Second, time.Second, false}, }, - concurrencyLimit: 1, - evalDuration: time.Second * 10, - expectFair: []bool{true}, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - rejectReason: "time-out", - clk: clk, - counter: counter, + concurrencyLimit: 1, + evalDuration: time.Second * 10, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.1}, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + rejectReason: "time-out", + clk: clk, + counter: counter, }.exercise(t) }