Add test for windup problem in APF

The current design for Fair Queueing for Server Requests has a
problem: if the max-min fair result stays different from an even
division for a long time, and no queue involved in the imbalance goes
empty, then the imbalance keeps accruing in the queues' virtual-time
state.
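
To picture the effect with made-up numbers (an illustration only, not
the queueSet code): suppose two never-idle flows share a concurrency
limit of 3 and the max-min fair split stays pinned at 2 vs 1. The gap
between each flow's allocation and an even split then grows without
bound for as long as the imbalance lasts:

package main

import "fmt"

func main() {
	// Hypothetical steady state: 3 units of concurrency, two always-busy
	// flows whose max-min fair shares stay at 2 and 1.
	const evenShare = 1.5
	shares := [2]float64{2, 1}
	var skew [2]float64 // accumulated difference from an even split
	for second := 1; second <= 5; second++ {
		for i := range shares {
			skew[i] += shares[i] - evenShare
		}
		fmt.Printf("after %ds: skew = %v\n", second, skew)
	}
	// The skew never shrinks unless a flow goes idle or the shares
	// change, which mirrors how the queues' virtual-time state winds up.
}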

This commit adds a test that demonstrates the problem.

It also includes some tweaks that make the other tests less flaky.

Factor the big scenario-testing func into pieces, with supporting
structs.
Mike Spreitzer 2020-06-04 00:15:08 -04:00
parent e45a598a5c
commit f3fdd5cf9f
3 changed files with 453 additions and 141 deletions


@@ -441,7 +441,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
bestQueueIdx, bestQueueLen = queueIdx, thisLen
}
})
klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting)
klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting)
return bestQueueIdx
}


@@ -20,6 +20,8 @@ import (
"context"
"fmt"
"math"
"reflect"
"sort"
"sync/atomic"
"testing"
"time"
@@ -32,7 +34,60 @@ import (
"k8s.io/klog/v2"
)
type uniformScenario []uniformClient
// fairAlloc computes the max-min fair allocation of the given
// capacity to the given demands (the demands slice is not modified).
func fairAlloc(demands []float64, capacity float64) []float64 {
count := len(demands)
indices := make([]int, count)
for i := 0; i < count; i++ {
indices[i] = i
}
sort.Slice(indices, func(i, j int) bool { return demands[indices[i]] < demands[indices[j]] })
alloc := make([]float64, count)
var next int
var prevAlloc float64
for ; next < count; next++ {
// `capacity` is how much remains assuming that
// all unvisited items get `prevAlloc`.
idx := indices[next]
demand := demands[idx]
if demand <= 0 {
continue
}
// `fullCapacityBite` is how much more capacity would be used
// if this and all following items get as much as this one
// is demanding.
fullCapacityBite := float64(count-next) * (demand - prevAlloc)
if fullCapacityBite > capacity {
break
}
prevAlloc = demand
alloc[idx] = demand
capacity -= fullCapacityBite
}
for j := next; j < count; j++ {
alloc[indices[j]] = prevAlloc + capacity/float64(count-next)
}
return alloc
}
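// TestFairAlloc spot-checks fairAlloc. In the fourth case, for example,
// fairAlloc([]float64{3, 7, 9, 1}, 14) visits the demands in ascending
// order (1, 3, 7, 9): granting 1 to all four costs 4, raising the
// remaining three to 3 costs 6 more, but raising the last two to 7 would
// cost 8 with only 4 left, so those two instead split the remaining
// capacity and end up at 3+2 = 5 each, giving {3, 5, 5, 1}.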
func TestFairAlloc(t *testing.T) {
if e, a := []float64{0, 0}, fairAlloc([]float64{0, 0}, 42); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#+v, got %#+v", e, a)
}
if e, a := []float64{42, 0}, fairAlloc([]float64{47, 0}, 42); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#+v, got %#+v", e, a)
}
if e, a := []float64{1, 41}, fairAlloc([]float64{1, 47}, 42); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#+v, got %#+v", e, a)
}
if e, a := []float64{3, 5, 5, 1}, fairAlloc([]float64{3, 7, 9, 1}, 14); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#+v, got %#+v", e, a)
}
if e, a := []float64{1, 9, 7, 3}, fairAlloc([]float64{1, 9, 7, 3}, 21); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#+v, got %#+v", e, a)
}
}
type uniformClient struct {
hash uint64
@@ -40,150 +95,246 @@ type uniformClient struct {
nCalls int
// duration for a simulated synchronous call
execDuration time.Duration
// duration for simulated "other work"
// duration for simulated "other work". This can be negative,
// causing a request to be launched a certain amount of time
// before the previous one finishes.
thinkDuration time.Duration
// When true indicates that only half the specified number of
// threads should run during the first half of the evaluation
// period
split bool
}
// exerciseQueueSetUniformScenario runs a scenario based on the given set of uniform clients.
// uniformScenario describes a scenario based on the given set of uniform clients.
// Each uniform client specifies a number of threads, each of which alternates between thinking
// and making a synchronous request through the QueueSet.
// This function measures how much concurrency each client got, on average, over
// the initial evalDuration and tests to see whether they all got about the same amount.
// Each client needs to be demanding enough to use this amount, otherwise the fair result
// is not equal amounts and the simple test in this function would not accurately test fairness.
// expectPass indicates whether the QueueSet is expected to be fair.
// expectedAllRequests indicates whether all requests are expected to get dispatched.
func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, sc uniformScenario,
evalDuration time.Duration,
expectPass, expectedAllRequests, expectInqueueMetrics, expectExecutingMetrics bool,
rejectReason string,
clk *clock.FakeEventClock, counter counter.GoRoutineCounter) {
// The test measures how much concurrency each client got, on average, over
// the initial evalDuration and tests to see whether they all got about the fair amount.
// Each client needs to be demanding enough to use more than its fair share,
// or overall care needs to be taken about timing so that scheduling details
// do not cause any client to actually request a significantly smaller share
// than it theoretically should.
// expectFair indicates whether the QueueSet is expected to be
// fair in the respective halves of a split scenario;
// in a non-split scenario this is a singleton with one expectation.
// expectAllRequests indicates whether all requests are expected to get dispatched.
type uniformScenario struct {
name string
qs fq.QueueSet
clients []uniformClient
concurrencyLimit int
evalDuration time.Duration
expectFair []bool
expectAllRequests bool
evalInqueueMetrics, evalExecutingMetrics bool
rejectReason string
clk *clock.FakeEventClock
counter counter.GoRoutineCounter
}
now := time.Now()
t.Logf("%s: Start %s, clk=%p, grc=%p", clk.Now().Format(nsTimeFmt), name, clk, counter)
integrators := make([]test.Integrator, len(sc))
var failedCount uint64
expectedInqueue := ""
expectedExecuting := ""
if expectInqueueMetrics || expectExecutingMetrics {
func (us uniformScenario) exercise(t *testing.T) {
uss := uniformScenarioState{
t: t,
uniformScenario: us,
startTime: time.Now(),
integrators: make([]test.Integrator, len(us.clients)),
executions: make([]int32, len(us.clients)),
rejects: make([]int32, len(us.clients)),
}
for _, uc := range us.clients {
uss.doSplit = uss.doSplit || uc.split
}
uss.exercise()
}
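// uniformScenarioState holds the mutable state of one exercise of a
// uniformScenario: the start time, whether the scenario splits, one
// concurrency Integrator per client, and the failure counts and
// expected-metric strings that finalReview checks at the end.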
type uniformScenarioState struct {
t *testing.T
uniformScenario
startTime time.Time
doSplit bool
integrators []test.Integrator
failedCount uint64
expectedInqueue, expectedExecuting string
executions, rejects []int32
}
func (uss *uniformScenarioState) exercise() {
uss.t.Logf("%s: Start %s, doSplit=%v, clk=%p, grc=%p", uss.startTime.Format(nsTimeFmt), uss.name, uss.doSplit, uss.clk, uss.counter)
if uss.evalInqueueMetrics || uss.evalExecutingMetrics {
metrics.Reset()
}
executions := make([]int32, len(sc))
rejects := make([]int32, len(sc))
for i, uc := range sc {
integrators[i] = test.NewIntegrator(clk)
for i, uc := range uss.clients {
uss.integrators[i] = test.NewIntegrator(uss.clk)
fsName := fmt.Sprintf("client%d", i)
expectedInqueue = expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, name, "\n")
uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, uss.name, "\n")
for j := 0; j < uc.nThreads; j++ {
counter.Add(1)
go func(i, j int, uc uniformClient, igr test.Integrator) {
for k := 0; k < uc.nCalls; k++ {
ClockWait(clk, counter, uc.thinkDuration)
req, idle := qs.StartRequest(context.Background(), uc.hash, "", fsName, name, []int{i, j, k})
t.Logf("%s: %d, %d, %d got req=%p, idle=%v", clk.Now().Format(nsTimeFmt), i, j, k, req, idle)
if req == nil {
atomic.AddUint64(&failedCount, 1)
atomic.AddInt32(&rejects[i], 1)
break
}
if idle {
t.Error("got request but QueueSet reported idle")
}
var executed bool
idle2 := req.Finish(func() {
executed = true
t.Logf("%s: %d, %d, %d executing", clk.Now().Format(nsTimeFmt), i, j, k)
atomic.AddInt32(&executions[i], 1)
igr.Add(1)
ClockWait(clk, counter, uc.execDuration)
igr.Add(-1)
})
t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", clk.Now().Format(nsTimeFmt), i, j, k, executed, idle2)
if !executed {
atomic.AddUint64(&failedCount, 1)
atomic.AddInt32(&rejects[i], 1)
}
}
counter.Add(-1)
}(i, j, uc, integrators[i])
ust := uniformScenarioThread{
uss: uss,
i: i,
j: j,
nCalls: uc.nCalls,
uc: uc,
igr: uss.integrators[i],
fsName: fsName,
}
ust.start()
}
}
lim := now.Add(evalDuration)
clk.Run(&lim)
clk.SetTime(lim)
t.Logf("%s: End", clk.Now().Format(nsTimeFmt))
results := make([]test.IntegratorResults, len(sc))
var sumOfAvg float64
for i := range sc {
results[i] = integrators[i].GetResults()
sumOfAvg += results[i].Average
}
idealAverage := sumOfAvg / float64(len(sc))
passes := make([]bool, len(sc))
allPass := true
for i := range sc {
relDiff := (results[i].Average - idealAverage) / idealAverage
passes[i] = math.Abs(relDiff) <= 0.1
allPass = allPass && passes[i]
}
for i := range sc {
if allPass != expectPass {
t.Errorf("Class %d got an Average of %v but the ideal was %v", i, results[i].Average, idealAverage)
} else {
t.Logf("Class %d got an Average of %v and the ideal was %v", i, results[i].Average, idealAverage)
}
if uss.doSplit {
uss.evalTo(uss.startTime.Add(uss.evalDuration/2), false, uss.expectFair[0])
}
uss.evalTo(uss.startTime.Add(uss.evalDuration), true, uss.expectFair[len(uss.expectFair)-1])
uss.clk.Run(nil)
uss.finalReview()
}
clk.Run(nil)
if expectedAllRequests && failedCount > 0 {
t.Errorf("Expected all requests to be successful but got %v failed requests", failedCount)
} else if !expectedAllRequests && failedCount == 0 {
t.Errorf("Expected failed requests but all requests succeeded")
type uniformScenarioThread struct {
uss *uniformScenarioState
i, j int
nCalls int
uc uniformClient
igr test.Integrator
fsName string
}
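// start schedules this thread's first call after a tiny per-thread
// stagger. In a split scenario the second half of a splitting client's
// threads begin halfway through the evaluation period and make half as
// many calls.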
func (ust *uniformScenarioThread) start() {
initialDelay := time.Duration(11*ust.j + 2*ust.i)
if ust.uc.split && ust.j >= ust.uc.nThreads/2 {
initialDelay += ust.uss.evalDuration / 2
ust.nCalls = ust.nCalls / 2
}
if expectInqueueMetrics {
ust.uss.clk.EventAfterDuration(ust.genCallK(0), initialDelay)
}
// generates an EventFunc that forks a goroutine to do call k
func (ust *uniformScenarioThread) genCallK(k int) func(time.Time) {
return func(time.Time) {
// As an EventFunc, this has to return without waiting
// for time to pass, and so cannot do callK(k) itself.
ust.uss.counter.Add(1)
go func() {
ust.callK(k)
ust.uss.counter.Add(-1)
}()
}
}
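// callK makes call k of this thread (stopping once nCalls have been
// made): it requests admission from the QueueSet, records rejection or
// execution, and, from within the execution callback, schedules call k+1
// to launch execDuration+thinkDuration after this execution starts,
// i.e. thinkDuration after it ends (possibly before it ends, when
// thinkDuration is negative).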
func (ust *uniformScenarioThread) callK(k int) {
if k >= ust.nCalls {
return
}
req, idle := ust.uss.qs.StartRequest(context.Background(), ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k})
ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
if req == nil {
atomic.AddUint64(&ust.uss.failedCount, 1)
atomic.AddInt32(&ust.uss.rejects[ust.i], 1)
return
}
if idle {
ust.uss.t.Error("got request but QueueSet reported idle")
}
var executed bool
idle2 := req.Finish(func() {
executed = true
ust.uss.t.Logf("%s: %d, %d, %d executing", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k)
atomic.AddInt32(&ust.uss.executions[ust.i], 1)
ust.igr.Add(1)
ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration)
ClockWait(ust.uss.clk, ust.uss.counter, ust.uc.execDuration)
ust.igr.Add(-1)
})
ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2)
if !executed {
atomic.AddUint64(&ust.uss.failedCount, 1)
atomic.AddInt32(&ust.uss.rejects[ust.i], 1)
}
}
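// evalTo runs the fake clock up to lim, then compares each client's
// average concurrency since the previous evaluation point against the
// max-min fair allocation of the concurrency limit to the clients'
// demands, and complains when that comparison disagrees with expectFair.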
func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool) {
uss.clk.Run(&lim)
uss.clk.SetTime(lim)
if uss.doSplit && !last {
uss.t.Logf("%s: End of first half", uss.clk.Now().Format(nsTimeFmt))
} else {
uss.t.Logf("%s: End", uss.clk.Now().Format(nsTimeFmt))
}
demands := make([]float64, len(uss.clients))
averages := make([]float64, len(uss.clients))
for i, uc := range uss.clients {
nThreads := uc.nThreads
if uc.split && !last {
nThreads = nThreads / 2
}
demands[i] = float64(nThreads) * float64(uc.execDuration) / float64(uc.thinkDuration+uc.execDuration)
averages[i] = uss.integrators[i].Reset().Average
}
fairAverages := fairAlloc(demands, float64(uss.concurrencyLimit))
for i := range uss.clients {
var gotFair bool
if fairAverages[i] > 0 {
relDiff := (averages[i] - fairAverages[i]) / fairAverages[i]
gotFair = math.Abs(relDiff) <= 0.1
} else {
gotFair = math.Abs(averages[i]) <= 0.1
}
if gotFair != expectFair {
uss.t.Errorf("%s client %d last=%v got an Average of %v but the fair average was %v", uss.name, i, last, averages[i], fairAverages[i])
} else {
uss.t.Logf("%s client %d last=%v got an Average of %v and the fair average was %v", uss.name, i, last, averages[i], fairAverages[i])
}
}
}
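// finalReview checks that the count of failed requests matches
// expectAllRequests and, when requested, compares the inqueue,
// executing, and rejected-request metrics against their expected values.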
func (uss *uniformScenarioState) finalReview() {
if uss.expectAllRequests && uss.failedCount > 0 {
uss.t.Errorf("Expected all requests to be successful but got %v failed requests", uss.failedCount)
} else if !uss.expectAllRequests && uss.failedCount == 0 {
uss.t.Errorf("Expected failed requests but all requests succeeded")
}
if uss.evalInqueueMetrics {
e := `
# HELP apiserver_flowcontrol_current_inqueue_requests [ALPHA] Number of requests currently pending in queues of the API Priority and Fairness system
# TYPE apiserver_flowcontrol_current_inqueue_requests gauge
` + expectedInqueue
` + uss.expectedInqueue
err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_requests")
if err != nil {
t.Error(err)
uss.t.Error(err)
} else {
t.Log("Success with" + e)
uss.t.Log("Success with" + e)
}
}
expectedRejects := ""
for i := range sc {
for i := range uss.clients {
fsName := fmt.Sprintf("client%d", i)
if atomic.AddInt32(&executions[i], 0) > 0 {
expectedExecuting = expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, name, "\n")
if atomic.AddInt32(&uss.executions[i], 0) > 0 {
uss.expectedExecuting = uss.expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, uss.name, "\n")
}
if atomic.AddInt32(&rejects[i], 0) > 0 {
expectedRejects = expectedRejects + fmt.Sprintf(` apiserver_flowcontrol_rejected_requests_total{flowSchema=%q,priorityLevel=%q,reason=%q} %d%s`, fsName, name, rejectReason, rejects[i], "\n")
if atomic.AddInt32(&uss.rejects[i], 0) > 0 {
expectedRejects = expectedRejects + fmt.Sprintf(` apiserver_flowcontrol_rejected_requests_total{flowSchema=%q,priorityLevel=%q,reason=%q} %d%s`, fsName, uss.name, uss.rejectReason, uss.rejects[i], "\n")
}
}
if expectExecutingMetrics && len(expectedExecuting) > 0 {
if uss.evalExecutingMetrics && len(uss.expectedExecuting) > 0 {
e := `
# HELP apiserver_flowcontrol_current_executing_requests [ALPHA] Number of requests currently executing in the API Priority and Fairness system
# TYPE apiserver_flowcontrol_current_executing_requests gauge
` + expectedExecuting
` + uss.expectedExecuting
err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_executing_requests")
if err != nil {
t.Error(err)
uss.t.Error(err)
} else {
t.Log("Success with" + e)
uss.t.Log("Success with" + e)
}
}
if expectExecutingMetrics && len(expectedRejects) > 0 {
if uss.evalExecutingMetrics && len(expectedRejects) > 0 {
e := `
# HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system
# TYPE apiserver_flowcontrol_rejected_requests_total counter
` + expectedRejects
err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_rejected_requests_total")
if err != nil {
t.Error(err)
uss.t.Error(err)
} else {
t.Log("Success with" + e)
uss.t.Log("Success with" + e)
}
}
}
@@ -204,7 +355,7 @@ func init() {
klog.InitFlags(nil)
}
// TestNoRestraint should fail because the dummy QueueSet exercises no control
// TestNoRestraint tests whether the no-restraint factory gives every client what it asks for
func TestNoRestraint(t *testing.T) {
metrics.Register()
now := time.Now()
@@ -214,23 +365,32 @@ func TestNoRestraint(t *testing.T) {
t.Fatal(err)
}
nr := nrc.Complete(fq.DispatchingConfig{})
exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{
{1001001001, 5, 10, time.Second, time.Second},
{2002002002, 2, 10, time.Second, time.Second / 2},
}, time.Second*10, false, true, false, false, "", clk, counter)
uniformScenario{name: "NoRestraint",
qs: nr,
clients: []uniformClient{
{1001001001, 5, 10, time.Second, time.Second, false},
{2002002002, 2, 10, time.Second, time.Second / 2, false},
},
concurrencyLimit: 10,
evalDuration: time.Second * 15,
expectFair: []bool{true},
expectAllRequests: true,
clk: clk,
counter: counter,
}.exercise(t)
}
func TestUniformFlows(t *testing.T) {
func TestUniformFlowsHandSize1(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestUniformFlows",
DesiredNumQueues: 8,
QueueLengthLimit: 6,
HandSize: 3,
Name: "TestUniformFlowsHandSize1",
DesiredNumQueues: 9,
QueueLengthLimit: 8,
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg)
@@ -239,22 +399,33 @@ func TestUniformFlows(t *testing.T) {
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
{1001001001, 5, 10, time.Second, time.Second},
{2002002002, 5, 10, time.Second, time.Second},
}, time.Second*20, true, true, true, true, "", clk, counter)
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
{1001001001, 8, 20, time.Second, time.Second - 1, false},
{2002002002, 8, 20, time.Second, time.Second - 1, false},
},
concurrencyLimit: 4,
evalDuration: time.Second * 50,
expectFair: []bool{true},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}
func TestDifferentFlows(t *testing.T) {
func TestUniformFlowsHandSize3(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestDifferentFlows",
Name: "TestUniformFlowsHandSize3",
DesiredNumQueues: 8,
QueueLengthLimit: 6,
QueueLengthLimit: 4,
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
@@ -263,11 +434,128 @@ func TestDifferentFlows(t *testing.T) {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
{1001001001, 8, 30, time.Second, time.Second - 1, false},
{2002002002, 8, 30, time.Second, time.Second - 1, false},
},
concurrencyLimit: 4,
evalDuration: time.Second * 60,
expectFair: []bool{true},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}
exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
{1001001001, 6, 10, time.Second, time.Second},
{2002002002, 5, 15, time.Second, time.Second / 2},
}, time.Second*20, true, true, true, true, "", clk, counter)
func TestDifferentFlowsExpectEqual(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "DiffFlowsExpectEqual",
DesiredNumQueues: 9,
QueueLengthLimit: 8,
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg)
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
{1001001001, 8, 20, time.Second, time.Second, false},
{2002002002, 7, 30, time.Second, time.Second / 2, false},
},
concurrencyLimit: 4,
evalDuration: time.Second * 40,
expectFair: []bool{true},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}
func TestDifferentFlowsExpectUnequal(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "DiffFlowsExpectUnequal",
DesiredNumQueues: 9,
QueueLengthLimit: 6,
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg)
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 3})
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
{1001001001, 4, 20, time.Second, time.Second - 1, false},
{2002002002, 2, 20, time.Second, time.Second - 1, false},
},
concurrencyLimit: 3,
evalDuration: time.Second * 20,
expectFair: []bool{true},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}
func TestWindup(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestWindup",
DesiredNumQueues: 9,
QueueLengthLimit: 6,
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg)
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 3})
uniformScenario{name: qCfg.Name, qs: qs,
clients: []uniformClient{
{1001001001, 2, 40, time.Second, -1, false},
{2002002002, 2, 40, time.Second, -1, true},
},
concurrencyLimit: 3,
evalDuration: time.Second * 40,
expectFair: []bool{true, false},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}
func TestDifferentFlowsWithoutQueuing(t *testing.T) {
@@ -286,20 +574,20 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
{1001001001, 6, 10, time.Second, 57 * time.Millisecond},
{2002002002, 4, 15, time.Second, 750 * time.Millisecond},
}, time.Second*13, false, false, false, true, "concurrency-limit", clk, counter)
err = metrics.GatherAndCompare(`
# HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system
# TYPE apiserver_flowcontrol_rejected_requests_total counter
apiserver_flowcontrol_rejected_requests_total{flowSchema="client0",priorityLevel="TestDifferentFlowsWithoutQueuing",reason="concurrency-limit"} 2
apiserver_flowcontrol_rejected_requests_total{flowSchema="client1",priorityLevel="TestDifferentFlowsWithoutQueuing",reason="concurrency-limit"} 4
`,
"apiserver_flowcontrol_rejected_requests_total")
if err != nil {
t.Fatal(err)
}
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
{1001001001, 6, 10, time.Second, 57 * time.Millisecond, false},
{2002002002, 4, 15, time.Second, 750 * time.Millisecond, false},
},
concurrencyLimit: 4,
evalDuration: time.Second * 13,
expectFair: []bool{false},
evalExecutingMetrics: true,
rejectReason: "concurrency-limit",
clk: clk,
counter: counter,
}.exercise(t)
}
func TestTimeout(t *testing.T) {
@@ -321,9 +609,20 @@ func TestTimeout(t *testing.T) {
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
{1001001001, 5, 100, time.Second, time.Second},
}, time.Second*10, true, false, true, true, "time-out", clk, counter)
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
{1001001001, 5, 100, time.Second, time.Second, false},
},
concurrencyLimit: 1,
evalDuration: time.Second * 10,
expectFair: []bool{true},
evalInqueueMetrics: true,
evalExecutingMetrics: true,
rejectReason: "time-out",
clk: clk,
counter: counter,
}.exercise(t)
}
func TestContextCancel(t *testing.T) {


@@ -32,6 +32,7 @@ type Integrator interface {
Set(float64) // set the value of X
Add(float64) // add the given quantity to X
GetResults() IntegratorResults
Reset() IntegratorResults // restart the integration from now
}
// IntegratorResults holds statistical abstracts of the integration
@@ -83,6 +84,10 @@ func (igr *integrator) updateLocked() {
func (igr *integrator) GetResults() (results IntegratorResults) {
igr.Lock()
defer func() { igr.Unlock() }()
return igr.getResultsLocked()
}
func (igr *integrator) getResultsLocked() (results IntegratorResults) {
igr.updateLocked()
results.Duration = igr.integrals[0]
if results.Duration <= 0 {
@@ -101,3 +106,11 @@ func (igr *integrator) GetResults() (results IntegratorResults) {
}
return
}
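// Reset returns the results accumulated so far and restarts the
// integration from the current instant; the scenario test uses this to
// evaluate each half of a split scenario independently.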
func (igr *integrator) Reset() (results IntegratorResults) {
igr.Lock()
defer func() { igr.Unlock() }()
results = igr.getResultsLocked()
igr.integrals = [3]float64{0, 0, 0}
return
}