mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #95986 from yue9944882/max-min-fairness
Mitigate wind-up problem in AP&F: prevent queue virtualStart lag
This commit is contained in:
commit
c0e88a352c
@ -645,6 +645,7 @@ func (qs *queueSet) selectQueueLocked() *queue {
|
|||||||
qs.robinIndex = (qs.robinIndex + 1) % nq
|
qs.robinIndex = (qs.robinIndex + 1) % nq
|
||||||
queue := qs.queues[qs.robinIndex]
|
queue := qs.queues[qs.robinIndex]
|
||||||
if len(queue.requests) != 0 {
|
if len(queue.requests) != 0 {
|
||||||
|
|
||||||
currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)
|
currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)
|
||||||
if currentVirtualFinish < minVirtualFinish {
|
if currentVirtualFinish < minVirtualFinish {
|
||||||
minVirtualFinish = currentVirtualFinish
|
minVirtualFinish = currentVirtualFinish
|
||||||
@ -657,6 +658,23 @@ func (qs *queueSet) selectQueueLocked() *queue {
|
|||||||
// for the next round. This way the non-selected queues
|
// for the next round. This way the non-selected queues
|
||||||
// win in the case that the virtual finish times are the same
|
// win in the case that the virtual finish times are the same
|
||||||
qs.robinIndex = minIndex
|
qs.robinIndex = minIndex
|
||||||
|
// according to the original FQ formula:
|
||||||
|
//
|
||||||
|
// Si = MAX(R(t), Fi-1)
|
||||||
|
//
|
||||||
|
// the virtual start (excluding the estimated cost) of the chose
|
||||||
|
// queue should always be greater or equal to the global virtual
|
||||||
|
// time.
|
||||||
|
//
|
||||||
|
// hence we're refreshing the per-queue virtual time for the chosen
|
||||||
|
// queue here. if the last virtual start time (excluded estimated cost)
|
||||||
|
// falls behind the global virtual time, we update the latest virtual
|
||||||
|
// start by: <latest global virtual time> + <previously estimated cost>
|
||||||
|
previouslyEstimatedServiceTime := float64(minQueue.requestsExecuting) * qs.estimatedServiceTime
|
||||||
|
if qs.virtualTime > minQueue.virtualStart-previouslyEstimatedServiceTime {
|
||||||
|
// per-queue virtual time should not fall behind the global
|
||||||
|
minQueue.virtualStart = qs.virtualTime + previouslyEstimatedServiceTime
|
||||||
|
}
|
||||||
return minQueue
|
return minQueue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,7 +125,8 @@ type uniformScenario struct {
|
|||||||
clients []uniformClient
|
clients []uniformClient
|
||||||
concurrencyLimit int
|
concurrencyLimit int
|
||||||
evalDuration time.Duration
|
evalDuration time.Duration
|
||||||
expectFair []bool
|
expectedFair []bool
|
||||||
|
expectedFairnessMargin []float64
|
||||||
expectAllRequests bool
|
expectAllRequests bool
|
||||||
evalInqueueMetrics, evalExecutingMetrics bool
|
evalInqueueMetrics, evalExecutingMetrics bool
|
||||||
rejectReason string
|
rejectReason string
|
||||||
@ -182,9 +183,9 @@ func (uss *uniformScenarioState) exercise() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if uss.doSplit {
|
if uss.doSplit {
|
||||||
uss.evalTo(uss.startTime.Add(uss.evalDuration/2), false, uss.expectFair[0])
|
uss.evalTo(uss.startTime.Add(uss.evalDuration/2), false, uss.expectedFair[0], uss.expectedFairnessMargin[0])
|
||||||
}
|
}
|
||||||
uss.evalTo(uss.startTime.Add(uss.evalDuration), true, uss.expectFair[len(uss.expectFair)-1])
|
uss.evalTo(uss.startTime.Add(uss.evalDuration), true, uss.expectedFair[len(uss.expectedFair)-1], uss.expectedFairnessMargin[len(uss.expectedFairnessMargin)-1])
|
||||||
uss.clk.Run(nil)
|
uss.clk.Run(nil)
|
||||||
uss.finalReview()
|
uss.finalReview()
|
||||||
}
|
}
|
||||||
@ -252,7 +253,7 @@ func (ust *uniformScenarioThread) callK(k int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool) {
|
func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, margin float64) {
|
||||||
uss.clk.Run(&lim)
|
uss.clk.Run(&lim)
|
||||||
uss.clk.SetTime(lim)
|
uss.clk.SetTime(lim)
|
||||||
if uss.doSplit && !last {
|
if uss.doSplit && !last {
|
||||||
@ -275,10 +276,11 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool) {
|
|||||||
var gotFair bool
|
var gotFair bool
|
||||||
if fairAverages[i] > 0 {
|
if fairAverages[i] > 0 {
|
||||||
relDiff := (averages[i] - fairAverages[i]) / fairAverages[i]
|
relDiff := (averages[i] - fairAverages[i]) / fairAverages[i]
|
||||||
gotFair = math.Abs(relDiff) <= 0.1
|
gotFair = math.Abs(relDiff) <= margin
|
||||||
} else {
|
} else {
|
||||||
gotFair = math.Abs(averages[i]) <= 0.1
|
gotFair = math.Abs(averages[i]) <= margin
|
||||||
}
|
}
|
||||||
|
|
||||||
if gotFair != expectFair {
|
if gotFair != expectFair {
|
||||||
uss.t.Errorf("%s client %d last=%v got an Average of %v but the fair average was %v", uss.name, i, last, averages[i], fairAverages[i])
|
uss.t.Errorf("%s client %d last=%v got an Average of %v but the fair average was %v", uss.name, i, last, averages[i], fairAverages[i])
|
||||||
} else {
|
} else {
|
||||||
@ -371,12 +373,13 @@ func TestNoRestraint(t *testing.T) {
|
|||||||
{1001001001, 5, 10, time.Second, time.Second, false},
|
{1001001001, 5, 10, time.Second, time.Second, false},
|
||||||
{2002002002, 2, 10, time.Second, time.Second / 2, false},
|
{2002002002, 2, 10, time.Second, time.Second / 2, false},
|
||||||
},
|
},
|
||||||
concurrencyLimit: 10,
|
concurrencyLimit: 10,
|
||||||
evalDuration: time.Second * 15,
|
evalDuration: time.Second * 15,
|
||||||
expectFair: []bool{true},
|
expectedFair: []bool{true},
|
||||||
expectAllRequests: true,
|
expectedFairnessMargin: []float64{0.1},
|
||||||
clk: clk,
|
expectAllRequests: true,
|
||||||
counter: counter,
|
clk: clk,
|
||||||
|
counter: counter,
|
||||||
}.exercise(t)
|
}.exercise(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -405,14 +408,15 @@ func TestUniformFlowsHandSize1(t *testing.T) {
|
|||||||
{1001001001, 8, 20, time.Second, time.Second - 1, false},
|
{1001001001, 8, 20, time.Second, time.Second - 1, false},
|
||||||
{2002002002, 8, 20, time.Second, time.Second - 1, false},
|
{2002002002, 8, 20, time.Second, time.Second - 1, false},
|
||||||
},
|
},
|
||||||
concurrencyLimit: 4,
|
concurrencyLimit: 4,
|
||||||
evalDuration: time.Second * 50,
|
evalDuration: time.Second * 50,
|
||||||
expectFair: []bool{true},
|
expectedFair: []bool{true},
|
||||||
expectAllRequests: true,
|
expectedFairnessMargin: []float64{0.1},
|
||||||
evalInqueueMetrics: true,
|
expectAllRequests: true,
|
||||||
evalExecutingMetrics: true,
|
evalInqueueMetrics: true,
|
||||||
clk: clk,
|
evalExecutingMetrics: true,
|
||||||
counter: counter,
|
clk: clk,
|
||||||
|
counter: counter,
|
||||||
}.exercise(t)
|
}.exercise(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -440,14 +444,15 @@ func TestUniformFlowsHandSize3(t *testing.T) {
|
|||||||
{1001001001, 8, 30, time.Second, time.Second - 1, false},
|
{1001001001, 8, 30, time.Second, time.Second - 1, false},
|
||||||
{2002002002, 8, 30, time.Second, time.Second - 1, false},
|
{2002002002, 8, 30, time.Second, time.Second - 1, false},
|
||||||
},
|
},
|
||||||
concurrencyLimit: 4,
|
concurrencyLimit: 4,
|
||||||
evalDuration: time.Second * 60,
|
evalDuration: time.Second * 60,
|
||||||
expectFair: []bool{true},
|
expectedFair: []bool{true},
|
||||||
expectAllRequests: true,
|
expectedFairnessMargin: []float64{0.1},
|
||||||
evalInqueueMetrics: true,
|
expectAllRequests: true,
|
||||||
evalExecutingMetrics: true,
|
evalInqueueMetrics: true,
|
||||||
clk: clk,
|
evalExecutingMetrics: true,
|
||||||
counter: counter,
|
clk: clk,
|
||||||
|
counter: counter,
|
||||||
}.exercise(t)
|
}.exercise(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -476,14 +481,15 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
|
|||||||
{1001001001, 8, 20, time.Second, time.Second, false},
|
{1001001001, 8, 20, time.Second, time.Second, false},
|
||||||
{2002002002, 7, 30, time.Second, time.Second / 2, false},
|
{2002002002, 7, 30, time.Second, time.Second / 2, false},
|
||||||
},
|
},
|
||||||
concurrencyLimit: 4,
|
concurrencyLimit: 4,
|
||||||
evalDuration: time.Second * 40,
|
evalDuration: time.Second * 40,
|
||||||
expectFair: []bool{true},
|
expectedFair: []bool{true},
|
||||||
expectAllRequests: true,
|
expectedFairnessMargin: []float64{0.1},
|
||||||
evalInqueueMetrics: true,
|
expectAllRequests: true,
|
||||||
evalExecutingMetrics: true,
|
evalInqueueMetrics: true,
|
||||||
clk: clk,
|
evalExecutingMetrics: true,
|
||||||
counter: counter,
|
clk: clk,
|
||||||
|
counter: counter,
|
||||||
}.exercise(t)
|
}.exercise(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -512,14 +518,15 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
|
|||||||
{1001001001, 4, 20, time.Second, time.Second - 1, false},
|
{1001001001, 4, 20, time.Second, time.Second - 1, false},
|
||||||
{2002002002, 2, 20, time.Second, time.Second - 1, false},
|
{2002002002, 2, 20, time.Second, time.Second - 1, false},
|
||||||
},
|
},
|
||||||
concurrencyLimit: 3,
|
concurrencyLimit: 3,
|
||||||
evalDuration: time.Second * 20,
|
evalDuration: time.Second * 20,
|
||||||
expectFair: []bool{true},
|
expectedFair: []bool{true},
|
||||||
expectAllRequests: true,
|
expectedFairnessMargin: []float64{0.1},
|
||||||
evalInqueueMetrics: true,
|
expectAllRequests: true,
|
||||||
evalExecutingMetrics: true,
|
evalInqueueMetrics: true,
|
||||||
clk: clk,
|
evalExecutingMetrics: true,
|
||||||
counter: counter,
|
clk: clk,
|
||||||
|
counter: counter,
|
||||||
}.exercise(t)
|
}.exercise(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -547,14 +554,15 @@ func TestWindup(t *testing.T) {
|
|||||||
{1001001001, 2, 40, time.Second, -1, false},
|
{1001001001, 2, 40, time.Second, -1, false},
|
||||||
{2002002002, 2, 40, time.Second, -1, true},
|
{2002002002, 2, 40, time.Second, -1, true},
|
||||||
},
|
},
|
||||||
concurrencyLimit: 3,
|
concurrencyLimit: 3,
|
||||||
evalDuration: time.Second * 40,
|
evalDuration: time.Second * 40,
|
||||||
expectFair: []bool{true, false},
|
expectedFair: []bool{true, true},
|
||||||
expectAllRequests: true,
|
expectedFairnessMargin: []float64{0.1, 0.26},
|
||||||
evalInqueueMetrics: true,
|
expectAllRequests: true,
|
||||||
evalExecutingMetrics: true,
|
evalInqueueMetrics: true,
|
||||||
clk: clk,
|
evalExecutingMetrics: true,
|
||||||
counter: counter,
|
clk: clk,
|
||||||
|
counter: counter,
|
||||||
}.exercise(t)
|
}.exercise(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -580,13 +588,14 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
|
|||||||
{1001001001, 6, 10, time.Second, 57 * time.Millisecond, false},
|
{1001001001, 6, 10, time.Second, 57 * time.Millisecond, false},
|
||||||
{2002002002, 4, 15, time.Second, 750 * time.Millisecond, false},
|
{2002002002, 4, 15, time.Second, 750 * time.Millisecond, false},
|
||||||
},
|
},
|
||||||
concurrencyLimit: 4,
|
concurrencyLimit: 4,
|
||||||
evalDuration: time.Second * 13,
|
evalDuration: time.Second * 13,
|
||||||
expectFair: []bool{false},
|
expectedFair: []bool{false},
|
||||||
evalExecutingMetrics: true,
|
expectedFairnessMargin: []float64{0.1},
|
||||||
rejectReason: "concurrency-limit",
|
evalExecutingMetrics: true,
|
||||||
clk: clk,
|
rejectReason: "concurrency-limit",
|
||||||
counter: counter,
|
clk: clk,
|
||||||
|
counter: counter,
|
||||||
}.exercise(t)
|
}.exercise(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -614,14 +623,15 @@ func TestTimeout(t *testing.T) {
|
|||||||
clients: []uniformClient{
|
clients: []uniformClient{
|
||||||
{1001001001, 5, 100, time.Second, time.Second, false},
|
{1001001001, 5, 100, time.Second, time.Second, false},
|
||||||
},
|
},
|
||||||
concurrencyLimit: 1,
|
concurrencyLimit: 1,
|
||||||
evalDuration: time.Second * 10,
|
evalDuration: time.Second * 10,
|
||||||
expectFair: []bool{true},
|
expectedFair: []bool{true},
|
||||||
evalInqueueMetrics: true,
|
expectedFairnessMargin: []float64{0.1},
|
||||||
evalExecutingMetrics: true,
|
evalInqueueMetrics: true,
|
||||||
rejectReason: "time-out",
|
evalExecutingMetrics: true,
|
||||||
clk: clk,
|
rejectReason: "time-out",
|
||||||
counter: counter,
|
clk: clk,
|
||||||
|
counter: counter,
|
||||||
}.exercise(t)
|
}.exercise(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user