diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index ae9e1814547..4ec4e00b013 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -111,17 +111,18 @@ type uniformClient struct { // padDuration is additional time during which this request occupies its seats. // This comes at the end of execution, after the reply has been released toward // the client. - // The evaluation code below can only handle two cases: - // - this padding always keeps another request out of the seats, or - // - this padding never keeps another request out of the seats. - // Set the `padConstrains` field of the scenario accordingly. + // The evaluation code below does not take this into account. + // In cases where `padDuration` makes a difference, + // set the `expectedAverages` field of `uniformScenario`. padDuration time.Duration // When true indicates that only half the specified number of // threads should run during the first half of the evaluation // period split bool - // width is the number of seats this request occupies while executing - width uint + // initialSeats is the number of seats this request occupies in the first phase of execution + initialSeats uint + // finalSeats is the number occupied during the second phase of execution + finalSeats uint } func newUniformClient(hash uint64, nThreads, nCalls int, execDuration, thinkDuration time.Duration) uniformClient { @@ -131,7 +132,8 @@ func newUniformClient(hash uint64, nThreads, nCalls int, execDuration, thinkDura nCalls: nCalls, execDuration: execDuration, thinkDuration: thinkDuration, - width: 1, + initialSeats: 1, + finalSeats: 1, } } @@ -140,12 +142,13 @@ func (uc uniformClient) setSplit() uniformClient { return uc } -func (uc uniformClient) seats(width uint) uniformClient { - uc.width = width +func (uc uniformClient) setInitWidth(seats uint) uniformClient { + uc.initialSeats = seats return uc } -func (uc uniformClient) pad(duration time.Duration) uniformClient { +func (uc uniformClient) pad(finalSeats int, duration time.Duration) uniformClient { + uc.finalSeats = uint(finalSeats) uc.padDuration = duration return uc } @@ -163,8 +166,7 @@ func (uc uniformClient) pad(duration time.Duration) uniformClient { // fair in the respective halves of a split scenario; // in a non-split scenario this is a singleton with one expectation. // expectAllRequests indicates whether all requests are expected to get dispatched. -// padConstrains indicates whether the execution duration padding, if any, -// is expected to hold up dispatching. +// expectedAverages, if provided, replaces the normal calculation of expected results. type uniformScenario struct { name string qs fq.QueueSet @@ -178,7 +180,7 @@ type uniformScenario struct { rejectReason string clk *testeventclock.Fake counter counter.GoRoutineCounter - padConstrains bool + expectedAverages []float64 expectedEpochAdvances int } @@ -267,7 +269,7 @@ func (ust *uniformScenarioThread) callK(k int) { if k >= ust.nCalls { return } - req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{InitialSeats: ust.uc.width, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) + req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{InitialSeats: ust.uc.initialSeats, FinalSeats: ust.uc.finalSeats, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle) if req == nil { atomic.AddUint64(&ust.uss.failedCount, 1) @@ -283,11 +285,11 @@ func (ust *uniformScenarioThread) callK(k int) { executed = true execStart := ust.uss.clk.Now() atomic.AddInt32(&ust.uss.executions[ust.i], 1) - ust.igr.Add(float64(ust.uc.width)) - ust.uss.t.Logf("%s: %d, %d, %d executing; seats=%d", execStart.Format(nsTimeFmt), ust.i, ust.j, k, ust.uc.width) + ust.igr.Add(float64(ust.uc.initialSeats)) + ust.uss.t.Logf("%s: %d, %d, %d executing; width1=%d", execStart.Format(nsTimeFmt), ust.i, ust.j, k, ust.uc.initialSeats) ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration) ust.uss.clk.Sleep(ust.uc.execDuration) - ust.igr.Add(-float64(ust.uc.width)) + ust.igr.Add(-float64(ust.uc.initialSeats)) returnTime = ust.uss.clk.Now() }) now := ust.uss.clk.Now() @@ -304,9 +306,9 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma uss.clk.Run(&lim) uss.clk.SetTime(lim) if uss.doSplit && !last { - uss.t.Logf("%s: End of first half", uss.clk.Now().Format(nsTimeFmt)) + uss.t.Logf("%s: End of first half of scenario %q", uss.clk.Now().Format(nsTimeFmt), uss.name) } else { - uss.t.Logf("%s: End", uss.clk.Now().Format(nsTimeFmt)) + uss.t.Logf("%s: End of scenario %q", uss.clk.Now().Format(nsTimeFmt), uss.name) } demands := make([]float64, len(uss.clients)) averages := make([]float64, len(uss.clients)) @@ -316,13 +318,13 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma nThreads = nThreads / 2 } sep := uc.thinkDuration - if uss.padConstrains && uc.padDuration > sep { - sep = uc.padDuration - } - demands[i] = float64(nThreads) * float64(uc.width) * float64(uc.execDuration) / float64(sep+uc.execDuration) + demands[i] = float64(nThreads) * float64(uc.initialSeats) * float64(uc.execDuration) / float64(sep+uc.execDuration) averages[i] = uss.integrators[i].Reset().Average } - fairAverages := fairAlloc(demands, float64(uss.concurrencyLimit)) + fairAverages := uss.expectedAverages + if fairAverages == nil { + fairAverages = fairAlloc(demands, float64(uss.concurrencyLimit)) + } for i := range uss.clients { expectedAverage := fairAverages[i] var gotFair bool @@ -503,50 +505,74 @@ func TestBaseline(t *testing.T) { } func TestSeparations(t *testing.T) { - for _, seps := range []struct{ think, pad time.Duration }{ - {think: time.Second, pad: 0}, - {think: 0, pad: time.Second}, - {think: time.Second, pad: time.Second / 2}, - {think: time.Second / 2, pad: time.Second}, + flts := func(avgs ...float64) []float64 { return avgs } + for _, seps := range []struct { + think, pad time.Duration + finalSeats, conc, nClients int + exp []float64 // override expected results + }{ + {think: time.Second, pad: 0, finalSeats: 1, conc: 1, nClients: 1}, + {think: time.Second, pad: 0, finalSeats: 1, conc: 2, nClients: 1}, + {think: time.Second, pad: 0, finalSeats: 2, conc: 2, nClients: 1}, + {think: time.Second, pad: 0, finalSeats: 1, conc: 1, nClients: 2}, + {think: time.Second, pad: 0, finalSeats: 1, conc: 2, nClients: 2}, + {think: time.Second, pad: 0, finalSeats: 2, conc: 2, nClients: 2}, + {think: 0, pad: time.Second, finalSeats: 1, conc: 1, nClients: 1, exp: flts(0.5)}, + {think: 0, pad: time.Second, finalSeats: 1, conc: 2, nClients: 1}, + {think: 0, pad: time.Second, finalSeats: 2, conc: 2, nClients: 1, exp: flts(0.5)}, + {think: 0, pad: time.Second, finalSeats: 1, conc: 1, nClients: 2, exp: flts(0.25, 0.25)}, + {think: 0, pad: time.Second, finalSeats: 1, conc: 2, nClients: 2, exp: flts(0.5, 0.5)}, + {think: 0, pad: time.Second, finalSeats: 2, conc: 2, nClients: 2, exp: flts(0.25, 0.25)}, + {think: time.Second, pad: time.Second / 2, finalSeats: 1, conc: 1, nClients: 1}, + {think: time.Second, pad: time.Second / 2, finalSeats: 1, conc: 2, nClients: 1}, + {think: time.Second, pad: time.Second / 2, finalSeats: 2, conc: 2, nClients: 1}, + {think: time.Second, pad: time.Second / 2, finalSeats: 1, conc: 1, nClients: 2, exp: flts(1.0/3, 1.0/3)}, + {think: time.Second, pad: time.Second / 2, finalSeats: 1, conc: 2, nClients: 2}, + {think: time.Second, pad: time.Second / 2, finalSeats: 2, conc: 2, nClients: 2, exp: flts(1.0/3, 1.0/3)}, + {think: time.Second / 2, pad: time.Second, finalSeats: 1, conc: 1, nClients: 1, exp: flts(0.5)}, + {think: time.Second / 2, pad: time.Second, finalSeats: 1, conc: 2, nClients: 1}, + {think: time.Second / 2, pad: time.Second, finalSeats: 2, conc: 2, nClients: 1, exp: flts(0.5)}, + {think: time.Second / 2, pad: time.Second, finalSeats: 1, conc: 1, nClients: 2, exp: flts(0.25, 0.25)}, + {think: time.Second / 2, pad: time.Second, finalSeats: 1, conc: 2, nClients: 2, exp: flts(0.5, 0.5)}, + {think: time.Second / 2, pad: time.Second, finalSeats: 2, conc: 2, nClients: 2, exp: flts(0.25, 0.25)}, } { - for conc := 1; conc <= 2; conc++ { - caseName := fmt.Sprintf("seps%v,%v,%v", seps.think, seps.pad, conc) - t.Run(caseName, func(t *testing.T) { - metrics.Register() - now := time.Now() + caseName := fmt.Sprintf("think=%v,finalSeats=%d,pad=%v,nClients=%d,conc=%d", seps.think, seps.finalSeats, seps.pad, seps.nClients, seps.conc) + t.Run(caseName, func(t *testing.T) { + metrics.Register() + now := time.Now() - clk, counter := testeventclock.NewFake(now, 0, nil) - qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) - qCfg := fq.QueuingConfig{ - Name: caseName, - DesiredNumQueues: 9, - QueueLengthLimit: 8, - HandSize: 3, - RequestWaitLimit: 10 * time.Minute, - } - qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) - if err != nil { - t.Fatal(err) - } - qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: conc}) - uniformScenario{name: qCfg.Name, - qs: qs, - clients: []uniformClient{ - newUniformClient(1001001001, 1, 19, time.Second, seps.think).pad(seps.pad), - }, - concurrencyLimit: conc, - evalDuration: time.Second * 18, // multiple of every period involved, so that margin can be 0 below - expectedFair: []bool{true}, - expectedFairnessMargin: []float64{0}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, - padConstrains: conc == 1, - }.exercise(t) - }) - } + clk, counter := testeventclock.NewFake(now, 0, nil) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) + qCfg := fq.QueuingConfig{ + Name: "TestSeparations/" + caseName, + DesiredNumQueues: 9, + QueueLengthLimit: 8, + HandSize: 3, + RequestWaitLimit: 10 * time.Minute, + } + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + if err != nil { + t.Fatal(err) + } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: seps.conc}) + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + newUniformClient(1001001001, 1, 25, time.Second, seps.think).pad(seps.finalSeats, seps.pad), + newUniformClient(2002002002, 1, 25, time.Second, seps.think).pad(seps.finalSeats, seps.pad), + }[:seps.nClients], + concurrencyLimit: seps.conc, + evalDuration: time.Second * 24, // multiple of every period involved, so that margin can be 0 below + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + expectedAverages: seps.exp, + }.exercise(t) + }) } } @@ -685,8 +711,8 @@ func TestSeatSecondsRollover(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - newUniformClient(1001001001, 8, 20, Quarter, Quarter).seats(500), - newUniformClient(2002002002, 7, 30, Quarter, Quarter/2).seats(500), + newUniformClient(1001001001, 8, 20, Quarter, Quarter).setInitWidth(500), + newUniformClient(2002002002, 7, 30, Quarter, Quarter/2).setInitWidth(500), }, concurrencyLimit: 2000, evalDuration: Quarter * 40, @@ -760,7 +786,7 @@ func TestDifferentWidths(t *testing.T) { qs: qs, clients: []uniformClient{ newUniformClient(10010010010010, 13, 10, time.Second, time.Second-1), - newUniformClient(20020020020020, 7, 10, time.Second, time.Second-1).seats(2), + newUniformClient(20020020020020, 7, 10, time.Second, time.Second-1).setInitWidth(2), }, concurrencyLimit: 6, evalDuration: time.Second * 20, @@ -795,11 +821,11 @@ func TestTooWide(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - newUniformClient(40040040040040, 15, 21, time.Second, time.Second-1).seats(2), - newUniformClient(50050050050050, 15, 21, time.Second, time.Second-1).seats(2), - newUniformClient(60060060060060, 15, 21, time.Second, time.Second-1).seats(2), - newUniformClient(70070070070070, 15, 21, time.Second, time.Second-1).seats(2), - newUniformClient(90090090090090, 15, 21, time.Second, time.Second-1).seats(7), + newUniformClient(40040040040040, 15, 21, time.Second, time.Second-1).setInitWidth(2), + newUniformClient(50050050050050, 15, 21, time.Second, time.Second-1).setInitWidth(2), + newUniformClient(60060060060060, 15, 21, time.Second, time.Second-1).setInitWidth(2), + newUniformClient(70070070070070, 15, 21, time.Second, time.Second-1).setInitWidth(2), + newUniformClient(90090090090090, 15, 21, time.Second, time.Second-1).setInitWidth(7), }, concurrencyLimit: 6, evalDuration: time.Second * 225, @@ -1141,7 +1167,7 @@ func TestFindDispatchQueueLocked(t *testing.T) { robinIndexExpected []int }{ { - name: "width=1, seats are available, the queue with less virtual start time wins", + name: "width1=1, seats are available, the queue with less virtual start time wins", concurrencyLimit: 1, totSeatsInUse: 0, robinIndex: -1, @@ -1164,7 +1190,7 @@ func TestFindDispatchQueueLocked(t *testing.T) { robinIndexExpected: []int{1}, }, { - name: "width=1, all seats are occupied, no queue is picked", + name: "width1=1, all seats are occupied, no queue is picked", concurrencyLimit: 1, totSeatsInUse: 1, robinIndex: -1, @@ -1181,7 +1207,7 @@ func TestFindDispatchQueueLocked(t *testing.T) { robinIndexExpected: []int{0}, }, { - name: "width > 1, seats are available for request with the least finish R, queue is picked", + name: "width1 > 1, seats are available for request with the least finish R, queue is picked", concurrencyLimit: 50, totSeatsInUse: 25, robinIndex: -1, @@ -1204,7 +1230,7 @@ func TestFindDispatchQueueLocked(t *testing.T) { robinIndexExpected: []int{1}, }, { - name: "width > 1, seats are not available for request with the least finish R, queue is not picked", + name: "width1 > 1, seats are not available for request with the least finish R, queue is not picked", concurrencyLimit: 50, totSeatsInUse: 26, robinIndex: -1, @@ -1227,7 +1253,7 @@ func TestFindDispatchQueueLocked(t *testing.T) { robinIndexExpected: []int{1, 1, 1}, }, { - name: "width > 1, seats become available before 3rd attempt, queue is picked", + name: "width1 > 1, seats become available before 3rd attempt, queue is picked", concurrencyLimit: 50, totSeatsInUse: 26, robinIndex: -1, @@ -1304,7 +1330,8 @@ func TestFinishRequestLocked(t *testing.T) { { name: "request has additional latency", workEstimate: fcrequest.WorkEstimate{ - InitialSeats: 10, + InitialSeats: 1, + FinalSeats: 10, AdditionalLatency: time.Minute, }, }, @@ -1344,9 +1371,9 @@ func TestFinishRequestLocked(t *testing.T) { var ( queuesetTotalRequestsExecutingExpected = qs.totRequestsExecuting - 1 - queuesetTotalSeatsInUseExpected = qs.totSeatsInUse - int(test.workEstimate.InitialSeats) + queuesetTotalSeatsInUseExpected = qs.totSeatsInUse - test.workEstimate.MaxSeats() queueRequestsExecutingExpected = queue.requestsExecuting - 1 - queueSeatsInUseExpected = queue.seatsInUse - int(test.workEstimate.InitialSeats) + queueSeatsInUseExpected = queue.seatsInUse - test.workEstimate.MaxSeats() ) qs.finishRequestLocked(r)